Skip to content

Commit

Permalink
Take out waitForInitialAllocations
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Jul 20, 2014
1 parent 2a4329b commit c744ef3
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkContext.getConf)
}
}
} finally {
// in case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT :
// so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
}

Expand All @@ -277,13 +273,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
yarnAllocator.allocateContainers(
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
}
} finally {
// In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
// so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
logInfo("All executors have launched.")

Expand Down Expand Up @@ -416,19 +407,8 @@ object ApplicationMaster extends Logging {
// This is to ensure that we have reasonable number of containers before we start
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
private val ALLOCATE_HEARTBEAT_INTERVAL = 100

def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by)
if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.synchronized {
// to wake threads off wait ...
yarnAllocatorLoop.notifyAll()
}
}
}

private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()

def register(master: ApplicationMaster) {
Expand All @@ -437,7 +417,6 @@ object ApplicationMaster extends Logging {

val sparkContextRef: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null /* initialValue */)
val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)

def sparkContextInitialized(sc: SparkContext): Boolean = {
var modified = false
Expand Down Expand Up @@ -472,21 +451,6 @@ object ApplicationMaster extends Logging {
modified
}


/**
* Returns when we've either
* 1) received all the requested executors,
* 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
* 3) hit an error that causes us to terminate trying to get containers.
*/
def waitForInitialAllocations() {
yarnAllocatorLoop.synchronized {
while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.wait(1000L)
}
}
}

def main(argStrings: Array[String]) {
SignalLogger.register(log)
val args = new ApplicationMasterArguments(argStrings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,8 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
}

override def postStartHook() {
val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
ApplicationMaster.sparkContextInitialized(sc)
super.postStartHook()
if (sparkContextInitialized){
ApplicationMaster.waitForInitialAllocations()
}
logInfo("YarnClusterScheduler.postStartHook done")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkContext.getConf)
}
}
} finally {
// In case of exceptions, etc - ensure that the loop in
// ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.doneWithInitialAllocations()
}
}

Expand All @@ -254,16 +250,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
checkNumExecutorsFailed()
allocateMissingExecutor()
yarnAllocator.allocateResources()
if (iters == ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) {
ApplicationMaster.doneWithInitialAllocations()
}
Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
iters += 1
}
} finally {
// In case of exceptions, etc - ensure that the loop in
// ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.doneWithInitialAllocations()
}
logInfo("All executors have launched.")
}
Expand Down Expand Up @@ -370,28 +359,13 @@ object ApplicationMaster extends Logging {
// This is to ensure that we have reasonable number of containers before we start
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
private val ALLOCATE_HEARTBEAT_INTERVAL = 100

private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()

val sparkContextRef: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null)

// Variable used to notify the YarnClusterScheduler that it should stop waiting
// for the initial set of executors to be started and get on with its business.
val doneWithInitialAllocationsMonitor = new Object()

@volatile var isDoneWithInitialAllocations = false

def doneWithInitialAllocations() {
isDoneWithInitialAllocations = true
doneWithInitialAllocationsMonitor.synchronized {
// to wake threads off wait ...
doneWithInitialAllocationsMonitor.notifyAll()
}
}

def register(master: ApplicationMaster) {
applicationMasters.add(master)
}
Expand Down Expand Up @@ -434,20 +408,6 @@ object ApplicationMaster extends Logging {
modified
}

/**
* Returns when we've either
* 1) received all the requested executors,
* 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
* 3) hit an error that causes us to terminate trying to get containers.
*/
def waitForInitialAllocations() {
doneWithInitialAllocationsMonitor.synchronized {
while (!isDoneWithInitialAllocations) {
doneWithInitialAllocationsMonitor.wait(1000L)
}
}
}

def getApplicationAttemptId(): ApplicationAttemptId = {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
val containerId = ConverterUtils.toContainerId(containerIdString)
Expand Down

0 comments on commit c744ef3

Please sign in to comment.