Skip to content

Commit

Permalink
[SPARK-21225][CORE] Considering CPUS_PER_TASK when allocating task sl…
Browse files Browse the repository at this point in the history
…ots for each WorkerOffer

JIRA Issue:https://issues.apache.org/jira/browse/SPARK-21225
    In the function "resourceOffers", It declare a variable "tasks" for storage the tasks which have allocated a executor. It declared like this:
`val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))`
    But, I think this code only conside a situation for that one task per core. If the user set "spark.task.cpus" as 2 or 3, It really don't need so much Mem. I think It can motify as follow:
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
 to instead.
    Motify like this the other earning is that it's more easy to understand the way how the tasks allocate offers.

Author: 杨治国10192065 <yang.zhiguo@zte.com.cn>

Closes apache#18435 from JackYangzg/motifyTaskCoreDisp.
  • Loading branch information
杨治国10192065 authored and Robert Kruszewski committed Jun 29, 2017
1 parent 5ee3b65 commit 4120a17
Showing 1 changed file with 1 addition and 1 deletion.
Expand Up @@ -345,7 +345,7 @@ private[spark] class TaskSchedulerImpl(

val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
Expand Down

0 comments on commit 4120a17

Please sign in to comment.