In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=8bd88289af9003ff9bb0380337851ae710e35f7c599841f1a0fa0e53e29d7faa
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [2]:
import os




Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np




In [5]:
spark = SparkSession.builder.getOrCreate()

In [6]:
airports = pd.read_csv('airports.csv')
flights = pd.read_csv('flights_small.csv')
planes = pd.read_csv('planes.csv')

In [7]:

spark_airports = spark.createDataFrame(airports)
spark_flights = spark.createDataFrame(flights)
spark_planes = spark.createDataFrame(planes)

In [8]:
print(spark.catalog.listTables())

[]


In [9]:
# use pandas to make spark table
spark_airports.createOrReplaceTempView("airports")
spark_airports.createOrReplaceTempView("flights")
spark_airports.createOrReplaceTempView("planes")

In [10]:
print(spark.catalog.listTables())

[Table(name='airports', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True), Table(name='flights', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True), Table(name='planes', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]


In [11]:
# read straight in to spark
file_path = "airports.csv"

# Read in the airports data
airports = spark.read.csv(file_path, header=True)

In [12]:
spark_airports.show(5)

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912|-74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722|-81.4277778|  11| -4|  A|
+---+--------------------+----------+-----------+----+---+---+
only showing top 5 rows



In [13]:
airports.show(5)

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912|-74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722|-81.4277778|  11| -4|  A|
+---+--------------------+----------+-----------+----+---+---+
only showing top 5 rows



In [14]:
flights = spark_flights

In [15]:
# add in a column in spark
flights = flights.withColumn("duration_hrs",flights.air_time/60)
# this adds column duration hours and the calculation for the data

In [16]:
#  filtering in spark can be done 2 ways
# Filter flights by passing a string
long_flights1 = flights.filter("distance > 1000")

# Filter flights by passing a column of boolean values
long_flights2 = flights.filter(flights.distance > 1000)

In [17]:
# Print the data to check they're equal
long_flights1.show(5)
long_flights2.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+-----------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|     duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+-----------------+
|2014|    1| 22|  1040.0|      5.0|  1505.0|      5.0|     AS| N559AS|   851|   SEA| HNL|   360.0|    2677|10.0|  40.0|              6.0|
|2014|    4| 19|  1236.0|     -4.0|  1508.0|     -7.0|     AS| N309AS|   490|   SEA| SAN|   135.0|    1050|12.0|  36.0|             2.25|
|2014|   11| 19|  1812.0|     -3.0|  2352.0|     -4.0|     AS| N564AS|    26|   SEA| ORD|   198.0|    1721|18.0|  12.0|              3.3|
|2014|    8|  3|  1120.0|      0.0|  1415.0|      2.0|     AS| N305AS|   656|   SEA| PHX|   154.0|    1107|11.0|  20.0|2.566666666666667|
|2014|   11| 12|  2346.0|     -4.0

In [18]:
# it is all very similar to sql for instance a select clause but can.
# call 2 ways

selected1 = flights.select("tailnum", "origin", "dest")
temp = flights.select(flights.origin, flights.dest, flights.carrier)

In [19]:
# Define first filter
filterA = flights.origin == "SEA"

# Define second filter
filterB = flights.dest == "PDX"
selected2 = temp.filter(filterA).filter(filterB)

In [20]:
# Define avg_speed
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")

In [21]:
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

# Create the same table using a SQL expression
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")

In [22]:
# Find the shortest flight from PDX in terms of distance
flights.filter(flights.origin == "PDX").groupBy().min("distance").show()

# Find the longest flight from SEA in terms of air time
flights.filter(flights.origin == "PDX").groupBy().max("air_time").show()

+-------------+
|min(distance)|
+-------------+
|          106|
+-------------+

+-------------+
|max(air_time)|
+-------------+
|          NaN|
+-------------+



In [23]:
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|   12|  8|   658.0|     -7.0|   935.0|     -5.0|     VX| N846VA|  1780|   SEA| LAX|   132.0|     954| 6.0|  58.0|               2.2|
|2014|    1| 22|  1040.0|      5.0|  1505.0|      5.0|     AS| N559AS|   851|   SEA| HNL|   360.0|    2677|10.0|  40.0|               6.0|
|2014|    3|  9|  1443.0|     -2.0|  1652.0|      2.0|     VX| N847VA|   755|   SEA| SFO|   111.0|     679|14.0|  43.0|              1.85|
|2014|    4|  9|  1705.0|     45.0|  1839.0|     34.0|     WN| N360SW|   344|   PDX| SJC|    83.0|     569|17.0|   5.0|1.3833333333333333|
|2014|    3|  9|   754.0|  

In [24]:
# Average duration of Delta flights
flights.filter(flights.carrier == "AS").filter(flights.origin == "SEA").groupBy().avg("air_time").show()

# Total hours in the air
flights.withColumn("duration_hrs", flights.air_time/60).groupBy().sum("duration_hrs").show()

+-------------+
|avg(air_time)|
+-------------+
|          NaN|
+-------------+

+-----------------+
|sum(duration_hrs)|
+-----------------+
|              NaN|
+-----------------+



In [25]:
# Group by tailnum
by_plane = flights.groupBy("tailnum")

# Number of flights each plane made
by_plane.count().show()

# Group by origin
by_origin = flights.groupBy("origin")

# Average duration of flights from PDX and SEA
by_origin.avg("air_time").show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N102UW|    2|
| N36472|    4|
| N38451|    4|
| N73283|    4|
| N513UA|    2|
| N954WN|    5|
| N388DA|    3|
| N567AA|    1|
| N516UA|    2|
| N927DN|    1|
| N607AS|   45|
| N622SW|    4|
| N584AS|   31|
| N914WN|    4|
| N654AW|    2|
| N336NW|    1|
| N445WN|    3|
| N669DN|    2|
| N389HA|    4|
+-------+-----+
only showing top 20 rows

+------+-------------+
|origin|avg(air_time)|
+------+-------------+
|   SEA|          NaN|
|   PDX|          NaN|
+------+-------------+



In [26]:
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = flights.groupBy("month", "dest")

# Average departure delay by month and destination
by_month_dest.avg("dep_delay").show()

# Standard deviation of departure delay
by_month_dest.agg(F.stddev("dep_delay")).show()


+-----+----+-------------------+
|month|dest|     avg(dep_delay)|
+-----+----+-------------------+
|    7| BWI|             25.625|
|   11| PHL|               26.0|
|    4| LAS|  8.972972972972974|
|   10| JNU|                0.8|
|    7| FLL|               -3.0|
|    2| BUR|               -1.1|
|    4| BOI|               -3.0|
|    4| SNA| -6.136363636363637|
|    1| KTN|                9.0|
|    7| SNA|                3.0|
|   12| MKE|                3.0|
|    3| SBA|                2.0|
|    5| LGB|-2.6666666666666665|
|   12| KTN|               1.75|
|   11| PSP| 2.3333333333333335|
|   12| OGG| 25.181818181818183|
|   12| OAK|               13.0|
|    5| SAT| 0.6666666666666666|
|   10| ONT|               -3.0|
|   12| ATL| 1.1764705882352942|
+-----+----+-------------------+
only showing top 20 rows

+-----+----+------------------+
|month|dest| stddev(dep_delay)|
+-----+----+------------------+
|    7| BWI|48.059301166074285|
|   11| PHL| 52.23664103545199|
|    4| LAS|   24.8366

In [27]:
# Examine the data
print(airports.show())

# Rename the faa column
airports = airports.withColumnRenamed("faa", "dest")
# airports = airports.withColumnRenamed("target", "new name")
# Join the DataFrames
flights_with_airports = flights.join(airports,on="dest",how="leftouter")

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [28]:
# Examine the new DataFrame
print(flights_with_airports.show())

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+----+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|      duration_hrs|                name|      lat|        lon| alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+----+---+---+
| LAX|2014|   12|  8|   658.0|     -7.0|   935.0|     -5.0|     VX| N846VA|  1780|   SEA|   132.0|     954| 6.0|  58.0|               2.2|    Los Angeles Intl|33.942536|-118.408075| 126| -8|  A|
| HNL|2014|    1| 22|  1040.0|      5.0|  1505.0|      5.0|     AS| N559AS|   851|   SEA|   360.0|    2677|10.0|  40.0|               6.0|       Honolulu Intl|21.318681|-157.922428|  13|-10|  N|
| SFO|2014|    3|  9|  14

In [29]:
# Rename year column
planes = spark_planes.withColumnRenamed("year","plane_year")

# Join the DataFrames
model_data = flights.join(planes, on="tailnum", how="leftouter")

In [30]:
# change column values for modelling
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
model_data = model_data.withColumn("month", model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))

