In [None]:
#importing libraries

import matplotlib.pyplot as plt

import numpy as np

import time

import tensorflow as tf

from tensorflow.keras.datasets import fashion_mnist
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from keras.layers import BatchNormalization
from keras.optimizers import SGD
from tensorflow.keras.utils import to_categorical

import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.ml.image import ImageSchema
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType

In [None]:
import findspark

In [None]:
findspark.init()

In [None]:
import pyspark

In [None]:
findspark.find()

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('sba22177').setMaster('local')

In [None]:
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [None]:
spark_session = SparkSession.builder \
      .master("local") \
      .appName("sba22177_CA01") \
      .config("spark.executor.memory", "6gb") \
      .getOrCreate()

In [None]:
#dowload the MNIST data
(train_X, train_y), (test_X, test_y) = fashion_mnist.load_data()

In [None]:
#examining dataset
print(test_X.shape)
print(train_X.shape)
print(test_y.shape)
print(train_y.shape)

In [None]:
#viewing unique labels
set(train_y)

In [None]:
#defining the image classes so not just numbers as above
class_names = ['T-shirt/top','Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

In [None]:
#feature scaling colours from 0-255 to 0-1
train_X = train_X/255.0
test_X = test_X/255.0

In [None]:
#Showing the first 10 images
plt.figure(figsize=(10,10))
for i in range(10):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(train_X[i], cmap=plt.cm.binary)
    plt.xlabel(class_names[train_y[i]])
plt.show()

In [None]:
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

train_images = train_images[:2000]
train_labels = train_labels[:2000]

#Reshape and normalize images
train_images = train_images.reshape(-1, 28, 28, 1).astype("float32") / 255
test_images = test_images.reshape(-1, 28, 28, 1).astype("float32") / 255

#Convert labels to categorical
train_labels = to_categorical(train_labels)
test_labels = to_categorical(test_labels)

In [None]:
#Define schema
schema = StructType([
    StructField("image", ArrayType(ArrayType(ArrayType(DoubleType()))), nullable=False),
    StructField("label", ArrayType(DoubleType()), nullable=False)
])

#Create Spark DataFrames
train_data = [(train_images[i].tolist(), train_labels[i].tolist()) for i in range(len(train_images))]
test_data = [(test_images[i].tolist(), test_labels[i].tolist()) for i in range(len(test_images))]

train_df = spark.createDataFrame(train_data, schema=schema)
test_df = spark.createDataFrame(test_data, schema=schema)

## Simple Neural Network

In [None]:
#creating one hidden layer with 128 nodes and output layer with 10 nodes
model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)), 
    tf.keras.layers.Dense(128, activation='relu'),    
    tf.keras.layers.Dense(10, activation='softmax')
])

In [None]:
#backpropagation
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

In [None]:
#training the model
model.fit(train_X, train_y, epochs=10)

In [None]:
#evaluating model
loss, acc = model.evaluate(test_X, test_y, verbose = 1)
print('\\nTest accuracy: ', acc)

## CNN_Model_One

In [None]:
def cnn_model_one():
    
    #creating a sequential instance to add layers to model
    cnn_model = Sequential()
    
    #first convolutional layer
    cnn_model.add(Conv2D(32, (3, 3), padding='same', activation='relu', input_shape=(28,28,1)))
    #first pooling layer
    cnn_model.add(MaxPooling2D((2,2), strides = 2))
    
    #second convolutional layer
    cnn_model.add(Conv2D(64, (3,3), padding='same', activation = 'relu'))
    #second pooling layer
    cnn_model.add(MaxPooling2D((2,2), strides = 2))
    
    #third convolutional layer
    cnn_model.add(Conv2D(128, (3,3), padding='same', activation = 'relu'))
    #third pooling layer
    cnn_model.add(MaxPooling2D((2,2), strides = 2))
    
    #Flatten and connect layers
    cnn_model.add(Flatten())
    cnn_model.add(Dense(128, activation = 'relu'))
    
    #Fully connected layer of 10 to refelct 10 labels and softmax activation
    cnn_model.add(Dense(10, activation = 'softmax'))
    
    return cnn_model

