In [1]:
#Setuping the spark
!pip install pyspark #installing pyspark 
!pip install -U -q PyDrive ##PyDrive is a wrapper library of google-api-python-client that simplifies many common Google Drive API tasks.
!apt install openjdk-8-jdk-headless -qq # #software capable of working on a device without a graphical user interface. Such software receives inputs and provides output through other interfaces like network or serial port and is common on servers and embedded devices.

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
openjdk-8-jdk-headless is already the newest version (8u342-b07-0ubuntu1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 27 not upgraded.


In [2]:
import os

# PySpark's launcher locates the Java runtime through JAVA_HOME; point it at the
# OpenJDK 8 installation (the openjdk-8-jdk-headless package installed with apt).
os.environ.update(JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64")


In [3]:
import pandas as pd  # pandas DataFrames / Series
import numpy as np  # numerical arrays
import matplotlib.pyplot as plt  # plotting
%matplotlib inline

import pyspark  # Python API for Apache Spark
from pyspark.sql import *  # provides SparkSession and Row used below
from pyspark.sql.types import *  # Spark SQL data types (StructType, StructField, DoubleType, ...)
# NOTE(review): star-importing pyspark.sql.functions can shadow Python builtins
# (e.g. sum, max, min, round); consider `import pyspark.sql.functions as F`.
from pyspark.sql.functions import *  # Spark SQL column functions (e.g. array, used below)
from pyspark import SparkContext, SparkConf  # low-level Spark context and its configuration object

In [4]:
 #Creating Spark Session
 conf=SparkConf().set("Spark.ui.port","4050")

 #creat the context
 sc=pyspark.SparkContext(conf=conf)
 spark=SparkSession.builder.getOrCreate()

In [5]:
# Display the active SparkSession; its notebook repr includes the Spark version.
spark

If Google Colab is running on a Google-hosted runtime, the cell below creates an ngrok tunnel that lets us open the Spark UI.

In [6]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys,json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

--2022-10-22 10:43:39--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.237.133.81, 18.205.222.128, 54.161.241.46, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.237.133.81|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.4’


2022-10-22 10:43:39 (103 MB/s) - ‘ngrok-stable-linux-amd64.zip.4’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: ngrok                   
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/lib/python3.7/json/__init__.py", line 296, in load
    parse_constant=parse_constant, object_pairs_hook=object_pairs_hook, **kw)
  File "/usr/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python3.7/json/decoder.py", lin

# Data Preprocessing


In [7]:
# Load scikit-learn's breast cancer dataset; later cells use its
# .data, .feature_names and .target attributes.
from sklearn.datasets import load_breast_cancer

breast_cancer = load_breast_cancer(return_X_y=False)

Because this dataset is small, we first load it into a pandas DataFrame and then convert it into a Spark DataFrame.

In [8]:
import pandas as pd

In [9]:
pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)
df = spark.createDataFrame(pd_df)


def set_df_columns_nullable(spark, df, column_list, nullable=False):
    """Return a copy of ``df`` whose listed columns have the given nullability.

    Args:
        spark: Active SparkSession used to rebuild the DataFrame.
        df: Source Spark DataFrame; it is left unmodified.
        column_list: Names of the columns whose ``nullable`` flag is changed.
        nullable: New nullability for those columns (default ``False``).

    Returns:
        A new DataFrame over the same rows with the adjusted schema.
    """
    targets = set(column_list)
    # Build fresh StructFields instead of assigning to ``df.schema`` fields: the
    # StructType returned by ``df.schema`` can be an object cached on ``df``, so
    # in-place edits would make the *input* DataFrame report a wrong schema.
    new_schema = StructType([
        StructField(
            field.name,
            field.dataType,
            nullable if field.name in targets else field.nullable,
            field.metadata,
        )
        for field in df.schema
    ])
    return spark.createDataFrame(df.rdd, new_schema)


df = set_df_columns_nullable(spark, df, df.columns)
# Pack every numeric column into a single array column.
df = df.withColumn('features', array(df.columns))

# Imported here so this cell does not rely on a later cell having been run first.
from pyspark.ml.linalg import Vectors
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))

df.printSchema()  # printing the schema of breast cancer dataset


root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

Now we build two data structures that will be used throughout this notebook:

> **Features**: a Spark DataFrame of dense vectors containing all the original features of the dataset.

> **Labels**: a pandas Series of binary labels indicating whether the tumor described by the corresponding set of features is malignant (0) or benign (1).

In [10]:
from pyspark.ml.linalg import Vectors  # dense-vector type referenced by the lambda that builds `vectors`

# Labels: one binary class label per sample, in dataset order.
labels = pd.Series(breast_cancer.target)
# Features: single-column Spark DataFrame with one Row (holding a dense vector) per sample.
features = spark.createDataFrame(vectors.map(Row), schema=["features"])

In [11]:
# K-means clustering with Spark ML, scored with the silhouette coefficient.
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Two clusters (one per label class); a fixed seed makes the clustering reproducible.
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(features)

# Assign each feature vector to a cluster (adds the "prediction" column).
predictions = model.transform(features)

# Score the clustering with the evaluator's silhouette metric.
evaluator = ClusteringEvaluator()
Silhouette = evaluator.evaluate(predictions)
print(f"Silhoutee with scored euclidian distance ={Silhouette}")

Silhoutee with scored euclidian distance =0.8342904262826145


In [12]:
def best_mapping_match_count(cluster_ids, true_labels):
    """Count rows a 2-cluster assignment gets right under the better id->label mapping.

    K-means cluster ids are arbitrary (cluster 0 may correspond to label 1), so the
    agreement is measured both as-is and with the ids flipped, and the larger count
    is returned. Assumes both inputs are binary {0, 1} and aligned row-by-row.

    Args:
        cluster_ids: Sequence of predicted cluster ids.
        true_labels: Sequence of true labels, same length and order.

    Returns:
        Number of rows matched by the better of the two mappings.

    Raises:
        ValueError: if the inputs are empty or differ in length.
    """
    predicted = np.asarray(cluster_ids)
    actual = np.asarray(true_labels)
    if predicted.shape != actual.shape or predicted.size == 0:
        raise ValueError("cluster_ids and true_labels must be non-empty and equally long")
    # Positional comparison (no pandas index alignment involved).
    matches = int(np.count_nonzero(predicted == actual))
    return max(matches, predicted.size - matches)


predictions_series = predictions.select("prediction").toPandas()
true_prediction_count = best_mapping_match_count(predictions_series['prediction'], labels)
# Fraction of samples grouped with their true class: this is accuracy, not precision.
print('accuracy:', true_prediction_count / len(labels))

precison: 0.8541300527240774
