## Lab Sheet - Handwritten Digit Classfication using Convolutional Neural Network

## TASK 1

In this lab we are going to look at an example of classifying handwritten digits using BigDL. We will introduce the convolution operation and gain familiarity with the different parameters in CNNs. 


Goal: 

Our goal is to train a classifier that will identify the digits in the MNIST dataset.

Approach:

There are 5 stages: Data reading, Data preprocessing, Creating a model, Learning the model parameters and Evaluating (a.k.a. testing/prediction) the model.


### Before we get into the lab tasks, follow Step 1 and Step 2 and make sure you have installed BigDL on your DSX notebooks.

## STEP 1

In [1]:
!(export sv=2.1 bv=0.3.0 ; cd ~/data/libs/ && wget https://repo1.maven.org/maven2/com/intel/analytics/bigdl/bigdl-SPARK_${sv}/${bv}/bigdl-SPARK_${sv}-${bv}-jar-with-dependencies.jar)

--2018-04-08 04:50:11--  https://repo1.maven.org/maven2/com/intel/analytics/bigdl/bigdl-SPARK_2.1/0.3.0/bigdl-SPARK_2.1-0.3.0-jar-with-dependencies.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.48.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.48.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 77227587 (74M) [application/java-archive]
Saving to: ‘bigdl-SPARK_2.1-0.3.0-jar-with-dependencies.jar.5’


2018-04-08 04:50:12 (99.8 MB/s) - ‘bigdl-SPARK_2.1-0.3.0-jar-with-dependencies.jar.5’ saved [77227587/77227587]



## STEP 2

In [2]:
!pip install bigdl==0.3.0 | cat



We are going to import the packages needed 

In [3]:
import matplotlib
matplotlib.use('Agg')
%pylab inline

import pandas
import datetime as dt

from bigdl.nn.layer import *
from bigdl.nn.criterion import *
from bigdl.optim.optimizer import *
from bigdl.util.common import *
from bigdl.dataset.transformer import *
from bigdl.dataset import mnist
#from utils import get_mnist
from matplotlib.pyplot import imshow
import matplotlib.pyplot as plt
from pyspark import SparkContext



Populating the interactive namespace from numpy and matplotlib




Prepending /gpfs/fs01/user/sb1c-2a2fc727357cc3-3b10766cfe3e/.local/lib/python3.5/site-packages/bigdl/share/conf/spark-bigdl.conf to sys.path


## CNN Model Creation

CNN is a feedforward network made up of bunch of layers in such a way that the output of one layer becomes the input to the next layer (similar to MLP). In MLP, all possible pairs of input pixels are connected to the output nodes with each pair having a weight, thus leading to a combinatorial explosion of parameters to be learnt and also increasing the possibility of overfitting (details). Convolution layers take advantage of the spatial arrangement of the pixels and learn multiple filters that significantly reduce the amount of parameters in the network (details). The size of the filter is a parameter of the convolution layer.

In [4]:
# Create a LeNet-5 model
#The 1st convolutional layer takes Input = 28x28x1 ; Output = 24x24x6
#Math = 6 unique (28-5+1),(28-5+1) ==>24x24x6
#The 1st pooling takes Input = 24x24x6; Output = 12x12x6
#Math = For a stride of 2,2 (24,24) = output ==>(12x12)
#The 2nd convolution layer takes Input = 12x12x6 ; Output = 8x8x12
#Math = 12 unique (12-5+1),(12-5+1) ==> 8x8x12
#The 2nd pooling takes Input = 8x8x12; Output = 4x4x12
#Math = For a stride (2,2) (8,8) = outputs ==>(4x4)

#Next step is to create a fully connected layer with input 4x4x12 and Output = 100
#Next layer will also be a fully connecte layer with Input 100 and output = class_num
#class_num is the number of output layer = 10 (MNIST 0-9 numbers)
#the softmax layer will ensure we get a probability distribution


def build_model(class_num):
    model = Sequential()
    model.add(Reshape([1, 28, 28]))  #The image shape is 28x28x1 
    model.add(SpatialConvolution(1, 6, 5, 5).set_name('conv1'))  #this is the first convolutional layer
    model.add(Tanh())   #add an activation function
    model.add(SpatialMaxPooling(2, 2, 2, 2).set_name('pool1'))     
    model.add(Tanh())   #add an activation function 
    model.add(SpatialConvolution(6, 12, 5, 5).set_name('conv2'))  #this is the second convolutional layer
    model.add(SpatialMaxPooling(2, 2, 2, 2).set_name('pool2'))
    model.add(Reshape([12 * 4 * 4]))
    model.add(Linear(12 * 4 * 4, 100).set_name('fc1')) #1st Fully connected layer
    model.add(Tanh())
    model.add(Linear(100, class_num).set_name('score'))
    model.add(LogSoftMax())
    return model

#This will build a model with 10 output layers

lenet_model = build_model(10)

creating: createSequential
creating: createReshape
creating: createSpatialConvolution
creating: createTanh
creating: createSpatialMaxPooling
creating: createTanh
creating: createSpatialConvolution
creating: createSpatialMaxPooling
creating: createReshape
creating: createLinear
creating: createTanh
creating: createLinear
creating: createLogSoftMax


In [5]:
#Now start with creating a new spark context and get the BigDL engine started

sc.stop()            #This is to ensure we first stop any instance already running
confCore=create_spark_conf()
confCore.set("spark.executor.cores", 1)
confCore.set("spark.cores.max", 1)
sc = SparkContext(appName="Mnist", conf=confCore)
init_engine()

In [6]:
# Get and store MNIST into RDD of Sample
def get_mnist(sc, data_type="train", location="/tmp/mnist"):
    """
    Get and normalize the mnist data. We would download it automatically
    if the data doesn't present at the specific location.
    :param sc: SparkContext
    :param data_type: training data or testing data
    :param location: Location storing the mnist
    :return: A RDD of (features: Ndarray, label: Ndarray)
    """
    (images, labels) = mnist.read_data_sets(location, data_type)   #Seperating the data into images and labels respectively
    images = sc.parallelize(images)
    labels = sc.parallelize(labels + 1) # Target start from 1 in BigDL
    record = images.zip(labels)
    return record

def get_end_trigger():
        return MaxEpoch(10)

train_data = get_mnist(sc, "train", "")\
    .map(lambda rec_tuple: (normalizer(rec_tuple[0], mnist.TRAIN_MEAN, mnist.TRAIN_STD),
                       rec_tuple[1]))\
    .map(lambda t: Sample.from_ndarray(t[0], t[1]))
test_data = get_mnist(sc, "test", "")\
    .map(lambda rec_tuple: (normalizer(rec_tuple[0], mnist.TEST_MEAN, mnist.TEST_STD),
                       rec_tuple[1]))\
    .map(lambda t: Sample.from_ndarray(t[0], t[1]))


Extracting train-images-idx3-ubyte.gz
Extracting train-labels-idx1-ubyte.gz
Extracting t10k-images-idx3-ubyte.gz
Extracting t10k-labels-idx1-ubyte.gz


## CREATING AN OPTIMIZER 

An optimizer is in general to minimize any function with respect to a set of parameters. In case of training a neural network, an optimizer tries to minimize the loss of the neural net with respect to its weights/biases, over the training set.

In [7]:
# Create an Optimizer

optimizer = Optimizer(
    model=build_model(10),
    training_rdd=train_data,
    criterion=ClassNLLCriterion(),
    optim_method=SGD(learningrate=0.4, learningrate_decay=0.0002),
    end_trigger=MaxEpoch(20),
    batch_size=2048)

#Set the validation logic
#Function .setValidation is to set a validate evaluation in the optimizer.
#trigger: how often to evaluation validation set.
#sampleRDD: validate data set in type of RDD[Sample].
#vMethods: a set of ValidationMethod.
#batchSize: size of mini batch. 

optimizer.set_validation(
    batch_size=2048,
    val_rdd=test_data,
    trigger=EveryEpoch(),
    val_method=[Top1Accuracy()])


creating: createSequential
creating: createReshape
creating: createSpatialConvolution
creating: createTanh
creating: createSpatialMaxPooling
creating: createTanh
creating: createSpatialConvolution
creating: createSpatialMaxPooling
creating: createReshape
creating: createLinear
creating: createTanh
creating: createLinear
creating: createLogSoftMax
creating: createClassNLLCriterion
creating: createDefault
creating: createSGD
creating: createMaxEpoch
creating: createOptimizer
creating: createEveryEpoch
creating: createTop1Accuracy


In [8]:
# Here TrainSummary and ValidationSummary are in built functions to save logs.
# We can save the logs to a tmp file on DSX 
#the idea behind creating an app_name is to save different model as and when you run them
app_name='lenet-'+dt.datetime.now().strftime("%Y%m%d-%H%M%S")
train_summary = TrainSummary(log_dir='/tmp',
                                     app_name=app_name)
train_summary.set_summary_trigger("Parameters", SeveralIteration(50))
val_summary = ValidationSummary(log_dir='/tmp',
                                        app_name=app_name)
optimizer.set_train_summary(train_summary)
optimizer.set_val_summary(val_summary)
print("saving logs to ",app_name)

creating: createTrainSummary
creating: createSeveralIteration
creating: createValidationSummary
saving logs to  lenet-20180408-045158


## START TRAINING

In [9]:
%%time
# Training may take about ~7 minutes
#Function optimize will start the training.
# Boot training process
trained_model = optimizer.optimize()
print("Optimization Done.")

Py4JJavaError: An error occurred while calling o537.optimize.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 10 times, most recent failure: Lost task 0.9 in stage 0.0 (TID 18, yp-spark-dal09-env5-0025, executor 9f959b8e-d7f0-4302-aa8e-8f00a3a8ec93): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 160, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/serializers.py", line 455, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 783, in _make_skel_func
    closure = _reconstruct_closure(closures) if closures else None
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 775, in _reconstruct_closure
    return tuple([_make_cell(v) for v in values])
TypeError: 'int' object is not iterable

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:326)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.lang.Thread.run(Thread.java:785)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1430)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1429)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1429)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:803)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1657)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1612)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at java.lang.Thread.getStackTrace(Thread.java:1117)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:629)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1967)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1981)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1178)
	at com.intel.analytics.bigdl.dataset.DistributedDataSet$$anon$5.cache(DataSet.scala:188)
	at com.intel.analytics.bigdl.optim.DistriOptimizer.prepareInput(DistriOptimizer.scala:757)
	at com.intel.analytics.bigdl.optim.DistriOptimizer.optimize(DistriOptimizer.scala:777)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
	at java.lang.reflect.Method.invoke(Method.java:507)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:785)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 160, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/serializers.py", line 455, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 783, in _make_skel_func
    closure = _reconstruct_closure(closures) if closures else None
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 775, in _reconstruct_closure
    return tuple([_make_cell(v) for v in values])
