# Spark Preparation
We check if we are in Google Colab.  If this is the case, install all necessary packages.

To run spark in Colab, we need to first install all the dependencies in Colab environment i.e. Apache Spark 3.2.1 with hadoop 3.2, Java 8 and Findspark to locate the spark in the system. The tools installation can be carried out inside the Jupyter Notebook of the Colab.
Learn more from [A Must-Read Guide on How to Work with PySpark on Google Colab for Data Scientists!](https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/)

credit: Natawut Nupairoj

In [1]:
try:
  import google.colab
  IN_COLAB = True
except:
  IN_COLAB = False

In [2]:
if IN_COLAB:
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
    !tar xf spark-3.2.1-bin-hadoop3.2.tgz
    !mv spark-3.2.1-bin-hadoop3.2 spark
    !pip install -q findspark

In [3]:
if IN_COLAB:
  import os
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
  os.environ["SPARK_HOME"] = "/content/spark"

In [4]:
import findspark
findspark.init()

# Pyspark_Clustering_Pipeline_Cdr

In [5]:
#1 - import module
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler,MaxAbsScaler

In [6]:
#2 - Create SparkContext
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

In [7]:
sc

In [8]:
sc._conf.getAll()

[('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.host', 'e9dbea06cbc7'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.app.startTime', '1647788178705'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1647788180625'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.driver.port', '44369')]

In [9]:
print  (sc.getConf().toDebugString())

spark.app.id=local-1647788180625
spark.app.name=pyspark-shell
spark.app.startTime=1647788178705
spark.driver.host=e9dbea06cbc7
spark.driver.port=44369
spark.executor.id=driver
spark.master=local[*]
spark.rdd.compress=True
spark.serializer.objectStreamReset=100
spark.submit.deployMode=client
spark.submit.pyFiles=
spark.ui.showConsoleProgress=true


In [10]:
#3 - Setup SparkSession(SparkSQL)
spark = (SparkSession
         .builder
         .appName("Pyspark_Clustering_Pipeline_Cdr")
         .getOrCreate())
print (spark)

<pyspark.sql.session.SparkSession object at 0x7f7122457850>


In [11]:
!wget https://github.com/kaopanboonyuen/GISTDA2022/raw/main/dataset/cdr_extractFeatures.csv

--2022-03-20 14:56:23--  https://github.com/kaopanboonyuen/GISTDA2022/raw/main/dataset/cdr_extractFeatures.csv
Resolving github.com (github.com)... 140.82.112.3
Connecting to github.com (github.com)|140.82.112.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/kaopanboonyuen/GISTDA2022/main/dataset/cdr_extractFeatures.csv [following]
--2022-03-20 14:56:23--  https://raw.githubusercontent.com/kaopanboonyuen/GISTDA2022/main/dataset/cdr_extractFeatures.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 28541 (28K) [text/plain]
Saving to: ‘cdr_extractFeatures.csv’


2022-03-20 14:56:23 (33.6 MB/s) - ‘cdr_extractFeatures.csv’ saved [28541/28541]



In [12]:
#4 - Read file to spark DataFrame

data = (spark
        .read
        .option("header","true")
        .option("inferSchema", "true")
        .csv("cdr_extractFeatures.csv"))
data.cache()
print ("finish caching data")

finish caching data


In [13]:
data.describe().toPandas()

Unnamed: 0,summary,uniquePN,no_CallIn_Unique,no_CallOut_Unique,no_CallIn,no_CallOut,avg_CallIn_Length,avg_CallOut_Length,avg_Call_Length
0,count,501,501.0,501.0,501.0,501.0,501.0,501.0,501.0
1,mean,,19.56087824351297,19.56087824351297,19.960079840319363,19.960079840319363,305.5274154043913,306.4270003514968,305.8882365878245
2,stddev,,4.306133582129764,4.322821581692413,4.407993102718377,4.397999908323414,38.40930235550977,38.123520925514285,27.72859975460591
3,min,089-1000000,9.0,7.0,9.0,7.0,177.6315789,176.1666667,216.9090909
4,max,089-1000500,33.0,33.0,33.0,34.0,421.0714286,437.2857143,384.975


In [14]:
data.printSchema()

root
 |-- uniquePN: string (nullable = true)
 |-- no_CallIn_Unique: integer (nullable = true)
 |-- no_CallOut_Unique: integer (nullable = true)
 |-- no_CallIn: integer (nullable = true)
 |-- no_CallOut: integer (nullable = true)
 |-- avg_CallIn_Length: double (nullable = true)
 |-- avg_CallOut_Length: double (nullable = true)
 |-- avg_Call_Length: double (nullable = true)



In [15]:
data.toPandas()

Unnamed: 0,uniquePN,no_CallIn_Unique,no_CallOut_Unique,no_CallIn,no_CallOut,avg_CallIn_Length,avg_CallOut_Length,avg_Call_Length
0,089-1000000,13,25,15,26,304.466667,241.692308,264.658537
1,089-1000001,12,19,12,20,271.083333,314.500000,298.218750
2,089-1000002,17,31,18,31,306.055556,263.032258,278.836735
3,089-1000003,16,14,16,14,300.250000,311.142857,305.333333
4,089-1000004,22,16,24,16,308.750000,306.187500,307.725000
...,...,...,...,...,...,...,...,...
496,089-1000496,20,23,20,23,254.200000,258.217391,256.348837
497,089-1000497,29,18,29,18,283.275862,264.277778,276.000000
498,089-1000498,15,18,15,19,346.733333,303.421053,322.529412
499,089-1000499,15,22,15,22,187.400000,321.772727,267.297297


In [16]:
#5 - Print sample 5 rows of all variables
column_name = ["no_CallIn_Unique","no_CallOut_Unique","no_CallIn","no_CallOut"
               ,"avg_CallIn_Length","avg_CallOut_Length","avg_Call_Length"]

In [17]:
#6 - Create Vector
assem =  VectorAssembler(inputCols=column_name ,outputCol="temp_features")

print (assem)

VectorAssembler_b1e04e724e32


In [18]:
#7 - Normalize
scaler = MaxAbsScaler(inputCol="temp_features", outputCol="features")

print (scaler)

MaxAbsScaler_e7f375a9f71d


In [19]:
#8 - Create model
kmeans = KMeans().setK(3).setSeed(50)

In [20]:
#9 - Set ML pipeline
all_process_list = [assem,scaler,kmeans]
for process in all_process_list: print(process)

pipeline = Pipeline(stages=all_process_list)
print (pipeline)

VectorAssembler_b1e04e724e32
MaxAbsScaler_e7f375a9f71d
KMeans_cbe49547b19a
Pipeline_6b6bb1a35d19


In [21]:
#10 - Train model
model = pipeline.fit(data)

In [22]:
#11 - Make predictions
predictions = model.transform(data).select("features","prediction")
predictions.cache()

DataFrame[features: vector, prediction: int]

In [23]:
# Print sample result
predictions.sample(False, 0.3, 1234).toPandas()

Unnamed: 0,features,prediction
0,"[0.5151515151515151, 0.9393939393939394, 0.545...",2
1,"[0.48484848484848486, 0.42424242424242425, 0.4...",0
2,"[0.48484848484848486, 0.6060606060606061, 0.48...",0
3,"[0.4545454545454546, 0.6060606060606061, 0.454...",0
4,"[0.7272727272727273, 0.5151515151515151, 0.727...",1
...,...,...
143,"[0.3939393939393939, 0.48484848484848486, 0.39...",0
144,"[0.5151515151515151, 0.5757575757575758, 0.515...",0
145,"[0.4545454545454546, 0.5454545454545454, 0.454...",0
146,"[0.4545454545454546, 0.6666666666666667, 0.454...",0


In [24]:
#12 Shows Cluster's Center
centers = model.stages[2].clusterCenters()

scaler_model = model.stages[-2]
max = scaler_model.maxAbs
print("Cluster Centers: ")
for center in centers:
    print(center*max)

Cluster Centers: 
[ 15.76162791  17.16860465  16.04651163  17.51162791 301.89721689
 308.3416379  305.32854295]
[ 23.38888889  18.12121212  23.87373737  18.52525253 307.89342457
 308.3554855  308.00701047]
[ 18.76335878  24.8778626   19.18320611  25.34351145 306.71767746
 300.99831548 303.42067992]
