# **K-Means using Spark**

# **Task:** Perform K-Means clustering on the given dataset

# **Dataset:** Breast cancer dataset (from the scikit-learn library)

## **Date:** 20/10/22

### **Setup for PySpark API**

**Step 1:** We install PySpark, the Python API for Apache Spark. Spark is an open-source, distributed computing framework and set of libraries for real-time, large-scale data processing, and PySpark lets us write Python applications that use Spark's capabilities. The Python driver program communicates with a local JVM (Java Virtual Machine) running Spark via Py4J, a Java library bundled with PySpark that allows Python to dynamically interface with JVM objects.

In [6]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 45 kB/s
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 37.1 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=bf5d81d2e253bc4722f1024d5b6fc2431e6fbb443ecd5e5d479be338849f53d0
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


**Step 2: PyDrive** is installed, as it simplifies many common Google Drive API tasks. It has the following features:

1. Simplifies OAuth 2.0 authentication to just a few lines of code, with flexible settings.

2. Wraps the Google Drive API into classes for each resource, making your program more object-oriented.

3. Helps with common operations other than API calls, such as content fetching and pagination control.

In [7]:
!pip install -U -q PyDrive

**Step 3:**  
Since Spark runs on the JVM, PySpark needs a **Java** installation. We use OpenJDK (Open Java Development Kit), a free and open-source implementation of the Java Platform.

**Extra info for understanding:**
The Java Development Kit (JDK) is a kit that provides the environment to develop and execute (run) Java programs. The JDK includes the Java Runtime Environment (JRE) and the Java Virtual Machine (JVM).

Here, **'8'** refers to the Java version and **headless** refers to software capable of working on a device without a graphical user interface. Such software receives input and provides output through other interfaces, such as a network or serial port, and is common on servers and embedded devices.

In [8]:
!apt install openjdk-8-jdk-headless -qq

openjdk-8-jdk-headless is already the newest version (8u342-b07-0ubuntu1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 22 not upgraded.


**Step 4:**

We import the **os** module, which lets us interact with the operating system we are working on and provides a portable way to use operating-system-dependent functionality.

In [9]:
import os

**Step 5:** We need to set the environment variable for the JDK.

JAVA_HOME is an environment variable that points to the installation directory of the JDK (Java Development Kit) or JRE (Java Runtime Environment). PySpark uses it to locate the Java installation that runs Spark.

In [10]:
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"
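If JAVA_HOME points to a directory that does not exist (for example, after the JDK version on the VM changes), the problem usually only surfaces later, when the SparkContext is created. A small guard can fail early instead. This is a minimal sketch: the helper name set_java_home is hypothetical, and it assumes the usual JDK layout with the launcher at bin/java.

```python
import os
from pathlib import Path


def set_java_home(jdk_dir: str = "/usr/lib/jvm/java-8-openjdk-amd64") -> str:
    """Set JAVA_HOME to jdk_dir after checking that it contains bin/java.

    Hypothetical helper; assumes the standard <jdk_dir>/bin/java layout.
    """
    java_bin = Path(jdk_dir) / "bin" / "java"
    if not java_bin.is_file():
        # Fail now rather than when Spark tries to launch the JVM
        raise FileNotFoundError(f"no Java launcher at {java_bin}; check the JDK path")
    os.environ["JAVA_HOME"] = jdk_dir
    return jdk_dir
```

In the notebook, `set_java_home()` would replace the plain assignment to `os.environ["JAVA_HOME"]`.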

**Step 6:** Importing the necessary Python libraries for data analysis and visualization.

Additionally, we import the following:

1. PySpark SQL is a Spark module that integrates relational processing with Spark's functional programming API, so we can query data using SQL.
2. pyspark.sql.types provides the available data types.
3. pyspark.sql.functions provides the built-in functions available for DataFrames.
4. SparkContext is the entry point to PySpark's core functionality: it communicates with the cluster and creates RDDs, accumulators, and broadcast variables. The Spark driver program creates and uses a SparkContext to connect to the cluster manager (YARN, Mesos, or Standalone) and submit PySpark jobs. It is the heart of a PySpark application.
5. SparkConf holds the configuration parameters needed to run a Spark application, whether locally or on a cluster.

In [11]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

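import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
# Import only what we use: a wildcard import from pyspark.sql.functions
# would shadow Python built-ins such as sum, min, max and abs
from pyspark.sql.functions import array
from pyspark import SparkContext, SparkConf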

**Step 7:** We create a **SparkConf** object with SparkConf(), which also loads values from any spark.* Java system properties. Parameters set on the SparkConf object take priority over those system properties. Using its set() method, we set spark.ui.port to 4050, so that the Spark UI does not occupy its default port 4040, which ngrok's local API uses later in this notebook.

Then we create a **SparkContext** object with this configuration and call **SparkSession.builder.getOrCreate()**. This method first checks whether there is a valid global default SparkSession and, if so, returns it. If no valid global default SparkSession exists, it creates a new SparkSession (reusing the running SparkContext) and registers it as the global default.

In [12]:
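# build the configuration: move the Spark UI from its default port 4040 to 4050
conf = SparkConf().set("spark.ui.port", "4050")

# create the SparkContext, then a SparkSession that reuses it
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()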

**Step 8:** Evaluating the spark object displays a summary of the session, including the Spark version and a link to the Spark UI web interface. In the Spark UI, we can monitor the progress of jobs and debug performance bottlenecks.

In [21]:
spark

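**Step 9:** Next, we use wget to download the ngrok client as a .zip file. ngrok creates a public URL that tunnels to a port on the Colab VM, which lets us open the Spark UI from our own browser.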

**Extra info:**
GNU Wget is a free software package for retrieving files using HTTP, HTTPS, FTP and FTPS, the most widely used Internet protocols.

In [15]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip

--2022-10-20 17:50:55--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.237.133.81, 52.202.168.65, 18.205.222.128, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.237.133.81|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.1’


2022-10-20 17:50:58 (6.49 MB/s) - ‘ngrok-stable-linux-amd64.zip.1’ saved [13832437/13832437]



**Step 10:** Now, we use the unzip command to extract the ngrok executable from the zip file.

In [18]:
!unzip ngrok-stable-linux-amd64.zip

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: ngrok                   


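**Step 11:** We configured the Spark UI to run on port 4050 and started a Spark session in the previous cells. This command starts ngrok and opens an HTTP tunnel from a public ngrok URL to local port 4050, where the Spark UI is served. The trailing & runs ngrok in the background, so the cell returns immediately while the tunnel stays open.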

In [19]:
get_ipython().system_raw('./ngrok http 4050 &')

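**Step 12:** ngrok serves a local API on port 4040 (the reason the Spark UI was moved to 4050). This cell queries its /api/tunnels endpoint and prints the public URL of the first tunnel. Opening that URL shows the Spark UI, where we can view the jobs and their stages.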

In [20]:
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

http://ba25-34-80-143-255.ngrok.io
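The one-liner above assumes the tunnel is already up. Because ngrok was started in the background, the query can race it and find no API listening yet, or an empty tunnels list. A slightly more defensive sketch using only the Python standard library follows; it assumes the JSON shape the one-liner already relies on (a top-level tunnels list whose entries carry public_url), and the function names and retry parameters are hypothetical.

```python
import json
import time
import urllib.request


def first_public_url(payload: str) -> str:
    """Return public_url of the first tunnel in an ngrok /api/tunnels JSON payload."""
    tunnels = json.loads(payload).get("tunnels", [])
    if not tunnels:
        raise LookupError("ngrok reports no tunnels yet")
    return tunnels[0]["public_url"]


def wait_for_public_url(api: str = "http://localhost:4040/api/tunnels",
                        attempts: int = 10, delay: float = 1.0) -> str:
    """Poll ngrok's local API until a tunnel appears (hypothetical retry helper)."""
    last_error = None
    for _ in range(attempts):
        try:
            with urllib.request.urlopen(api, timeout=5) as resp:
                return first_public_url(resp.read().decode("utf-8"))
        except (OSError, LookupError) as exc:
            # OSError covers "connection refused" while ngrok is still starting;
            # LookupError means the API is up but no tunnel is registered yet
            last_error = exc
            time.sleep(delay)
    raise RuntimeError(f"no ngrok tunnel after {attempts} attempts") from last_error
```

In the notebook, `print(wait_for_public_url())` would stand in for the curl pipeline.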


### **Data Processing**

Steps involved in Data Processing are:

**Step 1:** Importing the breast cancer dataset from the scikit-learn library.

In [23]:
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()

**Step 2:**

A) Converting the data to a pandas DataFrame.

B) Then converting this DataFrame to a Spark DataFrame using the createDataFrame method.

C) Each field of df.schema (a StructField) has three parts: **name, data type (DoubleType here) and nullable (which is set to True by default).**
Next, we create a function **'set_df_columns_nullable'** that sets nullable to **False** for the given columns, declaring that **no null values** are allowed in them. The function rebuilds the DataFrame from its underlying RDD (using .rdd) with the modified schema and returns the new DataFrame.

D) Finally, using the .withColumn method, we add another column (called features) to our DataFrame, in which each row contains an array of the values of all the other columns.


In [33]:
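# A) pandas DataFrame with one column per feature
pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)
# B) convert it to a Spark DataFrame
df = spark.createDataFrame(pd_df)

# C) mark the given columns as non-nullable
def set_df_columns_nullable(spark, df, column_list, nullable=False):
    # build a new schema instead of mutating df.schema in place
    schema = StructType([
        StructField(f.name, f.dataType, nullable if f.name in column_list else f.nullable)
        for f in df.schema
    ])
    # re-create the DataFrame from its underlying RDD with the new schema
    return spark.createDataFrame(df.rdd, schema)

df = set_df_columns_nullable(spark, df, df.columns)
# D) gather all feature columns into a single array column named 'features'
df = df.withColumn('features', array(df.columns))
df.show()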

+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+--------------------+
|mean radius|mean texture|mean perimeter|mean area|mean smoothness|mean compactness|mean concavity|mean concave points|mean symmetry|mean fractal dimension|radius error|texture error|perimeter error|area error|smoothness error|compactness error|concavity error|concave points error|symmetry error|fractal dimension error|worst radius|worst texture|worst perimeter|worst area|worst smoothness|worst compactness|worst concavity|worst concave points|worst symmetry|worst fractal dimension|      

**Step 3:** Here, we first convert the DataFrame into an RDD and then use **map**, an RDD transformation that applies a function (here a lambda) to every element of the RDD and returns a new RDD. The lambda converts each row's **'features'** array into a **DenseVector**, the vector type that PySpark's ML algorithms expect as input. The printSchema() output confirms that the columns are now non-nullable.

In [53]:
from pyspark.ml.linalg import Vectors
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))

df.printSchema()

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

**Step 4:** Finally, we create a new DataFrame called features, with a single vector column also named features (the default input column of KMeans), and a pandas Series, labels, for our target variable.

In [54]:
features = spark.createDataFrame(vectors.map(Row),["features"])
features.show()

+--------------------+
|            features|
+--------------------+
|[17.99,10.38,122....|
|[20.57,17.77,132....|
|[19.69,21.25,130....|
|[11.42,20.38,77.5...|
|[20.29,14.34,135....|
|[12.45,15.7,82.57...|
|[18.25,19.98,119....|
|[13.71,20.83,90.2...|
|[13.0,21.82,87.5,...|
|[12.46,24.04,83.9...|
|[16.02,23.24,102....|
|[15.78,17.89,103....|
|[19.17,24.8,132.4...|
|[15.85,23.95,103....|
|[13.73,22.61,93.6...|
|[14.54,27.54,96.7...|
|[14.68,20.13,94.7...|
|[16.13,20.68,108....|
|[19.81,22.15,130....|
|[13.54,14.36,87.4...|
+--------------------+
only showing top 20 rows



In [None]:
# Target variable (kept for reference; K-Means itself is unsupervised)
labels = pd.Series(breast_cancer.target)

### **Model Building**

Steps involved are:

**Step 1:** Importing the required PySpark classes for the K-Means algorithm: KMeans (the estimator) and ClusteringEvaluator (to score the resulting clusters).

In [55]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

**Step 2:** We use two clusters (k = 2), matching the two classes in the dataset (malignant and benign), and set the seed to 1 with the setSeed method, so that we get the same clusters every time we run the code.

In [58]:
# trains a K-means model
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(features)
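To make clear what fit does here, the sketch below runs plain Lloyd's iterations in pure Python: assign each point to its nearest centre, then move each centre to the mean of its points, until the centres stop moving. It is an illustration under stated assumptions, not Spark's implementation: Spark's KMeans initialises centres with k-means|| by default, while this sketch swaps in random sampling of k points. max_iter=20 and tol=1e-4 are chosen to mirror Spark's default maxIter and tol, with a similar (not necessarily identical) stopping rule. The fixed seed plays the same role as setSeed(1): the same seed gives the same initial centres, hence the same clusters.

```python
import math
import random


def sq_dist(p, q):
    """Squared Euclidean distance between two equal-length sequences."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def assign(points, centers):
    """Index of the nearest centre for every point."""
    return [min(range(len(centers)), key=lambda j: sq_dist(p, centers[j])) for p in points]


def kmeans(points, k, seed=1, max_iter=20, tol=1e-4):
    """Lloyd's k-means with random initial centres (a sketch, not Spark's k-means||)."""
    rng = random.Random(seed)                      # same seed -> same initial centres
    centers = [list(p) for p in rng.sample(points, k)]
    for _ in range(max_iter):
        labels = assign(points, centers)           # assignment step
        new_centers = []
        for j in range(k):                         # update step: mean of each cluster
            members = [p for p, lab in zip(points, labels) if lab == j]
            if members:
                new_centers.append([sum(col) / len(members) for col in zip(*members)])
            else:
                new_centers.append(centers[j])     # keep an empty cluster's centre in place
        # stop once every centre moved less than tol (Euclidean distance)
        converged = all(math.sqrt(sq_dist(c, n)) < tol for c, n in zip(centers, new_centers))
        centers = new_centers
        if converged:
            break
    return centers, assign(points, centers)
```

For example, `kmeans([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [10.0, 10.0], [10.0, 11.0], [11.0, 10.0]], k=2)` separates the three points near the origin from the three near (10, 10).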

**Step 3:** Now, we predict the cluster of each observation using the fitted model; transform adds a **prediction** column containing the cluster index.

In [59]:
# Make Predictions
prediction = model.transform(features)

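**Step 4:** Finally, we evaluate our predictions using the silhouette coefficient. By default, ClusteringEvaluator computes it with the squared Euclidean distance; it ranges from -1 to 1, and values close to 1 indicate compact, well-separated clusters.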

In [60]:
# Evaluate clustering by computing the silhouette score
evaluator = ClusteringEvaluator()



In [61]:
silhouette = evaluator.evaluate(prediction)
print('Silhouette with squared Euclidean distance = ' + str(silhouette))

Silhouette with squared Euclidean distance = 0.8342904262826145
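For intuition about the reported number, the sketch below computes the silhouette score with squared Euclidean distance in pure Python. For each point, a is its mean squared distance to the other members of its own cluster and b is the smallest mean squared distance to the members of any other cluster; the point's score is (b - a) / max(a, b), and the overall score is the mean over all points. We believe this matches the definition ClusteringEvaluator uses with its default squared Euclidean distance, but Spark computes it in a distributed way from per-cluster statistics rather than with this O(n²) double loop, so treat the sketch as an illustration only. Scoring singleton clusters as 0 is an assumed convention here.

```python
def sq_dist(p, q):
    """Squared Euclidean distance between two equal-length sequences."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def silhouette_sq_euclidean(points, labels):
    """Mean silhouette coefficient using squared Euclidean distance as dissimilarity."""
    clusters = {}
    for p, c in zip(points, labels):
        clusters.setdefault(c, []).append(p)
    if len(clusters) < 2:
        raise ValueError("silhouette needs at least two clusters")
    scores = []
    for p, c in zip(points, labels):
        own = clusters[c]
        if len(own) == 1:
            scores.append(0.0)  # singleton cluster: score 0 by (assumed) convention
            continue
        # a: mean distance to the *other* members of p's cluster (p itself adds 0)
        a = sum(sq_dist(p, q) for q in own) / (len(own) - 1)
        # b: smallest mean distance to the members of any other cluster
        b = min(sum(sq_dist(p, q) for q in members) / len(members)
                for k, members in clusters.items() if k != c)
        scores.append((b - a) / max(a, b) if max(a, b) > 0 else 0.0)
    return sum(scores) / len(scores)
```

On the 1-D points 0, 1, 10, 11, grouping {0, 1} and {10, 11} scores close to 1, while the interleaved grouping {0, 10} and {1, 11} scores below 0.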
