## Machine Learning menggunakan Spark ML


Muhammad Zha'farudin Pudya Wardana

19/448717/PPA/05800

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType

In [2]:
# Create the session and Context
conf = (
    SparkConf()
        .setMaster("local[*]")
        .setAppName("FashionMNIST SparkML")
        .set("spark.ui.port", "4050")
        .set('spark.executor.memory', '4G')
        .set('spark.driver.memory', '45G')
        .set('spark.driver.maxResultSize', '10G')
)
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [3]:
sc

In [4]:
spark

## Dataset

From https://www.kaggle.com/zalando-research/fashionmnist

Labels

- 0 T-shirt/top
- 1 Trouser
- 2 Pullover
- 3 Dress
- 4 Coat
- 5 Sandal
- 6 Shirt
- 7 Sneaker
-  8 Bag
- 9 Ankle boot


TL;DR

- Each row is a separate image
- Column 1 is the class label.
- Remaining columns are pixel numbers (784 total).
- Each value is the darkness of the pixel (1 to 255)

Size of dataset
- fashion-mnist_train ~ 126 MB
- fashion-mnist_test ~ 21.15 MB 

In [5]:
df = spark.read.csv("data/fashion-mnist_train.csv", header=True, inferSchema=True)
df_test = spark.read.csv("data/fashion-mnist_test.csv", header=True, inferSchema=True)

In [6]:
df.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1| 6000|
|    6| 6000|
|    3| 6000|
|    5| 6000|
|    9| 6000|
|    4| 6000|
|    8| 6000|
|    7| 6000|
|    2| 6000|
|    0| 6000|
+-----+-----+



In [7]:
df_test.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1| 1000|
|    6| 1000|
|    3| 1000|
|    5| 1000|
|    9| 1000|
|    4| 1000|
|    8| 1000|
|    7| 1000|
|    2| 1000|
|    0| 1000|
+-----+-----+



In [8]:
df.head()

