In [1]:
import prjmod as pm
import prjmod.commons as commons
from pyspark.sql import SparkSession
import pyspark
import pandas as pd
import numpy as np

In [2]:

spark = SparkSession.builder.getOrCreate()

In [3]:
# Print the tables in the catalog
print(spark.catalog.listTables())

[]


# Reading Data 

## Local file to Spark DataFrame (SDF)

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, FloatType

csv_schema = StructType([ \
    # StructField("_c0",StringType(),True), \
    StructField("year",IntegerType(),True), \
    StructField("month",IntegerType(),True), \
    StructField("day", IntegerType(), True), \
    StructField("dep_time", FloatType(), True), \
    StructField("sched_dep_time", IntegerType(), True), \
    StructField("dep_delay",FloatType(),True), \
    StructField("arr_time",FloatType(),True), \
    StructField("sched_arr_time",IntegerType(),True), \
    StructField("arr_delay", FloatType(), True), \
    StructField("carrier", StringType(), True), \
    StructField("flight", StringType(), True), \
    StructField("tailnum", StringType(), True), \
    StructField("origin", StringType(), True), \
    StructField("dest",StringType(),True), \
    StructField("air_time",FloatType(),True), \
    StructField("distance",IntegerType(),True), \
    StructField("hour", IntegerType(), True), \
    StructField("minute", IntegerType(), True), \
    StructField("time_hour", TimestampType(), True) \
  ])



In [5]:
sdf_flights = spark.read.csv(commons.DL_FILE_FLIGHTS, header=True, sep = ';', schema = csv_schema)
# sdf_flights = spark.read.csv(commons.DL_FILE_FLIGHTS, header=True, sep = ';')

