<h1> Install pyspark library </h1>


In [2]:
!pip install pyspark



In [3]:
#import pyspark library 
import pyspark

In [4]:
#import spark session library
from pyspark.sql import SparkSession

In [5]:
# Build the SparkSession used by every DataFrame / ML call below.
# getOrCreate() reuses a session already running in this kernel, if any.
spark = (
    SparkSession.builder
    .appName("K_means")
    .getOrCreate()
)

<h1> Create DataFrame </h1>

In [7]:
# Create a DataFrame from the external CSV file, using its first row as column
# names. No schema is given, so every column is read as a string.
df = spark.read.csv("airlines1.csv", header=True)

In [8]:
# Total number of rows loaded into `df`.
df.count()

50001

<h1> Data Preparation </h1>

In [9]:
# Create a new DataFrame with only the columns needed for clustering.
selected_columns = ["Origin", "Dest", "AirTime", "Distance"]
AirlineDF = df.select(*selected_columns)

In [10]:
# Mark AirlineDF for in-memory caching. Caching is lazy: the data is materialised
# by the first action, and later actions can reuse it instead of re-reading the CSV.
# NOTE(review): this caches the string-typed frame, before the casts / dropna below;
# consider caching the final cleaned frame instead.
AirlineDF.cache()

DataFrame[Origin: string, Dest: string, AirTime: string, Distance: string]

In [11]:
# Preview the first 20 rows (the show() default). AirTime contains nulls.
AirlineDF.show()

+------+----+-------+--------+
|Origin|Dest|AirTime|Distance|
+------+----+-------+--------+
|   MSP| SLC|  153.0|   991.0|
|   MKE| MCO|  141.0|  1066.0|
|   GJT| DFW|  103.0|   773.0|
|   LAX| DTW|  220.0|  1979.0|
|   EWR| CLT|   80.0|   529.0|
|   DFW| SHV|   28.0|   190.0|
|   BOS| CLE|   94.0|   563.0|
|   ATL| CAE|   35.0|   192.0|
|   ORD| CLE|   59.0|   316.0|
|   MDW| DAL|  114.0|   793.0|
|   SAN| LAX|   null|   109.0|
|   ELP| DAL|   77.0|   562.0|
|   SJU| MIA|   null|  1045.0|
|   ABQ| LAX|   95.0|   677.0|
|   ORD| LGA|   99.0|   733.0|
|   GSO| BWI|   null|   278.0|
|   DTW| MBS|   24.0|    98.0|
|   SLC| SEA|  102.0|   689.0|
|   LAX| IAD|  255.0|  2288.0|
|   SMF| LAX|   null|   373.0|
+------+----+-------+--------+
only showing top 20 rows



In [12]:
#check data types of each column: all are strings because the CSV was read without a schema
AirlineDF.printSchema()

root
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- Distance: string (nullable = true)



In [13]:
from pyspark.sql.types import IntegerType

In [15]:
# Cast Distance from string (e.g. "991.0") to an integer column.
# NOTE(review): any fractional part is dropped by the integer cast.
AirlineDF = AirlineDF.withColumn("Distance", AirlineDF.Distance.cast(IntegerType()))

In [17]:
# Cast AirTime from string (e.g. "153.0") to an integer column; nulls stay null.
AirlineDF = AirlineDF.withColumn("AirTime", AirlineDF.AirTime.cast(IntegerType()))

In [18]:
# Confirm that AirTime and Distance are now integer columns.
AirlineDF.printSchema()

root
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- Distance: integer (nullable = true)



In [19]:
# Preview the data after the integer casts.
AirlineDF.show()

+------+----+-------+--------+
|Origin|Dest|AirTime|Distance|
+------+----+-------+--------+
|   MSP| SLC|    153|     991|
|   MKE| MCO|    141|    1066|
|   GJT| DFW|    103|     773|
|   LAX| DTW|    220|    1979|
|   EWR| CLT|     80|     529|
|   DFW| SHV|     28|     190|
|   BOS| CLE|     94|     563|
|   ATL| CAE|     35|     192|
|   ORD| CLE|     59|     316|
|   MDW| DAL|    114|     793|
|   SAN| LAX|   null|     109|
|   ELP| DAL|     77|     562|
|   SJU| MIA|   null|    1045|
|   ABQ| LAX|     95|     677|
|   ORD| LGA|     99|     733|
|   GSO| BWI|   null|     278|
|   DTW| MBS|     24|      98|
|   SLC| SEA|    102|     689|
|   LAX| IAD|    255|    2288|
|   SMF| LAX|   null|     373|
+------+----+-------+--------+
only showing top 20 rows



In [20]:
#check data types of each column
# NOTE(review): repeats the printSchema() call of the earlier cell; candidate for removal.
AirlineDF.printSchema()

root
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- Distance: integer (nullable = true)



<h1> Check for null values in the DataFrame </h1>

