In [70]:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD

In [2]:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils._
import org.apache.spark.mllib.linalg.Vector

In [3]:
%AddDeps com.databricks spark-csv_2.11 1.4.0 --transitive

Marking com.databricks:spark-csv_2.11:1.4.0 for download
Preparing to fetch from:
-> file:/tmp/toree_add_deps8414302016316025609/
-> https://repo1.maven.org/maven2
-> New file at /tmp/toree_add_deps8414302016316025609/https/repo1.maven.org/maven2/org/apache/commons/commons-csv/1.1/commons-csv-1.1.jar
-> New file at /tmp/toree_add_deps8414302016316025609/https/repo1.maven.org/maven2/com/databricks/spark-csv_2.11/1.4.0/spark-csv_2.11-1.4.0.jar
-> New file at /tmp/toree_add_deps8414302016316025609/https/repo1.maven.org/maven2/com/univocity/univocity-parsers/1.5.1/univocity-parsers-1.5.1.jar


## Load and explore data:

In [4]:
val sqlC = new SQLContext(sc)

In [5]:
val fName = "file:/home/jordi/Projects/demos/mock_NPPmodel_data.csv"

In [43]:
var df = sqlC.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", ";").load(fName)
// select only the columns needed by the model:
df = df.select("Mileage", "Age", "Power", "Body", "Fuel", "VehicleValue")
df.show(10)

+-------+---+-----+---------+------+------------+
|Mileage|Age|Power|     Body|  Fuel|VehicleValue|
+-------+---+-----+---------+------+------------+
|  59300| 30|   85|  Station|DIESEL|       10209|
|  81980| 24|  130|    Coupe|DIESEL|       17367|
|  78480| 36|  120|Offroader|DIESEL|       18983|
|   8230| 12|  110|Limousine|BENZIN|       21419|
|   3480| 12|  130|    Coupe|BENZIN|       26788|
|  24180|  6|  145|Smallsize|DIESEL|       36965|
|   6430|  6|  120|Limousine|DIESEL|       37712|
|  23680|  6|  130|Smallsize|BENZIN|       26880|
|  16480| 12|  130|Smallsize|BENZIN|       32596|
|  25799| 12|  185|Offroader|DIESEL|       47805|
+-------+---+-----+---------+------+------------+
only showing top 10 rows



In [44]:
df.describe().show()

+-------+-----------------+------------------+------------------+------------------+
|summary|          Mileage|               Age|             Power|      VehicleValue|
+-------+-----------------+------------------+------------------+------------------+
|  count|            23840|             23840|             23840|             23840|
|   mean|41915.34110738255|18.703439597315437|121.32151845637584|19967.896140939596|
| stddev|51710.27847223828| 13.92402864403154|47.374738992271546|10848.195784641135|
|    min|                5|                 6|                31|              1643|
|    max|          3054512|               138|               468|            144498|
+-------+-----------------+------------------+------------------+------------------+



In [45]:
df.select("Body").groupBy("Body").count.show()

                                                                                +-----------+-----+
|       Body|count|
+-----------+-----+
|    Compact|  106|
|      Coupe| 3226|
|  Smallsize| 5479|
|  Limousine| 8923|
|Convertable| 1912|
|  Offroader| 2022|
|    Station| 2172|
+-----------+-----+



#### Split train and test sets:

In [46]:
var Array(dfTrain, dfTest) = df.randomSplit(Array(0.8, 0.2), seed = 13)
println("num rows train set: " + dfTrain.count())
println("num rows test set :  " + dfTest.count())

num rows train set: 19032
num rows test set :  4808


## Create pipeline with training set:

In [47]:
// the label column (response) must be double float and must be named "label":
dfTrain = dfTrain.withColumn("label", dfTrain.col("VehicleValue").cast(DoubleType)).drop("VehicleValue")

In [48]:
val namesColsFeatures = List("Mileage", "Age", "Power", "Body", "Fuel")
val nameColsFeaturesStrings = Array("Body", "Fuel")

In [49]:
val index_transformers: Array[org.apache.spark.ml.PipelineStage] = nameColsFeaturesStrings.map(
  colName => new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(s"${colName}_idx")
    .setHandleInvalid("error") // options: "skip", "error". With "skip", when the transformer is applied to data that has rows with new levels, the resulting df will skip those rows
)
new Pipeline().setStages(index_transformers).fit(dfTrain).transform(dfTrain).select("Body", "Body_idx", "Fuel", "Fuel_idx").show(5)