Row(label=2, pixel1=0, pixel2=0, pixel3=0, pixel4=0, pixel5=0, pixel6=0, pixel7=0, pixel8=0, pixel9=0, pixel10=0, pixel11=0, pixel12=0, pixel13=0, pixel14=0, pixel15=0, pixel16=0, pixel17=0, pixel18=0, pixel19=0, pixel20=0, pixel21=0, pixel22=0, pixel23=0, pixel24=0, pixel25=0, pixel26=0, pixel27=0, pixel28=0, pixel29=0, pixel30=0, pixel31=0, pixel32=0, pixel33=0, pixel34=0, pixel35=0, pixel36=0, pixel37=0, pixel38=0, pixel39=0, pixel40=0, pixel41=0, pixel42=0, pixel43=0, pixel44=0, pixel45=0, pixel46=0, pixel47=0, pixel48=0, pixel49=0, pixel50=0, pixel51=0, pixel52=0, pixel53=0, pixel54=0, pixel55=0, pixel56=0, pixel57=0, pixel58=0, pixel59=0, pixel60=0, pixel61=0, pixel62=0, pixel63=0, pixel64=0, pixel65=0, pixel66=0, pixel67=0, pixel68=0, pixel69=0, pixel70=0, pixel71=0, pixel72=0, pixel73=0, pixel74=0, pixel75=0, pixel76=0, pixel77=0, pixel78=0, pixel79=0, pixel80=0, pixel81=0, pixel82=0, pixel83=0, pixel84=0, pixel85=0, pixel86=0, pixel87=0, pixel88=0, pixel89=4, pixel90=0, pixel9

In [9]:
from pyspark.ml.feature import VectorAssembler

In [10]:
features = df.columns[1:]
print(features)
assembler = VectorAssembler(inputCols=features, outputCol="features")
feat_df = assembler.transform(df)
feat_test_df = assembler.transform(df_test)

['pixel1', 'pixel2', 'pixel3', 'pixel4', 'pixel5', 'pixel6', 'pixel7', 'pixel8', 'pixel9', 'pixel10', 'pixel11', 'pixel12', 'pixel13', 'pixel14', 'pixel15', 'pixel16', 'pixel17', 'pixel18', 'pixel19', 'pixel20', 'pixel21', 'pixel22', 'pixel23', 'pixel24', 'pixel25', 'pixel26', 'pixel27', 'pixel28', 'pixel29', 'pixel30', 'pixel31', 'pixel32', 'pixel33', 'pixel34', 'pixel35', 'pixel36', 'pixel37', 'pixel38', 'pixel39', 'pixel40', 'pixel41', 'pixel42', 'pixel43', 'pixel44', 'pixel45', 'pixel46', 'pixel47', 'pixel48', 'pixel49', 'pixel50', 'pixel51', 'pixel52', 'pixel53', 'pixel54', 'pixel55', 'pixel56', 'pixel57', 'pixel58', 'pixel59', 'pixel60', 'pixel61', 'pixel62', 'pixel63', 'pixel64', 'pixel65', 'pixel66', 'pixel67', 'pixel68', 'pixel69', 'pixel70', 'pixel71', 'pixel72', 'pixel73', 'pixel74', 'pixel75', 'pixel76', 'pixel77', 'pixel78', 'pixel79', 'pixel80', 'pixel81', 'pixel82', 'pixel83', 'pixel84', 'pixel85', 'pixel86', 'pixel87', 'pixel88', 'pixel89', 'pixel90', 'pixel91', 'pixel9

In [11]:
feat_df.head()

Row(label=2, pixel1=0, pixel2=0, pixel3=0, pixel4=0, pixel5=0, pixel6=0, pixel7=0, pixel8=0, pixel9=0, pixel10=0, pixel11=0, pixel12=0, pixel13=0, pixel14=0, pixel15=0, pixel16=0, pixel17=0, pixel18=0, pixel19=0, pixel20=0, pixel21=0, pixel22=0, pixel23=0, pixel24=0, pixel25=0, pixel26=0, pixel27=0, pixel28=0, pixel29=0, pixel30=0, pixel31=0, pixel32=0, pixel33=0, pixel34=0, pixel35=0, pixel36=0, pixel37=0, pixel38=0, pixel39=0, pixel40=0, pixel41=0, pixel42=0, pixel43=0, pixel44=0, pixel45=0, pixel46=0, pixel47=0, pixel48=0, pixel49=0, pixel50=0, pixel51=0, pixel52=0, pixel53=0, pixel54=0, pixel55=0, pixel56=0, pixel57=0, pixel58=0, pixel59=0, pixel60=0, pixel61=0, pixel62=0, pixel63=0, pixel64=0, pixel65=0, pixel66=0, pixel67=0, pixel68=0, pixel69=0, pixel70=0, pixel71=0, pixel72=0, pixel73=0, pixel74=0, pixel75=0, pixel76=0, pixel77=0, pixel78=0, pixel79=0, pixel80=0, pixel81=0, pixel82=0, pixel83=0, pixel84=0, pixel85=0, pixel86=0, pixel87=0, pixel88=0, pixel89=4, pixel90=0, pixel9

In [12]:
feat_df.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    2|(784,[88,94,95,96...|
|    9|(784,[152,153,158...|
|    6|(784,[7,11,12,13,...|
|    0|(784,[3,4,10,11,1...|
|    3|(784,[12,14,15,36...|
|    4|[0.0,0.0,0.0,5.0,...|
|    4|(784,[12,13,14,15...|
|    5|(784,[289,290,291...|
|    4|[0.0,0.0,0.0,0.0,...|
|    8|(784,[11,14,15,17...|
|    0|(784,[4,9,10,11,1...|
|    8|[0.0,0.0,0.0,0.0,...|
|    9|(784,[65,68,70,71...|
|    0|[0.0,0.0,0.0,0.0,...|
|    2|(784,[4,5,10,11,1...|
|    2|[0.0,0.0,0.0,0.0,...|
|    9|(784,[151,153,154...|
|    3|(784,[10,11,12,15...|
|    3|(784,[9,10,17,18,...|
|    3|(784,[11,12,13,14...|
+-----+--------------------+
only showing top 20 rows



## Cast Label to DoubleType

In [13]:
feat_df = feat_df.withColumn("label", feat_df["label"].cast(DoubleType()))
feat_test_df = feat_test_df.withColumn("label", feat_test_df["label"].cast(DoubleType()))

In [14]:
feat_df.select("label").show()

+-----+
|label|
+-----+
|  2.0|
|  9.0|
|  6.0|
|  0.0|
|  3.0|
|  4.0|
|  4.0|
|  5.0|
|  4.0|
|  8.0|
|  0.0|
|  8.0|
|  9.0|
|  0.0|
|  2.0|
|  2.0|
|  9.0|
|  3.0|
|  3.0|
|  3.0|
+-----+
only showing top 20 rows



## Modelling

In [15]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [16]:
forest = RandomForestClassifier(labelCol="label", featuresCol="features")

In [17]:
forest_model = forest.fit(feat_df)

## Evaluation

In [18]:
predictions_train = forest_model.transform(feat_df)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")
result = evaluator.evaluate(predictions_train)
print("Train Accuracy: ", result)

Train Accuracy:  0.7255097338571947


In [19]:
#Confusion Matrix
predictions_train.where("label = prediction").groupBy("label", "prediction").count().show(100)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  7.0|       7.0| 5340|
|  1.0|       1.0| 5199|
|  2.0|       2.0| 4421|
|  6.0|       6.0|   91|
|  4.0|       4.0| 3921|
|  9.0|       9.0| 5495|
|  8.0|       8.0| 5642|
|  5.0|       5.0| 5023|
|  0.0|       0.0| 4777|
|  3.0|       3.0| 5427|
+-----+----------+-----+



In [20]:
prediction_test = forest_model.transform(feat_test_df)
test_result = evaluator.evaluate(prediction_test)
print("Test Accuracy: ", test_result)

Test Accuracy:  0.7208115071290307


In [21]:
prediction_test.where("label = prediction").groupBy("label", "prediction").count().show(100)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  7.0|       7.0|  872|
|  1.0|       1.0|  871|
|  2.0|       2.0|  736|
|  6.0|       6.0|   13|
|  4.0|       4.0|  684|
|  9.0|       9.0|  901|
|  8.0|       8.0|  938|
|  5.0|       5.0|  823|
|  0.0|       0.0|  770|
|  3.0|       3.0|  910|
+-----+----------+-----+



In [22]:
from pyspark.mllib.evaluation import MulticlassMetrics

In [23]:
prediction_labels = predictions_train.select("prediction", "label").rdd
train_metrics = MulticlassMetrics(prediction_labels)

In [24]:
labels = range(10)
list(labels)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [25]:
def print_metrics(metrics, labels):
    for label in sorted(labels):
        print("Class %s precision = %s" % (label, metrics.precision(label)))
        print("Class %s recall = %s" % (label, metrics.recall(label)))
        print()
        # print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label)))

    # Weighted stats
    print("Weighted recall = %s" % metrics.weightedRecall)
    print("Weighted precision = %s" % metrics.weightedPrecision)
    print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
    print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
    print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)

In [26]:
print_metrics(train_metrics, labels)

Class 0 precision = 0.7429237947122862
Class 0 recall = 0.7961666666666667

Class 1 precision = 0.9837275307473983
Class 1 recall = 0.8665

Class 2 precision = 0.5856404821830706
Class 2 recall = 0.7368333333333333

Class 3 precision = 0.6300940438871473
Class 3 recall = 0.9045

Class 4 precision = 0.502821236214414
Class 4 recall = 0.6535

Class 5 precision = 0.9475570647047726
Class 5 recall = 0.8371666666666666

Class 6 precision = 0.5515151515151515
Class 6 recall = 0.015166666666666667

Class 7 precision = 0.8276503409795413
Class 7 recall = 0.89

Class 8 precision = 0.9091202062520142
Class 8 recall = 0.9403333333333334

Class 9 precision = 0.8861473955813578
Class 9 recall = 0.9158333333333334

Weighted recall = 0.7556
Weighted precision = 0.7567197246777154
Weighted F(1) Score = 0.7255097338571947
Weighted F(0.5) Score = 0.715749883100119
Weighted false positive rate = 0.027155555555555558


In [27]:
test_prediction_labels = prediction_test.select("prediction", "label").rdd
test_metrics = MulticlassMetrics(test_prediction_labels)

In [28]:
print_metrics(test_metrics, labels)

Class 0 precision = 0.7361376673040153
Class 0 recall = 0.77

Class 1 precision = 0.9720982142857143
Class 1 recall = 0.871

Class 2 precision = 0.6097763048881525
Class 2 recall = 0.736

Class 3 precision = 0.6194690265486725
Class 3 recall = 0.91

Class 4 precision = 0.5173978819969742
Class 4 recall = 0.684

Class 5 precision = 0.9405714285714286
Class 5 recall = 0.823

Class 6 precision = 0.5909090909090909
Class 6 recall = 0.013

Class 7 precision = 0.8074074074074075
Class 7 recall = 0.872

Class 8 precision = 0.9010566762728146
Class 8 recall = 0.938

Class 9 precision = 0.8646833013435701
Class 9 recall = 0.901

Weighted recall = 0.7518
Weighted precision = 0.755950699952784
Weighted F(1) Score = 0.7208115071290307
Weighted F(0.5) Score = 0.7103420077976154
Weighted false positive rate = 0.027577777777777777
