# Spark ML Example

This example introduces machine learning (ML) with Spark from a Jupyter notebook.

### Adding resources 

In [2]:
//Breeze from ScalaNLP: http://www.scalanlp.org/
//classpath.addRepository("https://oss.sonatype.org/content/repositories/snapshots/",
//                        "https://oss.sonatype.org/content/repositories/releases/")




In [5]:
//classpath.addRepository("http://dl.bintray.com/scalaz/releases")
//classpath.addRepository("https://repo1.maven.org/maven2/")



### Adding the library dependencies

Note that classpath.add is specific to jupyter-scala. In a regular project, declare these repositories and libraries in a build.sbt file instead.
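For a standalone project, the same dependencies could be declared roughly as follows. This is only a sketch: the project name and the Scala version are assumptions (Spark 1.6.0 was published for Scala 2.10 and 2.11), and only the libraries that are not commented out in this notebook are listed.

```scala
// build.sbt (sketch): name and scalaVersion are assumptions, not taken from this notebook
name := "spark-ml-example"

scalaVersion := "2.11.7"

val sparkVersion = "1.6.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % sparkVersion,
  "org.apache.spark" %% "spark-sql"       % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-mllib"     % sparkVersion,
  "org.scalanlp"     %% "breeze"          % "0.12",
  "org.scalanlp"     %% "breeze-natives"  % "0.12",
  "org.scalanlp"     %% "breeze-viz"      % "0.12"
)
```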

In [1]:
classpath.add("org.apache.spark" %% "spark-core" % "1.6.0")
classpath.add("org.apache.spark" %% "spark-sql" % "1.6.0")
// classpath.add("org.apache.spark" %% "spark-hive" % "1.6.0")
classpath.add("org.apache.spark" %% "spark-streaming" % "1.6.0")
// classpath.add("org.apache.spark" %% "spark-streaming-kafka" % "1.6.0")
// classpath.add("org.apache.spark" %% "spark-streaming-flume" % "1.6.0")
classpath.add("org.apache.spark" %% "spark-mllib" % "1.6.0")

Adding 130 artifact(s)
Adding 11 artifact(s)
Adding 1 artifact(s)
Adding 16 artifact(s)




In [None]:
//classpath.add("com.datastax.spark" %% "spark-cassandra-connector" % "1.3.0")
//classpath.add("org.apache.cassandra" % "cassandra-all" % "2.2.5")

In [2]:
classpath.add("org.scalanlp" %% "breeze" % "0.12")
classpath.add("org.scalanlp" %% "breeze-natives" % "0.12")
classpath.add("org.scalanlp" %% "breeze-viz" % "0.12")

Adding 2 artifact(s)
Adding 17 artifact(s)
Adding 8 artifact(s)




## Preparing Spark and Spark SQL instances needed for the rest of the process

In [3]:
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.SQLContext

[32mimport [36morg.apache.spark.{ SparkConf, SparkContext }[0m
[32mimport [36morg.apache.spark.sql.SQLContext[0m

In [4]:
// Example Spark configuration. setMaster is required so that the context knows which master to connect to; "local" runs Spark inside this JVM.
val sparkConf = new SparkConf()
      .setAppName("JupyterScalaTest")
      .setMaster("local")

val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/03/02 10:40:58 INFO SparkContext: Running Spark version 1.6.0
16/03/02 10:40:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/03/02 10:40:58 WARN Utils: Your hostname, deepl resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
16/03/02 10:40:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/03/02 10:40:58 INFO SecurityManager: Changing view acls to: leo
16/03/02 10:40:58 INFO SecurityManager: Changing modify acls to: leo
16/03/02 10:40:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(leo); users with modify permissions: Set(leo)
16/03/02 10:40:59 INFO Utils: Successfully started service 'sparkDriver' on port 35351.
16/03/02 10:41:00 INFO Slf4jLogger: Slf4jLogger started
16/03/02 10:41:00 INFO Re

[36msparkConf[0m: [32mSparkConf[0m = org.apache.spark.SparkConf@24ba3f70
[36msc[0m: [32mSparkContext[0m = org.apache.spark.SparkContext@1effa89e
[36msqlContext[0m: [32mSQLContext[0m = org.apache.spark.sql.SQLContext@6c356b2

## Introduction to Spark ML
### First example: Estimator, Transformer and Param

The following example comes from the [official Spark 1.6.0 ML documentation](http://spark.apache.org/docs/latest/ml-guide.html).

In [5]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row

[32mimport [36morg.apache.spark.ml.classification.LogisticRegression[0m
[32mimport [36morg.apache.spark.ml.param.ParamMap[0m
[32mimport [36morg.apache.spark.mllib.linalg.{Vector, Vectors}[0m
[32mimport [36morg.apache.spark.sql.Row[0m

Prepare training data from a list of (label, features) tuples.


In [6]:
val training = sqlContext.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")


[36mtraining[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32msql[0m.[32mDataFrame[0m = [label: double, features: vector]

Create a LogisticRegression instance.  This instance is an Estimator.

In [7]:
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

LogisticRegression parameters:
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label)
maxIter: maximum number of iterations (>= 0) (default: 100)
predictionCol: prediction column name (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities (default: probability)
rawPredictionCol: raw prediction (a.k.a. confidence) column name (default: rawPrediction)
regParam: regularization parameter (>= 0) (default: 0.0)
standardization: whether to standardize the training features before fitting the model (default: true)
threshold: threshold in binary

[36mlr[0m: [32mLogisticRegression[0m = logreg_141b3c4b147f

We may set parameters using setter methods.

In [8]:
lr.setMaxIter(10)
  .setRegParam(0.01)

[36mres7[0m: [32mLogisticRegression[0m = logreg_141b3c4b147f

Learn a LogisticRegression model.  This uses the parameters stored in lr.

In [9]:
val model1 = lr.fit(training)

[36mmodel1[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32mml[0m.[32mclassification[0m.[32mLogisticRegressionModel[0m = logreg_141b3c4b147f

Since model1 is a Model (i.e., a Transformer produced by an Estimator), we can view the parameters it used during fit().

This prints the parameter (name: value) pairs, where names are unique IDs for this LogisticRegression instance.

In [10]:
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

Model 1 was fit using parameters: {
	logreg_141b3c4b147f-elasticNetParam: 0.0,
	logreg_141b3c4b147f-featuresCol: features,
	logreg_141b3c4b147f-fitIntercept: true,
	logreg_141b3c4b147f-labelCol: label,
	logreg_141b3c4b147f-maxIter: 10,
	logreg_141b3c4b147f-predictionCol: prediction,
	logreg_141b3c4b147f-probabilityCol: probability,
	logreg_141b3c4b147f-rawPredictionCol: rawPrediction,
	logreg_141b3c4b147f-regParam: 0.01,
	logreg_141b3c4b147f-standardization: true,
	logreg_141b3c4b147f-threshold: 0.5,
	logreg_141b3c4b147f-tol: 1.0E-6,
	logreg_141b3c4b147f-weightCol: 
}




We may alternatively specify parameters using a ParamMap, which supports several methods for specifying parameters.

In [11]:
val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30) // Specify 1 Param.  This overwrites the original maxIter.
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.

[36mparamMap[0m: [32mParamMap[0m = {
	logreg_141b3c4b147f-maxIter: 30,
	logreg_141b3c4b147f-regParam: 0.1,
	logreg_141b3c4b147f-threshold: 0.55
}
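A ParamMap can also be queried directly, which makes the overwrite behaviour of put easy to check. This is a minimal sketch on a fresh estimator; the names lrForMap and pm are made up for illustration.

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap

val lrForMap = new LogisticRegression()

// The second value for maxIter replaces the first one
val pm = ParamMap(lrForMap.maxIter -> 20)
  .put(lrForMap.maxIter, 30)

// get returns an Option: Some(value) if the param was specified, None otherwise
println(pm.get(lrForMap.maxIter))  // Some(30)
println(pm.get(lrForMap.regParam)) // None
```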

One can also combine ParamMaps:

In [12]:
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name
val paramMapCombined = paramMap ++ paramMap2

[36mparamMap2[0m: [32mParamMap[0m = {
	logreg_141b3c4b147f-probabilityCol: myProbability
}
[36mparamMapCombined[0m: [32mParamMap[0m = {
	logreg_141b3c4b147f-maxIter: 30,
	logreg_141b3c4b147f-probabilityCol: myProbability,
	logreg_141b3c4b147f-regParam: 0.1,
	logreg_141b3c4b147f-threshold: 0.55
}

Now learn a new model using the paramMapCombined parameters.

paramMapCombined overrides all parameters set earlier via lr.set* methods.

In [13]:
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

Model 2 was fit using parameters: {
	logreg_141b3c4b147f-elasticNetParam: 0.0,
	logreg_141b3c4b147f-featuresCol: features,
	logreg_141b3c4b147f-fitIntercept: true,
	logreg_141b3c4b147f-labelCol: label,
	logreg_141b3c4b147f-maxIter: 30,
	logreg_141b3c4b147f-predictionCol: prediction,
	logreg_141b3c4b147f-probabilityCol: myProbability,
	logreg_141b3c4b147f-rawPredictionCol: rawPrediction,
	logreg_141b3c4b147f-regParam: 0.1,
	logreg_141b3c4b147f-standardization: true,
	logreg_141b3c4b147f-threshold: 0.55,
	logreg_141b3c4b147f-tol: 1.0E-6,
	logreg_141b3c4b147f-weightCol: 
}


[36mmodel2[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32mml[0m.[32mclassification[0m.[32mLogisticRegressionModel[0m = logreg_141b3c4b147f

Prepare test data.

In [14]:
val test = sqlContext.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

[36mtest[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32msql[0m.[32mDataFrame[0m = [label: double, features: vector]

Make predictions on test data using the Transformer.transform() method.

LogisticRegressionModel.transform only uses the 'features' column; the 'label' column is carried along but is not used to compute predictions.

Note that model2.transform() outputs a *'myProbability'* column instead of the usual *'probability'* column since we renamed the lr.probabilityCol parameter previously.

In [15]:
model2.transform(test)
  .select("features", "label", "myProbability", "prediction")
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }

([-1.0,1.5,1.3], 1.0) -> prob=[0.05707304171033984,0.9429269582896601], prediction=1.0
([3.0,2.0,-0.1], 0.0) -> prob=[0.9238522311704088,0.0761477688295912], prediction=0.0
([0.0,2.2,-1.5], 1.0) -> prob=[0.10972776114779145,0.8902722388522085], prediction=1.0




In [16]:
model1.transform(test)
  .select("features", "label", "probability", "prediction")
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }

([-1.0,1.5,1.3], 1.0) -> prob=[0.0013759947069214356,0.9986240052930786], prediction=1.0
([3.0,2.0,-0.1], 0.0) -> prob=[0.9816604009374171,0.018339599062582906], prediction=0.0
([0.0,2.2,-1.5], 1.0) -> prob=[0.0016981475578358176,0.9983018524421641], prediction=1.0
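The relation between the probability and prediction columns, and the role of the threshold parameter set to 0.55 for model2, can be sketched in plain Scala. The weights and intercept below are hypothetical values chosen for illustration, not the coefficients learned by model1 or model2.

```scala
// Hypothetical coefficients, NOT the ones learned above
val weights = Array(-1.5, 0.8, 0.2)
val intercept = 0.1

// Logistic function mapping a real-valued margin to (0, 1)
def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

// Probability of the positive class (label 1.0) for one feature vector
def probabilityOfOne(features: Array[Double]): Double = {
  val margin = weights.zip(features).map { case (w, x) => w * x }.sum + intercept
  sigmoid(margin)
}

// Predict 1.0 only when the positive-class probability exceeds the threshold
def predict(features: Array[Double], threshold: Double): Double =
  if (probabilityOfOne(features) > threshold) 1.0 else 0.0

val x = Array(0.0, 0.0, 0.0)     // margin is just the intercept, so p is about 0.525
println(predict(x, 0.5))         // 1.0
println(predict(x, 0.55))        // 0.0
```

With the same probability, raising the threshold from 0.5 to 0.55 flips a borderline prediction from 1.0 to 0.0.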




## Example: Pipeline

In [17]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row

[32mimport [36morg.apache.spark.ml.Pipeline[0m
[32mimport [36morg.apache.spark.ml.classification.LogisticRegression[0m
[32mimport [36morg.apache.spark.ml.feature.{HashingTF, Tokenizer}[0m
[32mimport [36morg.apache.spark.mllib.linalg.Vector[0m
[32mimport [36morg.apache.spark.sql.Row[0m

Prepare training documents from a list of (id, text, label) tuples.

In [18]:
val training = sqlContext.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

[36mtraining[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32msql[0m.[32mDataFrame[0m = [id: bigint, text: string, label: double]

Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.

In [19]:
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

[36mtokenizer[0m: [32mTokenizer[0m = tok_b2cc633ac5de
[36mhashingTF[0m: [32mHashingTF[0m = hashingTF_7a2a361c6479
[36mlr[0m: [32mLogisticRegression[0m = logreg_b4713feb1c46
[36mpipeline[0m: [32mPipeline[0m = pipeline_59400bbc551d

Fit the pipeline to training documents.

In [20]:
val model = pipeline.fit(training)

[36mmodel[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32mml[0m.[32mPipelineModel[0m = pipeline_59400bbc551d

Now we can optionally save the fitted pipeline to disk.

In [21]:
model.save("/tmp/spark-logistic-regression-model")

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.




We can also save the unfit pipeline to disk.

In [22]:
pipeline.save("/tmp/unfit-lr-model")



The fitted pipeline can then be loaded back in during production.

In [23]:
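// A fitted pipeline is a PipelineModel, so it is loaded with PipelineModel.load; Pipeline.load reads unfit pipelines.
val sameModel = org.apache.spark.ml.PipelineModel.load("/tmp/spark-logistic-regression-model")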

Prepare test documents, which are unlabeled (id, text) tuples.

In [24]:
val test = sqlContext.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

[36mtest[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32msql[0m.[32mDataFrame[0m = [id: bigint, text: string]

Make predictions on test documents.

In [25]:
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }

(4, spark i j k) --> prob=[0.5406433544851431,0.45935664551485683], prediction=0.0
(5, l m n) --> prob=[0.9334382627383263,0.06656173726167372], prediction=0.0
(6, mapreduce spark) --> prob=[0.7799076868203894,0.2200923131796106], prediction=0.0
(7, apache hadoop) --> prob=[0.9768636139518304,0.023136386048169637], prediction=0.0




## Example: Model selection via cross-validation

An important task in ML is model selection, or using data to find the best model or parameters for a given task. This is also called tuning. Pipelines facilitate model selection by making it easy to tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately.

Currently, spark.ml supports model selection using the CrossValidator class, which takes an Estimator, a set of ParamMaps, and an Evaluator. CrossValidator begins by splitting the dataset into a set of folds which are used as separate training and test datasets; e.g., with k=3 folds, CrossValidator will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. CrossValidator iterates through the set of ParamMaps. For each ParamMap, it trains the given Estimator and evaluates it using the given Evaluator.

The Evaluator can be a RegressionEvaluator for regression problems, a BinaryClassificationEvaluator for binary data, or a MulticlassClassificationEvaluator for multiclass problems. The default metric used to choose the best ParamMap can be overridden with the setMetricName method of each of these evaluators.
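As a brief illustration, the metric of an evaluator is changed through its setMetricName setter. The sketch below picks "areaUnderPR" and "mae" as alternative metrics; the variable names are made up for illustration.

```scala
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, RegressionEvaluator}

// Area under the precision-recall curve instead of the default areaUnderROC
val prEvaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderPR")

// Mean absolute error instead of the default rmse
val maeEvaluator = new RegressionEvaluator()
  .setMetricName("mae")

println(prEvaluator.getMetricName)
println(maeEvaluator.getMetricName)
```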

The ParamMap which produces the best evaluation metric (averaged over the k folds) is selected as the best model. CrossValidator finally fits the Estimator using the best ParamMap and the entire dataset.
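The fold construction described above can be sketched in plain Scala. This version assigns rows to folds by index for readability; it is only an illustration and not how CrossValidator itself splits a DataFrame. The function name kFoldSketch is made up.

```scala
// Build k (training, test) pairs: fold i uses every k-th element as test data
def kFoldSketch[T](data: Seq[T], k: Int): Seq[(Seq[T], Seq[T])] = {
  val indexed = data.zipWithIndex
  (0 until k).map { fold =>
    val (test, train) = indexed.partition { case (_, i) => i % k == fold }
    (train.map(_._1), test.map(_._1))
  }
}

// With 12 rows and k = 3, each pair trains on 8 rows (2/3) and tests on 4 rows (1/3)
kFoldSketch(0 until 12, 3).foreach { case (train, test) =>
  println(s"train=${train.size}, test=${test.size}")
}
```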

The following example demonstrates using CrossValidator to select from a grid of parameters. To help construct the parameter grid, we use the ParamGridBuilder utility.

Note that cross-validation over a grid of parameters is expensive. E.g., in the example below, the parameter grid has 3 values for hashingTF.numFeatures and 2 values for lr.regParam, and CrossValidator uses 2 folds. This multiplies out to (3×2)×2=12 different models being trained. In realistic settings, it can be common to try many more parameters and use more folds (k=3 and k=10 are common). In other words, using CrossValidator can be very expensive. However, it is also a well-established method for choosing parameters which is more statistically sound than heuristic hand-tuning.
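The cost estimate above can be reproduced with a few lines of plain Scala that enumerate the grid as a Cartesian product. The variable names are made up for illustration.

```scala
val numFeaturesValues = Seq(10, 100, 1000)
val regParamValues = Seq(0.1, 0.01)

// Every combination of the two parameters: 3 x 2 = 6 settings
val grid = for {
  numFeatures <- numFeaturesValues
  regParam    <- regParamValues
} yield (numFeatures, regParam)

val numFolds = 2

// One fit per (setting, fold) pair during the search
val cvFits = grid.size * numFolds
println(s"${grid.size} settings x $numFolds folds = $cvFits fits") // 6 settings x 2 folds = 12 fits
```

The final fit on the entire dataset with the best ParamMap comes on top of these.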

In [26]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row

[32mimport [36morg.apache.spark.ml.Pipeline[0m
[32mimport [36morg.apache.spark.ml.classification.LogisticRegression[0m
[32mimport [36morg.apache.spark.ml.evaluation.BinaryClassificationEvaluator[0m
[32mimport [36morg.apache.spark.ml.feature.{HashingTF, Tokenizer}[0m
[32mimport [36morg.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}[0m
[32mimport [36morg.apache.spark.mllib.linalg.Vector[0m
[32mimport [36morg.apache.spark.sql.Row[0m

Prepare training data from a list of (id, text, label) tuples.

In [27]:
val training = sqlContext.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0),
  (4L, "b spark who", 1.0),
  (5L, "g d a y", 0.0),
  (6L, "spark fly", 1.0),
  (7L, "was mapreduce", 0.0),
  (8L, "e spark program", 1.0),
  (9L, "a e c l", 0.0),
  (10L, "spark compile", 1.0),
  (11L, "hadoop software", 0.0)
)).toDF("id", "text", "label")

[36mtraining[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32msql[0m.[32mDataFrame[0m = [id: bigint, text: string, label: double]

Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.

In [28]:
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

[36mtokenizer[0m: [32mTokenizer[0m = tok_1521227f7244
[36mhashingTF[0m: [32mHashingTF[0m = hashingTF_8283fa2c8032
[36mlr[0m: [32mLogisticRegression[0m = logreg_02308251a5f5
[36mpipeline[0m: [32mPipeline[0m = pipeline_6a7de8af7d90

We use a ParamGridBuilder to construct a grid of parameters to search over.

With 3 values for hashingTF.numFeatures and 2 values for lr.regParam, this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.

In [29]:
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

[36mparamGrid[0m: [32mArray[0m[[32mParamMap[0m] = [33mArray[0m(
  {
	hashingTF_8283fa2c8032-numFeatures: 10,
	logreg_02308251a5f5-regParam: 0.1
},
  {
	hashingTF_8283fa2c8032-numFeatures: 10,
	logreg_02308251a5f5-regParam: 0.01
},
  {
	hashingTF_8283fa2c8032-numFeatures: 100,
	logreg_02308251a5f5-regParam: 0.1
},
  {
	hashingTF_8283fa2c8032-numFeatures: 100,
	logreg_02308251a5f5-regParam: 0.01
},
  {
	hashingTF_8283fa2c8032-numFeatures: 1000,
	logreg_02308251a5f5-regParam: 0.1
[33m...[0m

We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.

This will allow us to jointly choose parameters for all Pipeline stages.

A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.

Note that the evaluator here is a BinaryClassificationEvaluator and its default metric is areaUnderROC.

In [30]:
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2) // Use 3+ in practice

[36mcv[0m: [32mCrossValidator[0m = cv_b5cb0e750640

Run cross-validation, and choose the best set of parameters.

In [31]:
val cvModel = cv.fit(training)

[36mcvModel[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32mml[0m.[32mtuning[0m.[32mCrossValidatorModel[0m = cv_b5cb0e750640

Prepare test documents, which are unlabeled (id, text) tuples.

In [32]:
val test = sqlContext.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

[36mtest[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32msql[0m.[32mDataFrame[0m = [id: bigint, text: string]

Make predictions on test documents. cvModel uses the best model found during the search.

In [33]:
cvModel.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }

(4, spark i j k) --> prob=[0.24804795226775067,0.7519520477322493], prediction=1.0
(5, l m n) --> prob=[0.9647209186740324,0.0352790813259676], prediction=0.0
(6, mapreduce spark) --> prob=[0.4248344997494982,0.5751655002505017], prediction=1.0
(7, apache hadoop) --> prob=[0.6899594200690093,0.3100405799309907], prediction=0.0




## Example: Model selection via train-validation split

In addition to CrossValidator, Spark also offers TrainValidationSplit for hyper-parameter tuning.

TrainValidationSplit evaluates each combination of parameters only once, as opposed to k times in the case of CrossValidator. It is therefore less expensive, but will not produce results that are as reliable when the training dataset is not sufficiently large.

TrainValidationSplit takes an Estimator, a set of ParamMaps provided in the estimatorParamMaps parameter, and an Evaluator. It begins by splitting the dataset into two parts according to the trainRatio parameter; the two parts are used as separate training and validation datasets. For example, with *trainRatio=0.75* (default), TrainValidationSplit generates a training and validation dataset pair in which 75% of the data is used for training and 25% for validation. Like CrossValidator, TrainValidationSplit iterates through the set of ParamMaps. For each combination of parameters, it trains the given Estimator and evaluates it using the given Evaluator. The ParamMap which produces the best evaluation metric is selected as the best option. TrainValidationSplit finally fits the Estimator using the best ParamMap and the entire dataset.
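The single split controlled by trainRatio can be sketched in plain Scala as a shuffle followed by a cut. This is only an illustration with exact sizes; splitting a DataFrame randomly in Spark generally yields sizes that are only approximately in the requested ratio. The function name splitByRatio is made up.

```scala
import scala.util.Random

// Shuffle once with a fixed seed, then cut at trainRatio
def splitByRatio[T](data: Seq[T], trainRatio: Double, seed: Long): (Seq[T], Seq[T]) = {
  val shuffled = new Random(seed).shuffle(data)
  val cut = math.round(data.size * trainRatio).toInt
  shuffled.splitAt(cut)
}

val (trainPart, validationPart) = splitByRatio(1 to 100, 0.75, seed = 12345L)
println(s"train=${trainPart.size}, validation=${validationPart.size}") // train=75, validation=25
```

Each ParamMap is then fitted on the training part and scored once on the validation part, which is why this approach costs one fit per parameter setting rather than k.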

In [34]:
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

[32mimport [36morg.apache.spark.ml.evaluation.RegressionEvaluator[0m
[32mimport [36morg.apache.spark.ml.regression.LinearRegression[0m
[32mimport [36morg.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}[0m

Prepare training and test data.

In [50]:
val data = sqlContext.read.format("libsvm").load("/home/leo/installers/spark-1.6.0-bin-hadoop2.6/data/mllib/sample_libsvm_data.txt")
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)

// val lir = new LinearRegression() // this fails (apparently a bug in Spark ML), so logistic regression is used instead
val lir = new LogisticRegression()

[36mdata[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32msql[0m.[32mDataFrame[0m = [label: double, features: vector]
[36mtraining[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32msql[0m.[32mDataFrame[0m = [label: double, features: vector]
[36mtest[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32msql[0m.[32mDataFrame[0m = [label: double, features: vector]
[36mlir[0m: [32mLogisticRegression[0m = logreg_594edaf1ad50

In [51]:
println(data.first)

[0.0,(692,[127,128,129,130,131,154,155,156,157,158,159,181,182,183,184,185,186,187,188,189,207,208,209,210,211,212,213,214,215,216,217,235,236,237,238,239,240,241,242,243,244,245,262,263,264,265,266,267,268,269,270,271,272,273,289,290,291,292,293,294,295,296,297,300,301,302,316,317,318,319,320,321,328,329,330,343,344,345,346,347,348,349,356,357,358,371,372,373,374,384,385,386,399,400,401,412,413,414,426,427,428,429,440,441,442,454,455,456,457,466,467,468,469,470,482,483,484,493,494,495,496,497,510,511,512,520,521,522,523,538,539,540,547,548,549,550,566,567,568,569,570,571,572,573,574,575,576,577,578,594,595,596,597,598,599,600,601,602,603,604,622,623,624,625,626,627,628,629,630,651,652,653,654,655,656,657],[51.0,159.0,253.0,159.0,50.0,48.0,238.0,252.0,252.0,252.0,237.0,54.0,227.0,253.0,252.0,239.0,233.0,252.0,57.0,6.0,10.0,60.0,224.0,252.0,253.0,252.0,202.0,84.0,252.0,253.0,122.0,163.0,252.0,252.0,252.0,253.0,252.0,252.0,96.0,189.0,253.0,167.0,51.0,238.0,253.0,253.0,190.0,114.0,253.0,2



We use a ParamGridBuilder to construct a grid of parameters to search over.
TrainValidationSplit will try all combinations of values and determine the best model using
the evaluator.

In [52]:
val paramGrid = new ParamGridBuilder()
  .addGrid(lir.regParam, Array(0.1, 0.01))
  .addGrid(lir.fitIntercept)
  .addGrid(lir.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

[36mparamGrid[0m: [32mArray[0m[[32mParamMap[0m] = [33mArray[0m(
  {
	logreg_594edaf1ad50-elasticNetParam: 0.0,
	logreg_594edaf1ad50-fitIntercept: true,
	logreg_594edaf1ad50-regParam: 0.1
},
  {
	logreg_594edaf1ad50-elasticNetParam: 0.5,
	logreg_594edaf1ad50-fitIntercept: true,
	logreg_594edaf1ad50-regParam: 0.1
},
  {
	logreg_594edaf1ad50-elasticNetParam: 1.0,
	logreg_594edaf1ad50-fitIntercept: true,
	logreg_594edaf1ad50-regParam: 0.1
},
  {
	logreg_594edaf1ad50-elasticNetParam: 0.0,
	logreg_594edaf1ad50-fitIntercept: true,
	logreg_594edaf1ad50-regParam: 0.01
[33m...[0m

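In this case the estimator is simply the logistic regression lir defined above, which stands in for the linear regression of the original example. Note that the RegressionEvaluator used below scores the 0/1 predictions against the labels with its default rmse metric; a BinaryClassificationEvaluator would be the more conventional choice for a classifier.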

A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.

In [53]:
val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lir)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)
  // 80% of the data will be used for training and the remaining 20% for validation.
  .setTrainRatio(0.8)

[36mtrainValidationSplit[0m: [32mTrainValidationSplit[0m = tvs_bb15903fef3c

Run train validation split, and choose the best set of parameters.

In [54]:
val model = trainValidationSplit.fit(training)

[36mmodel[0m: [32morg[0m.[32mapache[0m.[32mspark[0m.[32mml[0m.[32mtuning[0m.[32mTrainValidationSplitModel[0m = tvs_bb15903fef3c

Make predictions on test data. model is the fitted model with the combination of parameters
that performed best.

In [55]:
model.transform(test)
  .select("features", "label", "prediction")
  .show()

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(692,[151,152,153...|  1.0|       1.0|
|(692,[154,155,156...|  0.0|       0.0|
|(692,[127,128,129...|  1.0|       1.0|
|(692,[127,128,129...|  1.0|       1.0|
|(692,[95,96,97,12...|  0.0|       0.0|
|(692,[128,129,130...|  1.0|       1.0|
|(692,[129,130,131...|  1.0|       1.0|
+--------------------+-----+----------+



