## Classification of Handwritten Digits

In [1]:
import pyspark
import numpy as np
from splearn.rdd import ArrayRDD
from sklearn.datasets import fetch_mldata
from sklearn.model_selection import train_test_split

from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import numpy as np

import matplotlib
import matplotlib.pyplot as plt
matplotlib.rcParams['figure.figsize']=[10, 10]
%matplotlib inline


#initialize spark session
spark = SparkSession\
        .builder\
        .appName("Test")\
        .config('spark.sql.warehouse.dir', 'file:///C:/')\
       .getOrCreate()
    
sc = spark.sparkContext

#sc = pyspark.SparkContext(appName="MNIST Classification")

### Grab the MNIST Data

In [2]:
def load_mnist(data_dir):
    """
    Fetch the MNIST dataset, downloading it into `data_dir` if needed.

    Parameters:
    ----------
    * `data_dir` [str]
        Cache directory for the data.
        - If the data does not exist there yet, it is downloaded there.

    Returns:
    -------
    * `X` [nd-array shape=(70000, 784)]
        Handwritten digits data, one example per row.
    * `y` [nd-array shape=(70000,)]
        Labels.
    """
    # NOTE(review): `fetch_mldata` was removed in scikit-learn 0.22; newer
    # releases need `fetch_openml('mnist_784', ...)` instead.
    mnist = fetch_mldata('MNIST original', data_home=data_dir)
    return mnist['data'], mnist['target']


X, y = load_mnist('../data')

### Creating Training and Test Splits

In [3]:
from sklearn.model_selection import train_test_split

# Hold out a third of the examples; the fixed seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.33,
    random_state=42,
)

### Setting up RDDs and ArrayRDDs

In [4]:
def create_arrayrdd(array, num_partitions=4):
    """
    Distribute a numpy array across the cluster as a splearn ArrayRDD.

    Parameters:
    ----------
    * `array` [numpy nd-array]
        Data to distribute.

    * `num_partitions` [int default=4]
        Number of distributed blocks to split `array` into.

    Returns:
    -------
    An ArrayRDD wrapping the parallelized data.
    """
    return ArrayRDD(sc.parallelize(array, num_partitions))


X_trainArry = create_arrayrdd(X_train, num_partitions=4)
y_trainArry = create_arrayrdd(y_train, num_partitions=4)

X_testArry = create_arrayrdd(X_test, num_partitions=4)
y_testArry = create_arrayrdd(y_test, num_partitions=4)

### A Few Sanity Checks:

In [5]:
print('Number of distributed partitions in array: {}'.format(X_trainArry.getNumPartitions()))
print('Training set has shape {}'.format(X_trainArry.shape))

# Sum of the training set partitions == number of training set examples?
# Use a dedicated name: assigning to `sum` would shadow the builtin for every
# later cell run in this kernel.
total_examples = 0
for block in X_trainArry:
    total_examples += len(block)

print('Total number of examples across partitions: {}'.format(total_examples))
# Turn the eyeball check into a real one so Restart & Run All catches a mismatch.
assert total_examples == X_trainArry.shape[0], \
    'partition blocks do not add up to the number of training examples'

Number of distributed partitions in array: 4
Training set has shape (46900, 784)
Total number of examples across partitions: 46900


In [7]:
from splearn.rdd import DictRDD


def create_dictRdd(X, y, num_partitions=4):
    """
    Create a splearn DictRDD that pairs features and labels for Pyspark.

    Parameters:
    ----------
    * `X` [numpy nd-array]
        Features, one example per row.

    * `y` [numpy nd-array]
        Labels, one per row of `X`.

    * `num_partitions` [int default=4]
        Number of distributed blocks to create for each column.

    Returns:
    -------
    A DictRDD with columns ('X', 'y'), both typed as numpy nd-arrays.

    Raises:
    ------
    ValueError
        If `X` and `y` do not contain the same number of examples.
    """
    # Fail fast here rather than deep inside a Spark job when the two
    # columns cannot be paired up.
    if len(X) != len(y):
        raise ValueError(
            'X and y must have the same number of examples, got {} and {}'.format(len(X), len(y))
        )

    # Same partition count for both columns so they can be paired
    # partition-by-partition (assumes DictRDD zips the two RDDs — confirm
    # against splearn).
    X_rdd = sc.parallelize(X, num_partitions)
    y_rdd = sc.parallelize(y, num_partitions)

    Z = DictRDD((X_rdd, y_rdd), columns=('X', 'y'),
                dtype=[np.ndarray, np.ndarray])
    return Z


