<a href="https://colab.research.google.com/github/vigneshbaskar1410/Machine-Learning-Projects/blob/main/K_means_Using_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **K-Means using Spark**

**PySpark installation using PyPI is as follows:**

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### **Installing PyDrive**

In [None]:
!pip install -U -q PyDrive

**Spark is written in Scala and runs on the Java Virtual Machine (JVM), so our first task is to install Java.**

In [None]:
!apt install openjdk-8-jdk-headless -qq

openjdk-8-jdk-headless is already the newest version (8u342-b07-0ubuntu1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 22 not upgraded.


**Next, set the `JAVA_HOME` environment variable so that PySpark can find the Java installation in the Colab environment.**

In [None]:
import os
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"
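
Before starting Spark, a quick sanity check is that `$JAVA_HOME/bin/java` exists and is executable. The helper below (`java_home_looks_valid`) is a hypothetical sketch, not part of PySpark, and it assumes the standard JDK directory layout with the launcher under `bin/`.

```python
import os


def java_home_looks_valid(java_home):
    """Return True if java_home is set and <java_home>/bin/java is an executable file.

    Assumes the usual JDK layout; this is a hypothetical helper, not a PySpark API.
    """
    if not java_home:
        return False
    java_bin = os.path.join(java_home, "bin", "java")
    return os.path.isfile(java_bin) and os.access(java_bin, os.X_OK)


# Check the variable set in the cell above (prints False if JAVA_HOME is missing or wrong)
print(java_home_looks_valid(os.environ.get("JAVA_HOME")))
```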

**As in a scikit-learn workflow, we also import pandas, NumPy, and Matplotlib here to support building the machine learning model.**

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
import pyspark                                  # PySpark top-level package
from pyspark import SparkConf, SparkContext     # application configuration and low-level entry point
from pyspark.sql import Row, SparkSession       # DataFrame API entry points
from pyspark.sql.functions import array         # packs several columns into one array column
# explicit imports avoid `import *` from pyspark.sql.functions shadowing builtins such as sum, max, round

**The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.**

Spark makes heavy use of the network, and some environments enforce tight firewall settings, so it is worth knowing which ports Spark uses.

By default, the Spark web UI listens on port 4040. This notebook moves it to port 4050, which also keeps it clear of port 4040, where ngrok's local API is queried later on.

**The code below creates a SparkConf object holding the application's settings and sets `spark.ui.port` to 4050, so the driver serves its web UI on that port.**

In [None]:
# application configuration: serve the Spark web UI on port 4050
conf = SparkConf().set("spark.ui.port","4050")  
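
If the UI port is already in use, Spark does not fail outright: it tries the next port up, for a limited number of attempts set by `spark.port.maxRetries`. Because the ngrok tunnel below points at port 4050 specifically, it can still be useful to check that the port is free before creating the context. This is a minimal standard-library sketch; `port_is_free` is a hypothetical helper, not a Spark API, and a port that is free when checked could still be taken before Spark binds it.

```python
import socket


def port_is_free(port, host="127.0.0.1"):
    """Return True if a TCP socket can currently bind to (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False  # another process or socket already holds this port
        return True


ui_port = 4050
if not port_is_free(ui_port):
    print(f"port {ui_port} is busy; Spark would fall back to a higher port")
```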
 




*  `pyspark.SparkContext(conf=conf)` creates the SparkContext, which tells Spark how to access a cluster, using the settings stored in `conf`.

*  `SparkSession.builder.getOrCreate()` checks whether a valid global default SparkSession exists and, if so, returns it. Otherwise it creates a new SparkSession and registers it as the global default. Since a SparkContext already exists at this point, the new session is built on top of it.

In [None]:
# create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()
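
To make the get-or-create behaviour concrete, here is a toy, pure-Python sketch of the pattern. `_ToySession` and `get_or_create` are hypothetical names used only for illustration; Spark's real builder additionally handles configuration, thread safety and stopped sessions. The point is simply that the second call returns the object created by the first.

```python
class _ToySession:
    """Hypothetical stand-in for a session object; not part of PySpark."""

    def __init__(self, app_name):
        self.app_name = app_name


_default_session = None  # plays the role of the global default session


def get_or_create(app_name="app"):
    """Return the global default session, creating and registering it on first use."""
    global _default_session
    if _default_session is None:
        _default_session = _ToySession(app_name)
    return _default_session


first = get_or_create("kmeans")
second = get_or_create("ignored")        # a session already exists, so it is reused
print(first is second, second.app_name)  # True kmeans
```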

## **Displaying the SparkSession variable**

In [None]:
spark   

## **Viewing the Spark UI (web user interface), including jobs and their stages, through a public link created with ngrok**

In [None]:
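# download and unpack ngrok (-o overwrites an existing binary instead of prompting)
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip -o ngrok-stable-linux-amd64.zip
# expose the Spark UI port (4050) through an ngrok tunnel running in the background
get_ipython().system_raw('./ngrok http 4050 &')
# give the tunnel time to start, then read its public URL from ngrok's local API on port 4040
!sleep 5
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"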

--2022-10-20 18:21:24--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 18.205.222.128, 54.161.241.46, 54.237.133.81, ...
Connecting to bin.equinox.io (bin.equinox.io)|18.205.222.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.3’


2022-10-20 18:21:24 (47.7 MB/s) - ‘ngrok-stable-linux-amd64.zip.3’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: ngrok                   
Traceback (most recent call last):
  File "<string>", line 1, in <module>
IndexError: list index out of range
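
The `IndexError` above is raised by `['tunnels'][0]`: the `tunnels` list returned by ngrok's local API was empty at the moment `curl` queried it. A likely cause is that the tunnel had not finished starting; newer ngrok releases may also refuse to open a tunnel until an account authtoken is configured. Instead of a single query, the API can be polled a few times. The sketch below uses only the standard library; `fetch_tunnels` and `wait_for_public_url` are hypothetical helpers, and the fetch function is injectable so the retry logic can be tried without a running ngrok.

```python
import json
import time
import urllib.request

NGROK_API = "http://localhost:4040/api/tunnels"  # ngrok's local API, as queried above


def fetch_tunnels(url=NGROK_API):
    """Return the parsed JSON document served by ngrok's local API."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


def wait_for_public_url(fetch=fetch_tunnels, retries=10, delay=1.0):
    """Poll until at least one tunnel is listed; return its public_url, or None."""
    for _ in range(retries):
        try:
            tunnels = fetch().get("tunnels", [])
        except OSError:  # includes urllib.error.URLError (e.g. connection refused)
            tunnels = []
        if tunnels:
            return tunnels[0]["public_url"]
        time.sleep(delay)
    return None
```

In the notebook, after starting ngrok, `print(wait_for_public_url())` could stand in for the `curl ... | python3 -c ...` line; it prints `None` if no tunnel appears within the retry window.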


# **Data Processing**

**Loading the built-in breast cancer dataset from `sklearn.datasets`**

In [None]:
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()
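
`load_breast_cancer()` returns the 30 numeric features in `breast_cancer.data` and the diagnosis in `breast_cancer.target`, where 0 means malignant and 1 means benign (see `breast_cancer.target_names`). K-means never sees the target, but it can be used afterwards to check how well the two clusters line up with the diagnosis. Because cluster IDs are arbitrary, the comparison has to try both ways of matching clusters to classes. This is a minimal NumPy sketch for the two-cluster case; `cluster_label_agreement` is a hypothetical helper, and with Spark the predicted IDs would first need to be collected, for example with `prediction.select("prediction").toPandas()`.

```python
import numpy as np


def cluster_label_agreement(cluster_ids, labels):
    """Fraction of points whose cluster matches their label under the better of the
    two possible cluster-to-label mappings (assumes both arrays contain only 0 and 1)."""
    cluster_ids = np.asarray(cluster_ids)
    labels = np.asarray(labels)
    direct = np.mean(cluster_ids == labels)  # mapping cluster 0 -> 0, cluster 1 -> 1
    return max(direct, 1.0 - direct)         # the swapped mapping agrees exactly where this one does not


print(cluster_label_agreement([1, 1, 0, 0], [0, 0, 1, 1]))  # 1.0
```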



*   With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, from Spark data sources, or from a pandas DataFrame (as done below).

*   PySpark supports various UDFs and APIs that let users execute native Python functions.

*   A resilient distributed dataset (RDD) is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.

*   A PySpark DataFrame can also be created from an RDD, for example an RDD of tuples or `Row` objects.

*   `rdd.map(func)` returns a new distributed dataset formed by passing each element of the source through the function `func`.

*   `features` is a DataFrame with a single column of dense vectors containing all the original features in the dataset.

*   `labels` is a pandas Series of binary labels indicating whether the corresponding set of features comes from a malignant (0) or benign (1) tumor.

*   **`df.printSchema()`** prints the schema of the DataFrame as a tree, listing each column's name, data type, and nullability.
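
RDD transformations such as `map` are lazy: they only record the function, and the work happens when an action such as `count()` or `collect()` asks for results. This is also why the `vectors` RDD in the next cell is not computed at the moment it is defined. Python 3's built-in `map` is lazy in the same way, which makes it a handy single-machine analogy. The sketch below is plain Python, not PySpark; `to_vector` is a hypothetical stand-in for `Vectors.dense`.

```python
calls = []  # records every element the function has actually processed


def to_vector(row):
    """Hypothetical stand-in for Vectors.dense: convert a row to a list of floats."""
    calls.append(row)
    return [float(x) for x in row]


rows = [(1, 2), (3, 4)]
lazy_vectors = map(to_vector, rows)  # like rdd.map: nothing is computed yet
print(len(calls))                    # 0
materialized = list(lazy_vectors)    # like an action: forces the computation
print(len(calls), materialized)      # 2 [[1.0, 2.0], [3.0, 4.0]]
```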






In [None]:
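import copy
from pyspark.ml.linalg import Vectors     # imported here because the map below uses it
from pyspark.sql.functions import array

# convert the 30 numeric features from pandas to a Spark DataFrame
pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)
df = spark.createDataFrame(pd_df)

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    """Return a copy of df whose columns in column_list have the given nullable flag."""
    schema = copy.deepcopy(df.schema)      # edit a copy rather than df's cached schema
    for struct_field in schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable
    # rebuild the DataFrame from its rows so the edited schema takes effect
    return spark.createDataFrame(df.rdd, schema)

# mark every feature column as non-nullable
df = set_df_columns_nullable(spark, df, df.columns)
# pack all feature columns into a single array column called "features"
df = df.withColumn('features', array(df.columns))
# turn each row's array into a dense ML vector (lazy: evaluated when an action runs)
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))

df.printSchema()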

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

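In the RDD-based MLlib API, dense vectors can simply be NumPy arrays, so there is no need to convert them. For sparse vectors, the factory methods in the `Vectors` class create an MLlib-compatible type, or users can pass in SciPy's `scipy.sparse` column vectors. In the DataFrame-based `pyspark.ml` API used here, the `features` column is conventionally a column of `pyspark.ml.linalg` vectors, which is what wrapping each row with `Vectors.dense` produces.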
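
To illustrate the dense/sparse distinction without Spark: a sparse vector is described by its size plus the indices and values of its non-zero entries, the same information `Vectors.sparse(size, indices, values)` takes. The NumPy sketch below expands such a triple into the equivalent dense array; `sparse_to_dense` is a hypothetical helper, not part of MLlib.

```python
import numpy as np


def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse description into a dense NumPy array."""
    dense = np.zeros(size, dtype=float)
    dense[np.asarray(indices, dtype=int)] = values  # fill only the stored positions
    return dense


print(sparse_to_dense(4, [1, 3], [3.0, 4.0]))  # [0. 3. 0. 4.]
```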

In [None]:
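from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

# wrap each dense vector in a one-field Row and name that field "features"
features = spark.createDataFrame(vectors.map(Row), ["features"])
# true diagnoses (0 = malignant, 1 = benign); not used for training
labels = pd.Series(breast_cancer.target)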
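
The `KMeans` estimator in the next cell fits k = 2 centroids. The core iteration of K-means (Lloyd's algorithm) alternates two steps: assign each point to its nearest centroid, then move each centroid to the mean of the points assigned to it, until the centroids stop moving. Below is a minimal NumPy sketch of that loop; it is not Spark's implementation. In particular, Spark initializes centroids with k-means|| by default and runs distributed, whereas this sketch simply starts from k distinct random rows, so its seed has no relation to `setSeed(1)`. The names `lloyd_kmeans` and `_nearest` are hypothetical.

```python
import numpy as np


def _nearest(X, centroids):
    """Index of the closest centroid for every row of X (squared Euclidean distance)."""
    d2 = ((X[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
    return d2.argmin(axis=1)


def lloyd_kmeans(X, k, n_iter=20, seed=1):
    """Plain Lloyd's algorithm; returns (centroids, cluster ids)."""
    X = np.asarray(X, dtype=float)
    rng = np.random.default_rng(seed)
    centroids = X[rng.choice(len(X), size=k, replace=False)]  # k distinct rows as starting centroids
    for _ in range(n_iter):
        ids = _nearest(X, centroids)                           # assignment step
        new_centroids = np.array([                             # update step (keep empty clusters in place)
            X[ids == j].mean(axis=0) if np.any(ids == j) else centroids[j]
            for j in range(k)
        ])
        if np.allclose(new_centroids, centroids):
            break                                              # converged
        centroids = new_centroids
    return centroids, _nearest(X, centroids)


toy_X = np.array([[0.0, 0.0], [0.0, 1.0], [10.0, 10.0], [10.0, 11.0]])
toy_centroids, toy_ids = lloyd_kmeans(toy_X, k=2)
print(toy_ids)  # the first two rows share one cluster ID, the last two the other
```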

In [None]:
# import the K-means estimator and the clustering evaluator from pyspark.ml
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# train a K-means model with k = 2 clusters (fixed seed for reproducibility)
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(features)

# assign every row to a cluster; adds a "prediction" column
prediction = model.transform(features)

# evaluate the clustering with the silhouette score
# (ClusteringEvaluator defaults to silhouette with squared Euclidean distance)
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(prediction)
print('Silhouette with squared Euclidean distance = ' + str(silhouette))

Silhouette with squared Euclidean distance = 0.8342904262826145
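
`ClusteringEvaluator` defaults to the silhouette metric with squared Euclidean distance. For each point, a is its mean distance to the other points in its own cluster, b is its smallest mean distance to the points of any other cluster, and its silhouette is (b - a) / max(a, b); the reported score is the average over all points, so values near 1 mean compact, well-separated clusters. The NumPy sketch below computes this textbook definition directly from the pairwise distance matrix. It is only meant to show what the number measures, not to reproduce Spark's scalable implementation; `silhouette_sq_euclidean` is a hypothetical helper that assumes at least two clusters and scores points in singleton clusters as 0.

```python
import numpy as np


def silhouette_sq_euclidean(X, labels):
    """Mean silhouette using squared Euclidean distance (needs at least two clusters)."""
    X = np.asarray(X, dtype=float)
    labels = np.asarray(labels)
    d2 = ((X[:, None, :] - X[None, :, :]) ** 2).sum(axis=2)  # pairwise squared distances
    clusters = np.unique(labels)
    scores = np.zeros(len(X))
    for i in range(len(X)):
        own = labels == labels[i]
        n_own = own.sum()
        if n_own == 1:
            continue  # singleton cluster: silhouette taken as 0
        a = d2[i, own].sum() / (n_own - 1)  # d2[i, i] is 0, so this averages over the other members
        b = min(d2[i, labels == c].mean() for c in clusters if c != labels[i])
        scores[i] = (b - a) / max(a, b)
    return scores.mean()


toy_X = np.array([[0.0], [1.0], [10.0], [11.0]])
toy_ids = np.array([0, 0, 1, 1])
print(silhouette_sq_euclidean(toy_X, toy_ids))  # about 0.99: two tight, far-apart clusters
```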
