In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark import SparkContext
import pandas as pd
import numpy as np
from pyspark.sql.types import IntegerType, FloatType
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune

In [8]:
# creating a spark session
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create my_spark
my_spark = SparkSession.builder.appName('Intro_to_pyspark').getOrCreate()
my_spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation","true")
# Print my_spark
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x0000029F8DC91708>


In [None]:
sc = SparkContext("local", "Spark_Example_App")
print(sc.appName)

In [9]:
# Print the tables in the catalog
print(my_spark.catalog.listTables())

[]


In [10]:
# read csv file permanently created
flights = my_spark.read.csv('./flights.csv', header=True)
flights.write.saveAsTable("flights_1")

In [11]:
# Don't change this query
query = "SELECT * FROM flights_1 limit 10"

# Get the first 10 rows of flights
flights10 = my_spark.sql(query)

# Show the results
flights10.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [12]:
# Don't change this query
query = "SELECT origin, dest, COUNT(*) as N FROM flights_1 GROUP BY origin, dest"

# Run the query
flight_counts = my_spark.sql(query)

# Convert the results to a pandas DataFrame
pd_counts = flight_counts.toPandas()

# Print the head of pd_counts
print(pd_counts.head())

  origin dest    N
0    SEA  RNO    8
1    SEA  DTW   98
2    SEA  CLE    2
3    SEA  LAX  450
4    PDX  SEA  144


In [13]:
# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))
print(pd_temp)
# Create spark_temp from pd_temp
spark_temp = my_spark.createDataFrame(pd_temp)
print('------------------')
print(spark_temp)
# Examine the tables in the catalog
print('------------------')
print(my_spark.catalog.listTables())

# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView('temp')

# Examine the tables in the catalog again
print('------------------')
print(my_spark.catalog.listTables())

          0
