Skip to content

Commit

Permalink
Register a TaskCompletionListener to make sure release all resources
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Jul 20, 2015
1 parent 163e3f1 commit 3d574d9
Showing 1 changed file with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer

import com.google.common.io.ByteStreams

import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.{Logging, SparkEnv, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.serializer.{DeserializationStream, Serializer}
import org.apache.spark.storage.{BlockId, BlockManager}
Expand Down Expand Up @@ -474,11 +474,20 @@ class ExternalAppendOnlyMap[K, V, C](
private def cleanup() {
batchIndex = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
deserializeStream = null
fileStream = null
ds.close()
file.delete()
if (ds != null) {
ds.close()
deserializeStream = null
}
if (fileStream != null) {
fileStream.close()
fileStream = null
}
if (file.exists()) {
file.delete()
}
}

TaskContext.get().addTaskCompletionListener(context => cleanup())
}

/** Convenience function to hash the given (K, C) pair by the key. */
Expand Down

0 comments on commit 3d574d9

Please sign in to comment.