Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Guarantee one boosterPtr is allocated and freed per LightGBMBooster instance #792

Merged
merged 8 commits into from Feb 10, 2020
287 changes: 137 additions & 150 deletions src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBooster.scala
Expand Up @@ -8,6 +8,110 @@ import com.microsoft.ml.spark.lightgbm.LightGBMUtils.getBoosterPtrFromModelStrin
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.sql.{SaveMode, SparkSession}

//scalastyle:off
/** Wraps the boosterPtr and guarantees that the native library is initialized
  * every time it is needed.
  *
  * Constructing an instance initializes the native library, loads the booster from
  * the model string and allocates the native output buffers used for score and
  * leaf-index prediction. Everything allocated here is released by `finalize`.
  * @param model The string serialized representation of the learner
  */
protected class BoosterHandler(model: String) {

  LightGBMUtils.initializeNativeLibrary()

  // Initialized first: sizing the buffers below forces the lazy counts, which
  // query the native booster through this pointer.
  var boosterPtr: SWIGTYPE_p_void = {
    getBoosterPtrFromModelString(model)
  }

  // Output buffer for score prediction, sized by the number of classes.
  var scoredDataOutPtr: SWIGTYPE_p_double = {
    lightgbmlib.new_doubleArray(numClasses)
  }

  // Length slot handed to the native predict call alongside scoredDataOutPtr.
  var scoredDataLengthLongPtr: SWIGTYPE_p_long_long = {
    val dataLongLengthPtr = lightgbmlib.new_int64_tp()
    lightgbmlib.int64_tp_assign(dataLongLengthPtr, 1)
    dataLongLengthPtr
  }

  // Output buffer for leaf-index prediction, sized by the total number of models.
  var leafIndexDataOutPtr: SWIGTYPE_p_double = lightgbmlib.new_doubleArray(numTotalModel)

  // Length slot handed to the native predict call alongside leafIndexDataOutPtr.
  var leafIndexDataLengthLongPtr: SWIGTYPE_p_long_long = {
    val dataLongLengthPtr = lightgbmlib.new_int64_tp()
    lightgbmlib.int64_tp_assign(dataLongLengthPtr, numTotalModel)
    dataLongLengthPtr
  }

  // Lazy so that they can be referenced by the field initializers above even
  // though they are declared after them.
  lazy val numClasses: Int = getNumClasses
  lazy val numFeatures: Int = getNumFeatures
  lazy val numTotalModel: Int = getNumTotalModel
  lazy val numTotalModelPerIteration: Int = getNumModelPerIteration

  lazy val rawScoreConstant = lightgbmlibConstants.C_API_PREDICT_RAW_SCORE
  lazy val normalScoreConstant = lightgbmlibConstants.C_API_PREDICT_NORMAL
  lazy val leafIndexPredictConstant = lightgbmlibConstants.C_API_PREDICT_LEAF_INDEX

  lazy val dataInt32bitType = lightgbmlibConstants.C_API_DTYPE_INT32
  lazy val data64bitType = lightgbmlibConstants.C_API_DTYPE_FLOAT64

  /** Reads the number of classes from the native booster. */
  private def getNumClasses: Int = {
    val numClassesOut = lightgbmlib.new_intp()
    try {
      LightGBMUtils.validate(
        lightgbmlib.LGBM_BoosterGetNumClasses(boosterPtr, numClassesOut),
        "Booster NumClasses")
      lightgbmlib.intp_value(numClassesOut)
    } finally {
      // Release the native out-pointer even when validation fails.
      lightgbmlib.delete_intp(numClassesOut)
    }
  }

  /** Reads the number of models per iteration from the native booster. */
  private def getNumModelPerIteration: Int = {
    val numModelPerIterationOut = lightgbmlib.new_intp()
    try {
      LightGBMUtils.validate(
        lightgbmlib.LGBM_BoosterNumModelPerIteration(boosterPtr, numModelPerIterationOut),
        "Booster models per iteration")
      lightgbmlib.intp_value(numModelPerIterationOut)
    } finally {
      lightgbmlib.delete_intp(numModelPerIterationOut)
    }
  }

