diff --git a/docs/lightgbm.md b/docs/lightgbm.md
index e70e4c5204..f1b00f6502 100644
--- a/docs/lightgbm.md
+++ b/docs/lightgbm.md
@@ -79,9 +79,9 @@ can be converted to PMML format through the
 By default LightGBM uses regular spark paradigm for launching tasks and communicates with the driver to coordinate task execution.
 The driver thread aggregates all task host:port information and then communicates the full list back to the workers in order for NetworkInit to be called.
-There have been some issues on certain cluster configurations because the driver needs to know how many tasks there are, and this computation is surprisingly non-trivial in spark.
+This requires the driver to know how many tasks there are, and if the expected number of tasks differs from the actual number, initialization will deadlock.
 With the next v0.18 release there is a new UseBarrierExecutionMode flag, which when activated uses the barrier() stage to block all tasks.
-The barrier execution mode simplifies the logic to aggregate host:port information across all tasks, so the driver will no longer need to precompute the number of tasks in advance.
+The barrier execution mode simplifies the logic to aggregate host:port information across all tasks.
 To use it in scala, you can call setUseBarrierExecutionMode(true), for example:
 
 val lgbm = new LightGBMClassifier()
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index ab36c3f769..b2398a1bbf 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -45,7 +45,7 @@ package (?:com\.microsoft\.ml\.spark|org\.apache\.spark|com\.microsoft\.CNTK|com
 ^[a-z][A-Za-z0-9]*(_=)?$
-
+
 ^[A-Z_]$
diff --git a/scalastyle-test-config.xml b/scalastyle-test-config.xml
index 3e07ba8622..fa2ad3c7ad 100644
--- a/scalastyle-test-config.xml
+++ b/scalastyle-test-config.xml
@@ -45,7 +45,7 @@ package (?:com\.microsoft\.ml\.spark|org\.apache\.spark|com\.microsoft\.CNTK|com
 ^[a-z][A-Za-z0-9]*(_=)?$
-
+
 ^[A-Z_]$
diff --git a/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala b/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala
index 8c3b3ab600..b5f794164d 100644
--- a/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala
+++ b/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala
@@ -39,12 +39,45 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
     }
   }
 
-  protected def innerTrain(dataset: Dataset[_]): TrainedModel = {
-    val sc = dataset.sparkSession.sparkContext
-    val numCoresPerExec = ClusterUtil.getNumCoresPerExecutor(dataset, log)
-    val numExecutorCores = ClusterUtil.getNumExecutorCores(dataset, numCoresPerExec)
-    val numWorkers = min(numExecutorCores, dataset.rdd.getNumPartitions)
-    // Only get the relevant columns
+  protected def prepareDataframe(dataset: Dataset[_], trainingCols: ListBuffer[String],
+                                 numWorkers: Int): DataFrame = {
+    // Reduce number of partitions to number of executor cores
+    val df = dataset.select(trainingCols.map(name => col(name)): _*).toDF()
+    /* Note: with barrier execution mode we must use repartition instead of coalesce when
+     * running on spark standalone.
+     * Using coalesce, we get the error:
+     *
+     * org.apache.spark.scheduler.BarrierJobUnsupportedRDDChainException:
+     * [SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following
+     * pattern of RDD chain within a barrier stage:
+     * 1. Ancestor RDDs that have different number of partitions from the resulting
+     * RDD (eg. union()/coalesce()/first()/take()/PartitionPruningRDD). A workaround
+     * for first()/take() can be barrierRdd.collect().head (scala) or barrierRdd.collect()[0] (python).
+     * 2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)).
+     *
+     * Without repartition, we may hit the error:
+     * org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: [SPARK-24819]:
+     * Barrier execution mode does not allow run a barrier stage that requires more
+     * slots than the total number of slots in the cluster currently. Please init a
+     * new cluster with more CPU cores or repartition the input RDD(s) to reduce the
+     * number of slots required to run this barrier stage.
+     *
+     * Hence we still need to estimate the number of workers and repartition even when using
+     * barrier execution, which is unfortunate as repartition is more expensive than coalesce.
+     */
+    if (getUseBarrierExecutionMode) {
+      val numPartitions = df.rdd.getNumPartitions
+      if (numPartitions > numWorkers) {
+        df.repartition(numWorkers)
+      } else {
+        df
+      }
+    } else {
+      df.coalesce(numWorkers)
+    }
+  }
+
+  protected def getTrainingCols(): ListBuffer[String] = {
     val trainingCols = ListBuffer(getLabelCol, getFeaturesCol)
     if (get(weightCol).isDefined) {
       trainingCols += getWeightCol
@@ -58,8 +91,18 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
     if (get(initScoreCol).isDefined) {
       trainingCols += getInitScoreCol
     }
-    // Reduce number of partitions to number of executor cores
-    val df = dataset.select(trainingCols.map(name => col(name)): _*).toDF().coalesce(numWorkers)
+    trainingCols
+  }
+
+  protected def innerTrain(dataset: Dataset[_]): TrainedModel = {
+    val sc = dataset.sparkSession.sparkContext
+    val numCoresPerExec = ClusterUtil.getNumCoresPerExecutor(dataset, log)
+    val numExecutorCores = ClusterUtil.getNumExecutorCores(dataset, numCoresPerExec)
+    val numWorkers = min(numExecutorCores, dataset.rdd.getNumPartitions)
+    // Only get the relevant columns
+    val trainingCols = getTrainingCols()
+    val df = prepareDataframe(dataset, trainingCols, numWorkers)
+    val (inetAddress, port, future) = LightGBMUtils.createDriverNodesThread(numWorkers, df, log, getTimeout, getUseBarrierExecutionMode)
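
For context, below is a minimal sketch of the end-to-end usage the docs hunk above describes: a LightGBMClassifier with setUseBarrierExecutionMode(true). The object name, app name, two-row placeholder dataset, and column names are illustrative assumptions, and the import path is inferred from this diff's source layout (it may differ across MMLSpark versions); only setUseBarrierExecutionMode itself is taken from the change.

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
// Package path assumed from the source layout in this diff; adjust for your MMLSpark version.
import com.microsoft.ml.spark.lightgbm.LightGBMClassifier

object BarrierModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("lightgbm-barrier-sketch").getOrCreate()

    // Placeholder data: a pre-assembled feature vector and a binary label.
    val df = spark.createDataFrame(Seq(
      (Vectors.dense(1.0, 2.0), 0.0),
      (Vectors.dense(3.0, 4.0), 1.0)
    )).toDF("features", "label")

    // Enabling the flag runs training inside a barrier() stage; per the Scala hunk above,
    // prepareDataframe then repartitions down to numWorkers instead of coalescing.
    val lgbm = new LightGBMClassifier()
      .setFeaturesCol("features")
      .setLabelCol("label")
      .setUseBarrierExecutionMode(true)

    val model = lgbm.fit(df)
    model.transform(df).show()

    spark.stop()
  }
}
```

The trade-off in prepareDataframe is that coalesce avoids a shuffle but produces an RDD chain that barrier stages reject on standalone clusters, so the barrier path falls back to repartition, and only when the partition count exceeds numWorkers.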