<a href="https://colab.research.google.com/github/snanditachn/Nandita.S_cts/blob/main/Nandita_S_2211551_Assignment_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# K-Means using Spark

## *Introduction*
Clustering is an unsupervised machine learning technique: it recognizes patterns without using labels and groups the data according to its features. In our case, we will see whether a clustering algorithm (k-means) can separate the two diagnoses in the Breast Cancer Wisconsin dataset without using the labels (y).



A GIF illustrating how K-means works. Each red dot is a centroid and each color represents a different cluster. Every frame is an iteration in which the centroids are relocated.

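K-means clustering starts by placing k centroids, where k is the number of clusters requested. It then alternates two steps until the assignments stop changing (or a maximum number of iterations is reached): each data point is assigned to the cluster whose centroid is nearest to it, and each centroid is moved to the mean of the points assigned to it. The algorithm aims to minimize the sum of squared Euclidean distances between each observation and the centroid of the cluster to which it belongs.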
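The two alternating steps can be sketched in a few lines of NumPy. This is a minimal illustration of the classic Lloyd iteration, not MLlib's implementation (which, among other differences, uses the k-means|| initialisation by default); the function names `kmeans_lloyd`, `assign_clusters` and `inertia`, and the random initialisation, are assumptions made for the example.

```python
import numpy as np


def assign_clusters(X, centroids):
    """Index of the nearest centroid (squared Euclidean distance) for every row of X."""
    d2 = ((X[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=-1)
    return d2.argmin(axis=1)


def kmeans_lloyd(X, k, n_iter=100, seed=0):
    """Plain Lloyd iteration for K-means; returns (centroids, labels). Illustrative only."""
    X = np.asarray(X, dtype=float)
    rng = np.random.default_rng(seed)
    # start from k distinct data points chosen at random
    centroids = X[rng.choice(len(X), size=k, replace=False)]
    for _ in range(n_iter):
        # assignment step: every point joins the cluster of its nearest centroid
        labels = assign_clusters(X, centroids)
        # update step: every centroid moves to the mean of its points (an empty cluster keeps its centroid)
        new_centroids = np.array([
            X[labels == j].mean(axis=0) if np.any(labels == j) else centroids[j]
            for j in range(k)
        ])
        converged = np.allclose(new_centroids, centroids)
        centroids = new_centroids
        if converged:
            break  # centroids stopped moving, so the assignments are stable
    return centroids, assign_clusters(X, centroids)


def inertia(X, centroids, labels):
    """Sum of squared distances from each point to its centroid: the quantity K-means minimises."""
    X = np.asarray(X, dtype=float)
    return float(((X - centroids[labels]) ** 2).sum())
```

On four points forming two obvious groups, e.g. `[[0, 0], [0, 1], [10, 10], [10, 11]]` with `k=2`, the sketch puts each pair in its own cluster, and `inertia` returns 1.0 (each point lies 0.5 from its centroid).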

Principal Component Analysis (PCA) is a method of reducing the number of dimensions of a dataset while retaining most of its variance. Wikipedia defines it as follows: “PCA is defined as an orthogonal linear transformation that transforms the data to a new coordinate system such that the greatest variance by some scalar projection of the data comes to lie on the first coordinate (called the first principal component), the second greatest variance on the second coordinate, and so on.”
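The definition translates into a short NumPy sketch: centre the data, take its singular value decomposition, and project onto the first k right singular vectors, which are the directions of greatest variance. This is the textbook formulation, not MLlib's code; the function name `pca_svd` is hypothetical.

```python
import numpy as np


def pca_svd(X, k):
    """Project X onto its first k principal components.

    Returns (projected, components, explained_variance_ratio).
    """
    X = np.asarray(X, dtype=float)
    # centre each feature so the components describe variance around the mean
    Xc = X - X.mean(axis=0)
    # rows of Vt are the principal directions, sorted by decreasing singular value
    _, s, Vt = np.linalg.svd(Xc, full_matrices=False)
    # the variance captured by each component is proportional to its squared singular value
    explained_variance_ratio = s ** 2 / np.sum(s ** 2)
    components = Vt[:k]
    return Xc @ components.T, components, explained_variance_ratio[:k]
```

For points lying exactly on the line y = 2x, a single component along (1, 2)/√5 captures all of the variance, so its explained-variance ratio is 1.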



PCA visualisation. The best principal component (the moving black line) is the one for which the sum of the squared lengths of the red lines is minimal. It is then used instead of the original horizontal and vertical axes.

In short, PCA reduces the number of dimensions of the dataset while conserving most of the information. For example, a dataset with 500 features might be reduced to 200 features, depending on how much variance we ask it to retain. The more variance retained, the more information is conserved, but the more dimensions remain.
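Choosing the number of components from a target share of retained variance amounts to a cumulative sum over the explained-variance ratios. A minimal sketch, assuming the ratios are already sorted in decreasing order (as PCA produces them); the helper `n_components_for_variance` and the thresholds used with it are illustrative, not values from this notebook.

```python
import numpy as np


def n_components_for_variance(explained_variance_ratio, threshold=0.95):
    """Smallest number of leading components whose cumulative explained variance reaches threshold."""
    cumulative = np.cumsum(explained_variance_ratio)
    # index of the first cumulative value >= threshold (0-based), hence the + 1
    n = int(np.searchsorted(cumulative, threshold)) + 1
    # if the threshold is never reached (e.g. because of rounding), keep every component
    return n if n <= len(cumulative) else len(cumulative)
```

With ratios `[0.6, 0.25, 0.1, 0.05]`, retaining 50% of the variance needs 1 component, 90% needs 3, and 99% needs all 4: a higher threshold keeps more information but also more dimensions.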

Fewer dimensions mean less time to train and test the model. In some cases, models trained on the PCA-reduced dataset even perform better than models trained on the original one.

## Setup

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 34 kB/s 
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 48.4 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=7c2834348ca1bd3ac4fc0e956a28066c34e2272b5390413549a02ffef62bed16
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
!pip install -U -q PyDrive

In [None]:
!apt install openjdk-8-jdk-headless -qq

The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 2 newly installed, 0 to remove and 22 not upgraded.
Need to get 36.6 MB of archives.
After this operation, 143 MB of additional disk space will be used.
Selecting previously unselected package openjdk-8-jre-headless:amd64.
(Reading database ... 123941 files and directories currently installed.)
Preparing to unpack .../openjdk-8-jre-headless_8u342-b07-0ubuntu1~18.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u342-b07-0ubuntu1~18.04) ...
Selecting previously unselected package openjdk-8-

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Let's import some libraries.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark import SparkContext, SparkConf
# explicit imports: `from pyspark.sql.functions import *` can shadow Python builtins such as sum, max and round
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import array

Initialising the Spark context

In [None]:
# configure Spark to serve its web UI on port 4050
conf = SparkConf().set("spark.ui.port", "4050")

# create the SparkContext, then a SparkSession that reuses it
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

Evaluating the spark session object shows the current Spark version and a link to the web interface. In the Spark UI we can monitor the progress of our jobs and debug performance bottlenecks.

In [None]:
spark

In [None]:
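# download ngrok and expose the Spark UI (port 4050) through a public URL
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
# wait for the tunnel before querying ngrok's local API on port 4040; an empty tunnel list
# (the IndexError below) means ngrok has not connected yet, possibly because it needs an authtoken
!sleep 5
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"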

--2022-10-20 04:51:31--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.161.241.46, 18.205.222.128, 52.202.168.65, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.161.241.46|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2022-10-20 04:51:31 (36.6 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
Traceback (most recent call last):
  File "<string>", line 1, in <module>
IndexError: list index out of range


In this notebook, rather than downloading a file from somewhere, we use a well-known machine learning dataset, the Breast Cancer Wisconsin dataset, loaded with the scikit-learn datasets loader.

In [None]:
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()

Because the dataset is small, for convenience we first construct a pandas DataFrame, convert it into a Spark DataFrame, and then tune its schema.

In [None]:
pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)
df = spark.createDataFrame(pd_df)



In [None]:
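from pyspark.ml.linalg import Vectors
from pyspark.sql.types import StructField, StructType

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    # build a new schema rather than mutating df.schema in place
    schema = StructType([
        StructField(f.name, f.dataType, nullable if f.name in column_list else f.nullable, f.metadata)
        for f in df.schema
    ])
    return spark.createDataFrame(df.rdd, schema)

# mark every column as non-nullable (the dataset has no missing values)
df = set_df_columns_nullable(spark, df, df.columns)
# pack all feature columns into one array column, then turn each row into an ML dense vector
df = df.withColumn('features', array(df.columns))
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))

df.printSchema()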

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

In the next cell, we build two data structures:

***features***, a DataFrame of dense vectors containing all the original features in the dataset;

***labels***, a pandas Series of binary labels indicating whether the corresponding set of features comes from a malignant (0) or a benign (1) tumour.

In [None]:
from pyspark.ml.linalg import Vectors
features = spark.createDataFrame(vectors.map(Row),["features"])
labels = pd.Series(breast_cancer.target)

## Building the machine learning model

Now the data is ready to be clustered with the K-means algorithm included in MLlib (Spark's Machine Learning library).

We set the k parameter to 2, fit the model, and then compute the Silhouette score (a measure of the quality of the resulting clustering).

We use the MLlib implementation of the Silhouette score (via ClusteringEvaluator).
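ClusteringEvaluator computes the silhouette with squared Euclidean distance by default, which is why the output below mentions it. The textbook definition behind the score can be sketched with NumPy: for each point, a(i) is its mean distance to the other members of its cluster, b(i) is its smallest mean distance to the members of another cluster, and s(i) = (b(i) - a(i)) / max(a(i), b(i)); the score is the mean of s(i). This is not Spark's implementation (which uses a faster closed-form computation), so results may differ slightly in edge cases; the function name is hypothetical, and it uses NumPy reductions rather than Python's min/max, which a wildcard import from pyspark.sql.functions can shadow.

```python
import numpy as np


def silhouette_squared_euclidean(X, labels):
    """Mean silhouette coefficient using squared Euclidean distances (needs at least 2 clusters)."""
    X = np.asarray(X, dtype=float)
    labels = np.asarray(labels)
    # pairwise squared Euclidean distances, shape (n, n)
    d2 = ((X[:, None, :] - X[None, :, :]) ** 2).sum(axis=-1)
    clusters = np.unique(labels)
    scores = np.zeros(len(X))
    for i in range(len(X)):
        own = labels == labels[i]
        n_own = np.count_nonzero(own)
        if n_own == 1:
            continue  # by convention, a point alone in its cluster scores 0
        # a(i): mean distance to the other members of its own cluster (d2[i, i] is 0)
        a = d2[i, own].sum() / (n_own - 1)
        # b(i): smallest mean distance to the members of any other cluster
        b = np.min([d2[i, labels == c].mean() for c in clusters if c != labels[i]])
        scores[i] = (b - a) / np.maximum(a, b)
    return float(scores.mean())
```

Scores close to 1 mean tight, well-separated clusters; negative scores mean points tend to sit closer to another cluster than to their own.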

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# train a k-means model with k=2 and a fixed seed for reproducibility
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(features)

# assign every data point to a cluster
predictions = model.transform(features)

# evaluate the clustering by computing the Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.8342904262826145


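We will take the predictions produced by K-means and compare them with the labels variable (i.e., the ground truth from our dataset).

Then, we will compute how many data points in the dataset have been clustered correctly (i.e., positive cases in one cluster, negative cases in the other). Since K-means numbers its clusters arbitrarily, cluster 0 may correspond to either label, so we count the matches under both mappings and keep the larger count.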
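With k = 2 there are only two possible ways to match cluster ids to labels. The same idea generalises to more clusters by trying every one-to-one mapping, as in this NumPy sketch; the helper `best_mapping_accuracy` is hypothetical, and for larger k the brute force grows factorially, so an assignment solver such as `scipy.optimize.linear_sum_assignment` would be the usual replacement.

```python
from itertools import permutations

import numpy as np


def best_mapping_accuracy(predictions, labels):
    """Fraction of points matching their label under the best one-to-one cluster-to-label mapping.

    Brute force over all mappings, so only practical for a handful of clusters;
    assumes there are no more clusters than label values.
    """
    predictions = np.asarray(predictions)
    labels = np.asarray(labels)
    clusters = np.unique(predictions)
    classes = np.unique(labels)
    best = 0
    for perm in permutations(classes, len(clusters)):
        # relabel every cluster id with the class it is mapped to
        mapping = dict(zip(clusters, perm))
        mapped = np.array([mapping[p] for p in predictions])
        correct = np.count_nonzero(mapped == labels)
        if correct > best:
            best = correct
    return best / len(labels)
```

For example, predictions `[1, 1, 0, 0]` against labels `[0, 0, 1, 1]` score 1.0, because swapping the two cluster ids matches every point.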


In [None]:
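predictions_series = predictions.select('prediction').toPandas()
# cluster ids are arbitrary, so also consider the flipped mapping (cluster 0 <-> label 1)
true_prediction_count = np.count_nonzero(predictions_series['prediction'].values == labels.values)
if true_prediction_count < len(labels) - true_prediction_count:
    true_prediction_count = len(labels) - true_prediction_count
# fraction of correctly clustered points, i.e. accuracy rather than precision
print('accuracy:', true_prediction_count / len(labels))

accuracy: 0.8541300527240774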


Next, we perform dimensionality reduction on the features with PCA, which is available in MLlib.

We set the k parameter to 2, reducing the 30 original features to 2 principal components, which shrinks the feature data by a factor of 15.

In [None]:
from pyspark.ml.feature import PCA

# project the 30 features onto their first 2 principal components
pca = PCA(k=2, inputCol="features", outputCol="pca")
pca_model = pca.fit(features)

result = pca_model.transform(features).select("pca")
result.show(truncate=False)

+-----------------------------------------+
|pca                                      |
+-----------------------------------------+
|[-2260.0138862925405,-187.96030122263687]|
|[-2368.9937557820535,121.58742425815493] |
|[-2095.66520154786,145.11398565870115]   |
|[-692.6905100570506,38.57692259208172]   |
|[-2030.2124927427058,295.29798399279287] |
|[-888.2800535760758,26.079796157025683]  |
|[-1921.0822124748443,58.80757247309935]  |
|[-1074.7813350047963,31.771227808469586] |
|[-908.5784781618829,63.83075279044626]   |
|[-861.578449407568,40.57073549705317]    |
|[-1404.5591306499468,88.23218257736238]  |
|[-1524.2324408687816,-3.2630573167779446]|
|[-1734.3856477464153,273.1626781511456]  |
|[-1162.9140032230355,217.63481808344625] |
|[-903.4301030498832,135.61517666084788]  |
|[-1155.8759954206846,76.80889383742178]  |
|[-1335.7294321308066,-2.4684005450354807]|
|[-1547.2640922523085,3.8056759725745044] |
|[-2714.964765181215,-164.37610864258824] |
|[-908.2502671870876,118.2164200

We now run K-means with the same parameters as above, but on the pca column produced by the PCA reduction we just executed.

We then compute the Silhouette score, as well as the number of data points that have been clustered correctly.

In [None]:
kmeans = KMeans(featuresCol='pca').setK(2).setSeed(1)
model = kmeans.fit(result)

pca_predictions = model.transform(result)
pca_evaluator = ClusteringEvaluator(featuresCol='pca')

pca_silhouette = pca_evaluator.evaluate(pca_predictions)

print(f'Silhouette after PCA {pca_silhouette}')

Silhouette after PCA 0.8348610363444836


In [None]:
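pca_predictions_df = pca_predictions.toPandas()
# cluster ids are arbitrary: count matches under both cluster-to-label mappings and keep the larger
pca_true_count = np.count_nonzero(pca_predictions_df['prediction'].values == labels.values)
if pca_true_count < len(labels) - pca_true_count:
    pca_true_count = len(labels) - pca_true_count
pca_true_count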

486

Finally, we stop the Spark environment.

In [None]:
sc.stop()