  /** Reads the total number of models from the native booster. */
  private def getNumTotalModel: Int = {
    val numModelOut = lightgbmlib.new_intp()
    try {
      LightGBMUtils.validate(
        lightgbmlib.LGBM_BoosterNumberOfTotalModel(boosterPtr, numModelOut),
        "Booster total models")
      lightgbmlib.intp_value(numModelOut)
    } finally {
      lightgbmlib.delete_intp(numModelOut)
    }
  }

  /** Reads the number of features from the native booster. */
  private def getNumFeatures: Int = {
    val numFeaturesOut = lightgbmlib.new_intp()
    try {
      LightGBMUtils.validate(
        lightgbmlib.LGBM_BoosterGetNumFeature(boosterPtr, numFeaturesOut),
        "Booster NumFeature")
      lightgbmlib.intp_value(numFeaturesOut)
    } finally {
      lightgbmlib.delete_intp(numFeaturesOut)
    }
  }

  /** Releases every native allocation owned by this handler.
    * Each pointer is nulled after it is freed, so a repeated call does nothing.
    */
  private def freeNativeMemory(): Unit = {
    if (scoredDataLengthLongPtr != null) {
      lightgbmlib.delete_int64_tp(scoredDataLengthLongPtr)
      scoredDataLengthLongPtr = null
    }
    if (scoredDataOutPtr != null) {
      lightgbmlib.delete_doubleArray(scoredDataOutPtr)
      scoredDataOutPtr = null
    }
    if (leafIndexDataLengthLongPtr != null) {
      lightgbmlib.delete_int64_tp(leafIndexDataLengthLongPtr)
      leafIndexDataLengthLongPtr = null
    }
    if (leafIndexDataOutPtr != null) {
      lightgbmlib.delete_doubleArray(leafIndexDataOutPtr)
      leafIndexDataOutPtr = null
    }
    // The booster itself is freed last, after the buffers that were sized from it.
    if (boosterPtr != null) {
      LightGBMUtils.validate(lightgbmlib.LGBM_BoosterFree(boosterPtr), "Finalize Booster")
      boosterPtr = null
    }
  }

  override protected def finalize(): Unit = {
    freeNativeMemory()
  }
}

