
Commit

rebase
zhzhan committed Oct 9, 2014
2 parents cb22863 + a72c0d4 commit 2b50502
Showing 134 changed files with 3,344 additions and 1,867 deletions.
2 changes: 1 addition & 1 deletion bin/compute-classpath.cmd
@@ -38,7 +38,7 @@ if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
rem Build up classpath
set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%

if "x%SPARK_CONF_DIR%"!="x" (
if not "x%SPARK_CONF_DIR%"=="x" (
set CLASSPATH=%CLASSPATH%;%SPARK_CONF_DIR%
) else (
set CLASSPATH=%CLASSPATH%;%FWDIR%conf
5 changes: 5 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -51,6 +51,11 @@ table.sortable thead {
cursor: pointer;
}

table.sortable td {
word-wrap: break-word;
max-width: 600px;
}

.progress {
margin-bottom: 0px; position: relative
}
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
logWarning(s"Not enough space to cache partition $key in memory! " +
s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -779,20 +779,20 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
* with `+=`. Only the driver can access the accumuable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/
def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) =
new Accumulable(initialValue, param)

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
* Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
* access the accumuable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/
def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) =
new Accumulable(initialValue, param, Some(name))

/**
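Note: the change above swaps the type parameters so that R is the accumulated result type read on the driver and T is the element type that tasks add with +=. A minimal usage sketch against the corrected accumulable[R, T] signature (it assumes an existing SparkContext named sc; the ListParam helper is a hypothetical AccumulableParam written for illustration, not part of this patch):

import org.apache.spark.AccumulableParam

// Hypothetical AccumulableParam: accumulate Int elements (T) into a List[Int] (R).
implicit object ListParam extends AccumulableParam[List[Int], Int] {
  def addAccumulator(acc: List[Int], elem: Int): List[Int] = elem :: acc  // add one T to the result
  def addInPlace(a: List[Int], b: List[Int]): List[Int] = a ::: b         // merge two partial results
  def zero(initial: List[Int]): List[Int] = Nil                           // identity value
}

// R = List[Int], T = Int: matches the corrected accumulable[R, T] ordering.
val acc = sc.accumulable[List[Int], Int](Nil)
sc.parallelize(1 to 5).foreach(x => acc += x)  // tasks add T values
println(acc.value)                             // only the driver reads the R result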
19 changes: 8 additions & 11 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -43,9 +43,8 @@ import org.apache.spark.util.{AkkaUtils, Utils}
* :: DeveloperApi ::
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
* including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
* Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
* Spark code finds the SparkEnv through a global variable, so all the threads can access the same
* SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
*
* NOTE: This is not intended for external use. This is exposed for Shark and may be made private
* in a future release.
@@ -119,30 +118,28 @@ class SparkEnv (
}

object SparkEnv extends Logging {
private val env = new ThreadLocal[SparkEnv]
@volatile private var lastSetSparkEnv : SparkEnv = _
@volatile private var env: SparkEnv = _

private[spark] val driverActorSystemName = "sparkDriver"
private[spark] val executorActorSystemName = "sparkExecutor"

def set(e: SparkEnv) {
lastSetSparkEnv = e
env.set(e)
env = e
}

/**
* Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
* previously set in any thread.
* Returns the SparkEnv.
*/
def get: SparkEnv = {
Option(env.get()).getOrElse(lastSetSparkEnv)
env
}

/**
* Returns the ThreadLocal SparkEnv.
*/
@deprecated("Use SparkEnv.get instead", "1.2")
def getThreadLocal: SparkEnv = {
env.get()
env
}

private[spark] def create(
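Note: the doc and code change above replace the thread-local SparkEnv with a single volatile field, so every thread observes the same environment and child threads no longer need to call SparkEnv.set themselves. A minimal sketch of what that means for callers (it assumes a SparkContext has already been created, so SparkEnv.get returns a non-null environment):

import org.apache.spark.SparkEnv

val env = SparkEnv.get  // the process-wide environment, e.g. right after creating a SparkContext
new Thread(new Runnable {
  override def run(): Unit = {
    // No per-thread SparkEnv.set(env) is required any more: the global
    // volatile reference is visible from this thread as well.
    assert(SparkEnv.get eq env)
  }
}).start()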
@@ -196,7 +196,6 @@ private[spark] class PythonRDD(

override def run(): Unit = Utils.logUncaughtExceptions {
try {
SparkEnv.set(env)
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
val dataOut = new DataOutputStream(stream)
// Partition index
@@ -248,6 +247,11 @@ private[spark] class PythonRDD(
// will kill the whole executor (see org.apache.spark.executor.Executor).
_exception = e
worker.shutdownOutput()
} finally {
// Release memory used by this thread for shuffles
env.shuffleMemoryManager.releaseMemoryForThisThread()
// Release memory used by this thread for unrolling blocks
env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
}
}
}
25 changes: 15 additions & 10 deletions core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -163,18 +163,23 @@ private[broadcast] object HttpBroadcast extends Logging {

private def write(id: Long, value: Any) {
val file = getFile(id)
val out: OutputStream = {
if (compress) {
compressionCodec.compressedOutputStream(new FileOutputStream(file))
} else {
new BufferedOutputStream(new FileOutputStream(file), bufferSize)
val fileOutputStream = new FileOutputStream(file)
try {
val out: OutputStream = {
if (compress) {
compressionCodec.compressedOutputStream(fileOutputStream)
} else {
new BufferedOutputStream(fileOutputStream, bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject(value)
serOut.close()
files += file
} finally {
fileOutputStream.close()
}
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject(value)
serOut.close()
files += file
}

private def read[T: ClassTag](id: Long): T = {
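Note: the restructuring above exists so that the raw FileOutputStream is closed even if building the compressed or buffered wrapper, or serializing the value, throws. A standalone sketch of the same close-in-finally pattern (writeSafely and writeTo are illustrative stand-ins, not the broadcast code itself); the FileSystemPersistenceEngine change further down applies the same idea to both its write and read paths:

import java.io.{FileOutputStream, OutputStream}

def writeSafely(path: String)(writeTo: OutputStream => Unit): Unit = {
  val fileOut = new FileOutputStream(path)  // open the underlying resource first
  try {
    writeTo(fileOut)                        // wrapping and writing happen inside try
  } finally {
    fileOut.close()                         // always release the file handle
  }
}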
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -130,7 +130,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
System.exit(-1)

case AssociationErrorEvent(cause, _, remoteAddress, _) =>
case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
println(s"Cause was: $cause")
System.exit(-1)
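Note: this one-character change, repeated in AppClient and WorkerWatcher below, tracks an extra field gained by Akka's AssociationErrorEvent: a constructor pattern must bind or ignore every field, so each match needs one more wildcard. A minimal illustration with a hypothetical event class (not the real Akka type, whose exact new field is not shown here):

// Hypothetical stand-in for an event case class that grew a fifth field.
case class ErrorEvent(cause: Throwable, local: String, remote: String,
                      inbound: Boolean, logLevel: Int)

def handle(event: ErrorEvent): Unit = event match {
  // Five sub-patterns now; the new field is simply ignored with _.
  case ErrorEvent(cause, _, remote, _, _) =>
    println(s"Error connecting to $remote, cause: $cause")
}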
@@ -154,7 +154,7 @@ private[spark] class AppClient(
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()

case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) =>
logWarning(s"Could not connect to $address: $cause")

case StopAppClient =>
@@ -83,15 +83,21 @@ private[spark] class FileSystemPersistenceEngine(
val serialized = serializer.toBinary(value)

val out = new FileOutputStream(file)
out.write(serialized)
out.close()
try {
out.write(serialized)
} finally {
out.close()
}
}

def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
val fileData = new Array[Byte](file.length().asInstanceOf[Int])
val dis = new DataInputStream(new FileInputStream(file))
dis.readFully(fileData)
dis.close()
try {
dis.readFully(fileData)
} finally {
dis.close()
}

val clazz = m.runtimeClass.asInstanceOf[Class[T]]
val serializer = serialization.serializerFor(clazz)
@@ -29,7 +29,6 @@ import scala.language.postfixOps

import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.commons.io.FileUtils

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
@@ -54,7 +54,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
logInfo(s"Successfully connected to $workerUrl")

case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
if isWorker(remoteAddress) =>
// These logs may not be seen if the worker (and associated pipe) has died
logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -148,7 +148,6 @@ private[spark] class Executor(

override def run() {
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
@@ -158,7 +157,6 @@
val startGCTime = gcTime

try {
SparkEnv.set(env)
Accumulators.clear()
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
35 changes: 21 additions & 14 deletions core/src/main/scala/org/apache/spark/network/nio/Connection.scala
@@ -20,11 +20,14 @@ package org.apache.spark.network.nio
import java.net._
import java.nio._
import java.nio.channels._
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.LinkedList

import org.apache.spark._

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

private[nio]
abstract class Connection(val channel: SocketChannel, val selector: Selector,
@@ -51,7 +54,7 @@

@volatile private var closed = false
var onCloseCallback: Connection => Unit = null
var onExceptionCallback: (Connection, Exception) => Unit = null
val onExceptionCallbacks = new ConcurrentLinkedQueue[(Connection, Throwable) => Unit]
var onKeyInterestChangeCallback: (Connection, Int) => Unit = null

val remoteAddress = getRemoteAddress()
@@ -130,20 +133,24 @@
onCloseCallback = callback
}

def onException(callback: (Connection, Exception) => Unit) {
onExceptionCallback = callback
def onException(callback: (Connection, Throwable) => Unit) {
onExceptionCallbacks.add(callback)
}

def onKeyInterestChange(callback: (Connection, Int) => Unit) {
onKeyInterestChangeCallback = callback
}

def callOnExceptionCallback(e: Exception) {
if (onExceptionCallback != null) {
onExceptionCallback(this, e)
} else {
logError("Error in connection to " + getRemoteConnectionManagerId() +
" and OnExceptionCallback not registered", e)
def callOnExceptionCallbacks(e: Throwable) {
onExceptionCallbacks foreach {
callback =>
try {
callback(this, e)
} catch {
case NonFatal(e) => {
logWarning("Ignored error in onExceptionCallback", e)
}
}
}
}

@@ -323,7 +330,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
} catch {
case e: Exception => {
logError("Error connecting to " + address, e)
callOnExceptionCallback(e)
callOnExceptionCallbacks(e)
}
}
}
@@ -348,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
} catch {
case e: Exception => {
logWarning("Error finishing connection to " + address, e)
callOnExceptionCallback(e)
callOnExceptionCallbacks(e)
}
}
true
@@ -393,7 +400,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
} catch {
case e: Exception => {
logWarning("Error writing in connection to " + getRemoteConnectionManagerId(), e)
callOnExceptionCallback(e)
callOnExceptionCallbacks(e)
close()
return false
}
@@ -420,7 +427,7 @@
case e: Exception =>
logError("Exception while reading SendingConnection to " + getRemoteConnectionManagerId(),
e)
callOnExceptionCallback(e)
callOnExceptionCallbacks(e)
close()
}

@@ -577,7 +584,7 @@ private[spark] class ReceivingConnection(
} catch {
case e: Exception => {
logWarning("Error reading from connection to " + getRemoteConnectionManagerId(), e)
callOnExceptionCallback(e)
callOnExceptionCallbacks(e)
close()
return false
}
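Note: the change above replaces the single onExceptionCallback var with a ConcurrentLinkedQueue so that several handlers can register, and wraps each invocation in NonFatal so one failing handler cannot stop the others. A self-contained sketch of that pattern, runnable in a Scala REPL (ErrorNotifier and its methods are illustrative names, not the Connection API):

import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConversions._
import scala.util.control.NonFatal

class ErrorNotifier {
  private val callbacks = new ConcurrentLinkedQueue[Throwable => Unit]

  def onException(callback: Throwable => Unit): Unit = callbacks.add(callback)

  def fire(error: Throwable): Unit = {
    callbacks foreach { callback =>
      try {
        callback(error)                                // notify one registered handler
      } catch {
        case NonFatal(e) =>
          println(s"Ignored error in callback: $e")    // keep notifying the rest
      }
    }
  }
}

val notifier = new ErrorNotifier
notifier.onException(e => println(s"handler saw: ${e.getMessage}"))
notifier.fire(new RuntimeException("connection reset"))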
(Remaining changed files in this commit are not shown.)
