# Pyspark Implementation

In [3]:
import pandas as pd

In [4]:
dataset = pd.read_csv("Airline_Dataset.csv")
dataset.head()

Unnamed: 0,id,Gender,Customer Type,Age,Type of Travel,Class,Flight Distance,Inflight wifi service,Departure/Arrival time convenient,Ease of Online booking,...,Inflight entertainment,On-board service,Leg room service,Baggage handling,Checkin service,Inflight service,Cleanliness,Departure Delay in Minutes,Arrival Delay in Minutes,Satisfaction
0,70172,Male,Loyal Customer,13,Personal Travel,Eco Plus,460,3,4,3,...,5,4,3,4,4,5,5,25,18.0,neutral or dissatisfied
1,5047,Male,disloyal Customer,25,Business travel,Business,235,3,2,3,...,1,1,5,3,1,4,1,1,6.0,neutral or dissatisfied
2,110028,Female,Loyal Customer,26,Business travel,Business,1142,2,2,2,...,5,4,3,4,4,4,5,0,0.0,satisfied
3,24026,Female,Loyal Customer,25,Business travel,Business,562,2,5,5,...,2,2,5,3,1,4,2,11,9.0,neutral or dissatisfied
4,119299,Male,Loyal Customer,61,Business travel,Business,214,3,3,3,...,3,3,4,4,3,3,3,0,0.0,satisfied


In [5]:
dataset.isnull().sum()

id                                     0
Gender                                 0
Customer Type                          0
Age                                    0
Type of Travel                         0
Class                                  0
Flight Distance                        0
Inflight wifi service                  0
Departure/Arrival time convenient      0
Ease of Online booking                 0
Gate location                          0
Food and drink                         0
Online boarding                        0
Seat comfort                           0
Inflight entertainment                 0
On-board service                       0
Leg room service                       0
Baggage handling                       0
Checkin service                        0
Inflight service                       0
Cleanliness                            0
Departure Delay in Minutes             0
Arrival Delay in Minutes             393
Satisfaction                           0
dtype: int64

In [6]:
dataset.dropna(inplace=True)
dataset.isnull().sum()

id                                   0
Gender                               0
Customer Type                        0
Age                                  0
Type of Travel                       0
Class                                0
Flight Distance                      0
Inflight wifi service                0
Departure/Arrival time convenient    0
Ease of Online booking               0
Gate location                        0
Food and drink                       0
Online boarding                      0
Seat comfort                         0
Inflight entertainment               0
On-board service                     0
Leg room service                     0
Baggage handling                     0
Checkin service                      0
Inflight service                     0
Cleanliness                          0
Departure Delay in Minutes           0
Arrival Delay in Minutes             0
Satisfaction                         0
dtype: int64

In [7]:
dataset.drop(['id','Class', 'Type of Travel','Gender'],axis=1,inplace=True)
dataset.to_csv("cleaned_Dataset.csv")

In [78]:
sc.stop()
spark.stop()

# ML with Pyspark

In [79]:
# load our pkgs
from pyspark import SparkContext

In [80]:
sc = SparkContext(master='local[2]')

In [81]:
sc

In [82]:
# load pkgs
from pyspark.sql import SparkSession
import pandas as pd

In [83]:
# Spark
spark = SparkSession.builder.appName("MLwithSpark").getOrCreate()

#### WorkFlow
+ Data prep
+ Feature Engineering
+ Build Model
+ Evaluate

In [84]:
# Load our dataset
df = spark.read.csv("cleaned_Dataset.csv", header=True, inferSchema=True)

In [85]:
# Preview Dataset
df.show(2)

+---+-----------------+---+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+----------------+----------------+----------------+---------------+----------------+-----------+--------------------------+------------------------+--------------------+
|_c0|    Customer Type|Age|Flight Distance|Inflight wifi service|Departure/Arrival time convenient|Ease of Online booking|Gate location|Food and drink|Online boarding|Seat comfort|Inflight entertainment|On-board service|Leg room service|Baggage handling|Checkin service|Inflight service|Cleanliness|Departure Delay in Minutes|Arrival Delay in Minutes|        Satisfaction|
+---+-----------------+---+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+----------------+----------------+----------------+