In [21]:
#import library for sql function col
from pyspark.sql.functions import col

In [23]:
# Count the null values in each column, in column order
# (Origin, Dest, AirTime, Distance). The loop uses the DataFrame's own column
# names instead of hand-typed (and inconsistently cased) literals.
for column_name in AirlineDF.columns:
    print(AirlineDF.where(col(column_name).isNull()).count())

0
0
10529
0


<h1> Handle null values </h1>

In [24]:
# Now drop the rows whose AirTime is null (the only column that has nulls).
AirlineDF = AirlineDF.na.drop(subset=["AirTime"])

In [25]:
# Check again: every column, including Distance, should now report 0 nulls.
# Same loop as the first check, so no column is silently skipped.
for column_name in AirlineDF.columns:
    print(AirlineDF.where(col(column_name).isNull()).count())

0
0
0


<h1> Convert distance from miles to kilometers </h1>

In [30]:
# Alias Spark's column-wise `round` so it does not shadow Python's built-in
# `round` for the rest of the kernel session.
from pyspark.sql.functions import round as spark_round

MILES_TO_KM = 1.60934  # kilometres per statute mile

# Convert Distance from miles to kilometres, rounded to 0 decimal places
# (the result is a double column, e.g. 2567.0).
AirlineDF1 = AirlineDF.withColumn("Distance", spark_round(AirlineDF.Distance * MILES_TO_KM, 0))
AirlineDF1.show()

+------+----+-------+--------+
|Origin|Dest|AirTime|Distance|
+------+----+-------+--------+
|   MSP| SLC|    153|  2567.0|
|   MKE| MCO|    141|  2762.0|
|   GJT| DFW|    103|  2002.0|
|   LAX| DTW|    220|  5126.0|
|   EWR| CLT|     80|  1370.0|
|   DFW| SHV|     28|   492.0|
|   BOS| CLE|     94|  1458.0|
|   ATL| CAE|     35|   497.0|
|   ORD| CLE|     59|   819.0|
|   MDW| DAL|    114|  2054.0|
|   ELP| DAL|     77|  1455.0|
|   ABQ| LAX|     95|  1754.0|
|   ORD| LGA|     99|  1899.0|
|   DTW| MBS|     24|   254.0|
|   SLC| SEA|    102|  1785.0|
|   LAX| IAD|    255|  5926.0|
|   MSY| ORD|    119|  2168.0|
|   SGF| ATL|     85|  1458.0|
|   PHX| LGB|     62|   919.0|
|   BUR| SFO|     53|   845.0|
+------+----+-------+--------+
only showing top 20 rows



<h1> Vectorize the features </h1>

In [27]:
# Import only what is used (instead of `import *`, which pollutes the namespace).
from pyspark.ml.feature import VectorAssembler

In [28]:
from pyspark.ml.feature import VectorAssembler

In [31]:
# Assemble AirTime and Distance into the single "features" vector column that
# KMeans consumes. handleInvalid="skip" drops any row with a null/NaN feature,
# consistent with the earlier dropna. The previous "keep" setting would put NaN
# entries into the vectors, which KMeans cannot cluster meaningfully.
vectorizer = VectorAssembler(
    inputCols=["AirTime", "Distance"],
    outputCol="features",
    handleInvalid="skip",
)

dataset = vectorizer.transform(AirlineDF1)

In [32]:
# Preview the assembled data: `features` holds [AirTime, Distance] per row.
dataset.show()

