Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
sunchao committed Feb 6, 2024
1 parent 5d5b3a5 commit b38a321
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 6 deletions.
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,8 @@ class SparkContext(config: SparkConf) extends Logging {
// Initialize any plugins before the task scheduler is initialized.
_plugins = PluginContainer(this, _resources.asJava)
_env.initializeShuffleManager()
_env.initializeMemoryManager(SparkContext.numDriverCores(master, conf))


// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
Expand Down
18 changes: 13 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ class SparkEnv (
val blockManager: BlockManager,
val securityManager: SecurityManager,
val metricsSystem: MetricsSystem,
val memoryManager: MemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {

Expand All @@ -77,6 +76,12 @@ class SparkEnv (

def shuffleManager: ShuffleManager = _shuffleManager

// We initialize the MemoryManager later in SparkContext after DriverPlugin is loaded
// to allow the plugin to overwrite memory configurations
private var _memoryManager: MemoryManager = _

def memoryManager: MemoryManager = _memoryManager

@volatile private[spark] var isStopped = false

/**
Expand Down Expand Up @@ -199,6 +204,12 @@ class SparkEnv (
"Shuffle manager already initialized to %s", _shuffleManager)
_shuffleManager = ShuffleManager.create(conf, executorId == SparkContext.DRIVER_IDENTIFIER)
}

// One-time, late initializer for SparkEnv's MemoryManager. It is deferred past
// SparkEnv construction (see the `_memoryManager` field, initialized to null)
// so that the DriverPlugin gets a chance to overwrite memory configurations in
// `conf` before the manager is built. Called from SparkContext on the driver
// and from CoarseGrainedExecutorBackend on executors.
//
// @param numUsableCores number of cores available to this JVM; forwarded to
//                       UnifiedMemoryManager (driver: numDriverCores, executor:
//                       arguments.cores)
// NOTE(review): guarded by Guava Preconditions — throws IllegalStateException
// (with the current manager in the message) if called more than once.
private[spark] def initializeMemoryManager(numUsableCores: Int): Unit = {
// Read through the public accessor for the null-check; `_memoryManager` is the
// backing var actually assigned below.
Preconditions.checkState(null == memoryManager,
"Memory manager already initialized to %s", _memoryManager)
_memoryManager = UnifiedMemoryManager(conf, numUsableCores)
}
}

object SparkEnv extends Logging {
Expand Down Expand Up @@ -358,8 +369,6 @@ object SparkEnv extends Logging {
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores)

val blockManagerPort = if (isDriver) {
conf.get(DRIVER_BLOCK_MANAGER_PORT)
} else {
Expand Down Expand Up @@ -418,7 +427,7 @@ object SparkEnv extends Logging {
blockManagerMaster,
serializerManager,
conf,
memoryManager,
_memoryManager = null,
mapOutputTracker,
_shuffleManager = null,
blockTransferService,
Expand Down Expand Up @@ -463,7 +472,6 @@ object SparkEnv extends Logging {
blockManager,
securityManager,
metricsSystem,
memoryManager,
outputCommitCoordinator,
conf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {

val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
// Set the memory manager since it needs to be initialized explicitly
env.initializeMemoryManager(arguments.cores)
// Set the application attemptId in the BlockStoreClient if available.
val appAttemptId = env.conf.get(APP_ATTEMPT_ID)
appAttemptId.foreach(attemptId =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private[spark] class BlockManager(
val master: BlockManagerMaster,
val serializerManager: SerializerManager,
val conf: SparkConf,
memoryManager: MemoryManager,
private val _memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
private val _shuffleManager: ShuffleManager,
val blockTransferService: BlockTransferService,
Expand All @@ -198,6 +198,11 @@ private[spark] class BlockManager(
// (except for tests) and we ask for the instance from the SparkEnv.
private lazy val shuffleManager = Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)

// Similarly, we also initialize MemoryManager later after DriverPlugin is loaded, to
// allow the plugin to overwrite certain memory configurations. The `_memoryManager` will be
// null here and we ask for the instance from SparkEnv
private lazy val memoryManager = Option(_memoryManager).getOrElse(SparkEnv.get.memoryManager)

// same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)`
private[spark] val externalShuffleServiceEnabled: Boolean = externalBlockStoreClient.isDefined
private val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
Expand Down

0 comments on commit b38a321

Please sign in to comment.