In [86]:
# Check for columns
print(df.columns)

['_c0', 'Customer Type', 'Age', 'Flight Distance', 'Inflight wifi service', 'Departure/Arrival time convenient', 'Ease of Online booking', 'Gate location', 'Food and drink', 'Online boarding', 'Seat comfort', 'Inflight entertainment', 'On-board service', 'Leg room service', 'Baggage handling', 'Checkin service', 'Inflight service', 'Cleanliness', 'Departure Delay in Minutes', 'Arrival Delay in Minutes', 'Satisfaction']


In [87]:
# check for datatypes
df.dtypes

[('_c0', 'int'),
 ('Customer Type', 'string'),
 ('Age', 'int'),
 ('Flight Distance', 'int'),
 ('Inflight wifi service', 'int'),
 ('Departure/Arrival time convenient', 'int'),
 ('Ease of Online booking', 'int'),
 ('Gate location', 'int'),
 ('Food and drink', 'int'),
 ('Online boarding', 'int'),
 ('Seat comfort', 'int'),
 ('Inflight entertainment', 'int'),
 ('On-board service', 'int'),
 ('Leg room service', 'int'),
 ('Baggage handling', 'int'),
 ('Checkin service', 'int'),
 ('Inflight service', 'int'),
 ('Cleanliness', 'int'),
 ('Departure Delay in Minutes', 'int'),
 ('Arrival Delay in Minutes', 'double'),
 ('Satisfaction', 'string')]

In [88]:
# check for the schema
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Customer Type: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Flight Distance: integer (nullable = true)
 |-- Inflight wifi service: integer (nullable = true)
 |-- Departure/Arrival time convenient: integer (nullable = true)
 |-- Ease of Online booking: integer (nullable = true)
 |-- Gate location: integer (nullable = true)
 |-- Food and drink: integer (nullable = true)
 |-- Online boarding: integer (nullable = true)
 |-- Seat comfort: integer (nullable = true)
 |-- Inflight entertainment: integer (nullable = true)
 |-- On-board service: integer (nullable = true)
 |-- Leg room service: integer (nullable = true)
 |-- Baggage handling: integer (nullable = true)
 |-- Checkin service: integer (nullable = true)
 |-- Inflight service: integer (nullable = true)
 |-- Cleanliness: integer (nullable = true)
 |-- Departure Delay in Minutes: integer (nullable = true)
 |-- Arrival Delay in Minutes: double (nullable = true)
 |-- Satisf

In [89]:
# Descriptive summary
print(df.describe().show())

+-------+------------------+-----------------+------------------+------------------+---------------------+---------------------------------+----------------------+------------------+------------------+------------------+------------------+----------------------+------------------+------------------+-----------------+------------------+------------------+------------------+--------------------------+------------------------+--------------------+
|summary|               _c0|    Customer Type|               Age|   Flight Distance|Inflight wifi service|Departure/Arrival time convenient|Ease of Online booking|     Gate location|    Food and drink|   Online boarding|      Seat comfort|Inflight entertainment|  On-board service|  Leg room service| Baggage handling|   Checkin service|  Inflight service|       Cleanliness|Departure Delay in Minutes|Arrival Delay in Minutes|        Satisfaction|
+-------+------------------+-----------------+------------------+------------------+------------------

In [90]:
# Value count
df.groupBy('Satisfaction').count().show()

+--------------------+-----+
|        Satisfaction|count|
+--------------------+-----+
|neutral or dissat...|73225|
|           satisfied|56262|
+--------------------+-----+



#### Feature Engineering
+ Numerical values
+ Vectorization
+ Scaling

In [91]:
# Load ML pkgs
from pyspark.ml.feature import VectorAssembler,StringIndexer

In [92]:
# Encoding for satisfaction
# Label encoding
satEncoding = StringIndexer(inputCol="Satisfaction",outputCol="Target").fit(df)
df = satEncoding.transform(df)

In [93]:
df.show(2)