+------+----+-------+--------+--------------+
|Origin|Dest|AirTime|Distance|      features|
+------+----+-------+--------+--------------+
|   MSP| SLC|    153|  2567.0|[153.0,2567.0]|
|   MKE| MCO|    141|  2762.0|[141.0,2762.0]|
|   GJT| DFW|    103|  2002.0|[103.0,2002.0]|
|   LAX| DTW|    220|  5126.0|[220.0,5126.0]|
|   EWR| CLT|     80|  1370.0| [80.0,1370.0]|
|   DFW| SHV|     28|   492.0|  [28.0,492.0]|
|   BOS| CLE|     94|  1458.0| [94.0,1458.0]|
|   ATL| CAE|     35|   497.0|  [35.0,497.0]|
|   ORD| CLE|     59|   819.0|  [59.0,819.0]|
|   MDW| DAL|    114|  2054.0|[114.0,2054.0]|
|   ELP| DAL|     77|  1455.0| [77.0,1455.0]|
|   ABQ| LAX|     95|  1754.0| [95.0,1754.0]|
|   ORD| LGA|     99|  1899.0| [99.0,1899.0]|
|   DTW| MBS|     24|   254.0|  [24.0,254.0]|
|   SLC| SEA|    102|  1785.0|[102.0,1785.0]|
|   LAX| IAD|    255|  5926.0|[255.0,5926.0]|
|   MSY| ORD|    119|  2168.0|[119.0,2168.0]|
|   SGF| ATL|     85|  1458.0| [85.0,1458.0]|
|   PHX| LGB|     62|   919.0|  [6

<h1> Divide the data into multiple clusters </h1>

In [33]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [35]:
# Train a k-means model with 5 clusters. A fixed seed keeps the clustering
# reproducible across runs.
# NOTE(review): features are not standardized. Distance (km, hundreds to
# thousands) is on a much larger scale than AirTime (minutes), so it likely
# dominates the Euclidean distance. Consider StandardScaler before clustering.
kmeans = KMeans(k=5, seed=1)
model = kmeans.fit(dataset)

In [36]:
# Make predictions: assign every row to its nearest cluster centre. This appends
# a "prediction" column holding the cluster index.
predictions = model.transform(dataset=dataset)

In [37]:
# Evaluate the clustering with the silhouette score on squared Euclidean distance
# (these are ClusteringEvaluator's defaults, spelled out for the reader).
evaluator = ClusteringEvaluator(metricName="silhouette", distanceMeasure="squaredEuclidean")

In [39]:
# Silhouette lies in [-1, 1]; values near 1 indicate well-separated clusters.
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidian distance = {silhouette}")

Silhouette with squared euclidian distance = 0.7227996253298555


In [40]:
# Show the learned centroids, one [AirTime, Distance] pair per cluster. The list
# position is the cluster index used in the "prediction" column.
centers = model.clusterCenters()
print("Cluster Centers: ")
for centroid in centers:
    print(centroid)

Cluster Centers: 
[ 164.94000298 3238.86035432]
[ 267.31289833 5576.85250379]
[ 51.99164804 779.71879779]
[ 103.13336559 1877.94009031]
[  463.53571429 10366.28571429]


In [41]:
# Preview rows with their assigned cluster index in the `prediction` column.
predictions.show()

+------+----+-------+--------+--------------+----------+
|Origin|Dest|AirTime|Distance|      features|prediction|
+------+----+-------+--------+--------------+----------+
|   MSP| SLC|    153|  2567.0|[153.0,2567.0]|         0|
|   MKE| MCO|    141|  2762.0|[141.0,2762.0]|         0|
|   GJT| DFW|    103|  2002.0|[103.0,2002.0]|         3|
|   LAX| DTW|    220|  5126.0|[220.0,5126.0]|         1|
|   EWR| CLT|     80|  1370.0| [80.0,1370.0]|         3|
|   DFW| SHV|     28|   492.0|  [28.0,492.0]|         2|
|   BOS| CLE|     94|  1458.0| [94.0,1458.0]|         3|
|   ATL| CAE|     35|   497.0|  [35.0,497.0]|         2|
|   ORD| CLE|     59|   819.0|  [59.0,819.0]|         2|
|   MDW| DAL|    114|  2054.0|[114.0,2054.0]|         3|
|   ELP| DAL|     77|  1455.0| [77.0,1455.0]|         3|
|   ABQ| LAX|     95|  1754.0| [95.0,1754.0]|         3|
|   ORD| LGA|     99|  1899.0| [99.0,1899.0]|         3|
|   DTW| MBS|     24|   254.0|  [24.0,254.0]|         2|
|   SLC| SEA|    102|  1785.0|[

<h1> Retrieve the rows belonging to a single cluster </h1>

In [44]:
# Show the flights assigned to one cluster. `prediction` is an integer column,
# so compare against an int rather than relying on an implicit cast of '2'.
# With k=5 and seed=1, cluster 2 is the short-haul group (centre ~52 min, ~780 km).
# Cluster ids are arbitrary and may change if k, the seed or the features change.
CLUSTER_ID = 2
(
    predictions
    .where(predictions.prediction == CLUSTER_ID)
    .select("Origin", "Dest", "AirTime", "Distance")
    .show()
)

+------+----+-------+--------+
|Origin|Dest|AirTime|Distance|
+------+----+-------+--------+
|   DFW| SHV|     28|   492.0|
|   ATL| CAE|     35|   497.0|
|   ORD| CLE|     59|   819.0|
|   DTW| MBS|     24|   254.0|
|   PHX| LGB|     62|   919.0|
|   BUR| SFO|     53|   845.0|
|   DCA| BOS|     63|  1033.0|
|   CAE| ATL|     39|   497.0|
|   CHO| CLT|     52|   634.0|
|   MCI| STL|     38|   613.0|
|   PHX| LAX|     57|   958.0|
|   PHX| SNA|     55|   875.0|
|   OGG| HNL|     24|   259.0|
|   LNK| ORD|     71|  1207.0|
|   DAY| ORD|     43|   621.0|
|   PHX| SAN|     50|   787.0|
|   SFO| LAX|     59|   872.0|
|   RDU| ATL|     85|   922.0|
|   HNL| KOA|     28|   422.0|
|   DFW| LAW|     30|   362.0|
+------+----+-------+--------+
only showing top 20 rows

