# MNIST - Handwriting Recognition

In [19]:
from com.yahoo.ml.caffe.DisplayUtils import *
from com.yahoo.ml.caffe.CaffeOnSpark import *
from com.yahoo.ml.caffe.Config import *
from com.yahoo.ml.caffe.DataSource import *
import caffe


## Data Preview (MNIST test DataFrame)

In [20]:
# Load the MNIST test DataFrame from parquet so a few rows can be inspected.
# `sqlContext` is the SQL context name used by every other cell in this notebook.
df = sqlContext.read.parquet('/Users/mridul/bigml/mnist_test_dataframe')

In [21]:
# Render the first 5 rows of the DataFrame (index, label and image columns).
show_df(
    df,
    5,
)

Index,Label,Image
0,7,
1,2,
2,1,
3,0,
4,4,


## Define your network

In [22]:
from caffe import layers as L, params as P

# Values of Caffe's Phase enum (TRAIN = 0, TEST = 1), used by the `include`
# rules below instead of bare numbers.
TRAIN_PHASE = 0
TEST_PHASE = 1


def _memory_data_layer(source, batch_size, phase):
    """Create a MemoryData layer and return its two tops (data, label).

    Args:
        source: path of the image DataFrame the layer reads from.
        batch_size: number of images per batch.
        phase: phase value placed in the layer's ``include`` rule.

    All remaining settings (1x28x28 input, the ImageDataFrame source class,
    the pixel scale) are identical for the train and test layers, so they
    live here once.
    """
    return L.MemoryData(
        batch_size=batch_size, channels=1, height=28, width=28,
        source=source,
        share_in_parallel=False,
        source_class="com.yahoo.ml.caffe.ImageDataFrame",
        transform_param=dict(scale=0.00390625),  # 0.00390625 == 1/256
        include=dict(phase=phase),
        ntop=2)


n = caffe.NetSpec()

# Train-phase input layer. The spec is serialised right away, before any
# other layer is attached to it.
n.data, n.label = _memory_data_layer('/Users/mridul/bigml/mnist_train_dataframe',
                                     batch_size=64, phase=TRAIN_PHASE)
train = str(n.to_proto())

# Test-phase input layer. Rebinding n.data / n.label means the layers below
# are wired to these tops.
n.data, n.label = _memory_data_layer('/Users/mridul/bigml/mnist_test_dataframe',
                                     batch_size=100, phase=TEST_PHASE)

# Network body: conv -> pool -> conv -> pool -> inner product -> ReLU -> inner product.
# NOTE(review): conv1 and ip2 set lr_mult=1 (weights) / lr_mult=2 (bias) while
# conv2 and ip1 leave the defaults -- confirm this asymmetry is intended.
n.conv1 = L.Convolution(n.data, kernel_size=5, num_output=20,
                        weight_filler=dict(type='xavier'),
                        bias_filler=dict(type='constant'),
                        param=[dict(lr_mult=1), dict(lr_mult=2)])
n.pool1 = L.Pooling(n.conv1, kernel_size=2, stride=2, pool=P.Pooling.MAX)
n.conv2 = L.Convolution(n.pool1, kernel_size=5, num_output=50,
                        weight_filler=dict(type='xavier'),
                        bias_filler=dict(type='constant'))
n.pool2 = L.Pooling(n.conv2, kernel_size=2, stride=2, pool=P.Pooling.MAX)
n.ip1 = L.InnerProduct(n.pool2, num_output=500,
                       weight_filler=dict(type='xavier'),
                       bias_filler=dict(type='constant'))
n.relu1 = L.ReLU(n.ip1, in_place=True)
n.ip2 = L.InnerProduct(n.relu1, num_output=10,
                       weight_filler=dict(type='xavier'),
                       bias_filler=dict(type='constant'),
                       param=[dict(lr_mult=1), dict(lr_mult=2)])

# Accuracy is only included in the test phase; the loss has no phase rule.
n.accuracy = L.Accuracy(n.ip2, n.label, include=dict(phase=TEST_PHASE))
n.loss = L.SoftmaxWithLoss(n.ip2, n.label)

test = str(n.to_proto())


In [23]:
# Assemble the network prototxt: the net name, then the serialised train-phase
# data layer (`train`), then the serialised test-phase spec (`test`).
# The `with` block closes the file on exit, so no explicit close() is needed.
with open('/Users/mridul/bigml/CaffeOnSpark/data/lenet_dataframe_train_test.prototxt', 'w') as f:
    f.write('name:"LeNet"\n')
    f.write(train)
    f.write(test)
    


In [None]:
# Draw the network described by the generated prototxt.
# NOTE(review): 'LR' is presumably the layout direction (left-to-right) -- confirm.
show_network(
    '/Users/mridul/bigml/CaffeOnSpark/data/lenet_dataframe_train_test.prototxt',
    'LR',
)

# Training

In [25]:
# CaffeOnSpark handle built from the Spark context and the SQL context;
# it is used below for training, testing and feature extraction.
cos = CaffeOnSpark(
    sc,
    sqlContext,
)

In [26]:
# Settings handed to Config; every value is passed as a string.
args = {
    'conf': '/Users/mridul/bigml/CaffeOnSpark/data/lenet_dataframe_solver.prototxt',
    'model': 'file:///tmp/lenet.model',
    'devices': '1',
    'clusterSize': '1',
}
cfg = Config(sc, args)

