# Iris Classification with deeplearning4j

In [ ]:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

import org.apache.spark.sql.functions._
import org.apache.spark.ml._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._

import org.deeplearning4j.spark.ml._

sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@2b533fc7
import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.ml._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
import org.deeplearning4j.spark.ml._


## Load and prepare the Iris dataset
A sample Iris dataset is available on the local filesystem.  Here, the dataset is loaded using a DataFrame reader from `dl4j-spark-ml`.  The dataset is split into a training set and a test set.

In [ ]:
val data = sqlContext.read
  .format("org.deeplearning4j.spark.sql.sources.iris")
  .load("iris_svmLight_0.txt")

data.sample(false, 0.1).show

+-----+-----------------+
|label|         features|
+-----+-----------------+
|  0.0|[4.8,3.0,1.4,0.1]|
|  0.0|[5.7,3.8,1.7,0.3]|
|  0.0|[5.1,3.8,1.5,0.3]|
|  0.0|[5.0,3.4,1.6,0.4]|
|  0.0|[4.8,3.1,1.6,0.2]|
|  0.0|[5.5,3.5,1.3,0.2]|
|  1.0|[6.3,3.3,4.7,1.6]|
|  1.0|[5.2,2.7,3.9,1.4]|
|  1.0|[5.0,2.0,3.5,1.0]|
|  1.0|[5.6,3.0,4.5,1.5]|
|  1.0|[5.8,2.7,4.1,1.0]|
|  1.0|[5.6,2.5,3.9,1.1]|
|  1.0|[5.6,2.5,3.9,1.1]|
|  1.0|[6.4,2.9,4.3,1.3]|
|  2.0|[6.3,2.9,5.6,1.8]|
|  2.0|[4.9,2.5,4.5,1.7]|
|  2.0|[6.9,3.2,5.7,2.3]|
|  2.0|[6.3,2.7,4.9,1.8]|
|  2.0|[6.1,3.0,4.9,1.8]|
|  2.0|[6.1,2.6,5.6,1.4]|
+-----+-----------------+

data: org.apache.spark.sql.DataFrame = [label: double, features: vector]


In [ ]:
val Array(trainingData, testData) = data.randomSplit(Array(0.6, 0.4), 11L)

trainingData: org.apache.spark.sql.DataFrame = [label: double, features: vector]
testData: org.apache.spark.sql.DataFrame = [label: double, features: vector]


## Configure an ML pipeline

#### Feature scaling
Most algorithms benefit from working with normalized feature data.  Here, Spark's `StandardScaler` normalizes the `features` column to zero mean and unit standard deviation, producing a new `scaledFeatures` column.

In [ ]:
val scaler = new StandardScaler()
                .setWithMean(true).setWithStd(true)
                .setInputCol("features").setOutputCol("scaledFeatures")

scaler: org.apache.spark.ml.feature.StandardScaler = stdScal_f3f95e33cd97
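
To make the scaling step concrete, here is a minimal sketch in plain Scala (no Spark required) of what standardization with both mean-centering and unit-variance scaling does to a single feature column. The helper `standardize` and the sample values are hypothetical and only for illustration. The sketch assumes the corrected sample standard deviation (dividing by n - 1), which is how Spark documents `StandardScaler`.

```scala
// Hypothetical helper: standardize one feature column to zero mean and unit
// standard deviation, using the corrected sample standard deviation (n - 1).
def standardize(column: Seq[Double]): Seq[Double] = {
  require(column.size > 1, "need at least two values to estimate a standard deviation")
  val n = column.size
  val mean = column.sum / n
  val variance = column.map(x => math.pow(x - mean, 2)).sum / (n - 1)
  val std = math.sqrt(variance)
  // Guard against constant columns, which have zero standard deviation.
  if (std == 0.0) column.map(_ => 0.0) else column.map(x => (x - mean) / std)
}

// Illustrative values only, not taken from the Iris file.
val scaledColumn = standardize(Seq(1.0, 2.0, 3.0))
println(scaledColumn)
```

In the pipeline, the fitted scaler would apply the same kind of transformation to each of the four Iris features independently, using statistics computed from the training data.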


#### Neural network
The neural network is configured layer by layer.  Here, a deep-belief network (DBN) built from restricted Boltzmann machine (RBM) layers is configured to classify the Iris dataset.

In [ ]:
import org.deeplearning4j.nn.api.OptimizationAlgorithm
import org.deeplearning4j.nn.conf.NeuralNetConfiguration
import org.deeplearning4j.nn.conf.distribution.UniformDistribution
import org.deeplearning4j.nn.conf.layers.RBM
import org.deeplearning4j.nn.conf.`override`.ClassifierOverride
import org.deeplearning4j.nn.conf.rng.DefaultRandom
import org.deeplearning4j.nn.weights.WeightInit
import org.nd4j.linalg.lossfunctions.LossFunctions

val conf = new NeuralNetConfiguration.Builder()
                .layer(new RBM()) //the nn's layers will be RBMs
                .nIn(4) // no. of Input nodes = 4
                .nOut(3) // no. of Output nodes/labels = 3
                .visibleUnit(RBM.VisibleUnit.GAUSSIAN) //Gaussian transform
                .hiddenUnit(RBM.HiddenUnit.RECTIFIED) // Rect. Linear trans.
                .iterations(100) // run 100 training iterations
                .weightInit(WeightInit.DISTRIBUTION) // initializes weights
                .dist(new UniformDistribution(0, 1)) 
                .activationFunction("tanh") // tanh activation of nodes
                .k(1) // no. of times you run contrastive divergence
                .lossFunction(LossFunctions.LossFunction.RMSE_XENT) 
                // your loss function = root-mean-squared error cross entropy
                .learningRate(1e-1f) //the size of the steps your algo takes
                .momentum(0.9) // fraction of the previous update carried into the next
                .regularization(true) // regularization fights overfitting
                .l2(2e-4) // l2 is one type of regularization
                .optimizationAlgo(OptimizationAlgorithm.LBFGS) 
                // the optimization algorithm uses the gradients to update the weights.
                // L-BFGS is one such algorithm.
                .constrainGradientToUnitNorm(true) 
                .list(2)
                .hiddenLayerSizes(3) // no. of nodes in your hidden layer. 
                // this is small.
                .`override`(1, new ClassifierOverride())
                .build()

import org.deeplearning4j.nn.api.OptimizationAlgorithm
import org.deeplearning4j.nn.conf.NeuralNetConfiguration
import org.deeplearning4j.nn.conf.distribution.UniformDistribution
import org.deeplearning4j.nn.conf.layers.RBM
import org.deeplearning4j.nn.conf.`override`.ClassifierOverride
import org.deeplearning4j.nn.conf.rng.DefaultRandom
import org.deeplearning4j.nn.weights.WeightInit
import org.nd4j.linalg.lossfunctions.LossFunctions
conf: org.deeplearning4j.nn.conf.MultiLayerConfiguration = 
{
  "hiddenLayerSizes" : [ 3 ],
  "confs" : [ {
    "sparsity" : 0.0,
    "useAdaGrad" : true,
    "lr" : 0.10000000149011612,
    "corruptionLevel" : 0.30000001192092896,
    "numIterations" : 100,
    "momentum" : 0.9,
    "l2" : 2.0E-4,
    "useRegularization" : true,
    "customLossFunction" :...

In [ ]:
import org.deeplearning4j.spark.ml.classification.NeuralNetworkClassification

val classifier = new NeuralNetworkClassification()
                .setFeaturesCol("scaledFeatures")
                .setConf(conf)

import org.deeplearning4j.spark.ml.classification.NeuralNetworkClassification
classifier: org.deeplearning4j.spark.ml.classification.NeuralNetworkClassification = nnClassification_02f6eac4b110


#### Pipeline assembly
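The scaler and the classifier are assembled into a single ML pipeline, so that features are scaled before they reach the neural network.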

In [ ]:
val pipeline = new Pipeline()
                .setStages(Array(scaler, classifier))

pipeline: org.apache.spark.ml.Pipeline = pipeline_8813088c0acb


## Train and test the neural network

#### Train
The pipeline fits a model to the training data using the neural network.

In [ ]:
val model = pipeline.fit(trainingData)

model: org.apache.spark.ml.PipelineModel = pipeline_8813088c0acb


#### Test
The trained model is used to make predictions about the Iris test data.  Here the model produces a new column called `prediction`.

In [ ]:
val predictions = model.transform(testData)

predictions.select($"features", $"label" as "actual", $"prediction").show(100)

+-----------------+------+----------+
|         features|actual|prediction|
+-----------------+------+----------+
|[5.0,3.6,1.4,0.2]|   0.0|       0.0|
|[5.4,3.9,1.7,0.4]|   0.0|       0.0|
|[5.0,3.4,1.5,0.2]|   0.0|       0.0|
|[4.4,2.9,1.4,0.2]|   0.0|       0.0|
|[4.9,3.1,1.5,0.1]|   0.0|       0.0|
|[5.4,3.7,1.5,0.2]|   0.0|       0.0|
|[4.8,3.4,1.6,0.2]|   0.0|       0.0|
|[4.8,3.0,1.4,0.1]|   0.0|       0.0|
|[4.3,3.0,1.1,0.1]|   0.0|       0.0|
|[5.8,4.0,1.2,0.2]|   0.0|       2.0|
|[5.7,4.4,1.5,0.4]|   0.0|       2.0|
|[5.4,3.9,1.3,0.4]|   0.0|       0.0|
|[5.1,3.5,1.4,0.3]|   0.0|       0.0|
|[5.1,3.8,1.5,0.3]|   0.0|       0.0|
|[5.4,3.4,1.7,0.2]|   0.0|       0.0|
|[5.4,3.4,1.7,0.2]|   0.0|       0.0|
|[5.1,3.7,1.5,0.4]|   0.0|       0.0|
|[5.1,3.7,1.5,0.4]|   0.0|       0.0|
|[4.6,3.6,1.0,0.2]|   0.0|       0.0|
|[4.8,3.4,1.9,0.2]|   0.0|       0.0|
|[5.0,3.0,1.6,0.2]|   0.0|       0.0|
|[5.2,3.5,1.5,0.2]|   0.0|       0.0|
|[4.7,3.2,1.6,0.2]|   0.0|       0.0|
|[5.2,4.1,1.

#### Evaluate
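Spark ML provides an evaluation framework that includes cross-validation for hyper-parameter tuning.  Here we simply use an evaluator to calculate the root-mean-squared error (RMSE).  Because `RegressionEvaluator` treats the class labels as numeric values, the result measures the numeric distance between predicted and actual labels rather than classification accuracy.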

In [ ]:
val rootMeanSquaredError = new RegressionEvaluator().evaluate(predictions)

rootMeanSquaredError: Double = 0.8633970960424557
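
Since the labels are class indices, a classification metric such as accuracy may be easier to interpret than RMSE. The following is a minimal sketch in plain Scala that computes both on a handful of (actual, prediction) pairs. The pairs and the names `pairs`, `accuracy`, and `rmse` are hypothetical and chosen for illustration; they are not results from the run above.

```scala
// Hypothetical (actual, prediction) pairs, for illustration only.
val pairs = Seq((0.0, 0.0), (0.0, 2.0), (1.0, 1.0), (2.0, 2.0))

// Accuracy: fraction of rows where the predicted class equals the actual class.
val accuracy =
  pairs.count { case (actual, predicted) => actual == predicted }.toDouble / pairs.size

// RMSE: treats the class indices as numbers, so confusing class 0 with class 2
// is penalized more heavily than confusing class 0 with class 1.
val rmse = math.sqrt(
  pairs.map { case (actual, predicted) => math.pow(actual - predicted, 2) }.sum / pairs.size
)

println(s"accuracy = $accuracy, rmse = $rmse")
```

On the `predictions` DataFrame, the same accuracy could presumably be obtained by counting the rows where `prediction` equals `label` and dividing by the total row count.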
