Feature/model schema #15

Merged
merged 15 commits on Jun 27, 2021
@@ -25,6 +25,12 @@ trait CaraStageMapper {
        GBTClassifier(stageDescription.params)
      case "DecisionTreeClassifier" =>
        DecisionTreeClassifier(stageDescription.params)
      case "KMeans" =>
        KMeans(stageDescription.params)
      case "LDA" =>
        LDA(stageDescription.params)
      case "NaiveBayes" =>
        NaiveBayes(stageDescription.params)
      case _ =>
        throw new Exception(s"${stageDescription.stageName} is not a valid Cara Stage name. Please verify your Yaml File")
    }
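
For readers skimming the diff: the mapper simply pattern-matches the YAML stage name onto a stage constructor. A minimal, self-contained sketch of that dispatch (StageDescription here is a simplified stand-in for the project's type, assumed to carry a stage name plus a string-to-string params map; the real mapper returns stage instances, not strings):

case class StageDescription(stageName: String, params: Map[String, String])

// Sketch only: dispatch on the stage name, as the trait above does.
def describeStage(desc: StageDescription): String = desc.stageName match {
  case "KMeans" | "LDA" | "NaiveBayes" =>
    s"building ${desc.stageName} with ${desc.params.size} params"
  case other =>
    throw new Exception(s"$other is not a valid Cara Stage name. Please verify your Yaml File")
}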
46 changes: 46 additions & 0 deletions src/main/scala/io/github/jsarni/CaraStage/ModelStage/KMeans.scala
@@ -0,0 +1,46 @@
package io.github.jsarni.CaraStage.ModelStage

import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.clustering.{KMeans => SparkML}
import scala.util.Try


case class KMeans(DistanceMeasure: Option[String], FeaturesCol: Option[String], K: Option[Int], MaxIter: Option[Int],
                  PredictionCol: Option[String], Seed: Option[Long], Tol: Option[Double], WeightCol: Option[String])
  extends CaraModel {

  @MapperConstructor
  def this(params: Map[String, String]) = {
    this(
      params.get("DistanceMeasure"),
      params.get("FeaturesCol"),
      params.get("K").map(_.toInt),
      params.get("MaxIter").map(_.toInt),
      params.get("PredictionCol"),
      params.get("Seed").map(_.toLong),
      params.get("Tol").map(_.toDouble),
      params.get("WeightCol")
    )
  }

  override def build(): Try[PipelineStage] = Try {
    val model = new SparkML()
    // Keep only the parameters that were explicitly provided in the description
    val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
    val names = definedFields.map(_.getName)
    val values = definedFields.map(_.get(this))
    val zipFields = names zip values
    zipFields.foreach { case (fieldName, fieldValue) =>
      val value = fieldValue.asInstanceOf[Option[Any]].get
      // Resolve the Spark setter matching the field name (e.g. "setMaxIter") and invoke it
      getMethode(model, value, fieldName).invoke(model, value.asInstanceOf[AnyRef])
    }
    model
  }
}

object KMeans {
  def apply(params: Map[String, String]): KMeans = new KMeans(params)
}
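
A quick usage sketch for the stage above (an illustration, not code from this PR: it assumes the CaraModel trait and Spark MLlib are on the classpath; every value arrives as a string, e.g. parsed from a YAML config):

import scala.util.{Failure, Success}

// The map-based constructor converts the raw strings to typed Options.
KMeans(Map("K" -> "5", "MaxIter" -> "12", "Tol" -> "0.2")).build() match {
  case Success(stage) => println(stage.extractParamMap()) // the configured Spark KMeans
  case Failure(e)     => println(s"invalid stage: ${e.getMessage}")
}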
48 changes: 48 additions & 0 deletions src/main/scala/io/github/jsarni/CaraStage/ModelStage/LDA.scala
@@ -0,0 +1,48 @@
package io.github.jsarni.CaraStage.ModelStage

import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.clustering.{LDA => SparkML}
import scala.util.Try


case class LDA(CheckpointInterval: Option[Int], DocConcentration: Option[Array[Double]], FeaturesCol: Option[String], K: Option[Int], MaxIter: Option[Int],
               Optimizer: Option[String], Seed: Option[Long], SubsamplingRate: Option[Double], TopicConcentration: Option[Double], TopicDistributionCol: Option[String])
  extends CaraModel {

  @MapperConstructor
  def this(params: Map[String, String]) = {
    this(
      params.get("CheckpointInterval").map(_.toInt),
      params.get("DocConcentration").map(_.split(",").map(_.toDouble)),
      params.get("FeaturesCol"),
      params.get("K").map(_.toInt),
      params.get("MaxIter").map(_.toInt),
      params.get("Optimizer"),
      params.get("Seed").map(_.toLong),
      params.get("SubsamplingRate").map(_.toDouble),
      params.get("TopicConcentration").map(_.toDouble),
      params.get("TopicDistributionCol")
    )
  }

  override def build(): Try[PipelineStage] = Try {
    val model = new SparkML()
    // Keep only the parameters that were explicitly provided in the description
    val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
    val names = definedFields.map(_.getName)
    val values = definedFields.map(_.get(this))
    val zipFields = names zip values
    zipFields.foreach { case (fieldName, fieldValue) =>
      val value = fieldValue.asInstanceOf[Option[Any]].get
      // Resolve the Spark setter matching the field name (e.g. "setMaxIter") and invoke it
      getMethode(model, value, fieldName).invoke(model, value.asInstanceOf[AnyRef])
    }
    model
  }
}

object LDA {
  def apply(params: Map[String, String]): LDA = new LDA(params)
}
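
One detail worth noting: array-valued parameters such as DocConcentration travel as one comma-separated string and are split in the mapper constructor. A standalone check of that parsing (plain Scala, no Spark needed; String#toDouble tolerates the whitespace that ", " leaves around each piece):

// Mirrors params.get("DocConcentration").map(_.split(",").map(_.toDouble))
val doc = "1.02, 1.5, 12.4".split(",").map(_.toDouble)
assert(doc.sameElements(Array(1.02, 1.5, 12.4)))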
45 changes: 45 additions & 0 deletions src/main/scala/io/github/jsarni/CaraStage/ModelStage/NaiveBayes.scala
@@ -0,0 +1,45 @@
package io.github.jsarni.CaraStage.ModelStage
import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.classification.{NaiveBayes => SparkML}
import scala.util.Try

case class NaiveBayes(FeaturesCol: Option[String], LabelCol: Option[String], ModelType: Option[String], PredictionCol: Option[String], ProbabilityCol: Option[String],
                      RawPredictionCol: Option[String], Smoothing: Option[Double], Thresholds: Option[Array[Double]], WeightCol: Option[String])
  extends CaraModel {

  @MapperConstructor
  def this(params: Map[String, String]) = {
    this(
      params.get("FeaturesCol"),
      params.get("LabelCol"),
      params.get("ModelType"),
      params.get("PredictionCol"),
      params.get("ProbabilityCol"),
      params.get("RawPredictionCol"),
      params.get("Smoothing").map(_.toDouble),
      params.get("Thresholds").map(_.split(",").map(_.toDouble)),
      params.get("WeightCol")
    )
  }

  override def build(): Try[PipelineStage] = Try {
    val model = new SparkML()
    // Keep only the parameters that were explicitly provided in the description
    val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
    val names = definedFields.map(_.getName)
    val values = definedFields.map(_.get(this))
    val zipFields = names zip values
    zipFields.foreach { case (fieldName, fieldValue) =>
      val value = fieldValue.asInstanceOf[Option[Any]].get
      // Resolve the Spark setter matching the field name (e.g. "setSmoothing") and invoke it
      getMethode(model, value, fieldName).invoke(model, value.asInstanceOf[AnyRef])
    }
    model
  }
}

object NaiveBayes {
  def apply(params: Map[String, String]): NaiveBayes = new NaiveBayes(params)
}
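
All three build() implementations delegate setter lookup to getMethode, which lives on CaraModel and is not part of this diff. The tests below only pin down its behaviour (e.g. getMethode(model, 10, "MaxIter").getName is "setMaxIter"), so the following is a hypothetical stand-in for how such a lookup could work, not the project's actual code (the real one may also use the value's type to disambiguate overloads):

import java.lang.reflect.Method

// Hypothetical sketch: resolve the setter named "set" + fieldName by reflection.
def findSetter(model: AnyRef, fieldName: String): Method =
  model.getClass.getMethods
    .find(_.getName == s"set$fieldName")
    .getOrElse(throw new NoSuchMethodException(s"set$fieldName"))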
69 changes: 69 additions & 0 deletions src/test/scala/io/github/jsarni/CaraStage/ModelStage/KMeansTest.scala
@@ -0,0 +1,69 @@
package io.github.jsarni.CaraStage.ModelStage

import org.apache.spark.ml.clustering.{KMeans => SparkML}
import io.github.jsarni.TestBase

class KMeansTest extends TestBase {
"build" should "Create an lr model and set all parameters with there args values or set default ones" in {
val params = Map(
"DistanceMeasure" -> "euclidean",
"FeaturesCol" -> "FeaturesCol",
"K" -> "5",
"MaxIter" -> "12",
"PredictionCol" -> "PredictionCol",
"Seed" -> "1214151",
"Tol" -> "0.2",
"WeightCol" -> "WeightColname"
)

val Kmeans = KMeans(params)
val KmeansWithTwoParams = new SparkML()
.setTol(0.3)
.setDistanceMeasure("euclidean")

val expectedResult = List(
new SparkML()
.setDistanceMeasure("euclidean")
.setFeaturesCol("FeaturesCol")
.setK(5)
.setMaxIter(12)
.setPredictionCol("PredictionCol")
.setSeed(1214151)
.setTol(0.2)
.setWeightCol("WeightColname")
)
Kmeans.build().isSuccess shouldBe true

val res = List(Kmeans.build().get)
val resParameters = res.map(_.extractParamMap().toSeq.map(_.value))
val expectedParameters = expectedResult.map(_.extractParamMap().toSeq.map(_.value))

resParameters.head should contain theSameElementsAs expectedParameters.head

// Test default values of unset params
KmeansWithTwoParams.getMaxIter shouldBe 20
KmeansWithTwoParams.getK shouldBe 2

}
"GetMethode" should "Return the appropriate methode by it's name" in {
val params = Map(
"DistanceMeasure" -> "euclidean",
"FeaturesCol" -> "FeaturesCol",
"K" -> "5",
"MaxIter" -> "12",
"PredictionCol" -> "PredictionCol",
"Seed" -> "1214151",
"Tol" -> "0.2",
"WeightCol" -> "WeightColname"
)
val caraKmeans = KMeans(params)
val model =caraKmeans.build().get.asInstanceOf[SparkML]

caraKmeans.getMethode(model,10,"MaxIter").getName shouldBe "setMaxIter"
caraKmeans.getMethode(model,2,"K").getName shouldBe "setK"
caraKmeans.getMethode(model, "euclidean" ,"DistanceMeasure").getName shouldBe "setDistanceMeasure"

}


}
76 changes: 76 additions & 0 deletions src/test/scala/io/github/jsarni/CaraStage/ModelStage/LDATest.scala
@@ -0,0 +1,76 @@
package io.github.jsarni.CaraStage.ModelStage

import org.apache.spark.ml.clustering.{LDA => SparkML}
import io.github.jsarni.TestBase

class LDATest extends TestBase {
"build" should "Create an lr model and set all parameters with there args values or set default ones" in {
val params = Map(
"CheckpointInterval" -> "3",
"DocConcentration" -> "1.02, 1.5, 12.4",
"FeaturesCol" -> "FeaturesCol",
"K" -> "6",
"MaxIter" -> "15",
"Optimizer" -> "online",
"Seed" -> "12454535",
"SubsamplingRate" -> "0.066",
"TopicConcentration" -> "0.23",
"TopicDistributionCol" -> "gamma"
)

val LDAModel = LDA(params)
val LDAWithTwoParams = new SparkML()
.setSeed(6464845)
.setTopicDistributionCol("gamma")

val expectedResult = List(
new SparkML()
.setCheckpointInterval(3)
.setDocConcentration(Array(1.02, 1.5, 12.4))
.setFeaturesCol("FeaturesCol")
.setK(6)
.setMaxIter(15)
.setOptimizer("online")
.setSeed(12454535)
.setSubsamplingRate(0.066)
.setTopicConcentration(0.23)
.setTopicDistributionCol("gamma")
)
LDAModel.build().isSuccess shouldBe true

val res = List(LDAModel.build().get)
val resParameters = res.map(_.extractParamMap().toSeq.map(_.value))
val expectedParameters = expectedResult.map(_.extractParamMap().toSeq.map(_.value))

resParameters.head should contain theSameElementsAs expectedParameters.head

// Test default values of unset params
LDAWithTwoParams.getMaxIter shouldBe 20
LDAWithTwoParams.getK shouldBe 10
LDAWithTwoParams.getSubsamplingRate shouldBe 0.05

}
"GetMethode" should "Return the appropriate methode by it's name" in {
val params = Map(
"CheckpointInterval" -> "3",
"DocConcentration" -> "1.02, 1.5, 12.4",
"FeaturesCol" -> "FeaturesCol",
"K" -> "6",
"MaxIter" -> "15",
"Optimizer" -> "online",
"Seed" -> "12454535",
"SubsamplingRate" -> "0.066",
"TopicConcentration" -> "0.23",
"TopicDistributionCol" -> "gamma"
)
val caraLDA = LDA(params)
val model =caraLDA.build().get.asInstanceOf[SparkML]

caraLDA.getMethode(model,10,"MaxIter").getName shouldBe "setMaxIter"
caraLDA.getMethode(model,2,"K").getName shouldBe "setK"
caraLDA.getMethode(model, "gamma" ,"TopicDistributionCol").getName shouldBe "setTopicDistributionCol"

}


}
69 changes: 69 additions & 0 deletions src/test/scala/io/github/jsarni/CaraStage/ModelStage/NaiveBayesTest.scala
@@ -0,0 +1,69 @@
package io.github.jsarni.CaraStage.ModelStage

import org.apache.spark.ml.classification.{NaiveBayes => SparkML}
import io.github.jsarni.TestBase

class NaiveBayesTest extends TestBase {
"build" should "Create an lr model and set all parameters with there args values or set default ones" in {
val params = Map(
"FeaturesCol" -> "FeaturesCol",
"LabelCol" -> "LabelCol",
"ModelType" -> "gaussian",
"PredictionCol" -> "PredictionCol",
"ProbabilityCol" -> "ProbabilityCol",
"RawPredictionCol" -> "RawPredictionCol",
"Smoothing" -> "0.8",
"Thresholds" -> "0.2, 0.4, 1.05",
"WeightCol" -> "WeightCol"
)
val NBayes = NaiveBayes(params)
val NBayesWithTwoParams = new SparkML()
.setFeaturesCol("FeaturesCol")
.setThresholds(Array(0.5, 1.44))

val expectedResult = List(
new SparkML()
.setFeaturesCol("FeaturesCol")
.setLabelCol("LabelCol")
.setModelType("gaussian")
.setPredictionCol("PredictionCol")
.setProbabilityCol("ProbabilityCol")
.setRawPredictionCol("RawPredictionCol")
.setSmoothing(0.8)
.setThresholds(Array(0.2, 0.4, 1.05))
.setWeightCol("WeightCol")
)
NBayes.build().isSuccess shouldBe true

val res = List(NBayes.build().get)
val resParameters = res.map(_.extractParamMap().toSeq.map(_.value))
val expectedParameters = expectedResult.map(_.extractParamMap().toSeq.map(_.value))

resParameters.head should contain theSameElementsAs expectedParameters.head

// Test default values of unset params
NBayesWithTwoParams.getSmoothing shouldBe 1.0
NBayesWithTwoParams.getModelType shouldBe "multinomial"

}
"GetMethode" should "Return the appropriate methode by it's name" in {
val params = Map(
"FeaturesCol" -> "FeaturesCol",
"LabelCol" -> "LabelCol",
"ModelType" -> "gaussian",
"PredictionCol" -> "PredictionCol",
"ProbabilityCol" -> "ProbabilityCol",
"RawPredictionCol" -> "RawPredictionCol",
"Smoothing" -> "0.8",
"Thresholds" -> "0.2, 0.4, .05",
"WeightCol" -> "WeightCol"
)
val caraNaivebayes = NaiveBayes(params)
val model =caraNaivebayes.build().get.asInstanceOf[SparkML]

caraNaivebayes.getMethode(model,"String","FeaturesCol").getName shouldBe "setFeaturesCol"
caraNaivebayes.getMethode(model,0.0,"Smoothing").getName shouldBe "setSmoothing"
caraNaivebayes.getMethode(model, Array(1.0,0.2) ,"Thresholds").getName shouldBe "setThresholds"

}
}