
[BUG] Binding port failed #2230

Open
@coddderX

Description


SynapseML version

1.0.4

System information

  • Language version (e.g. python 3.8, scala 2.12):
  • Spark Version (e.g. 3.2.3):
  • Spark Platform (e.g. Synapse, Databricks):

Describe the problem

I reviewed the port-binding code. It works well on a physical machine. However, when my YARN containers are virtual containers running on a shared physical machine, binding sometimes fails. The code finds a free port, closes it, and only binds it again some time later.
When n tasks run on the same physical machine and do not bind their ports at the same moment, one task can take a port that another task has found but not yet rebound, so this implementation can cause the bind to fail.
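A minimal sketch of the race described above, assuming the "find, close, rebind later" pattern. Plain `java.net.ServerSocket` stands in for whatever LightGBM actually uses to bind, and the object and method names are hypothetical. Two tasks on one host are simulated sequentially in a single JVM so the interleaving is deterministic.

```scala
import java.net.{BindException, InetSocketAddress, ServerSocket}
import scala.util.Try

object PortRaceDemo {
  // Probe for a free port: bind to port 0 so the OS picks one, read it, release it.
  def probeFreePort(): Int = {
    val probe = new ServerSocket(0)
    try probe.getLocalPort
    finally probe.close()
  }

  // Try to open a listener on a specific port; close the socket if binding fails.
  def tryBind(port: Int): Try[ServerSocket] = {
    val s = new ServerSocket()
    val result = Try { s.bind(new InetSocketAddress(port)); s }
    if (result.isFailure) s.close()
    result
  }

  // Task A finds a port and releases it; task B binds the same port in the gap;
  // task A's later bind then fails. Returns (B bound, A got BindException).
  def simulateRace(): (Boolean, Boolean) = {
    val port = probeFreePort()   // task A: find a free port, then close it
    val taskB = tryBind(port)    // task B: grabs the port in the gap
    val taskA = tryBind(port)    // task A: rebinds "after a time"
    taskA.foreach(_.close())
    taskB.foreach(_.close())
    (taskB.isSuccess, taskA.failed.toOption.exists(_.isInstanceOf[BindException]))
  }

  def main(args: Array[String]): Unit = {
    val (bBound, aFailed) = simulateRace()
    println(s"task B bound: $bBound, task A got BindException: $aFailed")
  }
}
```

In a real cluster the two tasks are separate executors (or containers) and the gap is a matter of timing, so the failure is intermittent rather than guaranteed as in this sequential sketch.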

Code to reproduce issue


Other info / logs

```scala
def getGlobalNetworkInfo(ctx: TrainingContext,
                         log: Logger,
                         taskId: Long,
                         partitionId: Int,
                         shouldExecuteTraining: Boolean,
                         measures: TaskInstrumentationMeasures): NetworkTopologyInfo = {
  measures.markNetworkInitializationStart()
  val networkParams = ctx.networkParams
  // findOpenPort yields an open socket; `using` closes it when this block exits,
  // so only the port number (localListenPort) outlives the reservation.
  val out = using(findOpenPort(ctx, log).get) { openPort =>
    val localListenPort = openPort.getLocalPort
    log.info(s"LightGBM task $taskId connecting to host: ${networkParams.ipAddress}, port: ${networkParams.port}")
    // Report localListenPort to the driver, retrying until a timeout.
    FaultToleranceUtils.retryWithTimeout() {
      getNetworkTopologyInfoFromDriver(networkParams,
        taskId,
        partitionId,
        localListenPort,
        log,
        shouldExecuteTraining)
    }
  }.get
  measures.markNetworkInitializationStop()
  out
}
```
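The excerpt above relies on a loan pattern: the port number escapes the `using` block, but the socket holding it does not. The sketch below shows that lifetime with a hypothetical `using` helper written for illustration (it is not SynapseML's actual signature): while inside the block, no other listener can bind the port; once the block exits, any process on the host can.

```scala
import java.net.{InetSocketAddress, ServerSocket}
import scala.util.Try

object LoanPatternDemo {
  // Hypothetical stand-in for `using`: run `body`, always close the resource,
  // and wrap the result in a Try.
  def using[R <: AutoCloseable, A](resource: R)(body: R => A): Try[A] =
    try Try(body(resource)) finally resource.close()

  // True if another listener on this host could bind `port` right now.
  def canBindNow(port: Int): Boolean = {
    val other = new ServerSocket()
    try Try(other.bind(new InetSocketAddress(port))).isSuccess
    finally other.close()
  }

  // Mirrors the shape of getGlobalNetworkInfo: returns the port, whether it was
  // free while the probe held it, and the (now closed) probe socket.
  def reserveAndReport(): (Int, Boolean, ServerSocket) = {
    val probe = new ServerSocket(0) // OS picks a free port and starts listening
    val (port, freeWhileHeld) =
      using(probe)(s => (s.getLocalPort, canBindNow(s.getLocalPort))).get
    (port, freeWhileHeld, probe)
  }

  def main(args: Array[String]): Unit = {
    val (port, freeWhileHeld, probe) = reserveAndReport()
    println(s"port $port free while held: $freeWhileHeld, " +
      s"probe closed: ${probe.isClosed}, free after block: ${canBindNow(port)}")
  }
}
```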

```scala
def mapPartitionTask(ctx: TrainingContext)(inputRows: Iterator[Row]): Iterator[PartitionResult] = {
  // Start with initialization
  val taskCtx = initialize(ctx, inputRows)

  if (taskCtx.isEmptyPartition) {
    log.warn("LightGBM task encountered empty partition, for best performance ensure no partitions are empty")
    Array { PartitionResult(None, taskCtx.measures) }.toIterator
  } else {
    // Perform any data preparation work
    val dataIntermediateState = preparePartitionData(taskCtx, inputRows)

    try {
      if (taskCtx.shouldExecuteTraining) {
        // If participating in training, initialize the network ring of communication.
        // Per this report, the listen port found earlier is only bound again here,
        // after the probe socket has already been closed.
        NetworkManager.initLightGBMNetwork(taskCtx, log)

        if (ctx.useSingleDatasetMode) {
          log.info(s"Waiting for all data prep to be done, task ${taskCtx.taskId}, partition ${taskCtx.partitionId}")
          ctx.sharedState().dataPreparationDoneSignal.await()
        }

        // Create the final Dataset for training and execute training iterations
        finalizeDatasetAndTrain(taskCtx, dataIntermediateState)
      } else {
        log.info(s"Helper task ${taskCtx.taskId}, partition ${taskCtx.partitionId} finished processing rows")
        ctx.sharedState().dataPreparationDoneSignal.countDown()
        Array { PartitionResult(None, taskCtx.measures) }.toIterator
      }
    } finally {
      cleanup(taskCtx)
    }
  }
}
```
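In `useSingleDatasetMode`, the training task calls `dataPreparationDoneSignal.await()` and helper tasks call `countDown()`, which matches the `java.util.concurrent.CountDownLatch` API. Assuming it is such a latch, the simplified sketch below shows that handshake; the latch count, thread pool, and object name are illustrative assumptions, not SynapseML's actual setup.

```scala
import java.util.concurrent.{Callable, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object SingleDatasetSignalDemo {
  // One "training" task waits until every "helper" task has finished its data
  // preparation and counted down the latch. Returns how many helpers finished
  // before the training task was released.
  def run(helpers: Int): Int = {
    val dataPreparationDoneSignal = new CountDownLatch(helpers)
    val prepared = new AtomicInteger(0)
    // One extra thread so the blocked trainer cannot starve the helpers.
    val pool = Executors.newFixedThreadPool(helpers + 1)
    try {
      val trainer = pool.submit(new Callable[Int] {
        def call(): Int = {
          dataPreparationDoneSignal.await() // blocks until the count reaches zero
          prepared.get()                    // every helper has incremented by now
        }
      })
      (1 to helpers).foreach { _ =>
        pool.submit(new Runnable {
          def run(): Unit = {
            prepared.incrementAndGet()            // stand-in for data preparation
            dataPreparationDoneSignal.countDown() // signal this helper is done
          }
        })
      }
      trainer.get(10, TimeUnit.SECONDS)
    } finally pool.shutdown()
  }
}
```

Because `countDown()` happens-before the return from `await()`, the training task always observes every helper's work, so `run(3)` returns 3.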

What component(s) does this bug affect?

  • area/cognitive: Cognitive project
  • area/core: Core project
  • area/deep-learning: DeepLearning project
  • area/lightgbm: Lightgbm project
  • area/opencv: Opencv project
  • area/vw: VW project
  • area/website: Website
  • area/build: Project build system
  • area/notebooks: Samples under notebooks folder
  • area/docker: Docker usage
  • area/models: models related issue

What language(s) does this bug affect?

  • language/scala: Scala source code
  • language/python: Pyspark APIs
  • language/r: R APIs
  • language/csharp: .NET APIs
  • language/new: Proposals for new client languages

What integration(s) does this bug affect?

  • integrations/synapse: Azure Synapse integrations
  • integrations/azureml: Azure ML integrations
  • integrations/databricks: Databricks integrations
