
Commit

fix
sunchao committed Feb 7, 2024
1 parent 23c66af commit 1fd8807
Showing 6 changed files with 37 additions and 18 deletions.
core/src/main/scala/org/apache/spark/SparkEnv.scala (7 changes: 6 additions & 1 deletion)
@@ -416,6 +416,9 @@ object SparkEnv extends Logging {
       new NettyBlockTransferService(conf, securityManager, serializerManager, bindAddress,
         advertiseAddress, blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
 
+    val maxOnHeapMemory = UnifiedMemoryManager.getMaxMemory(conf)
+    val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
+
     // NB: blockManager is not valid until initialize() is called later.
     // SPARK-45762 introduces a change where the ShuffleManager is initialized later
     // in the SparkContext and Executor, to allow for custom ShuffleManagers defined
@@ -432,7 +435,9 @@ object SparkEnv extends Logging {
       _shuffleManager = null,
       blockTransferService,
       securityManager,
-      externalShuffleClient)
+      externalShuffleClient,
+      maxOnHeapMemory,
+      maxOffHeapMemory)
 
     val metricsSystem = if (isDriver) {
       // Don't start metrics system right now for Driver.
@@ -210,7 +210,7 @@ object UnifiedMemoryManager {
   /**
    * Return the total amount of memory shared between execution and storage, in bytes.
    */
-  private def getMaxMemory(conf: SparkConf): Long = {
+  private[spark] def getMaxMemory(conf: SparkConf): Long = {
     val systemMemory = conf.get(TEST_MEMORY)
     val reservedMemory = conf.getLong(TEST_RESERVED_MEMORY.key,
       if (conf.contains(IS_TESTING)) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
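The widened visibility above is what lets SparkEnv compute `maxOnHeapMemory` directly from the configuration, before any MemoryManager exists. As a rough worked example of the value involved, here is a minimal sketch of the computation, assuming Spark's documented defaults of 300 MB reserved system memory and `spark.memory.fraction = 0.6`, and omitting the minimum-memory guards that the real `getMaxMemory` performs; the object name and the 4 GiB heap figure are illustrative only:

```scala
// Illustrative sketch only, not the actual UnifiedMemoryManager.getMaxMemory body.
object MaxMemorySketch {
  def main(args: Array[String]): Unit = {
    val systemMemory   = 4L * 1024 * 1024 * 1024 // assume a 4 GiB JVM heap
    val reservedMemory = 300L * 1024 * 1024      // default RESERVED_SYSTEM_MEMORY_BYTES
    val memoryFraction = 0.6                     // default spark.memory.fraction
    val usableMemory   = systemMemory - reservedMemory
    val maxMemory      = (usableMemory * memoryFraction).toLong
    println(s"execution + storage pool: $maxMemory bytes") // roughly 2.2 GiB here
  }
}
```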
core/src/main/scala/org/apache/spark/storage/BlockManager.scala (30 changes: 22 additions & 8 deletions)
@@ -190,7 +190,9 @@ private[spark] class BlockManager(
     private val _shuffleManager: ShuffleManager,
     val blockTransferService: BlockTransferService,
     securityManager: SecurityManager,
-    externalBlockStoreClient: Option[ExternalBlockStoreClient])
+    externalBlockStoreClient: Option[ExternalBlockStoreClient],
+    val maxOnHeapMemory: Long,
+    val maxOffHeapMemory: Long)
   extends BlockDataManager with BlockEvictionHandler with Logging {
 
   // We initialize the ShuffleManager later in SparkContext and Executor, to allow
@@ -236,13 +238,6 @@
   }
   private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager)
 
-  // Note: depending on the memory manager, `maxMemory` may actually vary over time.
-  // However, since we use this only for reporting and logging, what we actually want here is
-  // the absolute maximum value that `maxMemory` can ever possibly reach. We may need
-  // to revisit whether reporting this value as the "max" is intuitive to the user.
-  private lazy val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
-  private lazy val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory
-
   private[spark] val externalShuffleServicePort = StorageUtils.externalShuffleServicePort(conf)
 
   var blockManagerId: BlockManagerId = _
@@ -2157,6 +2152,25 @@
 
 
 private[spark] object BlockManager {
+  // scalastyle:off argcount
+  def apply(
+      executorId: String,
+      rpcEnv: RpcEnv,
+      master: BlockManagerMaster,
+      serializerManager: SerializerManager,
+      conf: SparkConf,
+      memoryManager: MemoryManager,
+      mapOutputTracker: MapOutputTracker,
+      shuffleManager: ShuffleManager,
+      blockTransferService: BlockTransferService,
+      securityManager: SecurityManager,
+      externalBlockStoreClient: Option[ExternalBlockStoreClient]): BlockManager =
+    new BlockManager(executorId, rpcEnv, master, serializerManager, conf, memoryManager,
+      mapOutputTracker, shuffleManager, blockTransferService, securityManager,
+      externalBlockStoreClient, memoryManager.maxOnHeapStorageMemory,
+      memoryManager.maxOffHeapStorageMemory)
+  // scalastyle:on argcount
+
   private val ID_GENERATOR = new IdGenerator
 
   def blockIdsToLocations(
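The `apply` factory added above keeps the existing eleven-argument call sites compiling (the test suites below switch from `new BlockManager(...)` to `BlockManager(...)`), deriving the two new constructor arguments from the supplied `MemoryManager`. A side-by-side sketch of the two equivalent call forms, reusing the value names from those tests rather than a self-contained program:

```scala
// Through the new factory: the original eleven arguments, without `new`.
val store = BlockManager(name, rpcEnv, master, serializerManager, conf,
  memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None)

// Equivalent direct construction, spelling out the two added arguments.
val sameStore = new BlockManager(name, rpcEnv, master, serializerManager, conf,
  memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None,
  memManager.maxOnHeapStorageMemory, memManager.maxOffHeapStorageMemory)
```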
@@ -80,7 +80,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     val transfer = new NettyBlockTransferService(
       conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
     val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, numCores = 1))
-    val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
+    val store = BlockManager(name, rpcEnv, master, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None)
     memManager.setMemoryStore(store.memoryStore)
     store.initialize("app-id")
@@ -242,7 +242,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     conf.set(TEST_MEMORY, 10000L)
     val memManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, conf)
-    val failableStore = new BlockManager("failable-store", rpcEnv, master, serializerManager, conf,
+    val failableStore = BlockManager("failable-store", rpcEnv, master, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, None)
     memManager.setMemoryStore(failableStore.memoryStore)
     failableStore.initialize("app-id")
@@ -143,7 +143,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     } else {
       None
     }
-    val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, bmConf,
+    val blockManager = BlockManager(name, rpcEnv, master, serializerManager, bmConf,
       memManager, mapOutputTracker, shuffleManager, transfer, bmSecurityMgr, externalShuffleClient)
     memManager.setMemoryStore(blockManager.memoryStore)
     allStores += blockManager
@@ -1344,7 +1344,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     val transfer = new NettyBlockTransferService(
       conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
     val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
-    val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
+    val store = BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, None)
     allStores += store
@@ -1393,7 +1393,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     val transfer = new NettyBlockTransferService(
       conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
     val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
-    val blockManager = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
+    val blockManager = BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, None)
     try {
@@ -2248,7 +2248,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     val transfer = new NettyBlockTransferService(
       conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
     val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
-    val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
+    val store = BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, None)
     allStores += store
@@ -2272,7 +2272,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     val transfer = new NettyBlockTransferService(
       conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
     val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
-    val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
+    val store = BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, None)
     allStores += store
@@ -290,7 +290,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
     val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem / 2, 1)
     val transfer = new NettyBlockTransferService(
       conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
-    val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
+    val blockManager = BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None)
     memManager.setMemoryStore(blockManager.memoryStore)
     blockManager.initialize("app-id")