0  0.479477
1  0.599164
2  0.401932
3  0.068809
4  0.471894
5  0.666400
6  0.770015
7  0.990154
8  0.236896
9  0.078585
------------------
DataFrame[0: double]
------------------
[Table(name='flights_1', database='default', description=None, tableType='MANAGED', isTemporary=False)]
------------------
[Table(name='flights_1', database='default', description=None, tableType='MANAGED', isTemporary=False), Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [14]:
# Create the DataFrame flights
flights = my_spark.table('flights_1')

# Show the head
flights.show()

# Add duration_hrs
flights = flights.withColumn('duration_hrs', flights.air_time / 60)
flights.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [15]:
# Select the first set of columns
selected1 = flights.select("tailnum", "origin", "dest")

# Select the second set of columns
temp = flights.select(flights.origin, flights.dest, flights.carrier)

# Define first filter
filterA = flights.origin == "SEA"

# Define second filter
filterB = flights.dest == "PDX"

# Filter the data, first by filterA then by filterB
selected2 = temp.filter(filterA).filter(filterB)

selected2.show(10)

+------+----+-------+
|origin|dest|carrier|
+------+----+-------+
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     AS|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
+------+----+-------+
only showing top 10 rows



In [16]:
# Define avg_speed
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")

# Select the correct columns
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

# Create the same table using a SQL expression
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")
print(speed1.show(1))
print(speed2.show(1))

+------+----+-------+-----------------+
|origin|dest|tailnum|        avg_speed|
+------+----+-------+-----------------+
|   SEA| LAX| N846VA|433.6363636363636|
+------+----+-------+-----------------+
only showing top 1 row

None
+------+----+-------+-----------------+
|origin|dest|tailnum|        avg_speed|
+------+----+-------+-----------------+
|   SEA| LAX| N846VA|433.6363636363636|
+------+----+-------+-----------------+
only showing top 1 row

None


In [17]:
# Find the shortest flight from PDX in terms of distance
flights = flights \
    .withColumn("distance", flights["distance"].cast(FloatType())) \
    .withColumn("air_time", flights["air_time"].cast(FloatType()))
flights.filter(flights.origin == 'PDX').groupBy().min('distance').show()

# Find the longest flight from SEA in terms of air time
flights.filter(flights.origin == 'SEA').groupBy().max('air_time').show()

+-------------+
|min(distance)|
+-------------+
|        106.0|
+-------------+

+-------------+
|max(air_time)|
+-------------+
|        409.0|
+-------------+



In [18]:
# Average duration of Delta flights
flights.filter(flights.carrier == "DL").filter(flights.origin == "SEA").groupBy().avg("air_time").show()

# Total hours in the air
flights.withColumn("duration_hrs", flights.air_time/60).groupBy().sum("duration_hrs").show()

+------------------+
|     avg(air_time)|
+------------------+
|188.20689655172413|
+------------------+

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+



In [19]:
# Group by tailnum
by_plane = flights.groupBy("tailnum")

# Number of flights each plane made
by_plane.count().show(5)

# Group by origin
by_origin = flights.groupBy("origin")

# Average duration of flights from PDX and SEA
by_origin.avg("air_time").show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N102UW|    2|
| N36472|    4|
| N38451|    4|
| N73283|    4|
+-------+-----+
only showing top 5 rows

+------+------------------+
|origin|     avg(air_time)|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+



In [20]:
# change datatype of the dep_delay to float
flights = flights \
    .withColumn("dep_delay", flights["dep_delay"].cast(FloatType()))

# Group by month and dest
by_month_dest = flights.groupBy('month', 'dest')

# Average departure delay by month and destination
by_month_dest.avg('dep_delay').show(3)

# Standard deviation of departure delay
by_month_dest.agg(F.stddev('dep_delay')).show(3)

+-----+----+-------------------+
|month|dest|     avg(dep_delay)|
+-----+----+-------------------+
|   11| TUS|-2.3333333333333335|
|   11| ANC|  7.529411764705882|
|    1| BUR|              -1.45|
+-----+----+-------------------+
only showing top 3 rows

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|   11| TUS|    3.0550504633038935|
|   11| ANC|    18.604716401245316|
|    1| BUR|     15.22627576540667|
+-----+----+----------------------+
only showing top 3 rows



In [21]:
# Don't change this file path
file_path = "./airports.csv"

# Read in the airports data
airports = my_spark.read.csv(file_path, header=True)
airports.write.saveAsTable("airports")

# Show the results
airports.show(5)

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912|-74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722|-81.4277778|  11| -4|  A|
+---+--------------------+----------+-----------+----+---+---+
only showing top 5 rows



In [22]:
# Examine the data
print(airports.show(5))

# Rename the faa column
airports = airports.withColumnRenamed('faa', 'dest')

# Join the DataFrames
flights_with_airports = flights.join(airports, on='dest', how='leftouter')

# Examine the new DataFrame
print(flights_with_airports.show(5))

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912|-74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722|-81.4277778|  11| -4|  A|
+---+--------------------+----------+-----------+----+---+---+
only showing top 5 rows

None
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+---+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|      duration_hrs|                name|      lat|        lon|alt| tz|dst|
+----+----+----

In [23]:
# Don't change this file path
file_path = "./planes.csv"

# Read in the airports data
planes = my_spark.read.csv(file_path, header=True)
planes.write.saveAsTable("planes")

# Show the results
planes.show(5)

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
only showing top 5 rows



In [24]:
# Rename year column
planes = planes.withColumnRenamed('year', 'plane_year')

# Join the DataFrames
model_data = flights.join(planes, on='tailnum', how="leftouter")
model_data.show(5)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------------+----------+--------------------+------------+--------+-------+-----+-----+---------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------------+----------+--------------------+------------+--------+-------+-----+-----+---------+
| N846VA|2014|   12|  8|     658|     -7.0|     935|       -5|     VX|  1780|   SEA| LAX|   132.0|   954.0|   6|    58|               2.2|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|
| N559AS|2014|    1| 22|    1040|      5.0|    1505|        5|     AS|   851|   SEA| HNL|   360.0|  2677.0| 

In [25]:
# Cast the columns to integers
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast('integer'))
model_data = model_data.withColumn("air_time", model_data.air_time.cast('integer'))
model_data = model_data.withColumn("month", model_data.month.cast('integer'))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast('integer'))
model_data.dtypes