In [None]:
cnn_model_one = cnn_model_one()

# set up model optimizer, loss function, and accuracy
#adam - adaptive moment estimation
cnn_model_one.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['sparse_categorical_accuracy'])

cnn_model_one.summary()

In [None]:
from sparkdl import KerasImageFileTransformer

In [None]:
# Create a Keras Image File Transformer for training
keras_transformer = KerasImageFileTransformer(inputCol="image", outputCol="prediction", model=cnn_model_one)

In [None]:




# Create a pipeline for resizing and training
pipeline = Pipeline(stages=[resizer, keras_transformer])

# Train the model
model_pipeline = pipeline.fit(train_df)

# Evaluate the Model
test_predictions = model_pipeline.transform(test_df)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(test_predictions)
print(f"Accuracy: {accuracy * 100:.2f}%")

# Stop Spark Session
spark.stop()

In [None]:
#training the model
t0_one=time.time()
train_model_one = cnn_model_one.fit(train_X, train_y, epochs=10, validation_split=0.33)
print("Training time:", time.time()-t0_one)

In [None]:
#test the model
loss, acc = cnn_model_one.evaluate(test_X, test_y, verbose = 1)
print('\nTest accuracy: ', acc)
print('\nTest loss: ', loss)

In [None]:
#plotting the accuracy
plt.plot(train_model_one.history['sparse_categorical_accuracy'])
plt.plot(train_model_one.history['val_sparse_categorical_accuracy'])
plt.title('Model Accuracy - CNN One')
plt.ylabel('Accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

In [None]:
#plotting the loss
plt.plot(train_model_one.history['loss'])
plt.plot(train_model_one.history['val_loss'])
plt.title('Model Loss - CNN One')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

## CNN_Model_One - Epoch Increase

In [None]:
#training the model
t0_one_epoch=time.time()
train_model_one_epoch = cnn_model_one.fit(train_X, train_y, epochs=30, validation_split=0.33)
print("Training time:", time.time()-t0_one_epoch)

In [None]:
#test the model
loss, acc = cnn_model_one.evaluate(test_X, test_y, verbose = 1)
print('\nTest accuracy: ', acc)
print('\nTest loss: ', loss)

In [None]:
#plotting the accuracy
fig, ax1 = plt.subplots()

ax1.plot(train_model_one_epoch.history['sparse_categorical_accuracy'], label='Train')
ax1.set_xlabel('epoch')
ax1.set_ylabel('Train')
ax1.tick_params(axis='y')

ax2 = ax1.twinx()

ax2.plot(train_model_one_epoch.history['val_sparse_categorical_accuracy'], label='Validation', color='orange')
ax2.set_ylabel('Validation')
ax2.tick_params(axis='y')

lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc='upper left')

plt.title('Model Accuracy - CNN One Epoch Increase')
plt.show()

In [None]:
# plotting the loss
fig, ax1 = plt.subplots()

ax1.plot(train_model_one_epoch.history['loss'], label='Train')
ax1.set_xlabel('epoch')
ax1.set_ylabel('Train')
ax1.tick_params(axis='y')

ax2 = ax1.twinx()

ax2.plot(train_model_one_epoch.history['val_loss'], label='Validation', color='orange')
ax2.set_ylabel('Validation')
ax2.tick_params(axis='y')

lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc='upper left')

plt.title('Model Loss - CNN One Epoch Increase')
plt.show()

## CNN_Model_Two