//scalastyle:on
/** Represents a LightGBM Booster learner
* @param model The string serialized representation of the learner
*/
Expand All @@ -16,162 +120,108 @@ class LightGBMBooster(val model: String) extends Serializable {
/** Transient variable containing local machine's pointer to native booster
*/
@transient
var boosterPtr: SWIGTYPE_p_void = _
lazy val boosterHandler: BoosterHandler = {
new BoosterHandler(model)
}

/** Scores a single feature vector with the native booster.
  * Dense vectors are routed to predictScoreForMat and sparse vectors to predictScoreForCSR.
  * @param features The feature vector to score
  * @param raw When true the raw-score prediction constant is selected, otherwise the normal one
  * @param classification Forwarded unchanged to the predict helpers
  * @return The array produced by the selected predict helper
  */
// NOTE(review): this span appears to interleave removed and added diff lines (the
// boosterPtr reload and the two `kind` selections) - confirm against the merged file.
def score(features: Vector, raw: Boolean, classification: Boolean): Array[Double] = {
// Reload booster on each node
if (boosterPtr == null) {
LightGBMUtils.initializeNativeLibrary()
boosterPtr = getBoosterPtrFromModelString(model)
}
val kind =
if (raw) lightgbmlibConstants.C_API_PREDICT_RAW_SCORE
else lightgbmlibConstants.C_API_PREDICT_NORMAL
if (raw) boosterHandler.rawScoreConstant
else boosterHandler.normalScoreConstant
features match {
case dense: DenseVector => predictScoreForMat(dense.toArray, kind, classification)
case sparse: SparseVector => predictScoreForCSR(sparse, kind, classification)
}
}

/** Predicts leaf indices for a single feature vector.
  * Dense vectors are routed to predictLeafForMat and sparse vectors to predictLeafForCSR.
  * @param features The feature vector to predict on
  * @return The array produced by the selected predict helper
  */
// NOTE(review): the boosterPtr reload below looks like the pre-change side of the
// diff - confirm against the merged file.
def predictLeaf(features: Vector): Array[Double] = {
// Reload booster on each node
if (boosterPtr == null) {
LightGBMUtils.initializeNativeLibrary()
boosterPtr = getBoosterPtrFromModelString(model)
}

features match {
case dense: DenseVector => predictLeafForMat(dense.toArray)
case sparse: SparseVector => predictLeafForCSR(sparse)
}
}

lazy val numClasses: Int = getNumClasses()

lazy val numTotalModel: Int = getNumTotalModel

lazy val numModelPerIteration: Int = getNumModelPerIteration
lazy val numClasses: Int = boosterHandler.numClasses

lazy val numIterations: Int = numTotalModel / numModelPerIteration

@transient
var scoredDataOutPtr: SWIGTYPE_p_double = _

@transient
var scoredDataLengthLongPtr: SWIGTYPE_p_long_long = _

@transient
var leafIndexDataOutPtr: SWIGTYPE_p_double = _

@transient
var leafIndexDataLengthLongPtr: SWIGTYPE_p_long_long = _
lazy val numFeatures: Int = boosterHandler.numFeatures

/** Allocates the native score output buffer and its length slot on first use.
  * Nothing is allocated when scoredDataLengthLongPtr is already set.
  */
// NOTE(review): presumably the pre-change side of the diff - confirm against the merged file.
def ensureScoredDataCreated(): Unit = {
if (scoredDataLengthLongPtr == null) {
// Output buffer is sized by the number of classes.
scoredDataOutPtr = lightgbmlib.new_doubleArray(numClasses)
scoredDataLengthLongPtr = lightgbmlib.new_int64_tp()
lightgbmlib.int64_tp_assign(scoredDataLengthLongPtr, 1)
}
}
lazy val numTotalModel: Int = boosterHandler.numTotalModel

/** Allocates the native leaf-index output buffer and its length slot on first use.
  * Nothing is allocated when leafIndexDataOutPtr is already set.
  */
// NOTE(review): presumably the pre-change side of the diff - confirm against the merged file.
def ensureLeafIndexDataCreated(): Unit = {
if (leafIndexDataOutPtr == null) {
// Output buffer and length slot are both sized by the total number of models.
leafIndexDataOutPtr = lightgbmlib.new_doubleArray(numTotalModel)
leafIndexDataLengthLongPtr = lightgbmlib.new_int64_tp()
lightgbmlib.int64_tp_assign(leafIndexDataLengthLongPtr, numTotalModel)
}
}
lazy val numModelPerIteration: Int = boosterHandler.numTotalModelPerIteration

/** Frees the native score and leaf-index buffers that have been allocated. */
// NOTE(review): boosterPtr is not released in this block, and the pointers are not
// nulled after deletion. Presumably the pre-change side of the diff - confirm.
override protected def finalize(): Unit = {
if (scoredDataLengthLongPtr != null)
lightgbmlib.delete_int64_tp(scoredDataLengthLongPtr)
if (scoredDataOutPtr != null)
lightgbmlib.delete_doubleArray(scoredDataOutPtr)
if (leafIndexDataLengthLongPtr != null)
lightgbmlib.delete_int64_tp(leafIndexDataLengthLongPtr)
if (leafIndexDataOutPtr != null)
lightgbmlib.delete_doubleArray(leafIndexDataOutPtr)
}
lazy val numIterations: Int = numTotalModel / numModelPerIteration

/** Scores one sparse vector through LGBM_BoosterPredictForCSRSingle.
  * @param sparseVector The sparse features; its indices, values and non-zero count are passed to the native call
  * @param kind The prediction-type constant passed to the native call
  * @param classification Forwarded to predScoreToArray when converting the output buffer
  * @return The result of predScoreToArray over the score output buffer
  */
// NOTE(review): this span appears to interleave removed and added diff lines (duplicate
// vals and argument lines) - confirm against the merged file.
protected def predictScoreForCSR(sparseVector: SparseVector, kind: Int, classification: Boolean): Array[Double] = {
val numCols = sparseVector.size

val datasetParams = "max_bin=255"
val dataInt32bitType = lightgbmlibConstants.C_API_DTYPE_INT32
val data64bitType = lightgbmlibConstants.C_API_DTYPE_FLOAT64

ensureScoredDataCreated()
val dataInt32bitType = boosterHandler.dataInt32bitType
val data64bitType = boosterHandler.data64bitType

LightGBMUtils.validate(
lightgbmlib.LGBM_BoosterPredictForCSRSingle(
sparseVector.indices, sparseVector.values,
sparseVector.numNonzeros,
boosterPtr, dataInt32bitType, data64bitType, 2, numCols,
boosterHandler.boosterPtr, dataInt32bitType, data64bitType, 2, numCols,
kind, -1, datasetParams,
scoredDataLengthLongPtr, scoredDataOutPtr), "Booster Predict")
boosterHandler.scoredDataLengthLongPtr, boosterHandler.scoredDataOutPtr), "Booster Predict")

