Commit

Feature/model schema (#15)
* LogisticRegressionTest contains an error to clear

* Finalize LogisticRegression class and tests

* Refactor names to camel case and correct spacing

* Adjust LogisticRegression code formatting and add DecisionTreeClassifier model class and tests

* Add GBTClassifier model class and tests

* Tests not yet finished

* Finalize tests for the new model classes

* CaraStageMapper update

* Update caraMapperModel

* Add KMeans, LDA and NaiveBayes model classes and tests

Co-authored-by: merzouk <merzoukoumedda@gmail.com>
merzouk13 and merzouk committed Jun 27, 2021
1 parent 93aeef5 commit 05e6c6e
Showing 7 changed files with 359 additions and 0 deletions.
@@ -25,6 +25,12 @@ trait CaraStageMapper {
GBTClassifier(stageDescription.params)
case "DecisionTreeClassifier" =>
DecisionTreeClassifier(stageDescription.params)
case "KMeans" =>
KMeans(stageDescription.params)
case "LDA" =>
LDA(stageDescription.params)
case "NaiveBayes" =>
NaiveBayes(stageDescription.params)
case _ =>
  throw new Exception(s"${stageDescription.stageName} is not a valid Cara Stage name. Please verify your Yaml File")
}
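For context, the mapper above resolves each stage name from the parsed YAML file to its model wrapper. A rough usage sketch follows; the stage-description type and the enclosing method name are not visible in this diff, so `CaraStageDescription` and `mapStage` are assumptions for illustration only:

// Hypothetical driver code: only the stageName/params accesses are confirmed by the diff.
val stage = mapStage(CaraStageDescription("KMeans", Map("K" -> "5", "MaxIter" -> "12")))
// -> KMeans(Map("K" -> "5", "MaxIter" -> "12"))
// An unrecognized name such as "KMeanz" would throw:
// "KMeanz is not a valid Cara Stage name. Please verify your Yaml File"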
46 changes: 46 additions & 0 deletions src/main/scala/io/github/jsarni/CaraStage/ModelStage/KMeans.scala
@@ -0,0 +1,46 @@
package io.github.jsarni.CaraStage.ModelStage

import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.clustering.{KMeans => SparkML}
import scala.util.Try


case class KMeans(DistanceMeasure: Option[String], FeaturesCol: Option[String], K: Option[Int], MaxIter: Option[Int],
                  PredictionCol: Option[String], Seed: Option[Long], Tol: Option[Double], WeightCol: Option[String])
  extends CaraModel {

@MapperConstructor
def this(params: Map[String, String]) = {
this(
params.get("DistanceMeasure"),
params.get("FeaturesCol"),
params.get("K").map(_.toInt),
params.get("MaxIter").map(_.toInt),
params.get("PredictionCol"),
params.get("Seed").map(_.toLong),
params.get("Tol").map(_.toDouble),
params.get("WeightCol")

)
}

override def build(): Try[PipelineStage] = Try {
  val model = new SparkML()
  // Keep only the parameters that were actually provided
  val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
  val names = definedFields.map(_.getName)
  val values = definedFields.map(_.get(this))
  // Resolve and invoke the matching Spark setter for each defined parameter (e.g. "MaxIter" -> setMaxIter)
  (names zip values).foreach { case (fieldName, rawValue) =>
    val fieldValue = rawValue match { case Some(s) => s } // safe: only defined Options remain
    getMethode(model, fieldValue, fieldName).invoke(model, fieldValue.asInstanceOf[AnyRef])
  }
  model
}
}
object KMeans {
def apply(params: Map[String, String]): KMeans = new KMeans(params)
}
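build() depends on getMethode, which is inherited from CaraModel and is not part of this diff. A minimal sketch of what such a helper might look like, assuming it resolves a Spark setter purely by name ("set" + field name); the actual implementation may also take the value's type into account:

import java.lang.reflect.Method
import org.apache.spark.ml.PipelineStage

// Hypothetical sketch of CaraModel.getMethode (not shown in this diff):
// look up the public setter whose name is "set" followed by the field name.
def getMethode(model: PipelineStage, value: Any, name: String): Method =
  model.getClass.getMethods
    .find(_.getName == s"set$name")
    .getOrElse(throw new NoSuchMethodException(s"No setter set$name on ${model.getClass.getSimpleName}"))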
48 changes: 48 additions & 0 deletions src/main/scala/io/github/jsarni/CaraStage/ModelStage/LDA.scala
@@ -0,0 +1,48 @@
package io.github.jsarni.CaraStage.ModelStage

import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.clustering.{LDA => SparkML}
import scala.util.Try


case class LDA(CheckpointInterval: Option[Int], DocConcentration: Option[Array[Double]], FeaturesCol: Option[String], K: Option[Int], MaxIter: Option[Int],
               Optimizer: Option[String], Seed: Option[Long], SubsamplingRate: Option[Double], TopicConcentration: Option[Double], TopicDistributionCol: Option[String])
  extends CaraModel {

@MapperConstructor
def this(params: Map[String, String]) = {
this(
params.get("CheckpointInterval").map(_.toInt),
params.get("DocConcentration").map(_.split(",").map(_.toDouble)),
params.get("FeaturesCol"),
params.get("K").map(_.toInt),
params.get("MaxIter").map(_.toInt),
params.get("Optimizer"),
params.get("Seed").map(_.toLong),
params.get("SubsamplingRate").map(_.toDouble),
params.get("TopicConcentration").map(_.toDouble),
params.get("TopicDistributionCol")
)
}

override def build(): Try[PipelineStage] = Try {
  val model = new SparkML()
  // Keep only the parameters that were actually provided
  val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
  val names = definedFields.map(_.getName)
  val values = definedFields.map(_.get(this))
  // Resolve and invoke the matching Spark setter for each defined parameter (e.g. "MaxIter" -> setMaxIter)
  (names zip values).foreach { case (fieldName, rawValue) =>
    val fieldValue = rawValue match { case Some(s) => s } // safe: only defined Options remain
    getMethode(model, fieldValue, fieldName).invoke(model, fieldValue.asInstanceOf[AnyRef])
  }
  model
}
}
object LDA {
def apply(params: Map[String, String]): LDA = new LDA(params)
}
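One detail of the LDA mapper constructor worth noting: array-valued parameters such as DocConcentration arrive as a single comma-separated string. The split-and-parse above tolerates the spaces used in the tests ("1.02, 1.5, 12.4") because java.lang.Double.parseDouble ignores leading and trailing whitespace. A quick check of the parsing used in this(params):

val doc = "1.02, 1.5, 12.4".split(",").map(_.toDouble)
// doc: Array[Double] = Array(1.02, 1.5, 12.4)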
45 changes: 45 additions & 0 deletions src/main/scala/io/github/jsarni/CaraStage/ModelStage/NaiveBayes.scala
@@ -0,0 +1,45 @@
package io.github.jsarni.CaraStage.ModelStage
import io.github.jsarni.CaraStage.Annotation.MapperConstructor
import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.classification.{NaiveBayes => SparkML}
import scala.util.Try

case class NaiveBayes(FeaturesCol: Option[String], LabelCol: Option[String], ModelType: Option[String], PredictionCol: Option[String], ProbabilityCol: Option[String],
                      RawPredictionCol: Option[String], Smoothing: Option[Double], Thresholds: Option[Array[Double]], WeightCol: Option[String])
  extends CaraModel {

@MapperConstructor
def this(params: Map[String, String]) = {
this(
params.get("FeaturesCol"),
params.get("LabelCol"),
params.get("ModelType"),
params.get("PredictionCol"),
params.get("ProbabilityCol"),
params.get("RawPredictionCol"),
params.get("Smoothing").map(_.toDouble),
params.get("Thresholds").map(_.split(",").map(_.toDouble)),
params.get("WeightCol")

)
}

override def build(): Try[PipelineStage] = Try {
  val model = new SparkML()
  // Keep only the parameters that were actually provided
  val definedFields = this.getClass.getDeclaredFields.filter(f => f.get(this).asInstanceOf[Option[Any]].isDefined)
  val names = definedFields.map(_.getName)
  val values = definedFields.map(_.get(this))
  // Resolve and invoke the matching Spark setter for each defined parameter (e.g. "Smoothing" -> setSmoothing)
  (names zip values).foreach { case (fieldName, rawValue) =>
    val fieldValue = rawValue match { case Some(s) => s } // safe: only defined Options remain
    getMethode(model, fieldValue, fieldName).invoke(model, fieldValue.asInstanceOf[AnyRef])
  }
  model
}
}
object NaiveBayes {
def apply(params: Map[String, String]): NaiveBayes = new NaiveBayes(params)
}
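All three wrappers follow the same contract: construct from a Map[String, String], then call build() to obtain the configured Spark PipelineStage inside a Try. A short usage sketch (the parameter values here are illustrative only):

import scala.util.{Failure, Success}

// Build a NaiveBayes stage from string parameters, as the YAML mapper would.
NaiveBayes(Map("Smoothing" -> "0.8", "ModelType" -> "multinomial")).build() match {
  case Success(stage) => println(s"Configured stage: $stage")
  case Failure(error) => println(s"Invalid stage parameters: ${error.getMessage}")
}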
69 changes: 69 additions & 0 deletions src/test/scala/io/github/jsarni/CaraStage/ModelStage/KMeansTest.scala
@@ -0,0 +1,69 @@
package io.github.jsarni.CaraStage.ModelStage

import org.apache.spark.ml.clustering.{KMeans => SparkML}
import io.github.jsarni.TestBase

class KMeansTest extends TestBase {
"build" should "Create an lr model and set all parameters with there args values or set default ones" in {
val params = Map(
"DistanceMeasure" -> "euclidean",
"FeaturesCol" -> "FeaturesCol",
"K" -> "5",
"MaxIter" -> "12",
"PredictionCol" -> "PredictionCol",
"Seed" -> "1214151",
"Tol" -> "0.2",
"WeightCol" -> "WeightColname"
)

val kmeans = KMeans(params)
val kmeansWithTwoParams = new SparkML()
  .setTol(0.3)
  .setDistanceMeasure("euclidean")

val expectedResult = List(
new SparkML()
.setDistanceMeasure("euclidean")
.setFeaturesCol("FeaturesCol")
.setK(5)
.setMaxIter(12)
.setPredictionCol("PredictionCol")
.setSeed(1214151)
.setTol(0.2)
.setWeightCol("WeightColname")
)
kmeans.build().isSuccess shouldBe true

val res = List(kmeans.build().get)
val resParameters = res.map(_.extractParamMap().toSeq.map(_.value))
val expectedParameters = expectedResult.map(_.extractParamMap().toSeq.map(_.value))

resParameters.head should contain theSameElementsAs expectedParameters.head

// Test default values of unset params
kmeansWithTwoParams.getMaxIter shouldBe 20
kmeansWithTwoParams.getK shouldBe 2

}
"GetMethode" should "Return the appropriate methode by it's name" in {
val params = Map(
"DistanceMeasure" -> "euclidean",
"FeaturesCol" -> "FeaturesCol",
"K" -> "5",
"MaxIter" -> "12",
"PredictionCol" -> "PredictionCol",
"Seed" -> "1214151",
"Tol" -> "0.2",
"WeightCol" -> "WeightColname"
)
val caraKmeans = KMeans(params)
val model = caraKmeans.build().get.asInstanceOf[SparkML]

caraKmeans.getMethode(model, 10, "MaxIter").getName shouldBe "setMaxIter"
caraKmeans.getMethode(model, 2, "K").getName shouldBe "setK"
caraKmeans.getMethode(model, "euclidean", "DistanceMeasure").getName shouldBe "setDistanceMeasure"

  }
}
76 changes: 76 additions & 0 deletions src/test/scala/io/github/jsarni/CaraStage/ModelStage/LDATest.scala
@@ -0,0 +1,76 @@
package io.github.jsarni.CaraStage.ModelStage

import org.apache.spark.ml.clustering.{LDA => SparkML}
import io.github.jsarni.TestBase

class LDATest extends TestBase {
"build" should "Create an lr model and set all parameters with there args values or set default ones" in {
val params = Map(
"CheckpointInterval" -> "3",
"DocConcentration" -> "1.02, 1.5, 12.4",
"FeaturesCol" -> "FeaturesCol",
"K" -> "6",
"MaxIter" -> "15",
"Optimizer" -> "online",
"Seed" -> "12454535",
"SubsamplingRate" -> "0.066",
"TopicConcentration" -> "0.23",
"TopicDistributionCol" -> "gamma"
)

val ldaModel = LDA(params)
val ldaWithTwoParams = new SparkML()
  .setSeed(6464845)
  .setTopicDistributionCol("gamma")

val expectedResult = List(
new SparkML()
.setCheckpointInterval(3)
.setDocConcentration(Array(1.02, 1.5, 12.4))
.setFeaturesCol("FeaturesCol")
.setK(6)
.setMaxIter(15)
.setOptimizer("online")
.setSeed(12454535)
.setSubsamplingRate(0.066)
.setTopicConcentration(0.23)
.setTopicDistributionCol("gamma")
)
ldaModel.build().isSuccess shouldBe true

val res = List(ldaModel.build().get)
val resParameters = res.map(_.extractParamMap().toSeq.map(_.value))
val expectedParameters = expectedResult.map(_.extractParamMap().toSeq.map(_.value))

resParameters.head should contain theSameElementsAs expectedParameters.head

// Test default values of unset params
ldaWithTwoParams.getMaxIter shouldBe 20
ldaWithTwoParams.getK shouldBe 10
ldaWithTwoParams.getSubsamplingRate shouldBe 0.05

}
"GetMethode" should "Return the appropriate methode by it's name" in {
val params = Map(
"CheckpointInterval" -> "3",
"DocConcentration" -> "1.02, 1.5, 12.4",
"FeaturesCol" -> "FeaturesCol",
"K" -> "6",
"MaxIter" -> "15",
"Optimizer" -> "online",
"Seed" -> "12454535",
"SubsamplingRate" -> "0.066",
"TopicConcentration" -> "0.23",
"TopicDistributionCol" -> "gamma"
)
val caraLDA = LDA(params)
val model = caraLDA.build().get.asInstanceOf[SparkML]

caraLDA.getMethode(model, 10, "MaxIter").getName shouldBe "setMaxIter"
caraLDA.getMethode(model, 2, "K").getName shouldBe "setK"
caraLDA.getMethode(model, "gamma", "TopicDistributionCol").getName shouldBe "setTopicDistributionCol"

  }
}
69 changes: 69 additions & 0 deletions src/test/scala/io/github/jsarni/CaraStage/ModelStage/NaiveBayesTest.scala
@@ -0,0 +1,69 @@
package io.github.jsarni.CaraStage.ModelStage

import org.apache.spark.ml.classification.{NaiveBayes => SparkML}
import io.github.jsarni.TestBase

class NaiveBayesTest extends TestBase {
"build" should "Create an lr model and set all parameters with there args values or set default ones" in {
val params = Map(
"FeaturesCol" -> "FeaturesCol",
"LabelCol" -> "LabelCol",
"ModelType" -> "gaussian",
"PredictionCol" -> "PredictionCol",
"ProbabilityCol" -> "ProbabilityCol",
"RawPredictionCol" -> "RawPredictionCol",
"Smoothing" -> "0.8",
"Thresholds" -> "0.2, 0.4, 1.05",
"WeightCol" -> "WeightCol"
)
val nBayes = NaiveBayes(params)
val nBayesWithTwoParams = new SparkML()
  .setFeaturesCol("FeaturesCol")
  .setThresholds(Array(0.5, 1.44))

val expectedResult = List(
new SparkML()
.setFeaturesCol("FeaturesCol")
.setLabelCol("LabelCol")
.setModelType("gaussian")
.setPredictionCol("PredictionCol")
.setProbabilityCol("ProbabilityCol")
.setRawPredictionCol("RawPredictionCol")
.setSmoothing(0.8)
.setThresholds(Array(0.2, 0.4, 1.05))
.setWeightCol("WeightCol")
)
nBayes.build().isSuccess shouldBe true

val res = List(nBayes.build().get)
val resParameters = res.map(_.extractParamMap().toSeq.map(_.value))
val expectedParameters = expectedResult.map(_.extractParamMap().toSeq.map(_.value))

resParameters.head should contain theSameElementsAs expectedParameters.head

// Test default values of unset params
nBayesWithTwoParams.getSmoothing shouldBe 1.0
nBayesWithTwoParams.getModelType shouldBe "multinomial"

}
"GetMethode" should "Return the appropriate methode by it's name" in {
val params = Map(
"FeaturesCol" -> "FeaturesCol",
"LabelCol" -> "LabelCol",
"ModelType" -> "gaussian",
"PredictionCol" -> "PredictionCol",
"ProbabilityCol" -> "ProbabilityCol",
"RawPredictionCol" -> "RawPredictionCol",
"Smoothing" -> "0.8",
"Thresholds" -> "0.2, 0.4, .05",
"WeightCol" -> "WeightCol"
)
val caraNaivebayes = NaiveBayes(params)
val model = caraNaivebayes.build().get.asInstanceOf[SparkML]

caraNaivebayes.getMethode(model, "String", "FeaturesCol").getName shouldBe "setFeaturesCol"
caraNaivebayes.getMethode(model, 0.0, "Smoothing").getName shouldBe "setSmoothing"
caraNaivebayes.getMethode(model, Array(1.0, 0.2), "Thresholds").getName shouldBe "setThresholds"

}
}