In [None]:
def cnn_model_two():
    
    #creating a sequential instance to add layers to model
    cnn_model_two = Sequential()
    
    #first convolutional layer
    cnn_model_two.add(Conv2D(32, (3, 3), padding='same', activation='relu', input_shape=(28,28,1)))
    #first pooling layer
    cnn_model_two.add(MaxPooling2D((2,2), strides = 2))
    
    #second convolutional layer
    cnn_model_two.add(Conv2D(64, (3,3), padding='same', activation = 'relu'))
    #second pooling layer
    cnn_model_two.add(MaxPooling2D((2,2), strides = 2))
    
    #third convolutional layer
    cnn_model_two.add(Conv2D(128, (3,3), padding='same', activation = 'relu'))
    #third pooling layer
    cnn_model_two.add(MaxPooling2D((2,2), strides = 2))
    
    #Flatten and connect layers
    cnn_model_two.add(Flatten())
    cnn_model_two.add(Dense(128, activation = 'relu'))
    cnn_model_two.add(Dense(128, activation = 'relu'))
    cnn_model_two.add(Dense(128, activation = 'relu'))
    cnn_model_two.add(Dense(128, activation = 'relu'))
    cnn_model_two.add(Dense(128, activation = 'relu'))
    cnn_model_two.add(Dense(128, activation = 'relu'))
    
    #Fully connected layer of 10 to refelct 10 labels and softmax activation
    cnn_model_two.add(Dense(10, activation = 'softmax'))
    
    return cnn_model_two

In [None]:
cnn_model_two = cnn_model_two()

# set up model optimizer, loss function, and accuracy
#adam - adaptive moment estimation
cnn_model_two.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['sparse_categorical_accuracy'])

cnn_model_two.summary()

In [None]:
#training the model
t0_two=time.time()
train_model_two = cnn_model_two.fit(train_X, train_y, epochs=10, validation_split=0.33)
print("Training time:", time.time()-t0_two)

In [None]:
#test the model
loss, acc = cnn_model_two.evaluate(test_X, test_y, verbose = 1)
print('\nTest accuracy: ', acc)
print('\nTest loss: ', loss)

In [None]:
#plotting the accuracy
plt.plot(train_model_two.history['sparse_categorical_accuracy'])
plt.plot(train_model_two.history['val_sparse_categorical_accuracy'])
plt.title('Model Accuracy - CNN Two')
plt.ylabel('Accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

In [None]:
#plotting the loss
plt.plot(train_model_two.history['loss'])
plt.plot(train_model_two.history['val_loss'])
plt.title('Model Loss - CNN Two')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

## CNN_MODEL_THREE

In [None]:
def cnn_model_three():
    
    #creating a sequential instance to add layers to model
    cnn_model_three = Sequential()
    
    #first convolutional layer
    cnn_model_three.add(Conv2D(32, (3, 3), padding='same', activation='relu', input_shape=(28,28,1)))
    #first pooling layer
    cnn_model_three.add(MaxPooling2D((2,2), strides = 2))
    #add dropout
    model.add(Dropout(0.3)) 
    
    #second convolutional layer
    cnn_model_three.add(Conv2D(64, (3,3), padding='same', activation = 'relu'))
    #second pooling layer
    cnn_model_three.add(MaxPooling2D((2,2), strides = 2))
    #add dropout
    model.add(Dropout(0.3)) 
    
    #third convolutional layer
    cnn_model_three.add(Conv2D(128, (3,3), padding='same', activation = 'relu'))
    #third pooling layer
    cnn_model_three.add(MaxPooling2D((2,2), strides = 2))
    #add dropout
    model.add(Dropout(0.3)) 
    
    #Flatten and connect layers
    cnn_model_three.add(Flatten())
    cnn_model_three.add(Dense(128, activation = 'relu'))
    cnn_model_three.add(Dense(128, activation = 'relu'))
    cnn_model_three.add(Dense(128, activation = 'relu'))
    cnn_model_three.add(Dense(128, activation = 'relu'))
    cnn_model_three.add(Dense(128, activation = 'relu'))
    cnn_model_three.add(Dense(128, activation = 'relu'))
    model.add(Dropout(0.3)) 
    
    #Fully connected layer of 10 to refelct 10 labels and softmax activation
    cnn_model_three.add(Dense(10, activation = 'softmax'))
    
    return cnn_model_three

In [None]:
cnn_model_three = cnn_model_three()

# set up model optimizer, loss function, and accuracy
#adam - adaptive moment estimation
cnn_model_three.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['sparse_categorical_accuracy'])

