Feature/model schema (#13)
* LogisticRegressionTest contains an error to clear

* Finalize LogisticRegression class and tests

* Refactor names to camel case and correct spacing

* Adjust LogisticRegression code formatting and add DecisionTreeClassifier model class and tests

* Add GBTClassifier model class and tests

* Tests not finished yet

* Finalize tests for the new model classes

* CaraStageMapper update

* Update caraMapperModel

Co-authored-by: merzouk <merzoukoumedda@gmail.com>
merzouk13 and merzouk committed Jun 18, 2021
1 parent f1e469a commit 93aeef5
Showing 9 changed files with 528 additions and 7 deletions.
CaraStageMapper.scala
@@ -17,8 +17,14 @@ trait CaraStageMapper {
stageDescription.stageName match {
case "LogisticRegression" =>
LogisticRegression(stageDescription.params)
case "RandomForestClassifier" =>
RandomForestClassifier(stageDescription.params)
case "LinearRegression" =>
LinearRegression(stageDescription.params)
case "GBTClassifier" =>
GBTClassifier(stageDescription.params)
case "DecisionTreeClassifier" =>
DecisionTreeClassifier(stageDescription.params)
case _ =>
throw new Exception(s"${stageDescription.stageName} is not a valid Cara Stage name. Please verify your YAML file")
}
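For context, a hedged sketch of how this mapper gets exercised: a stage description parsed from the YAML file is dispatched to the matching model class. The mapStage entry-point name and the exact shape of CaraStageDescription are assumptions here, since the diff only shows the match block.

// Hypothetical usage — mapStage and CaraStageDescription's shape are assumed.
val description = CaraStageDescription(
  stageName = "GBTClassifier",
  params    = Map("MaxIter" -> "10", "StepSize" -> "0.1")
)
val stage = mapStage(description) // resolves to GBTClassifier(description.params)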
DecisionTreeClassifier.scala
@@ -0,0 +1,57 @@
package io.github.jsarni.CaraStage.ModelStage

import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.classification.{DecisionTreeClassifier => SparkML}
import org.apache.spark.ml.PipelineStage
import scala.util.Try

case class DecisionTreeClassifier(
CheckpointInterval: Option[Int], FeaturesCol: Option[String], Impurity: Option[String], LabelCol: Option[String], LeafCol: Option[String],
MaxBins: Option[Int], MaxDepth: Option[Int], MinInfoGain: Option[Double], MinInstancesPerNode: Option[Int], MinWeightFractionPerNode: Option[Double],
PredictionCol: Option[String], ProbabilityCol: Option[String], RawPredictionCol: Option[String], Seed: Option[Long], Thresholds: Option[Array[Double]],
WeightCol: Option[String]
) extends CaraModel {

@MapperConstructor
def this(params: Map[String, String]) = {
this(
params.get("CheckpointInterval").map(_.toInt),
params.get("FeaturesCol"),
params.get("Impurity"),
params.get("LabelCol"),
params.get("LeafCol"),
params.get("MaxBins").map(_.toInt),
params.get("MaxDepth").map(_.toInt),
params.get("MinInfoGain").map(_.toDouble),
params.get("MinInstancesPerNode").map(_.toInt),
params.get("MinWeightFractionPerNode").map(_.toDouble),
params.get("PredictionCol"),
params.get("ProbabilityCol"),
params.get("RawPredictionCol"),
params.get("Seed").map(_.toLong),
params.get("Thresholds").map(_.split(",").map(_.toDouble)),
params.get("WeightCol")
)
}

override def build(): Try[PipelineStage] = Try {
val model = new SparkML()
// Keep only the parameters that were actually supplied in the params map
val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
val names = definedFields.map(f => f.getName)
val values = definedFields.map(f => f.get(this))
val zipFields = names zip values
// Resolve each field's Spark setter by name (e.g. MaxDepth -> setMaxDepth)
// via getMethode, then invoke it with the unwrapped value
zipFields.map { f =>
val fieldName = f._1
val fieldValue = f._2 match { case Some(s) => s }
getMethode(model, fieldValue, fieldName)
.invoke(model, fieldValue.asInstanceOf[Object])
}
model
}
}
object DecisionTreeClassifier {
def apply(params: Map[String, String]): DecisionTreeClassifier = new DecisionTreeClassifier(params)
}

GBTClassifier.scala
@@ -0,0 +1,64 @@
package io.github.jsarni.CaraStage.ModelStage

import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.classification.{GBTClassifier => SparkML}
import org.apache.spark.ml.PipelineStage
import scala.util.Try


case class GBTClassifier(
CheckpointInterval: Option[Int], FeaturesCol: Option[String], LabelCol: Option[String], LeafCol: Option[String],
MaxBins: Option[Int], MaxDepth: Option[Int], MinInfoGain: Option[Double], MinInstancesPerNode: Option[Int], MinWeightFractionPerNode: Option[Double],
PredictionCol: Option[String], ProbabilityCol: Option[String], RawPredictionCol: Option[String], Seed: Option[Long], Thresholds: Option[Array[Double]],
WeightCol: Option[String], FeatureSubsetStrategy: Option[String], SubsamplingRate: Option[Double], LossType: Option[String], MaxIter: Option[Int],
StepSize: Option[Double], ValidationIndicatorCol: Option[String]
) extends CaraModel {

@MapperConstructor
def this(params: Map[String, String]) = {
this(
params.get("CheckpointInterval").map(_.toInt),
params.get("FeaturesCol"),
params.get("LabelCol"),
params.get("LeafCol"),
params.get("MaxBins").map(_.toInt),
params.get("MaxDepth").map(_.toInt),
params.get("MinInfoGain").map(_.toDouble),
params.get("MinInstancesPerNode").map(_.toInt),
params.get("MinWeightFractionPerNode").map(_.toDouble),
params.get("PredictionCol"),
params.get("ProbabilityCol"),
params.get("RawPredictionCol"),
params.get("Seed").map(_.toLong),
params.get("Thresholds").map(_.split(",").map(_.toDouble)),
params.get("WeightCol"),
params.get("FeatureSubsetStrategy"),
params.get("SubsamplingRate").map(_.toDouble),
params.get("LossType"),
params.get("MaxIter").map(_.toInt),
params.get("StepSize").map(_.toDouble),
params.get("ValidationIndicatorCol")
)
}

override def build(): Try[PipelineStage] = Try {
val model = new SparkML()
val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
val names = definedFields.map(f => f.getName)
val values = definedFields.map(f => f.get(this))
val zipFields = names zip values
// Same reflection-based setter dispatch as in DecisionTreeClassifier.build
zipFields.map { f =>
val fieldName = f._1
val fieldValue = f._2 match { case Some(s) => s }
getMethode(model, fieldValue, fieldName)
.invoke(model, fieldValue.asInstanceOf[Object])
}
model
}
}
object GBTClassifier {
def apply(params: Map[String, String]): GBTClassifier = new GBTClassifier(params)
}
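For illustration, the built stage drops straight into a standard Spark ML Pipeline; the parameter values below are hypothetical ("logistic" is Spark's default loss for GBTClassifier).

import org.apache.spark.ml.Pipeline

val gbt = GBTClassifier(Map("MaxIter" -> "10", "StepSize" -> "0.1", "LossType" -> "logistic"))
val pipeline = new Pipeline().setStages(Array(gbt.build().get))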
LogisticRegression.scala
@@ -1,7 +1,7 @@
package io.github.jsarni.CaraStage.ModelStage
import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.PipelineStage
-import org.apache.spark.ml.classification.{LogisticRegression => SparkLR}
+import org.apache.spark.ml.classification.{LogisticRegression => SparkML}
import scala.util.Try


@@ -33,14 +33,18 @@ case class LogisticRegression(MaxIter: Option[Int], RegParam: Option[Double], El
}

 override def build(): Try[PipelineStage] = Try {
-val lr = new SparkLR()
+val model = new SparkML()
 val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
 val names = definedFields.map(f => f.getName)
 val values = definedFields.map(f => f.get(this))
 val zipFields = names zip values
-zipFields.map(f=> getMethode(lr,f._2 match {case Some(s) => s },f._1).invoke(lr,(f._2 match {case Some(value) => value.asInstanceOf[f._2.type ] })))
-lr
+zipFields.map { f =>
+val fieldName = f._1
+val fieldValue = f._2 match { case Some(s) => s }
+getMethode(model, fieldValue, fieldName)
+.invoke(model, fieldValue.asInstanceOf[Object])
+}
+model
 }
}
object LogisticRegression {
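All four build() methods lean on a getMethode helper inherited from CaraModel, which this commit does not touch. A minimal sketch of what such a helper could look like, assuming it resolves the Spark setter purely by name via Java reflection (the real implementation may differ):

// Hypothetical sketch — not the actual CaraModel code.
// Maps a field name such as "MaxDepth" to the model's setMaxDepth method.
def getMethode(model: PipelineStage, value: Any, name: String): java.lang.reflect.Method =
  model.getClass.getMethods
    .find(_.getName == s"set$name")
    .getOrElse(throw new NoSuchMethodException(s"set$name"))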
RandomForestClassifier.scala
@@ -0,0 +1,60 @@
package io.github.jsarni.CaraStage.ModelStage

import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.classification.{RandomForestClassifier => SparkML}
import org.apache.spark.ml.PipelineStage
import scala.util.Try

case class RandomForestClassifier(
CheckpointInterval: Option[Int], FeaturesCol: Option[String], Impurity: Option[String], LabelCol: Option[String], LeafCol: Option[String],
MaxBins: Option[Int], MaxDepth: Option[Int], MinInfoGain: Option[Double], MinInstancesPerNode: Option[Int], MinWeightFractionPerNode: Option[Double],
PredictionCol: Option[String], ProbabilityCol: Option[String], RawPredictionCol: Option[String], Seed: Option[Long], Thresholds: Option[Array[Double]],
WeightCol: Option[String], FeatureSubsetStrategy: Option[String], SubsamplingRate: Option[Double], NumTrees: Option[Int]
) extends CaraModel {

@MapperConstructor
def this(params: Map[String, String]) = {
this(
params.get("CheckpointInterval").map(_.toInt),
params.get("FeaturesCol"),
params.get("Impurity"),
params.get("LabelCol"),
params.get("LeafCol"),
params.get("MaxBins").map(_.toInt),
params.get("MaxDepth").map(_.toInt),
params.get("MinInfoGain").map(_.toDouble),
params.get("MinInstancesPerNode").map(_.toInt),
params.get("MinWeightFractionPerNode").map(_.toDouble),
params.get("PredictionCol"),
params.get("ProbabilityCol"),
params.get("RawPredictionCol"),
params.get("Seed").map(_.toLong),
params.get("Thresholds").map(_.split(",").map(_.toDouble)),
params.get("WeightCol"),
params.get("FeatureSubsetStrategy"),
params.get("SubsamplingRate").map(_.toDouble),
params.get("NumTrees").map(_.toInt)
)
}

override def build(): Try[PipelineStage] = Try {
val model = new SparkML()
val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
val names = definedFields.map(f => f.getName)
val values = definedFields.map(f => f.get(this))
val zipFields = names zip values
// Same reflection-based setter dispatch as in DecisionTreeClassifier.build
zipFields.map { f =>
val fieldName = f._1
val fieldValue = f._2 match { case Some(s) => s }
getMethode(model, fieldValue, fieldName)
.invoke(model, fieldValue.asInstanceOf[Object])
}
model
}
}
object RandomForestClassifier {
def apply(params: Map[String, String]): RandomForestClassifier = new RandomForestClassifier(params)
}
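One convention shared by all of these constructors is worth calling out: array-valued parameters travel as comma-separated strings. A small sketch with hypothetical values:

// "Thresholds" -> "0.2, 0.04" becomes Some(Array(0.2, 0.04));
// String.toDouble tolerates the whitespace left over after the split.
Map("Thresholds" -> "0.2, 0.04")
  .get("Thresholds")
  .map(_.split(",").map(_.toDouble))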


DecisionTreeClassifierTest.scala
@@ -0,0 +1,104 @@
package io.github.jsarni.CaraStage.ModelStage

import org.apache.spark.ml.classification.{DecisionTreeClassifier => SparkML}
import io.github.jsarni.TestBase

class DecisionTreeClassifierTest extends TestBase {

"build" should "Create an lr model and set all parameters with there args values or set default ones" in {
val params = Map(
"CheckpointInterval" -> "10",
"FeaturesCol" -> "FeatureCol",
"Impurity" -> "entropy",
"LabelCol" -> "LabelCol",
"LeafCol" -> "LeafCol",
"MaxBins" -> "10",
"MaxDepth" -> "5",
"MinInfoGain"-> "0.02",
"MinInstancesPerNode" -> "2",
"MinWeightFractionPerNode" -> "0.03",
"PredictionCol" -> "PredictionCol",
"ProbabilityCol" -> "ProbabilityCol",
"RawPredictionCol" -> "RawPredictionCol",
"Seed" -> "124555",
"Thresholds" -> "0.2, 0.04",
"WeightCol" -> "1.2"
)
val dTree = DecisionTreeClassifier(params)
val dTreeWithTwoParams = new SparkML()
.setCheckpointInterval(5)
.setMaxDepth(10)

val expectedResult = List(
new SparkML()
.setCheckpointInterval(10)
.setFeaturesCol("FeatureCol")
.setImpurity("entropy")
.setLabelCol("LabelCol")
.setLeafCol("LeafCol")
.setMaxBins(10)
.setMaxDepth(5)
.setMinInfoGain(0.02)
.setMinInstancesPerNode(2)
.setMinWeightFractionPerNode(0.03)
.setPredictionCol("PredictionCol")
.setProbabilityCol("ProbabilityCol")
.setRawPredictionCol("RawPredictionCol")
.setSeed(124555.toLong)
.setWeightCol("1.2")
.setThresholds(Array(0.2, 0.04))

)
dTree.build().isSuccess shouldBe true

val res = List(dTree.build().get)
// Unwrap array-valued params into lists so that theSameElementsAs
// can compare scalars and array contents uniformly
val resParameters = res.map(_.extractParamMap().toSeq.map(_.value))
.map(_.map {
case arr: Array[_] => arr.toList
case elem => List(elem)
})
.flatten
.flatten
val expectedParameters = expectedResult.map(_.extractParamMap().toSeq.map(_.value))
.map(_.map {
case arr: Array[_] => arr.toList
case elem => List(elem)
})
.flatten
.flatten

resParameters should contain theSameElementsAs expectedParameters

// Test default values of unset params
dTreeWithTwoParams.getImpurity shouldBe "gini"
dTreeWithTwoParams.getMaxBins shouldBe 32
dTreeWithTwoParams.getMinInfoGain shouldBe 0.0
}

"GetMethode" should "Return the appropriate methode by it's name" in {
val params = Map(
"CheckpointInterval" -> "10",
"FeaturesCol" -> "FeatureCol",
"Impurity" -> "entropy",
"LabelCol" -> "LabelCol",
"LeafCol" -> "LeafCol",
"MaxBins" -> "10",
"MaxDepth" -> "5",
"MinInfoGain"-> "0.02",
"MinInstancesPerNode" -> "2",
"MinWeightFractionPerNode" -> "0.03",
"PredictionCol" -> "PredictionCol",
"ProbabilityCol" -> "ProbabilityCol",
"RawPredictionCol" -> "RawPredictionCol",
"Seed" -> "124555",
"Thresholds" -> "0.2, 0.04",
"WeightCol" -> "1.2"
)
val caraDTree = DecisionTreeClassifier(params)
val model = caraDTree.build().get.asInstanceOf[SparkML]

caraDTree.getMethode(model, 10.toLong, "Seed").getName shouldBe "setSeed"
caraDTree.getMethode(model, "PredictCol", "PredictionCol").getName shouldBe "setPredictionCol"
caraDTree.getMethode(model, 10, "CheckpointInterval").getName shouldBe "setCheckpointInterval"

}

}
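TestBase itself is not part of this diff. Given the should/in phrasing and the matchers used above, it presumably mixes in ScalaTest's FlatSpec style; a plausible sketch:

// Hypothetical — the actual TestBase is not shown in this commit.
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

trait TestBase extends AnyFlatSpec with Matchers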