## Init SQL context for future use

In [1]:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

# Ex 1 : Load data and run sample a MLP

## First we need to load the file from object storage

In [2]:
// Sets hadoop config with given credentials - this need to be done only once per container
def set_hadoop_config_with_credentials(creds:scala.collection.mutable.HashMap[String, String]){
    val prefix = "fs.swift.service.spark"
    val hconf = sc.hadoopConfiguration
    hconf.set(prefix + ".auth.url", creds("auth_url")+"/v3/auth/tokens")
    hconf.set(prefix + ".auth.endpoint.prefix", "endpoints")
    hconf.set(prefix + ".tenant", creds("project_id"))
    hconf.set(prefix + ".username", creds("user_id"))
    hconf.set(prefix + ".password", creds("password"))
    hconf.setInt(prefix + ".http.port", 8080)
    hconf.set(prefix + ".region", creds("region"))
    hconf.setBoolean(prefix + ".public", true)
}
//Build swift url, need to be done for each file
def build_swift_url(creds:scala.collection.mutable.HashMap[String, String]):String={
    "swift://"+creds("container")+".spark/"+creds("filename")
}

In [None]:
var credentials_1 = scala.collection.mutable.HashMap[String, String](
  "auth_url"->"https://identity.open.softlayer.com",
  "project"->"object_storage_xxx",
  "project_id"->"123654",
  "region"->"dallas",
  "user_id"->"abcde54",
  "domain_id"->"azd564",
  "domain_name"->"azd654",
  "username"->"member_azd564",
  "password"->"""654654""",
  "container"->"MyProject",
  "tenantId"->"undefined",
  "filename"->"sample_multiclass_classification_data.txt"
)

In [None]:
// The code was removed by DSX for sharing.

In [None]:
set_hadoop_config_with_credentials(credentials_1)
val testFile = sc.textFile(build_swift_url(credentials_1))
testFile.count()
testFile.take(5)

## Now we play the sample MLP from
https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier

In [15]:
//Sample from
// https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load(build_swift_url(credentials_1))
//format is: class in1:value in2:value in3:value in4:value
//1 1:-0.722222 2:-0.166667 3:-0.864407 4:-0.833333 
//1 1:-0.722222 2:0.166667 3:-0.694915 4:-0.916667 
//0 1:0.166667 2:-0.416667 3:0.457627 4:0.5 
//1 1:-0.833333 3:-0.864407 4:-0.916667 

// Split the data into train and test
val splits = data.randomSplit(Array(0.6, 0.4), seed = 1234L)
val train = splits(0)
val test = splits(1)
// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
val layers = Array[Int](4, 5, 4, 3)
// create the trainer and set its parameters
val trainer = new MultilayerPerceptronClassifier().
  setLayers(layers).
  setBlockSize(128).
  setSeed(1234L).
  setMaxIter(100)
// train the model
val model = trainer.fit(train)
// compute accuracy on the test set
val result = model.transform(test)
val predictionAndLabels = result.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator().setMetricName("accuracy")
println("Accuracy: " + evaluator.evaluate(predictionAndLabels))

Accuracy: 0.9019607843137255


# Ex 2 : Let's try with our own dataset : handwritten numbers from mnist
http://yann.lecun.com/exdb/mnist/
## Load the training files train-images.idx3-ubyte and train-labels.idx1-ubyte

In [3]:
//limits
val trainSetSize=60000 //max:60000
val testSetSize=10000 //max:10000

In [None]:
val credentials_train_images = scala.collection.mutable.HashMap[String, String](
  "auth_url"->"https://identity.open.softlayer.com",
  "project"->"object_storage_fxxxx4",
  "project_id"->"xxx",
  "region"->"dallas",
  "user_id"->"xxx",
  "domain_id"->"xxx",
  "domain_name"->"xxx",
  "username"->"member_5xxx4",
  "password"->"""xxx""",
  "container"->"FirstProject",
  "tenantId"->"undefined",
  "filename"->"train-images.idx3-ubyte"
)

In [None]:
// The code was removed by DSX for sharing.

In [4]:
set_hadoop_config_with_credentials(credentials_train_images)//This need to be done only once per container
val sFilePathTrainImages = build_swift_url(credentials_train_images)
sFilePathTrainImages

swift://FirstProject.spark/train-images.idx3-ubyte

Images are 28x28 pixels (unsigned byte) -> 784 bytes, 
The training file contains 60k images of 784 pixels, 
The training file size is 784x60000=47040000 bytes + header:4x32bit integers=4*2bytes=16 bytes, 
total:47040016

