
Commit

Merge remote-tracking branch 'upstream/master' into streaming-kmeans
freeman-lab committed Oct 25, 2014
2 parents 9fd9c15 + 3a845d3 commit a0fd790
Showing 8,273 changed files with 40,302 additions and 47,672 deletions.
The full diff is too large to display; only the first 3000 changed files are loaded here.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -48,6 +48,7 @@ sbt-launch-lib.bash
plugins.sbt
work
.*\.q
.*\.qv
golden
test.out/*
.*iml
6 changes: 6 additions & 0 deletions assembly/pom.xml
@@ -197,6 +197,12 @@
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
<id>hive-0.12.0</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
4 changes: 4 additions & 0 deletions core/pom.xml
@@ -132,6 +132,10 @@
<groupId>com.twitter</groupId>
<artifactId>chill-java</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
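The new core/pom.xml dependency above pulls in RoaringBitmap, a compressed bitmap library. As a rough, hedged illustration of the library's general Java API (bitmapOf, add, contains, getCardinality); this diff does not show how Spark itself uses it. A minimal Scala sketch:

import org.roaringbitmap.RoaringBitmap

object RoaringBitmapSketch {
  def main(args: Array[String]): Unit = {
    // Build a compressed bitmap over a few integer ids.
    val bitmap = RoaringBitmap.bitmapOf(1, 3, 5, 7)
    bitmap.add(9)                        // add one more id
    println(bitmap.contains(5))          // true
    println(bitmap.contains(4))          // false
    println(bitmap.getCardinality())     // 5 bits set
  }
}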
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.Map
import scala.reflect.ClassTag

import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.Utils

/**
* A data type that can be accumulated, ie has an commutative and associative "add" operation,
@@ -126,7 +127,7 @@ class Accumulable[R, T] (
}

// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream) {
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
value_ = zero
deserialized = true
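The Accumulators change above, and the same edit repeated below in Partitioner, SerializableWritable, the broadcast classes, ApplicationInfo, and DriverInfo, wraps the bodies of custom writeObject/readObject methods in Utils.tryOrIOException. The helper itself is not part of this diff, so the following is only an assumption about its shape: run the serialization block and rethrow any non-fatal failure as an IOException, the exception type these hooks are expected to throw, so Java serialization reports the failure instead of losing it.

import java.io.IOException
import scala.util.control.NonFatal

// Hypothetical stand-in for org.apache.spark.util.Utils.tryOrIOException.
object SerializationGuard {
  def tryOrIOException(block: => Unit): Unit = {
    try {
      block
    } catch {
      case e: IOException => throw e                // already the expected type
      case NonFatal(t) => throw new IOException(t)  // wrap anything else
    }
  }
}

With readObject declared as ": Unit = Utils.tryOrIOException { ... }", any error thrown while rebuilding state surfaces as an IOException at the deserialization call site.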
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -61,7 +61,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val computedValues = rdd.computeOrReadCheckpoint(partition, context)

// If the task is running locally, do not persist the result
if (context.runningLocally) {
if (context.isRunningLocally) {
return computedValues
}

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -204,7 +204,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}

@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream) {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => out.defaultWriteObject()
@@ -222,7 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}

@throws(classOf[IOException])
private def readObject(in: ObjectInputStream) {
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => in.defaultReadObject()
core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -24,18 +24,19 @@ import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

@DeveloperApi
class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value = t
override def toString = t.toString

private def writeObject(out: ObjectOutputStream) {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
out.defaultWriteObject()
new ObjectWritable(t).write(out)
}

private def readObject(in: ObjectInputStream) {
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
val ow = new ObjectWritable()
ow.setConf(new Configuration())
17 changes: 16 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,7 +18,8 @@
package org.apache.spark

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.collection.mutable.{HashMap, LinkedHashSet}
import org.apache.spark.serializer.KryoSerializer

/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -140,6 +141,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}

/**
* Use Kryo serialization and register the given set of classes with Kryo.
* If called multiple times, this will append the classes from all calls together.
*/
def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
val allClassNames = new LinkedHashSet[String]()
allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty)
allClassNames ++= classes.map(_.getName)

set("spark.kryo.classesToRegister", allClassNames.mkString(","))
set("spark.serializer", classOf[KryoSerializer].getName)
this
}

/** Remove a parameter from the configuration */
def remove(key: String): SparkConf = {
settings.remove(key)
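The new SparkConf.registerKryoClasses method above gives applications a one-call way to switch on Kryo serialization and register their classes. A minimal usage sketch, where MyKey and MyValue are hypothetical application classes:

import org.apache.spark.SparkConf

object KryoRegistrationExample {
  // Hypothetical application classes to register with Kryo.
  case class MyKey(id: Long)
  case class MyValue(payload: Array[Double])

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kryo-registration-example")
      // Sets spark.serializer to KryoSerializer and appends both class names
      // to spark.kryo.classesToRegister.
      .registerKryoClasses(Array(classOf[MyKey], classOf[MyValue]))

    println(conf.get("spark.kryo.classesToRegister"))
    println(conf.get("spark.serializer"))
  }
}

Because the method appends to spark.kryo.classesToRegister rather than overwriting it, it can be called more than once, for example from different modules, without clobbering earlier registrations.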
17 changes: 6 additions & 11 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -209,16 +209,10 @@ class SparkContext(config: SparkConf) extends Logging {
// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus

// Create the Spark execution environment (cache, map output tracker, etc)
conf.set("spark.executor.id", "driver")
private[spark] val env = SparkEnv.create(
conf,
"<driver>",
conf.get("spark.driver.host"),
conf.get("spark.driver.port").toInt,
isDriver = true,
isLocal = isLocal,
listenerBus = listenerBus)

// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
SparkEnv.set(env)

// Used to store a URL for each static file/jar together with the file's local timestamp
@@ -837,11 +831,12 @@ class SparkContext(config: SparkConf) extends Logging {
case "local" => "file:" + uri.getPath
case _ => path
}
addedFiles(key) = System.currentTimeMillis
val timestamp = System.currentTimeMillis
addedFiles(key) = timestamp

// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
hadoopConfiguration)
hadoopConfiguration, timestamp, useCache = false)

logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdate()
51 changes: 46 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -68,13 +68,15 @@
val shuffleMemoryManager: ShuffleMemoryManager,
val conf: SparkConf) extends Logging {

private[spark] var isStopped = false
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

private[spark] def stop() {
isStopped = true
pythonWorkers.foreach { case(key, worker) => worker.stop() }
Option(httpFileServer).foreach(_.stop())
mapOutputTracker.stop()
@@ -142,24 +144,63 @@ object SparkEnv extends Logging {
env
}

private[spark] def create(
/**
* Create a SparkEnv for the driver.
*/
private[spark] def createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val hostname = conf.get("spark.driver.host")
val port = conf.get("spark.driver.port").toInt
create(conf, "<driver>", hostname, port, true, isLocal, listenerBus)
}

/**
* Create a SparkEnv for an executor.
* In coarse-grained mode, the executor provides an actor system that is already instantiated.
*/
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
isLocal: Boolean,
actorSystem: ActorSystem = null): SparkEnv = {
create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
}

/**
* Helper method to create a SparkEnv for a driver or an executor.
*/
private def create(
conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
isDriver: Boolean,
isLocal: Boolean,
listenerBus: LiveListenerBus = null): SparkEnv = {
listenerBus: LiveListenerBus = null,
defaultActorSystem: ActorSystem = null): SparkEnv = {

// Listener bus is only used on the driver
if (isDriver) {
assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
}

val securityManager = new SecurityManager(conf)
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
actorSystemName, hostname, port, conf, securityManager)

// If an existing actor system is already provided, use it.
// This is the case when an executor is launched in coarse-grained mode.
val (actorSystem, boundPort) =
Option(defaultActorSystem) match {
case Some(as) => (as, port)
case None =>
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
}

// Figure out which port Akka actually bound to in case the original port is 0 or occupied.
// This is so that we tell the executors the correct port to connect to.
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -75,6 +75,7 @@ private[spark] class PythonRDD(
var complete_cleanly = false
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()
writerThread.join()
if (reuse_worker && complete_cleanly) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
} else {
@@ -145,7 +146,9 @@ private[spark] class PythonRDD(
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
complete_cleanly = true
if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
complete_cleanly = true
}
null
}
} catch {
@@ -154,6 +157,10 @@
logDebug("Exception thrown after task interruption", e)
throw new TaskKilledException

case e: Exception if env.isStopped =>
logDebug("Exception thrown after context is stopped", e)
null // exit silently

case e: Exception if writerThread.exception.isDefined =>
logError("Python worker exited unexpectedly (crashed)", e)
logError("This may have been caused by a prior exception:", writerThread.exception.get)
@@ -235,6 +242,7 @@
// Data values
PythonRDD.writeIteratorToStream(firstParent.iterator(split, context), dataOut)
dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION)
dataOut.writeInt(SpecialLengths.END_OF_STREAM)
dataOut.flush()
} catch {
case e: Exception if context.isCompleted || context.isInterrupted =>
@@ -306,6 +314,7 @@ private object SpecialLengths {
val END_OF_DATA_SECTION = -1
val PYTHON_EXCEPTION_THROWN = -2
val TIMING_DATA = -3
val END_OF_STREAM = -4
}

private[spark] object PythonRDD extends Logging {
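The PythonRDD changes above add an END_OF_STREAM marker (-4): the writer thread now emits it after END_OF_DATA_SECTION, and the reader only marks the task as completing cleanly when it actually reads that marker, so a truncated stream is no longer mistaken for a clean finish. Below is a generic sketch of this kind of end-of-stream check using plain DataOutputStream/DataInputStream; it mirrors the sentinel values but not Spark's actual Python worker protocol.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object StreamSentinelSketch {
  // Sentinel values mirroring SpecialLengths in the diff above.
  val END_OF_DATA_SECTION = -1
  val END_OF_STREAM = -4

  def main(args: Array[String]): Unit = {
    // Writer side: a few stand-in payload frames, then the two end markers.
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    Seq(10, 20, 30).foreach(v => out.writeInt(v))
    out.writeInt(END_OF_DATA_SECTION)
    out.writeInt(END_OF_STREAM)
    out.flush()

    // Reader side: drain the data section, then require the final marker.
    val in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray))
    var frame = in.readInt()
    while (frame != END_OF_DATA_SECTION) {
      frame = in.readInt()
    }
    val completedCleanly = in.readInt() == END_OF_STREAM
    println(s"completed cleanly: $completedCleanly")   // true
  }
}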
core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -72,13 +72,13 @@ private[spark] class HttpBroadcast[T: ClassTag](
}

/** Used by the JVM when serializing this object. */
private def writeObject(out: ObjectOutputStream) {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
assertValid()
out.defaultWriteObject()
}

/** Used by the JVM when deserializing this object. */
private def readObject(in: ObjectInputStream) {
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
HttpBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(blockId) match {
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util.ByteBufferInputStream
import org.apache.spark.util.{ByteBufferInputStream, Utils}
import org.apache.spark.util.io.ByteArrayChunkOutputStream

/**
@@ -152,13 +152,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
}

/** Used by the JVM when serializing this object. */
private def writeObject(out: ObjectOutputStream) {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
assertValid()
out.defaultWriteObject()
}

/** Used by the JVM when deserializing this object. */
private def readObject(in: ObjectInputStream) {
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -17,6 +17,8 @@

package org.apache.spark.deploy

import java.net.{URI, URISyntaxException}

import scala.collection.mutable.ListBuffer

import org.apache.log4j.Level
@@ -114,5 +116,12 @@ private[spark] class ClientArguments(args: Array[String]) {
}

object ClientArguments {
def isValidJarUrl(s: String): Boolean = s.matches("(.+):(.+)jar")
def isValidJarUrl(s: String): Boolean = {
try {
val uri = new URI(s)
uri.getScheme != null && uri.getAuthority != null && s.endsWith("jar")
} catch {
case _: URISyntaxException => false
}
}
}
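The rewritten isValidJarUrl above replaces the loose regex "(.+):(.+)jar" with real URI parsing: the string must parse as a URI, carry both a scheme and an authority, and end in "jar". A small self-contained sketch of the same check with a few illustrative inputs (the host and paths are made up):

import java.net.{URI, URISyntaxException}

object JarUrlCheckSketch {
  // Same logic as the rewritten ClientArguments.isValidJarUrl above.
  def isValidJarUrl(s: String): Boolean = {
    try {
      val uri = new URI(s)
      uri.getScheme != null && uri.getAuthority != null && s.endsWith("jar")
    } catch {
      case _: URISyntaxException => false
    }
  }

  def main(args: Array[String]): Unit = {
    println(isValidJarUrl("hdfs://namenode:8020/apps/job.jar"))  // true: scheme, authority, ends in jar
    println(isValidJarUrl("job.jar"))                            // false: no scheme or authority
    println(isValidJarUrl("hdfs://namenode:8020/apps/job.zip"))  // false: does not end in "jar"
  }
}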
core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import akka.actor.ActorRef

import org.apache.spark.deploy.ApplicationDescription
import org.apache.spark.util.Utils

private[spark] class ApplicationInfo(
val startTime: Long,
@@ -46,7 +47,7 @@ private[spark] class ApplicationInfo(

init()

private def readObject(in: java.io.ObjectInputStream): Unit = {
private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
init()
}
core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import java.util.Date

import org.apache.spark.deploy.DriverDescription
import org.apache.spark.util.Utils

private[spark] class DriverInfo(
val startTime: Long,
@@ -36,7 +37,7 @@ private[spark] class DriverInfo(

init()

private def readObject(in: java.io.ObjectInputStream): Unit = {
private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
init()
}
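The ApplicationInfo and DriverInfo changes above follow the same pattern: the classes call init() again after defaultReadObject so a deserialized instance is re-initialized, and that readObject is now wrapped in Utils.tryOrIOException like the other classes in this commit. A generic, self-contained sketch of that transient-reinitialization pattern (CachedThing is a hypothetical class, and the IOException wrapping is omitted for brevity):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

class CachedThing(val name: String) extends Serializable {
  @transient private var cache: Map[String, Int] = _   // not serialized

  init()

  private def init(): Unit = {
    cache = Map(name -> name.length)                    // stand-in for derived state
  }

  def lookup: Int = cache(name)

  // Called by Java serialization when deserializing; re-runs init() so the
  // transient cache exists again on the receiving side.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    init()
  }
}

object CachedThingDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(new CachedThing("spark"))
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val revived = in.readObject().asInstanceOf[CachedThing]
    println(revived.lookup)                              // 5, the cache was rebuilt
  }
}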