# MNIST Distributed with Tensorflow Spark 3 Reading FS

Exemplo de identificação de números com a base MNIST usando spark_tensorflow_distributor lendo imagens no filesystem


In [1]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

import tensorflow as tf
from spark_tensorflow_distributor import MirroredStrategyRunner

import matplotlib.pyplot as plt
import pathlib, getpass, shutil, os

In [2]:
# Class labels: the digits 0-9 as strings.
CATEGORIES = [str(digit) for digit in range(10)]

# Small MNIST sample (active configuration).
DATADIR = "/data/dataset/mnist/trainingSample/trainingSample"
TESTDIR = "/data/dataset/mnist/testSample/testSample"

# Big MNIST dataset: uncomment both lines to use it instead of the small sample.
#DATADIR = "/data/dataset/mnist/trainingSet/trainingSet"
#TESTDIR = "/data/dataset/mnist/trainingSample/trainingSample"

# Plain-string aliases of the dataset roots; the commented lines below are the
# pathlib.Path variants.
data_train, data_test = DATADIR, TESTDIR
#data_train = pathlib.Path(DATADIR)
#data_test = pathlib.Path(TESTDIR)

In [3]:
# Count the .jpg files found exactly one directory level below each dataset root.
SIZE_OF_DATASET = sum(1 for _ in pathlib.Path(DATADIR).glob('*/*.jpg'))
SIZE_OF_TEST = sum(1 for _ in pathlib.Path(TESTDIR).glob('*/*.jpg'))

print("Number of training images: ", SIZE_OF_DATASET)
print("Number of test images: ", SIZE_OF_TEST)

Number of training images:  600
Number of test images:  350


In [4]:
# Per-user working directories for the model, the export and the logs.
base_dir = f"/data/users/tensorflow/{getpass.getuser()}"
mnist_model = f"{base_dir}/model"
mnist_export = f"{base_dir}/export"
mnist_logs = f"{base_dir}/logs"

# Start from an empty log directory: delete any previous one (errors are
# ignored), then create it again.
shutil.rmtree(mnist_logs, ignore_errors=True)
os.makedirs(mnist_logs, exist_ok=True)

In [5]:
# Training parameters

IMG_SIZE = 28     # images are resized to IMG_SIZE x IMG_SIZE inside train()
BATCH_SIZE = 64   # batch size passed to model.fit inside train()
EPOCHS = 3        # number of epochs passed to model.fit inside train()

# NOTE(review): the two values below are not referenced by the cells shown in
# this notebook -- confirm whether they are still needed.
BUFFER_SIZE = 10_000
NUM_WORKERS = 8

In [6]:
# Start (or reuse) the Spark session on the Mesos cluster, with the executors
# running inside the TensorFlow Docker image.

spark = (
    SparkSession.builder
    .appName("MNIST spark_tensorflow_distributor with Mesos")
    .master("mesos://zk://10.129.64.20:2181,10.129.64.10:2181,10.129.64.30:2181/mesos")
    .config("spark.mesos.executor.docker.image", "lasid/spark-worker-tensorflow:3.1.1_bionic")
    .config("spark.mesos.containerizer", "docker")
    .getOrCreate()
)

# Alternative builder settings, kept for reference.
#
# Run local:
#     .appName("MNIST spark_tensorflow_distributor local execution")
#     .master("local[*]")
#
# Run with Mesos:
#     .master("mesos://zk://10.129.64.20:2181,10.129.64.10:2181,10.129.64.30:2181/mesos")
#     .config("spark.mesos.executor.docker.image", "lasid/spark-worker-tensorflow:3.1.1_bionic")
#     .config("spark.mesos.containerizer", "docker")

In [7]:
# Main train function

