Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: chunksize parameter incorrectly specified during data copy #1490

Merged
merged 1 commit into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,6 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
val execNumThreads =
if (getUseSingleDatasetMode) get(numThreads).getOrElse(numTasksPerExec - 1)
else getNumThreads

ExecutionParams(getChunkSize, getMatrixType, execNumThreads, getUseSingleDatasetMode)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,25 +170,4 @@ object LightGBMUtils {
val taskId = ctx.taskAttemptId()
taskId
}

def getNumRowsForChunksArray(numRows: Int, chunkSize: Int): SWIGTYPE_p_int = {
val leftoverChunk = numRows % chunkSize
val numChunks = if (leftoverChunk > 0) {
Math.floorDiv(numRows, chunkSize) + 1
}else{
Math.floorDiv(numRows, chunkSize)
}
val numRowsForChunks = lightgbmlib.new_intArray(numChunks)
(0 until numChunks).foreach({ index: Int =>
if (index == numChunks - 1 && leftoverChunk > 0) {
lightgbmlib.intArray_setitem(numRowsForChunks, index, leftoverChunk)
} else {
lightgbmlib.intArray_setitem(numRowsForChunks, index, chunkSize)
}
})
numRowsForChunks
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import scala.collection.concurrent.TrieMap
private[lightgbm] object ChunkedArrayUtils {
def copyChunkedArray[T: Numeric](chunkedArray: ChunkedArray[T],
mainArray: BaseSwigArray[T],
threadRowStartIndex: Long,
chunkSize: Long): Unit = {
threadRowStartIndex: Long): Unit = {
val num = implicitly[Numeric[T]]
val defaultVal = num.fromInt(-1)
// Copy in parallel on each thread
// First copy full chunks
val chunkSize = chunkedArray.getChunkSize
val chunkCount = chunkedArray.getChunksCount - 1
for (chunk <- 0L until chunkCount) {
for (inChunkIdx <- 0L until chunkSize) {
Expand Down Expand Up @@ -306,16 +306,15 @@ private[lightgbm] trait SyncAggregatedColumns extends BaseAggregatedColumns {
threadInitScoreStartIndex = chunkedCols.initScores.map(_ => pIdToInitScoreCountOffset(partitionId)).getOrElse(0)
updateThreadLocalIndices(chunkedCols, threadRowStartIndex)
}
ChunkedArrayUtils.copyChunkedArray(chunkedCols.labels, labels, threadRowStartIndex, chunkSize)
ChunkedArrayUtils.copyChunkedArray(chunkedCols.labels, labels, threadRowStartIndex)
chunkedCols.weights.foreach {
weightChunkedArray =>
ChunkedArrayUtils.copyChunkedArray(weightChunkedArray, weights.get, threadRowStartIndex,
chunkSize)
ChunkedArrayUtils.copyChunkedArray(weightChunkedArray, weights.get, threadRowStartIndex)
}
chunkedCols.initScores.foreach {
initScoreChunkedArray =>
ChunkedArrayUtils.copyChunkedArray(initScoreChunkedArray, initScores.get,
threadInitScoreStartIndex, chunkSize)
threadInitScoreStartIndex)
}
parallelizeFeaturesCopy(chunkedCols, featureIndexes)
chunkedCols.groups.copyToArray(groups, threadRowStartIndex.toInt)
Expand Down Expand Up @@ -388,7 +387,7 @@ private[lightgbm] final class DenseSyncAggregatedColumns(chunkSize: Int)

protected def parallelizeFeaturesCopy(chunkedCols: BaseChunkedColumns, featureIndexes: List[Long]): Unit = {
ChunkedArrayUtils.copyChunkedArray(chunkedCols.asInstanceOf[DenseChunkedColumns].features,
features, featureIndexes.head * numCols, chunkSize)
features, featureIndexes.head * numCols)
}

}
Expand Down Expand Up @@ -521,9 +520,9 @@ private[lightgbm] final class SparseSyncAggregatedColumns(chunkSize: Int)

protected def parallelizeFeaturesCopy(chunkedCols: BaseChunkedColumns, featureIndexes: List[Long]): Unit = {
val sparseChunkedCols = chunkedCols.asInstanceOf[SparseChunkedColumns]
ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.indexes, indexes, featureIndexes(0), chunkSize)
ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.values, values, featureIndexes(0), chunkSize)
ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.indexPointers, indexPointers, featureIndexes(1), chunkSize)
ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.indexes, indexes, featureIndexes(0))
ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.values, values, featureIndexes(0))
ChunkedArrayUtils.copyChunkedArray(sparseChunkedCols.indexPointers, indexPointers, featureIndexes(1))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ abstract class ChunkedArray[T]() {

def getLastChunkAddCount: Long

def getChunkSize: Long

def getAddCount: Long

def getItem(chunk: Long, inChunkIdx: Long, default: T): T
Expand All @@ -47,6 +49,8 @@ class FloatChunkedArray(array: floatChunkedArray) extends ChunkedArray[Float] {

def getLastChunkAddCount: Long = array.get_last_chunk_add_count()

def getChunkSize: Long = array.get_chunk_size()

def getAddCount: Long = array.get_add_count()

def getItem(chunk: Long, inChunkIdx: Long, default: Float): Float =
Expand Down Expand Up @@ -75,6 +79,8 @@ class DoubleChunkedArray(array: doubleChunkedArray) extends ChunkedArray[Double]

def getLastChunkAddCount: Long = array.get_last_chunk_add_count()

def getChunkSize: Long = array.get_chunk_size()

def getAddCount: Long = array.get_add_count()

def getItem(chunk: Long, inChunkIdx: Long, default: Double): Double =
Expand Down Expand Up @@ -103,6 +109,8 @@ class IntChunkedArray(array: int32ChunkedArray) extends ChunkedArray[Int] {

def getLastChunkAddCount: Long = array.get_last_chunk_add_count()

def getChunkSize: Long = array.get_chunk_size()

def getAddCount: Long = array.get_add_count()

def getItem(chunk: Long, inChunkIdx: Long, default: Int): Int =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,18 @@ class VerifyLightGBMClassifier extends Benchmarks with EstimatorFuzzing[LightGBM
assert(!evaluatedDf2.columns.contains(featuresShapCol))
}

test("Verify Binary LightGBM Classifier chunk size parameter") {
val Array(train, test) = pimaDF.repartition(4).randomSplit(Array(0.8, 0.2), seed)
val untrainedModel = baseModel.setUseSingleDatasetMode(true)
val scoredDF1 = untrainedModel.setChunkSize(1000).setSeed(1).setDeterministic(true).fit(train).transform(test)
val chunkSizes = Array(10, 100, 1000, 10000)
chunkSizes.foreach { chunkSize =>
val model = untrainedModel.setChunkSize(chunkSize).setSeed(1).setDeterministic(true).fit(train)
val scoredDF2 = model.transform(test)
assertBinaryEquality(scoredDF1, scoredDF2);
}
}

test("Verify Multiclass LightGBM Classifier local feature importance SHAP values") {
val Array(train, test) = breastTissueDF.select(labelCol, featuresCol).randomSplit(Array(0.8, 0.2), seed)

Expand Down