In [6]:
sdf_flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: float (nullable = true)
 |-- sched_dep_time: integer (nullable = true)
 |-- dep_delay: float (nullable = true)
 |-- arr_time: float (nullable = true)
 |-- sched_arr_time: integer (nullable = true)
 |-- arr_delay: float (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: float (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- time_hour: timestamp (nullable = true)



In [7]:
sdf_flights.show()

+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+
|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|
+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+
|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|
|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|
|2013|    1|  1|   542.0|           540|      2.0|   923.0|           850|     33.0|     AA|  1141| N619AA|   JFK| MIA|   160.0|    1089|   5|    40|2

In [8]:
print(sdf_flights.describe())

DataFrame[summary: string, year: string, month: string, day: string, dep_time: string, sched_dep_time: string, dep_delay: string, arr_time: string, sched_arr_time: string, arr_delay: string, carrier: string, flight: string, tailnum: string, origin: string, dest: string, air_time: string, distance: string, hour: string, minute: string]


In [9]:
# Add sdf_flights to the catalog
sdf_flights.createOrReplaceTempView("sdf_flights_temp")

print(spark.catalog.listTables())

[Table(name='sdf_flights_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [10]:
# Now we can get the data as SDF from the table in the Spark cluster:
sdf_flights_frm_table = spark.table('sdf_flights_temp')

## SDF to pandas DataFrame (PDF)

In [11]:
pdf_flights = sdf_flights.toPandas()

In [12]:
print(pdf_flights.head())

   year  month  day  dep_time  sched_dep_time  dep_delay  arr_time  \
0  2013      1    1     517.0             515        2.0     830.0   
1  2013      1    1     533.0             529        4.0     850.0   
2  2013      1    1     542.0             540        2.0     923.0   
3  2013      1    1     544.0             545       -1.0    1004.0   
4  2013      1    1     554.0             600       -6.0     812.0   

   sched_arr_time  arr_delay carrier flight tailnum origin dest  air_time  \
0             819       11.0      UA   1545  N14228    EWR  IAH     227.0   
1             830       20.0      UA   1714  N24211    LGA  IAH     227.0   
2             850       33.0      AA   1141  N619AA    JFK  MIA     160.0   
3            1022      -18.0      B6    725  N804JB    JFK  BQN     183.0   
4             837      -25.0      DL    461  N668DN    LGA  ATL     116.0   

   distance  hour  minute           time_hour  
0      1400     5      15 2013-01-01 05:00:00  
1      1416     5   

In [13]:
pdf_flights.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 336776 entries, 0 to 336775
Data columns (total 19 columns):
 #   Column          Non-Null Count   Dtype         
---  ------          --------------   -----         
 0   year            336776 non-null  int32         
 1   month           336776 non-null  int32         
 2   day             336776 non-null  int32         
 3   dep_time        328521 non-null  float32       
 4   sched_dep_time  336776 non-null  int32         
 5   dep_delay       328521 non-null  float32       
 6   arr_time        328063 non-null  float32       
 7   sched_arr_time  336776 non-null  int32         
 8   arr_delay       327346 non-null  float32       
 9   carrier         336776 non-null  object        
 10  flight          336776 non-null  object        
 11  tailnum         336776 non-null  object        
 12  origin          336776 non-null  object        
 13  dest            336776 non-null  object        
 14  air_time        327346 non-null  flo

## Pandas to SDF

In [14]:
# Create sdf_flights_2 from pd_temp
sdf_flights_2 = spark.createDataFrame(pdf_flights.iloc[0:100, :], schema=csv_schema)


In [16]:
# sdf_flights_2.show()

In [17]:
# Examine the tables in the catalog
print(spark.catalog.listTables())

[Table(name='sdf_flights_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [18]:
# Add sdf_flights_2 to the catalog
sdf_flights_2.createOrReplaceTempView("sdf_flights_2_temp")

# Examine the tables in the catalog again
print(spark.catalog.listTables())

[Table(name='sdf_flights_2_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='sdf_flights_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


# Querying Spark Tables with SQL Spark

In [19]:
flights10 = spark.sql("SELECT * FROM sdf_flights_temp LIMIT 10")

In [20]:
flights10.show()

+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+
|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|
+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+
|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|
|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|
|2013|    1|  1|   542.0|           540|      2.0|   923.0|           850|     33.0|     AA|  1141| N619AA|   JFK| MIA|   160.0|    1089|   5|    40|2

## Filtering

In [21]:
# Filter flights by passing a string
long_flights1 = sdf_flights.filter("distance > 1000")

# Filter flights by passing a column of boolean values
long_flights2 = sdf_flights.filter(sdf_flights.distance > 1000)

# Print the data to check they're equal
long_flights1.show(10)

+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+
|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|
+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+
|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|
|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|
|2013|    1|  1|   542.0|           540|      2.0|   923.0|           850|     33.0|     AA|  1141| N619AA|   JFK| MIA|   160.0|    1089|   5|    40|2

In [22]:
long_flights2.show(10)

+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+
|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|
+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+
|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|
|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|
|2013|    1|  1|   542.0|           540|      2.0|   923.0|           850|     33.0|     AA|  1141| N619AA|   JFK| MIA|   160.0|    1089|   5|    40|2

## Selecting

In [23]:
# Select the first set of columns
selected1 = sdf_flights.select("tailnum", "origin", "dest")

# Select the second set of columns
temp = sdf_flights.select(sdf_flights.origin, sdf_flights.dest, sdf_flights.carrier)

# Define first filter
filterA = sdf_flights.origin == "SEA"

# Define second filter
filterB = sdf_flights.dest == "PDX"

# Filter the data, first by filterA then by filterB
selected2 = temp.filter(filterA).filter(filterB)

In [24]:
# Define avg_speed
avg_speed = (sdf_flights.distance/(sdf_flights.air_time/60)).alias("avg_speed")

# Select the correct columns
speed1 = sdf_flights.select("origin", "dest", "tailnum", avg_speed)

# Create the same table using a SQL expression
speed2 = sdf_flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")

## Aggregating



In [25]:
# Find the shortest flight from PDX in terms of distance
sdf_flights.filter(sdf_flights.origin == 'JFK').groupBy().min('distance').show()

# Find the longest flight from SEA in terms of air time
sdf_flights.filter(sdf_flights.origin == 'JFK').groupBy().max('air_time').show()

+-------------+
|min(distance)|
+-------------+
|           94|
+-------------+

+-------------+
|max(air_time)|
+-------------+
|        691.0|
+-------------+



In [26]:
# Average duration of Delta flights
sdf_flights.filter(sdf_flights.carrier == "DL").filter(sdf_flights.origin == "JFK").groupBy().avg("air_time").show()

# Total hours in the air
sdf_flights.withColumn("duration_hrs", sdf_flights.air_time/60).groupBy().sum("duration_hrs").show()

+------------------+
|     avg(air_time)|
+------------------+
|229.81132350795272|
+------------------+

+-----------------+
|sum(duration_hrs)|
+-----------------+
|822110.1666666722|
+-----------------+



## Grouping and Aggregating

In [27]:
# Group by tailnum
by_plane = sdf_flights.groupBy("tailnum")

# Number of flights each plane made
by_plane.count().show(10)

+-------+-----+
|tailnum|count|
+-------+-----+
| N513UA|  102|
| N510UW|   48|
| N8390A|   31|
| N3CWAA|   68|
| N73283|  110|
| N369NB|  187|
| N396AA|   21|
| N8322X|   15|
| N3AEMQ|  276|
| N4YUAA|   42|
+-------+-----+
only showing top 10 rows



In [28]:
# Group by origin
by_origin = sdf_flights.groupBy("origin")

# Average duration of flights from PDX and SEA
by_origin.avg("air_time").show()

+------+------------------+
|origin|     avg(air_time)|
+------+------------------+
|   LGA|117.82580581372355|
|   EWR|153.30002475944914|
|   JFK| 178.3490497712667|
+------+------------------+



In [29]:
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = sdf_flights.groupBy('month', 'dest')

# Average departure delay by month and destination
by_month_dest.avg('dep_delay').show()
# Identical:
# by_month_dest.agg(F.avg('dep_delay')).show()

+-----+----+-------------------+
|month|dest|     avg(dep_delay)|
+-----+----+-------------------+
|    1| EYW|               13.0|
|   10| CLE|  4.405172413793103|
|   10| JAX|  9.313807531380753|
|   10| BHM| 24.153846153846153|
|   10| DAY| 15.118110236220472|
|   11| OKC|   8.10344827586207|
|   10| DCA|  4.209424083769633|
|   10| DFW|  3.522948539638387|
|   10| OMA|  13.39080459770115|
|   10| MHT|  13.80722891566265|
|   11| LAS|  4.782700421940929|
|    1| MSP|  11.76172607879925|
|   10| BTV| 2.5508474576271185|
|   10| MEM|  7.928104575163399|
|   11| BHM|  19.61904761904762|
|   12| MEM| 31.747899159663866|
|   12| ILM|              31.25|
|   11| HNL|0.18181818181818182|
|    1| SLC|  8.360406091370558|
|   10| TUL| 30.153846153846153|
+-----+----+-------------------+
only showing top 20 rows



In [30]:
# Standard deviation of departure delay
by_month_dest.agg(F.stddev('dep_delay')).show()

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|    1| EYW|                  null|
|   10| CLE|    25.820811853116663|
|   10| JAX|    33.089734402632345|
|   10| BHM|     46.98058518809003|
|   10| DAY|     38.15998364721879|
|   11| OKC|     19.23416756635034|
|   10| DCA|    29.070946167440905|
|   10| DFW|     25.93755548637475|
|   10| OMA|     37.85663172581741|
|   10| MHT|    31.988358336380983|
|   11| LAS|    26.271577988930474|
|    1| MSP|     42.05931944173929|
|   10| BTV|     20.65917856191837|
|   10| MEM|      26.8930886678019|
|   11| BHM|    26.433834739734962|
|   12| MEM|    52.201243326870106|
|   12| ILM|      54.0333401878123|
|   11| HNL|    15.146312237992932|
|    1| SLC|     33.21385646170493|
|   10| TUL|     43.20341866814922|
+-----+----+----------------------+
only showing top 20 rows



## Joining
Another very common data operation is the join. Joins are a whole topic unto themselves, so in this course we'll just look at simple joins. If you'd like to learn more about joins, you can take a look here.

A join will combine two different tables along a column that they share. This column is called the key. Examples of keys here include the tailnum and carrier columns from the flights table.

For example, suppose that you want to know more information about the plane that flew a flight than just the tail number. This information isn't in the flights table because the same plane flies many different flights over the course of two years, so including this information in every row would result in a lot of duplication. To avoid this, you'd have a second table that has only one row for each plane and whose columns list all the information about the plane, including its tail number. You could call this table planes

When you join the flights table to this table of airplane information, you're adding all the columns from the planes table to the flights table. To fill these columns with information, you'll look at the tail number from the flights table and find the matching one in the planes table, and then use that row to fill out all the new columns.

Now you'll have a much bigger table than before, but now every row has all information about the plane that flew that flight!

In [31]:
csv_schema = StructType([ \
    StructField("tailnum",StringType(),True), \
    StructField("year",IntegerType(),True), \
    StructField("type",StringType(),True), \
    StructField("manufacturer", StringType(), True), \
    StructField("model", StringType(), True), \
    StructField("engines", IntegerType(), True), \
    StructField("seats",IntegerType(),True), \
    StructField("speed",IntegerType(),True), \
    StructField("engine",StringType(),True) \
  ])

sdf_planes = spark.read.csv(commons.DL_FILE_PLANES, header=True, sep = ';', schema = csv_schema)

In [32]:
                                                

csv_schema = StructType([ \
    StructField("faa",StringType(),True), \
    StructField("name",StringType(),True), \
    StructField("lat",FloatType(),True), \
    StructField("lon", FloatType(), True), \
    StructField("alt", FloatType(), True), \
    StructField("tz", FloatType(), True), \
    StructField("dst",IntegerType(),True), \
    StructField("tzone",IntegerType(),True)\
  ])

sdf_airports = spark.read.csv(commons.DL_FILE_AIRPORTS, header=True, sep = ';', schema = csv_schema)

In [34]:
# Examine the data
print(sdf_airports.show())

+----+--------------------+----+----+------+----+----+-----+
|dest|                name| lat| lon|   alt|  tz| dst|tzone|
+----+--------------------+----+----+------+----+----+-----+
| 04G|   Lansdowne Airport|null|null|1044.0|-5.0|null| null|
| 06A|Moton Field Munic...|null|null| 264.0|-6.0|null| null|
| 06C| Schaumburg Regional|null|null| 801.0|-6.0|null| null|
| 06N|     Randall Airport|null|null| 523.0|-5.0|null| null|
| 09J|Jekyll Island Air...|null|null|  11.0|-5.0|null| null|
| 0A9|Elizabethton Muni...|null|null|1593.0|-5.0|null| null|
| 0G6|Williams County A...|null|null| 730.0|-5.0|null| null|
| 0G7|Finger Lakes Regi...|null|null| 492.0|-5.0|null| null|
| 0P2|Shoestring Aviati...|null|null|1000.0|-5.0|null| null|
| 0S9|Jefferson County ...|null|null| 108.0|-8.0|null| null|
| 0W3|Harford County Ai...|null|null| 409.0|-5.0|null| null|
| 10C|  Galt Field Airport|null|null| 875.0|-6.0|null| null|
| 17G|Port Bucyrus-Craw...|null|null|1003.0|-5.0|null| null|
| 19A|Jackson County Ai.

In [35]:

# Rename the faa column
sdf_airports = sdf_airports.withColumnRenamed("dst", "dest")

# Join the DataFrames
sdf_flights_with_airports = sdf_flights.join(sdf_airports, on = "dest", how = "leftouter")

# Examine the new DataFrame
print(sdf_flights_with_airports.show())

+----+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+--------+--------+----+------+-------------------+--------------------+----+----+------+----+----+-----+
|dest|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|air_time|distance|hour|minute|          time_hour|                name| lat| lon|   alt|  tz|dest|tzone|
+----+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+--------+--------+----+------+-------------------+--------------------+----+----+------+----+----+-----+
| IAH|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR|   227.0|    1400|   5|    15|2013-01-01 05:00:00|George Bush Inter...|null|null|  97.0|-6.0|null| null|
| IAH|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714|

# Machine Learning Pipelines

In the next two chapters you'll step through every stage of the machine learning pipeline, from data intake to model evaluation. Let's get to it!

At the core of the pyspark.ml module are the Transformer and Estimator classes. Almost every other class in the module behaves similarly to these two basic classes.

Transformer classes have a .transform() method that takes a DataFrame and returns a new DataFrame; usually the original one with a new column appended. For example, you might use the class Bucketizer to create discrete bins from a continuous feature or the class PCA to reduce the dimensionality of your dataset using principal component analysis.

Estimator classes all implement a .fit() method. These methods also take a DataFrame, but instead of returning another DataFrame they return a model object. This can be something like a StringIndexerModel for including categorical data saved as strings in your models, or a RandomForestModel that uses the random forest algorithm for classification or regression.

## Join the DataFrames
In the next two chapters you'll be working to build a model that predicts whether or not a flight will be delayed based on the flights data we've been working with. This model will also include information about the plane that flew the route, so the first step is to join the two tables: flights and planes!

In [36]:
# Rename year column
planes = sdf_planes.withColumnRenamed("year", "plane_year")

# Join the DataFrames
model_data = sdf_flights.join(planes, on="tailnum", how="leftouter")

## Data types
Good work! Before you get started modeling, it's important to know that Spark only handles numeric data. That means all of the columns in your DataFrame must be either integers or decimals (called 'doubles' in Spark).

When we imported our data, we let Spark guess what kind of information each column held. Unfortunately, Spark doesn't always guess right and you can see that some of the columns in our DataFrame are strings containing numbers as opposed to actual numeric values.

To remedy this, you can use the .cast() method in combination with the .withColumn() method. It's important to note that .cast() works on columns, while .withColumn() works on DataFrames.

The only argument you need to pass to .cast() is the kind of value you want to create, in string form. For example, to create integers, you'll pass the argument "integer" and for decimal numbers you'll use "double".

You can put this call to .cast() inside a call to .withColumn() to overwrite the already existing column, just like you did in the previous chapter!

In [37]:
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: float (nullable = true)
 |-- sched_dep_time: integer (nullable = true)
 |-- dep_delay: float (nullable = true)
 |-- arr_time: float (nullable = true)
 |-- sched_arr_time: integer (nullable = true)
 |-- arr_delay: float (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: float (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- time_hour: timestamp (nullable = true)
 |-- plane_year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |--

In [38]:
# Cast the columns to integers
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
model_data = model_data.withColumn("month", model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))

In [39]:
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: float (nullable = true)
 |-- sched_dep_time: integer (nullable = true)
 |-- dep_delay: float (nullable = true)
 |-- arr_time: float (nullable = true)
 |-- sched_arr_time: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- time_hour: timestamp (nullable = true)
 |-- plane_year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)


## Create a new column
In the last exercise, you converted the column plane_year to an integer. This column holds the year each plane was manufactured. However, your model will use the planes' age, which is slightly different from the year it was made!

In [40]:
# Create the column plane_age
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)

## Making a Boolean
Consider that you're modeling a yes or no question: is the flight late? However, your data contains the arrival delay in minutes for each flight. Thus, you'll need to create a boolean column which indicates whether the flight was late or not!

In [41]:
# Create is_late
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)

# Convert to an integer
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))

# Remove missing values
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")

## Strings and factors
As you know, Spark requires numeric data for modeling. So far this hasn't been an issue; even boolean columns can easily be converted to integers without any trouble. But you'll also be using the airline and the plane's destination as features in your model. These are coded as strings and there isn't any obvious way to convert them to a numeric data type.

Fortunately, PySpark has functions for handling this built into the pyspark.ml.features submodule. You can create what are called 'one-hot vectors' to represent the carrier and the destination of each flight. A one-hot vector is a way of representing a categorical feature where every observation has a vector in which all elements are zero except for at most one element, which has a value of one (1).

Each element in the vector corresponds to a level of the feature, so it's possible to tell what the right level is by seeing which element of the vector is equal to one (1).

The first step to encoding your categorical feature is to create a StringIndexer. Members of this class are Estimators that take a DataFrame with a column of strings and map each unique string to a number. Then, the Estimator returns a Transformer that takes a DataFrame, attaches the mapping to it as metadata, and returns a new DataFrame with a numeric column corresponding to the string column.

The second step is to encode this numeric column as a one-hot vector using a OneHotEncoder. This works exactly the same way as the StringIndexer by creating an Estimator and then a Transformer. The end result is a column that encodes your categorical feature as a vector that's suitable for machine learning routines!

This may seem complicated, but don't worry! All you have to remember is that you need to create a StringIndexer and a OneHotEncoder, and the Pipeline will take care of the rest.

## Carrier
In this exercise you'll create a StringIndexer and a OneHotEncoder to code the carrier column. To do this, you'll call the class constructors with the arguments inputCol and outputCol.

The inputCol is the name of the column you want to index or encode, and the outputCol is the name of the new column that the Transformer should create.

In [42]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Create a StringIndexer
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")

# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

## Destination
Now you'll encode the dest column just like you did in the previous exercise.

In [43]:
# Create a StringIndexer
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")

# Create a OneHotEncoder
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")

## Assemble a vector
The last step in the Pipeline is to combine all of the columns containing our features into a single column. This has to be done before modeling can take place because every Spark modeling routine expects the data to be in this form. You can do this by storing each of the values from a column as an entry in a vector. Then, from the model's point of view, every observation is a vector that contains all of the information about it and a label that tells the modeler what value that observation corresponds to.

Because of this, the pyspark.ml.feature submodule contains a class called VectorAssembler. This Transformer takes all of the columns you specify and combines them into a new vector column.

In [44]:
from pyspark.ml.feature import VectorAssembler

# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")

## Create the pipeline
You're finally ready to create a Pipeline!

Pipeline is a class in the pyspark.ml module that combines all the Estimators and Transformers that you've already created. This lets you reuse the same modeling process over and over again by wrapping it up in one simple object. Neat, right?

In [45]:
# Import Pipeline
from pyspark.ml import Pipeline

# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

## Test vs Train
After you've cleaned your data and gotten it ready for modeling, one of the most important steps is to split the data into a test set and a train set. After that, don't touch your test data until you think you have a good model! As you're building models and forming hypotheses, you can test them on your training data to get an idea of their performance.

Once you've got your favorite model, you can see how well it predicts the new data in your test set. This never-before-seen data will give you a much more realistic idea of your model's performance in the real world when you're trying to predict or classify new data.

In Spark it's important to make sure you split the data after all the transformations. This is because operations like StringIndexer don't always produce the same index even when given the same list of strings.

## Transform the data
Hooray, now you're finally ready to pass your data through the Pipeline you created!

In [46]:
# Fit and transform the data
piped_data = flights_pipe.fit(model_data).transform(model_data)

## Split the data
Now that you've done all your manipulations, the last step before modeling is to split the data!

In [47]:
# Split the data into training and test sets
training, test = piped_data.randomSplit([.6, .4])

# What is logistic regression?
The model you'll be fitting in this chapter is called a logistic regression. This model is very similar to a linear regression, but instead of predicting a numeric variable, it predicts the probability (between 0 and 1) of an event.

To use this as a classification algorithm, all you have to do is assign a cutoff point to these probabilities. If the predicted probability is above the cutoff point, you classify that observation as a 'yes' (in this case, the flight being late), if it's below, you classify it as a 'no'!

You'll tune this model by testing different values for several hyperparameters. A hyperparameter is just a value in the model that's not estimated from the data, but rather is supplied by the user to maximize performance. For this course it's not necessary to understand the mathematics behind all of these values - what's important is that you'll try out a few different choices and pick the best one.

## Create the modeler
The Estimator you'll be using is a LogisticRegression from the pyspark.ml.classification submodule.

In [48]:
# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator
lr = LogisticRegression()

## Cross validation
In the next few exercises you'll be tuning your logistic regression model using a procedure called k-fold cross validation. This is a method of estimating the model's performance on unseen data (like your test DataFrame).

It works by splitting the training data into a few different partitions. The exact number is up to you, but in this course you'll be using PySpark's default value of three. Once the data is split up, one of the partitions is set aside, and the model is fit to the others. Then the error is measured against the held out partition. This is repeated for each of the partitions, so that every block of data is held out and used as a test set exactly once. Then the error on each of the partitions is averaged. This is called the cross validation error of the model, and is a good estimate of the actual error on the held out data.

You'll be using cross validation to choose the hyperparameters by creating a grid of the possible pairs of values for the two hyperparameters, elasticNetParam and regParam, and using the cross validation error to compare all the different models so you can choose the best one!

What does cross validation allow you to estimate?

## Create the evaluator
The first thing you need when doing cross validation for model selection is a way to compare different models. Luckily, the pyspark.ml.evaluation submodule has classes for evaluating different kinds of models. Your model is a binary classification model, so you'll be using the BinaryClassificationEvaluator from the pyspark.ml.evaluation module.

This evaluator calculates the area under the ROC. This is a metric that combines the two kinds of errors a binary classifier can make (false positives and false negatives) into a simple number. You'll learn more about this towards the end of the chapter!

In [49]:
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

## Make a grid
Next, you need to create a grid of values to search over when looking for the optimal hyperparameters. The submodule pyspark.ml.tuning includes a class called ParamGridBuilder that does just that (maybe you're starting to notice a pattern here; PySpark has a submodule for just about everything!).

You'll need to use the .addGrid() and .build() methods to create a grid that you can use for cross validation. The .addGrid() method takes a model parameter (an attribute of the model Estimator, lr, that you created a few exercises ago) and a list of values that you want to try. The .build() method takes no arguments, it just returns the grid that you'll use later.

In [50]:
# Import the tuning submodule
import pyspark.ml.tuning as tune

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameter
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()

## Make the validator
The submodule pyspark.ml.tuning also has a class called CrossValidator for performing cross validation. This Estimator takes the modeler you want to fit, the grid of hyperparameters you created, and the evaluator you want to use to compare your models.

The submodule pyspark.ml.tune has already been imported as tune. You'll create the CrossValidator by passing it the logistic regression Estimator lr, the parameter grid, and the evaluator you created in the previous exercises.

In [51]:
import pyspark.ml.tuning as tune

# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator
               )

## Fit the model(s)
You're finally ready to fit the models and select the best one!

Unfortunately, cross validation is a very computationally intensive procedure. Fitting all the models would take too long on DataCamp.

To do this locally you would use the code:

```
# Fit cross validation models
models = cv.fit(training)
```

# Extract the best model
best_lr = models.bestModel
Remember, the training data is called training and you're using lr to fit a logistic regression model. Cross validation selected the parameter values regParam=0 and elasticNetParam=0 as being the best. These are the default values, so you don't need to do anything else with lr before fitting the model.

In [52]:
# Call lr.fit()
best_lr = lr.fit(training)

# Print best_lr
print(best_lr)

LogisticRegressionModel: uid=LogisticRegression_a7794e9cddfa, numClasses=2, numFeatures=121


## Evaluating binary classifiers
For this course we'll be using a common metric for binary classification algorithms call the AUC, or area under the curve. In this case, the curve is the ROC, or receiver operating curve. The details of what these things actually measure isn't important for this course. All you need to know is that for our purposes, the closer the AUC is to one (1), the better the model is!

If you've created a perfect binary classification model, what would the AUC be?

## Evaluate the model
Remember the test data that you set aside waaaaaay back in chapter 3? It's finally time to test your model on it! You can use the same evaluator you made to fit the model.

In [53]:
# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

0.682091914083105


Congratulations! What do you think of the AUC? Your model isn't half bad! You went from knowing nothing about Spark to doing advanced machine learning. Great job on making it to the end of the course! The next steps are learning how to create large scale Spark clusters and manage and submit jobs so that you can use models in the real world. Check out some of the other DataCamp courses that use Spark! And remember, Spark is still being actively developed, so there's new features coming all the time!

## Try CV

In [54]:
# Fit cross validation models
models = cv.fit(training)

In [71]:
print("avgMetrics:")
print(models.avgMetrics)

import statistics

print("\nMean:")
print(statistics.mean(models.avgMetrics))

print("\nBest Model:")
print(models.bestModel)

avgMetrics:
[0.6790213191893577, 0.6790203067515375, 0.6279045251087142, 0.5689037709840317, 0.6088337359540937, 0.5400975811393618, 0.6003528323426284, 0.5274220807147917, 0.5955311147417572, 0.5, 0.5923729332839507, 0.5, 0.5901164130053708, 0.5, 0.5884025810448517, 0.5, 0.5870390616489812, 0.5, 0.585926354365404, 0.5]

Mean:
0.5685472305137416

Best Model:
LogisticRegressionModel: uid=LogisticRegression_a7794e9cddfa, numClasses=2, numFeatures=121


In [56]:
# Use the model to predict the test set
test_results_cv = models.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results_cv))

0.6820894351909542
