latest apache spark #14

Merged
88 commits merged on Jun 21, 2018

Commits
2c9c862
[MINOR][YARN] Add YARN-specific credential providers in debug logging…
HyukjinKwon Jun 1, 2018
cbaa729
[SPARK-24330][SQL] Refactor ExecuteWriteTask and Use `while` in writi…
gengliangwang Jun 1, 2018
b2d0226
[SPARK-24444][DOCS][PYTHON] Improve Pandas UDF docs to explain column…
BryanCutler Jun 1, 2018
22df953
[SPARK-24326][MESOS] add support for local:// scheme for the app jar
Jun 1, 2018
98909c3
[SPARK-23920][SQL] add array_remove to remove all elements that equal…
huaxingao Jun 1, 2018
6039b13
[SPARK-24351][SS] offsetLog/commitLog purge thresholdBatchId should b…
ivoson Jun 1, 2018
d2c3de7
Revert "[SPARK-24369][SQL] Correct handling for multiple distinct agg…
gatorsmile Jun 1, 2018
09e78c1
[INFRA] Close stale PRs.
Jun 1, 2018
8ef167a
[SPARK-24340][CORE] Clean up non-shuffle disk block manager files fol…
jiangxb1987 Jun 1, 2018
a36c1a6
[SPARK-23668][K8S] Added missing config property in running-on-kubern…
liyinan926 Jun 2, 2018
de4feae
[SPARK-24356][CORE] Duplicate strings in File.path managed by FileSeg…
misha-cloudera Jun 3, 2018
a2166ec
[SPARK-24455][CORE] fix typo in TaskSchedulerImpl comment
Jun 4, 2018
416cd1f
[SPARK-24369][SQL] Correct handling for multiple distinct aggregation…
cloud-fan Jun 4, 2018
1d9338b
[SPARK-23786][SQL] Checking column names of csv headers
MaxGekk Jun 4, 2018
0be5aa2
[SPARK-23903][SQL] Add support for date extract
wangyum Jun 4, 2018
7297ae0
[SPARK-21896][SQL] Fix StackOverflow caused by window functions insid…
Jun 4, 2018
b24d3db
[SPARK-24290][ML] add support for Array input for instrumentation.log…
lu-wang-dl Jun 4, 2018
ff0501b
[SPARK-24300][ML] change the way to set seed in ml.cluster.LDASuite.g…
lu-wang-dl Jun 4, 2018
dbb4d83
[SPARK-24215][PYSPARK] Implement _repr_html_ for dataframes in PySpark
xuanyuanking Jun 5, 2018
b3417b7
[SPARK-16451][REPL] Fail shell if SparkSession fails to start.
Jun 5, 2018
e8c1a0c
[SPARK-15784] Add Power Iteration Clustering to spark.ml
WeichenXu123 Jun 5, 2018
2c2a86b
[SPARK-24453][SS] Fix error recovering from the failure in a no-data …
tdas Jun 5, 2018
93df3cd
[SPARK-22384][SQL] Refine partition pruning when attribute is wrapped…
Jun 5, 2018
e9efb62
[SPARK-24187][R][SQL] Add array_join function to SparkR
huaxingao Jun 6, 2018
e76b012
[SPARK-23803][SQL] Support bucket pruning
Jun 6, 2018
1462bba
[SPARK-24119][SQL] Add interpreted execution to SortPrefix expression
bersprockets Jun 8, 2018
2c10020
[SPARK-24224][ML-EXAMPLES] Java example code for Power Iteration Clus…
shahidki31 Jun 8, 2018
a5d775a
[SPARK-24191][ML] Scala Example code for Power Iteration Clustering
shahidki31 Jun 8, 2018
173fe45
[SPARK-24477][SPARK-24454][ML][PYTHON] Imports submodule in ml/__init…
HyukjinKwon Jun 8, 2018
1a644af
[SPARK-23984][K8S] Initial Python Bindings for PySpark on K8s
ifilonenko Jun 8, 2018
b070ded
[SPARK-17756][PYTHON][STREAMING] Workaround to avoid return type mism…
HyukjinKwon Jun 8, 2018
f433ef7
[SPARK-23010][K8S] Initial checkin of k8s integration tests.
ssuchter Jun 8, 2018
36a3409
[SPARK-24412][SQL] Adding docs about automagical type casting in `isi…
raptond Jun 9, 2018
f07c506
[SPARK-24468][SQL] Handle negative scale when adjusting precision for…
mgaido91 Jun 9, 2018
3e5b4ae
[SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration wrapping from…
e-dorigatti Jun 11, 2018
a99d284
[SPARK-19826][ML][PYTHON] add spark.ml Python API for PIC
huaxingao Jun 11, 2018
9b6f242
[MINOR][CORE] Log committer class used by HadoopMapRedCommitProtocol
ejono Jun 11, 2018
2dc047a
[SPARK-24520] Double braces in documentations
Jun 11, 2018
f5af86e
[SPARK-24134][DOCS] A missing full-stop in doc "Tuning Spark".
XD-DENG Jun 11, 2018
0481977
[SPARK-22144][SQL] ExchangeCoordinator combine the partitions of an 0…
liutang123 Jun 12, 2018
dc22465
[SPARK-23732][DOCS] Fix source links in generated scaladoc.
Jun 12, 2018
01452ea
[SPARK-24502][SQL] flaky test: UnsafeRowSerializerSuite
cloud-fan Jun 12, 2018
1d7db65
docs: fix typo
tomsaleeba Jun 12, 2018
5d6a53d
[SPARK-15064][ML] Locale support in StopWordsRemover
dongjinleekr Jun 12, 2018
2824f14
[SPARK-24531][TESTS] Remove version 2.2.0 from testing versions in Hi…
mgaido91 Jun 12, 2018
3af1d3e
[SPARK-24416] Fix configuration specification for killBlacklisted exe…
Jun 12, 2018
f0ef1b3
[SPARK-23931][SQL] Adds arrays_zip function to sparksql
DylanGuedes Jun 12, 2018
cc88d7f
[SPARK-24216][SQL] Spark TypedAggregateExpression uses getSimpleName …
Jun 12, 2018
ada28f2
[SPARK-23933][SQL] Add map_from_arrays function
kiszk Jun 12, 2018
0d3714d
[SPARK-23010][BUILD][FOLLOWUP] Fix java checkstyle failure of kuberne…
jiangxb1987 Jun 12, 2018
f53818d
[SPARK-24506][UI] Add UI filters to tabs added after binding
mgaido91 Jun 12, 2018
9786ce6
[SPARK-22239][SQL][PYTHON] Enable grouped aggregate pandas UDFs as wi…
icexelloss Jun 13, 2018
3352d6f
[SPARK-24466][SS] Fix TextSocketMicroBatchReader to be compatible wit…
HeartSaVioR Jun 13, 2018
4c388bc
[SPARK-24485][SS] Measure and log elapsed time for filesystem operati…
HeartSaVioR Jun 13, 2018
7703b46
[SPARK-24479][SS] Added config for registering streamingQueryListeners
arunmahadevan Jun 13, 2018
299d297
[SPARK-24500][SQL] Make sure streams are materialized during Tree tra…
hvanhovell Jun 13, 2018
1b46f41
[SPARK-24235][SS] Implement continuous shuffle writer for single read…
jose-torres Jun 13, 2018
3bf7691
[SPARK-24531][TESTS] Replace 2.3.0 version with 2.3.1
mgaido91 Jun 13, 2018
534065e
[MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInMemorySorterSuite
jiangxb1987 Jun 14, 2018
fdadc4b
[SPARK-24495][SQL] EnsureRequirement returns wrong plan when reorderi…
mgaido91 Jun 14, 2018
d3eed8f
[SPARK-24563][PYTHON] Catch TypeError when testing existence of HiveC…
icexelloss Jun 14, 2018
b8f27ae
[SPARK-24543][SQL] Support any type as DDL string for from_json's schema
MaxGekk Jun 14, 2018
18cb0c0
[SPARK-24319][SPARK SUBMIT] Fix spark-submit execution where no main …
gaborgsomogyi Jun 14, 2018
270a9a3
[SPARK-24248][K8S] Use level triggering and state reconciliation in s…
mccheah Jun 14, 2018
22daeba
[SPARK-24478][SQL] Move projection and filter push down to physical c…
rdblue Jun 15, 2018
6567fc4
[PYTHON] Fix typo in serializer exception
rberenguel Jun 15, 2018
495d8cf
[SPARK-24490][WEBUI] Use WebUI.addStaticHandler in web UIs
jaceklaskowski Jun 15, 2018
b5ccf0d
[SPARK-24396][SS][PYSPARK] Add Structured Streaming ForeachWriter for…
tdas Jun 15, 2018
90da7dc
[SPARK-24452][SQL][CORE] Avoid possible overflow in int add or multiple
kiszk Jun 15, 2018
e4fee39
[SPARK-24525][SS] Provide an option to limit number of rows in a Memo…
mukulmurthy Jun 15, 2018
c7c0b08
add one supported type missing from the javadoc
Jun 16, 2018
b0a9352
[SPARK-24573][INFRA] Runs SBT checkstyle after the build to work arou…
HyukjinKwon Jun 18, 2018
e219e69
[SPARK-23772][SQL] Provide an option to ignore column of all null val…
maropu Jun 18, 2018
bce1775
[SPARK-24526][BUILD][TEST-MAVEN] Spaces in the build dir causes failu…
Jun 18, 2018
8f225e0
[SPARK-24548][SQL] Fix incorrect schema of Dataset with tuple encoders
viirya Jun 18, 2018
1737d45
[SPARK-24478][SQL][FOLLOWUP] Move projection and filter push down to …
cloud-fan Jun 19, 2018
9a75c18
[SPARK-24542][SQL] UDF series UDFXPathXXXX allow users to pass carefu…
gatorsmile Jun 19, 2018
a78a904
[SPARK-24521][SQL][TEST] Fix ineffective test in CachedTableSuite
icexelloss Jun 19, 2018
9dbe53e
[SPARK-24556][SQL] Always rewrite output partitioning in ReusedExchan…
yucai Jun 19, 2018
13092d7
[SPARK-24534][K8S] Bypass non spark-on-k8s commands
rimolive Jun 19, 2018
2cb9763
[SPARK-24565][SS] Add API for in Structured Streaming for exposing ou…
tdas Jun 19, 2018
bc0498d
[SPARK-24583][SQL] Wrong schema type in InsertIntoDataSourceCommand
maryannxue Jun 19, 2018
bc11146
[SPARK-23778][CORE] Avoid unneeded shuffle when union gets an empty RDD
mgaido91 Jun 20, 2018
c8ef923
[MINOR][SQL] Remove invalid comment from SparkStrategies
HeartSaVioR Jun 20, 2018
c5a0d11
[SPARK-24575][SQL] Prohibit window expressions inside WHERE and HAVIN…
Jun 20, 2018
3f4bda7
[SPARK-24578][CORE] Cap sub-region's size of returned nio buffer
WenboZhao Jun 20, 2018
15747cf
[SPARK-24547][K8S] Allow for building spark on k8s docker images with…
Jun 21, 2018
9de11d3
[SPARK-23912][SQL] add array_distinct
huaxingao Jun 21, 2018
1 change: 1 addition & 0 deletions LICENSE
@@ -237,6 +237,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.

(BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
(BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.2.7 - https://github.com/jpmml/jpmml-model)
(BSD 3 Clause) jmock (org.jmock:jmock-junit4:2.8.4 - http://jmock.org/)
(BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
(BSD License) ANTLR 4.5.2-1 (org.antlr:antlr4:4.5.2-1 - http://wwww.antlr.org/)
(BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -201,6 +201,7 @@ exportMethods("%<=>%",
"approxCountDistinct",
"approxQuantile",
"array_contains",
"array_join",
"array_max",
"array_min",
"array_position",
29 changes: 26 additions & 3 deletions R/pkg/R/functions.R
@@ -221,7 +221,9 @@ NULL
#' head(select(tmp3, element_at(tmp3$v3, "Valiant")))
#' tmp4 <- mutate(df, v4 = create_array(df$mpg, df$cyl), v5 = create_array(df$cyl, df$hp))
#' head(select(tmp4, concat(tmp4$v4, tmp4$v5), arrays_overlap(tmp4$v4, tmp4$v5)))
#' head(select(tmp, concat(df$mpg, df$cyl, df$hp)))}
#' head(select(tmp, concat(df$mpg, df$cyl, df$hp)))
#' tmp5 <- mutate(df, v6 = create_array(df$model, df$model))
#' head(select(tmp5, array_join(tmp5$v6, "#"), array_join(tmp5$v6, "#", "NULL")))}
NULL

#' Window functions for Column operations
@@ -3006,6 +3008,27 @@ setMethod("array_contains",
column(jc)
})

#' @details
#' \code{array_join}: Concatenates the elements of the column using the delimiter.
#' Null values are replaced with nullReplacement if set; otherwise they are ignored.
#'
#' @param delimiter a character string that is used to concatenate the elements of the column.
#' @param nullReplacement an optional character string that is used to replace the null values.
#' @rdname column_collection_functions
#' @aliases array_join array_join,Column-method
#' @note array_join since 2.4.0
setMethod("array_join",
signature(x = "Column", delimiter = "character"),
function(x, delimiter, nullReplacement = NULL) {
jc <- if (is.null(nullReplacement)) {
callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter)
} else {
callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter,
as.character(nullReplacement))
}
column(jc)
})
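
The R wrapper above delegates to org.apache.spark.sql.functions.array_join via callJStatic. For readers following along on the JVM side, below is a minimal Scala sketch of the same behavior against a local session; the session settings, object name, and sample data are made up for illustration, and the output comments reflect the semantics described in the roxygen docs above (nulls skipped unless a replacement is given).

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{array_join, col}

    object ArrayJoinSketch {
      def main(args: Array[String]): Unit = {
        // Local session purely for illustration.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("array-join-sketch")
          .getOrCreate()
        import spark.implicits._

        val df = Seq(Seq("Hello", null, "World!")).toDF("v")

        // Without a null replacement, null elements are ignored: "Hello#World!"
        df.select(array_join(col("v"), "#")).show(false)

        // With a replacement, nulls are substituted: "Hello#NULL#World!"
        df.select(array_join(col("v"), "#", "NULL")).show(false)

        spark.stop()
      }
    }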

#' @details
#' \code{array_max}: Returns the maximum value of the array.
#'
@@ -3197,8 +3220,8 @@ setMethod("size",
#' (or starting from the end if start is negative) with the specified length.
#'
#' @rdname column_collection_functions
#' @param start an index indicating the first element occuring in the result.
#' @param length a number of consecutive elements choosen to the result.
#' @param start an index indicating the first element occurring in the result.
#' @param length a number of consecutive elements chosen to the result.
#' @aliases slice slice,Column-method
#' @note slice since 2.4.0
setMethod("slice",
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -757,6 +757,10 @@ setGeneric("approxCountDistinct", function(x, ...) { standardGeneric("approxCoun
#' @name NULL
setGeneric("array_contains", function(x, value) { standardGeneric("array_contains") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_join", function(x, delimiter, ...) { standardGeneric("array_join") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_max", function(x) { standardGeneric("array_max") })
15 changes: 15 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1518,6 +1518,21 @@ test_that("column functions", {
result <- collect(select(df, arrays_overlap(df[[1]], df[[2]])))[[1]]
expect_equal(result, c(TRUE, FALSE, NA))

# Test array_join()
df <- createDataFrame(list(list(list("Hello", "World!"))))
result <- collect(select(df, array_join(df[[1]], "#")))[[1]]
expect_equal(result, "Hello#World!")
df2 <- createDataFrame(list(list(list("Hello", NA, "World!"))))
result <- collect(select(df2, array_join(df2[[1]], "#", "Beautiful")))[[1]]
expect_equal(result, "Hello#Beautiful#World!")
result <- collect(select(df2, array_join(df2[[1]], "#")))[[1]]
expect_equal(result, "Hello#World!")
df3 <- createDataFrame(list(list(list("Hello", NULL, "World!"))))
result <- collect(select(df3, array_join(df3[[1]], "#", "Beautiful")))[[1]]
expect_equal(result, "Hello#Beautiful#World!")
result <- collect(select(df3, array_join(df3[[1]], "#")))[[1]]
expect_equal(result, "Hello#World!")

# Test array_sort() and sort_array()
df <- createDataFrame(list(list(list(2L, 1L, 3L, NA)), list(list(NA, 6L, 5L, NA, 4L))))

2 changes: 2 additions & 0 deletions README.md
@@ -81,6 +81,8 @@ can be run using:
Please see the guidance on how to
[run tests for a module, or individual tests](http://spark.apache.org/developer-tools.html#individual-tests).

There is also a Kubernetes integration test; see resource-managers/kubernetes/integration-tests/README.md.

## A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
33 changes: 24 additions & 9 deletions bin/docker-image-tool.sh
@@ -63,16 +63,25 @@ function build {
if [ ! -d "$IMG_PATH" ]; then
error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
fi

local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}

docker build "${BUILD_ARGS[@]}" \
local BINDING_BUILD_ARGS=(
--build-arg
base_img=$(image_ref spark)
)
local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}

docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
-f "$DOCKERFILE" .
-f "$BASEDOCKERFILE" .

docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
-t $(image_ref spark-py) \
-f "$PYDOCKERFILE" .
}

function push {
docker push "$(image_ref spark)"
docker push "$(image_ref spark-py)"
}

function usage {
@@ -86,10 +95,12 @@ Commands:
push Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
-f file Dockerfile to build. By default builds the Dockerfile shipped with Spark.
-f file Dockerfile to build for JVM-based jobs. By default builds the Dockerfile shipped with Spark.
-p file Dockerfile with Python baked in. By default builds the Dockerfile shipped with Spark.
-r repo Repository address.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
-n Build docker image with --no-cache

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
@@ -116,14 +127,18 @@ fi

REPO=
TAG=
DOCKERFILE=
while getopts f:mr:t: option
BASEDOCKERFILE=
PYDOCKERFILE=
NOCACHEARG=
while getopts f:mr:t:n option
do
case "${option}"
in
f) DOCKERFILE=${OPTARG};;
f) BASEDOCKERFILE=${OPTARG};;
p) PYDOCKERFILE=${OPTARG};;
r) REPO=${OPTARG};;
t) TAG=${OPTARG};;
n) NOCACHEARG="--no-cache";;
m)
if ! which minikube 1>/dev/null; then
error "Cannot find minikube."
2 changes: 1 addition & 1 deletion build/mvn
@@ -154,4 +154,4 @@ export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}
echo "Using \`mvn\` from path: $MVN_BIN" 1>&2

# Last, call the `mvn` command as usual
${MVN_BIN} -DzincPort=${ZINC_PORT} "$@"
"${MVN_BIN}" -DzincPort=${ZINC_PORT} "$@"
@@ -137,30 +137,15 @@ protected void deallocate() {
}

private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
ByteBuffer buffer = buf.nioBuffer();
int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
target.write(buffer) : writeNioBuffer(target, buffer);
// SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
// for the case that the passed-in buffer has too many components.
int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
int written = target.write(buffer);
buf.skipBytes(written);
return written;
}

private int writeNioBuffer(
WritableByteChannel writeCh,
ByteBuffer buf) throws IOException {
int originalLimit = buf.limit();
int ret = 0;

try {
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
buf.limit(buf.position() + ioSize);
ret = writeCh.write(buf);
} finally {
buf.limit(originalLimit);
}

return ret;
}

@Override
public MessageWithHeader touch(Object o) {
super.touch(o);
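
The hunk above (SPARK-24578) replaces the limit-juggling writeNioBuffer helper with a single capped nioBuffer call, so a heavily composite ByteBuf never has to produce a large contiguous NIO view in one go. Below is a minimal standalone Scala sketch of that capped-copy pattern; it assumes Netty's ByteBuf API and a blocking WritableByteChannel, and the object and method names are made up for illustration.

    import java.nio.channels.WritableByteChannel
    import io.netty.buffer.ByteBuf

    object CappedCopySketch {
      // Copy the readable bytes of `buf` to `target`, never requesting an NIO
      // view larger than `limit` bytes at a time (mirrors the NIO_BUFFER_LIMIT
      // cap used in the hunk above).
      def copyCapped(buf: ByteBuf, target: WritableByteChannel, limit: Int): Long = {
        var total = 0L
        while (buf.isReadable) {
          val length = math.min(buf.readableBytes, limit)
          val written = target.write(buf.nioBuffer(buf.readerIndex, length))
          buf.skipBytes(written)
          total += written
        }
        total
      }
    }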
@@ -17,10 +17,7 @@

package org.apache.spark.network.util;

import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
@@ -91,11 +88,24 @@ public static String bytesToString(ByteBuffer b) {
* @throws IOException if deletion is unsuccessful
*/
public static void deleteRecursively(File file) throws IOException {
deleteRecursively(file, null);
}

/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
*
* @param file Input file / dir to be deleted
* @param filter A filename filter that ensures only files / dirs with matching filenames
* are deleted.
* @throws IOException if deletion is unsuccessful
*/
public static void deleteRecursively(File file, FilenameFilter filter) throws IOException {
if (file == null) { return; }

// On Unix systems, use operating system command to run faster
// If that does not work out, fall back to the Java IO way
if (SystemUtils.IS_OS_UNIX) {
if (SystemUtils.IS_OS_UNIX && filter == null) {
try {
deleteRecursivelyUsingUnixNative(file);
return;
@@ -105,15 +115,17 @@ public static void deleteRecursively(File file) throws IOException {
}
}

deleteRecursivelyUsingJavaIO(file);
deleteRecursivelyUsingJavaIO(file, filter);
}

private static void deleteRecursivelyUsingJavaIO(File file) throws IOException {
private static void deleteRecursivelyUsingJavaIO(
File file,
FilenameFilter filter) throws IOException {
if (file.isDirectory() && !isSymlink(file)) {
IOException savedIOException = null;
for (File child : listFilesSafely(file)) {
for (File child : listFilesSafely(file, filter)) {
try {
deleteRecursively(child);
deleteRecursively(child, filter);
} catch (IOException e) {
// In case of multiple exceptions, only last one will be thrown
savedIOException = e;
@@ -124,10 +136,13 @@ private static void deleteRecursivelyUsingJavaIO(File file) throws IOException {
}
}

boolean deleted = file.delete();
// Delete can also fail if the file simply did not exist.
if (!deleted && file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath());
// Delete file only when it's a normal file or an empty directory.
if (file.isFile() || (file.isDirectory() && listFilesSafely(file, null).length == 0)) {
boolean deleted = file.delete();
// Delete can also fail if the file simply did not exist.
if (!deleted && file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath());
}
}
}

@@ -157,9 +172,9 @@ private static void deleteRecursivelyUsingUnixNative(File file) throws IOExcepti
}
}

private static File[] listFilesSafely(File file) throws IOException {
private static File[] listFilesSafely(File file, FilenameFilter filter) throws IOException {
if (file.exists()) {
File[] files = file.listFiles();
File[] files = file.listFiles(filter);
if (files == null) {
throw new IOException("Failed to list files for dir: " + file);
}
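
The new deleteRecursively(File, FilenameFilter) overload above lets callers delete only entries whose names pass a filter, skipping the Unix fast path whenever a filter is supplied. The file name is not shown in this view, but the package line shows org.apache.spark.network.util, so this appears to be JavaUtils; the Scala sketch below assumes that, and its filter rule and directory path are illustrative rather than taken from the PR.

    import java.io.{File, FilenameFilter}
    import org.apache.spark.network.util.JavaUtils

    object NonShuffleCleanupSketch {
      def main(args: Array[String]): Unit = {
        // Keep shuffle .data/.index files, delete everything else (illustrative rule).
        val nonShuffleFilter = new FilenameFilter {
          override def accept(dir: File, name: String): Boolean =
            !(name.endsWith(".data") || name.endsWith(".index"))
        }

        // Hypothetical executor-local directory.
        val executorLocalDir = new File("/tmp/spark-local/executor-1")

        // With a non-null filter, only entries accepted by the filter
        // (plus directories left empty) are removed.
        JavaUtils.deleteRecursively(executorLocalDir, nonShuffleFilter)
      }
    }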
@@ -138,6 +138,13 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
blockManager.applicationRemoved(appId, cleanupLocalDirs);
}

/**
* Clean up any non-shuffle files in any local directories associated with a finished executor.
*/
public void executorRemoved(String executorId, String appId) {
blockManager.executorRemoved(executorId, appId);
}

/**
* Register an (application, executor) with the given shuffle info.
*