diff --git a/core/src/main/scala/com/microsoft/ml/spark/core/utils/ClusterUtil.scala b/core/src/main/scala/com/microsoft/ml/spark/core/utils/ClusterUtil.scala index db8e39cd03..98ac6fb1f4 100644 --- a/core/src/main/scala/com/microsoft/ml/spark/core/utils/ClusterUtil.scala +++ b/core/src/main/scala/com/microsoft/ml/spark/core/utils/ClusterUtil.scala @@ -104,8 +104,8 @@ object ClusterUtil { } } - def getDriverHost(dataset: Dataset[_]): String = { - val blockManager = BlockManagerUtils.getBlockManager(dataset) + def getDriverHost(spark: SparkSession): String = { + val blockManager = BlockManagerUtils.getBlockManager(spark) blockManager.master.getMemoryStatus.toList.flatMap({ case (blockManagerId, _) => if (blockManagerId.executorId == "driver") Some(getHostToIP(blockManagerId.host)) else None @@ -120,11 +120,11 @@ object ClusterUtil { } /** Returns a list of executor id and host. - * @param dataset The dataset containing the current spark session. + * @param spark The current spark session. * @return List of executors as an array of (id,host). */ - def getExecutors(dataset: Dataset[_]): Array[(Int, String)] = { - val blockManager = BlockManagerUtils.getBlockManager(dataset) + def getExecutors(spark: SparkSession): Array[(Int, String)] = { + val blockManager = BlockManagerUtils.getBlockManager(spark) blockManager.master.getMemoryStatus.toList.flatMap({ case (blockManagerId, _) => if (blockManagerId.executorId == "driver") None else Some((blockManagerId.executorId.toInt, getHostToIP(blockManagerId.host))) @@ -142,35 +142,33 @@ object ClusterUtil { * @param numTasksPerExec The number of tasks per executor. * @return The number of executors * number of tasks. */ - def getNumExecutorTasks(dataset: Dataset[_], numTasksPerExec: Int, log: Logger): Int = { - val executors = getExecutors(dataset) + def getNumExecutorTasks(spark: SparkSession, numTasksPerExec: Int, log: Logger): Int = { + val executors = getExecutors(spark) log.info(s"Retrieving executors...") if (!executors.isEmpty) { log.info(s"Retrieved num executors ${executors.length} with num tasks per executor $numTasksPerExec") executors.length * numTasksPerExec } else { log.info(s"Could not retrieve executors from blockmanager, trying to get from configuration...") - val master = dataset.sparkSession.sparkContext.master + val master = spark.sparkContext.master + + //TODO make this less brittle val rx = "local(?:\\[(\\*|\\d+)(?:,\\d+)?\\])?".r master match { - case rx(null) => { + case rx(null) => log.info(s"Retrieved local() = 1 executor by default") 1 - } - case rx("*") => { + case rx("*") => log.info(s"Retrieved local(*) = ${Runtime.getRuntime.availableProcessors()} executors") Runtime.getRuntime.availableProcessors() - } - case rx(cores) => { + case rx(cores) => log.info(s"Retrieved local(cores) = $cores executors") cores.toInt - } - case _ => { - val numExecutors = BlockManagerUtils.getBlockManager(dataset) + case _ => + val numExecutors = BlockManagerUtils.getBlockManager(spark) .master.getMemoryStatus.size log.info(s"Using default case = $numExecutors executors") numExecutors - } } } } diff --git a/core/src/main/scala/com/microsoft/ml/spark/stages/PartitionConsolidator.scala b/core/src/main/scala/com/microsoft/ml/spark/stages/PartitionConsolidator.scala index 8a3cc7a0fa..6f6ce174ff 100644 --- a/core/src/main/scala/com/microsoft/ml/spark/stages/PartitionConsolidator.scala +++ b/core/src/main/scala/com/microsoft/ml/spark/stages/PartitionConsolidator.scala @@ -137,8 +137,3 @@ class Consolidator[T] { } -trait LocalAggregator[T] { - def 
prep(iter: Iterator[Row]): T - - def merge(ts: Seq[T]): T -} diff --git a/core/src/main/scala/org/apache/spark/injections/BlockManagerUtils.scala b/core/src/main/scala/org/apache/spark/injections/BlockManagerUtils.scala index 6d0564abb4..93f17e6f93 100644 --- a/core/src/main/scala/org/apache/spark/injections/BlockManagerUtils.scala +++ b/core/src/main/scala/org/apache/spark/injections/BlockManagerUtils.scala @@ -3,16 +3,16 @@ package org.apache.spark.injections -import org.apache.spark.sql.Dataset +import org.apache.spark.sql.SparkSession import org.apache.spark.storage.BlockManager object BlockManagerUtils { /** Returns the block manager from the dataframe's spark context. * - * @param data The dataframe to get the block manager from. + * @param spark The spark session to get the block manager from. * @return The block manager. */ - def getBlockManager(data: Dataset[_]): BlockManager = { - data.sparkSession.sparkContext.env.blockManager + def getBlockManager(spark: SparkSession): BlockManager = { + spark.sparkContext.env.blockManager } } diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala index 8fc1c34927..96ef661498 100644 --- a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala +++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala @@ -3,22 +3,31 @@ package com.microsoft.ml.spark.lightgbm +import com.microsoft.ml.lightgbm.{SWIGTYPE_p_int, lightgbmlib} import com.microsoft.ml.spark.core.utils.ClusterUtil import com.microsoft.ml.spark.io.http.SharedSingleton +import com.microsoft.ml.spark.lightgbm.ConnectionState.Finished +import com.microsoft.ml.spark.lightgbm.LightGBMUtils.{closeConnections, handleConnection, sendDataToExecutors} +import com.microsoft.ml.spark.lightgbm.TaskTrainingMethods.{isWorkerEnabled, prepareDatasets} +import com.microsoft.ml.spark.lightgbm.TrainUtils._ import com.microsoft.ml.spark.lightgbm.booster.LightGBMBooster -import com.microsoft.ml.spark.lightgbm.dataset.DatasetUtils -import com.microsoft.ml.spark.lightgbm.params.{DartModeParams, ExecutionParams, LightGBMParams, ObjectiveParams, - TrainParams} +import com.microsoft.ml.spark.lightgbm.dataset._ +import com.microsoft.ml.spark.lightgbm.params._ import com.microsoft.ml.spark.logging.BasicLogging -import org.apache.spark.ml.attribute.AttributeGroup +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg.SQLDataTypes.VectorType import org.apache.spark.ml.param.shared.{HasFeaturesCol => HasFeaturesColSpark, HasLabelCol => HasLabelColSpark} import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.sql._ import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Dataset, Encoders} -import scala.concurrent.Await +import java.net.{ServerSocket, Socket} +import java.util.concurrent.Executors +import scala.collection.immutable.HashSet +import scala.collection.mutable.ListBuffer import scala.concurrent.duration.{Duration, SECONDS} +import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future} import scala.language.existentials import scala.math.min import scala.util.matching.Regex @@ -33,6 +42,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine */ protected def train(dataset: Dataset[_]): TrainedModel = { logTrain({ + getOptGroupCol.foreach(DatasetUtils.validateGroupColumn(_, dataset.schema)) if (getNumBatches > 0) 
{ val ratio = 1.0 / getNumBatches val datasets = dataset.randomSplit((0 until getNumBatches).map(_ => ratio).toArray) @@ -40,12 +50,10 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine if (model.isDefined) { setModelString(stringFromTrainedModel(model.get)) } - val dataset = datasetWithIndex._1 val batchIndex = datasetWithIndex._2 beforeTrainBatch(batchIndex, dataset, model) - val newModel = innerTrain(dataset, batchIndex) afterTrainBatch(batchIndex, dataset, newModel) @@ -80,7 +88,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine // Cast columns to correct types dataset.select( trainingCols.map { - case (name, datatypes: Seq[DataType]) => { + case (name, datatypes: Seq[DataType]) => val index = schema.fieldIndex(name) // Note: We only want to cast if original column was of numeric type @@ -93,17 +101,14 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine // If none match, cast to the first option dataset(name).cast(datatypes.head) } - case _ => dataset(name) } - } }: _* ).toDF() } - protected def prepareDataframe(dataset: Dataset[_], trainingCols: Array[(String, Seq[DataType])], - numTasks: Int): DataFrame = { - val df = castColumns(dataset, trainingCols) + protected def prepareDataframe(dataset: Dataset[_], numTasks: Int): DataFrame = { + val df = castColumns(dataset, getTrainingCols) // Reduce number of partitions to number of executor tasks /* Note: with barrier execution mode we must use repartition instead of coalesce when * running on spark standalone. @@ -139,7 +144,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine } } - protected def getTrainingCols(): Array[(String, Seq[DataType])] = { + protected def getTrainingCols: Array[(String, Seq[DataType])] = { val colsToCheck: Array[(Option[String], Seq[DataType])] = Array( (Some(getLabelCol), Seq(DoubleType)), (Some(getFeaturesCol), Seq(VectorType)), @@ -148,31 +153,72 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine (get(validationIndicatorCol), Seq(BooleanType)), (get(initScoreCol), Seq(DoubleType, VectorType))) - colsToCheck.flatMap { case (col: Option[String], colType: Seq[DataType]) => { - if (col.isDefined) Some(col.get, colType) else None - } + colsToCheck.flatMap { + case (col: Option[String], colType: Seq[DataType]) => + if (col.isDefined) Some(col.get, colType) else None } } /** * Retrieves the categorical indexes in the features column. - * @param df The dataset to train on. + * + * @param featuresSchema The schema of the features column * @return the categorical indexes in the features column. 
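+   *
+   * Usage sketch (illustrative only): the learners updated in this change pass the
+   * features column's StructField straight from the training schema, e.g.
+   * {{{
+   *   val categoricalIndexes = getCategoricalIndexes(dataset.schema(getFeaturesCol))
+   * }}}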
*/ - private def getCategoricalIndexes(df: DataFrame): Array[Int] = { - val categoricalSlotIndexesArr = get(categoricalSlotIndexes).getOrElse(Array.empty[Int]) - val categoricalSlotNamesArr = get(categoricalSlotNames).getOrElse(Array.empty[String]) - LightGBMUtils.getCategoricalIndexes(df, getFeaturesCol, getSlotNames, - categoricalSlotIndexesArr, categoricalSlotNamesArr) + protected def getCategoricalIndexes(featuresSchema: StructField): Array[Int] = { + val categoricalColumnSlotNames = get(categoricalSlotNames).getOrElse(Array.empty[String]) + val categoricalIndexes = if (getSlotNames.nonEmpty) { + categoricalColumnSlotNames.map(getSlotNames.indexOf(_)) + } else { + val categoricalSlotNamesSet = HashSet(categoricalColumnSlotNames: _*) + val attributes = AttributeGroup.fromStructField(featuresSchema).attributes + if (attributes.isEmpty) { + Array[Int]() + } else { + attributes.get.zipWithIndex.flatMap { + case (null, _) => None + case (attr, idx) => + if (attr.name.isDefined && categoricalSlotNamesSet.contains(attr.name.get)) { + Some(idx) + } else { + attr match { + case _: NumericAttribute | UnresolvedAttribute => None + // Note: it seems that BinaryAttribute is not considered categorical, + // since all OHE cols are marked with this, but StringIndexed are always Nominal + case _: BinaryAttribute => None + case _: NominalAttribute => Some(idx) + } + } + } + } + } + + get(categoricalSlotIndexes) + .getOrElse(Array.empty[Int]) + .union(categoricalIndexes).distinct + } + + def getSlotNamesWithMetadata(featuresSchema: StructField): Option[Array[String]] = { + if (getSlotNames.nonEmpty) { + Some(getSlotNames) + } else { + AttributeGroup.fromStructField(featuresSchema).attributes.flatMap(attributes => + if (attributes.isEmpty) { + None + } else { + val colNames = attributes.indices.map(_.toString).toArray + attributes.foreach(attr => + attr.index.foreach(index => colNames(index) = attr.name.getOrElse(index.toString))) + Some(colNames) + } + ) + } } - private def validateSlotNames(df: DataFrame, columnParams: ColumnParams, trainParams: TrainParams): Unit = { - val schema = df.schema - val featuresSchema = schema.fields(schema.fieldIndex(getFeaturesCol)) + private def validateSlotNames(featuresSchema: StructField): Unit = { val metadata = AttributeGroup.fromStructField(featuresSchema) if (metadata.attributes.isDefined) { - val slotNamesOpt = DatasetUtils.getSlotNames(df.schema, - columnParams.featuresColumn, metadata.attributes.get.length, trainParams) + val slotNamesOpt = getSlotNamesWithMetadata(featuresSchema) val pattern = new Regex("[\",:\\[\\]{}]") slotNamesOpt.foreach(slotNames => { val badSlotNames = slotNames.flatMap(slotName => @@ -187,32 +233,207 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine /** * Constructs the DartModeParams + * * @return DartModeParams object containing parameters related to dart mode. */ - protected def getDartParams(): DartModeParams = { + protected def getDartParams: DartModeParams = { DartModeParams(getDropRate, getMaxDrop, getSkipDrop, getXGBoostDartMode, getUniformDrop) } /** * Constructs the ExecutionParams. + * * @return ExecutionParams object containing parameters related to LightGBM execution. 
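+   *
+   * Sketch of downstream reads (illustrative): these values travel inside TrainParams,
+   * so task-side code in this diff accesses them as, e.g.
+   * {{{
+   *   val useSingleDataset = trainParams.executionParams.useSingleDatasetMode
+   *   val numThreads = trainParams.executionParams.numThreads
+   * }}}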
*/ - protected def getExecutionParams(): ExecutionParams = { - ExecutionParams(getChunkSize, getMatrixType, getNumThreads) + protected def getExecutionParams: ExecutionParams = { + ExecutionParams(getChunkSize, getMatrixType, getNumThreads, getUseSingleDatasetMode) + } + + protected def getColumnParams: ColumnParams = { + ColumnParams(getLabelCol, getFeaturesCol, get(weightCol), get(initScoreCol), getOptGroupCol) } /** * Constructs the ObjectiveParams. + * * @return ObjectiveParams object containing parameters related to the objective function. */ - protected def getObjectiveParams(): ObjectiveParams = { + protected def getObjectiveParams: ObjectiveParams = { ObjectiveParams(getObjective, if (isDefined(fobj)) Some(getFObj) else None) } + def getDatasetParams(categoricalIndexes: Array[Int], numThreads: Int): String = { + val datasetParams = s"max_bin=$getMaxBin is_pre_partition=True " + + s"bin_construct_sample_cnt=$getBinSampleCount " + + s"num_threads=$numThreads " + + (if (categoricalIndexes.isEmpty) "" + else s"categorical_feature=${categoricalIndexes.mkString(",")}") + datasetParams + } + + private def generateDataset(ac: BaseAggregatedColumns, + referenceDataset: Option[LightGBMDataset], + schema: StructType, + datasetParams: String): LightGBMDataset = { + val dataset = try { + val datasetInner = ac.generateDataset(referenceDataset, datasetParams) + getOptGroupCol.foreach(_ => datasetInner.addGroupColumn(ac.getGroups)) + datasetInner.setFeatureNames(getSlotNamesWithMetadata(schema(getFeaturesCol)), ac.getNumCols) + datasetInner + } finally { + ac.cleanup() + } + // Validate generated dataset has the correct number of rows and cols + dataset.validateDataset() + dataset + } + + + private def translate(batchIndex: Int, + validationData: Option[BaseAggregatedColumns], + trainParams: TrainParams, + returnBooster: Boolean, + schema: StructType, + aggregatedColumns: BaseAggregatedColumns): Iterator[LightGBMBooster] = { + val columnParams = getColumnParams + val datasetParams = getDatasetParams(trainParams.categoricalFeatures, trainParams.executionParams.numThreads) + beforeGenerateTrainDataset(batchIndex, columnParams, schema, log, trainParams) + val trainDataset = generateDataset(aggregatedColumns, None, schema, datasetParams) + try { + afterGenerateTrainDataset(batchIndex, columnParams, schema, log, trainParams) + + val validDatasetOpt = validationData.map { vd => + beforeGenerateValidDataset(batchIndex, columnParams, schema, log, trainParams) + val out = generateDataset(vd, Some(trainDataset), schema, datasetParams) + afterGenerateValidDataset(batchIndex, columnParams, schema, log, trainParams) + out + } + + try { + val booster = createBooster(trainParams, trainDataset, validDatasetOpt) + try { + val bestIterResult = trainCore(batchIndex, trainParams, booster, log, validDatasetOpt.isDefined) + if (returnBooster) { + val model = booster.saveToString() + val modelBooster = new LightGBMBooster(model) + // Set best iteration on booster if hit early stopping criteria in trainCore + bestIterResult.foreach(modelBooster.setBestIteration) + Iterator.single(modelBooster) + } else { + Iterator.empty + } + } finally { + // Free booster + booster.freeNativeMemory() + } + } finally { + validDatasetOpt.foreach(_.close()) + } + } finally { + trainDataset.close() + } + } + + private def trainLightGBM(batchIndex: Int, + networkParams: NetworkParams, + validationData: Option[Broadcast[Array[Row]]], + trainParams: TrainParams, + numTasksPerExec: Int, + schema: StructType, + sharedState: SharedState) + 
(inputRows: Iterator[Row]): Iterator[LightGBMBooster] = { + val useSingleDatasetMode = trainParams.executionParams.useSingleDatasetMode + val emptyPartition = !inputRows.hasNext + val isEnabledWorker = if (!emptyPartition) isWorkerEnabled(trainParams, log, sharedState) else false + // Initialize the native library + LightGBMUtils.initializeNativeLibrary() + // Initialize the network communication + val (nodes, localListenPort) = getNetworkInfo(networkParams, numTasksPerExec, log, isEnabledWorker) + if (emptyPartition) { + log.warn("LightGBM task encountered empty partition, for best performance ensure no partitions empty") + List[LightGBMBooster]().toIterator + } else { + if (isEnabledWorker) { + log.info(s"LightGBM task listening on: $localListenPort") + if (useSingleDatasetMode) sharedState.helperStartSignal.countDown() + } else { + sharedState.helperStartSignal.await() + } + val (aggregatedColumns, aggregatedValidationColumns) = prepareDatasets( + inputRows, validationData, sharedState) + // Return booster only from main worker to reduce network communication overhead + val returnBooster = getReturnBooster(isEnabledWorker, nodes, log, numTasksPerExec, localListenPort) + try { + if (isEnabledWorker) { + // If worker enabled, initialize the network ring of communication + networkInit(nodes, localListenPort, log, LightGBMConstants.NetworkRetries, LightGBMConstants.InitialDelay) + if (useSingleDatasetMode) sharedState.doneSignal.await() + translate(batchIndex, aggregatedValidationColumns, trainParams, returnBooster, schema, aggregatedColumns) + } else { + log.info("Helper task finished processing rows") + sharedState.doneSignal.countDown() + List[LightGBMBooster]().toIterator + } + } finally { + // Finalize network when done + if (isEnabledWorker) LightGBMUtils.validate(lightgbmlib.LGBM_NetworkFree(), "Finalize network") + } + } + } + + /** + * Opens a socket communications channel on the driver, starts a thread that + * waits for the host:port from the executors, and then sends back the + * information to the executors. + * + * @param numTasks The total number of training tasks to wait for. + * @return The address and port of the driver socket. + */ + private def createDriverNodesThread(numTasks: Int, spark: SparkSession): (String, Int, Future[Unit]) = { + // Start a thread and open port to listen on + implicit val context: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) + val driverServerSocket = new ServerSocket(getDriverListenPort) + // Set timeout on socket + val duration = Duration(getTimeout, SECONDS) + if (duration.isFinite()) { + driverServerSocket.setSoTimeout(duration.toMillis.toInt) + } + val f = Future { + var emptyTaskCounter = 0 + val hostAndPorts = ListBuffer[(Socket, String)]() + if (getUseBarrierExecutionMode) { + log.info(s"driver using barrier execution mode") + + def connectToWorkers: Boolean = handleConnection(driverServerSocket, log, + hostAndPorts) == Finished || connectToWorkers + + connectToWorkers + } else { + log.info(s"driver expecting $numTasks connections...") + while (hostAndPorts.size + emptyTaskCounter < numTasks) { + val connectionResult = handleConnection(driverServerSocket, log, hostAndPorts) + if (connectionResult == ConnectionState.EmptyTask) emptyTaskCounter += 1 + } + } + // Concatenate with commas, eg: host1:port1,host2:port2, ... 
etc + val allConnections = hostAndPorts.map(_._2).mkString(",") + log.info(s"driver writing back to all connections: $allConnections") + // Send data back to all tasks and helper tasks on executors + sendDataToExecutors(hostAndPorts, allConnections) + closeConnections(log, hostAndPorts, driverServerSocket) + } + val host = ClusterUtil.getDriverHost(spark) + val port = driverServerSocket.getLocalPort + log.info(s"driver waiting for connections on host: $host and port: $port") + (host, port, f) + } + /** * Inner train method for LightGBM learners. Calculates the number of workers, * creates a driver thread, and runs mapPartitions on the dataset. - * @param dataset The dataset to train on. + * + * @param dataset The dataset to train on. * @param batchIndex In running in batch training mode, gets the batch number. * @return The LightGBM Model from the trained LightGBM Booster. */ @@ -223,44 +444,45 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine val numTasks = if (getNumTasks > 0) getNumTasks else { - val numExecutorTasks = ClusterUtil.getNumExecutorTasks(dataset, numTasksPerExec, log) + val numExecutorTasks = ClusterUtil.getNumExecutorTasks(dataset.sparkSession, numTasksPerExec, log) min(numExecutorTasks, dataset.rdd.getNumPartitions) } - // Only get the relevant columns - val trainingCols = getTrainingCols() + val df = prepareDataframe(dataset, numTasks) - val df = prepareDataframe(dataset, trainingCols, numTasks) - - val (inetAddress, port, future) = - LightGBMUtils.createDriverNodesThread(numTasks, df, log, getTimeout, getUseBarrierExecutionMode, - getDriverListenPort) + val (inetAddress, port, future) = createDriverNodesThread(numTasks, df.sparkSession) /* Run a parallel job via map partitions to initialize the native library and network, * translate the data to the LightGBM in-memory representation and train the models */ val encoder = Encoders.kryo[LightGBMBooster] - val trainParams = getTrainParams(numTasks, getCategoricalIndexes(df), dataset) + val trainParams = getTrainParams(numTasks, dataset, numTasksPerExec) + log.info(s"LightGBM parameters: ${trainParams.toString()}") val networkParams = NetworkParams(getDefaultListenPort, inetAddress, port, getUseBarrierExecutionMode) + val (trainingData, validationData) = if (get(validationIndicatorCol).isDefined && dataset.columns.contains(getValidationIndicatorCol)) (df.filter(x => !x.getBoolean(x.fieldIndex(getValidationIndicatorCol))), Some(sc.broadcast(preprocessData(df.filter(x => x.getBoolean(x.fieldIndex(getValidationIndicatorCol)))).collect()))) else (df, None) + val preprocessedDF = preprocessData(trainingData) val schema = preprocessedDF.schema - val columnParams = ColumnParams(getLabelCol, getFeaturesCol, get(weightCol), get(initScoreCol), getOptGroupCol) - validateSlotNames(preprocessedDF, columnParams, trainParams) - val mapPartitionsFunc = PartitionProcessor.trainLightGBM(batchIndex, networkParams, columnParams, - validationData, log, trainParams, numTasksPerExec, schema)(_) - val lightGBMBooster = - if (getUseBarrierExecutionMode) { - preprocessedDF.rdd.barrier().mapPartitions(mapPartitionsFunc).reduce((booster1, _) => booster1) - } else { - preprocessedDF.mapPartitions(mapPartitionsFunc)(encoder).reduce((booster1, _) => booster1) - } + + validateSlotNames(schema(getFeaturesCol)) + + val sharedState = SharedSingleton(new SharedState(getColumnParams, schema, trainParams)) + val mapPartitionsFunc = trainLightGBM(batchIndex, networkParams, + validationData, trainParams, numTasksPerExec, schema, 
sharedState.get)(_) + + val lightGBMBooster = if (getUseBarrierExecutionMode) { + preprocessedDF.rdd.barrier().mapPartitions(mapPartitionsFunc).reduce((booster1, _) => booster1) + } else { + preprocessedDF.mapPartitions(mapPartitionsFunc)(encoder).reduce((booster1, _) => booster1) + } + // Wait for future to complete (should be done by now) Await.result(future, Duration(getTimeout, SECONDS)) getModel(trainParams, lightGBMBooster) @@ -280,12 +502,12 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine /** Gets the training parameters. * - * @param numTasks The total number of tasks. - * @param categoricalIndexes The indexes of the categorical slots in the features vector. - * @param dataset The training dataset. + * @param numTasks The total number of tasks. + * @param dataset The training dataset. + * @param numTasksPerExec The number of tasks per executor. * @return train parameters. */ - protected def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams + protected def getTrainParams(numTasks: Int, dataset: Dataset[_], numTasksPerExec: Int): TrainParams protected def stringFromTrainedModel(model: TrainedModel): String diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMClassifier.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMClassifier.scala index 57931cb6b0..312d624642 100644 --- a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMClassifier.scala +++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMClassifier.scala @@ -40,11 +40,12 @@ class LightGBMClassifier(override val uid: String) def getIsUnbalance: Boolean = $(isUnbalance) def setIsUnbalance(value: Boolean): this.type = set(isUnbalance, value) - def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams = { + def getTrainParams(numTasks: Int, dataset: Dataset[_], numTasksPerExec: Int): TrainParams = { /* The native code for getting numClasses is always 1 unless it is multiclass-classification problem * so we infer the actual numClasses from the dataset here */ val actualNumClasses = getNumClasses(dataset) + val categoricalIndexes = getCategoricalIndexes(dataset.schema(getFeaturesCol)) val modelStr = if (getModelString == null || getModelString.isEmpty) None else get(modelString) ClassifierTrainParams(getParallelism, getTopK, getNumIterations, getLearningRate, getNumLeaves, getMaxBin, getBinSampleCount, getBaggingFraction, getPosBaggingFraction, getNegBaggingFraction, @@ -53,7 +54,7 @@ class LightGBMClassifier(override val uid: String) getIsUnbalance, getVerbosity, categoricalIndexes, actualNumClasses, getBoostFromAverage, getBoostingType, getLambdaL1, getLambdaL2, getIsProvideTrainingMetric, getMetric, getMinGainToSplit, getMaxDeltaStep, getMaxBinByFeature, getMinDataInLeaf, getSlotNames, - getDelegate, getDartParams(), getExecutionParams(), getObjectiveParams()) + getDelegate, getDartParams, getExecutionParams, getObjectiveParams) } def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMClassificationModel = { diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMConstants.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMConstants.scala index 094c4e207c..97989dcccb 100644 --- a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMConstants.scala +++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMConstants.scala @@ -31,6 +31,9 @@ 
object LightGBMConstants { /** Multiclass classification objective */ val MulticlassObjective: String = "multiclass" + /** Enabled task, used to indicate task that creates lightgbm dataset and runs training. + */ + val EnabledTask: String = "enabledTask" /** Ignore task status, used to ignore tasks that get empty partitions */ val IgnoreStatus: String = "ignore" diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRanker.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRanker.scala index 8a4286790e..037bec175f 100644 --- a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRanker.scala +++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRanker.scala @@ -51,7 +51,8 @@ class LightGBMRanker(override val uid: String) def getEvalAt: Array[Int] = $(evalAt) def setEvalAt(value: Array[Int]): this.type = set(evalAt, value) - def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams = { + def getTrainParams(numTasks: Int, dataset: Dataset[_], numTasksPerExec: Int): TrainParams = { + val categoricalIndexes = getCategoricalIndexes(dataset.schema(getFeaturesCol)) val modelStr = if (getModelString == null || getModelString.isEmpty) None else get(modelString) RankerTrainParams(getParallelism, getTopK, getNumIterations, getLearningRate, getNumLeaves, getMaxBin, getBinSampleCount, getBaggingFraction, getPosBaggingFraction, getNegBaggingFraction, @@ -59,8 +60,8 @@ class LightGBMRanker(override val uid: String) getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numTasks, modelStr, getVerbosity, categoricalIndexes, getBoostingType, getLambdaL1, getLambdaL2, getMaxPosition, getLabelGain, getIsProvideTrainingMetric, getMetric, getEvalAt, getMinGainToSplit, getMaxDeltaStep, - getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getDartParams(), getExecutionParams(), - getObjectiveParams()) + getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getDartParams, + getExecutionParams, getObjectiveParams) } def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRankerModel = { @@ -89,8 +90,7 @@ class LightGBMRanker(override val uid: String) override def copy(extra: ParamMap): LightGBMRanker = defaultCopy(extra) - override def prepareDataframe(dataset: Dataset[_], trainingCols: Array[(String, Seq[DataType])], - numTasks: Int): DataFrame = { + override def prepareDataframe(dataset: Dataset[_], numTasks: Int): DataFrame = { if (getRepartitionByGroupingColumn) { val repartitionedDataset = getOptGroupCol match { case None => dataset @@ -101,9 +101,9 @@ class LightGBMRanker(override val uid: String) df } } - super.prepareDataframe(repartitionedDataset, trainingCols, numTasks) + super.prepareDataframe(repartitionedDataset, numTasks) } else { - super.prepareDataframe(dataset, trainingCols, numTasks) + super.prepareDataframe(dataset, numTasks) } } } diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRegressor.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRegressor.scala index cdbf2f6e08..c0333e3e29 100644 --- a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRegressor.scala +++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRegressor.scala @@ -58,7 +58,8 @@ class LightGBMRegressor(override val uid: String) def getTweedieVariancePower: Double = $(tweedieVariancePower) def setTweedieVariancePower(value: Double): this.type = set(tweedieVariancePower, value) - def 
getTrainParams(numTasks: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams = { + def getTrainParams(numTasks: Int, dataset: Dataset[_], numTasksPerExec: Int): TrainParams = { + val categoricalIndexes = getCategoricalIndexes(dataset.schema(getFeaturesCol)) val modelStr = if (getModelString == null || getModelString.isEmpty) None else get(modelString) RegressorTrainParams(getParallelism, getTopK, getNumIterations, getLearningRate, getNumLeaves, getAlpha, getTweedieVariancePower, getMaxBin, getBinSampleCount, getBaggingFraction, getPosBaggingFraction, @@ -66,7 +67,7 @@ class LightGBMRegressor(override val uid: String) getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numTasks, modelStr, getVerbosity, categoricalIndexes, getBoostFromAverage, getBoostingType, getLambdaL1, getLambdaL2, getIsProvideTrainingMetric, getMetric, getMinGainToSplit, getMaxDeltaStep, getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, - getDartParams(), getExecutionParams(), getObjectiveParams()) + getDartParams, getExecutionParams, getObjectiveParams) } def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRegressionModel = { diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMUtils.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMUtils.scala index d5db122f87..58d5e2eebb 100644 --- a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMUtils.scala +++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMUtils.scala @@ -3,29 +3,19 @@ package com.microsoft.ml.spark.lightgbm -import java.io._ -import java.net.{ServerSocket, Socket} -import java.util.concurrent.Executors - import com.microsoft.ml.lightgbm._ import com.microsoft.ml.spark.core.env.NativeLoader -import com.microsoft.ml.spark.core.utils.ClusterUtil import com.microsoft.ml.spark.featurize.{Featurize, FeaturizeUtilities} -import com.microsoft.ml.spark.lightgbm.dataset.LightGBMDataset +import com.microsoft.ml.spark.lightgbm.ConnectionState._ import com.microsoft.ml.spark.lightgbm.params.TrainParams -import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.ml.PipelineModel -import org.apache.spark.ml.attribute._ -import org.apache.spark.ml.linalg.SparseVector -import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.Dataset +import org.apache.spark.{SparkEnv, TaskContext} import org.slf4j.Logger -import ConnectionState._ - -import scala.collection.immutable.HashSet +import java.io._ +import java.net.{ServerSocket, Socket} import scala.collection.mutable.ListBuffer -import scala.concurrent.duration.{Duration, SECONDS} -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} /** Helper utilities for LightGBM learners */ object LightGBMUtils { @@ -56,49 +46,14 @@ object LightGBMUtils { groupColumn: Option[String] = None, oneHotEncodeCategoricals: Boolean = true): PipelineModel = { // Create pipeline model to featurize the dataset - val featuresToHashTo = FeaturizeUtilities.NumFeaturesTreeOrNNBased val featureColumns = dataset.columns.filter(col => col != labelColumn && !weightColumn.contains(col) && !groupColumn.contains(col)).toSeq - val featurizer = new Featurize() + new Featurize() .setOutputCol(featuresColumn) .setInputCols(featureColumns.toArray) .setOneHotEncodeCategoricals(oneHotEncodeCategoricals) - .setNumFeatures(featuresToHashTo) - featurizer.fit(dataset) - } - - def getCategoricalIndexes(df: DataFrame, - featuresCol: String, - slotNames: Array[String], - 
categoricalColumnIndexes: Array[Int], - categoricalColumnSlotNames: Array[String]): Array[Int] = { - val categoricalIndexes = if(slotNames.nonEmpty) { - categoricalColumnSlotNames.map(slotNames.indexOf(_)) - } else { - val categoricalSlotNamesSet = HashSet(categoricalColumnSlotNames: _*) - val featuresSchema = df.schema(featuresCol) - val metadata = AttributeGroup.fromStructField(featuresSchema) - if (metadata.attributes.isEmpty) Array[Int]() - else { - metadata.attributes.get.zipWithIndex.flatMap { - case (null, _) => Iterator() - case (attr, idx) => - if (attr.name.isDefined && categoricalSlotNamesSet.contains(attr.name.get)) { - Iterator(idx) - } else { - attr match { - case _: NumericAttribute | UnresolvedAttribute => Iterator() - // Note: it seems that BinaryAttribute is not considered categorical, - // since all OHE cols are marked with this, but StringIndexed are always Nominal - case _: BinaryAttribute => Iterator() - case _: NominalAttribute => Iterator(idx) - } - } - } - } - } - - categoricalColumnIndexes.union(categoricalIndexes).distinct + .setNumFeatures(FeaturizeUtilities.NumFeaturesTreeOrNNBased) + .fit(dataset) } def sendDataToExecutors(hostAndPorts: ListBuffer[(Socket, String)], allConnections: String): Unit = { @@ -148,59 +103,11 @@ object LightGBMUtils { } } - /** - * Opens a socket communications channel on the driver, starts a thread that - * waits for the host:port from the executors, and then sends back the - * information to the executors. - * - * @param numTasks The total number of training tasks to wait for. - * @return The address and port of the driver socket. - */ - def createDriverNodesThread(numTasks: Int, df: DataFrame, - log: Logger, timeout: Double, - barrierExecutionMode: Boolean, - driverServerPort: Int): (String, Int, Future[Unit]) = { - // Start a thread and open port to listen on - implicit val context: ExecutionContextExecutor = - ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) - val driverServerSocket = new ServerSocket(driverServerPort) - // Set timeout on socket - val duration = Duration(timeout, SECONDS) - if (duration.isFinite()) { - driverServerSocket.setSoTimeout(duration.toMillis.toInt) - } - val f = Future { - var emptyTaskCounter = 0 - val hostAndPorts = ListBuffer[(Socket, String)]() - if (barrierExecutionMode) { - log.info(s"driver using barrier execution mode") - def connectToWorkers: Boolean = handleConnection(driverServerSocket, log, - hostAndPorts) == Finished || connectToWorkers - connectToWorkers - } else { - log.info(s"driver expecting $numTasks connections...") - while (hostAndPorts.size + emptyTaskCounter < numTasks) { - val connectionResult = handleConnection(driverServerSocket, log, hostAndPorts) - if (connectionResult == ConnectionState.EmptyTask) emptyTaskCounter += 1 - } - } - // Concatenate with commas, eg: host1:port1,host2:port2, ... etc - val allConnections = hostAndPorts.map(_._2).mkString(",") - log.info(s"driver writing back to all connections: $allConnections") - // Send data back to all tasks and helper tasks on executors - sendDataToExecutors(hostAndPorts, allConnections) - closeConnections(log, hostAndPorts, driverServerSocket) - } - val host = ClusterUtil.getDriverHost(df) - val port = driverServerSocket.getLocalPort - log.info(s"driver waiting for connections on host: $host and port: $port") - (host, port, f) - } /** Returns an integer ID for the current worker. * @return In cluster, returns the executor id. In local case, returns the partition id. 
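+   *
+   * Illustrative mapping (sketch based on the description above):
+   * {{{
+   *   // cluster:    executorId "3"      => worker id 3
+   *   // local mode: executorId "driver" => current partition id
+   * }}}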
    */
-  def getWorkerId(): Int = {
+  def getWorkerId: Int = {
     val executorId = SparkEnv.get.executorId
     val ctx = TaskContext.get
     val partId = ctx.partitionId
@@ -213,7 +120,7 @@ object LightGBMUtils {
   /** Returns true if spark is run in local mode.
    * @return True if spark is run in local mode.
    */
-  def isLocalExecution(): Boolean = {
+  def isLocalExecution: Boolean = {
     val executorId = SparkEnv.get.executorId
     executorId == "driver"
   }
@@ -221,17 +128,18 @@ object LightGBMUtils {
   /** Returns a unique task Id for the current task run on the executor.
    * @return A unique task id.
    */
-  def getTaskId(): Long = {
+  def getTaskId: Long = {
     val ctx = TaskContext.get
     val taskId = ctx.taskAttemptId()
     taskId
   }

   def getNumRowsForChunksArray(numRows: Int, chunkSize: Int): SWIGTYPE_p_int = {
-    var numChunks = Math.floorDiv(numRows, chunkSize)
-    var leftoverChunk = numRows % chunkSize
-    if (leftoverChunk > 0) {
-      numChunks += 1
+    val leftoverChunk = numRows % chunkSize
+    val numChunks = if (leftoverChunk > 0) {
+      Math.floorDiv(numRows, chunkSize) + 1
+    } else {
+      Math.floorDiv(numRows, chunkSize)
     }
     val numRowsForChunks = lightgbmlib.new_intArray(numChunks)
     (0 until numChunks).foreach({ index: Int =>
@@ -244,64 +152,6 @@ object LightGBMUtils {
   }

-  def getDatasetParams(trainParams: TrainParams): String = {
-    val datasetParams = s"max_bin=${trainParams.maxBin} is_pre_partition=True " +
-      s"bin_construct_sample_cnt=${trainParams.binSampleCount} " +
-      s"num_threads=${trainParams.executionParams.numThreads} " +
-      (if (trainParams.categoricalFeatures.isEmpty) ""
-      else s"categorical_feature=${trainParams.categoricalFeatures.mkString(",")}")
-    datasetParams
-  }
-
-  def generateDenseDataset(numRows: Int, numCols: Int, featuresArray: doubleChunkedArray,
-                           referenceDataset: Option[LightGBMDataset],
-                           featureNamesOpt: Option[Array[String]],
-                           trainParams: TrainParams, chunkSize: Int): LightGBMDataset = {
-    val isRowMajor = 1
-    val datasetOutPtr = lightgbmlib.voidpp_handle()
-    val datasetParams = getDatasetParams(trainParams)
-    val data64bitType = lightgbmlibConstants.C_API_DTYPE_FLOAT64
-    var data: Option[(SWIGTYPE_p_void, SWIGTYPE_p_double)] = None
-    val numRowsForChunks = getNumRowsForChunksArray(numRows, chunkSize)
-    try {
-      // Generate the dataset for features
-      featuresArray.get_last_chunk_add_count()
-      LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromMats(featuresArray.get_chunks_count().toInt,
-        featuresArray.data_as_void(), data64bitType,
-        numRowsForChunks, numCols,
-        isRowMajor, datasetParams, referenceDataset.map(_.datasetPtr).orNull, datasetOutPtr),
-        "Dataset create")
-    } finally {
-      featuresArray.release()
-      lightgbmlib.delete_intArray(numRowsForChunks)
-    }
-    val dataset = new LightGBMDataset(lightgbmlib.voidpp_value(datasetOutPtr))
-    dataset.setFeatureNames(featureNamesOpt, numCols)
-    dataset
-  }
-
-  /** Generates a sparse dataset in CSR format.
-    *
-    * @param sparseRows The rows of sparse vector.
- * @return - */ - def generateSparseDataset(sparseRows: Array[SparseVector], - referenceDataset: Option[LightGBMDataset], - featureNamesOpt: Option[Array[String]], - trainParams: TrainParams): LightGBMDataset = { - val numCols = sparseRows(0).size - - val datasetOutPtr = lightgbmlib.voidpp_handle() - val datasetParams = getDatasetParams(trainParams) - // Generate the dataset for features - LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromCSRSpark( - sparseRows.asInstanceOf[Array[Object]], - sparseRows.length, - numCols, datasetParams, referenceDataset.map(_.datasetPtr).orNull, - datasetOutPtr), - "Dataset create") - val dataset = new LightGBMDataset(lightgbmlib.voidpp_value(datasetOutPtr)) - dataset.setFeatureNames(featureNamesOpt, numCols) - dataset - } } diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/PartitionProcessor.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/PartitionProcessor.scala deleted file mode 100644 index b732c32ddd..0000000000 --- a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/PartitionProcessor.scala +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright (C) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. See LICENSE in project root for information. - -package com.microsoft.ml.spark.lightgbm - -import com.microsoft.ml.lightgbm.lightgbmlib -import com.microsoft.ml.spark.lightgbm.TrainUtils.{afterGenerateTrainDataset, afterGenerateValidDataset, - beforeGenerateTrainDataset, beforeGenerateValidDataset, createBooster, getNetworkInfo, getReturnBooster, - networkInit, trainCore} -import com.microsoft.ml.spark.lightgbm.booster.LightGBMBooster -import com.microsoft.ml.spark.lightgbm.dataset.{DatasetUtils, LightGBMDataset} -import com.microsoft.ml.spark.lightgbm.params.TrainParams -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.sql.Row -import org.apache.spark.sql.types.StructType -import org.slf4j.Logger - -object PartitionProcessor { - def trainLightGBM(batchIndex: Int, networkParams: NetworkParams, columnParams: ColumnParams, - validationData: Option[Broadcast[Array[Row]]], log: Logger, - trainParams: TrainParams, numTasksPerExec: Int, schema: StructType) - (inputRows: Iterator[Row]): Iterator[LightGBMBooster] = { - val emptyPartition = !inputRows.hasNext - val isEnabledWorker = !emptyPartition - // Initialize the native library - LightGBMUtils.initializeNativeLibrary() - // Initialize the network communication - val (nodes, localListenPort) = getNetworkInfo(networkParams, numTasksPerExec, log, isEnabledWorker) - if (emptyPartition) { - log.warn("LightGBM task encountered empty partition, for best performance ensure no partitions empty") - List[LightGBMBooster]().toIterator - } else { - log.info(s"LightGBM task listening on: $localListenPort") - // Return booster only from main worker to reduce network communication overhead - val returnBooster = getReturnBooster(isEnabledWorker, nodes, log, numTasksPerExec, localListenPort) - try { - // If worker enabled, initialize the network ring of communication - networkInit(nodes, localListenPort, log, LightGBMConstants.NetworkRetries, LightGBMConstants.InitialDelay) - translate(batchIndex, columnParams, validationData, log, trainParams, returnBooster, schema, inputRows) - } finally { - // Finalize network when done - if (isEnabledWorker) LightGBMUtils.validate(lightgbmlib.LGBM_NetworkFree(), "Finalize network") - } - } - } - - def translate(batchIndex: Int, columnParams: ColumnParams, validationData: Option[Broadcast[Array[Row]]], - 
log: Logger, trainParams: TrainParams, returnBooster: Boolean, - schema: StructType, inputRows: Iterator[Row]): Iterator[LightGBMBooster] = { - var trainDatasetOpt: Option[LightGBMDataset] = None - var validDatasetOpt: Option[LightGBMDataset] = None - try { - beforeGenerateTrainDataset(batchIndex, columnParams, schema, log, trainParams) - trainDatasetOpt = DatasetUtils.generateDataset(inputRows, columnParams, None, schema, - log, trainParams) - afterGenerateTrainDataset(batchIndex, columnParams, schema, log, trainParams) - - if (validationData.isDefined) { - beforeGenerateValidDataset(batchIndex, columnParams, schema, log, trainParams) - validDatasetOpt = DatasetUtils.generateDataset(validationData.get.value.toIterator, columnParams, - trainDatasetOpt, schema, log, trainParams) - afterGenerateValidDataset(batchIndex, columnParams, schema, log, trainParams) - } - - var boosterOpt: Option[LightGBMBooster] = None - try { - val booster = createBooster(trainParams, trainDatasetOpt.get, validDatasetOpt) - boosterOpt = Some(booster) - val bestIterResult = trainCore(batchIndex, trainParams, booster, log, validDatasetOpt.isDefined) - if (returnBooster) { - val model = booster.saveToString() - val modelBooster = new LightGBMBooster(model) - // Set best iteration on booster if hit early stopping criteria in trainCore - bestIterResult.foreach(modelBooster.setBestIteration(_)) - Iterator.single(modelBooster) - } else { - Iterator.empty - } - } finally { - // Free booster - boosterOpt.foreach(_.freeNativeMemory()) - } - } finally { - // Free datasets - trainDatasetOpt.foreach(_.close()) - validDatasetOpt.foreach(_.close()) - } - } -} diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/SharedState.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/SharedState.scala new file mode 100644 index 0000000000..d61337e19c --- /dev/null +++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/SharedState.scala @@ -0,0 +1,106 @@ +// Copyright (C) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See LICENSE in project root for information. 
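+
+// Usage sketch (illustrative; mirrors LightGBMBase.innerTrain and TaskTrainingMethods in
+// this diff): the shared state is wrapped in SharedSingleton so that each executor JVM
+// holds one instance, and every task funnels its partition through prep/merge:
+//   val sharedState = SharedSingleton(new SharedState(getColumnParams, schema, trainParams))
+//   val (aggregated, aggregatedValidation) =
+//     TaskTrainingMethods.prepareDatasets(inputRows, validationData, sharedState.get)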
+ +package com.microsoft.ml.spark.lightgbm + +import java.util.concurrent.CountDownLatch + +import com.microsoft.ml.spark.lightgbm.dataset.DatasetUtils._ +import com.microsoft.ml.spark.lightgbm.dataset._ +import com.microsoft.ml.spark.lightgbm.params.TrainParams +import org.apache.spark.ml.linalg.{DenseVector, SparseVector} +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.StructType +import org.slf4j.Logger + +class SharedState(columnParams: ColumnParams, + schema: StructType, + trainParams: TrainParams) { + val mainExecutorWorker: Long = LightGBMUtils.getTaskId + val useSingleDataset: Boolean = trainParams.executionParams.useSingleDatasetMode + val chunkSize: Int = trainParams.executionParams.chunkSize + val matrixType: String = trainParams.executionParams.matrixType + + lazy val denseAggregatedColumns: BaseDenseAggregatedColumns = new DenseSyncAggregatedColumns(chunkSize) + + lazy val sparseAggregatedColumns: BaseSparseAggregatedColumns = new SparseSyncAggregatedColumns(chunkSize) + + def getArrayType(rowsIter: Iterator[Row], matrixType: String): (Iterator[Row], Boolean) = { + if (matrixType == "auto") { + sampleRowsForArrayType(rowsIter, columnParams) + } else if (matrixType == "sparse") { + (rowsIter: Iterator[Row], true) + } else if (matrixType == "dense") { + (rowsIter: Iterator[Row], false) + } else { + throw new Exception(s"Invalid parameter matrix type specified: ${matrixType}") + } + } + + def prep(iter: Iterator[Row]): BaseChunkedColumns = { + val (concatRowsIter: Iterator[Row], isSparseHere: Boolean) = getArrayType(iter, matrixType) + val peekableIter = new PeekingIterator(concatRowsIter) + // Note: the first worker sets "is sparse", other workers read it + linkIsSparse(isSparseHere) + + if (!isSparse.get) { + new DenseChunkedColumns(peekableIter, columnParams, schema, chunkSize) + } else { + new SparseChunkedColumns(peekableIter, columnParams, schema, chunkSize, useSingleDataset) + } + } + + def merge(ts: BaseChunkedColumns): BaseAggregatedColumns = { + val isSparseVal = isSparse.get + val aggregatedColumns = if (!isSparseVal) { + if (useSingleDataset) denseAggregatedColumns + else new DenseAggregatedColumns(chunkSize) + } else { + if (useSingleDataset) sparseAggregatedColumns + else new SparseAggregatedColumns(chunkSize) + } + aggregatedColumns.incrementCount(ts) + if (useSingleDataset) { + arrayProcessedSignal.countDown() + arrayProcessedSignal.await() + } + aggregatedColumns.addRows(ts) + ts.release() + aggregatedColumns + } + + @volatile var isSparse: Option[Boolean] = None + + def linkIsSparse(isSparse: Boolean): Unit = { + if (this.isSparse.isEmpty) { + this.synchronized { + if (this.isSparse.isEmpty) { + this.isSparse = Some(isSparse) + } + } + } + } + + @volatile var arrayProcessedSignal: CountDownLatch = new CountDownLatch(0) + + def incrementArrayProcessedSignal(log: Logger): Int = { + this.synchronized { + val count = arrayProcessedSignal.getCount.toInt + 1 + arrayProcessedSignal = new CountDownLatch(count) + log.info(s"Task incrementing ArrayProcessedSignal to $count") + count + } + } + + @volatile var doneSignal: CountDownLatch = new CountDownLatch(0) + + def incrementDoneSignal(log: Logger): Unit = { + this.synchronized { + val count = doneSignal.getCount.toInt + 1 + doneSignal = new CountDownLatch(count) + log.info(s"Task incrementing DoneSignal to $count") + } + } + + @volatile var helperStartSignal: CountDownLatch = new CountDownLatch(1) +} diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/TaskTrainingMethods.scala 
b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/TaskTrainingMethods.scala
new file mode 100644
index 0000000000..1c1944a399
--- /dev/null
+++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/TaskTrainingMethods.scala
@@ -0,0 +1,51 @@
+// Copyright (C) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in project root for information.
+
+package com.microsoft.ml.spark.lightgbm
+
+import com.microsoft.ml.spark.lightgbm.dataset.BaseAggregatedColumns
+import com.microsoft.ml.spark.lightgbm.params.TrainParams
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.Row
+import org.slf4j.Logger
+
+object TaskTrainingMethods {
+  /** If using single dataset mode, returns true for only one task (the main worker) per executor JVM.
+    * Otherwise, returns true for all tasks.
+    * @param trainParams The training parameters.
+    * @param log The logger.
+    * @return Whether the current task is enabled.
+    */
+  def isWorkerEnabled(trainParams: TrainParams, log: Logger, sharedState: SharedState): Boolean = {
+    if (trainParams.executionParams.useSingleDatasetMode) {
+      // Check whether this task is the main worker in the current JVM
+      val mainExecutorWorker = sharedState.mainExecutorWorker
+      val myTaskId = LightGBMUtils.getTaskId
+      val isMainWorker = mainExecutorWorker == myTaskId
+      log.info(s"Using singleDatasetMode. " +
+        s"Is main worker: ${isMainWorker} for task id: ${myTaskId} and main task id: ${mainExecutorWorker}")
+      sharedState.incrementArrayProcessedSignal(log)
+      if (!isMainWorker) {
+        sharedState.incrementDoneSignal(log)
+      }
+      isMainWorker
+    } else {
+      true
+    }
+  }
+
+  def prepareDatasets(inputRows: Iterator[Row],
+                      validationData: Option[Broadcast[Array[Row]]],
+                      sharedState: SharedState): (BaseAggregatedColumns, Option[BaseAggregatedColumns]) = {
+    val aggregatedColumns = {
+      val prepAggregatedColumns = sharedState.prep(inputRows)
+      sharedState.merge(prepAggregatedColumns)
+    }
+
+    val aggregatedValidationColumns = validationData.map { data =>
+      val prepAggregatedColumns = sharedState.prep(data.value.toIterator)
+      sharedState.merge(prepAggregatedColumns)
+    }
+    (aggregatedColumns, aggregatedValidationColumns)
+  }
+}
diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/TrainUtils.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/TrainUtils.scala
index 1c420b1d06..16f554c916 100644
--- a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/TrainUtils.scala
+++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/TrainUtils.scala
@@ -10,19 +10,12 @@ import com.microsoft.ml.lightgbm._
 import com.microsoft.ml.spark.core.env.StreamUtilities._
 import com.microsoft.ml.spark.core.utils.FaultToleranceUtils
 import com.microsoft.ml.spark.lightgbm.booster.LightGBMBooster
-import com.microsoft.ml.spark.lightgbm.dataset.{DatasetUtils, LightGBMDataset}
+import com.microsoft.ml.spark.lightgbm.dataset.LightGBMDataset
 import com.microsoft.ml.spark.lightgbm.params.{ClassifierTrainParams, TrainParams}
 import org.apache.spark.{BarrierTaskContext, TaskContext}
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.attribute._
-import org.apache.spark.ml.linalg.{DenseVector, SparseVector}
-import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
 import org.slf4j.Logger

-import scala.collection.mutable.ListBuffer
-
 case class NetworkParams(defaultListenPort: Int, addr: String, port: Int, barrierExecutionMode: Boolean)

 case class ColumnParams(labelColumn: String, featuresColumn:
String, weightColumn: Option[String], initScoreColumn: Option[String], groupColumn: Option[String]) @@ -198,7 +191,7 @@ private object TrainUtils extends Serializable { } private def findOpenPort(defaultListenPort: Int, numTasksPerExec: Int, log: Logger): Socket = { - val basePort = defaultListenPort + (LightGBMUtils.getWorkerId() * numTasksPerExec) + val basePort = defaultListenPort + (LightGBMUtils.getWorkerId * numTasksPerExec) if (basePort > LightGBMConstants.MaxPort) { throw new Exception(s"Error: port $basePort out of range, possibly due to too many executors or unknown error") } diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/dataset/DatasetAggregator.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/dataset/DatasetAggregator.scala new file mode 100644 index 0000000000..151ce98e36 --- /dev/null +++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/dataset/DatasetAggregator.scala @@ -0,0 +1,515 @@ +// Copyright (C) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See LICENSE in project root for information. + +package com.microsoft.ml.spark.lightgbm.dataset + +import com.microsoft.ml.lightgbm.{SWIGTYPE_p_int, lightgbmlib, lightgbmlibConstants} + +import java.util.concurrent.atomic.AtomicLong +import com.microsoft.ml.spark.lightgbm.{ColumnParams, LightGBMUtils} +import com.microsoft.ml.spark.lightgbm.dataset.DatasetUtils.getRowAsDoubleArray +import com.microsoft.ml.spark.lightgbm.swig._ +import org.apache.spark.ml.linalg.SQLDataTypes.VectorType +import org.apache.spark.ml.linalg.{DenseVector, SparseVector} +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.StructType + +import scala.collection.mutable.ListBuffer + +private[lightgbm] object ChunkedArrayUtils { + def copyChunkedArray[T: Numeric](chunkedArray: ChunkedArray[T], + mainArray: BaseSwigArray[T], + threadRowStartIndex: Long, + chunkSize: Long): Unit = { + val num = implicitly[Numeric[T]] + val defaultVal = num.fromInt(-1) + // Copy in parallel on each thread + // First copy full chunks + val chunkCount = chunkedArray.getChunksCount - 1 + for (chunk <- 0L until chunkCount) { + for (inChunkIdx <- 0L until chunkSize) { + mainArray.setItem(threadRowStartIndex + chunk * chunkSize + inChunkIdx, + chunkedArray.getItem(chunk, inChunkIdx, defaultVal)) + } + } + // Next copy filled values from last chunk only + val lastChunkCount = chunkedArray.getLastChunkAddCount + for (lastChunkIdx <- 0L until lastChunkCount) { + mainArray.setItem(threadRowStartIndex + chunkCount * chunkSize + lastChunkIdx, + chunkedArray.getItem(chunkCount, lastChunkIdx, defaultVal)) + } + } +} + +class PeekingIterator[T](it: Iterator[T]) extends Iterator[T] { + var nextHolder: Option[T] = None + + override def hasNext: Boolean = { + nextHolder.isDefined || it.hasNext + } + + override def next(): T = { + if (nextHolder.isDefined) { + val n = nextHolder.get + nextHolder = None + n + } else { + it.next() + } + } + + def peek: T = { + if (nextHolder.isEmpty && it.hasNext) { + nextHolder = Some(it.next()) + } + nextHolder.get + } +} + +private[lightgbm] abstract class BaseChunkedColumns(rowsIter: PeekingIterator[Row], + columnParams: ColumnParams, + schema: StructType, + chunkSize: Int) { + val labels: FloatChunkedArray = new FloatChunkedArray(chunkSize) + val weights: Option[FloatChunkedArray] = columnParams.weightColumn.map { + _ => new FloatChunkedArray(chunkSize) + } + val initScores: Option[DoubleChunkedArray] = columnParams.initScoreColumn.map { + _ => new 
+
+private[lightgbm] abstract class BaseChunkedColumns(rowsIter: PeekingIterator[Row],
+                                                    columnParams: ColumnParams,
+                                                    schema: StructType,
+                                                    chunkSize: Int) {
+  val labels: FloatChunkedArray = new FloatChunkedArray(chunkSize)
+  val weights: Option[FloatChunkedArray] = columnParams.weightColumn.map {
+    _ => new FloatChunkedArray(chunkSize)
+  }
+  val initScores: Option[DoubleChunkedArray] = columnParams.initScoreColumn.map {
+    _ => new DoubleChunkedArray(chunkSize)
+  }
+  val groups: ListBuffer[Any] = new ListBuffer[Any]()
+
+  val numCols: Int = rowsIter.peek.getAs[Any](columnParams.featuresColumn) match {
+    case dense: DenseVector => dense.size
+    case sparse: SparseVector => sparse.size
+  }
+
+  lazy val rowCount: Int = rowsIter.map { row =>
+    addFeatures(row)
+    labels.add(row.getDouble(schema.fieldIndex(columnParams.labelColumn)).toFloat)
+    columnParams.weightColumn.foreach { col =>
+      weights.get.add(row.getDouble(schema.fieldIndex(col)).toFloat)
+    }
+    addInitScoreColumnRow(row)
+    addGroupColumnRow(row)
+  }.length
+
+  protected def addInitScoreColumnRow(row: Row): Unit = {
+    columnParams.initScoreColumn.foreach { col =>
+      if (schema(col).dataType == VectorType) {
+        row.getAs[DenseVector](col).values.foreach(initScores.get.add)
+        // Note: rows * # classes in multiclass case
+      } else {
+        initScores.get.add(row.getAs[Double](col))
+      }
+    }
+  }
+
+  protected def addGroupColumnRow(row: Row): Unit = {
+    columnParams.groupColumn.foreach { col =>
+      groups.append(row.getAs[Any](col))
+    }
+  }
+
+  protected def addFeatures(row: Row): Unit
+
+  def release(): Unit = {
+    // Clear memory
+    labels.delete()
+    weights.foreach(_.delete())
+    initScores.foreach(_.delete())
+  }
+
+  def numInitScores: Long = initScores.map(_.getAddCount).getOrElse(0L)
+}
+
+private[lightgbm] final class SparseChunkedColumns(rowsIter: PeekingIterator[Row],
+                                                   columnParams: ColumnParams,
+                                                   schema: StructType,
+                                                   chunkSize: Int,
+                                                   useSingleDataset: Boolean)
+  extends BaseChunkedColumns(rowsIter, columnParams, schema, chunkSize) {
+
+  val indexes = new IntChunkedArray(chunkSize)
+  val values = new DoubleChunkedArray(chunkSize)
+  val indexPointers = new IntChunkedArray(chunkSize)
+
+  if (!useSingleDataset) {
+    indexPointers.add(0)
+  }
+
+  override protected def addFeatures(row: Row): Unit = {
+    val sparseVector = row.getAs[Any](columnParams.featuresColumn) match {
+      case dense: DenseVector => dense.toSparse
+      case sparse: SparseVector => sparse
+    }
+    sparseVector.values.foreach(values.add)
+    sparseVector.indices.foreach(indexes.add)
+    indexPointers.add(sparseVector.numNonzeros)
+  }
+
+  def getNumIndexes: Long = indexes.getAddCount
+
+  def getNumIndexPointers: Long = indexPointers.getAddCount
+
+  override def release(): Unit = {
+    // Clear memory
+    super.release()
+    indexes.delete()
+    values.delete()
+    indexPointers.delete()
+  }
+}
+
+private[lightgbm] final class DenseChunkedColumns(rowsIter: PeekingIterator[Row],
+                                                  columnParams: ColumnParams,
+                                                  schema: StructType,
+                                                  chunkSize: Int)
+  extends BaseChunkedColumns(rowsIter, columnParams, schema, chunkSize) {
+
+  val features = new DoubleChunkedArray(numCols * chunkSize)
+
+  override protected def addFeatures(row: Row): Unit = {
+    getRowAsDoubleArray(row, columnParams).foreach(features.add)
+  }
+
+  override def release(): Unit = {
+    // Clear memory
+    super.release()
+    features.delete()
+  }
+}
+
+private[lightgbm] abstract class BaseAggregatedColumns(val chunkSize: Int) {
+  protected var labels: FloatSwigArray = _
+  protected var weights: Option[FloatSwigArray] = None
+  protected var initScores: Option[DoubleSwigArray] = None
+  protected var groups: Array[Any] = _
+
+  /**
+    * Counters used to determine how large the full arrays should be allocated
+    */
+  protected val rowCount = new AtomicLong(0L)
+  protected val initScoreCount = new AtomicLong(0L)
+
+  protected var numCols = 0
+
+  def getRowCount: Int = rowCount.get().toInt
+
+  def getNumCols: Int = numCols
+
+  def getNumColsFromChunkedArray(chunkedCols:
BaseChunkedColumns): Int + + protected def initializeFeatures(chunkedCols: BaseChunkedColumns, rowCount: Long): Unit + + def getGroups: Array[Any] = groups + + def cleanup(): Unit = { + labels.delete() + weights.foreach(_.delete()) + initScores.foreach(_.delete()) + } + + def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset + + def incrementCount(chunkedCols: BaseChunkedColumns): Unit = { + rowCount.addAndGet(chunkedCols.rowCount) + initScoreCount.addAndGet(chunkedCols.numInitScores) + } + + def addRows(chunkedCols: BaseChunkedColumns): Unit = { + numCols = getNumColsFromChunkedArray(chunkedCols) + } + + protected def initializeRows(chunkedCols: BaseChunkedColumns): Unit = { + // this.numCols = numCols + val rc = rowCount.get() + val isc = initScoreCount.get() + labels = new FloatSwigArray(rc) + weights = chunkedCols.weights.map(_ => new FloatSwigArray(rc)) + initScores = chunkedCols.initScores.map(_ => new DoubleSwigArray(isc)) + initializeFeatures(chunkedCols, rc) + groups = new Array[Any](rc.toInt) + } + +} + +private[lightgbm] trait DisjointAggregatedColumns extends BaseAggregatedColumns { + def addFeatures(chunkedCols: BaseChunkedColumns): Unit + + /** Adds the rows to the internal data structure. + */ + override def addRows(chunkedCols: BaseChunkedColumns): Unit = { + super.addRows(chunkedCols) + initializeRows(chunkedCols) + // Coalesce to main arrays passed to dataset create + chunkedCols.labels.coalesceTo(labels) + chunkedCols.weights.foreach(_.coalesceTo(weights.get)) + chunkedCols.initScores.foreach(_.coalesceTo(initScores.get)) + this.addFeatures(chunkedCols) + chunkedCols.groups.copyToArray(groups) + } +} + +private[lightgbm] trait SyncAggregatedColumns extends BaseAggregatedColumns { + /** + * Variables for current thread to use in order to update common arrays in parallel + */ + protected val threadRowStartIndex = new AtomicLong(0L) + protected val threadInitScoreStartIndex = new AtomicLong(0L) + + /** Adds the rows to the internal data structure. 
+ */ + override def addRows(chunkedCols: BaseChunkedColumns): Unit = { + super.addRows(chunkedCols) + parallelInitializeRows(chunkedCols) + parallelizedCopy(chunkedCols) + } + + private def parallelInitializeRows(chunkedCols: BaseChunkedColumns): Unit = { + // Initialize arrays if they are not defined - first thread to get here does the initialization for all of them + if (labels == null) { + this.synchronized { + if (labels == null) { + initializeRows(chunkedCols) + } + } + } + } + + protected def updateThreadLocalIndices(chunkedCols: BaseChunkedColumns, threadRowStartIndex: Long): List[Long] + + protected def parallelizeFeaturesCopy(chunkedCols: BaseChunkedColumns, featureIndexes: List[Long]): Unit + + private def parallelizedCopy(chunkedCols: BaseChunkedColumns): Unit = { + // Parallelized copy to common arrays + var threadRowStartIndex = 0L + var threadInitScoreStartIndex = 0L + val featureIndexes = + this.synchronized { + val labelsSize = chunkedCols.labels.getAddCount + threadRowStartIndex = this.threadRowStartIndex.getAndAdd(labelsSize.toInt) + val initScoreSize = chunkedCols.initScores.map(_.getAddCount) + initScoreSize.foreach(size => threadInitScoreStartIndex = this.threadInitScoreStartIndex.getAndAdd(size)) + updateThreadLocalIndices(chunkedCols, threadRowStartIndex) + } + ChunkedArrayUtils.copyChunkedArray(chunkedCols.labels, labels, threadRowStartIndex, chunkSize) + chunkedCols.weights.foreach { + weightChunkedArray => + ChunkedArrayUtils.copyChunkedArray(weightChunkedArray, weights.get, threadRowStartIndex, + chunkSize) + } + chunkedCols.initScores.foreach { + initScoreChunkedArray => + ChunkedArrayUtils.copyChunkedArray(initScoreChunkedArray, initScores.get, + threadInitScoreStartIndex, chunkSize) + } + parallelizeFeaturesCopy(chunkedCols, featureIndexes) + chunkedCols.groups.copyToArray(groups, threadRowStartIndex.toInt) + // rewrite array reference for volatile arrays, see: https://www.javamex.com/tutorials/volatile_arrays.shtml + this.synchronized { + groups = groups + } + } +} + +private[lightgbm] abstract class BaseDenseAggregatedColumns(chunkSize: Int) extends BaseAggregatedColumns(chunkSize) { + protected var features: DoubleSwigArray = _ + + def getNumColsFromChunkedArray(chunkedCols: BaseChunkedColumns): Int = { + chunkedCols.asInstanceOf[DenseChunkedColumns].numCols + } + + protected def initializeFeatures(chunkedCols: BaseChunkedColumns, rowCount: Long): Unit = { + features = new DoubleSwigArray(numCols * rowCount) + } + + def getFeatures: DoubleSwigArray = features + + def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset = { + val pointer = lightgbmlib.voidpp_handle() + try { + // Generate the dataset for features + LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromMat( + lightgbmlib.double_to_voidp_ptr(features.array), + lightgbmlibConstants.C_API_DTYPE_FLOAT64, + rowCount.get().toInt, + numCols, + 1, + datasetParams, + referenceDataset.map(_.datasetPtr).orNull, + pointer), "Dataset create") + } finally { + lightgbmlib.delete_doubleArray(features.array) + } + val dataset = new LightGBMDataset(lightgbmlib.voidpp_value(pointer)) + dataset.addFloatField(labels.array, "label", getRowCount) + weights.map(_.array).foreach(dataset.addFloatField(_, "weight", getRowCount)) + initScores.map(_.array).foreach(dataset.addDoubleField(_, "init_score", getRowCount)) + dataset + } + +} + +private[lightgbm] final class DenseAggregatedColumns(chunkSize: Int) + extends BaseDenseAggregatedColumns(chunkSize) with 
DisjointAggregatedColumns { + + def addFeatures(chunkedCols: BaseChunkedColumns): Unit = { + chunkedCols.asInstanceOf[DenseChunkedColumns].features.coalesceTo(features) + } + +} + +/** Defines class for aggregating rows to a single structure before creating the native LightGBMDataset. + * + * @param chunkSize The chunk size for the chunked arrays. + */ +private[lightgbm] final class DenseSyncAggregatedColumns(chunkSize: Int) + extends BaseDenseAggregatedColumns(chunkSize) with SyncAggregatedColumns { + protected def updateThreadLocalIndices(chunkedCols: BaseChunkedColumns, threadRowStartIndex: Long): List[Long] = { + List(threadRowStartIndex) + } + + protected def parallelizeFeaturesCopy(chunkedCols: BaseChunkedColumns, featureIndexes: List[Long]): Unit = { + ChunkedArrayUtils.copyChunkedArray(chunkedCols.asInstanceOf[DenseChunkedColumns].features, + features, featureIndexes.head * numCols, chunkSize) + } + +} + +private[lightgbm] abstract class BaseSparseAggregatedColumns(chunkSize: Int) + extends BaseAggregatedColumns(chunkSize) { + protected var indexes: IntSwigArray = _ + protected var values: DoubleSwigArray = _ + protected var indexPointers: IntSwigArray = _ + + /** + * Aggregated variables for knowing how large full array should be allocated to + */ + protected var indexesCount = new AtomicLong(0L) + protected var indptrCount = new AtomicLong(0L) + + def getNumColsFromChunkedArray(chunkedCols: BaseChunkedColumns): Int = { + chunkedCols.asInstanceOf[SparseChunkedColumns].numCols + } + + override def incrementCount(chunkedCols: BaseChunkedColumns): Unit = { + super.incrementCount(chunkedCols) + val sparseChunkedCols = chunkedCols.asInstanceOf[SparseChunkedColumns] + indexesCount.addAndGet(sparseChunkedCols.getNumIndexes) + indptrCount.addAndGet(sparseChunkedCols.getNumIndexPointers) + } + + protected def initializeFeatures(chunkedCols: BaseChunkedColumns, rowCount: Long): Unit = { + val indexesCount = this.indexesCount.get() + val indptrCount = this.indptrCount.get() + indexes = new IntSwigArray(indexesCount) + values = new DoubleSwigArray(indexesCount) + indexPointers = new IntSwigArray(indptrCount) + indexPointers.setItem(0, 0) + } + + def getIndexes: IntSwigArray = indexes + + def getValues: DoubleSwigArray = values + + def getIndexPointers: IntSwigArray = indexPointers + + override def cleanup(): Unit = { + labels.delete() + weights.foreach(_.delete()) + initScores.foreach(_.delete()) + values.delete() + indexes.delete() + indexPointers.delete() + } + + private def indexPointerArrayIncrement(indptrArray: SWIGTYPE_p_int): Unit = { + // Update indptr array indexes in sparse matrix + (1L until indptrCount.get()).foreach { index => + val indptrPrevValue = lightgbmlib.intArray_getitem(indptrArray, index - 1) + val indptrCurrValue = lightgbmlib.intArray_getitem(indptrArray, index) + lightgbmlib.intArray_setitem(indptrArray, index, indptrPrevValue + indptrCurrValue) + } + } + + def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset = { + indexPointerArrayIncrement(getIndexPointers.array) + val pointer = lightgbmlib.voidpp_handle() + // Generate the dataset for features + LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromCSR( + lightgbmlib.int_to_voidp_ptr(indexPointers.array), + lightgbmlibConstants.C_API_DTYPE_INT32, + indexes.array, + lightgbmlib.double_to_voidp_ptr(values.array), + lightgbmlibConstants.C_API_DTYPE_FLOAT64, + indptrCount.get(), + indexesCount.get(), + numCols, + datasetParams, + 
referenceDataset.map(_.datasetPtr).orNull, + pointer), "Dataset create") + val dataset = new LightGBMDataset(lightgbmlib.voidpp_value(pointer)) + dataset.addFloatField(labels.array, "label", getRowCount) + weights.map(_.array).foreach(dataset.addFloatField(_, "weight", getRowCount)) + initScores.map(_.array).foreach(dataset.addDoubleField(_, "init_score", getRowCount)) + dataset + } + +} + +/** Defines class for aggregating rows to a single structure before creating the native LightGBMDataset. + * + * @param chunkSize The chunk size for the chunked arrays. + */ +private[lightgbm] final class SparseAggregatedColumns(chunkSize: Int) + extends BaseSparseAggregatedColumns(chunkSize) with DisjointAggregatedColumns { + + /** Adds the indexes, values and indptr to the internal data structure. + */ + def addFeatures(chunkedCols: BaseChunkedColumns): Unit = { + val sparseChunkedColumns = chunkedCols.asInstanceOf[SparseChunkedColumns] + sparseChunkedColumns.indexes.coalesceTo(indexes) + sparseChunkedColumns.values.coalesceTo(values) + sparseChunkedColumns.indexPointers.coalesceTo(indexPointers) + } +} + +/** Defines class for aggregating rows to a single structure before creating the native LightGBMDataset. + * + * @param chunkSize The chunk size for the chunked arrays. + */ +private[lightgbm] final class SparseSyncAggregatedColumns(chunkSize: Int) + extends BaseSparseAggregatedColumns(chunkSize) with SyncAggregatedColumns { + /** + * Variables for current thread to use in order to update common arrays in parallel + */ + protected val threadIndexesStartIndex = new AtomicLong(0L) + protected val threadIndptrStartIndex = new AtomicLong(1L) + + override protected def initializeRows(chunkedCols: BaseChunkedColumns): Unit = { + // Add extra 0 for start of indptr in parallel case + this.indptrCount.addAndGet(1L) + super.initializeRows(chunkedCols) + } + + protected def updateThreadLocalIndices(chunkedCols: BaseChunkedColumns, threadRowStartIndex: Long): List[Long] = { + val sparseChunkedCols = chunkedCols.asInstanceOf[SparseChunkedColumns] + val indexesSize = sparseChunkedCols.indexes.getAddCount + val threadIndexesStartIndex = this.threadIndexesStartIndex.getAndAdd(indexesSize) + + val indPtrSize = sparseChunkedCols.indexPointers.getAddCount + val threadIndPtrStartIndex = this.threadIndptrStartIndex.getAndAdd(indPtrSize) + List(threadIndexesStartIndex, threadIndPtrStartIndex) + } + + protected def parallelizeFeaturesCopy(chunkedCols: BaseChunkedColumns, featureIndexes: List[Long]): Unit = { + val sparseChunkedCols = chunkedCols.asInstanceOf[SparseChunkedColumns] + ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.indexes, indexes, featureIndexes(0), chunkSize) + ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.values, values, featureIndexes(0), chunkSize) + ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.indexPointers, indexPointers, featureIndexes(1), chunkSize) + } + +} diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/dataset/DatasetUtils.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/dataset/DatasetUtils.scala index 02ba5b698e..4fe55bb411 100644 --- a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/dataset/DatasetUtils.scala +++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/dataset/DatasetUtils.scala @@ -4,86 +4,18 @@ package com.microsoft.ml.spark.lightgbm.dataset import com.microsoft.ml.lightgbm.{doubleChunkedArray, floatChunkedArray} -import com.microsoft.ml.spark.lightgbm.{ColumnParams, LightGBMUtils} +import 
com.microsoft.ml.spark.lightgbm.ColumnParams +import com.microsoft.ml.spark.lightgbm.swig.DoubleChunkedArray import org.apache.spark.ml.linalg.SQLDataTypes.VectorType -import com.microsoft.ml.spark.lightgbm.params.TrainParams -import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.linalg.{DenseVector, SparseVector} import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType -import org.slf4j.Logger - -import scala.collection.mutable.ListBuffer object DatasetUtils { - def getArrayType(rowsIter: Iterator[Row], - columnParams: ColumnParams, - schema: StructType, - matrixType: String): (Iterator[Row], Boolean) = { - if (matrixType == "auto") { - sampleRowsForArrayType(rowsIter, schema, columnParams) - } else if (matrixType == "sparse") { - (rowsIter: Iterator[Row], true) - } else if (matrixType == "dense") { - (rowsIter: Iterator[Row], false) - } else { - throw new Exception(s"Invalid parameter matrix type specified: ${matrixType}") - } - } - - def generateDataset(rowsIter: Iterator[Row], columnParams: ColumnParams, - referenceDataset: Option[LightGBMDataset], schema: StructType, - log: Logger, trainParams: TrainParams): Option[LightGBMDataset] = { - val (concatRowsIter: Iterator[Row], isSparse: Boolean) = getArrayType(rowsIter, columnParams, schema, - trainParams.executionParams.matrixType) - var datasetPtr: Option[LightGBMDataset] = None - if (!isSparse) { - datasetPtr = aggregateDenseStreamedData(concatRowsIter, columnParams, referenceDataset, schema, log, trainParams) - // Validate generated dataset has the correct number of rows and cols - datasetPtr.get.validateDataset() - } else { - val rows = concatRowsIter.toArray - val numRows = rows.length - val labels = rows.map(row => row.getDouble(schema.fieldIndex(columnParams.labelColumn))) - val rowsAsSparse = rows.map(row => row.get(schema.fieldIndex(columnParams.featuresColumn)) match { - case dense: DenseVector => dense.toSparse - case sparse: SparseVector => sparse - }) - val numCols = rowsAsSparse(0).size - val slotNames = getSlotNames(schema, columnParams.featuresColumn, numCols, trainParams) - log.info(s"LightGBM task generating sparse dataset with $numRows rows and $numCols columns") - datasetPtr = Some(LightGBMUtils.generateSparseDataset(rowsAsSparse, referenceDataset, slotNames, trainParams)) - // Validate generated dataset has the correct number of rows and cols - datasetPtr.get.validateDataset() - datasetPtr.get.addFloatField(labels, "label", numRows) - columnParams.weightColumn.foreach { col => - val weights = rows.map(row => row.getDouble(schema.fieldIndex(col))) - datasetPtr.get.addFloatField(weights, "weight", numRows) - } - addInitScoreColumn(rows, columnParams.initScoreColumn, datasetPtr, numRows, schema) - addGroupColumn(rows, columnParams.groupColumn, datasetPtr, numRows, schema, None) - } - datasetPtr - } - - trait CardinalityType[T] - - object CardinalityTypes { - - implicit object LongType extends CardinalityType[Long] - - implicit object IntType extends CardinalityType[Int] - - implicit object StringType extends CardinalityType[String] - - } - - import CardinalityTypes._ - case class CardinalityTriplet[T](groupCounts: List[Int], currentValue: T, currentCount: Int) - def countCardinality[T](input: Seq[T])(implicit ev: CardinalityType[T]): Array[Int] = { + def countCardinality[T](input: Seq[T]): Array[Int] = { val default: T = null.asInstanceOf[T] val cardinalityTriplet = input.foldLeft(CardinalityTriplet(List.empty[Int], default, 0)) { @@ -107,89 +39,34 @@ object DatasetUtils { 
groupCardinality } - def addInitScoreColumnRow(initScoreChunkedArrayOpt: Option[doubleChunkedArray], row: Row, - columnParams: ColumnParams, schema: StructType): Unit = { - columnParams.initScoreColumn.foreach { col => - val field = schema.fields(schema.fieldIndex(col)) - if (field.dataType == VectorType) { - val initScores = row.get(schema.fieldIndex(col)).asInstanceOf[DenseVector] - // Note: rows * # classes in multiclass case - initScores.values.foreach { rowValue => - initScoreChunkedArrayOpt.get.add(rowValue) - } - } else { - val initScore = row.getDouble(schema.fieldIndex(col)) - initScoreChunkedArrayOpt.get.add(initScore) - } - } - } - - def addGroupColumnRow(row: Row, groupColumnValues: ListBuffer[Row], - columnParams: ColumnParams, schema: StructType): Unit = { - columnParams.groupColumn.foreach { col => - val colIdx = schema.fieldIndex(col) - groupColumnValues.append(Row(row.get(colIdx))) - } - } - /** * Sample the first several rows to determine whether to construct sparse or dense matrix in lightgbm native code. * * @param rowsIter Iterator of rows. - * @param schema The schema. * @param columnParams The column parameters. * @return A reconstructed iterator with the same original rows and whether the matrix should be sparse or dense. */ - def sampleRowsForArrayType(rowsIter: Iterator[Row], schema: StructType, - columnParams: ColumnParams): (Iterator[Row], Boolean) = { + def sampleRowsForArrayType(rowsIter: Iterator[Row], columnParams: ColumnParams): (Iterator[Row], Boolean) = { val numSampledRows = 10 val sampleRows = rowsIter.take(numSampledRows).toArray - val numDense = sampleRows.map(row => - row.get(schema.fieldIndex(columnParams.featuresColumn)).isInstanceOf[DenseVector]).filter(value => value).length + val numDense = sampleRows + .map(row => row.getAs[Any](columnParams.featuresColumn).isInstanceOf[DenseVector]) + .count(value => value) val numSparse = sampleRows.length - numDense // recreate the iterator (sampleRows.toIterator ++ rowsIter, numSparse > numDense) } - def getRowAsDoubleArray(row: Row, columnParams: ColumnParams, schema: StructType): Array[Double] = { - row.get(schema.fieldIndex(columnParams.featuresColumn)) match { + def getRowAsDoubleArray(row: Row, columnParams: ColumnParams): Array[Double] = { + row.getAs[Any](columnParams.featuresColumn) match { case dense: DenseVector => dense.toArray - case sparse: SparseVector => sparse.toDense.toArray + case sparse: SparseVector => sparse.toArray } } - def addFeaturesToChunkedArray(featuresChunkedArrayOpt: Option[doubleChunkedArray], numCols: Int, - rowAsDoubleArray: Array[Double]): Unit = { - featuresChunkedArrayOpt.foreach { featuresChunkedArray => - rowAsDoubleArray.foreach { doubleVal => - featuresChunkedArray.add(doubleVal) - } - } - } - - def addGroupColumn(rows: Array[Row], groupColumn: Option[String], - datasetPtr: Option[LightGBMDataset], numRows: Int, - schema: StructType, overrideIdx: Option[Int]): Unit = { - validateGroupColumn(groupColumn, schema) - groupColumn.foreach { col => - val datatype = schema.fields(schema.fieldIndex(col)).dataType - val colIdx = if (overrideIdx.isEmpty) schema.fieldIndex(col) else overrideIdx.get - - // Convert to distinct count (note ranker should have sorted within partition by group id) - // We use a triplet of a list of cardinalities, last unique value and unique value count - val groupCardinality = datatype match { - case org.apache.spark.sql.types.IntegerType => countCardinality(rows.map(row => row.getInt(colIdx))) - case org.apache.spark.sql.types.LongType => 
countCardinality(rows.map(row => row.getLong(colIdx))) - case org.apache.spark.sql.types.StringType => countCardinality(rows.map(row => row.getString(colIdx))) - } - - datasetPtr.get.addIntField(groupCardinality, "group", groupCardinality.length) - } - } - - def addInitScoreColumn(rows: Array[Row], initScoreColumn: Option[String], - datasetPtr: Option[LightGBMDataset], numRows: Int, schema: StructType): Unit = { - initScoreColumn.foreach { col => + def getInitScores(rows: Array[Row], initScoreColumn: Option[String], + schema: StructType): Option[Array[Double]] = { + initScoreColumn.map { col => val field = schema.fields(schema.fieldIndex(col)) if (field.dataType == VectorType) { val initScores = rows.map(row => row.get(schema.fieldIndex(col)).asInstanceOf[DenseVector]) @@ -202,14 +79,14 @@ object DatasetUtils { flattenedInitScores(colIndex * initScoresLength + rowIndex) = rowValue } } - datasetPtr.get.addDoubleField(flattenedInitScores, "init_score", numRows) + flattenedInitScores } else { - val initScores = rows.map(row => row.getDouble(schema.fieldIndex(col))) - datasetPtr.get.addDoubleField(initScores, "init_score", numRows) + rows.map(row => row.getDouble(schema.fieldIndex(col))) } } } + def releaseArrays(labelsChunkedArray: floatChunkedArray, weightChunkedArrayOpt: Option[floatChunkedArray], initScoreChunkedArrayOpt: Option[doubleChunkedArray]): Unit = { labelsChunkedArray.release() @@ -217,82 +94,14 @@ object DatasetUtils { initScoreChunkedArrayOpt.foreach(_.release()) } - def aggregateDenseStreamedData(rowsIter: Iterator[Row], columnParams: ColumnParams, - referenceDataset: Option[LightGBMDataset], schema: StructType, - log: Logger, trainParams: TrainParams): Option[LightGBMDataset] = { - var numRows = 0 - val chunkSize = trainParams.executionParams.chunkSize - val labelsChunkedArray = new floatChunkedArray(chunkSize) - val weightChunkedArrayOpt = columnParams.weightColumn.map { _ => new floatChunkedArray(chunkSize) } - val initScoreChunkedArrayOpt = columnParams.initScoreColumn.map { _ => new doubleChunkedArray(chunkSize) } - var featuresChunkedArrayOpt: Option[doubleChunkedArray] = None - val groupColumnValues: ListBuffer[Row] = new ListBuffer[Row]() - try { - var numCols = 0 - while (rowsIter.hasNext) { - val row = rowsIter.next() - numRows += 1 - labelsChunkedArray.add(row.getDouble(schema.fieldIndex(columnParams.labelColumn)).toFloat) - columnParams.weightColumn.map { col => - weightChunkedArrayOpt.get.add(row.getDouble(schema.fieldIndex(col)).toFloat) - } - val rowAsDoubleArray = getRowAsDoubleArray(row, columnParams, schema) - numCols = rowAsDoubleArray.length - if (featuresChunkedArrayOpt.isEmpty) { - featuresChunkedArrayOpt = Some(new doubleChunkedArray(numCols * chunkSize)) - } - addFeaturesToChunkedArray(featuresChunkedArrayOpt, numCols, rowAsDoubleArray) - addInitScoreColumnRow(initScoreChunkedArrayOpt, row, columnParams, schema) - addGroupColumnRow(row, groupColumnValues, columnParams, schema) - } - - val slotNames = getSlotNames(schema, columnParams.featuresColumn, numCols, trainParams) - log.info(s"LightGBM task generating dense dataset with $numRows rows and $numCols columns") - val datasetPtr = Some(LightGBMUtils.generateDenseDataset(numRows, numCols, featuresChunkedArrayOpt.get, - referenceDataset, slotNames, trainParams, chunkSize)) - datasetPtr.get.addFloatField(labelsChunkedArray, "label", numRows) - - weightChunkedArrayOpt.foreach(datasetPtr.get.addFloatField(_, "weight", numRows)) - initScoreChunkedArrayOpt.foreach(datasetPtr.get.addDoubleField(_, 
"init_score", numRows)) - val overrideGroupIndex = Some(0) - addGroupColumn(groupColumnValues.toArray, columnParams.groupColumn, datasetPtr, numRows, schema, - overrideGroupIndex) - datasetPtr - } finally { - releaseArrays(labelsChunkedArray, weightChunkedArrayOpt, initScoreChunkedArrayOpt) - } - } - - def validateGroupColumn(groupColumn: Option[String], schema: StructType): Unit = { - groupColumn.foreach { col => - val datatype = schema.fields(schema.fieldIndex(col)).dataType - - if (datatype != org.apache.spark.sql.types.IntegerType - && datatype != org.apache.spark.sql.types.LongType - && datatype != org.apache.spark.sql.types.StringType) { - throw new IllegalArgumentException( - s"group column $col must be of type Long, Int or String but is ${datatype.typeName}") - } + def validateGroupColumn(col: String, schema: StructType): Unit = { + val datatype = schema(col).dataType + if (datatype != org.apache.spark.sql.types.IntegerType + && datatype != org.apache.spark.sql.types.LongType + && datatype != org.apache.spark.sql.types.StringType) { + throw new IllegalArgumentException( + s"group column $col must be of type Long, Int or String but is ${datatype.typeName}") } } - def getSlotNames(schema: StructType, featuresColumn: String, numCols: Int, - trainParams: TrainParams): Option[Array[String]] = { - if (trainParams.featureNames.nonEmpty) { - Some(trainParams.featureNames) - } else { - val featuresSchema = schema.fields(schema.fieldIndex(featuresColumn)) - val metadata = AttributeGroup.fromStructField(featuresSchema) - if (metadata.attributes.isEmpty) None - else if (metadata.attributes.get.isEmpty) None - else { - val colnames = (0 until numCols).map(_.toString).toArray - metadata.attributes.get.foreach { - case attr => - attr.index.foreach(index => colnames(index) = attr.name.getOrElse(index.toString)) - } - Some(colnames) - } - } - } } diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/dataset/LightGBMDataset.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/dataset/LightGBMDataset.scala index 60fd33623e..0c513cfd23 100644 --- a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/dataset/LightGBMDataset.scala +++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/dataset/LightGBMDataset.scala @@ -6,6 +6,7 @@ package com.microsoft.ml.spark.lightgbm.dataset import com.microsoft.lightgbm.SwigPtrWrapper import com.microsoft.ml.lightgbm._ import com.microsoft.ml.spark.lightgbm.LightGBMUtils +import com.microsoft.ml.spark.lightgbm.dataset.DatasetUtils.countCardinality import scala.reflect.ClassTag @@ -167,6 +168,13 @@ class LightGBMDataset(val datasetPtr: SWIGTYPE_p_void) extends AutoCloseable { } } + def addGroupColumn[T](rows: Array[T]): Unit = { + // Convert to distinct count (note ranker should have sorted within partition by group id) + // We use a triplet of a list of cardinalities, last unique value and unique value count + val groupCardinality = countCardinality(rows) + addIntField(groupCardinality, "group", groupCardinality.length) + } + def setFeatureNames(featureNamesOpt: Option[Array[String]], numCols: Int): Unit = { // Add in slot names if they exist featureNamesOpt.foreach { featureNamesVal => diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/params/LightGBMParams.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/params/LightGBMParams.scala index 6761676998..6a938e6ad2 100644 --- a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/params/LightGBMParams.scala +++ 
b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/params/LightGBMParams.scala @@ -52,12 +52,20 @@ trait LightGBMExecutionParams extends Wrappable { def setTimeout(value: Double): this.type = set(timeout, value) val useBarrierExecutionMode = new BooleanParam(this, "useBarrierExecutionMode", - "Use new barrier execution mode in Beta testing, off by default.") + "Barrier execution mode which uses a barrier stage, off by default.") setDefault(useBarrierExecutionMode -> false) def getUseBarrierExecutionMode: Boolean = $(useBarrierExecutionMode) def setUseBarrierExecutionMode(value: Boolean): this.type = set(useBarrierExecutionMode, value) + val useSingleDatasetMode = new BooleanParam(this, "useSingleDatasetMode", + "Use single dataset execution mode to create a single native dataset per executor (singleton) " + + "to reduce memory and communication overhead. Note this is disabled when running spark in local mode.") + setDefault(useSingleDatasetMode -> false) + + def getUseSingleDatasetMode: Boolean = $(useSingleDatasetMode) + def setUseSingleDatasetMode(value: Boolean): this.type = set(useSingleDatasetMode, value) + val numBatches = new IntParam(this, "numBatches", "If greater than 0, splits data into separate batches during training") setDefault(numBatches -> 0) diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/params/TrainParams.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/params/TrainParams.scala index 8bc51c5f7d..6bb55ffc59 100644 --- a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/params/TrainParams.scala +++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/params/TrainParams.scala @@ -80,7 +80,7 @@ case class ClassifierTrainParams(parallelism: String, topK: Int, numIterations: dartModeParams: DartModeParams, executionParams: ExecutionParams, objectiveParams: ObjectiveParams) extends TrainParams { - override def toString(): String = { + override def toString: String = { val extraStr = if (objectiveParams.objective != LightGBMConstants.BinaryObjective) s"num_class=$numClass" else s"is_unbalance=${isUnbalance.toString}" @@ -105,7 +105,7 @@ case class RegressorTrainParams(parallelism: String, topK: Int, numIterations: I dartModeParams: DartModeParams, executionParams: ExecutionParams, objectiveParams: ObjectiveParams) extends TrainParams { - override def toString(): String = { + override def toString: String = { s"alpha=$alpha tweedie_variance_power=$tweedieVariancePower boost_from_average=${boostFromAverage.toString} " + s"${super.toString()}" } @@ -128,7 +128,7 @@ case class RankerTrainParams(parallelism: String, topK: Int, numIterations: Int, dartModeParams: DartModeParams, executionParams: ExecutionParams, objectiveParams: ObjectiveParams) extends TrainParams { - override def toString(): String = { + override def toString: String = { val labelGainStr = if (labelGain.isEmpty) "" else s"label_gain=${labelGain.mkString(",")}" val evalAtStr = @@ -141,7 +141,7 @@ case class RankerTrainParams(parallelism: String, topK: Int, numIterations: Int, */ case class DartModeParams(dropRate: Double, maxDrop: Int, skipDrop: Double, xgboostDartMode: Boolean, uniformDrop: Boolean) extends Serializable { - override def toString(): String = { + override def toString: String = { s"drop_rate=$dropRate max_drop=$maxDrop skip_drop=$skipDrop xgboost_dart_mode=$xgboostDartMode " + s"uniform_drop=$uniformDrop " } @@ -154,8 +154,9 @@ case class DartModeParams(dropRate: Double, maxDrop: Int, skipDrop: Double, * constructed should be sparse or dense. 
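+ * @param useSingleDatasetMode Whether to create a single native dataset per executor (singleton)
+ *                             to reduce memory and communication overhead. (Editorial addition:
+ *                             this new constructor parameter was otherwise undocumented; wording
+ *                             follows the matching Param in LightGBMParams.scala.)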
* @param numThreads The number of threads to run the native lightgbm training with on each worker. */ -case class ExecutionParams(chunkSize: Int, matrixType: String, numThreads: Int) extends Serializable { - override def toString(): String = { +case class ExecutionParams(chunkSize: Int, matrixType: String, numThreads: Int, + useSingleDatasetMode: Boolean) extends Serializable { + override def toString: String = { s"num_threads=$numThreads " } } @@ -169,7 +170,7 @@ case class ExecutionParams(chunkSize: Int, matrixType: String, numThreads: Int) * Should accept two parameters: preds, train_data, and return (grad, hess). */ case class ObjectiveParams(objective: String, fobj: Option[FObjTrait]) extends Serializable { - override def toString(): String = { + override def toString: String = { if (fobj.isEmpty) { s"objective=$objective " } else { diff --git a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/swig/SwigUtils.scala b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/swig/SwigUtils.scala index be1f090771..d84992c6eb 100644 --- a/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/swig/SwigUtils.scala +++ b/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/swig/SwigUtils.scala @@ -3,7 +3,8 @@ package com.microsoft.ml.spark.lightgbm.swig -import com.microsoft.ml.lightgbm.{SWIGTYPE_p_float, lightgbmlib} +import com.microsoft.ml.lightgbm.{SWIGTYPE_p_double, SWIGTYPE_p_float, SWIGTYPE_p_int, doubleChunkedArray, + floatChunkedArray, int32ChunkedArray, lightgbmlib} object SwigUtils extends Serializable { /** Converts a Java float array to a native C++ array using SWIG. @@ -17,3 +18,101 @@ object SwigUtils extends Serializable { colArray } } + +abstract class ChunkedArray[T]() { + def getChunksCount: Long + + def getLastChunkAddCount: Long + + def getAddCount: Long + + def getItem(chunk: Long, inChunkIdx: Long, default: T): T + + def add(value: T): Unit +} + +class FloatChunkedArray(floatChunkedArray: floatChunkedArray) extends ChunkedArray[Float] { + + def this(size: Long) = this(new floatChunkedArray(size)) + + def getChunksCount: Long = floatChunkedArray.get_chunks_count() + + def getLastChunkAddCount: Long = floatChunkedArray.get_last_chunk_add_count() + + def getAddCount: Long = floatChunkedArray.get_add_count() + + def getItem(chunk: Long, inChunkIdx: Long, default: Float): Float = + floatChunkedArray.getitem(chunk, inChunkIdx, default) + + def add(value: Float): Unit = floatChunkedArray.add(value) + + def delete(): Unit = floatChunkedArray.delete() + + def coalesceTo(floatSwigArray: FloatSwigArray): Unit = floatChunkedArray.coalesce_to(floatSwigArray.array) +} + +class DoubleChunkedArray(doubleChunkedArray: doubleChunkedArray) extends ChunkedArray[Double] { + def this(size: Long) = this(new doubleChunkedArray(size)) + + def getChunksCount: Long = doubleChunkedArray.get_chunks_count() + + def getLastChunkAddCount: Long = doubleChunkedArray.get_last_chunk_add_count() + + def getAddCount: Long = doubleChunkedArray.get_add_count() + + def getItem(chunk: Long, inChunkIdx: Long, default: Double): Double = + doubleChunkedArray.getitem(chunk, inChunkIdx, default) + + def add(value: Double): Unit = doubleChunkedArray.add(value) + + def delete(): Unit = doubleChunkedArray.delete() + + def coalesceTo(doubleSwigArray: DoubleSwigArray): Unit = doubleChunkedArray.coalesce_to(doubleSwigArray.array) +} + +class IntChunkedArray(intChunkedArray: int32ChunkedArray) extends ChunkedArray[Int] { + def this(size: Long) = this(new int32ChunkedArray(size)) + + def getChunksCount: Long = 
intChunkedArray.get_chunks_count() + + def getLastChunkAddCount: Long = intChunkedArray.get_last_chunk_add_count() + + def getAddCount: Long = intChunkedArray.get_add_count() + + def getItem(chunk: Long, inChunkIdx: Long, default: Int): Int = + intChunkedArray.getitem(chunk, inChunkIdx, default) + + def add(value: Int): Unit = intChunkedArray.add(value) + + def delete(): Unit = intChunkedArray.delete() + + def coalesceTo(intSwigArray: IntSwigArray): Unit = intChunkedArray.coalesce_to(intSwigArray.array) +} + +abstract class BaseSwigArray[T]() { + def setItem(index: Long, item: T): Unit +} + +class FloatSwigArray(val array: SWIGTYPE_p_float) extends BaseSwigArray[Float] { + def this(size: Long) = this(lightgbmlib.new_floatArray(size)) + + def setItem(index: Long, item: Float): Unit = lightgbmlib.floatArray_setitem(array, index, item) + + def delete(): Unit = lightgbmlib.delete_floatArray(array) +} + +class DoubleSwigArray(val array: SWIGTYPE_p_double) extends BaseSwigArray[Double] { + def this(size: Long) = this(lightgbmlib.new_doubleArray(size)) + + def setItem(index: Long, item: Double): Unit = lightgbmlib.doubleArray_setitem(array, index, item) + + def delete(): Unit = lightgbmlib.delete_doubleArray(array) +} + +class IntSwigArray(val array: SWIGTYPE_p_int) extends BaseSwigArray[Int] { + def this(size: Long) = this(lightgbmlib.new_intArray(size)) + + def setItem(index: Long, item: Int): Unit = lightgbmlib.intArray_setitem(array, index, item) + + def delete(): Unit = lightgbmlib.delete_intArray(array) +} diff --git a/lightgbm/src/test/scala/com/microsoft/ml/spark/lightgbm/split2/VerifyLightGBMRanker.scala b/lightgbm/src/test/scala/com/microsoft/ml/spark/lightgbm/split2/VerifyLightGBMRanker.scala index 4ceecf0199..a1dcd76db8 100644 --- a/lightgbm/src/test/scala/com/microsoft/ml/spark/lightgbm/split2/VerifyLightGBMRanker.scala +++ b/lightgbm/src/test/scala/com/microsoft/ml/spark/lightgbm/split2/VerifyLightGBMRanker.scala @@ -5,10 +5,9 @@ package com.microsoft.ml.spark.lightgbm.split2 import com.microsoft.ml.spark.core.test.benchmarks.{Benchmarks, DatasetUtils} import com.microsoft.ml.spark.core.test.fuzzing.{EstimatorFuzzing, TestObject} -import com.microsoft.ml.spark.lightgbm.dataset.DatasetUtils.CardinalityTypes._ import com.microsoft.ml.spark.lightgbm.dataset.{DatasetUtils => CardinalityUtils} import com.microsoft.ml.spark.lightgbm.split1.LightGBMTestUtils -import com.microsoft.ml.spark.lightgbm.{LightGBMRanker, LightGBMRankerModel, LightGBMUtils, TrainUtils} +import com.microsoft.ml.spark.lightgbm.{LightGBMRanker, LightGBMRankerModel, LightGBMUtils} import org.apache.spark.SparkException import org.apache.spark.ml.feature.VectorAssembler import org.apache.spark.ml.linalg.Vectors @@ -77,10 +76,7 @@ class VerifyLightGBMRanker extends Benchmarks with EstimatorFuzzing[LightGBMRank test("Throws error when group column is not long, int or string") { val df = rankingDF.withColumn(queryCol, from_json(lit("{}"), StructType(Seq()))) - - // Throws SparkException instead of IllegalArgumentException because the type - // inspection is part of the spark job instead of before it - assertThrows[SparkException] { + assertThrows[IllegalArgumentException] { baseModel.fit(df).transform(df).collect() } } diff --git a/lightgbm/src/test/scala/com/microsoft/ml/spark/lightgbm/split2/VerifyLightGBMRegressor.scala b/lightgbm/src/test/scala/com/microsoft/ml/spark/lightgbm/split2/VerifyLightGBMRegressor.scala index fb617c10af..3748f0cc03 100644 --- 
a/lightgbm/src/test/scala/com/microsoft/ml/spark/lightgbm/split2/VerifyLightGBMRegressor.scala +++ b/lightgbm/src/test/scala/com/microsoft/ml/spark/lightgbm/split2/VerifyLightGBMRegressor.scala @@ -58,6 +58,7 @@ class VerifyLightGBMRegressor extends Benchmarks } test("Verify LightGBM Regressor can be run with TrainValidationSplit") { + (0 until 20).foreach(_ => getAndIncrementPort()) val model = baseModel val paramGrid = new ParamGridBuilder() @@ -84,6 +85,15 @@ class VerifyLightGBMRegressor extends Benchmarks assert(modelStr.contains("[lambda_l2: 0.1]") || modelStr.contains("[lambda_l2: 0.5]")) } + test("Verify LightGBM with single dataset mode") { + val df = airfoilDF + val model = baseModel.setUseSingleDatasetMode(true) + model.fit(df).transform(df).show() + + val models = baseModel.setUseSingleDatasetMode(false) + models.fit(df).transform(df).show() + } + test("Verify LightGBM Regressor with weight column") { val df = airfoilDF.withColumn(weightCol, lit(1.0)) @@ -136,6 +146,7 @@ class VerifyLightGBMRegressor extends Benchmarks } test("Verify LightGBM Regressor with tweedie distribution") { + (0 until 10).foreach(_ => getAndIncrementPort()) val model = baseModel.setObjective("tweedie").setTweedieVariancePower(1.5) val paramGrid = new ParamGridBuilder() @@ -205,7 +216,7 @@ class VerifyLightGBMRegressor extends Benchmarks val featurizer = LightGBMUtils.getFeaturizer(dataset, labelCol, featuresCol) val train = featurizer.transform(dataset) - Seq(new TestObject(new LightGBMRegressor() + Seq(new TestObject(new LightGBMRegressor().setDefaultListenPort(getAndIncrementPort()) .setLabelCol(labelCol).setFeaturesCol(featuresCol).setNumLeaves(5), train)) } diff --git a/vw/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitBase.scala b/vw/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitBase.scala index 59c983aac1..faad604349 100644 --- a/vw/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitBase.scala +++ b/vw/src/main/scala/com/microsoft/ml/spark/vw/VowpalWabbitBase.scala @@ -441,7 +441,7 @@ trait VowpalWabbitBase extends Wrappable spanningTree.start() val jobUniqueId = Math.abs(UUID.randomUUID.getLeastSignificantBits.toInt) - val driverHostAddress = ClusterUtil.getDriverHost(df) + val driverHostAddress = ClusterUtil.getDriverHost(df.sparkSession) val port = spanningTree.getPort /* @@ -505,7 +505,7 @@ trait VowpalWabbitBase extends Wrappable protected def trainInternal[T <: VowpalWabbitBaseModel](dataset: Dataset[_], model: T): T = { // follow LightGBM pattern val numTasksPerExec = ClusterUtil.getNumTasksPerExecutor(dataset, log) - val numExecutorTasks = ClusterUtil.getNumExecutorTasks(dataset, numTasksPerExec, log) + val numExecutorTasks = ClusterUtil.getNumExecutorTasks(dataset.sparkSession, numTasksPerExec, log) val numTasks = min(numExecutorTasks, dataset.rdd.getNumPartitions) // Select needed columns, maybe get the weight column, keeps mem usage low
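[Editor's note: a minimal end-to-end sketch of the new single dataset mode from the user's side. The DataFrame `df` and its "label"/"features" columns are hypothetical; the setters are the ones added or exercised in this diff.]

    import com.microsoft.ml.spark.lightgbm.LightGBMRegressor

    // With useSingleDatasetMode=true, tasks on the same executor funnel their rows
    // into one native dataset (see TaskTrainingMethods and DatasetAggregator above);
    // per the parameter description, this is disabled when running spark in local mode.
    val regressor = new LightGBMRegressor()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setUseSingleDatasetMode(true)
    regressor.fit(df).transform(df).show()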