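# K-Means using Spark

Clusters the scikit-learn breast-cancer dataset into two groups with Spark ML's `KMeans` and evaluates the clustering with the silhouette score.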

In [1]:
# pySpark setup
# Installing PySpark in colab
!pip install pyspark #usually for local usage

# Installing the PyDrive
!pip install -U -q PyDrive#PyDrive is a wrapper library of google-api-python-client that simplifies many common Google Drive API tasks.

# Installing java 8 
!apt install openjdk-8-jdk-headless -qq

# Environment variable setting up "JAVA_HOME". This basically installs the compiler in the required directory.
import os #importing the os(The os module is a part of the standard library)
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 39 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 46.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=f557c5ebe861b137a4cdfa42000b338b6ed66fcfeca4c201e2a303e65c715bae
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove

In [2]:
# Importing necessary libraries required in python like pandas for panel data handling, numpy for handling numbers, arrays and series,visualization libraries
import pandas as pd # mainly used for data analysis
import numpy as np #for working with array
import matplotlib.pyplot as plt #for visualization
%matplotlib inline

# Importing pyspark containing the framework and set of libraries for large-scale data processing
import pyspark #Python API for Apache Spark

# Importing PySpark SQL, which is a module in spark that integrates relational processing with Spark's functional programming API
from pyspark.sql import * #for structured data processing

# Importing PySpark SQL Types class, which is a base class of all datatypes used to create Dataframes with a specific type.
from pyspark.sql.types import * 

# Importing the list of built-in functions available for dataframes
from pyspark.sql.functions import * 

# Importing SparkContext and SparkConf
from pyspark import SparkContext, SparkConf #A SparkContext represents the connection to a Spark cluster
#sparkconf is used for to set various Spark parameters as key-value pairs

# Explicit imports for names used by later cells. They are placed after the
# wildcard imports above so none of those can shadow them.
import json
import os
import time
import urllib.request

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import StructField, StructType

In [3]:
# Spark configuration: serve the Spark UI on port 4050 (the port the ngrok
# tunnel cell forwards).
conf = SparkConf().set("spark.ui.port", "4050")

# Build (or reuse) the session from that configuration and take the context
# from it. Constructing `pyspark.SparkContext(conf=conf)` directly fails with
# "Cannot run multiple SparkContexts at once" when this cell is re-run;
# getOrCreate() makes the cell idempotent.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

In [4]:
spark  # Display the SparkSession; its notebook repr includes the Spark version in use

In [5]:
# Setting up ngrok
# Downloading ngrok on linux subsystem for windows
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip

# Unzipping the ngrok file
!unzip ngrok-stable-linux-amd64.zip

get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

--2022-10-20 18:51:43--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.237.133.81, 54.161.241.46, 52.202.168.65, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.237.133.81|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2022-10-20 18:51:44 (17.3 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
Traceback (most recent call last):
  File "<string>", line 1, in <module>
IndexError: list index out of range


In [6]:
# Load scikit-learn's bundled breast-cancer dataset as a single object (not an
# (X, y) tuple); later cells read its `data`, `feature_names` and `target`.
from sklearn.datasets import load_breast_cancer

breast_cancer = load_breast_cancer(return_X_y=False)

In [7]:
# Put the raw feature matrix into a pandas frame with one named column per
# feature, then let Spark infer the DataFrame schema from that frame.
pd_df = pd.DataFrame(
    data=breast_cancer.data,
    columns=breast_cancer.feature_names,
)
df = spark.createDataFrame(data=pd_df)

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    """Return a copy of ``df`` whose listed columns have the given nullability.

    A column's ``nullable`` flag cannot be changed in place, so the DataFrame
    is rebuilt from its RDD with an adjusted schema.

    Args:
        spark: active ``SparkSession`` used to rebuild the DataFrame.
        df: source DataFrame; neither it nor its schema object is modified.
        column_list: names of the columns whose ``nullable`` flag is set.
            Names that are not columns of ``df`` are ignored.
        nullable: value assigned to the flag of the listed columns
            (default ``False``). Other columns keep their current flag.

    Returns:
        A new DataFrame with the same rows and the adjusted schema.
    """
    targets = set(column_list)
    # Build a fresh StructType instead of editing ``df.schema`` in place: that
    # object can be cached on ``df`` (it is in current PySpark), so mutating it
    # would silently change the input DataFrame's schema too — and if it were
    # not cached, the edits would simply be lost.
    new_schema = StructType([
        StructField(
            field.name,
            field.dataType,
            nullable if field.name in targets else field.nullable,
            field.metadata,
        )
        for field in df.schema
    ])
    return spark.createDataFrame(df.rdd, new_schema)

# Mark every raw feature column non-nullable, pack those columns into one
# array column, and turn each row's array into a Spark ML dense vector.
# Feature names exclude any existing "features" column, so re-running these
# lines replaces the derived column instead of folding it into itself.
feature_cols = [name for name in df.columns if name != 'features']
df = set_df_columns_nullable(spark, df, feature_cols)
df = df.withColumn('features', array(*feature_cols))
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))
df.printSchema()

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

In [8]:
# Spark ML's dense-vector type.
from pyspark.ml.linalg import Vectors

# Wrap each dense vector in a one-field Row so the result is a single-column
# DataFrame named "features" — the column KMeans below reads.
features = spark.createDataFrame(vectors.map(lambda vec: Row(vec)), schema=["features"])

# Target classes of the dataset, kept on the pandas side.
labels = pd.Series(data=breast_cancer.target)

In [9]:
# Fit k-means on the feature vectors and score the resulting clustering.
# K = 2, presumably one cluster per target class; the fixed seed makes the
# clustering (and hence the score) reproducible across runs.
N_CLUSTERS = 2
SEED = 1

kmeans = KMeans(k=N_CLUSTERS, seed=SEED)
model = kmeans.fit(features)

# Append each row's assigned cluster as a new column.
predictions = model.transform(features)

# Silhouette ranges from -1 to 1; higher means tighter, better-separated
# clusters. The distance measure is stated explicitly so it matches the
# message printed below.
evaluator = ClusteringEvaluator(distanceMeasure="squaredEuclidean")
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette score with squared euclidean distance: {silhouette}")

# NOTE(review): the features are not standardised, so large-magnitude columns
# (likely the area measurements) may dominate the distances; consider a
# StandardScaler step before KMeans.

Silhouette score with squared eucledian distance: 0.8342904262826145
