# CS483 - Colab 3
## K-Means & PCA

### Setup

Let's set up Spark on your Colab environment.  Run the cell below!

This code was written by Parikha Goyanka.

Resources used:
*   Google search
*   GeeksforGeeks (GFG)
*   W3Schools
*   Stack Overflow

In [28]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u422-b05-1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


Now we import some of the libraries usually needed by our workload.





In [29]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Let's initialize the Spark context.

In [30]:
# create the configuration
conf = SparkConf().set("spark.ui.port", "4050")

# create (or reuse) the context, then the session
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [31]:
# Display the Spark session (shows the current Spark version)
spark

### Data Preprocessing

In this Colab, rather than downloading a file from Google Drive, we will load a famous machine learning dataset, the [Breast Cancer Wisconsin dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_breast_cancer.html), using the ```scikit-learn``` datasets loader.

In [32]:
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()

In [33]:
pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)
pd_df.head()

Unnamed: 0,mean radius,mean texture,mean perimeter,mean area,mean smoothness,mean compactness,mean concavity,mean concave points,mean symmetry,mean fractal dimension,...,worst radius,worst texture,worst perimeter,worst area,worst smoothness,worst compactness,worst concavity,worst concave points,worst symmetry,worst fractal dimension
0,17.99,10.38,122.8,1001.0,0.1184,0.2776,0.3001,0.1471,0.2419,0.07871,...,25.38,17.33,184.6,2019.0,0.1622,0.6656,0.7119,0.2654,0.4601,0.1189
1,20.57,17.77,132.9,1326.0,0.08474,0.07864,0.0869,0.07017,0.1812,0.05667,...,24.99,23.41,158.8,1956.0,0.1238,0.1866,0.2416,0.186,0.275,0.08902
2,19.69,21.25,130.0,1203.0,0.1096,0.1599,0.1974,0.1279,0.2069,0.05999,...,23.57,25.53,152.5,1709.0,0.1444,0.4245,0.4504,0.243,0.3613,0.08758
3,11.42,20.38,77.58,386.1,0.1425,0.2839,0.2414,0.1052,0.2597,0.09744,...,14.91,26.5,98.87,567.7,0.2098,0.8663,0.6869,0.2575,0.6638,0.173
4,20.29,14.34,135.1,1297.0,0.1003,0.1328,0.198,0.1043,0.1809,0.05883,...,22.54,16.67,152.2,1575.0,0.1374,0.205,0.4,0.1625,0.2364,0.07678


For convenience, given that the dataset is small, we first

*   construct a Pandas dataframe,
*   convert it into a Spark dataframe,
*   and tune the schema (every column is marked as non-nullable).

In [34]:
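from pyspark.ml.linalg import Vectors  # used when building `vectors` below

# Convert the Pandas dataframe from the previous cell into a Spark dataframe
df = spark.createDataFrame(pd_df)

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    """Return a copy of `df` whose listed columns have the given nullability."""
    for struct_field in df.schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable
    df_mod = spark.createDataFrame(df.rdd, df.schema)
    return df_mod

# Mark every column as non-nullable
df = set_df_columns_nullable(spark, df, df.columns)
# Collect all feature columns into a single array column
df = df.withColumn('features', array(df.columns))
# One dense vector per row, built from the array column
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))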

df.printSchema()

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

With the next cell, we build the two data structures that we will be using throughout this Colab:


*   ```features```, a dataframe of dense vectors, containing all the original features in the dataset;
*   ```labels```, a series of binary labels indicating whether the corresponding sample is malignant (```0```) or benign (```1```), following the scikit-learn encoding of this dataset.



In [35]:
from pyspark.ml.linalg import Vectors
features = spark.createDataFrame(vectors.map(Row), ["features"])
labels = pd.Series(breast_cancer.target)

In [36]:
#print(features)
#print(labels)

### Your task

If you ran the Setup and Data Preprocessing stages successfully, you are now ready to cluster the data with the [K-means](https://spark.apache.org/docs/latest/ml-clustering.html) algorithm included in MLlib (Spark's Machine Learning library).
Set the ```k``` parameter to **2**, fit the model, and then compute the [Silhouette score](https://en.wikipedia.org/wiki/Silhouette_(clustering)) (i.e., a measure of the quality of the obtained clustering).  

**IMPORTANT:** use the MLlib implementation of the Silhouette score (via ```ClusteringEvaluator```).
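
To make the Silhouette score more concrete, here is a minimal pure-Python sketch of how it can be computed on a toy dataset. It is not the MLlib implementation: the helper names (`sq_dist`, `silhouette_score`) and the toy points are made up for illustration, and the squared Euclidean distance is used on the assumption that it matches the default distance measure of ```ClusteringEvaluator```.

```python
def sq_dist(p, q):
    """Squared Euclidean distance between two equal-length points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def silhouette_score(points, assignments):
    """Mean silhouette coefficient (toy O(n^2) version, needs >= 2 clusters)."""
    clusters = {}
    for point, cid in zip(points, assignments):
        clusters.setdefault(cid, []).append(point)

    scores = []
    for point, cid in zip(points, assignments):
        own = clusters[cid]
        if len(own) == 1:
            scores.append(0.0)  # convention: singleton clusters score 0
            continue
        # a: mean distance to the other members of the point's own cluster
        a = sum(sq_dist(point, q) for q in own) / (len(own) - 1)
        # b: smallest mean distance to the members of any other cluster
        b = min(
            sum(sq_dist(point, q) for q in members) / len(members)
            for other, members in clusters.items()
            if other != cid
        )
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)


# Hypothetical toy data: two tight, well-separated groups on a line
toy_points = [(0.0,), (1.0,), (10.0,), (11.0,)]
toy_assignments = [0, 0, 1, 1]
toy_score = silhouette_score(toy_points, toy_assignments)
print(f"Toy silhouette: {toy_score:.4f}")
```

A score close to 1 means points sit much closer to their own cluster than to the nearest other cluster; a negative score suggests many points would fit better elsewhere.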

In [38]:
# YOUR CODE HERE
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Initialize the K-means model (k=2, fixed seed for reproducibility)
kmeans = KMeans().setK(2).setSeed(1)

# Fit the model
model = kmeans.fit(features)

# Assign each data point to a cluster
predictions = model.transform(features)

# Evaluate the clustering with the Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print(f'Silhouette Score: {silhouette}')

Silhouette Score: 0.8342904262826145


Take the predictions produced by K-means, and compare them with the ```labels``` variable (i.e., the ground truth from our dataset).  

Compute how many data points in the dataset have been clustered correctly (i.e., positive cases in one cluster, negative cases in the other).

*HINT*: you can use ```np.count_nonzero(series_a == series_b)``` to quickly compute the element-wise comparison of two series.

**IMPORTANT**: K-means is a clustering algorithm, so it will not output a label for each data point, but just a cluster identifier!  As such, label ```0``` does not necessarily match the cluster identifier ```0```.
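
Because cluster identifiers are arbitrary, one way to count correctly clustered points is to try both cluster-to-label mappings and keep the better one. The sketch below illustrates this on made-up toy lists; `count_matches` and `best_binary_alignment` are hypothetical helper names, and the plain-Python comparison plays the role of ```np.count_nonzero(series_a == series_b)```.

```python
def count_matches(predicted, truth):
    """Number of positions where the two sequences agree."""
    return sum(1 for p, t in zip(predicted, truth) if p == t)


def best_binary_alignment(cluster_ids, labels):
    """Try both cluster-to-label mappings and return the better count."""
    as_is = count_matches(cluster_ids, labels)
    # Swap cluster ids 0 <-> 1 and compare again
    flipped = count_matches([1 - c for c in cluster_ids], labels)
    return max(as_is, flipped)


# Hypothetical toy data, not taken from the breast cancer dataset
toy_clusters = [0, 0, 1, 1, 1, 0]
toy_labels = [1, 1, 0, 0, 1, 1]
toy_correct = best_binary_alignment(toy_clusters, toy_labels)
print(f"Correctly clustered: {toy_correct} of {len(toy_labels)}")
```

For two clusters, the two counts always sum to the number of data points, so taking the maximum is equivalent to picking the mapping that agrees with the majority.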


In [41]:
# YOUR CODE HERE

# Collect the predictions into a Pandas dataframe
predictions_df = predictions.toPandas()
#print(predictions_df.head())

# Map cluster ids to labels. This assumes cluster 0 corresponds to label 1
# (and cluster 1 to label 0); in general the opposite mapping is equally possible.
converted_pre = predictions_df['prediction'].apply(lambda x: 0 if x else 1)
# Element-wise comparison against the ground truth
count = np.count_nonzero(converted_pre.values == labels.values)
print(f'Count of data points: {count}')

Count of data points: 486


Now perform dimensionality reduction on the ```features``` using the [PCA](https://spark.apache.org/docs/latest/ml-features.html#pca) statistical procedure, available as well in MLlib.

Set the ```k``` parameter to **2**, effectively reducing the dimensionality of the dataset by a factor of **15** (from 30 features to 2).
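
For intuition, the following sketch shows textbook PCA on a tiny made-up dataset, reducing 2 columns to 1 in the same spirit as going from 30 columns to 2. It is only an illustration under stated assumptions: the function names are hypothetical, the data is mean-centered before projection, and the top eigenvector is approximated with power iteration, so it should not be expected to reproduce MLlib's numerical output.

```python
import math


def top_component(cov, iterations=200):
    """Power iteration: approximate the dominant eigenvector of `cov`."""
    dim = len(cov)
    vec = [1.0] * dim  # assumes this start is not orthogonal to the answer
    for _ in range(iterations):
        nxt = [sum(cov[i][j] * vec[j] for j in range(dim)) for i in range(dim)]
        norm = math.sqrt(sum(x * x for x in nxt))
        vec = [x / norm for x in nxt]
    return vec


def pca_first_component(rows):
    """Return (component, projections) for the first principal component."""
    n, dim = len(rows), len(rows[0])
    means = [sum(r[j] for r in rows) / n for j in range(dim)]
    centered = [[r[j] - means[j] for j in range(dim)] for r in rows]
    # Sample covariance matrix of the centered data
    cov = [
        [sum(c[i] * c[j] for c in centered) / (n - 1) for j in range(dim)]
        for i in range(dim)
    ]
    component = top_component(cov)
    # Each row collapses to a single coordinate along the component
    projections = [sum(c[j] * component[j] for j in range(dim)) for c in centered]
    return component, projections


# Hypothetical toy data lying exactly on the line y = 2x
toy_rows = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]
toy_component, toy_projections = pca_first_component(toy_rows)
print("Component:", toy_component)
print("Projections:", toy_projections)
```

Since all the variance of the toy data lies along one direction, a single component keeps all the information; on real data, the first few components keep only the largest share of the variance.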

In [42]:
# YOUR CODE HERE
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

#PCA model initialization
pca = PCA(k=2, inputCol='features', outputCol='pca')

#PCA model fitting
model = pca.fit(features)

#PCA model Results
result = model.transform(features).select("pca")
result.show(truncate=False)

+-----------------------------------------+
|pca                                      |
+-----------------------------------------+
|[-2260.013886292542,-187.96030122263687] |
|[-2368.9937557820544,121.58742425815537] |
|[-2095.6652015478608,145.11398565870124] |
|[-692.6905100570509,38.57692259208171]   |
|[-2030.2124927427067,295.2979839927931]  |
|[-888.2800535760762,26.079796157025662]  |
|[-1921.0822124748454,58.807572473099455] |
|[-1074.7813350047968,31.771227808469558] |
|[-908.5784781618834,63.83075279044635]   |
|[-861.5784494075684,40.57073549705316]   |
|[-1404.5591306499475,88.23218257736251]  |
|[-1524.2324408687823,-3.2630573167779313]|
|[-1734.385647746416,273.16267815114594]  |
|[-1162.914003223036,217.6348180834464]   |
|[-903.4301030498837,135.6151766608479]   |
|[-1155.8759954206853,76.8088938374218]   |
|[-1335.7294321308073,-2.4684005450354585]|
|[-1547.2640922523092,3.805675972574516]  |
|[-2714.964765181216,-164.37610864258835] |
|[-908.2502671870881,118.2164200

Now run K-means with the same parameters as above, but on the reduced features produced by the PCA step you just executed (the ```pca``` output column in the cell above).

Compute the Silhouette score, as well as the number of data points that have been clustered correctly.
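
As a reminder of what K-means does on two-dimensional points such as the PCA output, here is a minimal pure-Python sketch of Lloyd's iteration. The function names, the toy points, and the choice of initial centers are all made up for illustration; MLlib uses its own initialization and a distributed implementation, so this is not a description of what Spark runs internally.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two equal-length points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def lloyd_kmeans(points, initial_centers, max_iter=20):
    """Alternate assignment and centroid update until assignments stabilize."""
    centers = list(initial_centers)  # work on a copy
    assignments = None
    for _ in range(max_iter):
        # Assignment step: each point goes to its nearest center
        new_assignments = [
            min(range(len(centers)), key=lambda c: squared_distance(p, centers[c]))
            for p in points
        ]
        if new_assignments == assignments:
            break
        assignments = new_assignments
        # Update step: move each center to the mean of its members
        for c in range(len(centers)):
            members = [p for p, a in zip(points, assignments) if a == c]
            if members:  # keep the old center if a cluster went empty
                centers[c] = tuple(sum(x) / len(members) for x in zip(*members))
    return assignments, centers


# Hypothetical 2-D points, loosely shaped like a PCA projection
toy_points_2d = [
    (-2000.0, 100.0), (-2100.0, -50.0), (-1900.0, 20.0),
    (-700.0, 40.0), (-900.0, 30.0), (-800.0, 60.0),
]
toy_assign, toy_centers = lloyd_kmeans(
    toy_points_2d, [toy_points_2d[0], toy_points_2d[3]]
)
print("Assignments:", toy_assign)
print("Centers:", toy_centers)
```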

In [43]:
# YOUR CODE HERE
kmeans = KMeans(featuresCol='pca').setK(2).setSeed(1)
model = kmeans.fit(result)

pca_predictions = model.transform(result)
pca_evaluator = ClusteringEvaluator(featuresCol='pca')

# To compute Silhouette score
pca_silhouette = pca_evaluator.evaluate(pca_predictions)

print(f'Silhouette score after PCA: {pca_silhouette}')

Silhouette score after PCA: 0.8348610363444832


In [44]:
# Number of correctly clustered data points
# (assumes cluster 0 corresponds to label 1, as in the earlier comparison)
pca_predictions_df = pca_predictions.toPandas()
pca_converted_pre = pca_predictions_df['prediction'].apply(lambda x: 0 if x else 1)
print(f'Number of data points: {np.count_nonzero(pca_converted_pre.values == labels.values)}')

Number of data points: 486


In [45]:
#stopping Spark environment
sc.stop()

Once you have obtained the desired results, **head over to Gradescope and submit your solution for this Colab**!