def train():
    """Train a small CNN on the MNIST image folders.

    Meant to be handed to ``MirroredStrategyRunner(...).run``. The function
    takes no arguments and returns nothing; its configuration is read from the
    notebook-level names ``CATEGORIES``, ``DATADIR``, ``TESTDIR``,
    ``IMG_SIZE``, ``BATCH_SIZE`` and ``EPOCHS``.

    Images under ``DATADIR`` are used for training and images under
    ``TESTDIR`` as validation data. Each dataset directory must contain one
    sub-directory per entry of ``CATEGORIES``.

    Raises:
        ValueError: if no readable image is found under a dataset directory.
    """
    # Imports are kept local to the function -- presumably so that it is
    # self-contained when executed on the Spark workers (TODO confirm).
    import tensorflow as tf
    import numpy as np
    import os, random, cv2

    # Derive the class count from the label list instead of hard-coding 10.
    num_classes = len(CATEGORIES)

    def load_images(root_dir, categories):
        """Return a list of ``[image, class_index]`` pairs read from ``root_dir``.

        Every file in ``root_dir/<category>`` is read as grayscale and resized
        to ``IMG_SIZE x IMG_SIZE``; the class index is the category's position
        in ``categories``.
        """
        samples = []
        for class_num, category in enumerate(categories):
            category_dir = os.path.join(root_dir, category)
            for file_name in os.listdir(category_dir):
                img_path = os.path.join(category_dir, file_name)
                img_array = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
                if img_array is None:
                    # cv2.imread returns None for files it cannot decode;
                    # passing that to cv2.resize would raise.
                    print("Skipping unreadable image:", img_path)
                    continue
                resized = cv2.resize(img_array, (IMG_SIZE, IMG_SIZE))
                samples.append([resized, class_num])
        if not samples:
            raise ValueError("No readable images found under " + str(root_dir))
        return samples

    def to_arrays(samples):
        """Shuffle ``samples`` in place and return ``(features, one_hot_labels)``.

        Features are reshaped to ``(-1, IMG_SIZE, IMG_SIZE, 1)`` and divided
        by 255.0; labels are one-hot encoded over ``num_classes`` columns.
        """
        random.shuffle(samples)
        images = [image for image, _ in samples]
        labels = [label for _, label in samples]
        one_hot = np.eye(num_classes)[labels]
        features = np.array(images).reshape(-1, IMG_SIZE, IMG_SIZE, 1) / 255.0
        return features, one_hot

    def build_and_compile_cnn_model():
        """Build and compile the Conv2D -> MaxPooling2D -> Dense classifier."""
        model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, 3, activation='relu',
                                   input_shape=(IMG_SIZE, IMG_SIZE, 1)),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(num_classes, activation='softmax')
        ])
        # categorical_crossentropy matches the one-hot labels built in to_arrays.
        model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
        return model

    x_val, y_val = to_arrays(load_images(TESTDIR, CATEGORIES))
    x_train, y_train = to_arrays(load_images(DATADIR, CATEGORIES))

    multi_worker_model = build_and_compile_cnn_model()
    # NOTE: the training history produced by fit is neither saved nor returned.
    multi_worker_model.fit(
        x_train,
        y_train,
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
        validation_data=(x_val, y_val),
    )

## MirroredStrategyRunner parameters

### API default
`MirroredStrategyRunner(num_slots, local_mode=False, use_gpu=True, gpu_resource_name='gpu', use_custom_strategy=False)`

### Args

1. num_slots: Total number of GPUs or CPU only Spark tasks that participate in distributed training.

2. local_mode: If True, the training function is run locally on the driver. If False, training is distributed among the workers.

3. use_gpu: If True, training is done with GPUs using Spark resource scheduling with the gpu_resource_name parameter as the resource name. If False, do CPU only training.

4. gpu_resource_name: The name of the Spark resource scheduling GPU resource.

5. use_custom_strategy: When true, the training function passed to the MirroredStrategyRunner.run method must construct and use its own tensorflow.distribute.Strategy() object. When false, MirroredStrategyRunner constructs one for the user allowing the user to provide non-distributed TensorFlow code that is executed as distributed code.

In [8]:
# Start the distributed run: train() is executed by 7 CPU-only Spark tasks.
# NOTE(review): NUM_WORKERS is set to 8 in the parameters cell but the slot
# count here is 7 -- confirm which value is intended.
runner = MirroredStrategyRunner(num_slots=7, use_gpu=False)
runner.run(train)

Doing CPU training...
Will run with 7 Spark tasks.
Distributed training in progress...
View Spark executor stderr logs to inspect training...
Training with 7 slots is complete!


In [9]:
#history=np.load(mnist_logs+'/my_history.npy',allow_pickle='TRUE').item()

#plt.plot(history.history['accuracy'], label='accuracy')
#plt.plot(history.history['val_accuracy'], label = 'val_accuracy')
#plt.xlabel('Epoch')
#plt.ylabel('Accuracy')
#plt.ylim([0.5, 1])
#plt.legend(loc='lower right')

In [10]:
# Stop the Spark session that was created earlier in this notebook.
spark.stop()