Skip to content

Commit

Permalink
Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark
Browse files Browse the repository at this point in the history
…into master_nravi

Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
  • Loading branch information
nishkamravi2 committed Jun 5, 2015
2 parents 02dbdb8 + 07b9dfa commit 45e3a99
Showing 1 changed file with 1 addition and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.streaming.scheduler

import scala.collection.mutable.{HashMap, SynchronizedMap, ArrayBuffer}
import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap}
import scala.language.existentials
import org.apache.spark.rdd._

Expand Down Expand Up @@ -302,12 +302,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
if (receivers.length >= executors.length) {
for (i <- 0 to (receivers.length - 1)) {
locations(i) += executors(i % executors.length)
System.out.println("RECEIVERS > executors " + executors(i % executors.length))
}
} else {
for (i <- 0 to (executors.length - 1)) {
locations(i % receivers.length) += executors(i)
System.out.println("EXECUTORS > receivers " + executors(i))
}
}
roundRobin = true
Expand All @@ -319,7 +317,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else if (roundRobin) {
// ssc.sc.makeRDD(receivers, receivers.size)
val roundRobinReceivers = (0 to (receivers.length - 1)).map(i => (receivers(i), locations(i)))
ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers)
} else {
Expand Down

0 comments on commit 45e3a99

Please sign in to comment.