+---+-----------------+---+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+----------------+----------------+----------------+---------------+----------------+-----------+--------------------------+------------------------+--------------------+------+
|_c0|    Customer Type|Age|Flight Distance|Inflight wifi service|Departure/Arrival time convenient|Ease of Online booking|Gate location|Food and drink|Online boarding|Seat comfort|Inflight entertainment|On-board service|Leg room service|Baggage handling|Checkin service|Inflight service|Cleanliness|Departure Delay in Minutes|Arrival Delay in Minutes|        Satisfaction|Target|
+---+-----------------+---+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+----------------+----------------+---

In [94]:
# Get the labels
satEncoding.labels

['neutral or dissatisfied', 'satisfied']

In [95]:
# Encoding for customer type and class
# Label encoding
custEncoding = StringIndexer(inputCol="Customer Type",outputCol="Type of Customer").fit(df)
df = custEncoding.transform(df)

df.show(2)

+---+-----------------+---+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+----------------+----------------+----------------+---------------+----------------+-----------+--------------------------+------------------------+--------------------+------+----------------+
|_c0|    Customer Type|Age|Flight Distance|Inflight wifi service|Departure/Arrival time convenient|Ease of Online booking|Gate location|Food and drink|Online boarding|Seat comfort|Inflight entertainment|On-board service|Leg room service|Baggage handling|Checkin service|Inflight service|Cleanliness|Departure Delay in Minutes|Arrival Delay in Minutes|        Satisfaction|Target|Type of Customer|
+---+-----------------+---+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+---

In [96]:
### Features
print(df.columns)

['_c0', 'Customer Type', 'Age', 'Flight Distance', 'Inflight wifi service', 'Departure/Arrival time convenient', 'Ease of Online booking', 'Gate location', 'Food and drink', 'Online boarding', 'Seat comfort', 'Inflight entertainment', 'On-board service', 'Leg room service', 'Baggage handling', 'Checkin service', 'Inflight service', 'Cleanliness', 'Departure Delay in Minutes', 'Arrival Delay in Minutes', 'Satisfaction', 'Target', 'Type of Customer']


In [97]:
df.dtypes

[('_c0', 'int'),
 ('Customer Type', 'string'),
 ('Age', 'int'),
 ('Flight Distance', 'int'),
 ('Inflight wifi service', 'int'),
 ('Departure/Arrival time convenient', 'int'),
 ('Ease of Online booking', 'int'),
 ('Gate location', 'int'),
 ('Food and drink', 'int'),
 ('Online boarding', 'int'),
 ('Seat comfort', 'int'),
 ('Inflight entertainment', 'int'),
 ('On-board service', 'int'),
 ('Leg room service', 'int'),
 ('Baggage handling', 'int'),
 ('Checkin service', 'int'),
 ('Inflight service', 'int'),
 ('Cleanliness', 'int'),
 ('Departure Delay in Minutes', 'int'),
 ('Arrival Delay in Minutes', 'double'),
 ('Satisfaction', 'string'),
 ('Target', 'double'),
 ('Type of Customer', 'double')]

In [98]:
df2 = df.select('Type of Customer', 'Age', 'Flight Distance', 'Inflight wifi service', 'Departure/Arrival time convenient', 'Ease of Online booking', 'Gate location', 'Food and drink', 'Online boarding', 'Seat comfort', 'Inflight entertainment', 'On-board service', 'Leg room service', 'Baggage handling', 'Checkin service', 'Inflight service', 'Cleanliness', 'Departure Delay in Minutes', 'Arrival Delay in Minutes', 'Target')

In [99]:
df2.printSchema()

