In [1]:
# findspark.init() is called before pyspark is imported; presumably it locates
# the local Spark installation and makes pyspark importable — keep this order.
import findspark
findspark.init()
import pyspark 
from pyspark.sql import SparkSession

In [2]:
!pip show systemml

Name: systemml
Version: 1.3.0
Summary: Apache SystemML is a distributed and declarative machine learning platform.
Home-page: http://systemml.apache.org/
Author: Apache SystemML
Author-email: dev@systemml.apache.org
License: Apache 2.0
Location: c:\users\user\anaconda3\envs\tensorflow\lib\site-packages
Requires: scikit-learn, numpy, scipy, Pillow, pandas
Required-by: 


In [3]:
from pyspark.sql import *  # NOTE(review): wildcard import kept for compatibility; prefer explicit names
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# Start (or reuse) a local SparkContext with master "local[6]" and app name
# "systemml". getOrCreate() keeps this cell re-runnable: constructing
# SparkContext(...) a second time in the same kernel raises, because only one
# context may be active at a time.
spark_conf = SparkConf().setMaster("local[6]").setAppName("systemml")
sc = SparkContext.getOrCreate(spark_conf)

In [4]:
# Build (or fetch) the SparkSession used by SystemML.
# The config key was previously misspelled ('spark.excutor.memory'), so the
# memory setting was silently ignored; the correct key is 'spark.executor.memory'.
# NOTE(review): a SparkContext is already created in an earlier cell with master
# 'local[6]'; getOrCreate() presumably reuses it, in which case master('local')
# here has no effect — confirm which master is intended.
spark = (
    SparkSession.builder
    .master('local')
    .appName('systemml_dl')
    .config('spark.executor.memory', '6gb')
    .getOrCreate()
)

In [5]:
# Import dependencies
from mlxtend.data import mnist_data
import numpy as np
from sklearn.utils import shuffle
from keras.models import Sequential
from keras.layers import Input, Dense, Conv2D, MaxPooling2D, Dropout,Flatten
from keras import backend as K
from keras.models import Model
from keras.optimizers import SGD

  return f(*args, **kwds)
Using TensorFlow backend.


In [6]:
# Use the channels-first image layout, matching the input_shape=(1,28,28)
# (channels, height, width) given to the first Conv2D layer of the model.
K.set_image_data_format('channels_first')

In [7]:
# Load the MNIST samples via mlxtend and shuffle features and labels together.
# A fixed random_state makes the shuffle (and therefore the train/test split
# taken from it) reproducible across kernel restarts.
X, y = mnist_data()
X, y = shuffle(X, y, random_state=42)

In [8]:
# Hold out the last 10% of the shuffled samples for testing; the first 90%
# are used for training.
n_samples = len(X)
n_train = int(.9 * n_samples)
X_train, X_test = X[:n_train], X[n_train:]
y_train, y_test = y[:n_train], y[n_train:]

In [9]:
# Define LeNet in Keras: two conv + max-pool stages, then a dense classifier
# with dropout and a 10-way softmax output.
lenet_layers = [
    Conv2D(32, kernel_size=(5, 5), activation='relu', input_shape=(1,28,28), padding='same'),
    MaxPooling2D(pool_size=(2, 2)),
    Conv2D(64, (5, 5), activation='relu', padding='same'),
    MaxPooling2D(pool_size=(2, 2)),
    Flatten(),
    Dense(512, activation='relu'),
    Dropout(0.5),
    Dense(10, activation='softmax'),
]

# Layers are added one at a time, in the order listed above.
keras_model = Sequential()
for layer in lenet_layers:
    keras_model.add(layer)

# SGD with Nesterov momentum and a small learning-rate decay.
sgd = SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
keras_model.compile(loss='categorical_crossentropy', optimizer=sgd)
keras_model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_1 (Conv2D)            (None, 32, 28, 28)        832       
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 32, 14, 14)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 64, 14, 14)        51264     
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 64, 7, 7)          0         
_________________________________________________________________
flatten_1 (Flatten)          (None, 3136)              0         
_________________________________________________________________
dense_1 (Dense)              (None, 512)               1606144   
_________________________________________________________________
dropout_1 (Dropout)          (None, 512)               0         
__________

In [10]:
# Scale the input features by 1/256 (= 0.00390625).
# NOTE: this rebinds X_train / X_test, so re-running the cell scales them again.
scale = 1.0 / 256
X_train, X_test = X_train * scale, X_test * scale

In [11]:
# Wrap the Keras LeNet in a SystemML Keras2DML estimator; no training happens
# here (fit() is called in a later cell). weights='weights_dir' names the
# weights directory — the captured output of this cell reports
# "Loading the model from weights_dir...".
from systemml.mllearn import Keras2DML
sysml_model = Keras2DML(spark, keras_model, weights='weights_dir')



Loading the model from weights_dir...
SystemML Statistics:
Total execution time:		0.000 sec.
Number of executed Spark inst:	0.




In [12]:
# Print SystemML's per-layer table for the converted model (name, type, output
# shape, weight/bias shapes, top/bottom connections, memory estimate).
sysml_model.summary()

                                                                                
+-------------------+---------------+--------------+------------+---------+-------------------+--------------------+--------------------+
|               Name|           Type|        Output|      Weight|     Bias|                Top|              Bottom|Memory* (train/test)|
+-------------------+---------------+--------------+------------+---------+-------------------+--------------------+--------------------+
|     conv2d_1_input|           Data| (, 1, 28, 28)|            |         |     conv2d_1_input|                    |                 1/0|
|           conv2d_1|    Convolution|(, 32, 28, 28)|   [32 X 25]| [32 X 1]|           conv2d_1|      conv2d_1_input|               25/13|
|conv2d_1_activation|           ReLU|(, 32, 28, 28)|            |         |conv2d_1_activation|            conv2d_1|               37/25|
|    max_pooling2d_1|        Pooling|(, 32, 14, 14)|            |         |    max_pooling2

In [None]:
# Train the SystemML model on the scaled training split.
sysml_model.fit(X_train, y_train)


In [None]:
# Evaluate the trained model on the held-out test split
# (presumably returns classification accuracy — confirm against the mllearn API).
sysml_model.score(X_test, y_test)