z = create_dictRdd(X_train, y_train, 4)

In [8]:
# Plain (non-splearn) RDDs of each split, 4 partitions apiece.
# NOTE(review): these are not referenced by the later cells shown here.
X_trainRDD, y_trainRDD, X_testRDD, y_testRDD = (
    sc.parallelize(part, 4) for part in (X_train, y_train, X_test, y_test)
)

In [6]:
from pyspark.ml.linalg import Vectors


def create_df(y, X):
    """
    Create Pyspark dataframe from numpy arrays.

    Each row pairs one label (cast to `int`) with the matching feature row
    wrapped in a dense `pyspark.ml.linalg` vector.

    Parameters:
    ----------
    * `y` [numpy nd_array]
        Labels for the dataset, one per row of `X`.

    * `X` [numpy nd_array]
        Features for the dataset, one example per row.

    Returns:
    -------
    Pyspark dataframe.
        - schema=["label", "features"]

    Raises:
    ------
    ValueError
        If `y` and `X` do not contain the same number of examples.
    """
    if len(y) != len(X):
        raise ValueError(
            'y and X must have the same number of rows, got {} and {}'.format(len(y), len(X))
        )
    # Pair labels with rows directly. The previous np.column_stack((y, X))
    # built a second full copy of the feature matrix (upcast to a dtype shared
    # with the labels) just to slice the label column back off.
    rows = [(int(label), Vectors.dense(features)) for label, features in zip(y, X)]
    return spark.createDataFrame(rows, schema=["label", "features"])
    

In [7]:
# Full training split as a Spark DataFrame with "label"/"features" columns.
df = create_df(y=y_train, X=X_train)

In [8]:
# NOTE(review): only 10% of the rows go to `df_train` (90% to `df_val`),
# presumably to keep training fast — confirm this ratio is intended.
# Seed the split (matching random_state=42 used for train_test_split) so
# reruns see the same train/validation rows.
(df_train, df_val) = df.randomSplit([0.1, 0.9], seed=42)

In [9]:
# Peek at the first 20 rows of the training split.
df_train.show(n=20)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|
+-----+--------------------+
only showing top 20 rows



In [12]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import OneVsRest

# Candidate classifiers. Both GBTClassifier and LinearSVC in Spark ML are
# binary classifiers, so the 10 digit classes require a OneVsRest wrapper.
gbt = GBTClassifier(featuresCol="features", labelCol="label", maxIter=2)
lsvc = LinearSVC(regParam=0.1, maxIter=10)

In [None]:
# LinearSVC only supports binary labels and raises on the 10 digit classes,
# so wrap it in OneVsRest (one binary SVM per digit) before fitting.
lsvc_model = OneVsRest(classifier=lsvc).fit(df_train)

In [None]:
ovr = OneVsRest(classifier=lsvc)

# Train the multiclass model on the training split.
ovrModel = ovr.fit(df_train)

In [None]:
# Score the held-out validation split with the fitted model.
predictions = lsvc_model.transform(dataset=df_val)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Show a few predictions next to their true labels.
predictions.select("prediction", "label", "features").show(5)

# `predictions` come from `df_val`, which was split off the training data,
# so this is a validation estimate, not a score on X_test/y_test.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)
print("Validation Accuracy = {}".format(accuracy))
print("Validation Error = {:g}".format(1.0 - accuracy))