Skip to content

Commit

Permalink
fix: chunksize parameter incorrectly specified during data copy
Browse files Browse the repository at this point in the history
  • Loading branch information
imatiach-msft committed Apr 25, 2022
1 parent edafb8b commit 8c9749b
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,6 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
val execNumThreads =
if (getUseSingleDatasetMode) get(numThreads).getOrElse(numTasksPerExec - 1)
else getNumThreads

ExecutionParams(getChunkSize, getMatrixType, execNumThreads, getUseSingleDatasetMode)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,25 +170,4 @@ object LightGBMUtils {
val taskId = ctx.taskAttemptId()
taskId
}

/** Builds a native int array describing how many rows go into each chunk.
  *
  * Every entry holds `chunkSize`, except the final entry, which holds the
  * remainder when `numRows` is not an exact multiple of `chunkSize`.
  *
  * @param numRows   total number of rows to partition into chunks
  * @param chunkSize number of rows per full chunk
  * @return a SWIG-allocated int array of per-chunk row counts
  *         (caller owns the native memory)
  */
def getNumRowsForChunksArray(numRows: Int, chunkSize: Int): SWIGTYPE_p_int = {
  // Rows left over after all full chunks are filled.
  val remainder = numRows % chunkSize
  // Ceiling division: a partial trailing chunk adds one more entry.
  val numChunks = Math.floorDiv(numRows, chunkSize) + (if (remainder > 0) 1 else 0)
  val chunkRowCounts = lightgbmlib.new_intArray(numChunks)
  (0 until numChunks).foreach { index =>
    val rowsInChunk =
      if (index == numChunks - 1 && remainder > 0) remainder
      else chunkSize
    lightgbmlib.intArray_setitem(chunkRowCounts, index, rowsInChunk)
  }
  chunkRowCounts
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import scala.collection.concurrent.TrieMap
private[lightgbm] object ChunkedArrayUtils {
def copyChunkedArray[T: Numeric](chunkedArray: ChunkedArray[T],
mainArray: BaseSwigArray[T],
threadRowStartIndex: Long,
chunkSize: Long): Unit = {
threadRowStartIndex: Long): Unit = {
val num = implicitly[Numeric[T]]
val defaultVal = num.fromInt(-1)
// Copy in parallel on each thread
// First copy full chunks
val chunkSize = chunkedArray.getChunkSize
val chunkCount = chunkedArray.getChunksCount - 1
for (chunk <- 0L until chunkCount) {
for (inChunkIdx <- 0L until chunkSize) {
Expand Down Expand Up @@ -306,16 +306,15 @@ private[lightgbm] trait SyncAggregatedColumns extends BaseAggregatedColumns {
threadInitScoreStartIndex = chunkedCols.initScores.map(_ => pIdToInitScoreCountOffset(partitionId)).getOrElse(0)
updateThreadLocalIndices(chunkedCols, threadRowStartIndex)
}
ChunkedArrayUtils.copyChunkedArray(chunkedCols.labels, labels, threadRowStartIndex, chunkSize)
ChunkedArrayUtils.copyChunkedArray(chunkedCols.labels, labels, threadRowStartIndex)
chunkedCols.weights.foreach {
weightChunkedArray =>
ChunkedArrayUtils.copyChunkedArray(weightChunkedArray, weights.get, threadRowStartIndex,
chunkSize)
ChunkedArrayUtils.copyChunkedArray(weightChunkedArray, weights.get, threadRowStartIndex)
}
chunkedCols.initScores.foreach {
initScoreChunkedArray =>
ChunkedArrayUtils.copyChunkedArray(initScoreChunkedArray, initScores.get,
threadInitScoreStartIndex, chunkSize)
threadInitScoreStartIndex)
}
parallelizeFeaturesCopy(chunkedCols, featureIndexes)
chunkedCols.groups.copyToArray(groups, threadRowStartIndex.toInt)
Expand Down Expand Up @@ -388,7 +387,7 @@ private[lightgbm] final class DenseSyncAggregatedColumns(chunkSize: Int)

protected def parallelizeFeaturesCopy(chunkedCols: BaseChunkedColumns, featureIndexes: List[Long]): Unit = {
ChunkedArrayUtils.copyChunkedArray(chunkedCols.asInstanceOf[DenseChunkedColumns].features,
features, featureIndexes.head * numCols, chunkSize)
features, featureIndexes.head * numCols)
}

}
Expand Down Expand Up @@ -521,9 +520,9 @@ private[lightgbm] final class SparseSyncAggregatedColumns(chunkSize: Int)

protected def parallelizeFeaturesCopy(chunkedCols: BaseChunkedColumns, featureIndexes: List[Long]): Unit = {
val sparseChunkedCols = chunkedCols.asInstanceOf[SparseChunkedColumns]
ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.indexes, indexes, featureIndexes(0), chunkSize)
ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.values, values, featureIndexes(0), chunkSize)
ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.indexPointers, indexPointers, featureIndexes(1), chunkSize)
ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.indexes, indexes, featureIndexes(0))
ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.values, values, featureIndexes(0))
ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.indexPointers, indexPointers, featureIndexes(1))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ abstract class ChunkedArray[T]() {

def getLastChunkAddCount: Long

def getChunkSize: Long

def getAddCount: Long

def getItem(chunk: Long, inChunkIdx: Long, default: T): T
Expand All @@ -47,6 +49,8 @@ class FloatChunkedArray(array: floatChunkedArray) extends ChunkedArray[Float] {

def getLastChunkAddCount: Long = array.get_last_chunk_add_count()

def getChunkSize: Long = array.get_chunk_size()

def getAddCount: Long = array.get_add_count()

def getItem(chunk: Long, inChunkIdx: Long, default: Float): Float =
Expand Down Expand Up @@ -75,6 +79,8 @@ class DoubleChunkedArray(array: doubleChunkedArray) extends ChunkedArray[Double]

def getLastChunkAddCount: Long = array.get_last_chunk_add_count()

def getChunkSize: Long = array.get_chunk_size()

def getAddCount: Long = array.get_add_count()

def getItem(chunk: Long, inChunkIdx: Long, default: Double): Double =
Expand Down Expand Up @@ -103,6 +109,8 @@ class IntChunkedArray(array: int32ChunkedArray) extends ChunkedArray[Int] {

def getLastChunkAddCount: Long = array.get_last_chunk_add_count()

def getChunkSize: Long = array.get_chunk_size()

def getAddCount: Long = array.get_add_count()

def getItem(chunk: Long, inChunkIdx: Long, default: Int): Int =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,18 @@ class VerifyLightGBMClassifier extends Benchmarks with EstimatorFuzzing[LightGBM
assert(!evaluatedDf2.columns.contains(featuresShapCol))
}

test("Verify Binary LightGBM Classifier chunk size parameter") {
  val Array(train, test) = pimaDF.repartition(4).randomSplit(Array(0.8, 0.2), seed)
  val untrainedModel = baseModel.setUseSingleDatasetMode(true)
  // Baseline: fixed seed + deterministic mode so scores are reproducible and
  // comparable across different chunk sizes.
  val scoredDF1 = untrainedModel.setChunkSize(1000).setSeed(1).setDeterministic(true).fit(train).transform(test)
  val chunkSizes = Array(10, 100, 1000, 10000)
  chunkSizes.foreach { chunkSize =>
    // Chunk size only controls buffering during the native data copy;
    // it must not change the trained model's predictions.
    val model = untrainedModel.setChunkSize(chunkSize).setSeed(1).setDeterministic(true).fit(train)
    val scoredDF2 = model.transform(test)
    assertBinaryEquality(scoredDF1, scoredDF2)
  }
}

test("Verify Multiclass LightGBM Classifier local feature importance SHAP values") {
val Array(train, test) = breastTissueDF.select(labelCol, featuresCol).randomSplit(Array(0.8, 0.2), seed)

Expand Down

0 comments on commit 8c9749b

Please sign in to comment.