In [1]:
import findspark

In [2]:
findspark.init()

In [5]:
import pyspark
from pyspark.sql import SparkSession

In [6]:
# create spark object 
spark = SparkSession\
    .builder.getOrCreate()

In [8]:
# filepath 
file_path = "./flights_small.csv"

df_flights = spark.read.csv(file_path)

In [9]:
df_flights.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
| _c0|  _c1|_c2|     _c3|      _c4|     _c5|      _c6|    _c7|    _c8|   _c9|  _c10|_c11|    _c12|    _c13|_c14|  _c15|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
+----+-----+---+--------+---------+-----

In [10]:
# again 
# read csv file
df_flight = spark.read.format("csv")\
    .option("header","true")\
    .option("inferSchema","true")\
    .load(file_path)

In [12]:
df_flight.show(3)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 3 rows



In [15]:
# Aggregating1 
# groupby()

by_plane = df_flight.groupBy("tailnum")

In [17]:
by_plane.show()

AttributeError: 'GroupedData' object has no attribute 'show'

In [18]:
# Grouped Data has no show 
# count()

by_plane.count().show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N102UW|    2|
| N36472|    4|
| N38451|    4|
| N73283|    4|
| N513UA|    2|
| N954WN|    5|
| N388DA|    3|
| N567AA|    1|
| N516UA|    2|
| N927DN|    1|
| N8322X|    1|
| N466SW|    1|
|  N6700|    1|
| N607AS|   45|
| N622SW|    4|
| N584AS|   31|
| N914WN|    4|
| N654AW|    2|
| N336NW|    1|
+-------+-----+
only showing top 20 rows



In [19]:
# group by origin 
by_origin = df_flight.groupBy("origin")

In [20]:
# avg_duration of flight from PDX and SEA 
by_origin.avg("air_time").show()

AnalysisException: "air_time" is not a numeric column. Aggregation function can only be applied on a numeric column.

In [21]:
df_flight

DataFrame[year: int, month: int, day: int, dep_time: string, dep_delay: string, arr_time: string, arr_delay: string, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: string, distance: int, hour: string, minute: string]

In [22]:
# .agg()
import pyspark.sql.functions as F

In [42]:
# Group by month and dest 
by_month_dest = df_flight.groupBy("month","dest")

In [43]:
by_month_dest.count().show(5)

+-----+----+-----+
|month|dest|count|
+-----+----+-----+
|    4| PHX|   60|
|    1| RDM|    8|
|    5| ONT|    9|
|    7| OMA|    2|
|    8| MDW|   20|
+-----+----+-----+
only showing top 5 rows



In [44]:
# avg delay by month and destination 
by_month_dest.avg("dep_delay").show()

+-----+----+-------------------+
|month|dest|     avg(dep_delay)|
+-----+----+-------------------+
|    4| PHX| 1.6833333333333333|
|    1| RDM|             -1.625|
|    5| ONT| 3.5555555555555554|
|    7| OMA|               -6.5|
|    8| MDW|               7.45|
|    6| DEN|  5.418181818181818|
|    5| IAD|               -4.0|
|   12| COS|               -1.0|
|   11| ANC|  7.529411764705882|
|    5| AUS|              -0.75|
|    5| COS| 11.666666666666666|
|    2| PSP|                0.6|
|    4| ORD|0.14285714285714285|
|   10| DFW| 18.176470588235293|
|   10| DCA|               -1.5|
|    8| JNU|             18.125|
|   11| KOA|               -1.0|
|   10| OMA|-0.6666666666666666|
|    6| ONT|              9.625|
|    3| MSP|                3.2|
+-----+----+-------------------+
only showing top 20 rows



In [30]:
from pyspark.sql.functions import col

In [32]:
# change type 
df_flight = df_flight.withColumn("dep_delay",col("dep_delay").cast("int"))

DataFrame[year: int, month: int, day: int, dep_time: string, dep_delay: int, arr_time: string, arr_delay: string, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: string, distance: int, hour: string, minute: string]

In [34]:
by_month_dest = df_flight.groupBy("month","dest")

In [45]:
# avg dep delat by month and dest 
by_month_dest.avg("dep_delay").show(3)