TypeError: 'int' object is not iterable

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:326)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [11]:
def map_predict_label(l):
    return np.array(l).argmax()
def map_groundtruth_label(l):
    return l[0] - 1

In [12]:
# label-1 to restore the original label.
print("Ground Truth labels:") 
print(', '.join([str(map_groundtruth_label(s.label.to_ndarray())) for s in train_data.take(8)]))
imshow(np.column_stack([np.array(s.features[0].to_ndarray()).reshape(28,28) for s in train_data.take(8)]),cmap='gray'); plt.axis('off')

Ground Truth labels:


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 10 times, most recent failure: Lost task 0.9 in stage 4.0 (TID 49, yp-spark-dal09-env5-0039, executor 89331fcd-49be-4c15-98d3-00dc1b051602): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 160, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/serializers.py", line 455, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 783, in _make_skel_func
    closure = _reconstruct_closure(closures) if closures else None
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 775, in _reconstruct_closure
    return tuple([_make_cell(v) for v in values])
TypeError: 'int' object is not iterable

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:326)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.lang.Thread.run(Thread.java:785)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1430)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1429)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1429)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:803)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1657)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1612)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at java.lang.Thread.getStackTrace(Thread.java:1117)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:629)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1967)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
	at java.lang.reflect.Method.invoke(Method.java:507)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:785)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 160, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/serializers.py", line 455, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 783, in _make_skel_func
    closure = _reconstruct_closure(closures) if closures else None
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 775, in _reconstruct_closure
    return tuple([_make_cell(v) for v in values])
