# K-Means Spark

# SET UP

### Let's set up Spark in Colab. Run the cell below.

In [None]:
#importing pyspark
!pip install Pyspark

#installing the pydrive
!pip install -U -q PyDrive

#installing java 8
!apt install openjdk-8-jdk-headless -qq

#Setting the environment variable "JAVA_HOME". This installs the compiler in the required directory.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting Pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 58.8 MB/s 
[?25hBuilding wheels for collected packages: Pyspark
  Building wheel for Pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for Pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=1f19a85f1408b01349c9b2ebc47c65d2317fecef08b3706553b801c863c98610
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built Pyspark
Installing collected packages: py4j, Pyspark
Successfully installed Pyspark-3.3.0 py4j-0.10.9.5
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove

In [None]:
#importing necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

# Importing pyspark which is the Python API for Apache Spark, containing the framework and set of libraries for large-scale data processing
import pyspark
# Importing PySpark SQL, which is a module in spark that integrates relational processing with Spark's functional programming API
from pyspark.sql import *
# Importing PySpark SQL Types class, which is a base class of all datatypes used to create Dataframes with a specific type.
from pyspark.sql.types import *
# Importing the list of built-in functions available for dataframes
from pyspark.sql.functions import *
# Importing SparkContext and SparkConf
from pyspark import SparkContext, SparkConf

In [None]:
# Spark configuration: serve the Spark web UI on port 4050 (the port exposed through ngrok below).
conf = SparkConf().set("spark.ui.port", "4050")

# Build (or reuse) the SparkSession from this configuration and take its SparkContext.
# getOrCreate() keeps the cell re-runnable; calling SparkContext(conf=...) directly a second
# time fails because only one SparkContext may be active at once.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

#### `SparkConf()` holds the configuration (key–value settings) used to run a Spark application.

#### `SparkContext` is the entry point to Spark's core functionality. It is used to programmatically create RDDs, accumulators, and broadcast variables on the cluster. In `spark-shell` (and the `pyspark` shell) an instance is available by default as the variable `sc`.

#### `SparkSession.builder.getOrCreate()` first checks whether there is a valid global default SparkSession and, if so, returns it. Otherwise, it creates a new SparkSession and registers it as the global default.

In [None]:
# Show the active SparkSession as the cell result; its notebook display includes the Spark version.
spark

In [None]:
# Setting up ngrok
# Downloading ngrok on linux subsystem for windows
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip

# unzipping of the ngrok file
!unzip ngrok-stable-linux-amd64.zip

get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

--2022-10-19 11:17:15--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.161.241.46, 54.237.133.81, 18.205.222.128, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.161.241.46|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2022-10-19 11:17:16 (57.0 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
Traceback (most recent call last):
  File "<string>", line 1, in <module>
IndexError: list index out of range


In [None]:
# Load scikit-learn's bundled breast cancer dataset. The returned object is used below through
# its .data, .feature_names and .target attributes.
from sklearn.datasets import load_breast_cancer

breast_cancer = load_breast_cancer(return_X_y=False)

In [None]:
# Put the feature matrix in a pandas DataFrame (one named column per feature),
# then convert it into a Spark DataFrame with the same columns.
pd_df = pd.DataFrame(data=breast_cancer.data, columns=breast_cancer.feature_names)
df = spark.createDataFrame(pd_df)

In [None]:
# Using PySpark, creating the schema from the dataset, and then printing the schema
def set_df_columns_nullable(spark, df, column_list, nullable=False):
    for struct_field in df.schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable
    df_mod = spark.createDataFrame(df.rdd, df.schema)
    return df_mod
 
# Imported here so this cell does not depend on a later cell having run first
# (the lambda below is evaluated lazily, whenever a Spark action runs on `vectors`).
from pyspark.ml.linalg import Vectors

# The original feature columns. They are taken from pd_df rather than df.columns so that
# re-running this cell does not pull the array column "features" into the array itself.
feature_cols = list(pd_df.columns)

# Mark the columns as non-nullable, then pack all feature columns into one array column.
df = set_df_columns_nullable(spark, df, df.columns)
df = df.withColumn('features', array(feature_cols))
# One dense ML vector per row.
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))

df.printSchema()

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

In [None]:
# Vectors (ML linear algebra) is what the `vectors` RDD holds.
from pyspark.ml.linalg import Vectors

# Wrap every vector in a one-field Row and name that single column "features".
features = spark.createDataFrame(vectors.map(lambda vec: Row(vec)), ["features"])

# Target classes from the dataset, kept on the driver as a pandas Series.
labels = pd.Series(data=breast_cancer.target)

In [None]:
# importing Clustering algorithm and the evaluator
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

N_CLUSTERS = 2    # number of clusters (presumably chosen to match the two classes in `labels`)
KMEANS_SEED = 1   # fixed seed so the clustering is reproducible

# Train a k-means model on the "features" vector column.
# NOTE(review): the features are not standardized, so columns with large numeric ranges dominate
# the distances; consider pyspark.ml.feature.StandardScaler before clustering.
kmeans = KMeans().setK(N_CLUSTERS).setSeed(KMEANS_SEED)
model = kmeans.fit(features)

# Assign each row to a cluster.
predictions = model.transform(features)

# Evaluate the clustering with the silhouette score. Metric and distance are set explicitly so
# the printed description always matches what was actually computed.
evaluator = ClusteringEvaluator(metricName="silhouette", distanceMeasure="squaredEuclidean")
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")

Silhoutte with squared euclidean distance = 0.8342904262826145
