[SPARK-6560][CORE] Do not suppress exceptions from writer.write.
If there is a failure in the Hadoop backend while calling
writer.write, we should remember this original exception,
and try to call writer.close(), but if that fails as well,
still report the original exception.

Note that if writer.write fails, the writer has likely been left in an
invalid state, which makes it more likely that writer.close will also
fail, further increasing the chances of writer.write's exception being
suppressed.

This patch introduces a (potentially too cute) Utils.tryWithSafeFinally
method to handle the try/finally gyrations.
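
For context, a minimal sketch (not part of this commit) contrasting the plain try/finally pattern with the new helper; the SafeFinallySketch object, the byte array, and the file names are made up for illustration, and it lives under org.apache.spark.util only because Utils is private[spark]:

package org.apache.spark.util

import java.io.FileOutputStream

object SafeFinallySketch {
  def main(args: Array[String]): Unit = {
    val bytes = Array[Byte](1, 2, 3)

    // Plain try/finally: if write throws and close then throws as well,
    // close's exception replaces (suppresses) the original write failure.
    val plain = new FileOutputStream("plain-example.bin")
    try {
      plain.write(bytes)
    } finally {
      plain.close()
    }

    // With the helper from this commit, the body's exception always wins:
    // if both throw, the close failure is logged as a warning and the
    // original write exception is rethrown.
    val safe = new FileOutputStream("safe-example.bin")
    Utils.tryWithSafeFinally {
      safe.write(bytes)
    } {
      safe.close()
    }
  }
}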

Author: Stephen Haberman <stephen@exigencecorp.com>

Closes apache#5223 from stephenh/do_not_suppress_writer_exception and squashes the following commits:

c7ad53f [Stephen Haberman] [SPARK-6560][CORE] Do not suppress exceptions from writer.write.
Stephen Haberman authored and srowen committed Apr 3, 2015
1 parent 82701ee commit b0d884f
Showing 12 changed files with 118 additions and 62 deletions.
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -356,11 +356,14 @@ private[spark] object MapOutputTracker extends Logging {
def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
val out = new ByteArrayOutputStream
val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
// Since statuses can be modified in parallel, sync on it
statuses.synchronized {
objOut.writeObject(statuses)
Utils.tryWithSafeFinally {
// Since statuses can be modified in parallel, sync on it
statuses.synchronized {
objOut.writeObject(statuses)
}
} {
objOut.close()
}
objOut.close()
out.toByteArray
}

@@ -614,9 +614,9 @@ private[spark] object PythonRDD extends Logging {
try {
val sock = serverSocket.accept()
val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
try {
Utils.tryWithSafeFinally {
writeIteratorToStream(items, out)
} finally {
} {
out.close()
}
} catch {
@@ -862,9 +862,9 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
val file = File.createTempFile("broadcast", "", dir)
path = file.getAbsolutePath
val out = new FileOutputStream(file)
try {
Utils.tryWithSafeFinally {
Utils.copyStream(in, out)
} finally {
} {
out.close()
}
}
19 changes: 12 additions & 7 deletions core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -165,7 +165,7 @@ private[broadcast] object HttpBroadcast extends Logging {
private def write(id: Long, value: Any) {
val file = getFile(id)
val fileOutputStream = new FileOutputStream(file)
try {
Utils.tryWithSafeFinally {
val out: OutputStream = {
if (compress) {
compressionCodec.compressedOutputStream(fileOutputStream)
@@ -175,10 +175,13 @@
}
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject(value)
serOut.close()
Utils.tryWithSafeFinally {
serOut.writeObject(value)
} {
serOut.close()
}
files += file
} finally {
} {
fileOutputStream.close()
}
}
@@ -212,9 +215,11 @@
}
val ser = SparkEnv.get.serializer.newInstance()
val serIn = ser.deserializeStream(in)
val obj = serIn.readObject[T]()
serIn.close()
obj
Utils.tryWithSafeFinally {
serIn.readObject[T]()
} {
serIn.close()
}
}

/**
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
import akka.serialization.Serialization

import org.apache.spark.Logging
import org.apache.spark.util.Utils


/**
@@ -59,9 +60,9 @@ private[master] class FileSystemPersistenceEngine(
val serializer = serialization.findSerializerFor(value)
val serialized = serializer.toBinary(value)
val out = new FileOutputStream(file)
try {
Utils.tryWithSafeFinally {
out.write(serialized)
} finally {
} {
out.close()
}
}
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonProcessingException
import com.google.common.base.Charsets

import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
import org.apache.spark.util.Utils

/**
* A client that submits applications to the standalone Master using a REST protocol.
@@ -148,8 +149,11 @@ private[deploy] class StandaloneRestClient extends Logging {
conn.setRequestProperty("charset", "utf-8")
conn.setDoOutput(true)
val out = new DataOutputStream(conn.getOutputStream)
out.write(json.getBytes(Charsets.UTF_8))
out.close()
Utils.tryWithSafeFinally {
out.write(json.getBytes(Charsets.UTF_8))
} {
out.close()
}
readResponse(conn)
}

8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils

private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}

@@ -112,8 +113,11 @@ private[spark] object CheckpointRDD extends Logging {
}
val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
serializeStream.writeAll(iterator)
serializeStream.close()
Utils.tryWithSafeFinally {
serializeStream.writeAll(iterator)
} {
serializeStream.close()
}

if (!fs.rename(tempOutputPath, finalOutputPath)) {
if (!fs.exists(finalOutputPath)) {
@@ -995,7 +995,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
require(writer != null, "Unable to obtain RecordWriter")
var recordsWritten = 0L
try {
Utils.tryWithSafeFinally {
while (iter.hasNext) {
val pair = iter.next()
writer.write(pair._1, pair._2)
@@ -1004,7 +1004,7 @@
maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
recordsWritten += 1
}
} finally {
} {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
@@ -1068,7 +1068,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.setup(context.stageId, context.partitionId, taskAttemptId)
writer.open()
var recordsWritten = 0L
try {

Utils.tryWithSafeFinally {
while (iter.hasNext) {
val record = iter.next()
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
@@ -1077,7 +1078,7 @@
maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
recordsWritten += 1
}
} finally {
} {
writer.close()
}
writer.commit()
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._
import org.apache.spark.util.Utils

import IndexShuffleBlockManager.NOOP_REDUCE_ID

@@ -78,16 +79,15 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
val indexFile = getIndexFile(shuffleId, mapId)
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
try {
Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
out.writeLong(offset)

for (length <- lengths) {
offset += length
out.writeLong(offset)
}
} finally {
} {
out.close()
}
}
@@ -23,6 +23,7 @@ import java.nio.channels.FileChannel
import org.apache.spark.Logging
import org.apache.spark.serializer.{SerializationStream, Serializer}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.util.Utils

/**
* An interface for writing JVM objects to some underlying storage. This interface allows
@@ -140,14 +141,17 @@ private[spark] class DiskBlockObjectWriter(

override def close() {
if (initialized) {
if (syncWrites) {
// Force outstanding writes to disk and track how long it takes
objOut.flush()
callWithTiming {
fos.getFD.sync()
Utils.tryWithSafeFinally {
if (syncWrites) {
// Force outstanding writes to disk and track how long it takes
objOut.flush()
callWithTiming {
fos.getFD.sync()
}
}
} {
objOut.close()
}
objOut.close()

channel = null
bs = null
18 changes: 10 additions & 8 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -46,10 +46,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
val channel = new FileOutputStream(file).getChannel
while (bytes.remaining > 0) {
channel.write(bytes)
Utils.tryWithSafeFinally {
while (bytes.remaining > 0) {
channel.write(bytes)
}
} {
channel.close()
}
channel.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
@@ -75,9 +78,9 @@
val file = diskManager.getFile(blockId)
val outputStream = new FileOutputStream(file)
try {
try {
Utils.tryWithSafeFinally {
blockManager.dataSerializeStream(blockId, outputStream, values)
} finally {
} {
// Close outputStream here because it should be closed before file is deleted.
outputStream.close()
}
@@ -106,8 +109,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc

private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
val channel = new RandomAccessFile(file, "r").getChannel

try {
Utils.tryWithSafeFinally {
// For small files, directly read rather than memory map
if (length < minMemoryMapBytes) {
val buf = ByteBuffer.allocate(length.toInt)
Expand All @@ -123,7 +125,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
} else {
Some(channel.map(MapMode.READ_ONLY, offset, length))
}
} finally {
} {
channel.close()
}
}
46 changes: 42 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -313,7 +313,7 @@ private[spark] object Utils extends Logging {
transferToEnabled: Boolean = false): Long =
{
var count = 0L
try {
tryWithSafeFinally {
if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
&& transferToEnabled) {
// When both streams are File stream, use transferTo to improve copy performance.
@@ -353,7 +353,7 @@
}
}
count
} finally {
} {
if (closeStreams) {
try {
in.close()
@@ -1214,6 +1214,44 @@
}
}

/**
* Execute a block of code, then a finally block, but if exceptions happen in
* the finally block, do not suppress the original exception.
*
* This is primarily an issue with `finally { out.close() }` blocks, where
* close needs to be called to clean up `out`, but if an exception happened
* in `out.write`, it's likely `out` may be corrupted and `out.close` will
* fail as well. This would then suppress the original/likely more meaningful
* exception from the original `out.write` call.
*/
def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
// It would be nice to find a method on Try that did this
var originalThrowable: Throwable = null
try {
block
} catch {
case t: Throwable =>
// Purposefully not using NonFatal, because even fatal exceptions
// we don't want to have our finallyBlock suppress
originalThrowable = t
throw originalThrowable
} finally {
try {
finallyBlock
} catch {
case t: Throwable =>
if (originalThrowable != null) {
// We could do originalThrowable.addSuppressed(t), but it's
// not available in JDK 1.6.
logWarning(s"Suppressing exception in finally: " + t.getMessage, t)
throw originalThrowable
} else {
throw t
}
}
}
}

/** Default filtering function for finding call sites using `getCallSite`. */
private def coreExclusionFunction(className: String): Boolean = {
// A regular expression to match classes of the "core" Spark API that we want to skip when
@@ -2074,15 +2112,15 @@ private[spark] class RedirectThread(
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
try {
Utils.tryWithSafeFinally {
val buf = new Array[Byte](1024)
var len = in.read(buf)
while (len != -1) {
out.write(buf, 0, len)
out.flush()
len = in.read(buf)
}
} finally {
} {
if (propagateEof) {
out.close()
}