TypeError: 'int' object is not iterable

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:326)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


## PERFORM  TESTING

In [None]:
%%time
# Use the trained model to test some unseen values
predictions = trained_model.predict(test_data)
imshow(np.column_stack([np.array(s.features[0].to_ndarray()).reshape(28,28) for s in test_data.take(8)]),cmap='gray'); plt.axis('off')
print('Ground Truth labels:')
print(', '.join(str(map_groundtruth_label(s.label.to_ndarray())) for s in test_data.take(8)))
print('Predicted labels:')
print(','.join([str(map_predict_label(s)) for s in predictions.take(8)]))

## PARAMETER INSPECTION

In [None]:
#store the parameters of the trained model
#The parameters are exposed as a dict, and can be retrieved using model.parameters().

params = trained_model.parameters()

#batch num, output_dim, input_dim, spacial_dim
for layer_name, param in params.items():
    print (layer_name,param['weight'].shape,param['bias'].shape)

## VISUALISE WEIGHTS OF CONVOLUTIONAL LAYERS

In [None]:
#vis_square is borrowed from caffe example
def vis_square(data):
    """Take an array of shape (n, height, width) or (n, height, width, 3)
       and visualize each (height, width) thing in a grid of size approx. sqrt(n) by sqrt(n)"""
    
    # normalize data for display
    data = (data - data.min()) / (data.max() - data.min())
    # force the number of filters to be square
    n = int(np.ceil(np.sqrt(data.shape[0])))
    padding = (((0, n ** 2 - data.shape[0]),
               (0, 1), (0, 1))                 # add some space between filters
               + ((0, 0),) * (data.ndim - 3))  # don't pad the last dimension (if there is one)
    data = np.pad(data, padding, mode='constant', constant_values=1)  # pad with ones (white)
    
    # tile the filters into an image
    data = data.reshape((n, n) + data.shape[1:]).transpose((0, 2, 1, 3) + tuple(range(4, data.ndim + 1)))
    data = data.reshape((n * data.shape[1], n * data.shape[3]) + data.shape[4:])
  
    plt.imshow(data,cmap='gray'); plt.axis('off')

