In [None]:
# Import all necessary libraries and set up the environment for matplotlib
%matplotlib inline
import findspark
import time
import pandas as pd
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.feature import PCA
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions
import numpy as np
import matplotlib.pyplot as plt
from numpy import *
from sklearn.metrics import accuracy_score
from pyspark.sql.types import DoubleType,IntegerType

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
def MLPClassifier(training_data, test_data,layers, Iter, size, features):
    """Train a multilayer perceptron on ``training_data`` and score it on ``test_data``.

    The ``label`` column is string-indexed, the network is trained on the
    vector column named by ``features``, and predicted indices are mapped back
    to the original label values in a ``predictedLabel`` column.

    Args:
        training_data: DataFrame with a ``label`` column and the vector column
            named by ``features``; used to fit the label indexer and the model.
        test_data: DataFrame with the same columns; used only for evaluation.
        layers: Layer sizes handed to the classifier's ``layers`` parameter.
        Iter: Value for the classifier's ``maxIter`` parameter.
        size: Value for the classifier's ``blockSize`` parameter.
        features: Name of the vector column the network reads as input.

    Returns:
        tuple: ``(accuracy, runtime)``. ``accuracy`` is the evaluator result
        for ``metricName="accuracy"`` on ``test_data``; ``runtime`` is the
        wall-clock time in seconds (rounded to 2 decimals) spent fitting,
        predicting and evaluating.
    """
    start = time.time()

    # The indexer is fitted up front because its label list is needed to turn
    # predicted indices back into the original label values.
    labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(training_data)
    labelConverter = IndexToString(
        inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

    # Multi-layer perceptron classifier; the fixed seed keeps runs comparable.
    NN = MultilayerPerceptronClassifier(
        labelCol="indexedLabel", featuresCol=features,
        maxIter=Iter, layers=layers, blockSize=size, seed=1234)

    # Training model
    pipeline = Pipeline(stages=[labelIndexer, NN, labelConverter])
    model = pipeline.fit(training_data)

    # Prediction
    predictions = model.transform(test_data)

    # Accuracy
    evaluator = MulticlassClassificationEvaluator(
        labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print(accuracy)

    runtime = round(time.time() - start, 2)
    print('Time: {}s'.format(runtime))

    return accuracy, runtime

In [None]:
def PCA_reduction(training_data,test_data,num):
    """Project both data sets onto ``num`` principal components.

    The PCA model is fitted on ``training_data`` only and then applied to both
    inputs. The summed explained variance of the fitted model and the elapsed
    wall-clock time are printed.

    Args:
        training_data: DataFrame with ``label`` and ``features`` columns; the
            PCA model is fitted on it.
        test_data: DataFrame with the same columns; only transformed.
        num: Number of principal components (``k``).

    Returns:
        tuple: ``(training, test)`` DataFrames restricted to the ``label``,
        ``features`` and ``pca`` columns.
    """
    t0 = time.time()  # time counting
    kept_columns = ('label', 'features', 'pca')

    # Fit on the training split only, then apply the same projection to both.
    pca_model = PCA(k=num, inputCol='features', outputCol='pca').fit(training_data)
    reduced_train, reduced_test = (
        pca_model.transform(frame).select(*kept_columns)
        for frame in (training_data, test_data)
    )

    elapsed = time.time() - t0
    variance = pca_model.explainedVariance.sum()
    print('Explained Variance: {}, Time: {}s'.format(round(variance, 3), round(elapsed, 2)))
    return reduced_train, reduced_test

In [None]:
# Create (or reuse) the SparkSession that the rest of the notebook works with.
spark = SparkSession \
    .builder \
    .appName("Python Spark Machine Learning MLP Classifier") \
    .getOrCreate()
# Time the whole load-and-assemble step; the elapsed time is printed at the end of this cell.
start = time.time()

# Input CSV files (labelled test and training sets).
test_datafile= "Test-label-28x28.csv"
train_datafile = "Train-label-28x28.csv"


# Both files are read without a header row and with schema inference enabled.
test_df = spark.read.csv(test_datafile,header=False,inferSchema="true")
train_df = spark.read.csv(train_datafile,header=False,inferSchema="true")

print('Number of test data: ',test_df.count())
print('Number of training data: ',train_df.count())

# Column 0 is exposed as 'label'; every remaining column is packed into a single
# 'features' vector column. The assembler's input columns are taken from test_df
# and reused for train_df.
# NOTE(review): this assumes both files have the same column layout - confirm.
assembler = VectorAssembler(inputCols=test_df.columns[1:],
    outputCol="features")
test_data = assembler.transform(test_df).select(test_df[0].alias('label'),"features")
training_data = assembler.transform(train_df).select(train_df[0].alias('label'),"features")
# Union of the training and test frames; not referenced again in the visible cells.
data = training_data.union(test_data)


end = time.time()
print('Time: {}s'.format(round(end - start, 2)))


In [None]:
# PCA dimensionality reduction
# k is also used as the first entry of `layers` in the analysis cells, so the
# PCA output dimension is taken from the same variable rather than a second
# hard-coded literal that could drift out of sync.
k=50
training_data_pca,test_data_pca=PCA_reduction(training_data,test_data,k)

In [None]:
# Hidden layer size analysis
timeList=list()
accuracyList=list()


Iter=100
block_size=128
features='pca'

# Single definition of the sweep: it drives both the training loop and the
# plot x-axis, so the two can never disagree in length or values.
hidden_sizes = list(range(50,110,10))

for num in hidden_sizes:
    # k PCA components in, `num` hidden units, 10 outputs.
    layers = [k,num,10]
    a,t=MLPClassifier(training_data_pca, test_data_pca,layers,Iter,block_size,features)
    timeList.append(t)
    accuracyList.append(a)

# Plot: one figure per metric against the swept hidden layer size.
x=np.array(hidden_sizes)
for title, ylabel, series in (
        ('Hidden layer size VS Accuracy', 'Accuracy', accuracyList),
        ('Hidden layer size VS Running Time', 'Running time', timeList)):
    plt.title(title)
    plt.ylabel(ylabel,fontsize=15)
    plt.xlabel('Hidden layer size',fontsize=15)
    plt.plot(x,np.array(series))
    plt.show()

In [None]:
# Block size analysis
timeList=list()
accuracyList=list()

layers = [k,50,10]
Iter=100
# Not used by this sweep (the loop value is passed as the block size instead);
# the assignment is kept so the notebook's variable state is unchanged.
block_size=128
features='pca'

# Single definition of the sweep: it drives both the training loop and the
# plot x-axis, so the two can never disagree in length or values.
block_sizes = list(range(10,140,10))

for num in block_sizes:
    a,t=MLPClassifier(training_data_pca, test_data_pca,layers,Iter,num,features)
    timeList.append(t)
    accuracyList.append(a)

# Plot: one figure per metric against the swept block size.
x=np.array(block_sizes)
for title, ylabel, series in (
        ('BlockSize VS Accuracy', 'Accuracy', accuracyList),
        ('BlockSize VS Running Time', 'Running time', timeList)):
    plt.title(title)
    plt.ylabel(ylabel,fontsize=15)
    plt.xlabel('BlockSize',fontsize=15)
    plt.plot(x,np.array(series))
    plt.show()

In [None]:
# Iteration number analysis
timeList=list()
accuracyList=list()

layers = [k,50,10]
block_size=128
features='pca'

# Single definition of the sweep: it drives both the training loop and the
# plot x-axis, so the two can never disagree in length or values.
iteration_counts = list(range(10,101,10))

for num in iteration_counts:
    # `num` is passed as the maximum iteration count for this run.
    a,t=MLPClassifier(training_data_pca, test_data_pca,layers,num,block_size,features)
    timeList.append(t)
    accuracyList.append(a)

# Plot: one figure per metric against the swept iteration count.
x=np.array(iteration_counts)
for title, ylabel, series in (
        ('Number of Iteration VS Accuracy', 'Accuracy', accuracyList),
        ('Number of Iteration VS Running Time', 'Running time', timeList)):
    plt.title(title)
    plt.ylabel(ylabel,fontsize=15)
    plt.xlabel('Number of Iteration',fontsize=15)
    plt.plot(x,np.array(series))
    plt.show()

In [None]:
# Stop the SparkSession created earlier in the notebook; run this last.
spark.stop()