In [None]:
//Read image file - binary
//see http://stackoverflow.com/questions/33654135/reading-binary-file-in-spark-scala
//v1 - This is not super fancy, but should work
def loadidxFile(sFilePath:String,colPrefix:String,iHeaderLength:Int,iRecordLength:Int,iRecordNum:Int)={
    val binStream = sc.binaryFiles(sFilePath, 1).first._2.open()
    binStream.skipBytes(iHeaderLength)
    val records = new Array[Array[Double]](iRecordNum)
    for(r <- 0 to iRecordNum-1){
        val record = new Array[Double](iRecordLength+1)//id+pixels
        record(0)=r
        for(p <- 1 to iRecordLength){
          record(p)=binStream.readUnsignedByte().toDouble
        }        
        records(r)=record
    }
    //we now have an array of array, lets transform to a dataframe
    val arrayRDD = sc.parallelize(records)
    //  Generate the schema based on the number of fields
    import org.apache.spark.sql.types._
    val fields = StructField("id", DoubleType, nullable = false)::(0 until iRecordLength).toList.map(i => StructField(colPrefix+i, DoubleType, nullable = true))
    val schema = StructType(fields)
    import org.apache.spark.sql._
    sqlContext.createDataFrame(arrayRDD.map(s => Row.fromSeq(s.toSeq)),schema)
}

val trainImagesDF=loadidxFile(sFilePathTrainImages,"p",16,784,trainSetSize)// 60000
trainImagesDF.count()

## Let's preview a digit of the training set

In [6]:
val testRecord = trainImagesDF.take(1)(0)
testRecord

