[SPARK-28912][BRANCH-2.4] Fixed MatchError in getCheckpointFiles()
### What changes were proposed in this pull request?

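This change fixes SPARK-28912: `Checkpoint.getCheckpointFiles()` now skips directories and matches the checkpoint file pattern against each file's name instead of its full path.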

### Why are the changes needed?

If the checkpoint directory is set to a name that matches the regex pattern used for checkpoint files, the logs are flooded with `MatchError` exceptions and old checkpoint files are not removed.
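
The failure mode can be sketched without Hadoop or Spark. The snippet below is only an illustration: the pattern is an assumption modeled on the `checkpoint-<time>` naming, `timeOf` is a hypothetical stand-in for the code that later destructures the file name, and the real definitions in `Checkpoint` may differ.

```scala
import scala.util.Try

object MatchErrorSketch {
  // Assumed pattern for checkpoint file names; the actual REGEX in Checkpoint may differ.
  val REGEX = """checkpoint-([\d]+)([\w\.]*)""".r

  // A regular file stored inside a checkpoint directory named "checkpoint-01".
  val fullPath = "hdfs://localhost:9000/checkpoint-01/some-other-file"
  val fileName = fullPath.substring(fullPath.lastIndexOf('/') + 1)

  // Searching the full path succeeds because the parent directory matches.
  val matchedOnFullPath = REGEX.findFirstIn(fullPath).nonEmpty // true

  // Searching only the file name finds nothing.
  val matchedOnFileName = REGEX.findFirstIn(fileName).nonEmpty // false

  // Hypothetical helper: extracts the time with a full-match extractor pattern.
  // It has no fallback case, so a non-matching name throws scala.MatchError.
  def timeOf(name: String): Long = name match {
    case REGEX(time, _) => time.toLong
  }

  // The file passed the full-path search but cannot be destructured by name.
  val failure = Try(timeOf(fileName))

  def main(args: Array[String]): Unit = {
    println(s"matched on full path: $matchedOnFullPath")
    println(s"matched on file name: $matchedOnFileName")
    println(s"destructuring result: $failure")
  }
}
```

Under these assumptions, any file in such a directory passes a search over the full path and then fails when its name is destructured, which is consistent with the repeated `MatchError` described above.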

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manually.

1. Start Hadoop in a pseudo-distributed mode.

2. In another terminal, run the command `nc -lk 9999`.

3. In the Spark shell execute the following statements:

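    ```scala
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // The checkpoint directory name deliberately matches the checkpoint file pattern.
    val ssc = new StreamingContext(sc, Seconds(30))
    ssc.checkpoint("hdfs://localhost:9000/checkpoint-01")

    // Word count over the text stream served by nc on port 9999.
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
    ```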

Closes apache#25719 from avkgh/SPARK-28912-branch-2.4.

Authored-by: avk <nullp7r@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
avkgh authored and Sean Cunniff committed Nov 5, 2020
1 parent fc37846 commit 4549801
Showing 2 changed files with 22 additions and 2 deletions.
@@ -135,8 +135,8 @@ object Checkpoint extends Logging {
     try {
       val statuses = fs.listStatus(path)
       if (statuses != null) {
-        val paths = statuses.map(_.getPath)
-        val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
+        val paths = statuses.filterNot(_.isDirectory).map(_.getPath)
+        val filtered = paths.filter(p => REGEX.findFirstIn(p.getName).nonEmpty)
         filtered.sortWith(sortFunc)
       } else {
         logWarning(s"Listing $path returned null")
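
The effect of the two changed lines can be compared on a made-up directory listing. This is a rough sketch, not the Spark code: `Entry` is a hypothetical stand-in for Hadoop's `FileStatus`, the pattern is an assumed approximation of `REGEX`, and the paths are invented for illustration.

```scala
object FilterSketch {
  // Hypothetical stand-in for a file status: a full path plus a directory flag.
  final case class Entry(path: String, isDirectory: Boolean) {
    // Last path component, analogous to taking the name of a path.
    def name: String = path.substring(path.lastIndexOf('/') + 1)
  }

  // Assumed pattern for checkpoint file names; the actual REGEX may differ.
  val REGEX = """checkpoint-([\d]+)([\w\.]*)""".r

  // Behavior before the change: every entry is searched by its full path.
  def oldFilter(entries: Seq[Entry]): Seq[Entry] =
    entries.filter(e => REGEX.findFirstIn(e.path).nonEmpty)

  // Behavior after the change: directories are dropped, then names are searched.
  def newFilter(entries: Seq[Entry]): Seq[Entry] =
    entries.filterNot(_.isDirectory).filter(e => REGEX.findFirstIn(e.name).nonEmpty)

  // Invented listing of a checkpoint directory whose own name matches the pattern.
  val dir = "/tmp/checkpoint-01"
  val listing = Seq(
    Entry(s"$dir/checkpoint-1000", isDirectory = false),
    Entry(s"$dir/checkpoint-2000.bk", isDirectory = false),
    Entry(s"$dir/checkpoint-3000", isDirectory = true),
    Entry(s"$dir/unrelated-file", isDirectory = false)
  )

  def main(args: Array[String]): Unit = {
    println(oldFilter(listing).map(_.name)) // all four entries
    println(newFilter(listing).map(_.name)) // only the two regular checkpoint files
  }
}
```

In this sketch the old filter keeps every entry because the parent directory appears in each full path, while the new filter keeps only regular files whose own names match.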
@@ -884,6 +884,26 @@ class CheckpointSuite extends TestSuiteBase with LocalStreamingContext with DStr
     checkpointWriter.stop()
   }

+  test("SPARK-28912: Fix MatchError in getCheckpointFiles") {
+    val tempDir = Utils.createTempDir()
+    try {
+      val fs = FileSystem.get(tempDir.toURI, new Configuration)
+      val checkpointDir = tempDir.getAbsolutePath + "/checkpoint-01"
+
+      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
+
+      // Ignore files whose parent path match.
+      fs.create(new Path(checkpointDir, "this-is-matched-before-due-to-parent-path")).close()
+      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
+
+      // Ignore directories whose names match.
+      fs.mkdirs(new Path(checkpointDir, "checkpoint-1000000000"))
+      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
   test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
     // In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
     //