
# K-Means & PCA

### Setup

Let's set up Spark in the Colab environment.  Run the cell below!

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u292-b10-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 37 not upgraded.


Now we import some of the libraries usually needed by our workload.

---

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Let's initialize the Spark context.

In [3]:
# configure the port used by the Spark UI
conf = SparkConf().set("spark.ui.port", "4050")

# create the context, then the session on top of it
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

I can easily check the current version and get the link to the web interface. In the Spark UI, I can monitor the progress of my job and debug performance bottlenecks (if my Colab is running with a **local runtime**).

In [4]:
spark

If I run this on the Google Colab hosted runtime, the cell below will create an *ngrok* tunnel, which still lets me check the Spark UI.

In [5]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
# tunnel the Spark UI port (4050, as set in the SparkConf above)
get_ipython().system_raw('./ngrok http 4050 &')
# port 4040 below is ngrok's own local API, not the Spark UI
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

--2021-10-06 00:43:26--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 52.202.168.65, 54.237.133.81, 18.205.222.128, ...
Connecting to bin.equinox.io (bin.equinox.io)|52.202.168.65|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.2’


2021-10-06 00:43:26 (50.3 MB/s) - ‘ngrok-stable-linux-amd64.zip.2’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: yes
  inflating: ngrok                   
https://c7fb-35-190-173-94.ngrok.io


### Data Preprocessing

In this notebook, rather than downloading a file from somewhere, I will load a famous machine learning dataset, the [Breast Cancer Wisconsin dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_breast_cancer.html), using the ```scikit-learn``` datasets loader.

In [6]:
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()

For convenience, given that the dataset is small, I will first construct a Pandas dataframe, tune the schema, and then convert it into a Spark dataframe.

In [7]:
pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)
pd_df.head()

Unnamed: 0,mean radius,mean texture,mean perimeter,mean area,mean smoothness,mean compactness,mean concavity,mean concave points,mean symmetry,mean fractal dimension,radius error,texture error,perimeter error,area error,smoothness error,compactness error,concavity error,concave points error,symmetry error,fractal dimension error,worst radius,worst texture,worst perimeter,worst area,worst smoothness,worst compactness,worst concavity,worst concave points,worst symmetry,worst fractal dimension
0,17.99,10.38,122.8,1001.0,0.1184,0.2776,0.3001,0.1471,0.2419,0.07871,1.095,0.9053,8.589,153.4,0.006399,0.04904,0.05373,0.01587,0.03003,0.006193,25.38,17.33,184.6,2019.0,0.1622,0.6656,0.7119,0.2654,0.4601,0.1189
1,20.57,17.77,132.9,1326.0,0.08474,0.07864,0.0869,0.07017,0.1812,0.05667,0.5435,0.7339,3.398,74.08,0.005225,0.01308,0.0186,0.0134,0.01389,0.003532,24.99,23.41,158.8,1956.0,0.1238,0.1866,0.2416,0.186,0.275,0.08902
2,19.69,21.25,130.0,1203.0,0.1096,0.1599,0.1974,0.1279,0.2069,0.05999,0.7456,0.7869,4.585,94.03,0.00615,0.04006,0.03832,0.02058,0.0225,0.004571,23.57,25.53,152.5,1709.0,0.1444,0.4245,0.4504,0.243,0.3613,0.08758
3,11.42,20.38,77.58,386.1,0.1425,0.2839,0.2414,0.1052,0.2597,0.09744,0.4956,1.156,3.445,27.23,0.00911,0.07458,0.05661,0.01867,0.05963,0.009208,14.91,26.5,98.87,567.7,0.2098,0.8663,0.6869,0.2575,0.6638,0.173
4,20.29,14.34,135.1,1297.0,0.1003,0.1328,0.198,0.1043,0.1809,0.05883,0.7572,0.7813,5.438,94.44,0.01149,0.02461,0.05688,0.01885,0.01756,0.005115,22.54,16.67,152.2,1575.0,0.1374,0.205,0.4,0.1625,0.2364,0.07678


In [8]:
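from pyspark.ml.linalg import Vectors  # needed by the lambda below

df = spark.createDataFrame(pd_df)

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    # flip the nullable flag on the selected fields, then rebuild the dataframe
    for struct_field in df.schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable
    df_mod = spark.createDataFrame(df.rdd, df.schema)
    return df_mod

df = set_df_columns_nullable(spark, df, df.columns)
# pack all feature columns into one array column, then map each row to a dense vector
df = df.withColumn('features', array(df.columns))
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))

df.printSchema()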

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

With the next cell, I am going to build the two data structures that I will be using throughout this notebook:


*   ```features```, a dataframe of Dense vectors, containing all the original features in the dataset;
*   ```labels```, a series of binary labels indicating whether the corresponding set of features belongs to a malignant or a benign case (in scikit-learn's encoding, ```0``` is malignant and ```1``` is benign).


In [9]:
from pyspark.ml.linalg import Vectors
features = spark.createDataFrame(vectors.map(Row), ["features"])
labels = pd.Series(breast_cancer.target)

### Building machine learning model

Now I am ready to cluster the data with the [K-means](https://spark.apache.org/docs/latest/ml-clustering.html) algorithm included in MLlib (Spark's Machine Learning library).
I set the ```k``` parameter to **2**, fit the model, and then compute the [Silhouette score](https://en.wikipedia.org/wiki/Silhouette_(clustering)) (i.e., a measure of the quality of the obtained clustering).  

**IMPORTANT:** I am using the MLlib implementation of the Silhouette score (via ```ClusteringEvaluator```).

In [10]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(features)
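
To make the fitting step less of a black box, here is a minimal NumPy sketch of Lloyd's algorithm, the textbook iteration behind K-means. It is an illustration only, not MLlib's implementation: the function name ```lloyd_kmeans```, the toy points, and the random initialization are my own assumptions (MLlib initializes with ```k-means||``` by default).

```python
import numpy as np

def lloyd_kmeans(points, k, n_iter=20, seed=1):
    """Plain Lloyd's algorithm: alternate assignment and centroid update."""
    rng = np.random.default_rng(seed)
    # Hypothetical initialization: k distinct data points chosen at random.
    centroids = points[rng.choice(len(points), size=k, replace=False)]
    for _ in range(n_iter):
        # Squared Euclidean distance from every point to every centroid.
        dists = ((points[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
        assignment = dists.argmin(axis=1)
        # Move each centroid to the mean of its points; keep it if the cluster is empty.
        new_centroids = np.array([
            points[assignment == j].mean(axis=0) if np.any(assignment == j) else centroids[j]
            for j in range(k)
        ])
        if np.allclose(new_centroids, centroids):
            break  # converged: assignments no longer move the centroids
        centroids = new_centroids
    return assignment, centroids

# Two well-separated toy blobs (made-up data, not the breast cancer dataset).
toy = np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0],
                [10.0, 10.0], [10.0, 11.0], [11.0, 10.0]])
assignment, centroids = lloyd_kmeans(toy, k=2)
print(assignment)
```

Which blob ends up as cluster ```0``` depends on the initialization, so only the grouping is meaningful, not the identifiers themselves.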

In [11]:
# Make predictions
predictions = model.transform(features)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)

In [12]:
print(f'Silhouette: {silhouette}')

Silhouette: 0.8342904262826145
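
For intuition about what this number measures, the sketch below computes a mean silhouette coefficient by hand with NumPy, using squared Euclidean distance (the default distance measure of ```ClusteringEvaluator```). The function name, the toy points, and the choice to score singleton clusters as 0 are my assumptions; MLlib's internals may compute the score differently.

```python
import numpy as np

def silhouette_score(points, labels):
    """Mean silhouette coefficient using squared Euclidean distance."""
    # Pairwise squared Euclidean distances, shape (n, n).
    diffs = points[:, None, :] - points[None, :, :]
    dists = (diffs ** 2).sum(axis=2)
    scores = []
    for i, own in enumerate(labels):
        same = labels == own
        if same.sum() == 1:
            scores.append(0.0)  # assumed convention: singleton clusters score 0
            continue
        # a: mean distance to the other members of the point's own cluster
        # (the distance to itself is 0, so divide by the cluster size minus one).
        a = dists[i, same].sum() / (same.sum() - 1)
        # b: smallest mean distance to the members of any other cluster.
        b = min(dists[i, labels == other].mean()
                for other in np.unique(labels) if other != own)
        scores.append((b - a) / max(a, b))
    return float(np.mean(scores))

# Made-up points: two tight pairs that are far apart.
toy = np.array([[0.0, 0.0], [0.0, 1.0], [10.0, 10.0], [10.0, 11.0]])
good = silhouette_score(toy, np.array([0, 0, 1, 1]))  # pairs kept together
bad = silhouette_score(toy, np.array([0, 1, 0, 1]))   # pairs split apart
print(good, bad)
```

A score close to 1 means points sit much closer to their own cluster than to the nearest other one; a negative score suggests points were assigned to the wrong cluster.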


Next, I will take the predictions produced by K-means, and compare them with the ```labels``` variable (i.e., the ground truth from our dataset).  

Then, I will compute how many data points in the dataset have been clustered correctly (i.e., positive cases in one cluster, negative cases in the other).

I am using ```np.count_nonzero(series_a == series_b)``` to quickly compute the element-wise comparison of two series.
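
As a quick illustration of that idiom, here is a tiny example on two made-up arrays (```series_a``` and ```series_b``` are placeholders, not data from this notebook):

```python
import numpy as np

series_a = np.array([0, 1, 1, 0, 1])
series_b = np.array([0, 1, 0, 0, 0])

# The comparison yields a boolean array; each True counts as one nonzero entry.
matches = np.count_nonzero(series_a == series_b)
print(matches)  # 3 (positions 0, 1 and 3 agree)
```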

**IMPORTANT**: K-means is a clustering algorithm, so it will not output a label for each data point, but just a cluster identifier!  As such, label ```0``` does not necessarily match the cluster identifier ```0```.


In [13]:
predictions_df = predictions.toPandas()
# swap cluster ids (0 <-> 1): this assumes cluster 1 corresponds to label 0
converted_pre = predictions_df['prediction'].apply(lambda x: 0 if x else 1)
np.count_nonzero(converted_pre.values == labels.values)

486
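
The cell above hard-codes the swap between cluster identifiers and labels. Since the identifiers are arbitrary, a slightly more robust variant counts the matches under both possible mappings and keeps the larger count. The helper below is a sketch for the two-cluster case only; its name and the toy inputs are my own.

```python
import numpy as np

def best_agreement(cluster_ids, labels):
    """Count matches under both cluster-to-label mappings and keep the larger."""
    cluster_ids = np.asarray(cluster_ids)
    labels = np.asarray(labels)
    direct = np.count_nonzero(cluster_ids == labels)          # cluster 0 -> label 0
    flipped = np.count_nonzero((1 - cluster_ids) == labels)   # cluster 0 -> label 1
    return max(direct, flipped)

# Made-up example: the clustering is right on 4 of 5 points once ids are flipped.
print(best_agreement([1, 1, 0, 0, 1], [0, 0, 1, 1, 1]))  # 4
```

With the notebook's variables, the call would presumably look like ```best_agreement(predictions_df['prediction'].values, labels.values)```.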

Now I perform dimensionality reduction on the ```features``` using [PCA](https://spark.apache.org/docs/latest/ml-features.html#pca), a statistical procedure that is also available in MLlib.

I set the ```k``` parameter to **2**, which reduces each data point from 30 features to 2, i.e., by a factor of **15**.
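
To show what the projection does, here is a textbook PCA sketch in NumPy: center the data, take the top ```k``` right singular vectors, and project onto them. The function name and the random toy matrix are assumptions of mine, and the exact values need not match MLlib's output (for example in sign or offset), since I have not checked how MLlib handles centering.

```python
import numpy as np

def pca_project(data, k):
    """Project the rows of `data` onto the top-k principal components."""
    centered = data - data.mean(axis=0)
    # Rows of vt are the principal directions, ordered by decreasing singular value.
    _, _, vt = np.linalg.svd(centered, full_matrices=False)
    return centered @ vt[:k].T

rng = np.random.default_rng(0)
toy = rng.normal(size=(100, 30))  # made-up data: 100 rows, 30 features
reduced = pca_project(toy, k=2)
print(toy.shape, "->", reduced.shape)  # (100, 30) -> (100, 2)
```

The first output column carries at least as much variance as the second, which is what makes the leading components a reasonable compressed representation.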

In [14]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

pca = PCA(k=2, inputCol="features", outputCol="pca")
model = pca.fit(features)

result = model.transform(features).select("pca")
result.show(truncate=False)

+-----------------------------------------+
|pca                                      |
+-----------------------------------------+
|[-2260.0138862925405,-187.9603012226368] |
|[-2368.993755782054,121.58742425815508]  |
|[-2095.6652015478608,145.11398565870087] |
|[-692.6905100570508,38.576922592081765]  |
|[-2030.2124927427062,295.2979839927924]  |
|[-888.280053576076,26.079796157025726]   |
|[-1921.082212474845,58.807572473099206]  |
|[-1074.7813350047961,31.771227808469668] |
|[-908.5784781618829,63.83075279044624]   |
|[-861.5784494075679,40.57073549705321]   |
|[-1404.559130649947,88.23218257736237]   |
|[-1524.2324408687816,-3.2630573167779793]|
|[-1734.385647746415,273.1626781511459]   |
|[-1162.9140032230355,217.63481808344613] |
|[-903.4301030498832,135.61517666084782]  |
|[-1155.8759954206848,76.80889383742165]  |
|[-1335.7294321308068,-2.4684005450356024]|
|[-1547.2640922523087,3.805675972574325]  |
|[-2714.9647651812156,-164.37610864258804]|
|[-908.2502671870876,118.2164200

Now I run K-means with the same parameters as above, but on the ```pca``` column produced by the PCA reduction that I just executed.

I am also computing the Silhouette score, as well as the number of data points that have been clustered correctly.

In [15]:
kmeans = KMeans(featuresCol='pca').setK(2).setSeed(1)
model = kmeans.fit(result)

pca_predictions = model.transform(result)
pca_evaluator = ClusteringEvaluator(featuresCol='pca')

pca_silhouette = pca_evaluator.evaluate(pca_predictions)

print(f'Silhouette after PCA {pca_silhouette}')

Silhouette after PCA 0.8348610363444836


In [16]:
pca_predictions_df = pca_predictions.toPandas()
pca_converted_pre = pca_predictions_df['prediction'].apply(lambda x: 0 if x else 1)
np.count_nonzero(pca_converted_pre == labels.values)

486

In [17]:
# stop the Spark context
sc.stop()