In [1]:
# prepare needed environment on Colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://apache.mirror.digionline.de/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

import findspark
findspark.init()

In [3]:
!pip install elephas

Collecting elephas
  Downloading https://files.pythonhosted.org/packages/a4/8c/a459d974e01f782e2709c74c280030c0424115f022f33a887ced9e03282b/elephas-2.1.0.tar.gz
Collecting keras==2.2.5
[?25l  Downloading https://files.pythonhosted.org/packages/f8/ba/2d058dcf1b85b9c212cc58264c98a4a7dd92c989b798823cc5690d062bb2/Keras-2.2.5-py2.py3-none-any.whl (336kB)
[K     |████████████████████████████████| 337kB 24.7MB/s 
[?25hCollecting hyperas
  Downloading https://files.pythonhosted.org/packages/04/34/87ad6ffb42df9c1fa9c4c906f65813d42ad70d68c66af4ffff048c228cd4/hyperas-0.4.1-py3-none-any.whl
Collecting h5py==2.10.0
[?25l  Downloading https://files.pythonhosted.org/packages/3f/c0/abde58b837e066bca19a3f7332d9d0493521d7dd6b48248451a9e3fe2214/h5py-2.10.0-cp37-cp37m-manylinux1_x86_64.whl (2.9MB)
[K     |████████████████████████████████| 2.9MB 47.6MB/s 
[?25hCollecting pyspark<3.2
[?25l  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8

In [4]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd

from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.utils import to_categorical

# Define basic parameters
batch_size = 64
nb_classes = 10
epochs = 1

# Create Spark context
conf = SparkConf().setAppName('Mnist_Spark_MLP').setMaster('local[*]')
sc = SparkContext(conf=conf)

# Load data
(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_train = x_train.reshape(60000, 784)
x_test = x_test.reshape(10000, 784)
x_train = x_train.astype("float32")
x_test = x_test.astype("float32")
x_train /= 255
x_test /= 255

print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Convert class vectors to binary class matrices
y_train = to_categorical(y_train, nb_classes)
y_test = to_categorical(y_test, nb_classes)

model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))

sgd = SGD(lr=0.1)
model.compile(sgd, 'categorical_crossentropy', ['acc'])

# Build RDD from numpy features and labels
rdd = to_simple_rdd(sc, x_train, y_train)

# Initialize SparkModel from tensorflow.keras model and Spark context
spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')

# Train Spark model
spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, verbose=0, validation_split=0.1)

# Evaluate Spark model by evaluating the underlying model
score = spark_model.evaluate(x_test, y_test, verbose=2)
print('Test accuracy:', score[1])

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
60000 train samples
10000 test samples


  "The `lr` argument is deprecated, use `learning_rate` instead.")


>>> Fit model
 * Serving Flask app "elephas.parameter.server" (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: off


 * Running on http://172.28.0.2:4000/ (Press CTRL+C to quit)


>>> Initialize workers
>>> Distribute load


172.28.0.2 - - [31/May/2021 18:16:24] "[37mGET /parameters HTTP/1.1[0m" 200 -
172.28.0.2 - - [31/May/2021 18:16:24] "[37mGET /parameters HTTP/1.1[0m" 200 -
172.28.0.2 - - [31/May/2021 18:16:26] "[37mPOST /update HTTP/1.1[0m" 200 -
172.28.0.2 - - [31/May/2021 18:16:26] "[37mPOST /update HTTP/1.1[0m" 200 -
172.28.0.2 - - [31/May/2021 18:16:27] "[37mGET /parameters HTTP/1.1[0m" 200 -


>>> Async training complete.
Test accuracy: 0.9187291860580444


In [5]:
'''
Possible solution: https://stackoverflow.com/questions/61326144/pyspark-pipeline-fitdf-method-give-picklingerror-could-not-serialize-object  
!pip install q keras==2.2.4
!pip install q tensorflow==1.14.0
---------------------------------------------------------------------------
ValueError: Cell is empty


ValueError                                Traceback (most recent call last)

/content/spark-3.1.1-bin-hadoop2.7/python/pyspark/serializers.py in dumps(self, obj)
    436         try:
--> 437             return cloudpickle.dumps(obj, pickle_protocol)
    438         except pickle.PickleError:
---------------------------------------------------------------------------


from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.utils import to_categorical
from tensorflow.keras import optimizers

from pyspark import SparkContext, SparkConf
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

from elephas.ml_model import ElephasEstimator
from elephas.ml.adapter import to_data_frame
from elephas.utils.rdd_utils import to_simple_rdd


# Define basic parameters
batch_size = 64
nb_classes = 10
epochs = 1

# Load data
(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_train = x_train.reshape(60000, 784)
x_test = x_test.reshape(10000, 784)
x_train = x_train.astype("float32")
x_test = x_test.astype("float32")
x_train /= 255
x_test /= 255
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Convert class vectors to binary class matrices
y_train = to_categorical(y_train, nb_classes)
y_test = to_categorical(y_test, nb_classes)

spark = SparkSession\
        .builder\
        .appName("keras-elphas-mnist")\
        .getOrCreate()

model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))

sgd = optimizers.SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
model.compile(sgd, 'categorical_crossentropy', ['acc'])

# Create Spark context
#conf = SparkConf().setAppName('Mnist_Spark_MLP') # .setMaster('local[8]')
#sc = SparkContext(conf=conf)
sc = spark.sparkContext

# Build RDD from numpy features and labels
df = to_data_frame(sc, x_train, y_train, categorical=True)
test_df = to_data_frame(sc, x_test, y_test, categorical=True)

sgd_conf = optimizers.serialize(sgd)

# Initialize Spark ML Estimator
estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_yaml())
estimator.set_optimizer_config(sgd_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])
estimator.set_epochs(epochs)
estimator.set_batch_size(batch_size)
estimator.set_validation_split(0.1)
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)

# Fitting a model returns a Transformer
pipeline = Pipeline(stages=[estimator])
fitted_pipeline = pipeline.fit(df)

# Evaluate Spark model by evaluating the underlying model
prediction = fitted_pipeline.transform(test_df)
pnl = prediction.select("label", "prediction")
pnl.show(100)

prediction_and_label = pnl.rdd.map(lambda row: (row.label, row.prediction))
metrics = MulticlassMetrics(prediction_and_label)
print(metrics.precision())
print(metrics.recall())
spark.stop()
'''

'\nPossible solution: https://stackoverflow.com/questions/61326144/pyspark-pipeline-fitdf-method-give-picklingerror-could-not-serialize-object  \n!pip install q keras==2.2.4\n!pip install q tensorflow==1.14.0\n---------------------------------------------------------------------------\nValueError: Cell is empty\n\n\nValueError                                Traceback (most recent call last)\n\n/content/spark-3.1.1-bin-hadoop2.7/python/pyspark/serializers.py in dumps(self, obj)\n    436         try:\n--> 437             return cloudpickle.dumps(obj, pickle_protocol)\n    438         except pickle.PickleError:\n---------------------------------------------------------------------------\n\n\nfrom tensorflow.keras.datasets import mnist\nfrom tensorflow.keras.models import Sequential\nfrom tensorflow.keras.layers import Dense, Dropout, Activation\nfrom tensorflow.keras.utils import to_categorical\nfrom tensorflow.keras import optimizers\n\nfrom pyspark import SparkContext, SparkConf\nfrom p