Feature/model schema #13

Merged · 13 commits · Jun 18, 2021
@@ -17,8 +17,14 @@ trait CaraStageMapper {
stageDescription.stageName match {
case "LogisticRegression" =>
LogisticRegression(stageDescription.params)
case "RandomForestClassifier" =>
RandomForestClassifier(stageDescription.params)
case "LinearRegression" =>
LinearRegression(stageDescription.params)
case "GBTClassifier" =>
GBTClassifier(stageDescription.params)
case "DecisionTreeClassifier" =>
DecisionTreeClassifier(stageDescription.params)
      case _ =>
        throw new Exception(s"${stageDescription.stageName} is not a valid Cara Stage name. Please verify your YAML file")
}
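For context, the mapper is a plain name-to-constructor dispatch over the stage names declared in the YAML file. A minimal sketch of how a parsed description flows through it — the enclosing method's signature is cut off by the diff, so the mapStage name and the CaraStageDescription(stageName, params) shape below are assumptions read off the two accessors used above:

    // Hypothetical stage description parsed from a YAML file
    val stageDescription = CaraStageDescription(
      stageName = "DecisionTreeClassifier",
      params    = Map("MaxDepth" -> "5", "Impurity" -> "entropy")
    )

    // Resolves to DecisionTreeClassifier(params); an unknown
    // name falls through to the Exception branch shown above
    val stage = mapStage(stageDescription)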
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.github.jsarni.CaraStage.ModelStage

import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.classification.{DecisionTreeClassifier => SparkML}
import org.apache.spark.ml.PipelineStage
import scala.util.Try

case class DecisionTreeClassifier(
  CheckpointInterval: Option[Int], FeaturesCol: Option[String], Impurity: Option[String], LabelCol: Option[String], LeafCol: Option[String],
  MaxBins: Option[Int], MaxDepth: Option[Int], MinInfoGain: Option[Double], MinInstancesPerNode: Option[Int], MinWeightFractionPerNode: Option[Double],
  PredictionCol: Option[String], ProbabilityCol: Option[String], RawPredictionCol: Option[String], Seed: Option[Long], Thresholds: Option[Array[Double]],
  WeightCol: Option[String]
) extends CaraModel {

@MapperConstructor
def this(params: Map[String, String]) = {
this(
params.get("CheckpointInterval").map(_.toInt),
params.get("FeaturesCol"),
params.get("Impurity"),
params.get("LabelCol"),
params.get("LeafCol"),
params.get("MaxBins").map(_.toInt),
params.get("MaxDepth").map(_.toInt),
params.get("MinInfoGain").map(_.toDouble),
params.get("MinInstancesPerNode").map(_.toInt),
params.get("MinWeightFractionPerNode").map(_.toDouble),
params.get("PredictionCol"),
params.get("ProbabilityCol"),
params.get("RawPredictionCol"),
params.get("Seed").map(_.toLong),
params.get("Thresholds").map(_.split(",").map(_.toDouble)),
params.get("WeightCol")
)
}

  override def build(): Try[PipelineStage] = Try {
    val model = new SparkML()
    // Keep only the constructor fields the user actually set (Some(_))
    val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
    val names = definedFields.map(f => f.getName)
    val values = definedFields.map(f => f.get(this))
    val zipFields = names zip values
    // For each defined field, look up the matching setter on the Spark
    // model (e.g. MaxDepth -> setMaxDepth) and invoke it reflectively
    zipFields.map { f =>
      val fieldName = f._1
      val fieldValue = f._2 match { case Some(s) => s }
      getMethode(model, fieldValue, fieldName)
        .invoke(model, fieldValue.asInstanceOf[f._2.type])
    }
    model
  }
}
object DecisionTreeClassifier {
def apply(params: Map[String, String]): DecisionTreeClassifier = new DecisionTreeClassifier(params)
}
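The build() above relies on a single technique: every defined Option field is matched to the corresponding setXxx method on the Spark estimator and invoked through Java reflection (getMethode is presumably defined on CaraModel; it is not part of this diff). A condensed, self-contained sketch of the same idea, using a simplified method lookup rather than the project's actual helper:

    import org.apache.spark.ml.classification.DecisionTreeClassifier

    val model = new DecisionTreeClassifier()
    val params: Map[String, Any] = Map("MaxDepth" -> 5, "MaxBins" -> 16)
    params.foreach { case (name, value) =>
      // "MaxDepth" -> setMaxDepth; Method.invoke unboxes the
      // java.lang.Integer back into the primitive Int parameter
      val setter = model.getClass.getMethods.find(_.getName == s"set$name").get
      setter.invoke(model, value.asInstanceOf[AnyRef])
    }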

@@ -0,0 +1,64 @@
package io.github.jsarni.CaraStage.ModelStage

import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.classification.{GBTClassifier => SparkML}
import org.apache.spark.ml.PipelineStage
import scala.util.Try


case class GBTClassifier(
  CheckpointInterval: Option[Int], FeaturesCol: Option[String], LabelCol: Option[String], LeafCol: Option[String],
  MaxBins: Option[Int], MaxDepth: Option[Int], MinInfoGain: Option[Double], MinInstancesPerNode: Option[Int], MinWeightFractionPerNode: Option[Double],
  PredictionCol: Option[String], ProbabilityCol: Option[String], RawPredictionCol: Option[String], Seed: Option[Long], Thresholds: Option[Array[Double]],
  WeightCol: Option[String], FeatureSubsetStrategy: Option[String], SubsamplingRate: Option[Double], LossType: Option[String], MaxIter: Option[Int],
  StepSize: Option[Double], ValidationIndicatorCol: Option[String]
) extends CaraModel {

@MapperConstructor
def this(params: Map[String, String]) = {
this(
params.get("CheckpointInterval").map(_.toInt),
params.get("FeaturesCol"),
params.get("LabelCol"),
params.get("LeafCol"),
params.get("MaxBins").map(_.toInt),
params.get("MaxDepth").map(_.toInt),
params.get("MinInfoGain").map(_.toDouble),
params.get("MinInstancesPerNode").map(_.toInt),
params.get("MinWeightFractionPerNode").map(_.toDouble),
params.get("PredictionCol"),
params.get("ProbabilityCol"),
params.get("RawPredictionCol"),
params.get("Seed").map(_.toLong),
params.get("Thresholds").map(_.split(",").map(_.toDouble)),
params.get("WeightCol"),
params.get("FeatureSubsetStrategy"),
params.get("SubsamplingRate").map(_.toDouble),
params.get("LossType"),
params.get("MaxIter").map(_.toInt),
params.get("StepSize").map(_.toDouble),
params.get("ValidationIndicatorCol")
)
}

override def build(): Try[PipelineStage] = Try {
val model = new SparkML()
val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
val names = definedFields.map(f => f.getName)
val values = definedFields.map(f => f.get(this))
val zipFields = names zip values
zipFields.map { f =>
val fieldName = f._1
val fieldValue = f._2 match {case Some(s) => s }
getMethode(model,fieldValue,fieldName)
.invoke(model,fieldValue.asInstanceOf[f._2.type])
}
model
}
}
object GBTClassifier {
def apply(params: Map[String, String]): GBTClassifier = new GBTClassifier(params)
}
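All constructor params arrive as strings from the YAML layer, so each typed field is decoded with a map over its Option. The only non-scalar case is Thresholds, which is read from a comma-separated list. In isolation (values borrowed from the test file below):

    val params = Map("MaxIter" -> "20", "Thresholds" -> "0.2, 0.04")
    val maxIter: Option[Int] = params.get("MaxIter").map(_.toInt)   // Some(20)
    // toDouble tolerates the stray space after the comma, since
    // java.lang.Double.parseDouble ignores surrounding whitespace
    val thresholds: Option[Array[Double]] =
      params.get("Thresholds").map(_.split(",").map(_.toDouble))    // Some(Array(0.2, 0.04))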
@@ -1,7 +1,7 @@
package io.github.jsarni.CaraStage.ModelStage
import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.classification.{LogisticRegression => SparkLR}
import org.apache.spark.ml.classification.{LogisticRegression => SparkML}
import scala.util.Try


@@ -33,14 +33,18 @@ case class LogisticRegression(MaxIter: Option[Int], RegParam: Option[Double], El
}

override def build(): Try[PipelineStage] = Try {
val lr = new SparkLR()
val model = new SparkML()
val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
val names = definedFields.map(f => f.getName)
val values = definedFields.map(f => f.get(this))
val zipFields = names zip values
zipFields.map(f=> getMethode(lr,f._2 match {case Some(s) => s },f._1).invoke(lr,(f._2 match {case Some(value) => value.asInstanceOf[f._2.type ] })))
lr

zipFields.map { f =>
val fieldName = f._1
val fieldValue = f._2 match {case Some(s) => s }
getMethode(model,fieldValue,fieldName)
.invoke(model,fieldValue.asInstanceOf[f._2.type])
}
model
}
}
object LogisticRegression {
@@ -0,0 +1,60 @@
package io.github.jsarni.CaraStage.ModelStage

import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.classification.{RandomForestClassifier => SparkML}
import org.apache.spark.ml.PipelineStage
import scala.util.Try

case class RandomForestClassifier(
  CheckpointInterval: Option[Int], FeaturesCol: Option[String], Impurity: Option[String], LabelCol: Option[String], LeafCol: Option[String],
  MaxBins: Option[Int], MaxDepth: Option[Int], MinInfoGain: Option[Double], MinInstancesPerNode: Option[Int], MinWeightFractionPerNode: Option[Double],
  PredictionCol: Option[String], ProbabilityCol: Option[String], RawPredictionCol: Option[String], Seed: Option[Long], Thresholds: Option[Array[Double]],
  WeightCol: Option[String], FeatureSubsetStrategy: Option[String], SubsamplingRate: Option[Double], NumTrees: Option[Int]
) extends CaraModel {

@MapperConstructor
def this(params: Map[String, String]) = {
this(
params.get("CheckpointInterval").map(_.toInt),
params.get("FeaturesCol"),
params.get("Impurity"),
params.get("LabelCol"),
params.get("LeafCol"),
params.get("MaxBins").map(_.toInt),
params.get("MaxDepth").map(_.toInt),
params.get("MinInfoGain").map(_.toDouble),
params.get("MinInstancesPerNode").map(_.toInt),
params.get("MinWeightFractionPerNode").map(_.toDouble),
params.get("PredictionCol"),
params.get("ProbabilityCol"),
params.get("RawPredictionCol"),
params.get("Seed").map(_.toLong),
params.get("Thresholds").map(_.split(",").map(_.toDouble)),
params.get("WeightCol"),
params.get("FeatureSubsetStrategy"),
params.get("SubsamplingRate").map(_.toDouble),
params.get("NumTrees").map(_.toInt)
)
}

override def build(): Try[PipelineStage] = Try {
val model = new SparkML()
val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
val names = definedFields.map(f => f.getName)
val values = definedFields.map(f => f.get(this))
val zipFields = names zip values
zipFields.map { f =>
val fieldName = f._1
val fieldValue = f._2 match {case Some(s) => s }
getMethode(model,fieldValue,fieldName)
.invoke(model,fieldValue.asInstanceOf[f._2.type])
}
model
}
}
object RandomForestClassifier {
def apply(params: Map[String, String]): RandomForestClassifier = new RandomForestClassifier(params)
}


@@ -0,0 +1,104 @@
package io.github.jsarni.CaraStage.ModelStage

import org.apache.spark.ml.classification.{DecisionTreeClassifier => SparkML}
import io.github.jsarni.TestBase

class DecisionTreeClassifierTest extends TestBase {

"build" should "Create an lr model and set all parameters with there args values or set default ones" in {
val params = Map(
"CheckpointInterval" -> "10",
"FeaturesCol" -> "FeatureCol",
"Impurity" -> "entropy",
"LabelCol" -> "LabelCol",
"LeafCol" -> "LeafCol",
"MaxBins" -> "10",
"MaxDepth" -> "5",
"MinInfoGain"-> "0.02",
"MinInstancesPerNode" -> "2",
"MinWeightFractionPerNode" -> "0.03",
"PredictionCol" -> "PredictionCol",
"ProbabilityCol" -> "ProbabilityCol",
"RawPredictionCol" -> "RawPredictionCol",
"Seed" -> "124555",
"Thresholds" -> "0.2, 0.04",
"WeightCol" -> "1.2"
)
val dTree = DecisionTreeClassifier(params)
val dTreeWithTwoParams = new SparkML()
.setCheckpointInterval(5)
.setMaxDepth(10)

val expectedResult = List(
new SparkML()
.setCheckpointInterval(10)
.setFeaturesCol("FeatureCol")
.setImpurity("entropy")
.setLabelCol("LabelCol")
.setLeafCol("LeafCol")
.setMaxBins(10)
.setMaxDepth(5)
.setMinInfoGain(0.02)
.setMinInstancesPerNode(2)
.setMinWeightFractionPerNode(0.03)
.setPredictionCol("PredictionCol")
.setProbabilityCol("ProbabilityCol")
.setRawPredictionCol("RawPredictionCol")
.setSeed(124555.toLong)
.setWeightCol("1.2")
.setThresholds(Array(0.2, 0.04))

)
dTree.build().isSuccess shouldBe true

val res = List(dTree.build().get)
val resParameters = res.map(_.extractParamMap().toSeq.map(_.value))
.map(_.map(elem =>
if (elem.isInstanceOf[Array[_]]) elem.asInstanceOf[Array[_]].toList
else List(elem)))
.flatten
.flatten
val expectedParameters = expectedResult.map(_.extractParamMap().toSeq.map(_.value))
.map(_.map(elem =>
if (elem.isInstanceOf[Array[_]]) elem.asInstanceOf[Array[_]].toList
else List(elem)))
.flatten
.flatten

resParameters should contain theSameElementsAs expectedParameters

// Test default values of unset params
dTreeWithTwoParams.getImpurity shouldBe "gini"
dTreeWithTwoParams.getMaxBins shouldBe 32
dTreeWithTwoParams.getMinInfoGain shouldBe 0.0
}

"GetMethode" should "Return the appropriate methode by it's name" in {
val params = Map(
"CheckpointInterval" -> "10",
"FeaturesCol" -> "FeatureCol",
"Impurity" -> "entropy",
"LabelCol" -> "LabelCol",
"LeafCol" -> "LeafCol",
"MaxBins" -> "10",
"MaxDepth" -> "5",
"MinInfoGain"-> "0.02",
"MinInstancesPerNode" -> "2",
"MinWeightFractionPerNode" -> "0.03",
"PredictionCol" -> "PredictionCol",
"ProbabilityCol" -> "ProbabilityCol",
"RawPredictionCol" -> "RawPredictionCol",
"Seed" -> "124555",
"Thresholds" -> "0.2, 0.04",
"WeightCol" -> "1.2"
)
    val caraDTree = DecisionTreeClassifier(params)
    val model = caraDTree.build().get.asInstanceOf[SparkML]

    caraDTree.getMethode(model, 10.toLong, "Seed").getName shouldBe "setSeed"
    caraDTree.getMethode(model, "PredictCol", "PredictionCol").getName shouldBe "setPredictionCol"
    caraDTree.getMethode(model, 10, "CheckpointInterval").getName shouldBe "setCheckpointInterval"

}

}
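One detail in the assertions above deserves a note: extractParamMap can contain array-valued params (Thresholds here), and in Scala, as in Java, Array equality is reference equality, so the test converts arrays to Lists before comparing with theSameElementsAs. A two-line illustration of why:

    Array(0.2, 0.04) == Array(0.2, 0.04)          // false: arrays compare by reference
    Array(0.2, 0.04).toList == List(0.2, 0.04)    // true: Lists compare structurally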