predScoreToArray(classification, scoredDataOutPtr, kind)
predScoreToArray(classification, boosterHandler.scoredDataOutPtr, kind)
}

/** Scores one dense row through LGBM_BoosterPredictForMatSingle.
  * @param row The dense feature values; its length is passed as the column count
  * @param kind The prediction-type constant passed to the native call
  * @param classification Forwarded to predScoreToArray when converting the output buffer
  * @return The result of predScoreToArray over the score output buffer
  */
// NOTE(review): this span appears to interleave removed and added diff lines (duplicate
// vals and argument lines) - confirm against the merged file.
protected def predictScoreForMat(row: Array[Double], kind: Int, classification: Boolean): Array[Double] = {
val data64bitType = lightgbmlibConstants.C_API_DTYPE_FLOAT64
val data64bitType = boosterHandler.data64bitType

val numCols = row.length
val isRowMajor = 1

val datasetParams = "max_bin=255"

ensureScoredDataCreated()

LightGBMUtils.validate(
lightgbmlib.LGBM_BoosterPredictForMatSingle(
row, boosterPtr, data64bitType,
row, boosterHandler.boosterPtr, data64bitType,
numCols,
isRowMajor, kind,
-1, datasetParams, scoredDataLengthLongPtr, scoredDataOutPtr),
-1, datasetParams, boosterHandler.scoredDataLengthLongPtr, boosterHandler.scoredDataOutPtr),
"Booster Predict")
predScoreToArray(classification, scoredDataOutPtr, kind)
predScoreToArray(classification, boosterHandler.scoredDataOutPtr, kind)
}

/** Predicts leaf indices for one sparse vector through LGBM_BoosterPredictForCSRSingle,
  * using the leaf-index prediction constant.
  * @param sparseVector The sparse features; its indices, values and non-zero count are passed to the native call
  * @return The result of predLeafToArray over the leaf-index output buffer
  */