In [None]:
filters_conv1 = params['conv1']['weight'] # we are extracting weights for the 1st convolutional layer for visualizing  

filters_conv1[0,0,0]   # this will return the 1st element in the weight array

vis_square(np.squeeze(filters_conv1, axis=(0,)).reshape(1*6,5,5))  # this concatenates the first two dimensions for ease of visualization

In [None]:
# the parameters are a list of [weights, biases]
filters_conv2 = params['conv2']['weight']    # we are extracting weights for the 2nd convolutional layer for visualizing

vis_square(np.squeeze(filters_conv2, axis=(0,)).reshape(12*6,5,5))   # this concatenates the first two dimensions for ease of visualization

## LOSS VISUALISATION

In [None]:
loss = np.array(train_summary.read_scalar("Loss"))
top1 = np.array(val_summary.read_scalar("Top1Accuracy"))

plt.figure(figsize = (12,12))
plt.subplot(2,1,1)
plt.plot(loss[:,0],loss[:,1],label='loss')
plt.xlim(0,loss.shape[0]+10)
plt.grid(True)
plt.title("loss")
plt.subplot(2,1,2)
plt.plot(top1[:,0],top1[:,1],label='top1')
plt.xlim(0,loss.shape[0]+10)
plt.title("top1 accuracy")
plt.grid(True)

