Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Issue #42 fixed.

  • Loading branch information...
commit db7a2c4897aabf0b00a8df347dae1ad579e767f9 1 parent 60d1121
@mosharaf mosharaf authored
View
5 core/src/main/scala/spark/SparkContext.scala
@@ -35,10 +35,11 @@ extends Logging {
private val isLocal = scheduler.isInstanceOf[LocalScheduler]
- // Start the scheduler, the cache and the broadcast system
- scheduler.start()
+ // Start all other subsystems before the scheduler
+ // Initialize Cache before Broadcast because Broadcast depends on it
Cache.initialize()
Broadcast.initialize(true)
+ scheduler.start()
// Methods for creating RDDs
View
4 core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -49,7 +49,7 @@ extends Broadcast[T] with Logging {
@transient var rxSpeeds = new SpeedTracker
@transient var txSpeeds = new SpeedTracker
- @transient var hostAddress = InetAddress.getLocalHost.getHostAddress
+ @transient var hostAddress = Utils.localIpAddress
@transient var listenPort = -1
@transient var guidePort = -1
@@ -187,7 +187,7 @@ extends Broadcast[T] with Logging {
rxSpeeds = new SpeedTracker
txSpeeds = new SpeedTracker
- hostAddress = InetAddress.getLocalHost.getHostAddress
+ hostAddress = Utils.localIpAddress
listenPort = -1
listOfSources = ListBuffer[SourceInfo]()
View
11 core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -34,14 +34,19 @@ extends Logging {
// Called by SparkContext or Executor before using Broadcast
def initialize (isMaster__ : Boolean): Unit = synchronized {
if (!initialized) {
- val broadcastFactoryClass = System.getProperty("spark.broadcast.factory",
- "spark.broadcast.DfsBroadcastFactory")
+ val broadcastFactoryClass = System.getProperty(
+ "spark.broadcast.factory", "spark.broadcast.DfsBroadcastFactory")
broadcastFactory =
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
// Setup isMaster before using it
isMaster_ = isMaster__
+
+ // Set masterHostAddress to the master's IP address for the slaves to read
+ if (isMaster) {
+ System.setProperty("spark.broadcast.masterHostAddress", Utils.localIpAddress)
+ }
// Initialize appropriate BroadcastFactory and BroadcastObject
broadcastFactory.initialize(isMaster)
@@ -59,7 +64,7 @@ extends Logging {
// Load common broadcast-related config parameters
private var MasterHostAddress_ = System.getProperty(
- "spark.broadcast.masterHostAddress", InetAddress.getLocalHost.getHostAddress)
+ "spark.broadcast.masterHostAddress", "")
private var MasterTrackerPort_ = System.getProperty(
"spark.broadcast.masterTrackerPort", "11111").toInt
private var BlockSize_ = System.getProperty(
View
4 core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
@@ -36,7 +36,7 @@ extends Broadcast[T] with Logging {
@transient var serveMR: ServeMultipleRequests = null
@transient var guideMR: GuideMultipleRequests = null
- @transient var hostAddress = InetAddress.getLocalHost.getHostAddress
+ @transient var hostAddress = Utils.localIpAddress
@transient var listenPort = -1
@transient var guidePort = -1
@@ -149,7 +149,7 @@ extends Broadcast[T] with Logging {
serveMR = null
- hostAddress = InetAddress.getLocalHost.getHostAddress
+ hostAddress = Utils.localIpAddress
listenPort = -1
stopBroadcast = false
View
4 core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -36,7 +36,7 @@ extends Broadcast[T] with Logging {
@transient var serveMR: ServeMultipleRequests = null
@transient var guideMR: GuideMultipleRequests = null
- @transient var hostAddress = InetAddress.getLocalHost.getHostAddress
+ @transient var hostAddress = Utils.localIpAddress
@transient var listenPort = -1
@transient var guidePort = -1
@@ -152,7 +152,7 @@ extends Broadcast[T] with Logging {
serveMR = null
- hostAddress = InetAddress.getLocalHost.getHostAddress
+ hostAddress = Utils.localIpAddress
listenPort = -1
stopBroadcast = false
View
2  core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala
@@ -399,7 +399,7 @@ object CustomBlockedLocalFileShuffle extends Logging {
private var shuffleDir: File = null
private var shuffleServer: ShuffleServer = null
- private var serverAddress = InetAddress.getLocalHost.getHostAddress
+ private var serverAddress = Utils.localIpAddress
private var serverPort: Int = -1
// Random number generator
View
2  core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala
@@ -335,7 +335,7 @@ object CustomParallelLocalFileShuffle extends Logging {
private var shuffleDir: File = null
private var shuffleServer: ShuffleServer = null
- private var serverAddress = InetAddress.getLocalHost.getHostAddress
+ private var serverAddress = Utils.localIpAddress
private var serverPort: Int = -1
// Random number generator
Please sign in to comment.
Something went wrong with that request. Please try again.