cnn_model_three.summary()

In [None]:
#training the model
t0_three = time.time()
train_model_three = cnn_model_three.fit(train_X, train_y, epochs=10, validation_split=0.33)
print("Training time:", time.time()-t0_three)

In [None]:
#test the model
loss, acc = cnn_model_three.evaluate(test_X, test_y, verbose = 1)
print('\nTest accuracy: ', acc)
print('\nTest loss: ', loss)

In [None]:
#plotting the accuracy
plt.plot(train_model_three.history['sparse_categorical_accuracy'])
plt.plot(train_model_three.history['val_sparse_categorical_accuracy'])
plt.title('Model Accuracy - CNN Three')
plt.ylabel('Accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

In [None]:
#plotting the loss
plt.plot(train_model_three.history['loss'])
plt.plot(train_model_three.history['val_loss'])
plt.title('Model Loss - CNN Three')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

## CNN_Model_Four

In [None]:
def cnn_model_four():
    
    #creating a sequential instance to add layers to model
    cnn_model_four = Sequential()
    
    #first convolutional layer
    cnn_model_four.add(Conv2D(32, (3, 3), padding='same', activation='relu', input_shape=(28,28,1)))
    #add batch normalization
    cnn_model_four.add(BatchNormalization())
    #first pooling layer
    cnn_model_four.add(MaxPooling2D((2,2), strides = 2))
    
    
    #second convolutional layer
    cnn_model_four.add(Conv2D(64, (3,3), padding='same', activation = 'relu'))
    #add batch normalization
    cnn_model_four.add(BatchNormalization())
    #second pooling layer
    cnn_model_four.add(MaxPooling2D((2,2), strides = 2))

    #third convolutional layer
    cnn_model_four.add(Conv2D(128, (3,3), padding='same', activation = 'relu'))
    #add batch normalization
    cnn_model_four.add(BatchNormalization())
    #third pooling layer
    cnn_model_four.add(MaxPooling2D((2,2), strides = 2))

    #Flatten and connect layers
    cnn_model_four.add(Flatten())
    cnn_model_four.add(Dense(128, activation = 'relu'))
    cnn_model_four.add(Dense(128, activation = 'relu'))
    cnn_model_four.add(Dense(128, activation = 'relu'))
    cnn_model_four.add(Dense(128, activation = 'relu'))
    cnn_model_four.add(Dense(128, activation = 'relu'))
    cnn_model_four.add(Dense(128, activation = 'relu'))
    cnn_model_four.add(Dropout(0.5))
    
    #Fully connected layer of 10 to refelct 10 labels and softmax activation
    cnn_model_four.add(Dense(10, activation = 'softmax'))
    
    return cnn_model_four

In [None]:
cnn_model_four = cnn_model_four()

# set up model optimizer, loss function, and accuracy
#adam - adaptive moment estimation
cnn_model_four.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['sparse_categorical_accuracy'])

cnn_model_four.summary()

In [None]:
#training the model
t0_four = time.time()
train_model_four = cnn_model_four.fit(train_X, train_y, epochs=10, validation_split=0.33)
print("Training time:", time.time()-t0_four)

In [None]:
#test the model
loss, acc = cnn_model_four.evaluate(test_X, test_y, verbose = 1)
print('\nTest accuracy: ', acc)
print('\nTest loss: ', loss)

In [None]:
#plotting the accuracy
plt.plot(train_model_four.history['sparse_categorical_accuracy'])
plt.plot(train_model_four.history['val_sparse_categorical_accuracy'])
plt.title('Model Accuracy - CNN Four')
plt.ylabel('Accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

In [None]:
#plotting the loss
plt.plot(train_model_four.history['loss'])
plt.plot(train_model_four.history['val_loss'])
plt.title('Model Loss - CNN Four')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()