// NOTE(review): this span appears to interleave removed and added diff lines (duplicate
// vals and argument lines) - confirm against the merged file.
protected def predictLeafForCSR(sparseVector: SparseVector): Array[Double] = {
val numCols = sparseVector.size

val datasetParams = "max_bin=255 is_pre_partition=True"
val dataInt32bitType = lightgbmlibConstants.C_API_DTYPE_INT32
val data64bitType = lightgbmlibConstants.C_API_DTYPE_FLOAT64

ensureLeafIndexDataCreated()
val dataInt32bitType = boosterHandler.dataInt32bitType
val data64bitType = boosterHandler.data64bitType

LightGBMUtils.validate(
lightgbmlib.LGBM_BoosterPredictForCSRSingle(
sparseVector.indices, sparseVector.values,
sparseVector.numNonzeros,
boosterPtr, dataInt32bitType, data64bitType, 2, numCols,
lightgbmlibConstants.C_API_PREDICT_LEAF_INDEX, -1, datasetParams,
leafIndexDataLengthLongPtr, leafIndexDataOutPtr), "Booster Predict Leaf")
boosterHandler.boosterPtr, dataInt32bitType, data64bitType, 2, numCols,
boosterHandler.leafIndexPredictConstant, -1, datasetParams,
boosterHandler.leafIndexDataLengthLongPtr, boosterHandler.leafIndexDataOutPtr), "Booster Predict Leaf")

predLeafToArray(leafIndexDataOutPtr)
predLeafToArray(boosterHandler.leafIndexDataOutPtr)
}

/** Predicts leaf indices for one dense row through LGBM_BoosterPredictForMatSingle,
  * using the leaf-index prediction constant.
  * @param row The dense feature values; its length is passed as the column count
  * @return The result of predLeafToArray over the leaf-index output buffer
  */
// NOTE(review): this span appears to interleave removed and added diff lines (duplicate
// vals and argument lines) - confirm against the merged file.
protected def predictLeafForMat(row: Array[Double]): Array[Double] = {
val data64bitType = lightgbmlibConstants.C_API_DTYPE_FLOAT64
val data64bitType = boosterHandler.data64bitType

val numCols = row.length
val isRowMajor = 1

val datasetParams = "max_bin=255"

ensureLeafIndexDataCreated()

LightGBMUtils.validate(
lightgbmlib.LGBM_BoosterPredictForMatSingle(
row, boosterPtr, data64bitType,
row, boosterHandler.boosterPtr, data64bitType,
numCols,
isRowMajor, lightgbmlibConstants.C_API_PREDICT_LEAF_INDEX,
-1, datasetParams, leafIndexDataLengthLongPtr, leafIndexDataOutPtr),
isRowMajor, boosterHandler.leafIndexPredictConstant,
-1, datasetParams, boosterHandler.leafIndexDataLengthLongPtr, boosterHandler.leafIndexDataOutPtr),
"Booster Predict Leaf")

predLeafToArray(leafIndexDataOutPtr)
predLeafToArray(boosterHandler.leafIndexDataOutPtr)
}