[('tailnum', 'string'),
 ('year', 'string'),
 ('month', 'int'),
 ('day', 'string'),
 ('dep_time', 'string'),
 ('dep_delay', 'float'),
 ('arr_time', 'string'),
 ('arr_delay', 'int'),
 ('carrier', 'string'),
 ('flight', 'string'),
 ('origin', 'string'),
 ('dest', 'string'),
 ('air_time', 'int'),
 ('distance', 'float'),
 ('hour', 'string'),
 ('minute', 'string'),
 ('duration_hrs', 'double'),
 ('plane_year', 'int'),
 ('type', 'string'),
 ('manufacturer', 'string'),
 ('model', 'string'),
 ('engines', 'string'),
 ('seats', 'string'),
 ('speed', 'string'),
 ('engine', 'string')]

In [26]:
# Create the column plane_age
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)
model_data.show(5)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+
| N846VA|2014|   12|  8|     658|     -7.0|     935|       -5|     VX|  1780|   SEA| LAX|     132|   954.0|   6|    58|               2.2|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|      3.0|
| N559AS|2014|    1| 22|    1040|      5.0|    1505|        5|     A

In [27]:
# Create is_late
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)
model_data.select('is_late').show(3)
# Convert to an integer
model_data = model_data.withColumn("label", model_data.is_late.cast('integer'))
model_data.select('label').show(3)
# Remove missing values
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")

+-------+
|is_late|
+-------+
|  false|
|   true|
|   true|
+-------+
only showing top 3 rows

+-----+
|label|
+-----+
|    0|
|    1|
|    1|
+-----+
only showing top 3 rows



In [31]:
model_data.select('carrier').show(5)

+-------+
|carrier|
+-------+
|     US|
|     US|
|     US|
|     US|
|     US|
+-------+
only showing top 5 rows



In [33]:
# Create a StringIndexer for carrier
carr_indexer = StringIndexer(inputCol='carrier', outputCol='carrier_index')
print(carr_indexer)
# Create a OneHotEncoder for carrier
carr_encoder = OneHotEncoder(inputCol='carrier_index', outputCol='carrier_fact')
print(carr_encoder)

StringIndexer_45b27aa3eeba
OneHotEncoder_7a2125de7267


In [34]:
# Create a StringIndexer for distance
dest_indexer = StringIndexer(inputCol='dest', outputCol='dest_index')

# Create a OneHotEncoder for distance
dest_encoder = OneHotEncoder(inputCol='dest_index', outputCol='dest_fact')

In [37]:
# Make a VectorAssembler
vec_assembler = VectorAssembler(
    inputCols=['month', 'air_time', 'carrier_fact', 'dest_fact', 'plane_age'], 
    outputCol='features'
)

In [39]:
# Make the pipeline
flights_pipe = Pipeline(
    stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler]
)

In [40]:
# Fit and transform the data
piped_data = flights_pipe.fit(model_data).transform(model_data)

In [41]:
# Split the data into training and test sets
training, test = piped_data.randomSplit([.6, .4])

In [43]:
# Create a LogisticRegression Estimator
lr = LogisticRegression()

In [45]:
# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName='areaUnderROC')

In [47]:
# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameter
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()

In [48]:
# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator
               )

In [49]:
# Fit cross validation models
models = cv.fit(training)

# Extract the best model
best_lr = models.bestModel

In [50]:
print(best_lr)
# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

LogisticRegressionModel: uid = LogisticRegression_55766992d6b0, numClasses = 2, numFeatures = 81
0.7011696395780714
