Apache spark latest pull #12

Merged
merged 151 commits into from
Feb 9, 2018
Changes from all commits
Commits
151 commits
121dc96
[SPARK-23087][SQL] CheckCartesianProduct too restrictive when conditi…
mgaido91 Jan 21, 2018
4f43d27
[SPARK-22119][ML] Add cosine distance to KMeans
mgaido91 Jan 21, 2018
2239d7a
[SPARK-21293][SS][SPARKR] Add doc example for streaming join, dedup
felixcheung Jan 21, 2018
12faae2
[SPARK-23169][INFRA][R] Run lintr on the changes of lint-r script and…
HyukjinKwon Jan 22, 2018
602c6d8
[SPARK-20947][PYTHON] Fix encoding/decoding error in pipe action
Jan 22, 2018
11daeb8
[SPARK-22976][CORE] Cluster mode driver dir removed while running
RussellSpitzer Jan 22, 2018
8142a3b
[MINOR][SQL] Fix wrong comments on org.apache.spark.sql.parquet.row.a…
dongjoon-hyun Jan 22, 2018
ec22897
[SPARK-23020][CORE] Fix races in launcher code, test.
Jan 22, 2018
60175e9
[MINOR][DOC] Fix the path to the examples jar
tashoyan Jan 22, 2018
7328116
[SPARK-23122][PYSPARK][FOLLOW-UP] Update the docs for UDF Registration
gatorsmile Jan 22, 2018
7880188
[SPARK-23170][SQL] Dump the statistics of effective runs of analyzer …
gatorsmile Jan 22, 2018
896e45a
[MINOR][SQL][TEST] Test case cleanups for recent PRs
gatorsmile Jan 22, 2018
5d680ca
[SPARK-23090][SQL] polish ColumnVector
cloud-fan Jan 22, 2018
87ffe7a
[SPARK-7721][PYTHON][TESTS] Adds PySpark coverage generation script
HyukjinKwon Jan 22, 2018
4327ccf
[SPARK-11630][CORE] ClosureCleaner moved from warning to debug
rekhajoshm Jan 22, 2018
446948a
[SPARK-23121][CORE] Fix for ui becoming unaccessible for long running…
smurakozi Jan 22, 2018
76b8b84
[MINOR] Typo fixes
jaceklaskowski Jan 22, 2018
51eb750
[SPARK-22389][SQL] data source v2 partitioning reporting interface
cloud-fan Jan 22, 2018
b2ce17b
[SPARK-22274][PYTHON][SQL] User-defined aggregation functions with pa…
icexelloss Jan 23, 2018
96cb60b
[SPARK-22465][FOLLOWUP] Update the number of partitions of default pa…
jiangxb1987 Jan 23, 2018
ee572ba
[SPARK-20749][SQL][FOLLOW-UP] Override prettyName for bit_length and …
gatorsmile Jan 23, 2018
bdebb8e
[SPARK-20664][SPARK-23103][CORE] Follow-up: remove workaround for .
Jan 23, 2018
dc4761f
[SPARK-17088][HIVE] Fix 'sharesHadoopClasses' option when creating cl…
Jan 23, 2018
05839d1
[SPARK-22735][ML][DOC] Added VectorSizeHint docs and examples.
MrBago Jan 23, 2018
613c290
[SPARK-23192][SQL] Keep the Hint after Using Cached Data
gatorsmile Jan 23, 2018
44cc4da
[SPARK-23195][SQL] Keep the Hint of Cached Data
gatorsmile Jan 24, 2018
15adcc8
[SPARK-23197][DSTREAMS] Increased timeouts to resolve flakiness
tdas Jan 24, 2018
a3911cf
[SPARK-23177][SQL][PYSPARK] Extract zero-parameter UDFs from aggregate
viirya Jan 24, 2018
f54b65c
[SPARK-21727][R] Allow multi-element atomic vector as column type in …
neilalex Jan 24, 2018
4e7b490
Revert "[SPARK-23195][SQL] Keep the Hint of Cached Data"
gatorsmile Jan 24, 2018
7af1a32
[SPARK-23174][BUILD][PYTHON] python code style checker update
rekhajoshm Jan 24, 2018
de36f65
[SPARK-23148][SQL] Allow pathnames with special characters for CSV / …
Jan 24, 2018
0ec95bb
[SPARK-22577][CORE] executor page blacklist status should update with…
attilapiros Jan 24, 2018
e18d6f5
[SPARK-20906][SPARKR] Add API doc example for Constrained Logistic Re…
felixcheung Jan 24, 2018
8c273b4
[SPARK-23020][CORE][FOLLOWUP] Fix Java style check issues.
ueshin Jan 24, 2018
bbb87b3
[SPARK-22837][SQL] Session timeout checker does not work in SessionMa…
Jan 24, 2018
840dea6
[SPARK-23152][ML] - Correctly guard against empty datasets
tovbinm Jan 24, 2018
0e178e1
[SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite "Shuffle regi…
mpetruska Jan 24, 2018
bc9641d
[SPARK-23198][SS][TEST] Fix KafkaContinuousSourceStressForDontFailOnD…
dongjoon-hyun Jan 24, 2018
6f0ba84
[MINOR][SQL] add new unit test to LimitPushdown
heary-cao Jan 24, 2018
45b4bbf
[SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init la…
caneGuy Jan 25, 2018
e29b08a
[SPARK-23208][SQL] Fix code generation for complex create array (rela…
hvanhovell Jan 25, 2018
39ee2ac
[SPARK-23163][DOC][PYTHON] Sync ML Python API with Scala
BryanCutler Jan 25, 2018
d20bbc2
[SPARK-21717][SQL] Decouple consume functions of physical operators i…
viirya Jan 25, 2018
8532e26
[SPARK-23112][DOC] Add highlights and migration guide for 2.3
Jan 25, 2018
8480c0c
[SPARK-23081][PYTHON] Add colRegex API to PySpark
huaxingao Jan 25, 2018
e57f394
[SPARK-23032][SQL] Add a per-query codegenStageId to WholeStageCodege…
rednaxelafx Jan 26, 2018
7bd46d9
[SPARK-23205][ML] Update ImageSchema.readImages to correctly set alph…
smurching Jan 26, 2018
70a68b3
[SPARK-23020][CORE] Fix race in SparkAppHandle cleanup, again.
Jan 26, 2018
d172181
[SPARK-23200] Reset Kubernetes-specific config on Checkpoint restore
ssaavedra Jan 26, 2018
cd3956d
[SPARK-22799][ML] Bucketizer should throw exception if single- and mu…
mgaido91 Jan 26, 2018
c22eaa9
[SPARK-22797][PYSPARK] Bucketizer support multi-column
zhengruifeng Jan 26, 2018
3e25251
[SPARK-22068][CORE] Reduce the duplicate code between putIteratorAsVa…
ConeyLiu Jan 26, 2018
dd8e257
[SPARK-23218][SQL] simplify ColumnVector.getArray
cloud-fan Jan 26, 2018
a8a3e9b
Revert "[SPARK-22797][PYSPARK] Bucketizer support multi-column"
Jan 26, 2018
94c67a7
[SPARK-23207][SQL] Shuffle+Repartition on a DataFrame could lead to i…
jiangxb1987 Jan 26, 2018
0737449
[SPARK-23242][SS][TESTS] Don't run tests in KafkaSourceSuiteBase twice
zsxwing Jan 27, 2018
5b5447c
[SPARK-23214][SQL] cached data should not carry extra hint info
cloud-fan Jan 27, 2018
e7bc9f0
[MINOR][SS][DOC] Fix `Trigger` Scala/Java doc examples
dongjoon-hyun Jan 27, 2018
6328868
[SPARK-23245][SS][TESTS] Don't access `lastExecution.executedPlan` in…
jose-torres Jan 27, 2018
3227d14
[SPARK-23233][PYTHON] Reset the cache in asNondeterministic to set de…
HyukjinKwon Jan 27, 2018
b8c32dc
[SPARK-23248][PYTHON][EXAMPLES] Relocate module docstrings to the top…
HyukjinKwon Jan 28, 2018
c40fda9
[SPARK-23166][ML] Add maxDF Parameter to CountVectorizer
ymazari Jan 28, 2018
686a622
[SPARK-23250][DOCS] Typo in JavaDoc/ScalaDoc for DataFrameWriter
clchen28 Jan 28, 2018
49b0207
[SPARK-23196] Unify continuous and microbatch V2 sinks
jose-torres Jan 29, 2018
39d2c6b
[SPARK-23238][SQL] Externalize SQLConf configurations exposed in docu…
HyukjinKwon Jan 29, 2018
badf0d0
[SPARK-23219][SQL] Rename ReadTask to DataReaderFactory in data sourc…
gengliangwang Jan 29, 2018
54dd7cf
[SPARK-23199][SQL] improved Removes repetition from group expressions…
heary-cao Jan 29, 2018
fbce2ed
[SPARK-23059][SQL][TEST] Correct some improper with view related meth…
xubo245 Jan 29, 2018
2d903cf
[SPARK-23223][SQL] Make stacking dataset transforms more performant
hvanhovell Jan 29, 2018
0d60b32
[SPARK-22221][DOCS] Adding User Documentation for Arrow
BryanCutler Jan 29, 2018
e30b34f
[SPARK-22916][SQL][FOLLOW-UP] Update the Description of Join Selection
gatorsmile Jan 29, 2018
b834446
[SPARK-23209][core] Allow credential manager to work when Hive not av…
Jan 29, 2018
f235df6
[SPARK-22221][SQL][FOLLOWUP] Externalize spark.sql.execution.arrow.ma…
BryanCutler Jan 30, 2018
31bd1da
[SPARK-23088][CORE] History server not showing incomplete/running app…
Jan 30, 2018
b375397
[SPARK-23207][SQL][FOLLOW-UP] Don't perform local sort for DataFrame.…
jiangxb1987 Jan 30, 2018
8b98324
[SPARK-23157][SQL] Explain restriction on column expression in withCo…
Jan 30, 2018
5056877
[SPARK-23138][ML][DOC] Multiclass logistic regression summary example…
sethah Jan 30, 2018
0a9ac02
[SPARK-23260][SPARK-23262][SQL] several data source v2 naming cleanup
cloud-fan Jan 30, 2018
7a2ada2
[SPARK-23261][PYSPARK] Rename Pandas UDFs
gatorsmile Jan 30, 2018
84bcf9d
[SPARK-23222][SQL] Make DataFrameRangeSuite not flaky
viirya Jan 30, 2018
a23187f
[SPARK-23174][BUILD][PYTHON][FOLLOWUP] Add pycodestyle*.py to .gitign…
ueshin Jan 30, 2018
31c00ad
[SPARK-23267][SQL] Increase spark.sql.codegen.hugeMethodLimit to 65535
gatorsmile Jan 30, 2018
58fcb5a
[SPARK-23275][SQL] hive/tests have been failing when run locally on t…
dilipbiswal Jan 30, 2018
9623a98
[MINOR] Fix typos in dev/* scripts.
ashashwat Jan 30, 2018
7786616
[SPARK-23276][SQL][TEST] Enable UDT tests in (Hive)OrcHadoopFsRelatio…
dongjoon-hyun Jan 31, 2018
ca04c3f
[SPARK-23274][SQL] Fix ReplaceExceptWithFilter when the right's Filte…
gatorsmile Jan 31, 2018
8c6a9c9
[SPARK-23279][SS] Avoid triggering distributed job for Console sink
jerryshao Jan 31, 2018
695f714
[SPARK-23272][SQL] add calendar interval type support to ColumnVector
cloud-fan Jan 31, 2018
161a3f2
[SPARK-23112][DOC] Update ML migration guide with breaking and behavi…
Jan 31, 2018
3d0911b
[SPARK-23228][PYSPARK] Add Python Created jsparkSession to JVM's defa…
jerryshao Jan 31, 2018
48dd6a4
revert [SPARK-22785][SQL] remove ColumnVector.anyNullsSet
cloud-fan Jan 31, 2018
8c21170
[SPARK-23249][SQL] Improved block merging logic for partitions
Jan 31, 2018
dd242ba
[SPARK-21525][STREAMING] Check error code from supervisor RPC.
Jan 31, 2018
9ff1d96
[SPARK-23281][SQL] Query produces results in incorrect order when a c…
dilipbiswal Jan 31, 2018
f470df2
[SPARK-23157][SQL][FOLLOW-UP] DataFrame -> SparkDataFrame in R comment
Feb 1, 2018
52e00f7
[SPARK-23280][SQL] add map type support to ColumnVector
cloud-fan Feb 1, 2018
2ac895b
[SPARK-23247][SQL] combines Unsafe operations and statistics operatio…
heary-cao Feb 1, 2018
56ae326
[SPARK-23268][SQL] Reorganize packages in data source V2
gengliangwang Feb 1, 2018
b2e7677
[SPARK-21396][SQL] Fixes MatchError when UDTs are passed through Hive…
atallahhezbor Feb 1, 2018
cc41245
[SPARK-23188][SQL] Make vectorized columar reader batch size configur…
jiangxb1987 Feb 1, 2018
b6b50ef
[SQL][MINOR] Inline SpecifiedWindowFrame.defaultWindowFrame().
jiangxb1987 Feb 1, 2018
4b7cd47
Revert "[SPARK-23200] Reset Kubernetes-specific config on Checkpoint …
jerryshao Feb 1, 2018
07cee33
[SPARK-22274][PYTHON][SQL][FOLLOWUP] Use `assertRaisesRegexp` instead…
ueshin Feb 1, 2018
e15da5b
[SPARK-23107][ML] ML 2.3 QA: New Scala APIs, docs.
yanboliang Feb 1, 2018
8bb70b0
[SPARK-23280][SQL][FOLLOWUP] Fix Java style check issues.
ueshin Feb 1, 2018
89e8d55
[SPARK-23280][SQL][FOLLOWUP] Enable `MutableColumnarRow.getMap()`.
ueshin Feb 1, 2018
ffbca84
[SPARK-23202][SQL] Add new API in DataSourceWriter: onDataWriterCommit
gengliangwang Feb 1, 2018
ec63e2d
[SPARK-23289][CORE] OneForOneBlockFetcher.DownloadCallback.onData sho…
zsxwing Feb 1, 2018
f051f83
[SPARK-13983][SQL] Fix HiveThriftServer2 can not get "--hiveconf" and…
wangyum Feb 1, 2018
73da3b6
[SPARK-23293][SQL] fix data source v2 self join
cloud-fan Feb 1, 2018
4bcfdef
[INFRA] Close stale PRs.
Feb 1, 2018
032c11b
[SPARK-23296][YARN] Include stacktrace in YARN-app diagnostic
gerashegalov Feb 1, 2018
90848d5
[SPARK-23284][SQL] Document the behavior of several ColumnVector's ge…
viirya Feb 2, 2018
969eda4
[SPARK-23020][CORE] Fix another race in the in-process launcher test.
Feb 2, 2018
b3a0428
[SPARK-23306] Fix the oom caused by contention
Feb 2, 2018
19c7c7e
[SPARK-23301][SQL] data source column pruning should work for arbitra…
cloud-fan Feb 2, 2018
b9503fc
[SPARK-23312][SQL] add a config to turn off vectorized cache reader
cloud-fan Feb 2, 2018
dd52681
[SPARK-23253][CORE][SHUFFLE] Only write shuffle temporary index file …
yaooqinn Feb 2, 2018
eefec93
[SPARK-23295][BUILD][MINOR] Exclude Waring message when generating ve…
yaooqinn Feb 2, 2018
eaf35de
[SPARK-23064][SS][DOCS] Stream-stream joins Documentation - follow up
tdas Feb 3, 2018
3ff83ad
[SQL] Minor doc update: Add an example in DataFrameReader.schema
rxin Feb 3, 2018
fe73cb4
[SPARK-23317][SQL] rename ContinuousReader.setOffset to setStartOffset
cloud-fan Feb 3, 2018
63b49fa
[SPARK-23311][SQL][TEST] add FilterFunction test case for test Combin…
heary-cao Feb 3, 2018
522e0b1
[SPARK-23305][SQL][TEST] Test `spark.sql.files.ignoreMissingFiles` fo…
dongjoon-hyun Feb 3, 2018
4aaa7d4
[MINOR][DOC] Use raw triple double quotes around docstrings where the…
ashashwat Feb 3, 2018
551dff2
[SPARK-21658][SQL][PYSPARK] Revert "[] Add default None for value in …
HyukjinKwon Feb 3, 2018
715047b
[SPARK-23256][ML][PYTHON] Add columnSchema method to PySpark image re…
HyukjinKwon Feb 4, 2018
6fb3fd1
[SPARK-22036][SQL][FOLLOWUP] Fix decimalArithmeticOperations.sql
wangyum Feb 4, 2018
a6bf3db
[SPARK-23307][WEBUI] Sort jobs/stages/tasks/queries with the complete…
zsxwing Feb 5, 2018
03b7e12
[SPARK-23310][CORE] Turn off read ahead input stream for unshafe shuf…
Feb 5, 2018
c2766b0
[SPARK-23330][WEBUI] Spark UI SQL executions page throws NPE
jiangxb1987 Feb 5, 2018
f3f1e14
[SPARK-23326][WEBUI] schedulerDelay should return 0 when the task is …
zsxwing Feb 6, 2018
a24c031
[SPARK-23290][SQL][PYTHON] Use datetime.date for date type when conve…
ueshin Feb 6, 2018
8141c3e
[SPARK-23300][TESTS] Prints out if Pandas and PyArrow are installed o…
HyukjinKwon Feb 6, 2018
63c5bf1
[SPARK-23334][SQL][PYTHON] Fix pandas_udf with return type StringType…
ueshin Feb 6, 2018
7db9979
[SPARK-23310][CORE][FOLLOWUP] Fix Java style check issues.
ueshin Feb 6, 2018
ac7454c
[SPARK-23312][SQL][FOLLOWUP] add a config to turn off vectorized cach…
cloud-fan Feb 6, 2018
caf3044
[MINOR][TEST] Fix class name for Pandas UDF tests
icexelloss Feb 6, 2018
b96a083
[SPARK-23315][SQL] failed to get output from canonicalized data sourc…
cloud-fan Feb 6, 2018
c36fecc
[SPARK-23327][SQL] Update the description and tests of three external…
gatorsmile Feb 7, 2018
9775df6
[SPARK-23122][PYSPARK][FOLLOWUP] Replace registerTempTable by createO…
gatorsmile Feb 7, 2018
71cfba0
[SPARK-23319][TESTS] Explicitly specify Pandas and PyArrow versions i…
HyukjinKwon Feb 7, 2018
9841ae0
[SPARK-23345][SQL] Remove open stream record even closing it fails
viirya Feb 7, 2018
30295bf
[SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 APIs
tdas Feb 7, 2018
a62f30d
[SPARK-23319][TESTS][FOLLOWUP] Fix a test for Python 3 without pandas.
ueshin Feb 8, 2018
3473fda
Revert [SPARK-22279][SQL] Turn on spark.sql.hive.convertMetastoreOrc …
gatorsmile Feb 8, 2018
7f5f5fb
[SPARK-23348][SQL] append data using saveAsTable should adjust the da…
cloud-fan Feb 8, 2018
a75f927
[SPARK-23268][SQL][FOLLOWUP] Reorganize packages in data source V2
cloud-fan Feb 8, 2018
76e019d
[SPARK-21860][CORE] Improve memory reuse for heap memory in `HeapMemo…
10110346 Feb 8, 2018
4df84c3
[SPARK-23336][BUILD] Upgrade snappy-java to 1.1.7.1
wangyum Feb 8, 2018
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ project/plugins/src_managed/
project/plugins/target/
python/lib/pyspark.zip
python/deps
python/test_coverage/coverage_data
python/test_coverage/htmlcov
python/pyspark/python
reports/
scalastyle-on-compile.generated.xml
Expand Down
3 changes: 2 additions & 1 deletion R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -2090,7 +2090,8 @@ setMethod("selectExpr",
#'
#' @param x a SparkDataFrame.
#' @param colName a column name.
#' @param col a Column expression, or an atomic vector in the length of 1 as literal value.
#' @param col a Column expression (which must refer only to this SparkDataFrame), or an atomic
#' vector in the length of 1 as literal value.
#' @return A SparkDataFrame with the new column added or the existing column replaced.
#' @family SparkDataFrame functions
#' @aliases withColumn,SparkDataFrame,character-method
Expand Down
4 changes: 3 additions & 1 deletion R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,9 @@ setMethod("last_day",
})

#' @details
#' \code{length}: Computes the length of a given string or binary column.
#' \code{length}: Computes the character length of a string data or number of bytes
#' of a binary data. The length of string data includes the trailing spaces.
#' The length of binary data includes binary zeros.
#'
#' @rdname column_string_functions
#' @aliases length length,Column-method
Expand Down
15 changes: 14 additions & 1 deletion R/pkg/R/mllib_classification.R
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,24 @@ function(object, path, overwrite = FALSE) {
#' savedModel <- read.ml(path)
#' summary(savedModel)
#'
#' # multinomial logistic regression
#' # binary logistic regression against two classes with
#' # upperBoundsOnCoefficients and upperBoundsOnIntercepts
#' ubc <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4)
#' model <- spark.logit(training, Species ~ .,
#' upperBoundsOnCoefficients = ubc,
#' upperBoundsOnIntercepts = 1.0)
#'
#' # multinomial logistic regression
#' model <- spark.logit(training, Class ~ ., regParam = 0.5)
#' summary <- summary(model)
#'
#' # multinomial logistic regression with
#' # lowerBoundsOnCoefficients and lowerBoundsOnIntercepts
#' lbc <- matrix(c(0.0, -1.0, 0.0, -1.0, 0.0, -1.0, 0.0, -1.0), nrow = 2, ncol = 4)
#' lbi <- as.array(c(0.0, 0.0))
#' model <- spark.logit(training, Species ~ ., family = "multinomial",
#' lowerBoundsOnCoefficients = lbc,
#' lowerBoundsOnIntercepts = lbi)
#' }
#' @note spark.logit since 2.1.0
setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"),
Expand Down
11 changes: 6 additions & 5 deletions R/pkg/R/serialize.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@
# POSIXct,POSIXlt -> Time
#
# list[T] -> Array[T], where T is one of above mentioned types
# Multi-element vector of any of the above (except raw) -> Array[T]
# environment -> Map[String, T], where T is a native type
# jobj -> Object, where jobj is an object created in the backend
# nolint end

getSerdeType <- function(object) {
type <- class(object)[[1]]
if (type != "list") {
type
if (is.atomic(object) & !is.raw(object) & length(object) > 1) {
"array"
} else if (type != "list") {
type
} else {
# Check if all elements are of same type
elemType <- unique(sapply(object, function(elem) { getSerdeType(elem) }))
Expand All @@ -50,9 +53,7 @@ getSerdeType <- function(object) {
}

writeObject <- function(con, object, writeType = TRUE) {
# NOTE: In R vectors have same type as objects. So we don't support
# passing in vectors as arrays and instead require arrays to be passed
# as lists.
# NOTE: In R vectors have same type as objects
type <- class(object)[[1]] # class of POSIXlt is c("POSIXlt", "POSIXt")
# Checking types is needed here, since 'is.na' only handles atomic vectors,
# lists and pairlists
Expand Down
47 changes: 47 additions & 0 deletions R/pkg/tests/fulltests/test_Serde.R
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,53 @@ test_that("SerDe of primitive types", {
expect_equal(class(x), "character")
})

test_that("SerDe of multi-element primitive vectors inside R data.frame", {
# vector of integers embedded in R data.frame
indices <- 1L:3L
myDf <- data.frame(indices)
myDf$data <- list(rep(0L, 3L))
mySparkDf <- as.DataFrame(myDf)
myResultingDf <- collect(mySparkDf)
myDfListedData <- data.frame(indices)
myDfListedData$data <- list(as.list(rep(0L, 3L)))
expect_equal(myResultingDf, myDfListedData)
expect_equal(class(myResultingDf[["data"]][[1]]), "list")
expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "integer")

# vector of numeric embedded in R data.frame
myDf <- data.frame(indices)
myDf$data <- list(rep(0, 3L))
mySparkDf <- as.DataFrame(myDf)
myResultingDf <- collect(mySparkDf)
myDfListedData <- data.frame(indices)
myDfListedData$data <- list(as.list(rep(0, 3L)))
expect_equal(myResultingDf, myDfListedData)
expect_equal(class(myResultingDf[["data"]][[1]]), "list")
expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "numeric")

# vector of logical embedded in R data.frame
myDf <- data.frame(indices)
myDf$data <- list(rep(TRUE, 3L))
mySparkDf <- as.DataFrame(myDf)
myResultingDf <- collect(mySparkDf)
myDfListedData <- data.frame(indices)
myDfListedData$data <- list(as.list(rep(TRUE, 3L)))
expect_equal(myResultingDf, myDfListedData)
expect_equal(class(myResultingDf[["data"]][[1]]), "list")
expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "logical")

# vector of character embedded in R data.frame
myDf <- data.frame(indices)
myDf$data <- list(rep("abc", 3L))
mySparkDf <- as.DataFrame(myDf)
myResultingDf <- collect(mySparkDf)
myDfListedData <- data.frame(indices)
myDfListedData$data <- list(as.list(rep("abc", 3L)))
expect_equal(myResultingDf, myDfListedData)
expect_equal(class(myResultingDf[["data"]][[1]]), "list")
expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "character")
})

test_that("SerDe of list of primitive types", {
x <- list(1L, 2L, 3L)
y <- callJStatic("SparkRHandler", "echo", x)
Expand Down
10 changes: 5 additions & 5 deletions R/pkg/tests/fulltests/test_mllib_classification.R
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ test_that("spark.logit", {
# Petal.Width 0.42122607
# nolint end

# Test multinomial logistic regression againt three classes
# Test multinomial logistic regression against three classes
df <- suppressWarnings(createDataFrame(iris))
model <- spark.logit(df, Species ~ ., regParam = 0.5)
summary <- summary(model)
Expand Down Expand Up @@ -196,7 +196,7 @@ test_that("spark.logit", {
#
# nolint end

# Test multinomial logistic regression againt two classes
# Test multinomial logistic regression against two classes
df <- suppressWarnings(createDataFrame(iris))
training <- df[df$Species %in% c("versicolor", "virginica"), ]
model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial")
Expand All @@ -208,7 +208,7 @@ test_that("spark.logit", {
expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))

# Test binomial logistic regression againt two classes
# Test binomial logistic regression against two classes
model <- spark.logit(training, Species ~ ., regParam = 0.5)
summary <- summary(model)
coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04)
Expand Down Expand Up @@ -239,7 +239,7 @@ test_that("spark.logit", {
prediction2 <- collect(select(predict(model2, df2), "prediction"))
expect_equal(sort(prediction2$prediction), c("0.0", "0.0", "0.0", "0.0", "0.0"))

# Test binomial logistic regression againt two classes with upperBoundsOnCoefficients
# Test binomial logistic regression against two classes with upperBoundsOnCoefficients
# and upperBoundsOnIntercepts
u <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4)
model <- spark.logit(training, Species ~ ., upperBoundsOnCoefficients = u,
Expand All @@ -252,7 +252,7 @@ test_that("spark.logit", {
expect_error(spark.logit(training, Species ~ ., upperBoundsOnCoefficients = as.array(c(1, 2)),
upperBoundsOnIntercepts = 1.0))

# Test binomial logistic regression againt two classes with lowerBoundsOnCoefficients
# Test binomial logistic regression against two classes with lowerBoundsOnCoefficients
# and lowerBoundsOnIntercepts
l <- matrix(c(0.0, -1.0, 0.0, -1.0), nrow = 1, ncol = 4)
model <- spark.logit(training, Species ~ ., lowerBoundsOnCoefficients = l,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ private class DownloadCallback implements StreamCallback {

@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
channel.write(buf);
while (buf.hasRemaining()) {
channel.write(buf);
}
}

@Override
Expand Down
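The loop above accounts for WritableByteChannel.write() being allowed to consume only part of the buffer. A minimal standalone sketch of the same pattern (illustrative only, not taken from the patch):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

final class ChannelWriteSketch {
  // write() may drain only part of the buffer, so keep calling it until nothing remains.
  static void writeFully(WritableByteChannel channel, ByteBuffer buf) throws IOException {
    while (buf.hasRemaining()) {
      channel.write(buf);
    }
  }
}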
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ private boolean shouldPool(long size) {

@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
if (shouldPool(size)) {
int numWords = (int) ((size + 7) / 8);
long alignedSize = numWords * 8L;
assert (alignedSize >= size);
if (shouldPool(alignedSize)) {
synchronized (this) {
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool != null) {
while (!pool.isEmpty()) {
final WeakReference<long[]> arrayReference = pool.pop();
Expand All @@ -62,11 +65,11 @@ public MemoryBlock allocate(long size) throws OutOfMemoryError {
return memory;
}
}
bufferPoolsBySize.remove(size);
bufferPoolsBySize.remove(alignedSize);
}
}
}
long[] array = new long[(int) ((size + 7) / 8)];
long[] array = new long[numWords];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
Expand Down Expand Up @@ -98,12 +101,13 @@ public void free(MemoryBlock memory) {
long[] array = (long[]) memory.obj;
memory.setObjAndOffset(null, 0);

if (shouldPool(size)) {
long alignedSize = ((size + 7) / 8) * 8;
if (shouldPool(alignedSize)) {
synchronized (this) {
LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool == null) {
pool = new LinkedList<>();
bufferPoolsBySize.put(size, pool);
bufferPoolsBySize.put(alignedSize, pool);
}
pool.add(new WeakReference<>(array));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.unsafe;

import org.apache.spark.unsafe.memory.HeapMemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryBlock;

Expand Down Expand Up @@ -134,4 +135,25 @@ public void memoryDebugFillEnabledInTest() {
MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
MemoryAllocator.UNSAFE.free(offheap);
}

@Test
public void heapMemoryReuse() {
MemoryAllocator heapMem = new HeapMemoryAllocator();
// The size is less than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`, so new memory is allocated every time.
MemoryBlock onheap1 = heapMem.allocate(513);
Object obj1 = onheap1.getBaseObject();
heapMem.free(onheap1);
MemoryBlock onheap2 = heapMem.allocate(514);
Assert.assertNotEquals(obj1, onheap2.getBaseObject());

// The size is greater than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,
// so the previously released memory is reused.
MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1);
Object obj3 = onheap3.getBaseObject();
heapMem.free(onheap3);
MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
Assert.assertEquals(obj3, onheap4.getBaseObject());
}
}
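The allocator change above keys the buffer pool on the word-aligned size rather than the requested size. A standalone sketch of that arithmetic (illustrative only, not part of the patch) shows why the two large allocations in heapMemoryReuse map to the same pooled long[] array:

public class AlignmentSketch {
  public static void main(String[] args) {
    // Both requests round up to the same number of 8-byte words, so the freed
    // array from the first allocation can back the second one.
    for (long size : new long[] {1024 * 1024 + 1, 1024 * 1024 + 7}) {
      int numWords = (int) ((size + 7) / 8);
      long alignedSize = numWords * 8L;
      System.out.println(size + " -> " + alignedSize);   // both print 1048584
    }
  }
}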
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/spark/SparkFirehoseListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executo
onEvent(executorBlacklisted);
}

@Override
public void onExecutorBlacklistedForStage(
SparkListenerExecutorBlacklistedForStage executorBlacklistedForStage) {
onEvent(executorBlacklistedForStage);
}

@Override
public void onNodeBlacklistedForStage(
SparkListenerNodeBlacklistedForStage nodeBlacklistedForStage) {
onEvent(nodeBlacklistedForStage);
}

@Override
public final void onExecutorUnblacklisted(
SparkListenerExecutorUnblacklisted executorUnblacklisted) {
Expand Down
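SparkFirehoseListener funnels every callback into a single onEvent() method, so with the overrides above the new stage-level blacklist events also reach that hook. A hedged sketch of a subclass that watches for them (class name invented; package locations assumed from Spark's usual layout):

import org.apache.spark.SparkFirehoseListener;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerExecutorBlacklistedForStage;

public class StageBlacklistLogger extends SparkFirehoseListener {
  // Only one override is needed, because every listener callback is routed here.
  @Override
  public void onEvent(SparkListenerEvent event) {
    if (event instanceof SparkListenerExecutorBlacklistedForStage) {
      System.out.println("Executor blacklisted for a stage: " + event);
    }
  }
}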
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
currentEntry = sortedConsumers.lastEntry();
}
List<MemoryConsumer> cList = currentEntry.getValue();
MemoryConsumer c = cList.remove(cList.size() - 1);
if (cList.isEmpty()) {
sortedConsumers.remove(currentEntry.getKey());
}
MemoryConsumer c = cList.get(cList.size() - 1);
try {
long released = c.spill(required - got, consumer);
if (released > 0) {
Expand All @@ -185,6 +182,11 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
if (got >= required) {
break;
}
} else {
cList.remove(cList.size() - 1);
if (cList.isEmpty()) {
sortedConsumers.remove(currentEntry.getKey());
}
}
} catch (ClosedByInterruptException e) {
// This called by user to kill a task (e.g: speculative task).
Expand Down
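The change above keeps a consumer in the candidate list while it is still releasing memory and only drops it once a spill returns nothing, instead of discarding it before the first spill attempt. A rough standalone sketch of that retry pattern (names invented, not Spark code):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongUnaryOperator;

public class SpillRetrySketch {
  // Keep asking the same candidate while it still yields memory; drop it only
  // when it releases nothing.
  static long reclaim(long required, Deque<LongUnaryOperator> consumers) {
    long got = 0;
    while (got < required && !consumers.isEmpty()) {
      LongUnaryOperator candidate = consumers.peekLast();   // not removed up front
      long released = candidate.applyAsLong(required - got);
      if (released > 0) {
        got += released;                                    // may be asked again
      } else {
        consumers.pollLast();                               // exhausted, drop it
      }
    }
    return got;
  }

  public static void main(String[] args) {
    Deque<LongUnaryOperator> consumers = new ArrayDeque<>();
    consumers.add(need -> Math.min(need, 64));              // releases up to 64 per call
    System.out.println(reclaim(200, consumers));            // prints 200
  }
}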
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public abstract class RecordComparator {
public abstract int compare(
Object leftBaseObject,
long leftBaseOffset,
int leftBaseLength,
Object rightBaseObject,
long rightBaseOffset);
long rightBaseOffset,
int rightBaseLength);
}
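With the record lengths now passed into compare(), an implementation no longer has to re-read the length word in front of each record. A hypothetical implementation of the widened signature (byte-wise comparison; Platform.getByte from org.apache.spark.unsafe is assumed to be available):

import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.collection.unsafe.sort.RecordComparator;

public final class BinaryRecordComparator extends RecordComparator {
  @Override
  public int compare(
      Object leftBaseObject, long leftBaseOffset, int leftBaseLength,
      Object rightBaseObject, long rightBaseOffset, int rightBaseLength) {
    // Compare as unsigned bytes, then fall back to the lengths supplied by the caller.
    int minLen = Math.min(leftBaseLength, rightBaseLength);
    for (int i = 0; i < minLen; i++) {
      int left = Platform.getByte(leftBaseObject, leftBaseOffset + i) & 0xff;
      int right = Platform.getByte(rightBaseObject, rightBaseOffset + i) & 0xff;
      if (left != right) {
        return left - right;
      }
    }
    return leftBaseLength - rightBaseLength;
  }
}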
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
int uaoSize = UnsafeAlignedOffset.getUaoSize();
if (prefixComparisonResult == 0) {
final Object baseObject1 = memoryManager.getPage(r1.recordPointer);
// skip length
final long baseOffset1 = memoryManager.getOffsetInPage(r1.recordPointer) + uaoSize;
final int baseLength1 = UnsafeAlignedOffset.getSize(baseObject1, baseOffset1 - uaoSize);
final Object baseObject2 = memoryManager.getPage(r2.recordPointer);
// skip length
final long baseOffset2 = memoryManager.getOffsetInPage(r2.recordPointer) + uaoSize;
return recordComparator.compare(baseObject1, baseOffset1, baseObject2, baseOffset2);
final int baseLength2 = UnsafeAlignedOffset.getSize(baseObject2, baseOffset2 - uaoSize);
return recordComparator.compare(baseObject1, baseOffset1, baseLength1, baseObject2,
baseOffset2, baseLength2);
} else {
return prefixComparisonResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ final class UnsafeSorterSpillMerger {
prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
if (prefixComparisonResult == 0) {
return recordComparator.compare(
left.getBaseObject(), left.getBaseOffset(),
right.getBaseObject(), right.getBaseOffset());
left.getBaseObject(), left.getBaseOffset(), left.getRecordLength(),
right.getBaseObject(), right.getBaseOffset(), right.getRecordLength());
} else {
return prefixComparisonResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ public UnsafeSorterSpillReader(
SparkEnv.get() == null ? 0.5 :
SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction", 0.5);

// SPARK-23310: Disable read-ahead input stream, because it is causing lock contention and perf
// regression for TPC-DS queries.
final boolean readAheadEnabled = SparkEnv.get() != null &&
SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", true);
SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", false);

final InputStream bs =
new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
Expand Down
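The default flips to false here, but the flag itself remains, so read-ahead can still be opted back in per application. A minimal sketch of doing that through SparkConf (illustrative; the config key is the one shown in the diff):

import org.apache.spark.SparkConf;

public class ReadAheadOptIn {
  public static void main(String[] args) {
    // Re-enable the spill read-ahead stream that SPARK-23310 turns off by default.
    SparkConf conf = new SparkConf()
        .setAppName("read-ahead-opt-in")
        .set("spark.unsafe.sorter.spill.read.ahead.enabled", "true");
    System.out.println(conf.get("spark.unsafe.sorter.spill.read.ahead.enabled"));
  }
}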
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@ $(document).ready(function() {
requestedIncomplete = getParameterByName("showIncomplete", searchString);
requestedIncomplete = (requestedIncomplete == "true" ? true : false);

$.getJSON("api/v1/applications?limit=" + appLimit, function(response,status,jqXHR) {
appParams = {
limit: appLimit,
status: (requestedIncomplete ? "running" : "completed")
};

$.getJSON("api/v1/applications", appParams, function(response,status,jqXHR) {
var array = [];
var hasMultipleAttempts = false;
for (i in response) {
Expand Down
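With appParams, the page asks the history server's REST API to filter by status server-side instead of fetching every application. A hedged sketch of issuing the same request directly (assumes a local history server on port 18080; requires Java 11+ for java.net.http):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class HistoryApiProbe {
  public static void main(String[] args) throws Exception {
    // Mirrors the limit/status parameters the updated page sends via appParams.
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:18080/api/v1/applications?status=running&limit=100"))
        .GET()
        .build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body());
  }
}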