<a href="https://colab.research.google.com/github/pratikgujral/learn-PySpark/blob/master/Learn_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In this notebook, we'll learn how to use Spark from Python!

# Prerequisites
- Familiarity with Python


# Introduction
![Spark logo](https://spark.apache.org/docs/latest/api/python/_static/spark-logo-hd.png)

Spark is a tool for doing parallel computation with large datasets. Spark integrates with Python very well. PySpark is the Python package that makes the magic happen. We'll use this package to work with data about flights from Portland and Seattle. We'll learn to wrangle this data and build a whole machine learning pipeline to predict whether or not flights will be delayed. Get ready to put some Spark in your Python code and dive into the world of high-performance machine learning!

# Part 1: Getting to know PySpark
In this part, we'll learn how Spark manages data and how you can read and write tables from Python.

## What is Spark?
Spark is a platform for **cluster computing**. Spark lets you spread data and computations over clusters with multiple nodes (think of each node as a separate computer). Splitting up your data makes it easier to work with very large datasets because each node only works with a small amount of data.

![Cluster Computing](https://i.pinimg.com/originals/bf/c4/17/bfc4173a6e383fb815935fad9a8d9c11.png)

As each node works on its own subset of the total data, it also carries out a part of the total calculations required, so that both data processing and computation are performed in parallel over the nodes in the cluster. Parallel computation can make certain types of programming tasks much faster.
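To make the idea concrete, here is a minimal plain-Python sketch of the "split, compute in parallel, combine" pattern. It does not use Spark: the helper `split_into_partitions` is a hypothetical name, and threads merely stand in for the nodes of a cluster (in CPython they will not give a real speed-up for this kind of arithmetic).

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(values, num_partitions):
    """Split a list into roughly equal chunks, one per (pretend) worker node."""
    size = -(-len(values) // num_partitions)  # ceiling division
    return [values[i:i + size] for i in range(0, len(values), size)]


# Toy dataset: the integers 1..1000
data = list(range(1, 1001))
partitions = split_into_partitions(data, num_partitions=4)

# Each "worker" sums only its own partition
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(sum, partitions))

# The "master" combines the partial results into the final answer
total = sum(partial_sums)
print(partial_sums, total)
```

Each worker only ever sees its own slice of the data, and the final combination step is cheap, which is what makes this kind of calculation easy to parallelize.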

However, with greater computing power comes greater complexity.

Deciding whether or not Spark is the best solution for your problem takes some experience, but you can consider questions like:
 - Is my data too big to work with on a single machine?
 - Can my calculations be easily parallelized?

## Using Spark in Python
The first step in using Spark is connecting to a cluster.

In practice, the cluster will be hosted on a remote machine that's connected to all other nodes. There will be one computer, called the **master** (sometimes also called a **frontend node**), that manages splitting up the data and the computations. The master is connected to the rest of the computers in the cluster, which are called **workers**. The master sends the workers data and calculations to run, and they send their results back to the master.

Creating the connection is as simple as creating an instance of the `SparkContext` class. The class constructor takes a few optional arguments that allow you to specify the attributes of the cluster you're connecting to.

An object holding all these attributes can be created with the `SparkConf()` constructor. Take a look at the documentation for all the details!

In this notebook we work with a `SparkSession` called `spark` rather than creating a `SparkContext` directly; the underlying `SparkContext` is available as `spark.sparkContext`.



## Installing and setting up Spark
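Run the code below. **If it throws an error (for example, because this Spark release is no longer hosted at that URL), update the link to the Apache Spark distribution in the `wget` command, and update the matching file name in the `tar` command and in the environment variables set later.** Older releases are usually kept at archive.apache.org.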

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

Now that we have installed Spark and Java in Colab, it is time to set the environment variables that let us run PySpark in our Colab environment. Set the locations of Java and Spark by running the following code:

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

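Next, we start a local Spark session. The master URL `local[*]` runs Spark on this machine, using as many worker threads as there are logical cores: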

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

## Reading the datasets

In [0]:
url_airports_dataset = r"https://assets.datacamp.com/production/repositories/1237/datasets/6e5c4ac2a4799338ba7e13d54ce1fa918da644ba/airports.csv"
url_flights_dataset = r"https://assets.datacamp.com/production/repositories/1237/datasets/fa47bb54e83abd422831cbd4f441bd30fd18bd15/flights_small.csv"
url_planes_datasets = r"https://assets.datacamp.com/production/repositories/1237/datasets/231480a2696c55fde829ce76d936596123f12c0c/planes.csv"

In [0]:
!wget --no-check-certificate "https://assets.datacamp.com/production/repositories/1237/datasets/6e5c4ac2a4799338ba7e13d54ce1fa918da644ba/airports.csv" -O /tmp/airports.csv
!wget --no-check-certificate "https://assets.datacamp.com/production/repositories/1237/datasets/fa47bb54e83abd422831cbd4f441bd30fd18bd15/flights_small.csv" -O /tmp/flights.csv
!wget --no-check-certificate "https://assets.datacamp.com/production/repositories/1237/datasets/231480a2696c55fde829ce76d936596123f12c0c/planes.csv" -O /tmp/planes.csv

--2020-04-18 01:10:43--  https://assets.datacamp.com/production/repositories/1237/datasets/6e5c4ac2a4799338ba7e13d54ce1fa918da644ba/airports.csv
Resolving assets.datacamp.com (assets.datacamp.com)... 13.225.205.57, 13.225.205.121, 13.225.205.66, ...
Connecting to assets.datacamp.com (assets.datacamp.com)|13.225.205.57|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 84548 (83K)
Saving to: ‘/tmp/airports.csv’


2020-04-18 01:10:44 (567 KB/s) - ‘/tmp/airports.csv’ saved [84548/84548]

--2020-04-18 01:10:46--  https://assets.datacamp.com/production/repositories/1237/datasets/fa47bb54e83abd422831cbd4f441bd30fd18bd15/flights_small.csv
Resolving assets.datacamp.com (assets.datacamp.com)... 13.225.62.126, 13.225.62.110, 13.225.62.106, ...
Connecting to assets.datacamp.com (assets.datacamp.com)|13.225.62.126|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 614174 (600K)
Saving to: ‘/tmp/flights.csv’


2020-04-18 01:10:47 (1.08 MB/s) - ‘/tmp/fli

## Using DataFrames
Spark's core data structure is the **Resilient Distributed Dataset (RDD)**. This is a low-level object that lets Spark work its magic by splitting data across multiple nodes in the cluster. However, RDDs are hard to work with directly, so in this notebook we'll be using the Spark DataFrame abstraction built on top of RDDs.

The Spark DataFrame was designed to behave a lot like a SQL table (a table with variables in the columns and observations in the rows). Not only are they easier to understand, DataFrames are also more optimized for complicated operations than RDDs.

When you start modifying and combining columns and rows of data, there are many ways to arrive at the same result, but some often take much longer than others. When using RDDs, it's up to the data scientist to figure out the right way to optimize the query, but the DataFrame implementation has much of this optimization built in!
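As a rough illustration of why the order of operations matters, the plain-Python sketch below computes the same result with two different plans and counts the row comparisons each one needs. It is a toy model, not Spark: `nested_loop_join` is a hypothetical helper, the route pairs are taken from the first rows of the flights data, and the airport labels are placeholders.

```python
def nested_loop_join(left, right):
    """Join rows where left[1] == right[0]; return (rows, number of comparisons)."""
    rows, comparisons = [], 0
    for l in left:
        for r in right:
            comparisons += 1
            if l[1] == r[0]:
                rows.append(l + r[1:])
    return rows, comparisons


# (origin, dest) pairs and a tiny lookup table with placeholder labels
flights = [("SEA", "LAX"), ("SEA", "HNL"), ("SEA", "SFO"),
           ("PDX", "SJC"), ("SEA", "BUR"), ("PDX", "DEN")]
airports = [(code, f"airport {code}")
            for code in ("LAX", "HNL", "SFO", "SJC", "BUR", "DEN")]

# Plan 1: join everything first, then keep only flights from PDX
joined, cost_join_first = nested_loop_join(flights, airports)
plan1 = [row for row in joined if row[0] == "PDX"]

# Plan 2: keep only flights from PDX first, then join the smaller input
pdx_only = [f for f in flights if f[0] == "PDX"]
plan2, cost_filter_first = nested_loop_join(pdx_only, airports)

print(plan1 == plan2, cost_join_first, cost_filter_first)
```

Both plans return the same rows, but filtering first compares far fewer row pairs. Rewrites of this kind (filtering early) are among the optimizations that a DataFrame engine can apply automatically, whereas with hand-written RDD code the ordering is up to you.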

To start working with Spark DataFrames, you first have to create a `SparkSession` object from your `SparkContext`. You can think of the `SparkContext` as your connection to the cluster and the `SparkSession` as your interface with that connection.

Remember, for the rest of the notebook, we'll have a `SparkSession` called `spark` available in our workspace!

## Creating a SparkSession
We'll have a `SparkSession` called `spark`, but what if you're not sure there already is one? Creating multiple SparkSessions and SparkContexts can cause issues, so it's best practice to use the `SparkSession.builder.getOrCreate()` method. This returns an existing `SparkSession` if there's already one in the environment, or creates a new one if necessary!

In [0]:
from pyspark.sql import SparkSession

# Get the existing SparkSession, or create a new one if none exists yet
my_spark = SparkSession.builder.getOrCreate()
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x7f8b59e66710>


---

## Reading a CSV into a Spark DataFrame

In [0]:
flights = spark.read.format("csv").option("header", "true").load('/tmp/flights.csv')
print(type(flights))

<class 'pyspark.sql.dataframe.DataFrame'>


### Printing top `n` rows of the Spark DataFrame
We use the `.show()` method on the DataFrame to print the first `n` rows. By default, `n = 20`.


In [0]:
flights.show(n=22)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

## Selecting data
### Selecting only specific column(s)

Passing the name of a column, or a list of column names, to the `.select()` method returns only the desired columns.


In [0]:
flights.select(['carrier', 'origin']).show()

+-------+------+
|carrier|origin|
+-------+------+
|     VX|   SEA|
|     AS|   SEA|
|     VX|   SEA|
|     WN|   PDX|
|     AS|   SEA|
|     WN|   PDX|
|     WN|   PDX|
|     VX|   SEA|
|     AS|   SEA|
|     AS|   SEA|
|     AS|   SEA|
|     AS|   SEA|
|     AS|   SEA|
|     AS|   SEA|
|     AS|   SEA|
|     UA|   PDX|
|     AS|   SEA|
|     WN|   SEA|
|     AS|   SEA|
|     OO|   PDX|
+-------+------+
only showing top 20 rows



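### Selecting columns and applying a transformation
Selecting the `carrier`, `origin` and `air_time` columns, and adding 50 to every value in `air_time`. Because the CSV was read without a schema, `air_time` is a string column; Spark casts it to a double for the addition, which is why the results below appear as `182.0` rather than `182`.

In [0]:
flights.select(flights['carrier'], flights['origin'], flights['air_time'] + 50).show()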

+-------+------+---------------+
|carrier|origin|(air_time + 50)|
+-------+------+---------------+
|     VX|   SEA|          182.0|
|     AS|   SEA|          410.0|
|     VX|   SEA|          161.0|
|     WN|   PDX|          133.0|
|     AS|   SEA|          177.0|
|     WN|   PDX|          171.0|
|     WN|   PDX|          140.0|
|     VX|   SEA|          148.0|
|     AS|   SEA|          185.0|
|     AS|   SEA|          248.0|
|     AS|   SEA|          180.0|
|     AS|   SEA|          204.0|
|     AS|   SEA|          177.0|
|     AS|   SEA|          233.0|
|     AS|   SEA|          179.0|
|     UA|   PDX|          140.0|
|     AS|   SEA|          126.0|
|     WN|   SEA|          266.0|
|     AS|   SEA|          340.0|
|     OO|   PDX|          161.0|
+-------+------+---------------+
only showing top 20 rows



### Selecting data based on some condition
We use the `.filter()` method on the Spark DataFrame and fetch the data based on the condition provided. 

In [0]:
flights.filter(flights['air_time'] > 200).show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    8| 11|    1017|       -3|    1613|       -7|     WN| N8634A|   827|   SEA| MDW|     216|    1733|  10|    17|
|2014|    1| 13|    2156|       -9|     607|      -15|     AS| N597AS|    24|   SEA| BOS|     290|    2496|  21|    56|
|2014|    9| 26|     610|       -5|    1523|       65|     US| N127UW|   616|   SEA| PHL|     293|    2378|   6|    10|
|2014|   12|  4|     954|       -6|    1348|      -17|     HA| N395HA|    29|   SEA| OGG|     333|    2640|   9|    54|
|2014|    6|  7|    1823|       -7|    2

We would like to know how many flights originated from Seattle and Portland respectively.

In [0]:
flights.groupby('origin').count().show()

+------+-----+
|origin|count|
+------+-----+
|   SEA| 6754|
|   PDX| 3246|
+------+-----+
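
As a rough illustration of what `groupby('origin').count()` computes, the plain-Python sketch below tallies the `origin` values of the 20 rows printed earlier by `flights.select(['carrier', 'origin']).show()`. It does not use Spark; `collections.Counter` simply stands in for the grouped count.

```python
from collections import Counter

# The `origin` values of the 20 rows shown earlier, in the same order
origins = (["SEA", "SEA", "SEA", "PDX", "SEA", "PDX", "PDX", "SEA"]
           + ["SEA"] * 7
           + ["PDX", "SEA", "SEA", "SEA", "PDX"])

# One bucket per distinct origin, holding the number of rows in that bucket
counts = Counter(origins)
for origin, n in counts.items():
    print(origin, n)
```

On the full dataset Spark does the same bookkeeping, except that each node counts its own partition and the per-node counts are then added together.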



### Printing the schema

In [0]:
flights.printSchema()

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



## Creating Tables
The `.createDataFrame()` method takes a `pandas` DataFrame and returns a Spark DataFrame.

The output of this method is stored locally, not in the `SparkSession` `catalog`. This means that you can use all the Spark DataFrame methods on it, but you can't access the data in other contexts.

For example, a SQL query (using the `.sql()` method) that references your DataFrame will throw an error. To access the data in this way, you have to save it as a temporary table.

You can do this using the `.createTempView()` Spark DataFrame method, which takes as its only argument the name of the temporary table you'd like to register. This method registers the DataFrame as a table in the catalog, but as this table is temporary, it can only be accessed from the specific `SparkSession` used to create the Spark DataFrame.

There is also the method `.createOrReplaceTempView()`. This safely creates a new temporary table if nothing was there before, or updates an existing table if one was already defined. You'll use this method to avoid running into problems with duplicate tables.
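
The difference between the two methods can be sketched with a toy catalog in plain Python. This is only a model of the behaviour described above, not Spark's implementation: `ToyCatalog` and its method names are hypothetical, and `ValueError` stands in for the error Spark raises when a view name is already taken.

```python
class ToyCatalog:
    """A dictionary-backed stand-in for a session's catalog of temporary views."""

    def __init__(self):
        self._views = {}

    def create_temp_view(self, name, data):
        # Refuses to overwrite: mirrors the stricter .createTempView()
        if name in self._views:
            raise ValueError(f"Temporary view '{name}' already exists")
        self._views[name] = data

    def create_or_replace_temp_view(self, name, data):
        # Creates the view, or silently replaces an existing one
        self._views[name] = data

    def list_tables(self):
        return sorted(self._views)


catalog = ToyCatalog()
catalog.create_temp_view("flights", ["row 1", "row 2"])

try:
    catalog.create_temp_view("flights", ["row 3"])
    raised = False
except ValueError as err:
    raised = True
    print("Error:", err)

# Re-running a cell that registers the same name is safe with the "or replace" variant
catalog.create_or_replace_temp_view("flights", ["row 3"])
print(catalog.list_tables())
```

This is why re-running a notebook cell is painless with `.createOrReplaceTempView()`, while the stricter variant fails on the second run.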

Check out the diagram to see all the different ways your Spark data structures interact with each other.

![alt](https://s3.amazonaws.com/assets.datacamp.com/production/course_4452/datasets/spark_figure.png)

## Viewing Tables
Now that we have created a `SparkSession`, we can start poking around to see what data is in our cluster!

Our `SparkSession` has an attribute called `catalog` which lists all the data inside the cluster. This attribute has a few methods for extracting different pieces of information.

One of the most useful is the `.listTables()` method, which returns a list of `Table` objects describing the tables and views registered in the catalog.

In [0]:
print(spark.catalog.listTables()) # Output -> []

[]


This just printed an empty list. Even though we have a variable holding a Spark DataFrame, we have not yet registered any tables or views in the catalog.


Let's create a temporary view from the `flights` Spark DataFrame and register it in the catalog under the name `'flights'`.

In [0]:
flights.createOrReplaceTempView("flights")

Now when we list the tables present in the catalog, we should get our newly created temporary table in the results.

In [0]:
print(spark.catalog.listTables()) # Output -> [Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


### Are you query-ious?
One of the advantages of the DataFrame interface is that you can run SQL queries on the tables in your Spark cluster.

As you saw in the previous section, one of the tables in the catalog is the `flights` table. This table contains a row for every flight that left Portland International Airport (PDX) or Seattle-Tacoma International Airport (SEA) in 2014 and 2015.

Running a query on this table is as easy as using the `.sql()` method on your `SparkSession`. This method takes a string containing the query and returns a DataFrame with the results!

If you look closely, you'll notice that the table `flights` is only mentioned in the query, not as an argument to any of the methods. This is because the query refers to the table by the name under which it is registered in the catalog, not to a Python object, so there is nothing to pass as an argument.

First, we make sure a `SparkSession` called `spark` is available.

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Writing the query to fetch first 10 rows of flights
query = 'FROM flights SELECT * LIMIT 10'  # equivalent to: SELECT * FROM flights LIMIT 10

# Executing the query using the .sql() method of SparkSession
flights10 = spark.sql(query)

# Using the DataFrame method .show() to print flights10
flights10.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1



---


### Panda-fying a Spark DataFrame

Suppose you've run a query on your huge dataset and aggregated it down to something a little more manageable.

Sometimes it makes sense to then take that table and work with it locally using a tool like `pandas`. Spark DataFrames make that easy with the `.toPandas()` method. Calling this method on a Spark DataFrame returns the corresponding `pandas` DataFrame. It's as simple as that!

This time the query counts the number of flights to each airport from SEA and PDX.

Of course, we first make sure a `SparkSession` called `spark` is available in our workspace!

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

query = 'SELECT origin, dest, COUNT(*) AS N FROM flights GROUP BY origin, dest'

# Running the query
flight_counts = spark.sql(query)

# Converting the resulting table into a pandas DataFrame
pd_counts = flight_counts.toPandas()

# Print the head of pd_counts
print(pd_counts.head())

  origin dest    N
0    SEA  RNO    8
1    SEA  DTW   98
2    SEA  CLE    2
3    SEA  LAX  450
4    PDX  SEA  144


---
### Creating a Spark DataFrame from a Pandas DataFrame
A Spark DataFrame can be created by calling the `spark.createDataFrame()` method with the `pandas` DataFrame as the argument.

In [0]:
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd

# Creating a temporary pandas DataFrame
pd_temp = pd.DataFrame(np.random.random(10))
pd_temp

          0
0  0.254494
1  0.633873
2  0.699481
3  0.215531
4  0.420659
5  0.862715
6  0.181990
7  0.486184
8  0.668209
9  0.608704


In [0]:
spark = SparkSession.builder.getOrCreate()

# Creating a Spark DataFrame called spark_temp by calling the .createDataFrame() method with pd_temp as the argument
spark_temp = spark.createDataFrame(pd_temp)

# Examining the tables in the catalog
print(spark.catalog.listTables()) # OUTPUT -> # [Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
# Note that it DOES NOT have the newly created spark_temp table as this is only created locally and not in the spark catalog

# Registering spark_temp as a temporary table named 'temp' using the createOrReplaceTempView() method. The name of the table is set by passing the desired name as an argument to the method
spark_temp.createOrReplaceTempView('temp')

# Examining the list of tables once again. This time, it will list our newly created temp DataFrame as a table
print(spark.catalog.listTables()) 
# OUTPUT-> [Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


---


### Dropping the middle man
Now you know how to put data into Spark via `pandas`, but you're probably wondering why deal with `pandas` at all? Wouldn't it be easier to just read a text file straight into Spark? Of course it would!

Luckily, your `SparkSession` has a `.read` attribute which has several methods for reading different data sources into Spark DataFrames. Using these you can create a DataFrame from a .csv file just like with regular pandas DataFrames!

The variable `file_path` is a string with the path to the file `airports.csv`. This file contains information about different airports all over the world.

In [0]:
spark = SparkSession.builder.getOrCreate()

file_path = '/tmp/airports.csv'

# Reading the CSV file into a Spark DataFrame
airports = spark.read.csv(file_path, header=True)
print(type(airports)) # Note the Datatype is <class 'pyspark.sql.dataframe.DataFrame'>

# Show the data
airports.show()

<class 'pyspark.sql.dataframe.DataFrame'>
+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.81064



---




# Part 2: Manipulating data
In this part, we'll learn about the `pyspark.sql` module, which provides optimized data queries to our Spark session.

## Creating columns
In this section, you'll learn how to use the methods defined by Spark's DataFrame class to perform common data operations.

Let's look at performing column-wise operations. In Spark you can do this using the `.withColumn()` method, which takes two arguments. 

> 1) A string with the name of your new column

> 2) Values for the new column

The new column must be an object of class `Column`. Creating one of these is as easy as extracting a column from your DataFrame using `df.colName`.

Updating a Spark DataFrame is somewhat different than working in `pandas` because the **Spark DataFrame is immutable**. This means that it can't be changed, and so columns can't be updated in place.

Thus, all these methods return a **new** DataFrame. To overwrite the original DataFrame you must reassign the returned DataFrame using the method like so:

> `df = df.withColumn("newCol", df.oldCol + 1)`

The above code creates a DataFrame with the same columns as `df` plus a new column, `newCol`, where every entry is equal to the corresponding entry from `oldCol`, plus one.

To overwrite an existing column, just pass the name of that particular column as the first argument!
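
The "return a new object instead of changing the old one" behaviour can be sketched in plain Python. This is only an analogy, not Spark code: rows are modelled as dictionaries, and `with_column` is a hypothetical helper that mimics the described behaviour of `.withColumn()`. The `tailnum` and `air_time` values come from rows of the flights data shown in this notebook.

```python
def with_column(rows, name, fn):
    """Return a NEW list of rows with column `name` added (or overwritten)."""
    return [{**row, name: fn(row)} for row in rows]


rows = [
    {"tailnum": "N846VA", "air_time": 132},
    {"tailnum": "N559AS", "air_time": 360},
    {"tailnum": "N360SW", "air_time": 83},
]

# Adding a column produces a new table; `rows` itself is left untouched
with_hours = with_column(rows, "duration_hrs", lambda r: r["air_time"] / 60)
print("duration_hrs" in rows[0])        # the original has no new column
print("duration_hrs" in with_hours[0])  # the returned copy does

# Passing an existing name overwrites that column in the returned copy
shifted = with_column(rows, "air_time", lambda r: r["air_time"] + 50)
print(rows[0]["air_time"], shifted[0]["air_time"])
```

To "update" the table you reassign the name, exactly as in `df = df.withColumn(...)`: the variable then points at the new object, while the old one is simply no longer referenced.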

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Pulling the flights from the catalog to create a Spark DataFrame containing the values of the flights.
flights = spark.table('flights')

# Show the head using .show() method. The column air_time contains the duration of the flight in minutes.
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

Verifying that the `spark.table()` method returns a Spark DataFrame

In [0]:
type(flights)

pyspark.sql.dataframe.DataFrame

### The `.withColumn()` method
We can manipulate a column, or add a new column to our Spark DataFrame by calling the `.withColumn()` method on the DataFrame.

Now, we'll add another column `duration_hrs` to our DataFrame whose value is derived from the `air_time` column...

In [0]:
flights = flights.withColumn('duration_hrs', flights.air_time / 60)

flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|1.3833333333333333|
|2014|    3|  9|     754|  

---

### Filtering Data
Now that you have a bit of SQL know-how under your belt, it's easier to talk about the analogous operations using Spark DataFrames.

Let's take a look at the `.filter()` method. As you might suspect, this is the Spark counterpart of SQL's WHERE clause. The `.filter()` method takes either an expression that would follow the WHERE clause of a SQL expression as a string, or a Spark Column of boolean (True/False) values.

For example, the following two expressions will produce the same output:

> `flights.filter("air_time > 120").show()`

> `flights.filter(flights.air_time > 120).show()`

Notice that in the first case, we pass a **string** to `.filter()`. In SQL, we would write this filtering task as `SELECT * FROM flights WHERE air_time > 120`. Spark's `.filter()` can accept any expression that could go in the WHERE clause of a SQL query (in this case, `"air_time > 120"`), as long as it is passed as a string. Notice that in this case, we do not reference the name of the table in the string, just as we wouldn't inside the WHERE clause of the SQL query.

In the second case, we actually pass a **column of boolean values** to `.filter()`. Remember that `flights.air_time > 120` returns a column of boolean values that has True in place of those records in `flights.air_time` that are over 120, and False otherwise.
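
The "column of boolean values" idea can be sketched in plain Python. This is an analogy only, not Spark code: two lists stand in for the `air_time` and `dest` columns, using the values from the first five rows of the flights data.

```python
# Two "columns" from the first five flights
air_time = [132, 360, 111, 83, 127]
dest = ["LAX", "HNL", "SFO", "SJC", "BUR"]

# Comparing a column with a number yields one True/False value per row
mask = [t > 120 for t in air_time]
print(mask)

# Filtering keeps exactly the rows where the mask is True
kept = [d for d, keep in zip(dest, mask) if keep]
print(kept)
```

One difference worth keeping in mind: in Spark, `flights.air_time > 120` is a `Column` expression that is only evaluated when an action such as `.show()` runs, whereas the list above is computed immediately.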

Remember, a SparkSession called `spark` is already in your workspace, along with the Spark DataFrame `flights`.

In [0]:
# Filter flights by passing a string to filter() method to find all flights that flew over 1000 miles distance.
long_flights1 = flights.filter('distance > 1000')

# Filter flights by passing a column of boolean values
long_flights2 = flights.filter(flights.distance > 1000)

# Print the data to check they're equal
long_flights1.show()
long_flights2.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    4| 19|    1236|       -4|    1508|       -7|     AS| N309AS|   490|   SEA| SAN|     135|    1050|  12|    36|              2.25|
|2014|   11| 19|    1812|       -3|    2352|       -4|     AS| N564AS|    26|   SEA| ORD|     198|    1721|  18|    12|               3.3|
|2014|    8|  3|    1120|        0|    1415|        2|     AS| N305AS|   656|   SEA| PHX|     154|    1107|  11|    20| 2.566666666666667|
|2014|   11| 12|    2346|  

---

### Selecting
The Spark variant of SQL's `SELECT` is the `.select()` method. This method takes multiple arguments - one for each column you want to select. These arguments can either be the column name as a string (one for each column) or a column object (using the `df.colName` syntax). When you pass a column object, you can perform operations like addition or subtraction on the column to change the data contained in it, much like inside `.withColumn()`.

#### Difference between `.select()` and `.withColumn()`
The difference between `.select()` and `.withColumn()` methods is that `.select()` returns only the columns you specify, while `.withColumn()` returns all the columns of the DataFrame in addition to the one you defined. It's often a good idea to drop columns you don't need at the beginning of an operation so that you're not dragging around extra data as you're wrangling. In this case, you would use `.select()` and not `.withColumn()`.
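
The contrast can be sketched in plain Python by looking at which columns each operation returns. This is a toy model, not Spark code: rows are dictionaries, and `select` and `with_column` are hypothetical helpers that mimic the behaviour described above. The single row uses values from the first row of the flights data.

```python
def select(rows, *names):
    """Keep only the named columns."""
    return [{n: row[n] for n in names} for row in rows]


def with_column(rows, name, fn):
    """Keep every column and add (or overwrite) one."""
    return [{**row, name: fn(row)} for row in rows]


rows = [{"origin": "SEA", "dest": "LAX", "air_time": 132, "distance": 954}]

selected = select(rows, "origin", "dest")
extended = with_column(rows, "duration_hrs", lambda r: r["air_time"] / 60)

print(list(selected[0]))  # only the requested columns
print(list(extended[0]))  # all original columns plus the new one
```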


In [0]:
# BY STRINGS: Selecting the columns tailnum, origin, and dest from flights by passing the column names as strings.
selected1 = flights.select('tailnum', 'origin', 'dest')
selected1.show()

# BY COLUMN OBJECTS: Selecting the columns origin, dest, and carrier using the df.colName syntax
temp = flights.select(flights.origin, flights.dest, flights.carrier)
temp.show()

# Defining boolean filter conditions and filtering the data with them
filterA = flights.origin == "SEA"
filterB = flights.dest == "PDX"
selected2 = flights.filter(filterA).filter(filterB)
selected2.show()

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N846VA|   SEA| LAX|
| N559AS|   SEA| HNL|
| N847VA|   SEA| SFO|
| N360SW|   PDX| SJC|
| N612AS|   SEA| BUR|
| N646SW|   PDX| DEN|
| N422WN|   PDX| OAK|
| N361VA|   SEA| SFO|
| N309AS|   SEA| SAN|
| N564AS|   SEA| ORD|
| N323AS|   SEA| LAX|
| N305AS|   SEA| PHX|
| N433AS|   SEA| LAS|
| N765AS|   SEA| ANC|
| N713AS|   SEA| SFO|
| N27205|   PDX| SFO|
| N626AS|   SEA| SMF|
| N8634A|   SEA| MDW|
| N597AS|   SEA| BOS|
| N215AG|   PDX| BUR|
+-------+------+----+
only showing top 20 rows

+------+----+-------+
|origin|dest|carrier|
+------+----+-------+
|   SEA| LAX|     VX|
|   SEA| HNL|     AS|
|   SEA| SFO|     VX|
|   PDX| SJC|     WN|
|   SEA| BUR|     AS|
|   PDX| DEN|     WN|
|   PDX| OAK|     WN|
|   SEA| SFO|     VX|
|   SEA| SAN|     AS|
|   SEA| ORD|     AS|
|   SEA| LAX|     AS|
|   SEA| PHX|     AS|
|   SEA| LAS|     AS|
|   SEA| ANC|     AS|
|   SEA| SFO|     AS|
|   PDX| SFO|     UA|
|   SEA| SMF|     AS|
|   SE

Similar to SQL, we can also use the `.select()` method to perform column-wise operations. When we're selecting a column using the `df.colName` notation, we can perform any column operation and the `.select()` method will return the transformed column. For example,

> `flights.select(flights.air_time/60)`

returns a column of flight durations in hours instead of minutes. We can also use the `.alias()` method to rename a column we're selecting. So if we wanted to `.select()` the column duration_hrs (which isn't in our DataFrame) we could do

> `flights.select((flights.air_time/60).alias("duration_hrs"))`

The equivalent Spark DataFrame method `.selectExpr()` takes SQL expressions as a string:

> `flights.selectExpr("air_time/60 as duration_hrs")`

with the SQL `as` keyword being equivalent to the `.alias()` method.

To select multiple columns, we can pass multiple strings.

In [0]:
# Define avg_speed
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed") # The result has a datatype of Spark Column
print(type(avg_speed))

## Just to prettify the output
print("-" * 5)

# Select the correct columns
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)
print(type(speed1))
speed1.show()

## Just to prettify the output
print("-" * 5)

# Create the same table using a SQL expression
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")
print(type(speed2))
speed2.show()

<class 'pyspark.sql.column.Column'>
-----
<class 'pyspark.sql.dataframe.DataFrame'>
+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
|   PDX| DEN| N646SW|491.40495867768595|
|   PDX| OAK| N422WN|             362.0|
|   SEA| SFO| N361VA| 415.7142857142857|
|   SEA| SAN| N309AS| 466.6666666666667|
|   SEA| ORD| N564AS| 521.5151515151515|
|   SEA| LAX| N323AS| 440.3076923076923|
|   SEA| PHX| N305AS|431.29870129870125|
|   SEA| LAS| N433AS| 409.6062992125984|
|   SEA| ANC| N765AS|474.75409836065575|
|   SEA| SFO| N713AS| 315.8139534883721|
|   PDX| SFO| N27205| 366.6666666666667|
|   SEA| SMF| N626AS|477.63157894736844|
|   SEA| MDW| N8634A|481.38888888888886|
|   SEA| BOS| N597AS| 516.4137931034483|
|   PDX| BUR| 

---

## Aggregating
All of the common aggregation methods, like `.min()`, `.max()`, and `.count()` are `GroupedData` methods. These are created by calling the `.groupBy()` DataFrame method. For example, to find the minimum value of a column, `col`, in a DataFrame, `df`, we can do

> `df.groupBy().min("col").show()`

This creates a `GroupedData` object (so we can use the `.min()` method), then finds the minimum value in `col`, and returns it as a DataFrame.

Now we're ready to do some aggregating of our own!


Finding the length of the shortest flight (in terms of distance) that left `PDX` by first `.filter()`ing and using the `.min()` method.

In [0]:
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|1.3833333333333333|
|2014|    3|  9|     754|  

All the columns in the `flights` table that were read from the CSV file are of the string datatype, so we first cast the ones we want to aggregate to integers.

In [0]:
from pyspark.sql.types import FloatType, IntegerType, DoubleType 

In [0]:
flights = flights.withColumn('distance', flights['distance'].cast(IntegerType()))
flights = flights.withColumn('air_time', flights['air_time'].cast(IntegerType()))
flights = flights.withColumn('dep_delay', flights['dep_delay'].cast(IntegerType()))

Finding the minimum distance of any flight that left `PDX`.

In [0]:
flights.filter(flights.origin=='PDX').groupBy().min('distance').show()

+-------------+
|min(distance)|
+-------------+
|          106|
+-------------+



Finding the length of the longest (in terms of time) flight that left `SEA` by `.filter()`ing and using the `.max()` method. We filter by referencing the column directly rather than passing a SQL string.

In [0]:
flights.filter(flights.origin=='SEA').groupBy().max('duration_hrs').show()

+-----------------+
|max(duration_hrs)|
+-----------------+
|6.816666666666666|
+-----------------+



Finding the length of the longest (in terms of time) flight that left `SEA` by `filter()`ing and using the `.max()` method...

In [0]:
flights.filter(flights.origin=='SEA').groupBy().max('air_time').show()

+-------------+
|max(air_time)|
+-------------+
|          409|
+-------------+



Using the `.avg()` method to get the average air time of Delta Airlines flights (where the `carrier` column has the value `DL`) that left `SEA`. The place of departure is stored in the column `origin`, and we `.show()` the result.

In [0]:
flights.filter(flights.origin == 'SEA').filter(flights.carrier=='DL').groupBy().avg('air_time').show()

+------------------+
|     avg(air_time)|
+------------------+
|188.20689655172413|
+------------------+



Using the `.sum()` method to get the total number of hours all planes in this dataset spent in the air by creating a column called `duration_hrs` from the column `air_time`...

In [0]:
flights.withColumn("duration_hrs", flights.air_time/60).groupBy().sum('duration_hrs').show()

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+



---

## Grouping and aggregating
Part of what makes aggregating so powerful is the addition of groups. PySpark has a whole class devoted to grouped data frames: `pyspark.sql.GroupedData`

You've learned how to create a grouped DataFrame by calling the `.groupBy()` method on a DataFrame with no arguments.

Now you'll see that when you pass the name of one or more columns in your DataFrame to the `.groupBy()` method, the aggregation methods behave like when you use a `GROUP BY` statement in a SQL query!
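
As a rough mental model of what grouping does, here is a plain-Python sketch (not Spark code). The rows are made-up values for illustration, and the helper names `group_by`, `avg`, and `count` are hypothetical. Rows that share a key are collected together, and the aggregation then runs once per group.

```python
from collections import defaultdict

# Hypothetical rows for illustration only; the real data lives in the `flights` DataFrame.
rows = [
    {"origin": "SEA", "air_time": 120},
    {"origin": "PDX", "air_time": 80},
    {"origin": "SEA", "air_time": 180},
    {"origin": "PDX", "air_time": 100},
]

def group_by(rows, key):
    """Collect rows into buckets keyed by the value of `key`."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return dict(groups)

def avg(groups, column):
    """Average `column` within each group, in the spirit of GroupedData.avg()."""
    return {k: sum(r[column] for r in g) / len(g) for k, g in groups.items()}

def count(groups):
    """Number of rows in each group, in the spirit of GroupedData.count()."""
    return {k: len(g) for k, g in groups.items()}

by_origin = group_by(rows, "origin")
print(avg(by_origin, "air_time"))   # {'SEA': 150.0, 'PDX': 90.0}
print(count(by_origin))             # {'SEA': 2, 'PDX': 2}
```

In this picture, calling `.groupBy()` with no arguments corresponds to a single group that contains every row, which is why the earlier aggregations returned one value for the whole table.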

**INSTRUCTIONS**

1) Create a `GroupedData` object called `by_plane` by grouping `flights` by the column `tailnum`.

2) Use the `.count()` method with no arguments to count the number of flights each plane made.

3) Create a `GroupedData` object called `by_origin` by grouping `flights` by the column `origin`.

4) Find the `.avg()` of the `air_time` column to find the average duration of flights from `PDX` and `SEA`.

In [0]:
# Group by tailnum
by_plane = flights.groupBy("tailnum")

# Number of flights each plane made
by_plane.count().show()

# Group by origin
by_origin = flights.groupBy("origin")

# Average duration of flights from PDX and SEA
by_origin.avg("air_time").show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N102UW|    2|
| N36472|    4|
| N38451|    4|
| N73283|    4|
| N513UA|    2|
| N954WN|    5|
| N388DA|    3|
| N567AA|    1|
| N516UA|    2|
| N927DN|    1|
| N8322X|    1|
| N466SW|    1|
|  N6700|    1|
| N607AS|   45|
| N622SW|    4|
| N584AS|   31|
| N914WN|    4|
| N654AW|    2|
| N336NW|    1|
+-------+-----+
only showing top 20 rows

+------+------------------+
|origin|     avg(air_time)|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+



In addition to the `GroupedData` methods you've already seen, there is also the `.agg()` method. This method lets you pass an aggregate column expression that uses any of the aggregate functions from the `pyspark.sql.functions` submodule.

This submodule contains many useful functions for computing things like standard deviations. All the aggregation functions in this submodule take the name of a column in a `GroupedData` table.
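
The output column of the cell below is named `stddev_samp(dep_delay)`, which indicates that `F.stddev` computes the sample standard deviation. The plain-Python sketch below (not Spark code, with made-up delay values) shows how that differs from the population version for a single group.

```python
import statistics

# Hypothetical departure delays (in minutes) for one (month, dest) group.
delays = [-3, -2, 0, 5, 10]

# Sample standard deviation: divides the squared deviations by n - 1.
sample_sd = statistics.stdev(delays)

# Population standard deviation: divides the squared deviations by n.
population_sd = statistics.pstdev(delays)

print(sample_sd, population_sd)
```

The sample version is always the larger of the two, and the gap matters most for small groups, which are common when grouping by both month and destination.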

In [0]:
# Importing pyspark.sql.functions submodule
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = flights.groupBy('month', 'dest')

# Average departure delay by month and destination
by_month_dest.avg('dep_delay').show()

# Standard deviation of departure delay
by_month_dest.agg(F.stddev('dep_delay')).show()

+-----+----+--------------------+
|month|dest|      avg(dep_delay)|
+-----+----+--------------------+
|   11| TUS| -2.3333333333333335|
|   11| ANC|   7.529411764705882|
|    1| BUR|               -1.45|
|    1| PDX| -5.6923076923076925|
|    6| SBA|                -2.5|
|    5| LAX|-0.15789473684210525|
|   10| DTW|                 2.6|
|    6| SIT|                -1.0|
|   10| DFW|  18.176470588235293|
|    3| FAI|                -2.2|
|   10| SEA|                -0.8|
|    2| TUS| -0.6666666666666666|
|   12| OGG|  25.181818181818183|
|    9| DFW|   4.066666666666666|
|    5| EWR|               14.25|
|    3| RDM|                -6.2|
|    8| DCA|                 2.6|
|    7| ATL|   4.675675675675675|
|    4| JFK| 0.07142857142857142|
|   10| SNA| -1.1333333333333333|
+-----+----+--------------------+
only showing top 20 rows

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|   11| TUS|    3.0550504633038935|
|   11| ANC|  

---
## Joining
In PySpark, joins are performed using the DataFrame method `.join()`. This method takes three arguments. The first is the second DataFrame that you want to join with the first one. The second argument, `on`, is the name of the key column(s) as a string. The names of the key column(s) must be the same in each table. The third argument, `how`, specifies the kind of join to perform. In this notebook we'll always use the value `how="leftouter"`.
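
To make the meaning of a left outer join concrete, here is a minimal plain-Python sketch (not Spark code). The two tiny tables, the code `XXX`, the tail number `N000ZZ`, and the helper `left_outer_join` are all hypothetical and exist only for illustration. Every row of the left table is kept, and the right-hand columns are filled with `None` when no key matches.

```python
# Hypothetical miniature tables; "XXX" is a made-up code with no match on the right.
flights_rows = [
    {"dest": "LAX", "tailnum": "N846VA"},
    {"dest": "XXX", "tailnum": "N000ZZ"},
]
airports_rows = [
    {"dest": "LAX", "name": "Los Angeles Intl"},
    {"dest": "HNL", "name": "Honolulu Intl"},
]

def left_outer_join(left, right, on, right_columns):
    """Keep every left row; fill right-hand columns with None when no key matches."""
    lookup = {}
    for row in right:
        lookup.setdefault(row[on], []).append(row)
    joined = []
    for lrow in left:
        # A left row with no match still produces one output row.
        matches = lookup.get(lrow[on], [None])
        for rrow in matches:
            extra = {c: (rrow[c] if rrow else None) for c in right_columns}
            joined.append({**lrow, **extra})
    return joined

result = left_outer_join(flights_rows, airports_rows, on="dest", right_columns=["name"])
for row in result:
    print(row)
```

The airport `HNL` does not appear in the result because no left row refers to it, while the unmatched flight is kept with `name` set to `None`.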

In [0]:
# We have Spark DataFrames called airports and flights that we previously read. Printing them out to recall the contents of the two DataFrames
airports.show()
flights.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

We'll first rename the column `faa` in the `airports` DataFrame to `dest`. Renaming a column is done by calling the **`withColumnRenamed()`** method on the Spark DataFrame, passing the old and new column names as arguments.

In [0]:
airports = airports.withColumnRenamed(existing='faa', new='dest')
airports.show(n=4)

+----+--------------------+----------+-----------+----+---+---+
|dest|                name|       lat|        lon| alt| tz|dst|
+----+--------------------+----------+-----------+----+---+---+
| 04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
| 06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
| 06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
| 06N|     Randall Airport| 41.431912|-74.3915611| 523| -5|  A|
+----+--------------------+----------+-----------+----+---+---+
only showing top 4 rows



Joining the `flights` DataFrame with the `airports` DataFrame on the `dest` column.

In [0]:
flights_with_airports = flights.join(other=airports, on='dest', how='leftouter')
flights_with_airports.show(5)

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+---+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|      duration_hrs|                name|      lat|        lon|alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+---+---+---+
| LAX|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA|     132|     954|   6|    58|               2.2|    Los Angeles Intl|33.942536|-118.408075|126| -8|  A|
| HNL|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA|     360|    2677|  10|    40|               6.0|       Honolulu Intl|21.318681|-157.922428| 13|-10|  N|
| SFO|2014|    3|  9|    1443|



---



# Part 3: Getting started with machine learning pipelines
PySpark has built-in, cutting-edge machine learning routines, along with utilities to create full machine learning pipelines. We'll learn about them in this part.

## Overview
At the core of the **`pyspark.ml`** module are the **`Transformer`** and **`Estimator`** classes. Almost every other class in the module behaves similarly to these two basic classes.


> **Transformer** classes have a **`.transform()`** method that takes a DataFrame and returns a new DataFrame; usually the original one with a new column appended. For example, you might use the class **`Bucketizer`** to create discrete bins from a continuous feature, or a fitted `PCAModel` to reduce the dimensionality of your dataset using principal component analysis.

> **Estimator** classes all implement a **`.fit()`** method. These methods also take a DataFrame, but instead of returning another DataFrame they return a model object. This can be something like a `StringIndexerModel` for including categorical data saved as strings in your models, or a `RandomForestClassificationModel` or `RandomForestRegressionModel` that uses the random forest algorithm for classification or regression.
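
The pattern can be sketched in plain Python (not Spark code). The classes `MeanCenterer` and `MeanCentererModel` below are hypothetical and are not part of `pyspark.ml`. They only mimic the shape of the API: `.fit()` learns something from the data and returns a model, and the model's `.transform()` returns new rows with one extra column.

```python
class MeanCenterer:
    """Hypothetical estimator: learns the mean of a column from the data."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        values = [row[self.input_col] for row in rows]
        mean = sum(values) / len(values)
        return MeanCentererModel(self.input_col, self.output_col, mean)


class MeanCentererModel:
    """Hypothetical transformer returned by fit(): applies the learned mean."""

    def __init__(self, input_col, output_col, mean):
        self.input_col = input_col
        self.output_col = output_col
        self.mean = mean

    def transform(self, rows):
        # Return new rows with one extra column; the input rows are left untouched.
        return [{**row, self.output_col: row[self.input_col] - self.mean} for row in rows]


rows = [{"air_time": 100}, {"air_time": 140}, {"air_time": 180}]
model = MeanCenterer("air_time", "air_time_centered").fit(rows)
print(model.transform(rows))
```

Keeping the learned state in a separate model object means the same fitted model can later be applied to data it was not fit on.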

---
## Goal
We want to build a model that predicts whether or not a flight will be delayed based on the flights data we've been working with.

- We'll first read the tables `flights`, `planes`, and `airports`

In [0]:
flights = spark.read.format('csv').option('header', 'true').load('/tmp/flights.csv')
planes = spark.read.format('csv').option('header','true').load('/tmp/planes.csv')
airports = spark.read.format('csv').option('header','true').load('/tmp/airports.csv')

flights.show(n=5)
planes.show(n=5)
airports.show(n=5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
+----+-----+---+--------+---------+-----

- Next, we rename the `year` column of `planes` to `plane_year`. This avoids confusion, because both the `planes` and `flights` DataFrames have a `year` column.

To rename a column, we use the **`.withColumnRenamed()`** method on the DataFrame.

In [0]:
planes = planes.withColumnRenamed(existing='year', new='plane_year')

- Joining `flights` and `planes` DataFrames on the `tailnum` column

In [0]:
model_data = flights.join(other=planes, on='tailnum', how='leftouter')
model_data.show(5)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|
| N559AS|2014|    1| 22|    1040|        5|    1505|        5|     AS|   851|   SEA| HNL|     360|    2677|  10|    40|      2006|Fixed wing multi ...|      BOEING| 737-890|      2|  1

## Data types
Before we get started modeling, it's important to know that Spark's modeling routines only handle numeric data. That means all of the columns we feed to a model must be either integers or decimals (called 'doubles' in Spark).

When we imported our data, we did not ask Spark to infer a schema, so every column was read as a string, including the ones that contain numbers.

To remedy this, we can use the `.cast()` method in combination with the `.withColumn()` method. It's important to note that `.cast()` works on columns, while `.withColumn()` works on DataFrames.

The only argument we need to pass to `.cast()` is the kind of value we want to create, in string form. For example, to create integers, we'll pass the argument `"integer"` and for decimal numbers we'll use `"double"`.

We can put this call to `.cast()` inside a call to `.withColumn()` to overwrite the already existing column.
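
One consequence of casting is worth sketching. Assuming ANSI mode is off (the default in Spark 2.x and 3.x), a string that cannot be parsed as a number, such as the placeholder `NA` visible in the `speed` column above, becomes a null rather than raising an error. The plain-Python helper below is hypothetical and only mimics that behavior for whole-number strings.

```python
def cast_to_integer(value):
    """Return an int when the string parses as one, otherwise None (standing in for Spark's null)."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

# Hypothetical raw values for a column such as plane_year.
raw_plane_years = ["2011", "2006", "NA", None]
print([cast_to_integer(v) for v in raw_plane_years])   # [2011, 2006, None, None]
```

This is why missing values need to be filtered out after casting and before modeling.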



In [0]:
model_data.show(10)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|
| N559AS|2014|    1| 22|    1040|        5|    1505|        5|     AS|   851|   SEA| HNL|     360|    2677|  10|    40|      2006|Fixed wing multi ...|      BOEING| 737-890|      2|  1

Converting the columns `arr_delay`, `air_time`, `month` and `plane_year` from string type to integers.

In [0]:
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
model_data = model_data.withColumn("month", model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))

We'd like to calculate the age of the plane at the time of each flight. This can be calculated by subtracting the plane's year of manufacture, stored in the `plane_year` column, from the year of the flight in the `year` column. We'll call this new column `plane_age`.

In [0]:
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)

We also want to have another column `is_late` that stores whether a flight was late or not. So, this column is going to store only Boolean values. We can create this column from the `arr_delay` column.

In [0]:
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)
model_data.show(5)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|      3.0|  false|
| N559AS|2014|    1| 22|    1040|        5|    1505|        5|     AS|   851|   SEA| HNL|     360|    2677|  10|

Converting this column into an integer and storing it in another column called `label`. This column will act as the label for our ML model.

In [0]:
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))
model_data.show(3)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|      3.0|  false|    0|
| N559AS|2014|    1| 22|    1040|        5|    1505|        5|     AS|   851|   SEA| HNL

Filtering out the missing values from the DataFrame.

In [0]:
# Remove missing values
model_data = model_data.filter("arr_delay IS NOT NULL AND dep_delay IS NOT NULL AND air_time IS NOT NULL AND plane_year IS NOT NULL")

### Strings and factors
As we know, Spark requires numeric data for modeling. So far this hasn't been an issue; even boolean columns can easily be converted to integers without any trouble. But we'll also be using the airline and the plane's destination as features in our model. These are coded as strings and there isn't any obvious way to convert them to a numeric data type.

Fortunately, PySpark has functions for handling this built into the **`pyspark.ml.feature`** submodule. We can create what are called 'one-hot vectors' to represent the carrier and the destination of each flight. A one-hot vector is a way of representing a categorical feature where every observation has a vector in which all elements are zero except for at most one element, which has a value of one (1).

Each element in the vector corresponds to a level of the feature, so it's possible to tell what the right level is by seeing which element of the vector is equal to one (1).

The first step to encoding our categorical feature is to create a `StringIndexer`. Members of this class are `Estimators` that take a DataFrame with a column of strings and map each unique string to a number. Then, the `Estimator` returns a `Transformer` that takes a DataFrame, attaches the mapping to it as metadata, and returns a new DataFrame with a numeric column corresponding to the string column.

The second step is to encode this numeric column as a one-hot vector using a `OneHotEncoder`. This works exactly the same way as the `StringIndexer` by creating an `Estimator` and then a `Transformer`. The end result is a column that encodes our categorical feature as a vector that's suitable for machine learning routines!
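
The two steps can be sketched in plain Python (not Spark code). The helpers `fit_string_index` and `one_hot` are hypothetical, and the carrier values are a made-up sample. The sketch assumes two behaviors of Spark's defaults: strings are numbered by descending frequency (ties broken alphabetically), and the encoder drops the last category so that it is represented by an all-zero vector.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: i for i, s in enumerate(ordered)}

def one_hot(index, n_categories, drop_last=True):
    """Encode an index as a 0/1 vector; with drop_last the final category is all zeros."""
    size = n_categories - 1 if drop_last else n_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

carriers = ["AS", "VX", "AS", "WN", "AS", "WN"]   # hypothetical sample of the carrier column
mapping = fit_string_index(carriers)
print(mapping)                                     # {'AS': 0, 'WN': 1, 'VX': 2}
for carrier in ["AS", "WN", "VX"]:
    print(carrier, one_hot(mapping[carrier], len(mapping)))
```

The all-zero vector for the last category is what the phrase "at most one element" above refers to. Spark stores these vectors in a sparse format, so the printed output in the notebook looks different from the dense lists here.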

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Create a StringIndexer
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")

# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

Encoding the `dest` column in a similar way...

In [0]:
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_indexer")

dest_encoder = OneHotEncoder(inputCol="dest_indexer", outputCol="dest_fact")

### Assembling all columns as a single vector
The last step in the `Pipeline` is to combine all of the columns containing our features into a **single** column. This has to be done before modeling can take place because every Spark modeling routine expects the data to be in this form. 

We can do this by storing each of the values from a column as an entry in a vector. Then, from the model's point of view, every observation is a vector that contains all of the information about it and a label that tells the modeler what value that observation corresponds to.

To do this, the `pyspark.ml.feature` submodule contains a class called `VectorAssembler`. This `Transformer` takes all of the columns we specify and combines them into a new vector column.
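
Conceptually, assembling means flattening scalar columns and vector columns into one list per row, in the order the columns are named. The plain-Python sketch below (not Spark code) uses a hypothetical `assemble` helper and a made-up row whose one-hot columns are shortened to two elements each.

```python
def assemble(row, input_cols):
    """Flatten scalar and list-valued columns into one feature list, in the order given."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features

# Hypothetical row; real one-hot columns have one slot per category (minus one).
row = {
    "month": 12,
    "air_time": 132,
    "carrier_fact": [0.0, 1.0],
    "dest_fact": [1.0, 0.0],
    "plane_age": 3.0,
}
cols = ["month", "air_time", "carrier_fact", "dest_fact", "plane_age"]
print(assemble(row, cols))   # [12.0, 132.0, 0.0, 1.0, 1.0, 0.0, 3.0]
```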

In [0]:
from pyspark.ml.feature import VectorAssembler

vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")

We're finally ready to create a **`Pipeline`**. 

**`Pipeline`** is a class in the `pyspark.ml` module that combines all the `Estimators` and `Transformers` that we've already created. This lets us reuse the same modeling process over and over again by wrapping it up in one simple object.

In [0]:
# Importing Pipeline
from pyspark.ml import Pipeline

We create an object of the `Pipeline` class by calling its constructor with the keyword argument `stages`, which specifies the stages we want our data to go through in the pipeline.

In [0]:
# Making the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

---
## Test vs Train
After we've cleaned our data and gotten it ready for modeling, one of the most important steps is to split the data into a test set and a train set. After that, we don't touch our test data until we think we have a good model! As we're building models and forming hypotheses, we can test them on our training data to get an idea of their performance.

Once we've got our favorite model, we can see how well it predicts the new data in our test set. This never-before-seen data will give us a much more realistic idea of our model's performance in the real world when we're trying to predict or classify new data.

In Spark it's important to make sure we **split the data after all the transformations**. This is because an operation like `StringIndexer` builds its mapping from the data it is fit on, so fitting it separately on two subsets can assign different indices to the same string.
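
The effect can be seen in a small plain-Python sketch (not Spark code). The helper `fit_index` is hypothetical and assumes frequency-based numbering, which is how `StringIndexer` orders strings by default. The destination values are made up.

```python
from collections import Counter

def fit_index(values):
    """Frequency-ordered index, most common value first (ties broken alphabetically)."""
    counts = Counter(values)
    return {s: i for i, s in enumerate(sorted(counts, key=lambda s: (-counts[s], s)))}

# Hypothetical destination values split into two subsets.
subset_a = ["LAX", "LAX", "SFO"]
subset_b = ["SFO", "SFO", "LAX"]

print(fit_index(subset_a))              # {'LAX': 0, 'SFO': 1}
print(fit_index(subset_b))              # {'SFO': 0, 'LAX': 1}
print(fit_index(subset_a + subset_b))   # {'LAX': 0, 'SFO': 1}
```

Fitting once on the full data and splitting afterwards gives both subsets the same mapping.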

In [0]:
## Remember we have a DataFrame called as model_data
model_data.show(4)

type(flights_pipe)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|      3.0|  false|    0|
| N559AS|2014|    1| 22|    1040|        5|    1505|        5|     AS|   851|   SEA| HNL

pyspark.ml.pipeline.Pipeline

Now we'll pass our data through the Pipeline we just created by chaining the `.fit()` and `.transform()` methods, passing `model_data` as the argument to both. The result is a Spark DataFrame with the transformations applied.

In [0]:
piped_data = flights_pipe.fit(model_data).transform(model_data)

type(piped_data)

pyspark.sql.dataframe.DataFrame

In [0]:
piped_data.show(n=3)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+------------+---------------+-------------+--------------+--------------------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|dest_indexer|      dest_fact|carrier_index|  carrier_fact|            features|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+------------+---------------+-------------+--------------+--------------------+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|  

## Splitting train and test sets
We'll use the DataFrame method **`.randomSplit()`** to split `piped_data` into two pieces, training with 60% of the data, and test with 40% of the data by passing the list `[.6, .4]` to the `.randomSplit()` method.
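
A random split of this kind can be pictured as assigning each row independently by a random draw. The plain-Python sketch below is an illustration under that assumption, not Spark's actual implementation, and `random_split` is a hypothetical helper. It shows why the resulting sizes are close to, but not exactly, 60% and 40%.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split by comparing a uniform draw with cumulative weights."""
    rng = random.Random(seed)
    total = sum(weights)          # weights are normalized so they sum to 1
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any draw missed through rounding.
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

training, test = random_split(list(range(1000)), [0.6, 0.4], seed=42)
print(len(training), len(test))   # roughly 600 and 400, not exactly
```

Fixing the seed makes the assignment repeatable in this sketch, which is the same reason a `seed` is passed in the cell below.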

In [0]:
training, test = piped_data.randomSplit(weights=[.6, .4], seed=42)  # The seed is optional; it only makes the split repeatable.

training.show(n=3)
print("Training dataset row count:", training.count())

test.show(n=3)
print("Testing dataset row count:", test.count())

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+----------------+--------+-------+-----+-----+---------+---------+-------+-----+------------+---------------+-------------+--------------+--------------------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|    manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|dest_indexer|      dest_fact|carrier_index|  carrier_fact|            features|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+----------------+--------+-------+-----+-----+---------+---------+-------+-----+------------+---------------+-------------+--------------+--------------------+
| N107US|2014|    5| 15|    1058|       -2|    1845|      -23|     

---
---

# Part 4: Model tuning and selection
In this last part, we'll apply what we've learned to create a model that predicts which flights will be delayed.

## What is logistic regression?
The model we'll be fitting is called a *logistic regression*. This model is very similar to a *linear regression*, but instead of predicting a numeric variable, it predicts the probability (between 0 and 1) of an event.

To use this as a classification algorithm, all we have to do is assign a cutoff point to these probabilities. If the predicted probability is above the cutoff point, we classify that observation as a 'yes' (in this case, the flight being late); if it's below, we classify it as a 'no'!
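
As a minimal sketch of the idea above (plain Python, not PySpark's implementation), the snippet below turns a linear score into a probability and then applies a cutoff. The coefficients, the observation, and the helper names `predict_probability` and `classify` are made up for illustration.

```python
import math

def predict_probability(features, weights, intercept):
    """Squash a linear score into a probability strictly between 0 and 1."""
    score = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-score))

def classify(probability, cutoff=0.5):
    """Return 1 ('yes', the flight is late) above the cutoff, otherwise 0 ('no')."""
    return 1 if probability > cutoff else 0

# Hypothetical coefficients and a single made-up observation.
weights = [0.8, -0.5]
intercept = -0.2
flight = [1.5, 0.4]

p = predict_probability(flight, weights, intercept)
print("probability:", p)
print("label at cutoff 0.5:", classify(p))
print("label at cutoff 0.9:", classify(p, cutoff=0.9))
```

Raising the cutoff makes the classifier more reluctant to say 'yes', which is why the same probability can receive different labels.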

We'll tune this model by testing different values for several *hyperparameters*. A hyperparameter is just a value in the model that's not estimated from the data, but rather is supplied by the user to maximize performance.


Creating a **`LogisticRegression`** estimator from the **`pyspark.ml.classification`** module.

In [0]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()

## Cross validation
In the next few steps we'll be tuning our logistic regression model using a procedure called **k-fold cross validation**. This is a method of estimating the model's performance on unseen data (like our test DataFrame).

It works by splitting the training data into a few different partitions. The exact number is up to us, but in this notebook we'll be using PySpark's default value of three. Once the data is split up, one of the partitions is set aside, and the model is fit to the others. Then the error is measured against the held-out partition. This is repeated for each of the partitions, so that every block of data is held out and used as a test set exactly once. Then the error on each of the partitions is averaged. This is called the cross validation error of the model, and is a good estimate of the actual error on unseen data.
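
The procedure can be sketched in plain Python as follows. This is an illustration only: rows are dealt into folds round-robin for readability, whereas a real cross validator would normally assign rows at random, and the per-fold errors are made-up numbers. The helper names `k_fold_indices` and `cross_validation_error` are hypothetical.

```python
def k_fold_indices(n_rows, k=3):
    """Yield (train_indices, held_out_indices) once for each of the k folds."""
    indices = list(range(n_rows))
    folds = [indices[i::k] for i in range(k)]  # deal rows round-robin into k folds
    for held_out in folds:
        held_out_set = set(held_out)
        train = [i for i in indices if i not in held_out_set]
        yield train, held_out

def cross_validation_error(fold_errors):
    """Average the error measured on each held-out fold."""
    return sum(fold_errors) / len(fold_errors)

splits = list(k_fold_indices(n_rows=9, k=3))
for train, held_out in splits:
    print("fit on rows", train, "-> measure error on rows", held_out)

# Made-up errors, one per fold, standing in for real model evaluations.
fold_errors = [0.30, 0.34, 0.32]
cv_error = cross_validation_error(fold_errors)
print("cross validation error:", cv_error)
```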

We'll be using cross validation to choose the hyperparameters by creating a grid of the possible pairs of values for the two hyperparameters, **`elasticNetParam`** and **`regParam`**, and using the cross validation error to compare all the different models so we can choose the best one!

### Creating the evaluator

The first thing we need when doing cross validation for model selection is a way to compare different models. Luckily, the **`pyspark.ml.evaluation`** submodule has classes for evaluating different kinds of models. Our model is a binary classification model, so we'll be using the **`BinaryClassificationEvaluator`** from that submodule.

This evaluator calculates the area under the ROC curve. This is a metric that combines the two kinds of errors a binary classifier can make (false positives and false negatives) into a single number.
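
To build some intuition for this metric, here is a small plain-Python sketch. It relies on a standard equivalence: the area under the ROC curve equals the fraction of (positive, negative) pairs in which the positive example gets the higher score, with ties counted as half. This is not how the Spark evaluator is implemented; the function name `area_under_roc` and the labels and scores are made up for illustration.

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    correct = 0.0
    for pos_score in positives:
        for neg_score in negatives:
            if pos_score > neg_score:
                correct += 1.0
            elif pos_score == neg_score:
                correct += 0.5
    return correct / (len(positives) * len(negatives))

# Made-up labels (1 = late) and predicted probabilities.
labels = [1, 0, 1, 0, 1]
scores = [0.9, 0.3, 0.6, 0.7, 0.8]

auc = area_under_roc(labels, scores)
print("AUC:", auc)  # 5 of the 6 positive/negative pairs are ordered correctly
```

A model that ranks every late flight above every on-time flight scores 1.0, while random scores land near 0.5.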

In [0]:
import pyspark.ml.evaluation as evals

In [0]:
evaluator = evals.BinaryClassificationEvaluator(metricName='areaUnderROC')

print(evaluator)
print(type(evaluator))

BinaryClassificationEvaluator_bd91c2499374
<class 'pyspark.ml.evaluation.BinaryClassificationEvaluator'>


## Making the grid
Next, we need to create a grid of values to search over when looking for the optimal hyperparameters. The submodule **`pyspark.ml.tuning`** includes a class called **`ParamGridBuilder`** that does just that.

We'll need to use the **`.addGrid()`** and **`.build()`** methods to create a grid that we can use for cross validation. The `.addGrid()` method takes a model parameter (an attribute of the `Estimator` `lr` that we created earlier) and a list of values that we want to try. The `.build()` method takes no arguments; it just returns the grid that we'll use later.

In [0]:
import pyspark.ml.tuning as tune
import numpy as np

In [0]:
grid = tune.ParamGridBuilder()

Calling the `.addGrid()` method on `grid` with `lr.regParam` as the first argument and `np.arange(0, .1, .01)` as the second argument. The `np.arange()` call creates an array of numbers starting at 0 and incrementing by .01, stopping before .1 (the end point is excluded). We'll overwrite `grid` with the result.

In [0]:
grid = grid.addGrid(lr.regParam, np.arange(0, 0.1, 0.01))

Updating `grid` again by calling the `.addGrid()` method a second time, this time for `lr.elasticNetParam`, with only the values `[0, 1]`.

In [0]:
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

In [0]:
grid = grid.build()
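
To get a feel for the size of the grid, the plain-Python sketch below enumerates the same combinations with `itertools.product`. It mirrors the values used above but is only an illustration, not what `ParamGridBuilder` does internally; the variable names are hypothetical.

```python
from itertools import product

# Ten regParam candidates: 0.0, 0.01, ..., 0.09 (0.1 itself is excluded).
reg_param_values = [round(0.01 * i, 2) for i in range(10)]
# Two elasticNetParam candidates.
elastic_net_values = [0, 1]

# Every pairing of a regParam value with an elasticNetParam value.
combinations = list(product(reg_param_values, elastic_net_values))

print("number of combinations:", len(combinations))
print("first few:", combinations[:4])
```

Each combination is a separate model to fit, and with three folds each one is fit three times, which is why the cross validation step later on takes a while.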

## Making the validator
The submodule `pyspark.ml.tuning` also has a class called `CrossValidator` for performing cross validation. This Estimator takes the modeler we want to fit, the grid of hyperparameters we created, and the evaluator we want to use to compare our models.

The submodule `pyspark.ml.tuning` has already been imported as `tune`. We'll create the `CrossValidator` by passing it the logistic regression Estimator `lr`, the parameter grid, and the evaluator we created in the previous steps.

In [0]:
import pyspark.ml.tuning as tune

cv = tune.CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator
               )

## Fitting the model
We're finally ready to fit the models and select the best one!

Unfortunately, cross validation is a very computationally intensive procedure.

For fitting the models we'll do
> `models = cv.fit(training)`

After fitting, we'll extract the best model by
> `best_lr = models.bestModel`

Remember, the training data is called `training` and we're using `lr` to fit a logistic regression model. Cross validation selected the parameter values `regParam=0` and `elasticNetParam=0` as being the best. These are the default values, so we don't need to do anything else with `lr` before fitting the model.

In [0]:
models = cv.fit(training)

In [0]:
# Extract the best model
best_lr = models.bestModel
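
Besides the best model, it can be useful to see how every hyperparameter combination scored. The helper below is a plain-Python sketch with made-up numbers; `rank_param_maps` is a hypothetical name. With the fitted objects above, one would presumably pass `cv.getEstimatorParamMaps()` and `models.avgMetrics` as the two arguments (in PySpark the parameter maps are keyed by `Param` objects rather than strings).

```python
def rank_param_maps(param_maps, avg_metrics):
    """Pair each hyperparameter combination with its average metric, best first.

    Assumes a metric where larger is better, such as the area under the ROC curve.
    """
    paired = list(zip(param_maps, avg_metrics))
    return sorted(paired, key=lambda pair: pair[1], reverse=True)

# Made-up stand-ins for the parameter grid and the averaged fold metrics.
example_param_maps = [
    {"regParam": 0.01, "elasticNetParam": 0},
    {"regParam": 0.0, "elasticNetParam": 0},
    {"regParam": 0.01, "elasticNetParam": 1},
]
example_avg_metrics = [0.69, 0.70, 0.66]

ranking = rank_param_maps(example_param_maps, example_avg_metrics)
best_params, best_metric = ranking[0]

for params, metric in ranking:
    print(metric, params)
```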

Printing `best_lr` to verify that it's an object of the LogisticRegressionModel class.

In [0]:
print(best_lr)

LogisticRegressionModel: uid = LogisticRegression_0973db6e9935, numClasses = 2, numFeatures = 81


---
## Evaluating binary classifiers
We'll be using a common metric for binary classification algorithms called the **AUC**, or **area under the curve**. In this case, the curve is the ROC, or receiver operating characteristic, curve.

The closer the AUC is to one (1), the better the model is!



Using our model to generate predictions by applying **`best_lr.transform()`** to the test data and saving the result as `test_results`.

In [0]:
test_results = best_lr.transform(dataset=test)

Calling `evaluator.evaluate()` on `test_results` to compute the AUC and printing the output.

In [0]:
print(evaluator.evaluate(dataset=test_results))

0.699089985568893


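So the AUC of our model on the test data is about 0.70. Note that this is the area under the ROC curve, not the accuracy (the fraction of flights classified correctly).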

![The End](https://upload.wikimedia.org/wikipedia/commons/3/3d/The_End_Book.png)