## TASK 2 - TO TRY

CNN's are also very good at classifying text - Refer papers [1](https://arxiv.org/abs/1408.5882) and [2](https://arxiv.org/abs/1510.03820)
As a task you can try to use the same architecture above for the 20Newsgroups dataset (done in Lab 4&5) and classify the output into 20 different topics.

The approach will be :

     1.convert all text samples in the dataset into sequences of word indices. A "word index" would simply be an integer ID for the word. 
     2.prepare an "embedding matrix" which will contain at index i the embedding vector for the word of index i in our word index.
     3.load this embedding matrix into an Embedding layer
     4.build on top of it a 1D convolutional neural network, ending in a softmax output over our 20 categories.
     




In [None]:
from pyspark.ml import *
from pyspark.sql import *

from pyspark.sql.types import Row
from pyspark.sql import SQLContext

TASK 2a)
### Add code to import 20Newsgroup data
>>>


It then loads the 20 Newsgroup dataset into RDD, and transforms the input data into an RDD of Sample. (Each Sample in essence contains a tuple of two NumPy ndarray representing the feature and label).



In [None]:
##Add code to transform the (feature, label) to tesnors using Tokenizer 

TASK 2b)

>>> tokenizer = ...

# Split the data into traininag and testing

>>>train_rdd, test_rdd = sample_rdd.randomSplit(...)


#We should now prepare the embedding layer
#Word embeddings are computed by applying dimensionality reduction techniques to datasets 
#This can be done via the "word2vec" technique



### Word Embeddings:

The key idea is to encode words and phrases into distributed representations in the format of word vectors, which means each word is represented as a vector. 
There are two widely used word vector training alogirhms, one is published by Google called word to vector, the other is published by Standford called Glove. In this example, pre-trained glove is loaded into a lookup table and will be fine-tuned during the training process. BigDL provides a method to download and load glove in news20 package.


In [None]:
###Embedding Matrix

TASK 2c)

from bigdl.dataset import news20
import itertools

embedding_dim = 100
embedding_matrix = []

>>>open path to file
    >>> transform each value 

In [None]:
#Train a CNN
#Use the build_model() - here we need 20 outputs because we have 20 different topics

TASK 2d)


>>>model.add(Reshape([....]))
        model.add(SpatialConvolution(......))
        model.add(ReLU())
        model.add(SpatialMaxPooling(.......))
        model.add(SpatialConvolution(......))
        model.add(ReLU())
        model.add(SpatialMaxPooling(......))
        model.add(Reshape([.....]))
        model.add(Linear(....))
        model.add(Dropout(...))
        model.add(ReLU())
        model.add(Linear(...))
        model.add(Sigmoid())

In [None]:
##Finally create the Optimizer
##Train the model by calling Optimizer.optimize():


TASK 2e)
>>>>>optimizer = Optimizer(
    model=build_model(.........),
    training_rdd=train_rdd,
    criterion=ClassNLLCriterion(),
    end_trigger=MaxEpoch(max_epoch),
    batch_size=batch_size)
    
...
>>>>>train_model = optimizer.optimize()

In [None]:
### Perform prediction and make use of graphs to visualise the weights of the convolutional neural network


##Extra task:

TASK 2f)

>>>Create a confusion matrix of the true and predicted values
>>>If you are interested in optimizing the results, try different training parameters which may make impacts on the result:
        >>>maximum  sequence length
        >>>batch size
        >>>training epochs
        >>>preprocessing schemes