root
 |-- Type of Customer: double (nullable = false)
 |-- Age: integer (nullable = true)
 |-- Flight Distance: integer (nullable = true)
 |-- Inflight wifi service: integer (nullable = true)
 |-- Departure/Arrival time convenient: integer (nullable = true)
 |-- Ease of Online booking: integer (nullable = true)
 |-- Gate location: integer (nullable = true)
 |-- Food and drink: integer (nullable = true)
 |-- Online boarding: integer (nullable = true)
 |-- Seat comfort: integer (nullable = true)
 |-- Inflight entertainment: integer (nullable = true)
 |-- On-board service: integer (nullable = true)
 |-- Leg room service: integer (nullable = true)
 |-- Baggage handling: integer (nullable = true)
 |-- Checkin service: integer (nullable = true)
 |-- Inflight service: integer (nullable = true)
 |-- Cleanliness: integer (nullable = true)
 |-- Departure Delay in Minutes: integer (nullable = true)
 |-- Arrival Delay in Minutes: double (nullable = true)
 |-- Target: double (nullable = false)



In [100]:
# Find Count of Null, None, NaN of All DataFrame Columns
from pyspark.sql.functions import col,isnan, when, count
df2.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df2.columns]).show()

+----------------+---+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+----------------+----------------+----------------+---------------+----------------+-----------+--------------------------+------------------------+------+
|Type of Customer|Age|Flight Distance|Inflight wifi service|Departure/Arrival time convenient|Ease of Online booking|Gate location|Food and drink|Online boarding|Seat comfort|Inflight entertainment|On-board service|Leg room service|Baggage handling|Checkin service|Inflight service|Cleanliness|Departure Delay in Minutes|Arrival Delay in Minutes|Target|
+----------------+---+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+----------------+----------------+----------------+---------------+----------------+----------

In [101]:
required_features = ['Type of Customer', 'Age', 'Flight Distance', 'Inflight wifi service', 'Departure/Arrival time convenient', 'Ease of Online booking', 'Gate location', 'Food and drink', 'Online boarding', 'Seat comfort', 'Inflight entertainment', 'On-board service', 'Leg room service', 'Baggage handling', 'Checkin service', 'Inflight service', 'Cleanliness', 'Departure Delay in Minutes', 'Arrival Delay in Minutes', 'Target']

In [102]:
# Vector Asm
vec_assembler = VectorAssembler(inputCols=required_features,outputCol='features',handleInvalid="skip")

In [103]:
vec_df = vec_assembler.transform(df2)

In [104]:
vec_df.show(2)

+----------------+---+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+----------------+----------------+----------------+---------------+----------------+-----------+--------------------------+------------------------+------+--------------------+
|Type of Customer|Age|Flight Distance|Inflight wifi service|Departure/Arrival time convenient|Ease of Online booking|Gate location|Food and drink|Online boarding|Seat comfort|Inflight entertainment|On-board service|Leg room service|Baggage handling|Checkin service|Inflight service|Cleanliness|Departure Delay in Minutes|Arrival Delay in Minutes|Target|            features|
+----------------+---+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+----------------+----------------+----------------+-

### Train,Test Split

In [105]:
train_df,test_df = vec_df.randomSplit([0.6,0.4])

In [106]:
train_df.show(5)

+----------------+---+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+----------------+----------------+----------------+---------------+----------------+-----------+--------------------------+------------------------+------+--------------------+
|Type of Customer|Age|Flight Distance|Inflight wifi service|Departure/Arrival time convenient|Ease of Online booking|Gate location|Food and drink|Online boarding|Seat comfort|Inflight entertainment|On-board service|Leg room service|Baggage handling|Checkin service|Inflight service|Cleanliness|Departure Delay in Minutes|Arrival Delay in Minutes|Target|            features|
+----------------+---+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+----------------+----------------+----------------+-

#### Model Building
+ Pyspark.ml: Dataframe
+ Pyspark.mllib: RDD legacy

In [107]:
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier

In [108]:
# Logistic model
lr = LogisticRegression(featuresCol='features',labelCol='Target')

In [109]:
lr_model = lr.fit(train_df)

In [110]:
y_pred = lr_model.transform(test_df)

In [111]:
print(y_pred.columns)

['Type of Customer', 'Age', 'Flight Distance', 'Inflight wifi service', 'Departure/Arrival time convenient', 'Ease of Online booking', 'Gate location', 'Food and drink', 'Online boarding', 'Seat comfort', 'Inflight entertainment', 'On-board service', 'Leg room service', 'Baggage handling', 'Checkin service', 'Inflight service', 'Cleanliness', 'Departure Delay in Minutes', 'Arrival Delay in Minutes', 'Target', 'features', 'rawPrediction', 'probability', 'prediction']


