# K-Means using Spark

In [None]:
!pip install pyspark #This is usually for local usage or as a client to connect to a cluster instead of setting up a cluster itself.
!pip install -U -q PyDrive #PyDrive is a wrapper library of google-api-python-client that simplifies many common Google Drive API tasks.
!apt install openjdk-8-jdk-headless -qq #software capable of working on a device without a graphical user interface. Such software receives inputs and provides output through other interfaces like network or serial port and is common on servers and embedded devices.
import os #The os module is a part of the standard library, or stdlib, within Python 3. This means that it comes with your Python installation, but you still must import it
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" #The path is the most important environment variable of the Java environment which is used to locate the JDK packages that are used to convert the java source code into the machine-readable binary format

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 51 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 64.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=71a25df03de46c779af44045d6ef3352bcf09176788a9e4ae1f4b26f5250d0f2
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove

In [None]:
import pandas as pd #Pandas is mainly used for data analysis and associated manipulation of tabular data in DataFrames. 
import numpy as np #NumPy is a Python library used for working with arrays
import matplotlib.pyplot as plt #Data Vizualization
%matplotlib inline

import pyspark # PySpark is the Python API for Apache Spark, an open source, distributed computing framework and set of libraries for real-time, large-scale data processing.
from pyspark.sql import * #Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as distributed SQL query engine.
from pyspark.sql.types import * #PySpark SQL Types class is a base class of all data types in PuSpark which defined in a package pyspark.sql.types.DataType and they are used to create DataFrame with a specific type
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf #A SparkContext represents the connection to a Spark cluster, and can be used to create RDD and broadcast variables on that cluster
# SparkConf holds the configuration of a Spark application as key-value settings.

In [None]:
# Spark configuration: serve the Spark web UI on port 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Create the context. getOrCreate() returns the already-running SparkContext when one exists,
# so re-running this cell does not fail with "Cannot run multiple SparkContexts at once".
# Note: when a context already exists, `conf` is not applied to it.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# Create the session. getOrCreate() first checks whether there is a valid global default
# SparkSession and returns it if so; otherwise it creates a new SparkSession and registers it
# as the global default.
spark = SparkSession.builder.getOrCreate()

In [None]:
# Bare expression as the last line of the cell: the notebook displays the SparkSession object.
spark

In [None]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip #ngrok is the fastest way to put anything on the internet with a single command.
!unzip ngrok-stable-linux-amd64.zip #ngrok is a cross-platform application that enables developers to expose a local development server to the Internet with minimal effort. 
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

--2022-10-19 18:17:54--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 18.205.222.128, 52.202.168.65, 54.161.241.46, ...
Connecting to bin.equinox.io (bin.equinox.io)|18.205.222.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2022-10-19 18:17:55 (14.0 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
Traceback (most recent call last):
  File "<string>", line 1, in <module>
IndexError: list index out of range


In [None]:
# Load scikit-learn's bundled breast cancer dataset. Later cells read its .data,
# .feature_names and .target attributes.
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()

In [None]:
# Wrap the feature matrix in a pandas frame, one column per feature name.
pd_df = pd.DataFrame(
    data=breast_cancer.data,
    columns=breast_cancer.feature_names,
)
# Hand the pandas frame over to Spark as a Spark DataFrame.
df = spark.createDataFrame(pd_df)

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    """Return a new DataFrame whose listed columns have the given nullability.

    Args:
        spark: Session object used to build the result via ``createDataFrame``.
        df: Source DataFrame; its ``rdd`` and ``schema`` are read.
        column_list: Names of the columns whose ``nullable`` flag is set.
        nullable: Value assigned to the ``nullable`` flag of the listed columns.

    Returns:
        The DataFrame created from ``df.rdd`` with the adjusted schema.
    """
    import copy  # local import so this definition does not depend on another cell

    # Work on a copy: editing the fields of df.schema directly would also change the
    # schema object held by the input frame.
    schema = copy.deepcopy(df.schema)
    for struct_field in schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable
    return spark.createDataFrame(df.rdd, schema)

# Import Vectors before the lambda below references it. RDD.map is lazy, so a missing name
# would not fail here but only later, when an action finally evaluates the lambda.
from pyspark.ml.linalg import Vectors

# Mark every column as non-nullable.
df = set_df_columns_nullable(spark, df, df.columns)
# Pack all feature columns into a single array column named 'features'.
df = df.withColumn('features', array(df.columns))
# Turn each row's feature array into a dense ML vector (lazy; evaluated on the first action).
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))
df.printSchema()

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

In [None]:
from pyspark.ml.linalg import Vectors

# Wrap each dense vector in a single-field Row, then build a one-column DataFrame
# whose column is named "features".
feature_rows = vectors.map(Row)
features = spark.createDataFrame(feature_rows, schema=["features"])

# Dataset targets as a pandas Series.
labels = pd.Series(breast_cancer.target)

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Fit k-means with two clusters and a fixed seed on the "features" column.
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(features)

# Assign each row to a cluster.
predictions = model.transform(features)

# Score the clustering with the evaluator's default settings and report the result.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette score with squared eucledian distance: {silhouette}")

Silhouette score with squared eucledian distance: 0.8342904262826145