+-----+----+------------------+
|month|dest|    avg(dep_delay)|
+-----+----+------------------+
|    4| PHX|1.6833333333333333|
|    1| RDM|            -1.625|
|    5| ONT|3.5555555555555554|
+-----+----+------------------+
only showing top 3 rows



In [47]:
# standard deviation of dep_delay 
#. agg(F.stddev)
by_month_dest.agg(F.stddev("dep_delay")).show(5)

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|    4| PHX|    15.003380033491737|
|    1| RDM|     8.830749846821778|
|    5| ONT|    18.895178691342874|
|    7| OMA|    2.1213203435596424|
|    8| MDW|    14.467659032985843|
+-----+----+----------------------+
only showing top 5 rows



In [None]:
# joining


In [48]:
df_flight.show(3)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 3 rows



In [50]:
df_flight.columns

['year',
 'month',
 'day',
 'dep_time',
 'dep_delay',
 'arr_time',
 'arr_delay',
 'carrier',
 'tailnum',
 'flight',
 'origin',
 'dest',
 'air_time',
 'distance',
 'hour',
 'minute']

In [None]:
# joining 
# .join()

# exmaine the data

In [53]:
df_airport = spark.read.format("csv")\
    .option("header","true")\
    .option("inferSchema","true")\
    .load("./airports.csv")

In [54]:
df_airport.show(3)

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
+---+--------------------+----------+-----------+----+---+---+
only showing top 3 rows



In [55]:
# rename the faa column 
# rename 
# .withColumnRenaed("","")
df_airport = df_airport.withColumnRenamed("faa","dest")

In [56]:
# Join the dataframes 
# df.join(df2, on, how)
df_flight_airport = df_flight.join(df_airport, on="dest",how="leftouter")

In [57]:
df_flight_airport.show(3)

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+---------+-----------+---+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|              name|      lat|        lon|alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+---------+-----------+---+---+---+
| LAX|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA|     132|     954|   6|    58|  Los Angeles Intl|33.942536|-118.408075|126| -8|  A|
| HNL|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA|     360|    2677|  10|    40|     Honolulu Intl|21.318681|-157.922428| 13|-10|  N|
| SFO|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA|     111|     679|  14|    43|San Francisco In

In [None]:
### ML pipelines 
# from pyspark.ml import Transformer , Estimator 
# .transform(), 
# .fit()

In [58]:
# Load planes data 
df_plane = spark.read.format("csv")\
    .option("header","true")\
    .option("inferSchema","true")\
    .load("./planes.csv")

In [59]:
df_plane.show(1)

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
only showing top 1 row



In [60]:
# Rename year column on df_plane 
df_plane = df_plane.withColumnRenamed("year","plane_year")

In [61]:
# join the dataframe 
model_data = df_flight.join(df_plane, on="tailnum",how="leftouter")

In [62]:
model_data.show(3)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|
| N559AS|2014|    1| 22|    1040|        5|    1505|        5|     AS|   851|   SEA| HNL|     360|    2677|  10|    40|      2006|Fixed wing multi ...|      BOEING| 737-890|      2|  1

In [63]:
model_data

DataFrame[tailnum: string, year: int, month: int, day: int, dep_time: string, dep_delay: int, arr_time: string, arr_delay: string, carrier: string, flight: int, origin: string, dest: string, air_time: string, distance: int, hour: string, minute: string, plane_year: string, type: string, manufacturer: string, model: string, engines: int, seats: int, speed: string, engine: string]

In [64]:
model_data.columns

['tailnum',
 'year',
 'month',
 'day',
 'dep_time',
 'dep_delay',
 'arr_time',
 'arr_delay',
 'carrier',
 'flight',
 'origin',
 'dest',
 'air_time',
 'distance',
 'hour',
 'minute',
 'plane_year',
 'type',
 'manufacturer',
 'model',
 'engines',
 'seats',
 'speed',
 'engine']

In [66]:
# change datatypes 
# withcolumn(), .cast() => integer 
# arr_delay, air_time, month, plane_year
model_data = model_data.withColumn("arr_delay", col("arr_delay").cast("integer"))
model_data = model_data.withColumn("air_time",col("air_time").cast("integer"))
model_data = model_data.withColumn("month",col("month").cast("integer"))
model_data = model_data.withColumn("plane_year", col("plane_year").cast("integer"))

In [67]:
# create  new column "plane_age"
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)

In [68]:
model_data.show(1)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|        3|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------