def saveNativeModel(session: SparkSession, filename: String, overwrite: Boolean): Unit = {
Expand All @@ -186,11 +236,7 @@ class LightGBMBooster(val model: String) extends Serializable {
}

def dumpModel(session: SparkSession, filename: String, overwrite: Boolean): Unit = {
if (boosterPtr == null) {
LightGBMUtils.initializeNativeLibrary()
boosterPtr = getBoosterPtrFromModelString(model)
}
val json = lightgbmlib.LGBM_BoosterDumpModelSWIG(boosterPtr, 0, 0, 1, lightgbmlib.new_int64_tp())
val json = lightgbmlib.LGBM_BoosterDumpModelSWIG(boosterHandler.boosterPtr, 0, 0, 1, lightgbmlib.new_int64_tp())
val rdd = session.sparkContext.parallelize(Seq(json))
import session.sqlContext.implicits._
val dataset = session.sqlContext.createDataset(rdd)
Expand All @@ -205,78 +251,19 @@ class LightGBMBooster(val model: String) extends Serializable {
*/
// Reads one importance value per feature from the native booster and copies them
// into a Scala array.
// NOTE(review): this span appears to interleave removed and added diff lines (the
// boosterPtr reload, the local numFeatures lookup and the duplicated native call) -
// confirm against the merged file.
// NOTE(review): no release of the featureImportances native array is visible in this block.
def getFeatureImportances(importanceType: String): Array[Double] = {
// "gain" (case-insensitive, surrounding whitespace ignored) selects 1; anything else selects 0.
val importanceTypeNum = if (importanceType.toLowerCase.trim == "gain") 1 else 0
if (boosterPtr == null) {
LightGBMUtils.initializeNativeLibrary()
boosterPtr = getBoosterPtrFromModelString(model)
}
val numFeaturesOut = lightgbmlib.new_intp()
LightGBMUtils.validate(
lightgbmlib.LGBM_BoosterGetNumFeature(boosterPtr, numFeaturesOut),
"Booster NumFeature")
val numFeatures = lightgbmlib.intp_value(numFeaturesOut)
val featureImportances = lightgbmlib.new_doubleArray(numFeatures)
LightGBMUtils.validate(
lightgbmlib.LGBM_BoosterFeatureImportance(boosterPtr, -1, importanceTypeNum, featureImportances),
lightgbmlib.LGBM_BoosterFeatureImportance(boosterHandler.boosterPtr, -1, importanceTypeNum, featureImportances),
"Booster FeatureImportance")
// Copy the native buffer element by element into a Scala array.
(0 until numFeatures).map(lightgbmlib.doubleArray_getitem(featureImportances, _)).toArray
}

/**
* Retrieve the number of classes from LightGBM Booster.
* Loads the booster from the model string first when boosterPtr is still null.
* @return The number of classes.
*/
// NOTE(review): no release of the numClassesOut native pointer is visible in this block.
def getNumClasses(): Int = {
if (boosterPtr == null) {
LightGBMUtils.initializeNativeLibrary()
boosterPtr = getBoosterPtrFromModelString(model)
}
val numClassesOut = lightgbmlib.new_intp()
LightGBMUtils.validate(
lightgbmlib.LGBM_BoosterGetNumClasses(boosterPtr, numClassesOut),
"Booster NumClasses")
lightgbmlib.intp_value(numClassesOut)
}

/**
* Retrieve the number of models per each iteration from LightGBM Booster.
* Loads the booster from the model string first when boosterPtr is still null.
* @return The number of models per iteration.
*/
// NOTE(review): no release of the numModelPerIterationOut native pointer is visible in this block.
def getNumModelPerIteration: Int = {
if (boosterPtr == null) {
LightGBMUtils.initializeNativeLibrary()
boosterPtr = getBoosterPtrFromModelString(model)
}

val numModelPerIterationOut = lightgbmlib.new_intp()
LightGBMUtils.validate(
lightgbmlib.LGBM_BoosterNumModelPerIteration(boosterPtr, numModelPerIterationOut),
"Booster models per iteration")
lightgbmlib.intp_value(numModelPerIterationOut)
}

/**
* Retrieve the number of total models from LightGBM Booster.
* Loads the booster from the model string first when boosterPtr is still null.
* @return The number of total models.
*/
// NOTE(review): no release of the numModelOut native pointer is visible in this block.
def getNumTotalModel: Int = {
if (boosterPtr == null) {
LightGBMUtils.initializeNativeLibrary()
boosterPtr = getBoosterPtrFromModelString(model)
}

val numModelOut = lightgbmlib.new_intp()
LightGBMUtils.validate(
lightgbmlib.LGBM_BoosterNumberOfTotalModel(boosterPtr, numModelOut),
"Booster total models")
lightgbmlib.intp_value(numModelOut)
}

private def predScoreToArray(classification: Boolean, scoredDataOutPtr: SWIGTYPE_p_double,
kind: Int): Array[Double] = {
if (classification && numClasses == 1) {
// Binary classification scenario - LightGBM only returns the value for the positive class
val pred = lightgbmlib.doubleArray_getitem(scoredDataOutPtr, 0)
if (kind == lightgbmlibConstants.C_API_PREDICT_RAW_SCORE) {
if (kind == boosterHandler.rawScoreConstant) {
// Return the raw score for binary classification
Array(-pred, pred)
} else {
Expand Down