Description
SynapseML version
1.0.4
System information
- Language version (e.g. python 3.8, scala 2.12):
- Spark Version (e.g. 3.2.3):
- Spark Platform (e.g. Synapse, Databricks):
Describe the problem
I reviewed the port-binding code. It works well on a physical machine. However, when my YARN nodes are virtual containers running on top of a physical machine, the port bind sometimes fails: the code finds an open port, closes it, and only rebinds it some time later. When n tasks run on the same physical machine and do not bind their ports at the same time, this implementation can cause a bind failure, because another task may take the port in the window between probing and the actual bind.
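To make the race concrete, here is a minimal standalone sketch (probeOpenPort and the base port 12400 are hypothetical, not the SynapseML implementation): both tasks probe while neither holds the port, so both pick the same candidate, and whichever binds later fails.

import java.net.{BindException, ServerSocket}

object PortBindRaceSketch {
  // Hypothetical probe that mimics the "find an open port, then close it" pattern:
  // scan upward from a base port, return the first port that binds, then release it again.
  def probeOpenPort(basePort: Int): Int = {
    var candidate = basePort
    var found = -1
    while (found < 0) {
      try {
        val socket = new ServerSocket(candidate)
        found = socket.getLocalPort
        socket.close()                         // the port is free again from here on
      } catch {
        case _: BindException => candidate += 1
      }
    }
    found
  }

  def main(args: Array[String]): Unit = {
    val base = 12400                           // hypothetical LightGBM listen port base

    // Both tasks probe while neither holds the port, so both get the same answer.
    val portA = probeOpenPort(base)
    val portB = probeOpenPort(base)            // also returns `base`, since task A released it

    // Task A binds first and holds the port for training.
    val taskA = new ServerSocket(portA)

    // Task B binds later (e.g. after slower data preparation) and fails.
    try new ServerSocket(portB).close()
    catch {
      case e: BindException =>
        println(s"Task B could not bind port $portB: ${e.getMessage}")
    } finally {
      taskA.close()
    }
  }
}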
Code to reproduce issue
Other info / logs
def getGlobalNetworkInfo(ctx: TrainingContext,
                         log: Logger,
                         taskId: Long,
                         partitionId: Int,
                         shouldExecuteTraining: Boolean,
                         measures: TaskInstrumentationMeasures): NetworkTopologyInfo = {
  measures.markNetworkInitializationStart()
  val networkParams = ctx.networkParams
  val out = using(findOpenPort(ctx, log).get) {
    openPort =>
      val localListenPort = openPort.getLocalPort
      log.info(s"LightGBM task connecting to host: ${networkParams.ipAddress} and port: ${networkParams.port}")
      FaultToleranceUtils.retryWithTimeout() {
        getNetworkTopologyInfoFromDriver(networkParams,
                                         taskId,
                                         partitionId,
                                         localListenPort,
                                         log,
                                         shouldExecuteTraining)
      }
  }.get
  measures.markNetworkInitializationStop()
  out
}
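In getGlobalNetworkInfo above, the using(findOpenPort(ctx, log).get) wrapper closes the probed ServerSocket as soon as the block returns, so the port is released again before initLightGBMNetwork later binds it. One possible mitigation, sketched below with hypothetical helpers (reservePort / withReservedPort are not existing SynapseML APIs), is to keep the reservation socket bound, with SO_REUSEADDR set, until just before the real bind; this narrows the race window but does not remove it completely.

import java.net.{InetSocketAddress, ServerSocket}

object PortReservationSketch {
  // Hypothetical helper: bind the candidate port and keep it bound as a "reservation".
  def reservePort(candidate: Int): ServerSocket = {
    val socket = new ServerSocket()
    socket.setReuseAddress(true)                // let the real listener rebind immediately after release
    socket.bind(new InetSocketAddress(candidate))
    socket
  }

  // Hypothetical usage: hold the reservation across the driver round-trip and any
  // data preparation, and release it only right before the native network init.
  def withReservedPort[T](candidate: Int)(body: Int => T): T = {
    val reservation = reservePort(candidate)
    try body(reservation.getLocalPort)
    finally reservation.close()                 // released as late as possible
  }
}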
def mapPartitionTask(ctx: TrainingContext)(inputRows: Iterator[Row]): Iterator[PartitionResult] = {
  // Start with initialization
  val taskCtx = initialize(ctx, inputRows)
  if (taskCtx.isEmptyPartition) {
    log.warn("LightGBM task encountered empty partition, for best performance ensure no partitions are empty")
    Array { PartitionResult(None, taskCtx.measures) }.toIterator
  } else {
    // Perform any data preparation work
    val dataIntermediateState = preparePartitionData(taskCtx, inputRows)
    try {
      if (taskCtx.shouldExecuteTraining) {
        // If participating in training, initialize the network ring of communication
        NetworkManager.initLightGBMNetwork(taskCtx, log)
        if (ctx.useSingleDatasetMode) {
          log.info(s"Waiting for all data prep to be done, task ${taskCtx.taskId}, partition ${taskCtx.partitionId}")
          ctx.sharedState().dataPreparationDoneSignal.await()
        }
        // Create the final Dataset for training and execute training iterations
        finalizeDatasetAndTrain(taskCtx, dataIntermediateState)
      } else {
        log.info(s"Helper task ${taskCtx.taskId}, partition ${taskCtx.partitionId} finished processing rows")
        ctx.sharedState().dataPreparationDoneSignal.countDown()
        Array { PartitionResult(None, taskCtx.measures) }.toIterator
      }
    } finally {
      cleanup(taskCtx)
    }
  }
}
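Another option would be to treat a failed bind inside NetworkManager.initLightGBMNetwork as retryable: if the announced port was taken in the meantime by another task on the same host, probe the next candidate instead of failing the whole task. A rough sketch with a hypothetical bindWithRetry helper (a real fix would also have to re-announce the new port to the driver):

import java.net.{BindException, ServerSocket}

object BindRetrySketch {
  // Hypothetical retry wrapper: if the previously probed port was taken between
  // probing and binding, move on to the next candidate port instead of failing.
  def bindWithRetry(basePort: Int, maxAttempts: Int = 5): ServerSocket = {
    var attempt = 0
    var bound: Option[ServerSocket] = None
    while (bound.isEmpty) {
      try bound = Some(new ServerSocket(basePort + attempt))
      catch {
        case _: BindException if attempt < maxAttempts - 1 => attempt += 1
      }
    }
    bound.get
  }
}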
What component(s) does this bug affect?
- area/cognitive: Cognitive project
- area/core: Core project
- area/deep-learning: DeepLearning project
- area/lightgbm: Lightgbm project
- area/opencv: Opencv project
- area/vw: VW project
- area/website: Website
- area/build: Project build system
- area/notebooks: Samples under notebooks folder
- area/docker: Docker usage
- area/models: models related issue
What language(s) does this bug affect?
- language/scala: Scala source code
- language/python: Pyspark APIs
- language/r: R APIs
- language/csharp: .NET APIs
- language/new: Proposals for new client languages
What integration(s) does this bug affect?
- integrations/synapse: Azure Synapse integrations
- integrations/azureml: Azure ML integrations
- integrations/databricks: Databricks integrations