##### making a boolean

In [70]:
# create is late col 
is_late_cond = model_data.arr_delay > 0 

model_data = model_data.withColumn("is_late",is_late_cond)


In [71]:
# convert to an integer 
model_data = model_data.withColumn("label",model_data.is_late.cast("integer"))

In [72]:
model_data.show(1)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|        3|  false|    0|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----

In [73]:
# Remove missing values using .filter()
filter_not_null = """
    arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and
    plane_year is not NULL
"""
# apply 
model_data =model_data.filter(filter_not_null)

##### make one-hot vector 

In [None]:
# from pyspark.ml.features import OneHotEncoder

In [75]:
# StringIndexer??
model_data.select("carrier").show(5)

+-------+
|carrier|
+-------+
|     VX|
|     AS|
|     VX|
|     WN|
|     AS|
+-------+
only showing top 5 rows



In [82]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [81]:
# Create a StringIndexer 
carr_indexer = StringIndexer(
    inputCol="carrier",outputCol="carrier_index"
)

In [83]:
# Create a OneHotEncoder
carr_encoder = OneHotEncoder(
    inputCol="carrier_index",
    outputCol="carrier_fact"
)

In [89]:
# Destination StringIndexer, OneHotEncoder 
dest_indexer = StringIndexer(
    inputCol="dest",
    outputCol="dest_index"
)
# Create OneHotEncoder 
dest_encoder = OneHotEncoder(
    inputCol="dest_index",
    outputCol="dest_fact"
)

In [91]:
from pyspark.ml.feature import VectorAssembler

In [92]:
# make a VectorAssembler
# pipeline, features into a single column
vec_assembler = VectorAssembler(
    inputCols= ["month","air_time","carrier_fact","dest_fact","plane_age"],
    outputCol = "features"
)

### Create the pipeline

In [93]:
from pyspark.ml import Pipeline

In [94]:
# set stages_list 
stage_list = [dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler]

In [95]:
# make the pipeline 
flight_pipeline = Pipeline(stages=stage_list)

In [96]:
# fit and transform the data 
piped_data = flight_pipeline.fit(model_data).transform(model_data)

In [97]:
# split the data 
# .randomSplit()
train, test = piped_data.randomSplit([.6, .4])

#### create the modeler

In [98]:
# import LogisticRegression
from pyspark.ml.classification import LogisticRegression

In [99]:
# create lr estimator 
lr = LogisticRegression()

In [None]:
# Cross validation
# 2 hyperparameteres:
# 1. elasticNetParam
# 2. regParam

#### create the evaluator

In [100]:
# import the evaluation submodule 
import pyspark.ml.evaluation as evals

In [101]:
# Create a BinaryClassificationEvaluator 
evaluator = evals.BinaryClassificationEvaluator(
    metricName="areaUnderROC"
)

#### Make a grid
    => to search optimal hyperparameters

In [102]:
# import the tuning submodule 
import pyspark.ml.tuning as tune

In [103]:
# Create the parameter grid 
grid = tune.ParamGridBuilder()

In [105]:
import numpy as np
import pandas as pd

In [106]:
# Add the hyperparameter 
grid = grid.addGrid(
    lr.regParam, np.arange(0, .1, .01)
)

In [107]:
# add the hyper paramter elasticNetParam 
grid = grid.addGrid(
    lr.elasticNetParam, [0,1]
)

In [108]:
# build the grid 
grid = grid.build()

#### Make the validator

In [110]:
# import tune
import pyspark.ml.tuning as tune

In [111]:
# Create the Crossvalidator 
cv = tune.CrossValidator(
    estimator = lr, 
    estimatorParamMaps = grid, 
    evaluator=evaluator
)

#### fit the model

In [112]:
# fit cross validation model 
## ** Training***
models = cv.fit(train)

In [113]:
# Extract the best model 
best_lr = models.bestModel

In [114]:
print(best_lr)

LogisticRegressionModel: uid=LogisticRegression_3fd279b4be63, numClasses=2, numFeatures=81


#### Evaluate the model

In [115]:
# Use the model to predict the test set
# *** Predict**
test_results = best_lr.transform(test)

In [116]:
# Evaluate the predicttions 
# evaluator.evaluate()
print(evaluator.evaluate(test_results))

0.6885569061638319
