Merge github.com:apache/spark into cleanup
andrewor14 committed Mar 29, 2014
2 parents 88904a3 + 75d46be commit e442246
Showing 58 changed files with 322 additions and 92 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -81,7 +81,7 @@ class SparkEnv private[spark] (
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
//actorSystem.awaitTermination()
// actorSystem.awaitTermination()
}

private[spark]
@@ -66,9 +66,9 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
// TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
// This is unfortunate, but for now we just comment it out.
workerActorSystems.foreach(_.shutdown())
//workerActorSystems.foreach(_.awaitTermination())
// workerActorSystems.foreach(_.awaitTermination())
masterActorSystems.foreach(_.shutdown())
//masterActorSystems.foreach(_.awaitTermination())
// masterActorSystems.foreach(_.awaitTermination())
masterActorSystems.clear()
workerActorSystems.clear()
}
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
* [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
*/
private[spark] trait LeaderElectionAgent extends Actor {
//TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
// TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
val masterActor: ActorRef
}

6 changes: 2 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -112,11 +112,10 @@ private[spark] class Executor(
}
}

// Create our ClassLoader and set it on this thread
// Create our ClassLoader
// do this after SparkEnv creation so can access the SecurityManager
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
Thread.currentThread.setContextClassLoader(replClassLoader)

// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
@@ -276,7 +275,6 @@ private[spark] class Executor(
// have left some weird state around depending on when the exception was thrown, but on
// the other hand, maybe we could detect that when future tasks fail and exit then.
logError("Exception in task ID " + taskId, t)
//System.exit(1)
}
} finally {
// TODO: Unregister shuffle memory only for ResultTask
@@ -294,7 +292,7 @@ private[spark] class Executor(
* created by the interpreter to the search path
*/
private def createClassLoader(): ExecutorURLClassLoader = {
val loader = this.getClass.getClassLoader
val loader = Thread.currentThread().getContextClassLoader

// For each of the jars in the jarSet, add them to the class loader.
// We assume each of the files has already been fetched.
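The createClassLoader change above starts from the thread's context class loader rather than the loader that defined the Executor class, and the executor no longer installs the REPL loader on its own thread. A minimal sketch of the pattern with a plain java.net.URLClassLoader (createClassLoader and jarPaths here are illustrative names, not Spark's actual helper):

import java.io.File
import java.net.{URL, URLClassLoader}

// Build a loader for a set of jars, parented on the current thread's
// context class loader instead of getClass.getClassLoader.
def createClassLoader(jarPaths: Seq[String]): URLClassLoader = {
  val parent = Thread.currentThread().getContextClassLoader
  val urls: Array[URL] = jarPaths.map(p => new File(p).toURI.toURL).toArray
  new URLClassLoader(urls, parent)
}

// Whoever runs tasks decides whether to install it:
// Thread.currentThread().setContextClassLoader(createClassLoader(jars))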
@@ -42,7 +42,7 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
}

def initialize() {
//Add default properties in case there's no properties file
// Add default properties in case there's no properties file
setDefaultProperties(properties)

// If spark.metrics.conf is not set, try to get file in class path
18 changes: 9 additions & 9 deletions core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -48,7 +48,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
channel.socket.setTcpNoDelay(true)
channel.socket.setReuseAddress(true)
channel.socket.setKeepAlive(true)
/*channel.socket.setReceiveBufferSize(32768) */
/* channel.socket.setReceiveBufferSize(32768) */

@volatile private var closed = false
var onCloseCallback: Connection => Unit = null
@@ -206,12 +206,12 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,

private class Outbox {
val messages = new Queue[Message]()
val defaultChunkSize = 65536 //32768 //16384
val defaultChunkSize = 65536
var nextMessageToBeUsed = 0

def addMessage(message: Message) {
messages.synchronized{
/*messages += message*/
/* messages += message*/
messages.enqueue(message)
logDebug("Added [" + message + "] to outbox for sending to " +
"[" + getRemoteConnectionManagerId() + "]")
@@ -221,8 +221,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
def getChunk(): Option[MessageChunk] = {
messages.synchronized {
while (!messages.isEmpty) {
/*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
/*val message = messages(nextMessageToBeUsed)*/
/* nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
/* val message = messages(nextMessageToBeUsed)*/
val message = messages.dequeue
val chunk = message.getChunkForSending(defaultChunkSize)
if (chunk.isDefined) {
@@ -262,7 +262,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,

val currentBuffers = new ArrayBuffer[ByteBuffer]()

/*channel.socket.setSendBufferSize(256 * 1024)*/
/* channel.socket.setSendBufferSize(256 * 1024)*/

override def getRemoteAddress() = address

@@ -355,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
case None => {
// changeConnectionKeyInterest(0)
/*key.interestOps(0)*/
/* key.interestOps(0)*/
return false
}
}
@@ -540,10 +540,10 @@ private[spark] class ReceivingConnection(
return false
}

/*logDebug("Read " + bytesRead + " bytes for the buffer")*/
/* logDebug("Read " + bytesRead + " bytes for the buffer")*/

if (currentChunk.buffer.remaining == 0) {
/*println("Filled buffer at " + System.currentTimeMillis)*/
/* println("Filled buffer at " + System.currentTimeMillis)*/
val bufferMessage = inbox.getMessageForChunk(currentChunk).get
if (bufferMessage.isCompletelyReceived) {
bufferMessage.flip
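For context on the Outbox code above: getChunk dequeues a message and asks it for a chunk of at most defaultChunkSize bytes. A standalone sketch of that chunking idea over a raw ByteBuffer (chunk is a hypothetical helper, not part of Connection.scala):

import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Slice a payload into views of at most chunkSize bytes each,
// mirroring the 65536-byte default used by Outbox above.
def chunk(payload: ByteBuffer, chunkSize: Int = 65536): Seq[ByteBuffer] = {
  val chunks = new ArrayBuffer[ByteBuffer]()
  val dup = payload.duplicate()
  while (dup.hasRemaining) {
    val size = math.min(chunkSize, dup.remaining())
    val slice = dup.slice()
    slice.limit(size)       // restrict this view to the chunk size
    chunks += slice
    dup.position(dup.position() + size)
  }
  chunks.toSeq
}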
@@ -504,7 +504,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
}
}
handleMessageExecutor.execute(runnable)
/*handleMessage(connection, message)*/
/* handleMessage(connection, message)*/
}

private def handleClientAuthentication(
@@ -732,7 +732,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
logTrace("Sending Security [" + message + "] to [" + connManagerId + "]")
val connection = connectionsById.getOrElseUpdate(connManagerId, startNewConnection())

//send security message until going connection has been authenticated
// send security message until going connection has been authenticated
connection.send(message)

wakeupSelector()
@@ -858,14 +858,14 @@ private[spark] object ConnectionManager {
None
})

/*testSequentialSending(manager)*/
/*System.gc()*/
/* testSequentialSending(manager)*/
/* System.gc()*/

/*testParallelSending(manager)*/
/*System.gc()*/
/* testParallelSending(manager)*/
/* System.gc()*/

/*testParallelDecreasingSending(manager)*/
/*System.gc()*/
/* testParallelDecreasingSending(manager)*/
/* System.gc()*/

testContinuousSending(manager)
System.gc()
@@ -947,7 +947,7 @@ private[spark] object ConnectionManager {
val ms = finishTime - startTime
val tput = mb * 1000.0 / ms
println("--------------------------")
/*println("Started at " + startTime + ", finished at " + finishTime) */
/* println("Started at " + startTime + ", finished at " + finishTime) */
println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
println("--------------------------")
println()
@@ -47,8 +47,8 @@ private[spark] object ConnectionManagerTest extends Logging{
val slaves = slavesFile.mkString.split("\n")
slavesFile.close()

/*println("Slaves")*/
/*slaves.foreach(println)*/
/* println("Slaves")*/
/* slaves.foreach(println)*/
val tasknum = if (args.length > 2) args(2).toInt else slaves.length
val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
val count = if (args.length > 4) args(4).toInt else 3
@@ -27,7 +27,7 @@ private[spark] object ReceiverTest {
println("Started connection manager with id = " + manager.id)

manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
/*println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
/* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
val buffer = ByteBuffer.wrap("response".getBytes)
Some(Message.createBufferMessage(buffer, msg.id))
})
@@ -50,7 +50,7 @@ private[spark] object SenderTest {
(0 until count).foreach(i => {
val dataMessage = Message.createBufferMessage(buffer.duplicate)
val startTime = System.currentTimeMillis
/*println("Started timer at " + startTime)*/
/* println("Started timer at " + startTime)*/
val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)
.map { response =>
val buffer = response.asInstanceOf[BufferMessage].buffers(0)
@@ -32,7 +32,7 @@ private[spark] class FileHeader (
buf.writeInt(fileLen)
buf.writeInt(blockId.name.length)
blockId.name.foreach((x: Char) => buf.writeByte(x))
//padding the rest of header
// padding the rest of header
if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) {
buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes)
} else {
@@ -753,7 +753,7 @@ class DAGScheduler(
val properties = if (stageIdToActiveJob.contains(jobId)) {
stageIdToActiveJob(stage.jobId).properties
} else {
//this stage will be assigned to "default" pool
// this stage will be assigned to "default" pool
null
}

@@ -172,7 +172,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
properties += ((key, value))
}
}
//TODO (prashant) send conf instead of properties
// TODO (prashant) send conf instead of properties
driverActor = actorSystem.actorOf(
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
@@ -284,7 +284,7 @@ object BlockFetcherIterator {
}
} catch {
case x: InterruptedException => logInfo("Copier Interrupted")
//case _ => throw new SparkException("Exception Throw in Shuffle Copier")
// case _ => throw new SparkException("Exception Throw in Shuffle Copier")
}
}
}
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -112,7 +112,7 @@ private[spark] object ClosureCleaner extends Logging {
accessedFields(cls) = Set[String]()
for (cls <- func.getClass :: innerClasses)
getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0)
//logInfo("accessedFields: " + accessedFields)
// logInfo("accessedFields: " + accessedFields)

val inInterpreter = {
try {
@@ -139,21 +139,21 @@
val field = cls.getDeclaredField(fieldName)
field.setAccessible(true)
val value = field.get(obj)
//logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
// logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
field.set(outer, value)
}
}

if (outer != null) {
//logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
// logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
val field = func.getClass.getDeclaredField("$outer")
field.setAccessible(true)
field.set(func, outer)
}
}

private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
//logInfo("Creating a " + cls + " with outer = " + outer)
// logInfo("Creating a " + cls + " with outer = " + outer)
if (!inInterpreter) {
// This is a bona fide closure class, whose constructor has no effects
// other than to set its fields, so use its constructor
@@ -170,7 +170,7 @@
val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
val obj = newCtor.newInstance().asInstanceOf[AnyRef]
if (outer != null) {
//logInfo("3: Setting $outer on " + cls + " to " + outer);
// logInfo("3: Setting $outer on " + cls + " to " + outer);
val field = cls.getDeclaredField("$outer")
field.setAccessible(true)
field.set(obj, outer)
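The instantiateClass path shown above either calls the closure class's own constructor (the non-interpreter case, where the constructor only sets fields) or uses the JDK's serialization machinery to create an instance without running any constructor at all. A rough sketch of that trick, assuming a HotSpot-style JDK that exposes sun.reflect.ReflectionFactory (the helper name is illustrative):

import sun.reflect.ReflectionFactory

// Create an instance of cls without invoking its declared constructors,
// by synthesizing a constructor that only runs java.lang.Object's.
def instantiateWithoutConstructor[T <: AnyRef](cls: Class[T]): T = {
  val rf = ReflectionFactory.getReflectionFactory()
  val objectCtor = classOf[Object].getDeclaredConstructor()
  val ctor = rf.newConstructorForSerialization(cls, objectCtor)
  cls.cast(ctor.newInstance())
}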
@@ -49,7 +49,7 @@ private[akka] class IndestructibleActorSystemImpl(
if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
"ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
//shutdown() //TODO make it configurable
// shutdown() //TODO make it configurable
} else {
fallbackHandler.uncaughtException(thread, cause)
}
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/MutablePair.scala
@@ -24,8 +24,8 @@ package org.apache.spark.util
* @param _1 Element 1 of this MutablePair
* @param _2 Element 2 of this MutablePair
*/
case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T1,
@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T2]
case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T1,
@specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T2]
(var _1: T1, var _2: T2)
extends Product2[T1, T2]
{
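A tiny usage sketch for the pair above: because _1 and _2 are vars, a single instance can be overwritten and reused across records instead of allocating a fresh Tuple2 each time.

import org.apache.spark.util.MutablePair

val pair = new MutablePair[Int, String](0, "")
for (i <- 1 to 3) {
  pair._1 = i               // overwrite in place rather than allocating
  pair._2 = "record-" + i
  println(pair)
}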
39 changes: 39 additions & 0 deletions core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -88,6 +88,45 @@ class BitSet(numBits: Int) extends Serializable {
newBS
}

/**
* Compute the symmetric difference by performing bit-wise XOR of the two sets returning the
* result.
*/
def ^(other: BitSet): BitSet = {
val newBS = new BitSet(math.max(capacity, other.capacity))
val smaller = math.min(numWords, other.numWords)
var ind = 0
while (ind < smaller) {
newBS.words(ind) = words(ind) ^ other.words(ind)
ind += 1
}
if (ind < numWords) {
Array.copy( words, ind, newBS.words, ind, numWords - ind )
}
if (ind < other.numWords) {
Array.copy( other.words, ind, newBS.words, ind, other.numWords - ind )
}
newBS
}

/**
* Compute the difference of the two sets by performing bit-wise AND-NOT returning the
* result.
*/
def andNot(other: BitSet): BitSet = {
val newBS = new BitSet(capacity)
val smaller = math.min(numWords, other.numWords)
var ind = 0
while (ind < smaller) {
newBS.words(ind) = words(ind) & ~other.words(ind)
ind += 1
}
if (ind < numWords) {
Array.copy( words, ind, newBS.words, ind, numWords - ind )
}
newBS
}

/**
* Sets the bit at the specified index to true.
* @param index the bit index
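A short usage sketch for the two operators added above, assuming the BitSet constructor and the set/get methods defined elsewhere in this file:

import org.apache.spark.util.collection.BitSet

val a = new BitSet(64)
val b = new BitSet(64)
a.set(1); a.set(3)
b.set(3); b.set(5)

val symDiff = a ^ b        // bits set in exactly one of a, b: 1 and 5
val aMinusB = a.andNot(b)  // bits set in a but not in b: 1

assert(symDiff.get(1) && symDiff.get(5) && !symDiff.get(3))
assert(aMinusB.get(1) && !aMinusB.get(3))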
6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -66,7 +66,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte

test ("add value to collection accumulators") {
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val d = sc.parallelize(1 to maxI)
@@ -83,7 +83,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte

test ("value not readable in tasks") {
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val d = sc.parallelize(1 to maxI)
@@ -124,7 +124,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte

test ("localValue readable in tasks") {
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
1 change: 0 additions & 1 deletion core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -432,7 +432,6 @@ object CheckpointSuite {
// This is a custom cogroup function that does not use mapValues like
// the PairRDDFunctions.cogroup()
def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
//println("First = " + first + ", second = " + second)
new CoGroupedRDD[K](
Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
part