
Merge branch 'master' of https://github.com/apache/spark into master_nravi
nishkamravi2 committed Feb 20, 2015
2 parents f0d12de + 3be92cd commit 71d0e17
Showing 68 changed files with 923 additions and 1,466 deletions.
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import scala.collection.mutable

import org.apache.spark.scheduler._
import org.apache.spark.util.{SystemClock, Clock}

/**
* An agent that dynamically allocates and removes executors based on the workload.
@@ -123,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
private val intervalMillis: Long = 100

// Clock used to schedule when executors should be added and removed
private var clock: Clock = new RealClock
private var clock: Clock = new SystemClock()

// Listener for Spark events that impact the allocation policy
private val listener = new ExecutorAllocationListener
@@ -588,28 +589,3 @@ private[spark] class ExecutorAllocationManager(
private object ExecutorAllocationManager {
val NOT_SET = Long.MaxValue
}

/**
* An abstract clock for measuring elapsed time.
*/
private trait Clock {
def getTimeMillis: Long
}

/**
* A clock backed by a monotonically increasing time source.
* The time returned by this clock does not correspond to any notion of wall-clock time.
*/
private class RealClock extends Clock {
override def getTimeMillis: Long = System.nanoTime / (1000 * 1000)
}

/**
* A clock that allows the caller to customize the time.
* This is used mainly for testing.
*/
private class TestClock(startTimeMillis: Long) extends Clock {
private var time: Long = startTimeMillis
override def getTimeMillis: Long = time
def tick(ms: Long): Unit = { time += ms }
}
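Note that the removed RealClock was backed by System.nanoTime (monotonic, not tied to wall-clock time), while the SystemClock it is replaced with reads System.currentTimeMillis, so the allocation manager now observes wall-clock time. As a rough sketch (not part of this commit), test code that previously drove the removed TestClock would migrate to the shared ManualClock introduced later in this diff, with advance taking over the role of tick:

    import org.apache.spark.util.ManualClock

    val clock = new ManualClock(0L)         // previously: new TestClock(0L)
    clock.advance(100L)                     // previously: clock.tick(100)
    assert(clock.getTimeMillis() == 100L)   // same accessor, now defined on the shared Clock trait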
core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -20,19 +20,18 @@ package org.apache.spark.deploy.worker
import java.io._

import scala.collection.JavaConversions._
import scala.collection.Map

import akka.actor.ActorRef
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil}
import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.util.{Clock, SystemClock}

/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
@@ -59,9 +58,7 @@ private[spark] class DriverRunner(
// Decoupled for testing
private[deploy] def setClock(_clock: Clock) = clock = _clock
private[deploy] def setSleeper(_sleeper: Sleeper) = sleeper = _sleeper
private var clock = new Clock {
def currentTimeMillis(): Long = System.currentTimeMillis()
}
private var clock: Clock = new SystemClock()
private var sleeper = new Sleeper {
def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed})
}
@@ -190,9 +187,9 @@ private[spark] class DriverRunner(
initialize(process.get)
}

val processStart = clock.currentTimeMillis()
val processStart = clock.getTimeMillis()
val exitCode = process.get.waitFor()
if (clock.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
waitSeconds = 1
}

@@ -208,10 +205,6 @@
}
}

private[deploy] trait Clock {
def currentTimeMillis(): Long
}

private[deploy] trait Sleeper {
def sleep(seconds: Int)
}
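DriverRunner keeps its setClock test hook; only the clock type changes to the shared org.apache.spark.util.Clock. An illustrative snippet (hypothetical, not from this commit), assuming a DriverRunner instance named runner and a test that can see the private[deploy] setter:

    import org.apache.spark.util.ManualClock

    val manualClock = new ManualClock(0L)
    runner.setClock(manualClock)       // inject the manual clock in place of the default SystemClock
    manualClock.advance(60L * 1000L)   // simulate a minute elapsing without actually sleeping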
core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -26,10 +26,17 @@ import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLCla

/**
* Utility object for launching driver programs such that they share fate with the Worker process.
* This is used in standalone cluster mode only.
*/
object DriverWrapper {
def main(args: Array[String]) {
args.toList match {
/*
* IMPORTANT: Spark 1.3 provides a stable application submission gateway that is both
* backward and forward compatible across future Spark versions. Because this gateway
* uses this class to launch the driver, the ordering and semantics of the arguments
* here must also remain consistent across versions.
*/
case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
20 changes: 10 additions & 10 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -63,7 +63,7 @@ class DAGScheduler(
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: org.apache.spark.util.Clock = SystemClock)
clock: Clock = new SystemClock())
extends Logging {

def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
@@ -657,7 +657,7 @@ class DAGScheduler(
// completion events or stage abort
stageIdToStage -= s.id
jobIdToStageIds -= job.jobId
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), jobResult))
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), jobResult))
}
}

@@ -706,7 +706,7 @@ class DAGScheduler(
stage.latestInfo.stageFailed(stageFailedMessage)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
}
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}

@@ -745,7 +745,7 @@ class DAGScheduler(
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val shouldRunLocally =
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
val jobSubmissionTime = clock.getTime()
val jobSubmissionTime = clock.getTimeMillis()
if (shouldRunLocally) {
// Compute very short actions like first() or take() with no parent stages locally.
listenerBus.post(
@@ -871,7 +871,7 @@ class DAGScheduler(
logDebug("New pending tasks: " + stage.pendingTasks)
taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTime())
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should post
// SparkListenerStageCompleted here in case there are no tasks to run.
@@ -940,12 +940,12 @@ class DAGScheduler(

def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
val serviceTime = stage.latestInfo.submissionTime match {
case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
case _ => "Unknown"
}
if (errorMessage.isEmpty) {
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stage.latestInfo.completionTime = Some(clock.getTime())
stage.latestInfo.completionTime = Some(clock.getTimeMillis())
} else {
stage.latestInfo.stageFailed(errorMessage.get)
logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
@@ -971,7 +971,7 @@
markStageAsFinished(stage)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(
SparkListenerJobEnd(job.jobId, clock.getTime(), JobSucceeded))
SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
}

// taskSucceeded runs some user code that might throw an exception. Make sure
@@ -1187,7 +1187,7 @@ class DAGScheduler(
}
val dependentJobs: Seq[ActiveJob] =
activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq
failedStage.latestInfo.completionTime = Some(clock.getTime())
failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
for (job <- dependentJobs) {
failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason")
}
@@ -1242,7 +1242,7 @@ class DAGScheduler(
if (ableToCancelStages) {
job.listener.jobFailed(error)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -51,7 +51,7 @@ private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
clock: Clock = SystemClock)
clock: Clock = new SystemClock())
extends Schedulable with Logging {

val conf = sched.sc.conf
@@ -166,7 +166,7 @@ private[spark] class TaskSetManager(
// last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
// We then move down if we manage to launch a "more local" task.
var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
var lastLaunchTime = clock.getTime() // Time we last launched a task at this level
var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level

override def schedulableQueue = null

@@ -281,7 +281,7 @@ private[spark] class TaskSetManager(
val failed = failedExecutors.get(taskId).get

return failed.contains(execId) &&
clock.getTime() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
}

false
@@ -428,7 +428,7 @@ private[spark] class TaskSetManager(
: Option[TaskDescription] =
{
if (!isZombie) {
val curTime = clock.getTime()
val curTime = clock.getTimeMillis()

var allowedLocality = maxLocality

@@ -459,7 +459,7 @@ private[spark] class TaskSetManager(
lastLaunchTime = curTime
}
// Serialize and return the task
val startTime = clock.getTime()
val startTime = clock.getTimeMillis()
val serializedTask: ByteBuffer = try {
Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
} catch {
@@ -674,7 +674,7 @@ private[spark] class TaskSetManager(
return
}
val key = ef.description
val now = clock.getTime()
val now = clock.getTimeMillis()
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
val (dupCount, printTime) = recentExceptions(key)
@@ -706,7 +706,7 @@ private[spark] class TaskSetManager(
}
// always add to failed executors
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
put(info.executorId, clock.getTime())
put(info.executorId, clock.getTimeMillis())
sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
addPendingTask(index)
if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) {
@@ -821,7 +821,7 @@ private[spark] class TaskSetManager(
val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
val time = clock.getTime()
val time = clock.getTimeMillis()
val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
Arrays.sort(durations)
val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))
44 changes: 41 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Clock.scala
@@ -21,9 +21,47 @@ package org.apache.spark.util
* An interface to represent clocks, so that they can be mocked out in unit tests.
*/
private[spark] trait Clock {
def getTime(): Long
def getTimeMillis(): Long
def waitTillTime(targetTime: Long): Long
}

private[spark] object SystemClock extends Clock {
def getTime(): Long = System.currentTimeMillis()
/**
* A clock backed by the actual time from the OS as reported by the `System` API.
*/
private[spark] class SystemClock extends Clock {

val minPollTime = 25L

/**
* @return the same time (milliseconds since the epoch)
* as is reported by `System.currentTimeMillis()`
*/
def getTimeMillis(): Long = System.currentTimeMillis()

/**
* @param targetTime block until the current time is at least this value
* @return current system time when wait has completed
*/
def waitTillTime(targetTime: Long): Long = {
var currentTime = 0L
currentTime = System.currentTimeMillis()

var waitTime = targetTime - currentTime
if (waitTime <= 0) {
return currentTime
}

val pollTime = math.max(waitTime / 10.0, minPollTime).toLong

while (true) {
currentTime = System.currentTimeMillis()
waitTime = targetTime - currentTime
if (waitTime <= 0) {
return currentTime
}
val sleepTime = math.min(waitTime, pollTime)
Thread.sleep(sleepTime)
}
-1
}
}
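waitTillTime sleeps in short poll cycles rather than a single long Thread.sleep: each pass sleeps min(remaining, max(remaining / 10, 25)) milliseconds and rechecks, so a 1000 ms wait re-polls roughly every 100 ms. A minimal usage sketch (not part of the diff):

    val clock = new SystemClock()
    val deadline = clock.getTimeMillis() + 1000L   // one second from now
    val wokeAt = clock.waitTillTime(deadline)      // blocks, polling until the system time reaches the deadline
    assert(wokeAt >= deadline)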
69 changes: 69 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ManualClock.scala
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

/**
* A `Clock` whose time can be manually set and modified. Its reported time does not change
* as time elapses, but only as its time is modified by callers. This is mainly useful for
* testing.
*
* @param time initial time (in milliseconds since the epoch)
*/
private[spark] class ManualClock(private var time: Long) extends Clock {

/**
* @return `ManualClock` with initial time 0
*/
def this() = this(0L)

def getTimeMillis(): Long =
synchronized {
time
}

/**
* @param timeToSet new time (in milliseconds) that the clock should represent
*/
def setTime(timeToSet: Long) =
synchronized {
time = timeToSet
notifyAll()
}

/**
* @param timeToAdd time (in milliseconds) to add to the clock's time
*/
def advance(timeToAdd: Long) =
synchronized {
time += timeToAdd
notifyAll()
}

/**
* @param targetTime block until the clock time is set or advanced to at least this time
* @return current time reported by the clock when waiting finishes
*/
def waitTillTime(targetTime: Long): Long =
synchronized {
while (time < targetTime) {
wait(100)
}
getTimeMillis()
}

}
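A usage sketch (not part of the commit) showing how the monitor-based waitTillTime interacts with advance: a waiting thread blocks until another thread moves the clock past the target time.

    import org.apache.spark.util.ManualClock

    val clock = new ManualClock(0L)
    val waiter = new Thread(new Runnable {
      def run(): Unit = {
        val t = clock.waitTillTime(500L)   // blocks while the clock's time is below 500 ms
        println(s"waiter woke at $t ms")
      }
    })
    waiter.start()
    clock.advance(500L)   // moves the clock to 500 ms; notifyAll() wakes the waiter
    waiter.join()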