In [27]:
# Data source for training, built from the current config.
# NOTE(review): the second argument (True) presumably selects the train-phase
# data layer -- confirm against DataSource.getSource.
dl_train_source = DataSource(sc).getSource(
    cfg,
    True,
)

In [28]:
# Train the network on the training data source created above.
cos.train(dl_train_source)

# Test

In [29]:
# Data source for testing, built from the same config as training.
# NOTE(review): the second argument (False) presumably selects the test-phase
# data layer -- confirm against DataSource.getSource.
dl_test_source = DataSource(sc).getSource(
    cfg,
    False,
)

In [30]:
# Evaluate the trained model on the test source and keep the returned metrics.
test_result = cos.test(
    dl_test_source,
)

In [31]:
# Display the metrics returned by cos.test (accuracy and loss in the output below).
test_result

{u'accuracy': [0.9907000052928925], u'loss': [0.02928926563981804]}

# Feature Extraction

In [32]:
# Extend the existing settings for feature extraction, then rebuild the config.
# 'features' is a comma-separated list of blob names; 'label' names the label blob.
args['features'] = 'accuracy,ip1,ip2'
args['label'] = 'label'
cfg = Config(
    sc,
    args,
)

In [33]:
# Data source for feature extraction, built from the updated config.
# It passes False as the second argument, like the test source does.
dl_feature_source = DataSource(sc).getSource(
    cfg,
    False,
)

In [34]:
# Extract the requested features; the result is a DataFrame with one row per sample.
f = cos.features(
    dl_feature_source,
)

In [35]:
# Print the first 5 rows of the extracted-features DataFrame.
f.show(5)

+--------+--------+--------------------+--------------------+-----+
|SampleID|accuracy|                 ip1|                 ip2|label|
+--------+--------+--------------------+--------------------+-----+
|00000000|   [1.0]|[3.6676927, 0.0, ...|[-3.4122076, -0.7...|[7.0]|
|00000001|   [1.0]|[0.0, 0.0, 0.0, 0...|[5.695843, 6.8152...|[2.0]|
|00000002|   [1.0]|[0.0, 0.0, 2.2626...|[-2.0900922, 11.5...|[1.0]|
|00000003|   [1.0]|[2.6771448, 0.0, ...|[14.722494, -4.43...|[0.0]|
|00000004|   [1.0]|[0.23194918, 0.0,...|[-2.8665361, -3.2...|[4.0]|
+--------+--------+--------------------+--------------------+-----+
only showing top 5 rows



In [36]:
def maxScoreAndIndex(array_of_scores):
    """Return the ``(index, score)`` pair of the largest score.

    Note the order of the result: the position comes first and the score
    second. When several entries share the largest score, the earliest one
    is returned.
    """
    def score_of(indexed_score):
        # Each item produced by enumerate() is (position, score).
        return indexed_score[1]

    return max(enumerate(array_of_scores), key=score_of)
def _prediction_row(row):
    """Flatten one feature row into (SampleID, Accuracy, Scores, MaxScore, Prediction, Label).

    maxScoreAndIndex returns (index, score); it is evaluated once per row and
    unpacked, rather than once per derived column.
    """
    prediction, max_score = maxScoreAndIndex(row.ip2)
    return (
        row.SampleID,
        row.accuracy[0],
        row.ip2,
        max_score,
        prediction,
        int(row.label[0]),
    )


# `.rdd.map` is what DataFrame.map is shorthand for on Spark 1.x, and it also
# works on Spark 2.x, where DataFrame has no map().
g = sqlContext.createDataFrame(
    f.rdd.map(_prediction_row),
    ["SampleID", "Accuracy", "Scores", "MaxScore", "Prediction", "Label"])

# Only 5 rows are displayed, so limit before converting instead of collecting
# the whole DataFrame to the driver.
g.limit(5).toPandas()

Unnamed: 0,SampleID,Accuracy,Scores,MaxScore,Prediction,Label
0,0,1,"[-3.41220760345, -0.772891402245, 0.7426052689...",15.277498,7,7
1,1,1,"[5.69584321976, 6.81526565552, 20.2332878113, ...",20.233288,2,2
2,2,1,"[-2.09009218216, 11.5996389389, -1.10666882992...",11.599639,1,1
3,3,1,"[14.7224941254, -4.43603801727, -0.56451821327...",14.722494,0,0
4,4,1,"[-2.86653614044, -3.27059435844, -1.7027045488...",13.648049,4,4


###  Logistic Regression using MLlib

In [37]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

In [38]:
# Build MLlib training points: the label is the sample's label value and the
# features are its ip1 activations.
# `.rdd.map` is what DataFrame.map is shorthand for on Spark 1.x, and it also
# works on Spark 2.x, where DataFrame has no map().
data = f.rdd.map(lambda row: LabeledPoint(row.label[0], Vectors.dense(row.ip1)))

In [39]:
# Fit a 10-class logistic regression on the ip1 features with L-BFGS,
# capped at 10 iterations.
lr = LogisticRegressionWithLBFGS.train(
    data,
    numClasses=10,
    iterations=10,
)

In [40]:
# Predict a class for every point, using only its feature vector.
# Note these are the same points the model was trained on.
predictions = lr.predict(
    data.map(lambda point: point.features)
)

In [41]:
# Show the first 5 predicted classes.
predictions.take(5)

[7, 2, 1, 0, 4]