[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,3,18,18,18,126,136,175,26,166,255,247,127,0,0,0,0,0,0,0,0,0,0,0,0,30,36,94,154,170,253,253,253,253,253,225,172,253,242,195,64,0,0,0,0,0,0,0,0,0,0,0,49,238,253,253,253,253,253,253,253,253,251,93,82,82,56,39,0,0,0,0,0,0,0,0,0,0,0,0,18,219,253,253,253,253,253,198,182,247,241,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,80,156,107,253,253,205,11,0,43,154,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,14,1,154,253,90,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,139,253,...

In [7]:
//array of 784 ints (28*28)
//converts the array of ints to a dataframe x,y,int, for scatter plot
def rowToTable(r:org.apache.spark.sql.Row)={
   val t = new Array[(Int,Int,Int)](28*28)
   for(x <- 0 to 27){
        for(y <- 0 to 27){
          val p=r.getInt(1+x*28+y)//+1 because id
            t(x*28+y)=(x,y,p)
        }        
    }
    sc.parallelize(t).toDF("x","y","color")//convert to DF
}
val pixelsDF = rowToTable(testRecord) //still an array x,y,int
pixelsDF.show(5)

+---+---+-----+
|  x|  y|color|
+---+---+-----+
|  0|  0|    0|
|  0|  1|    0|
|  0|  2|    0|
|  0|  3|    0|
|  0|  4|    0|
+---+---+-----+
only showing top 5 rows



In [8]:
%%brunel data('pixelsDF') x(x) y(y) color(color) :: width=800, height=300

## Load the labels for the training set
train-labels.idx1-ubyte

In [None]:
var credentials_train_labels = scala.collection.mutable.HashMap[String, String](
  "auth_url"->"https://identity.open.softlayer.com",
  "project"->"object_storage_xxxx",
  "project_id"->"xxxx",
  "region"->"dallas",
  "user_id"->"xxxx",
  "domain_id"->"xxx",
  "domain_name"->"xxxx",
  "username"->"member_xxxx",
  "password"->"""xxxxx""",
  "container"->"FirstProject",
  "tenantId"->"undefined",
  "filename"->"train-labels.idx1-ubyte"
)

In [None]:
// The code was removed by DSX for sharing.

In [5]:
set_hadoop_config_with_credentials(credentials_train_labels)
val sFilePathTrainLabels = build_swift_url(credentials_train_labels)
sFilePathTrainLabels

swift://FirstProject.spark/train-labels.idx1-ubyte

In [55]:
//Read label file - binary
//header is smaller for label files : 8
val trainLabelsDF=loadidxFile(sFilePathTrainLabels,"l",8,1,trainSetSize).withColumnRenamed("l0", "label")// 60000
trainLabelsDF.count()
trainLabelsDF.show(5)

+---+-----+
| id|label|
+---+-----+
|0.0|  5.0|
|1.0|  0.0|
|2.0|  4.0|
|3.0|  1.0|
|4.0|  9.0|
+---+-----+
only showing top 5 rows



## Prepare the training dataset by joining images and labels
trainImagesDF
trainLabelsDF

### Combine the two dataframes

In [56]:
def prepareMLPdata(imagesDF:org.apache.spark.sql.DataFrame,labelsDF:org.apache.spark.sql.DataFrame)={
    val joinedDF = labelsDF.join(imagesDF,Seq("id"))
    import org.apache.spark.ml.feature.VectorAssembler
    val aCols = (0 until 784).toArray.map(i => "p"+i)
    val assembler = new VectorAssembler().setInputCols(aCols).setOutputCol("features")
    assembler.transform(joinedDF).select("label","features")
}

In [57]:
val trainData=prepareMLPdata(trainImagesDF,trainLabelsDF)
trainData.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  3.0|(784,[151,152,153...|
|  0.0|(784,[127,128,129...|
|  4.0|(784,[134,135,161...|
|  1.0|(784,[124,125,126...|
|  1.0|(784,[158,159,160...|
+-----+--------------------+
only showing top 5 rows



## Build the MLP

In [58]:
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
// specify layers for the neural network:
// input layer of size 28*28 (features), 
// intermediate of size 300
// and output of size 10 (classes)
val layers = Array[Int](784, 300, 10)
// create the trainer and set its parameters
val trainer = new MultilayerPerceptronClassifier().
  setLayers(layers).
  setBlockSize(128).
  setSeed(1234L).
  setMaxIter(100)

## Train the model

In [59]:
// train the model
val model = trainer.fit(trainData)

## Load test data
t10k-images.idx3-ubyte to testImagesDF
t10k-labels.idx1-ubyte to testLabelsDF

In [None]:
var credentials_test_images = scala.collection.mutable.HashMap[String, String](
  "auth_url"->"https://identity.open.softlayer.com",
  "project"->"object_storage_cxxxxx",
  "project_id"->"xxxxx",
  "region"->"dallas",
  "user_id"->"xxxxx",
  "domain_id"->"xxxxx",
  "domain_name"->"xxxxx",
  "username"->"member_xxxxx",
  "password"->"""xxxxx""",
  "container"->"FirstProject",
  "tenantId"->"undefined",
  "filename"->"t10k-images.idx3-ubyte"
)

var credentials_test_labels = scala.collection.mutable.HashMap[String, String](
  "auth_url"->"https://identity.open.softlayer.com",
  "project"->"object_storage_xxxxx",
  "project_id"->"xxxxx",
  "region"->"dallas",
  "user_id"->"xxxxx",
  "domain_id"->"xxxxx",
  "domain_name"->"xxxxx",
  "username"->"member_xxxxx",
  "password"->"""xxxxx""",
  "container"->"FirstProject",
  "tenantId"->"undefined",
  "filename"->"t10k-labels.idx1-ubyte"
)

In [6]:
// The code was removed by DSX for sharing.

In [79]:
set_hadoop_config_with_credentials(credentials_test_images)
val sFilePathTestImages = build_swift_url(credentials_test_images)
val testImagesDF = loadidxFile(sFilePathTestImages,"p",16,784,testSetSize)

set_hadoop_config_with_credentials(credentials_test_labels)
val sFilePathTestLabels = build_swift_url(credentials_test_labels)
val testLabelsDF = loadidxFile(sFilePathTestLabels,"l",8,1,testSetSize).withColumnRenamed("l0", "label")
val testData=prepareMLPdata(testImagesDF,testLabelsDF)
testData.count()

Name: Compile Error
Message: <console>:34: error: not found: value loadidxFile
         val testImagesDF = loadidxFile(sFilePathTestImages,"p",16,784,testSetSize)
                            ^
StackTrace: 

## Test the model

In [69]:
// compute accuracy on the test set
val result = model.transform(testData)
result.describe()
val predictionAndLabels = result.select("prediction", "label")
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val evaluator = new MulticlassClassificationEvaluator().setMetricName("precision")//accuracy")
println("Accuracy: " + evaluator.evaluate(predictionAndLabels))

Accuracy: 0.3964


# Ex 3 optimize

## Loader
Let's rewrite the file loader to build the training dataset in one go - to avoid joining the label and image datasets

In [10]:
//V1, read the file to an array of array of bytes, then convert to a DataFrame using a dynamic schema, then creates a vector via vectorAssembler
//Not optimised at all because we go 3 times trough the data
def loadImageAndLabels_v1(sImageFilePath:String,sLabelFilePath:String,iRecordNum:Int)={
    val imageStream = sc.binaryFiles(sImageFilePath, 1).first._2.open()
    val labelStream = sc.binaryFiles(sLabelFilePath, 1).first._2.open()    
    imageStream.skipBytes(16)
    labelStream.skipBytes(8)
    val records = new Array[Array[Double]](iRecordNum)
    val iImageLength = 784//pixels
    for(r <- 0 to iRecordNum-1){
      val iRecordLength = iImageLength+1//label+pixels
      import org.apache.spark.ml.feature.VectorAssembler
      val record = new Array[Double](iRecordLength)
      record(0) = labelStream.readUnsignedByte().toDouble //label  
      for(p <- 1 to iImageLength){
          record(p)=imageStream.readUnsignedByte().toDouble
      }        
      records(r)=record     
    }
    //we now have an array of array, lets transform to a dataframe
    val arrayRDD = sc.parallelize(records)
    //  Generate the schema based on the number of fields
    import org.apache.spark.sql.types._
    val fields = StructField("label", DoubleType, nullable = false)::(0 until iImageLength).toList.map(i => StructField("p"+i, DoubleType, nullable = true))
    val schema = StructType(fields)
    import org.apache.spark.sql._
    val dataDF = sqlContext.createDataFrame(arrayRDD.map(s => Row.fromSeq(s.toSeq)),schema)
    // Now assemble the pixels columns to form the features vector. Note : I was unable to find how to build a vector from scratch.
    import org.apache.spark.ml.feature.VectorAssembler
    val aCols = (0 until iImageLength).toArray.map(i => "p"+i)
    val assembler = new VectorAssembler().setInputCols(aCols).setOutputCol("features")
    assembler.transform(dataDF).select("label","features")
}  

In [88]:
//V2, we build the vector directly after reading each record, better, but it's still sequential
def loadImageAndLabels_V2(sImageFilePath:String,sLabelFilePath:String,iRecordNum:Int)={
    val imageStream = sc.binaryFiles(sImageFilePath, 1).first._2.open()
    val labelStream = sc.binaryFiles(sLabelFilePath, 1).first._2.open()    
    imageStream.skipBytes(16)
    labelStream.skipBytes(8)
    import org.apache.spark.mllib.linalg.DenseVector
    val records = new Array[(Double,DenseVector)](iRecordNum)
    val iImageLength = 784//pixels
    for(r <- 0 to iRecordNum-1){
      val dLabel = labelStream.readUnsignedByte().toDouble //label
      val pixelArray = new Array[Double](iImageLength)  
      for(p <- 0 to iImageLength-1){
          pixelArray(p)=imageStream.readUnsignedByte().toDouble
      }        
      val fVector = new DenseVector(pixelArray)
      records(r)=(dLabel,fVector)     
    }
    //we now have an array, lets transform to a dataframe
    sc.parallelize(records).toDF("label","features")
  }

In [31]:
//V3, Now we read the array as a big byte array, and do the conversion later in the RDD so it's parallel
def loadImageAndLabels(sImageFilePath:String,sLabelFilePath:String,iRecordNum:Int)={
    val imageStream = sc.binaryFiles(sImageFilePath, 1).first._2.open()
    val labelStream = sc.binaryFiles(sLabelFilePath, 1).first._2.open()    
    imageStream.skipBytes(16)
    labelStream.skipBytes(8)
    import org.apache.spark.mllib.linalg.DenseVector
    val records = new Array[(Double,Array[Byte])](iRecordNum)
    val iImageLength = 784//pixels
    var iFrom=0
    for(r <- 0 to iRecordNum-1){
      val dLabel = labelStream.readUnsignedByte().toDouble //label
      val pixelArray = new Array[Byte](iImageLength)//we read the array as a big byte array, the conversion will be done later in the RDD
      //imageStream.read(pixelArray,iFrom,iImageLength)
      imageStream.read(pixelArray)
      iFrom=iFrom+iImageLength
      records(r)=(dLabel,pixelArray)     
    }
    //we need to convert byte array to vector
    def byteArrayToVector(byteArray:Array[Byte]):DenseVector={
       val pixelArray = new Array[Double](byteArray.size) 
       for(p <- 0 to pixelArray.size-1){
            pixelArray(p)=(byteArray(p) & 0xff).toDouble            
       }
       new DenseVector(pixelArray)
    }   
    //we now have an array, lets transform to a dataframe
    sc.parallelize(records).map(p=>(p._1,byteArrayToVector(p._2))).toDF("label","features")
   } 

In [32]:
val trainData = loadImageAndLabels(build_swift_url(credentials_train_images),build_swift_url(credentials_train_labels),trainSetSize).cache()

In [33]:
val testData = loadImageAndLabels(build_swift_url(credentials_test_images),build_swift_url(credentials_test_labels),testSetSize).cache()

In [34]:
trainData.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  5.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  4.0|[0.0,0.0,0.0,0.0,...|
|  1.0|[0.0,0.0,0.0,0.0,...|
|  9.0|[0.0,0.0,0.0,0.0,...|
+-----+--------------------+
only showing top 5 rows



## Preview
Let's rewrite the preview function to take the train data and display the image and the label

In [27]:
def previewRecord(dataDF:org.apache.spark.sql.DataFrame,iRowNum:Int)={
   val aPixelList = new Array[(Int,Int,Double)](28*28)
   val rRow = dataDF.rdd.take(iRowNum).drop(iRowNum-1)(0)//TODO this is not optimal
   val dLabel = rRow.get(0) match {//Double
    case d: Double => d
    case _ => throw new ClassCastException
   }   
   val vPixelVector  = rRow.get(1) match {
    case dv: org.apache.spark.mllib.linalg.SparseVector => dv.toDense
    case sv: org.apache.spark.mllib.linalg.DenseVector => sv
    case _ => throw new ClassCastException
   }
   for(x <- 0 to 27){
        for(y <- 0 to 27){
          val idx = y*28+x
          val pixel = vPixelVector.values(idx)//+1 because id
          aPixelList(idx)=(x,28-y,pixel)
        }        
    }
    val pixelsDF = sc.parallelize(aPixelList).toDF("x","y","color")//convert to DF
    println(dLabel)
    pixelsDF
}

In [36]:
val pixelsDF = previewRecord(trainData,12)

5.0


In [37]:
%%brunel data('pixelsDF') x(x) y(y) color(color) :: width=400, height=400  

## Pipeline
Let's write the ML model, training and test as an ML Pipeline

### Define

In [38]:
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}

// specify layers for the neural network:
// input layer of size 28*28 (features), 
// intermediate of size 300
// and output of size 10 (classes)
val layers = Array[Int](784, 300, 10)
// create the trainer and set its parameters
val MLPtrainer = new MultilayerPerceptronClassifier().
  setLayers(layers).
  setBlockSize(128).
  setSeed(1234L).
  setMaxIter(100)

val MLPpipeline = new Pipeline().setStages(Array(MLPtrainer))

In [None]:
println(MLPtrainer.explainParams())

### Train

In [None]:
// train the model
val MLPmodel = MLPpipeline.fit(trainData)

### Test

In [None]:
// compute accuracy on the test set
val MLPresult = MLPmodel.transform(testData)
val MLPpredictionAndLabels = MLPresult.select("prediction", "label")
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val MLPevaluator = new MulticlassClassificationEvaluator().setMetricName("precision")//accuracy")
println("Accuracy: " + MLPevaluator.evaluate(MLPpredictionAndLabels))

## Display one of the results

In [None]:
MLPresult.show(5)

In [None]:
val resultPixelDF = previewRecord(MLPresult.select("prediction","features"),2)

In [None]:
%%brunel data('resultPixelDF') x(x) y(y) color(color) :: width=400, height=400  

## Build an hyperparameter grid, train the models, and display the best parameters

In [130]:
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val MLPparamGrid = new ParamGridBuilder().
  addGrid(MLPtrainer.layers, Array(Array(784, 30, 10), Array(784, 150, 10), Array(784, 300, 10))).
  addGrid(MLPtrainer.maxIter, Array(10,100,200)).
  build()
  
val MLPcrossValidator = new CrossValidator().
  setEstimator(MLPpipeline).
  setEvaluator(new MulticlassClassificationEvaluator).
  setEstimatorParamMaps(MLPparamGrid).
  setNumFolds(2)  // Use 3+ in practice

In [131]:
// Run cross-validation, and choose the best set of parameters.
val MLPcvModel = MLPcrossValidator.fit(trainData)

In [246]:
val bestParams = MLPcvModel.getEstimatorParamMaps.zip(MLPcvModel.avgMetrics).maxBy(_._2)._1
println(bestParams)

{
	mlpc_e8926a20385b-layers: [I@79da7bde,
	mlpc_e8926a20385b-maxIter: 200
}


In [247]:
import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel
val MLPbestPipeline = MLPcvModel.bestModel.asInstanceOf[PipelineModel]
val MLPbestModel = MLPbestPipeline.stages(0).asInstanceOf[MultilayerPerceptronClassificationModel]
println(MLPbestModel.explainParams)

featuresCol: features column name (default: features)
labelCol: label column name (default: label)
predictionCol: prediction column name (default: prediction)