+---------+--------+------+--------+
|     Body|Body_idx|  Fuel|Fuel_idx|
+---------+--------+------+--------+
|Limousine|     0.0|BENZIN|     0.0|
|    Coupe|     2.0|BENZIN|     0.0|
|Limousine|     0.0|DIESEL|     1.0|
|    Coupe|     2.0|BENZIN|     0.0|
|    Coupe|     2.0|BENZIN|     0.0|
+---------+--------+------+--------+
only showing top 5 rows



In [50]:
val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = nameColsFeaturesStrings.map(
  colName => new OneHotEncoder()
    .setInputCol(s"${colName}_idx")
    .setOutputCol(s"${colName}_vec")
)
new Pipeline().setStages(index_transformers++one_hot_encoders).fit(dfTrain).transform(dfTrain).select("Body_idx", "Body_vec", "Fuel_idx", "Fuel_vec").show(5)

+--------+-------------+--------+-------------+
|Body_idx|     Body_vec|Fuel_idx|     Fuel_vec|
+--------+-------------+--------+-------------+
|     0.0|(6,[0],[1.0])|     0.0|(1,[0],[1.0])|
|     2.0|(6,[2],[1.0])|     0.0|(1,[0],[1.0])|
|     0.0|(6,[0],[1.0])|     1.0|    (1,[],[])|
|     2.0|(6,[2],[1.0])|     0.0|(1,[0],[1.0])|
|     2.0|(6,[2],[1.0])|     0.0|(1,[0],[1.0])|
+--------+-------------+--------+-------------+
only showing top 5 rows



In [51]:
val assembler = new VectorAssembler().setInputCols(Array("Mileage", "Age", "Power", "Body_vec", "Fuel_vec")).setOutputCol("features")

In [52]:
// Create pipeline:
val pipelineDataTransform = new Pipeline().setStages(index_transformers ++ one_hot_encoders ++ Array(assembler))

In [53]:
// Fit pipeline with training data:
val pipelineDataTransformFitted: PipelineModel = pipelineDataTransform.fit(dfTrain)

In [54]:
// Save pipeline to HDFS:
pipelineDataTransformFitted.write.overwrite().save("storage/Pipeline")

In [55]:
// Transform data:
dfTrain = pipelineDataTransformFitted.transform(dfTrain).select("label", "features")

In [56]:
dfTrain.show(5)