In [112]:
y_pred.select('Target','rawPrediction', 'probability', 'prediction').show(10)

+------+--------------------+--------------------+----------+
|Target|       rawPrediction|         probability|prediction|
+------+--------------------+--------------------+----------+
|   0.0|[21.4291874775231...|[0.99999999950634...|       0.0|
|   0.0|[17.7843770390490...|[0.99999998110516...|       0.0|
|   0.0|[20.7224693053810...|[0.99999999899920...|       0.0|
|   0.0|[17.1674151632025...|[0.99999996498240...|       0.0|
|   0.0|[16.5578767638398...|[0.99999993558226...|       0.0|
|   0.0|[15.5272436832605...|[0.99999981944743...|       0.0|
|   0.0|[17.6536346742091...|[0.99999997846604...|       0.0|
|   0.0|[18.1961942445054...|[0.99999998748320...|       0.0|
|   0.0|[20.5119962721458...|[0.99999999876475...|       0.0|
|   0.0|[17.3092896204382...|[0.99999996961417...|       0.0|
+------+--------------------+--------------------+----------+
only showing top 10 rows



#### Model Evaluation

In [113]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [114]:
# Check for accuracy
multi_evaluator = MulticlassClassificationEvaluator(labelCol='Target',metricName='accuracy',predictionCol='prediction')

In [115]:
multi_evaluator.evaluate(y_pred)

1.0

## K means Clustering model

In [116]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(featuresCol=vec_assembler.getOutputCol(),predictionCol="cluster", k=2)
model = kmeans.fit(vec_df)
print("Model is successfully trained!")

Model is successfully trained!


## Centroid of each cluster

In [117]:
centers = model.clusterCenters()
print("Clsuter Centers: ")
for center in centers:
    print(center)

Clsuter Centers: 
[2.38553022e-01 3.83688254e+01 6.64565242e+02 2.72605342e+00
 3.08650001e+00 2.70968627e+00 2.97035763e+00 3.16076950e+00
 3.08782964e+00 3.31815160e+00 3.25292572e+00 3.30128848e+00
 3.24634087e+00 3.59074744e+00 3.25687241e+00 3.60650254e+00
 3.21227694e+00 1.47320790e+01 1.52439876e+01 3.46369364e-01]
[3.19087663e-02 4.23213627e+01 2.62471524e+03 2.73534155e+00
 2.97779634e+00 2.88532427e+00 2.99478747e+00 3.32453058e+00
 3.70271282e+00 3.77845294e+00 3.64500058e+00 3.60675613e+00
 3.63690819e+00 3.74415390e+00 3.44096302e+00 3.74026610e+00
 3.48801981e+00 1.44013363e+01 1.46739719e+01 6.75008640e-01]


## Cluster the data

In [118]:
prediction = model.transform(vec_df)
prediction.groupBy("cluster").count().orderBy("cluster").show()
prediction.select('cluster').show(5)

+-------+-----+
|cluster|count|
+-------+-----+
|      0|94763|
|      1|34724|
+-------+-----+

+-------+
|cluster|
+-------+
|      0|
|      0|
|      0|
|      0|
|      0|
+-------+
only showing top 5 rows



## Naive Bayes Classifier

In [119]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split the data into train and test
splits = vec_df.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial",featuresCol=vec_assembler.getOutputCol(),predictionCol="pred",
               labelCol='Target')

# train the model
model = nb.fit(train)

# select example rows to display.
predictions = model.transform(test)
predictions.select("Target","pred").show(5)

+------+----+
|Target|pred|
+------+----+
|   0.0| 0.0|
|   0.0| 0.0|
|   0.0| 0.0|
|   0.0| 0.0|
|   0.0| 0.0|
+------+----+
only showing top 5 rows



In [120]:
# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="Target", predictionCol="pred",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.6397160931954945