In [31]:
# Create the column plane_age
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)


In [32]:
# Create is_late
model_data = model_data.withColumn("is_late", model_data.arr_delay>0)

# Convert to an integer
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))



In [33]:
# Remove missing values
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")

In [34]:
# spark works with transformers which take a dataframe and output the transformed dataframe
# estimators return an object to transform other data frames


In [35]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Create a StringIndexer
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")

# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")


In [36]:
# Create a StringIndexer
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")

# Create a OneHotEncoder
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")

Spark has Transformers and Estimators

Transformers create new data gram with new column

Estimators return a model


Cast is basically the spark equivalent of astype


Vector assembler is a transformer that combines all of the data into a vector column as this is needed for modelling


In [37]:
from pyspark.ml.feature import VectorAssembler
# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")

Then you can create a pipeline
The pipeline takes all of the estimators and transformers and builds a reusable process for making a model wrapped in one object


In [38]:
# Import Pipeline
from pyspark.ml import Pipeline

# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

In [39]:
# Fit and transform the data
piped_data = flights_pipe.fit(model_data).transform(model_data)

Once the data is transformed with the pipeline split as normal before using the model


In [40]:
# Split the data into training and test sets
training, test = piped_data.randomSplit([.6, .4])

In [41]:
# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator
lr = LogisticRegression()

Cross validation allows you to estimate the models error on held out data


In [42]:
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

Next, you need to create a grid of values to search over when looking for the optimal hyperparameters. The submodule pyspark.ml.tuning includes a class called ParamGridBuilder that does just that (maybe you're starting to notice a pattern here; PySpark has a submodule for just about everything!).
You'll need to use the .addGrid() and .build() methods to create a grid that you can use for cross validation. The .addGrid() method takes a model parameter (an attribute of the model Estimator, lr, that you created a few exercises ago) and a list of values that you want to try. The .build() method takes no arguments, it just returns the grid that you'll use later.

In [43]:
# Import the tuning submodule
import pyspark.ml.tuning as tune

# Create the parameter grid
grid = tune.ParamGridBuilder()

In [44]:
# Add the hyperparameter
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0,1])

# Build the grid
grid = grid.build()


The submodule pyspark.ml.tuning also has a class called CrossValidator for performing cross validation. This Estimator takes the modeler you want to fit, the grid of hyperparameters you created, and the evaluator you want to use to compare your models.

In [46]:
# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator
               )

# Call lr.fit()
best_lr = lr.fit(training)

# Print best_lr
print(best_lr)

LogisticRegressionModel: uid=LogisticRegression_5f8e56074b1a, numClasses=2, numFeatures=81


In [47]:
# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

0.7033039913005976