+-------+--------------------+
|  label|            features|
+-------+--------------------+
|17643.0|(10,[0,1,2,3,9],[...|
|21158.0|(10,[0,1,2,5,9],[...|
|35256.0|(10,[0,1,2,3],[11...|
|14437.0|(10,[0,1,2,5,9],[...|
| 5708.0|(10,[0,1,2,5,9],[...|
+-------+--------------------+
only showing top 5 rows



## Train model:

In [57]:
// Transform df to an RDD[LabeledPoint]:
val rddTrain = dfTrain.map(row => LabeledPoint(row.getAs[Double]("label"), row(1).asInstanceOf[Vector]))

Parameters for the **Random Forest**:

In [58]:
// Set here specific parameters for the algorithm, otherwise just use the default ones form analytics.properties:
val categoricalFeaturesInfo = Map[Int, Int]() // ((indexCategoricalColumn, numDistinctCategories)) // or several:  Map[Int, Int]((1,3),(2,5)) The map tells us that feature with index 1 has 3 levels, and feature with index 2 has 5 levels
var numTrees = 10
var featureSubsetStrategy = "auto" // Number of features to consider for splits at each node. Supported: "auto", "all", "sqrt", "log2", "onethird". If "auto" is set, this parameter is set based on numTrees: if numTrees == 1, set to "all"; if numTrees > 1 (forest) set to "sqrt" for classification and to "onethird" for regression.
var impurity = "variance"
var maxDepth = 30 // max is 30
var maxBins = 50
val seedNum = 13

In [59]:
// Train model using the random forest algorithm:
val trainedRFmodel = RandomForest.trainRegressor(rddTrain, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins, seedNum)

In [61]:
// Save model to HDFS:
trainedRFmodel.save(sc, "storage/RFmodel")

## Compute predictions

In [62]:
dfTest = dfTest.select("Mileage", "Age", "Power", "Body", "Fuel", "VehicleValue")
val dfTest0 = dfTest 
dfTest.show(5)

+-------+---+-----+---------+------+------------+
|Mileage|Age|Power|     Body|  Fuel|VehicleValue|
+-------+---+-----+---------+------+------------+
|      5|  6|   85|Limousine|BENZIN|       17643|
|     11|  6|  120|Limousine|DIESEL|       36762|
|     11| 18|  155|    Coupe|BENZIN|       12052|
|     11| 24|   65|Limousine|BENZIN|        6110|
|     11| 24|   75|Limousine|BENZIN|        5347|
+-------+---+-----+---------+------+------------+
only showing top 5 rows



In [63]:
// Load pipeline and RF model from HDFS:
val pipelineDataTransformFittedLoaded = PipelineModel.load("storage/Pipeline")
val trainedRFmodel = RandomForestModel.load(sc, "storage/RFmodel")

In [64]:
// Transform data:
dfTest = pipelineDataTransformFittedLoaded.transform(dfTest).select("features")
dfTest.show(5)

+--------------------+
|            features|
+--------------------+
|(10,[0,1,2,3,9],[...|
|(10,[0,1,2,3],[11...|
|(10,[0,1,2,5,9],[...|
|(10,[0,1,2,3,9],[...|
|(10,[0,1,2,3,9],[...|
+--------------------+
only showing top 5 rows



In [72]:
// Transform df to an RDD[LabeledPoint]:
val rddTest = dfTest.map(row => row(0).asInstanceOf[Vector])
// Compute predictions and transform to array:
val predictions = rddTest.map { features => trainedRFmodel.predict(features) }.collect()
// create the rows that will constitute the new Df:
val rows = dfTest0.rdd.zipWithIndex.map(_.swap).join(sc.parallelize(predictions).zipWithIndex.map(_.swap)).values.map { case (row: Row, x: Double) => Row.fromSeq(row.toSeq :+ x) }
// create the new Df:
val dfResult = sqlC.createDataFrame(rows, dfTest0.schema.add("predVehicleValue", DoubleType, false))
dfResult.show(5)

+-------+---+-----+---------+------+------------+------------------+
|Mileage|Age|Power|     Body|  Fuel|VehicleValue|  predVehicleValue|
+-------+---+-----+---------+------+------------+------------------+
|  62840| 24|  105|Limousine|DIESEL|       14802|11463.168594540179|
|   9631| 12|  200|Limousine|DIESEL|       36556| 36023.64761904762|
| 140470| 36|  130|Smallsize|DIESEL|        6660|11422.769262543354|
|  63492| 36|  165|Limousine|DIESEL|       14328|20916.493130392355|
|  67980| 18|   95|Smallsize|DIESEL|       17201|17847.394139007585|
+-------+---+-----+---------+------+------------+------------------+
only showing top 5 rows



## Evaluate model performance

In [75]:
// create N (train, test) pairs:
val folds = kFold(rddTrain, numFolds = 5, seedNum)

In [76]:
// train models and create (model, test) pairs
val modelsAndTest = folds.map { case (train, test) => (RandomForest.trainRegressor(train, categoricalFeaturesInfo,
  10, featureSubsetStrategy, impurity, maxDepth, maxBins, seedNum), test)
}

In [77]:
def computePredictions(rddToPredict: RDD[LabeledPoint], model: org.apache.spark.mllib.tree.model.RandomForestModel): Array[Double] = {
val predictions = rddToPredict.map { point => model.predict(point.features) }.collect()
predictions
}

def getLabels(rddToPredict: RDD[LabeledPoint]): Array[Double] = {
val labels = rddToPredict.map { point => point.label }.collect()
labels
}    

val predictions = modelsAndTest.flatMap { case (model, test) => computePredictions(test, model) }
val actuals = modelsAndTest.flatMap { case (model, test) => getLabels(test) }
val predictionsAndLabels = sc.parallelize(predictions zip actuals)

In [80]:
val metrics = new RegressionMetrics(predictionsAndLabels)
val R2 = metrics.r2
val MAE = metrics.meanAbsoluteError
println("R2  = " + R2)
println("MAE = " + MAE)

R2  = 0.8541554204558702
MAE = 2729.1208020016106
