# MNIST K-means Clustering using PySpark on SageMaker 

This notebook is based on the example notebook provided by Amazon SageMaker:

https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker-spark/pyspark_mnist/pyspark_mnist_kmeans.ipynb


## Part 1: Training Models

### Step 1

Import modules and create a ```SparkSession``` with the required dependencies

In [1]:
import os
import boto3

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

import sagemaker
from sagemaker import get_execution_role
import sagemaker_pyspark

In [2]:
# Get the execution role, as defined by the IAM policy
role = get_execution_role()

# Configure Spark to use the SageMaker Spark dependency jars
jars = sagemaker_pyspark.classpath_jars()
classpath = ":".join(jars)

# Create (or reuse) a local SparkSession with the SageMaker jars on the driver classpath
spark = (
    SparkSession.builder
    .config("spark.driver.extraClassPath", classpath)
    .master("local[*]")
    .getOrCreate()
)

# Display the SparkSession summary
spark
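
The classpath above is a single string of jar paths joined by `:`, which is the path separator on Linux and macOS. A minimal sketch of that joining step, assuming a hypothetical helper named `build_classpath` (not part of `sagemaker_pyspark`) and made-up jar paths:

```python
import os
from typing import Iterable


def build_classpath(jar_paths: Iterable[str], separator: str = os.pathsep) -> str:
    """Join jar paths into one classpath string.

    Hypothetical helper for illustration: drops empty entries and
    duplicates while preserving the original order.
    """
    unique_paths = list(dict.fromkeys(path for path in jar_paths if path))
    return separator.join(unique_paths)


# Made-up paths; in the notebook they come from sagemaker_pyspark.classpath_jars()
example = build_classpath(
    ["/opt/jars/a.jar", "/opt/jars/b.jar", "/opt/jars/a.jar"],
    separator=":",
)
print(example)
```

Defaulting to `os.pathsep` keeps the helper portable, while the notebook's hard-coded `:` is fine on a SageMaker notebook instance, which runs Linux.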

### Step 2

Set the AWS region and configure the S3 endpoint

In [3]:
# Use the region of the current boto3 session
region = boto3.Session().region_name

# Point the S3A filesystem at the regional S3 endpoint
spark._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 's3.{}.amazonaws.com'.format(region))
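
The value set for `fs.s3a.endpoint` follows the regional pattern `s3.<region>.amazonaws.com`. Note that `boto3.Session().region_name` is `None` when no default region is configured, which would silently produce an invalid endpoint. A minimal sketch of a guard around the formatting step, assuming a hypothetical helper `s3a_endpoint` (not part of boto3 or Spark) and the standard AWS partition; other partitions use a different domain suffix:

```python
from typing import Optional


def s3a_endpoint(region: Optional[str]) -> str:
    """Build the regional S3 endpoint host used for fs.s3a.endpoint.

    Hypothetical helper for illustration; assumes the standard AWS partition.
    """
    if not region:
        raise ValueError("No AWS region configured; expected e.g. 'us-east-1'")
    return "s3.{}.amazonaws.com".format(region)


print(s3a_endpoint("us-west-2"))
```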

### Step 3

Load the training and test data

In [5]:
# Load MNIST in LIBSVM format; each 28x28 image is flattened into 784 features
trainingData = spark.read.format('libsvm').option('numFeatures', '784').load('s3a://sagemaker-sample-data-{}/spark/mnist/train/'.format(region))

testData = spark.read.format('libsvm').option('numFeatures', '784').load('s3a://sagemaker-sample-data-{}/spark/mnist/test/'.format(region))

# Preview the first 20 rows of the training data
trainingData.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  5.0|(784,[152,153,154...|
|  0.0|(784,[127,128,129...|
|  4.0|(784,[160,161,162...|
|  1.0|(784,[158,159,160...|
|  9.0|(784,[208,209,210...|
|  2.0|(784,[155,156,157...|
|  1.0|(784,[124,125,126...|
|  3.0|(784,[151,152,153...|
|  1.0|(784,[152,153,154...|
|  4.0|(784,[134,135,161...|
|  3.0|(784,[123,124,125...|
|  5.0|(784,[216,217,218...|
|  3.0|(784,[143,144,145...|
|  6.0|(784,[72,73,74,99...|
|  1.0|(784,[151,152,153...|
|  7.0|(784,[211,212,213...|
|  2.0|(784,[151,152,153...|
|  8.0|(784,[159,160,161...|
|  6.0|(784,[100,101,102...|
|  9.0|(784,[209,210,211...|
+-----+--------------------+
only showing top 20 rows
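
Each row in the `features` column is a sparse vector: `(784,[152,153,154...` denotes a vector of size 784 with non-zero entries at the listed indices. In the LIBSVM text format, each line is `label index:value index:value ...` with one-based indices, which Spark's reader converts to zero-based. A minimal pure-Python sketch of that parsing, using a hypothetical helper `parse_libsvm_line` and a made-up sample line (not taken from the MNIST files):

```python
from typing import List, Tuple


def parse_libsvm_line(
    line: str, num_features: int
) -> Tuple[float, int, List[int], List[float]]:
    """Parse one LIBSVM line into (label, size, zero-based indices, values)."""
    tokens = line.split()
    label = float(tokens[0])
    indices: List[int] = []
    values: List[float] = []
    for token in tokens[1:]:
        index_str, value_str = token.split(":")
        index = int(index_str) - 1  # LIBSVM indices are one-based
        if not 0 <= index < num_features:
            raise ValueError("feature index out of range: " + index_str)
        indices.append(index)
        values.append(float(value_str))
    return label, num_features, indices, values


# Made-up sample line for illustration
label, size, indices, values = parse_libsvm_line("5 153:3.0 154:18.0 155:18.0", 784)
print(label, size, indices, values)
```

Storing only the non-zero pixels is what makes this representation compact for MNIST, where most of each image is background.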



### Step 4

Create a K-Means estimator configured with the IAM role, and specify the instance types to use for training and for model inference

In [6]:
from sagemaker_pyspark import IAMRole
from sagemaker_pyspark.algorithms import KMeansSageMakerEstimator
from sagemaker_pyspark import RandomNamePolicyFactory

In [7]:
kmeans_estimator = KMeansSageMakerEstimator(
    sagemakerRole=IAMRole(role),
    trainingInstanceType='ml.m4.xlarge',  # Instance type used to train K-Means on SageMaker
    trainingInstanceCount=1,
    endpointInstanceType='ml.t2.large',  # Instance type used to serve the model (endpoint) for inference
    endpointInitialInstanceCount=1,
    namePolicyFactory=RandomNamePolicyFactory("sparksm-1a-"))  # All created resources are prefixed with sparksm-1a-


### Step 5

Set the K-Means parameters and train the model

In [None]:
# Set parameters for K-Means: 784 input features and 10 clusters (one per digit class)
kmeans_estimator.setFeatureDim(784)
kmeans_estimator.setK(10)

# Fit the model on the training data
initialModel = kmeans_estimator.fit(trainingData)

# Get the name of the endpoint serving the fitted model
initialModelEndpointName = initialModel.endpointName
print(initialModelEndpointName)
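
For intuition about what training with `k` clusters produces, here is a minimal pure-Python sketch of plain Lloyd's k-means on toy 2-D points. This is an illustration only: it is not SageMaker's implementation, which runs as a managed training job and may use a different, more scalable variant. The function names and the toy data are hypothetical.

```python
import random
from typing import List, Sequence

Point = Sequence[float]


def squared_distance(a: Point, b: Point) -> float:
    """Squared Euclidean distance between two points."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def closest_index(point: Point, centers: List[List[float]]) -> int:
    """Index of the center nearest to the point (first one wins on ties)."""
    return min(range(len(centers)), key=lambda i: squared_distance(point, centers[i]))


def lloyd_kmeans(
    points: List[Point], k: int, iterations: int = 20, seed: int = 0
) -> List[List[float]]:
    """Plain Lloyd's algorithm: alternate assignment and mean-update steps."""
    rng = random.Random(seed)
    centers = [list(p) for p in rng.sample(points, k)]  # random initial centers
    for _ in range(iterations):
        # Assignment step: group each point with its nearest center
        groups: List[List[Point]] = [[] for _ in range(k)]
        for p in points:
            groups[closest_index(p, centers)].append(p)
        # Update step: move each center to the mean of its group
        for i, group in enumerate(groups):
            if group:  # an empty cluster keeps its previous center
                centers[i] = [sum(dim) / len(group) for dim in zip(*group)]
    return centers


# Toy data: two well-separated groups of three points each
toy_points = [(0.0, 0.0), (0.0, 1.0), (1.0, 0.0), (10.0, 10.0), (10.0, 11.0), (11.0, 10.0)]
centers = lloyd_kmeans(toy_points, k=2)
print(sorted(centers))
```

In the notebook, `setK(10)` plays the role of `k` and `setFeatureDim(784)` fixes the dimensionality of each point.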

### Step 6

Run inference on the test data using the fitted model

In [None]:
transformedData = initialModel.transform(testData)

# show results
transformedData.show()
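
Conceptually, K-Means inference assigns each feature vector to its nearest cluster center and reports the distance to it. Because K-Means is unsupervised, cluster IDs do not correspond to digit labels; one common sanity check is to look at the majority label within each cluster. A minimal pure-Python sketch of both steps follows, using hypothetical centers and toy labeled rows; Euclidean distance and the names `assign` and `majority` are assumptions for illustration, not the endpoint's documented behavior.

```python
import math
from collections import Counter, defaultdict
from typing import Dict, Sequence, Tuple


def assign(
    features: Sequence[float], centers: Sequence[Sequence[float]]
) -> Tuple[int, float]:
    """Return (closest cluster index, Euclidean distance to that cluster)."""
    distances = [math.dist(features, center) for center in centers]
    closest = min(range(len(centers)), key=distances.__getitem__)
    return closest, distances[closest]


# Hypothetical cluster centers and (label, features) rows on toy 2-D data
centers = [(0.0, 0.0), (10.0, 10.0)]
rows = [
    (1.0, (0.0, 1.0)),
    (1.0, (1.0, 0.0)),
    (7.0, (10.0, 11.0)),
    (7.0, (9.0, 10.0)),
    (1.0, (9.0, 9.0)),  # a label-1 point that lands in the other cluster
]

# Count the true labels that fall into each cluster
labels_per_cluster: Dict[int, Counter] = defaultdict(Counter)
for label, features in rows:
    cluster, _distance = assign(features, centers)
    labels_per_cluster[cluster][label] += 1

# Majority label per cluster
majority = {
    cluster: counts.most_common(1)[0][0]
    for cluster, counts in labels_per_cluster.items()
}
print(majority)
```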