Merge branch 'master' of github.com:apache/spark into fix-drop-events
andrewor14 committed Aug 2, 2014
2 parents 14fa1c3 + 0da07da commit e132d69
Showing 27 changed files with 1,545 additions and 560 deletions.
69 changes: 49 additions & 20 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -25,7 +25,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
import scala.collection.JavaConversions._
import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.Try
import scala.util.{Try, Success, Failure}

import net.razorvine.pickle.{Pickler, Unpickler}

@@ -536,25 +536,6 @@ private[spark] object PythonRDD extends Logging {
file.close()
}

/**
* Convert an RDD of serialized Python dictionaries to Scala Maps (no recursive conversions).
* It is only used by pyspark.sql.
* TODO: Support more Python types.
*/
def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
pyRDD.rdd.mapPartitions { iter =>
val unpickle = new Unpickler
iter.flatMap { row =>
unpickle.loads(row) match {
// in case of objects are pickled in batch mode
case objs: java.util.ArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
// not in batch mode
case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
}
}
}
}

private def getMergedConf(confAsMap: java.util.HashMap[String, String],
baseConf: Configuration): Configuration = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
@@ -701,6 +682,54 @@ private[spark] object PythonRDD extends Logging {
}
}


/**
* Convert an RDD of serialized Python dictionaries to Scala Maps (no recursive conversions).
* This function is outdated; PySpark no longer uses it.
*/
@deprecated
def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
pyRDD.rdd.mapPartitions { iter =>
val unpickle = new Unpickler
iter.flatMap { row =>
unpickle.loads(row) match {
// in case the objects were pickled in batch mode
case objs: JArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
// not in batch mode
case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
}
}
}
}

/**
* Convert an RDD of serialized Python tuples to Arrays (no recursive conversions).
* It is only used by pyspark.sql.
*/
def pythonToJavaArray(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Array[_]] = {

def toArray(obj: Any): Array[_] = {
obj match {
case objs: JArrayList[_] =>
objs.toArray
case obj if obj.getClass.isArray =>
obj.asInstanceOf[Array[_]].toArray
}
}

pyRDD.rdd.mapPartitions { iter =>
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
if (batched) {
obj.asInstanceOf[JArrayList[_]].map(toArray)
} else {
Seq(toArray(obj))
}
}
}.toJavaRDD()
}
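
A brief illustration (not part of the patch): pythonToJavaArray's helper dispatches on whether an unpickled row is a batch (a java.util.ArrayList of tuples) or a single tuple. The sketch below feeds plain Java collections through the same dispatch so the batched and non-batched paths can be compared without a Python process or an RDD; the object name is illustrative.

import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConversions._

object ToArraySketch {
  // Same shape as pythonToJavaArray's helper: a list of fields becomes an Array,
  // and a value that is already an array is passed through.
  def toArray(obj: Any): Array[_] = obj match {
    case objs: JArrayList[_]     => objs.toArray
    case o if o.getClass.isArray => o.asInstanceOf[Array[_]]
  }

  def main(args: Array[String]): Unit = {
    val row1 = new JArrayList[Any](); row1.add(1); row1.add("a")
    val row2 = new JArrayList[Any](); row2.add(2); row2.add("b")

    // Batched: one unpickled object holds several tuples.
    val batch = new JArrayList[Any](); batch.add(row1); batch.add(row2)
    val batched = batch.map(toArray)

    // Non-batched: one unpickled object is a single tuple.
    val single = Seq(toArray(row1))

    (batched ++ single).foreach(arr => println(arr.mkString(", ")))
  }
}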

/**
* Convert an RDD of Java objects to an RDD of serialized Python objects that is usable by
* PySpark.
@@ -85,6 +85,17 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
*/
private def mergeSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
if (propertiesFile == null) {
sys.env.get("SPARK_CONF_DIR").foreach { sparkConfDir =>
val sep = File.separator
val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
val file = new File(defaultPath)
if (file.exists()) {
propertiesFile = file.getAbsolutePath
}
}
}

if (propertiesFile == null) {
sys.env.get("SPARK_HOME").foreach { sparkHome =>
val sep = File.separator
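
The block added above makes spark-defaults.conf resolution consult $SPARK_CONF_DIR before the existing SPARK_HOME fallback. The fallback path is cut off in this excerpt and is assumed below to be $SPARK_HOME/conf/spark-defaults.conf; the sketch and its names are illustrative, not Spark's API.

import java.io.File

object DefaultsFileSketch {
  // Sketch of the lookup order: $SPARK_CONF_DIR/spark-defaults.conf first,
  // then $SPARK_HOME/conf/spark-defaults.conf; the first existing file wins.
  def defaultPropertiesFile(env: Map[String, String] = sys.env): Option[String] = {
    val sep = File.separator
    val candidates = Seq(
      env.get("SPARK_CONF_DIR").map(d => s"$d${sep}spark-defaults.conf"),
      env.get("SPARK_HOME").map(h => s"$h${sep}conf${sep}spark-defaults.conf")
    ).flatten
    candidates.map(new File(_)).find(_.exists()).map(_.getAbsolutePath)
  }
}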
@@ -65,23 +65,25 @@ private[spark] class HashShuffleWriter[K, V](
}

/** Close this writer, passing along whether the map completed */
override def stop(success: Boolean): Option[MapStatus] = {
override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
var success = initiallySuccess
try {
if (stopping) {
return None
}
stopping = true
if (success) {
try {
return Some(commitWritesAndBuildStatus())
Some(commitWritesAndBuildStatus())
} catch {
case e: Exception =>
success = false
revertWrites()
throw e
}
} else {
revertWrites()
return None
None
}
} finally {
// Release the writers back to the shuffle block manager.
@@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V](
var totalBytes = 0L
var totalTime = 0L
val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
writer.commitAndClose()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
private def revertWrites(): Unit = {
if (shuffle != null && shuffle.writers != null) {
for (writer <- shuffle.writers) {
writer.revertPartialWrites()
writer.close()
writer.revertPartialWritesAndClose()
}
}
}
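
With this change, a writer is committed or reverted through a single call that also closes it. A minimal caller-side sketch of the pattern stop() follows, using a stand-in Writer trait rather than Spark's actual BlockObjectWriter (see its changes further down):

object WriterLifecycleSketch {
  // Stand-in trait for illustration only; not Spark's actual BlockObjectWriter.
  trait Writer {
    def write(value: Any): Unit
    def commitAndClose(): Unit
    def revertPartialWritesAndClose(): Unit
  }

  // Commit on success, revert (and still close) on any failure.
  def writeAll(writer: Writer, records: Iterator[Any]): Unit = {
    var success = false
    try {
      records.foreach(writer.write)
      writer.commitAndClose()
      success = true
    } finally {
      if (!success) {
        // Best effort: revertPartialWritesAndClose() is documented not to throw.
        writer.revertPartialWritesAndClose()
      }
    }
  }
}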
@@ -94,8 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
for (elem <- elements) {
writer.write(elem)
}
writer.commit()
writer.close()
writer.commitAndClose()
val segment = writer.fileSegment()
offsets(id + 1) = segment.offset + segment.length
lengths(id) = segment.length
@@ -39,16 +39,16 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
def isOpen: Boolean

/**
* Flush the partial writes and commit them as a single atomic block. Return the
* number of bytes written for this commit.
* Flush the partial writes and commit them as a single atomic block.
*/
def commit(): Long
def commitAndClose(): Unit

/**
* Reverts writes that haven't been flushed yet. Callers should invoke this function
* when there are runtime exceptions.
* when there are runtime exceptions. This method will not throw, though it may be
* unsuccessful in truncating written data.
*/
def revertPartialWrites()
def revertPartialWritesAndClose()

/**
* Writes an object.
@@ -57,6 +57,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {

/**
* Returns the file segment of committed data that this Writer has written.
* This is only valid after commitAndClose() has been called.
*/
def fileSegment(): FileSegment

@@ -108,15 +109,14 @@ private[spark] class DiskBlockObjectWriter(
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private val initialPosition = file.length()
private var lastValidPosition = initialPosition
private var finalPosition: Long = -1
private var initialized = false
private var _timeWriting = 0L

override def open(): BlockObjectWriter = {
fos = new FileOutputStream(file, true)
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
lastValidPosition = initialPosition
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(

override def isOpen: Boolean = objOut != null

override def commit(): Long = {
override def commitAndClose(): Unit = {
if (initialized) {
// NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
// serializer stream and the lower level stream.
objOut.flush()
bs.flush()
val prevPos = lastValidPosition
lastValidPosition = channel.position()
lastValidPosition - prevPos
} else {
// lastValidPosition is zero if stream is uninitialized
lastValidPosition
close()
}
finalPosition = file.length()
}

override def revertPartialWrites() {
if (initialized) {
// Discard current writes. We do this by flushing the outstanding writes and
// truncate the file to the last valid position.
objOut.flush()
bs.flush()
channel.truncate(lastValidPosition)
// Discard current writes. We do this by flushing the outstanding writes and then
// truncating the file to its initial position.
override def revertPartialWritesAndClose() {
try {
if (initialized) {
objOut.flush()
bs.flush()
close()
}

val truncateStream = new FileOutputStream(file, true)
try {
truncateStream.getChannel.truncate(initialPosition)
} finally {
truncateStream.close()
}
} catch {
case e: Exception =>
logError("Uncaught exception while reverting partial writes to file " + file, e)
}
}

@@ -188,6 +196,7 @@ private[spark] class DiskBlockObjectWriter(

// Only valid if called after commit()
override def bytesWritten: Long = {
lastValidPosition - initialPosition
assert(finalPosition != -1, "bytesWritten is only valid after successful commit()")
finalPosition - initialPosition
}
}
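
The rewritten revert (see the DiskBlockObjectWriter hunk above) closes the streams and then truncates the file back to its initial position, so even bytes already flushed to disk are discarded. A standalone JDK-only sketch of that truncate trick, not Spark code:

import java.io.{File, FileOutputStream}

object TruncateRevertSketch {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("revert-sketch", ".dat")
    val initialPosition = file.length() // 0 for a fresh temp file

    // Simulate partial output that has already been flushed to disk.
    val out = new FileOutputStream(file, true)
    out.write("partial shuffle output".getBytes("UTF-8"))
    out.close()

    // Revert: reopen for append and truncate back to the initial position.
    val truncateStream = new FileOutputStream(file, true)
    try {
      truncateStream.getChannel.truncate(initialPosition)
    } finally {
      truncateStream.close()
    }

    println(s"length after revert: ${file.length()}") // prints: length after revert: 0
    file.delete()
  }
}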
@@ -144,7 +144,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
if (consolidateShuffleFiles) {
if (success) {
val offsets = writers.map(_.fileSegment().offset)
fileGroup.recordMapOutput(mapId, offsets)
val lengths = writers.map(_.fileSegment().length)
fileGroup.recordMapOutput(mapId, offsets, lengths)
}
recycleFileGroup(fileGroup)
} else {
@@ -247,47 +248,48 @@ object ShuffleBlockManager {
* A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
*/
private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
private var numBlocks: Int = 0

/**
* Stores the absolute index of each mapId in the files of this group. For instance,
* if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
*/
private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()

/**
* Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
* This ordering allows us to compute block lengths by examining the following block offset.
* Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
* position in the file.
* Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
* reducer.
*/
private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
new PrimitiveVector[Long]()
}

def numBlocks = mapIdToIndex.size
private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
new PrimitiveVector[Long]()
}

def apply(bucketId: Int) = files(bucketId)

def recordMapOutput(mapId: Int, offsets: Array[Long]) {
def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
assert(offsets.length == lengths.length)
mapIdToIndex(mapId) = numBlocks
numBlocks += 1
for (i <- 0 until offsets.length) {
blockOffsetsByReducer(i) += offsets(i)
blockLengthsByReducer(i) += lengths(i)
}
}

/** Returns the FileSegment associated with the given map task, or None if no entry exists. */
def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
val file = files(reducerId)
val blockOffsets = blockOffsetsByReducer(reducerId)
val blockLengths = blockLengthsByReducer(reducerId)
val index = mapIdToIndex.getOrElse(mapId, -1)
if (index >= 0) {
val offset = blockOffsets(index)
val length =
if (index + 1 < numBlocks) {
blockOffsets(index + 1) - offset
} else {
file.length() - offset
}
assert(length >= 0)
val length = blockLengths(index)
Some(new FileSegment(file, offset, length))
} else {
None
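
Storing each block's length next to its offset means getFileSegmentFor no longer infers a length from the next block's offset or the end of the file. A small sketch of the same bookkeeping, with ArrayBuffer standing in for Spark's PrimitiveVector and illustrative names throughout:

import java.io.File
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Sketch only: per-reducer offset/length bookkeeping in the spirit of ShuffleFileGroup.
class SegmentIndex(files: Array[File]) {
  private val mapIdToIndex = HashMap.empty[Int, Int]
  private val offsetsByReducer = Array.fill(files.length)(ArrayBuffer.empty[Long])
  private val lengthsByReducer = Array.fill(files.length)(ArrayBuffer.empty[Long])

  def numBlocks: Int = mapIdToIndex.size

  def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]): Unit = {
    require(offsets.length == lengths.length && offsets.length == files.length)
    mapIdToIndex(mapId) = numBlocks
    for (i <- offsets.indices) {
      offsetsByReducer(i) += offsets(i)
      lengthsByReducer(i) += lengths(i)
    }
  }

  /** Returns (file, offset, length) for a map/reduce pair, if recorded. */
  def segmentFor(mapId: Int, reducerId: Int): Option[(File, Long, Long)] =
    mapIdToIndex.get(mapId).map { index =>
      (files(reducerId), offsetsByReducer(reducerId)(index), lengthsByReducer(reducerId)(index))
    }
}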
35 changes: 22 additions & 13 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -286,17 +286,23 @@ private[spark] object Utils extends Logging {
out: OutputStream,
closeStreams: Boolean = false)
{
val buf = new Array[Byte](8192)
var n = 0
while (n != -1) {
n = in.read(buf)
if (n != -1) {
out.write(buf, 0, n)
try {
val buf = new Array[Byte](8192)
var n = 0
while (n != -1) {
n = in.read(buf)
if (n != -1) {
out.write(buf, 0, n)
}
}
} finally {
if (closeStreams) {
try {
in.close()
} finally {
out.close()
}
}
}
if (closeStreams) {
in.close()
out.close()
}
}
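
The nested try/finally above guarantees out.close() still runs when in.close() throws, and the copy loop itself is now guarded as well. A self-contained sketch of the same shape against in-memory streams (illustrative object name):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

object CopyStreamSketch {
  // Copy in 8 KB chunks and, when closeStreams is set, close both streams
  // even if in.close() itself throws.
  def copy(in: InputStream, out: OutputStream, closeStreams: Boolean = false): Unit = {
    try {
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
    } finally {
      if (closeStreams) {
        try in.close() finally out.close()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    copy(new ByteArrayInputStream("hello".getBytes("UTF-8")), out, closeStreams = true)
    println(out.toString("UTF-8")) // hello
  }
}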

@@ -868,9 +874,12 @@ private[spark] object Utils extends Logging {
val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
val stream = new FileInputStream(file)

stream.skip(effectiveStart)
stream.read(buff)
stream.close()
try {
stream.skip(effectiveStart)
stream.read(buff)
} finally {
stream.close()
}
Source.fromBytes(buff).mkString
}
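
One caveat this hunk inherits from the old code: a single stream.read(buff) is not guaranteed to fill the buffer, and skip() may skip fewer bytes than asked. The sketch below is a hedged variant, not part of the patch, that loops until the requested range is read while keeping the same try/finally shape:

import java.io.FileInputStream

object ReadRangeSketch {
  // Read bytes [start, end) of a file; loops because read() may return fewer
  // bytes than requested, and skip() may skip fewer than asked.
  def readRange(path: String, start: Long, end: Long): Array[Byte] = {
    val buff = new Array[Byte]((end - start).toInt)
    val stream = new FileInputStream(path)
    try {
      var skipped = 0L
      while (skipped < start) {
        val s = stream.skip(start - skipped)
        if (s <= 0) throw new java.io.IOException(s"Could not skip to offset $start")
        skipped += s
      }
      var pos = 0
      var n = 0
      while (pos < buff.length && n != -1) {
        n = stream.read(buff, pos, buff.length - pos)
        if (n > 0) pos += n
      }
      buff
    } finally {
      stream.close()
    }
  }
}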
