In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark import SparkContext
import os
import requests

## Download Datasets

In [2]:
# Everything is stored in a "datasets" folder under the current working directory.
DIR_PATH = os.getcwd()
PATH_TO_SAVE = f"{DIR_PATH}/datasets/"

# Remote CSV files to fetch, one per table used in this notebook.
URLS = [
    "https://assets.datacamp.com/production/repositories/1237/datasets/6e5c4ac2a4799338ba7e13d54ce1fa918da644ba/airports.csv",
    "https://assets.datacamp.com/production/repositories/1237/datasets/fa47bb54e83abd422831cbd4f441bd30fd18bd15/flights_small.csv",
    "https://assets.datacamp.com/production/repositories/1237/datasets/231480a2696c55fde829ce76d936596123f12c0c/planes.csv",
]

# Local path of each file: the save directory plus the last segment of its URL.
FILE_PATHS = [PATH_TO_SAVE + url.rsplit("/", 1)[-1] for url in URLS]

In [3]:
def download_dataset(url, path_to_save, timeout=30):
    """
    Download the file at `url` into the directory `path_to_save`.

    The file is named after the last path segment of the URL. The target
    directory is created when it does not exist yet, and nothing is written
    to disk unless the server answered successfully.

    Args:
        url: HTTP(S) address of the file to download.
        path_to_save: Directory in which the downloaded file is written.
        timeout: Seconds to wait for the server before giving up.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or timeouts.

    #ref.: https://stackabuse.com/download-files-with-python/
    """
    print('Beginning datasets download with requests')
    file_name = url.split("/")[-1]

    # Without a timeout an unresponsive server would block the notebook forever.
    r = requests.get(url, timeout=timeout)

    # Fail before touching the disk so an error page is never saved as a CSV.
    r.raise_for_status()

    # A fresh checkout has no datasets directory yet.
    os.makedirs(path_to_save, exist_ok=True)

    with open(os.path.join(path_to_save, file_name), 'wb') as f:
        f.write(r.content)

    # Retrieve HTTP meta-data
    print(r.status_code)

# Fetch every source file into the shared datasets directory.
for url in URLS:
    download_dataset(url=url, path_to_save=PATH_TO_SAVE)

Beginning datasets download with requests
200
Beginning datasets download with requests
200
Beginning datasets download with requests
200


## Load Datasets

In [4]:
def load_dataset(file_path):
    """
    Read a CSV file into a Spark DataFrame and register it as a temp view.

    The view is named after the file, without its directory or extension.
    As a side effect the function prints that name, the first rows of the
    data, its schema, and the tables currently listed in the session catalog.

    Args:
        file_path: Path to a CSV file whose first line holds the column names.

    Returns:
        A `(dataset, file_name)` tuple: the loaded DataFrame and the name
        under which it was registered as a temporary view.
    """
    file_name = os.path.splitext(os.path.basename(file_path))[0]
    print("file_name: "+file_name)

    # Reuses the active session when there is one, otherwise starts it
    # (together with its SparkContext).
    spark_session = SparkSession.builder.getOrCreate()

    # Only the header is requested; no schema inference is asked for, so
    # numeric-looking columns still need an explicit cast later on.
    dataset = spark_session.read.csv(file_path, header=True)

    # Show the data
    dataset.show()

    # Print Schema
    dataset.printSchema()

    # Make the data queryable by name through the session catalog
    dataset.createOrReplaceTempView(file_name)

    # Examine the tables in the catalog
    print(f"spark_session.catalog.listTables(): {spark_session.catalog.listTables()}")

    return dataset, file_name

In [5]:
# Load every downloaded CSV and index the resulting DataFrames by table name.
dataset_objs = dict()
for file_path in FILE_PATHS:
    print(file_path)
    dataset, file_name = load_dataset(file_path)
    dataset_objs.update({file_name: dataset})

/home/jovyan/work/datasets/airports.csv
file_name: airports
+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.05380

In [6]:
# Preview each loaded table under its name.
for key, frame in dataset_objs.items():
    print(f"{key}:")
    frame.show()

airports:
+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford Co

## Casting String Columns to Integer

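Spark's machine learning routines only handle numeric data. That means all of the columns used by the model must be either integers or decimals (called 'doubles' in Spark). The CSV files were read without schema inference, so the columns we need have to be cast explicitly.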

In [7]:
# Rename the planes' "year" so it stays distinguishable from the flights'
# "year" column once both tables are joined.
planes_renamed = dataset_objs["planes"].withColumnRenamed("year", "plane_year")
dataset_objs["planes"] = planes_renamed

# Attach plane details to every flight; flights without a matching plane are kept.
model_data = dataset_objs["flights_small"].join(planes_renamed, on="tailnum", how="leftouter")

# Cast the columns the model needs from their loaded type to integers.
for column_name in ("arr_delay", "air_time", "month", "plane_year"):
    model_data = model_data.withColumn(column_name, model_data[column_name].cast("integer"))


## Creating new columns

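First compute `plane_age` as the flight year minus the plane's year. Then create a boolean `is_late` column (true when `arr_delay > 0`) and convert it to an integer column named `label` so that we can use it in our model (this is the default name for the response variable in Spark's machine learning routines).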

In [8]:
# Derive the extra columns in one chain:
#   plane_age - flight year minus the plane's year
#   is_late   - whether the flight arrived with a positive delay
#   label     - is_late as an integer, the response variable for the model
model_data = (
    model_data
    .withColumn("plane_age", F.col("year") - F.col("plane_year"))
    .withColumn("is_late", F.col("arr_delay") > 0)
    .withColumn("label", F.col("is_late").cast("integer"))
)

## Remove missing values

In [9]:
# Keep only the rows where every column listed below is present.
required_columns = ("arr_delay", "dep_delay", "air_time", "plane_year")
not_null_predicate = " and ".join(f"{name} is not NULL" for name in required_columns)
model_data = model_data.filter(not_null_predicate)
model_data.show()

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+--------------+-----------+-------+-----+-----+---------+---------+-------+-----+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|  manufacturer|      model|engines|seats|speed|   engine|plane_age|is_late|label|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+--------------+-----------+-------+-----+-----+---------+---------+-------+-----+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|      2011|Fixed wing multi ...|        AIRBUS|   A320-214|      2|  182|   NA|Turbo-fan|      3.0|  false|    0|
| N559AS|2014|    1| 22|    1040|        5|    1505|        5|     A

## Pipeline

The first step to encoding our categorical feature is to create a **StringIndexer**. Members of this class are **Estimators** that take a DataFrame with a column of strings and map each unique string to a number. Then, the **Estimator** returns a **Transformer** that takes a DataFrame, attaches the mapping to it as metadata, and returns a new DataFrame with a numeric column corresponding to the string column.

The second step is to encode this numeric column as a one-hot vector using a **OneHotEncoder**. This works exactly the same way as the **StringIndexer** by creating an **Estimator** and then a **Transformer**. The end result is a column that encodes our categorical feature as a vector that's suitable for machine learning routines.

### Carrier

In [10]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
# Map each carrier string to a numeric index ...
carr_indexer = StringIndexer(
    inputCol="carrier",
    outputCol="carrier_index",
)

# ... and turn that index into a one-hot vector.
carr_encoder = OneHotEncoder(
    inputCol="carrier_index",
    outputCol="carrier_fact",
)

### Destination

In [11]:
# Map each destination string to a numeric index ...
dest_indexer = StringIndexer(
    inputCol="dest",
    outputCol="dest_index",
)

# ... and turn that index into a one-hot vector.
dest_encoder = OneHotEncoder(
    inputCol="dest_index",
    outputCol="dest_fact",
)

### Assemble a vector

The last step in the Pipeline is to combine all of the columns containing our features into a single column. This has to be done before modeling can take place because every Spark modeling routine expects the data to be in this form. We can do this by storing each of the values from a column as an entry in a vector. 

Then, from the model's point of view, every observation is a vector that contains all of the information about it and a label that tells the modeler what value that observation corresponds to.

Because of this, the pyspark.ml.feature submodule contains a class called VectorAssembler. This Transformer takes all of the columns you specify and combines them into a new vector column.

In [12]:
from pyspark.ml.feature import VectorAssembler
# Columns that are combined into the single "features" vector column.
feature_columns = ["month", "air_time", "carrier_fact", "dest_fact", "plane_age"]
vec_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

### Create the pipeline

Pipeline is a class in the pyspark.ml module that combines all the Estimators and Transformers that we've already created. This lets us reuse the same modeling process over and over again by wrapping it up in one simple object.

stages should be a list holding all the stages you want your data to go through in the pipeline.

In [13]:
# Import Pipeline
from pyspark.ml import Pipeline

# Stages run in list order: each indexer precedes its encoder, and the
# assembler comes last because it consumes the encoders' output columns.
pipeline_stages = [dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler]
flights_pipe = Pipeline(stages=pipeline_stages)

## Train and Test

After we've cleaned our data and gotten it ready for modeling, one of the most important steps is to split the data into a test set and a train set. After that, we don't touch our test data until we think we have a good model. As we're building models and forming hypotheses, we can test them on our training data to get an idea of their performance.

In [14]:
# Fit the pipeline's estimators on the data, then apply the fitted
# pipeline to that same data to obtain the model-ready columns.
fitted_pipe = flights_pipe.fit(model_data)
piped_data = fitted_pipe.transform(model_data)

### Split the data

In [15]:
# Split the data into training (60%) and test (40%) sets.
# The seed is fixed so that re-running the notebook produces the same split,
# and therefore comparable metrics further down.
training, test = piped_data.randomSplit([.6, .4], seed=42)

### Model tuning and selection

#### Create the modeler

In [16]:
# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

# Logistic regression estimator; the column names are the estimator's
# defaults, spelled out to show how it connects to the pipeline output.
lr = LogisticRegression(featuresCol="features", labelCol="label")

#### Cross validation

It works by splitting the training data into a few different partitions. Once the data is split up, one of the partitions is set aside, and the model is fit to the others. Then the error is measured against the held out partition. This is repeated for each of the partitions, so that every block of data is held out and used as a test set exactly once.

##### Create the evaluator

The first thing we need when doing cross validation for model selection is a way to compare different models. The model is a binary classification model, so we'll be using the BinaryClassificationEvaluator from the pyspark.ml.evaluation module.

This evaluator calculates the area under the ROC curve. This is a metric that combines the two kinds of errors a binary classifier can make (false positives and false negatives) into a single number.

In [17]:
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Score models by the area under the ROC curve.
evaluator = evals.BinaryClassificationEvaluator(
    metricName="areaUnderROC",
)

##### Make a grid

Next, we need to create a grid of values to search over when looking for the optimal hyperparameters.

In [18]:
# Import the tuning submodule
import pyspark.ml.tuning as tune
import numpy as np

# Build the hyperparameter grid in one fluent chain: every regParam value
# is paired with every elasticNetParam value.
grid = (
    tune.ParamGridBuilder()
    .addGrid(lr.regParam, np.arange(0, .1, .01))
    .addGrid(lr.elasticNetParam, [0, 1])
    .build()
)

##### Make the validator

The submodule pyspark.ml.tuning also has a class called CrossValidator for performing cross validation. This Estimator takes the modeler we want to fit, the grid of hyperparameters we created, and the evaluator we want to use to compare our models.

The submodule pyspark.ml.tuning has already been imported as tune. We'll create the CrossValidator by passing it the logistic regression Estimator lr, the parameter grid, and the evaluator we created previously.

In [19]:
# Create the CrossValidator.
# The seed fixes how rows are assigned to folds, so the selected
# hyperparameters are reproducible across runs.
cv = tune.CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    seed=42,
)

##### Fit the model(s)

The training data is called 'training' and we're using lr to fit a logistic regression model. Cross validation selected the parameter values regParam=0 and elasticNetParam=0 as being the best. These are the default values, so we don't need to do anything else with lr before fitting the model.

In [20]:
# Fit cross validation models: one fit per grid entry and fold, scored
# with `evaluator` (this is the expensive cell of the notebook)
models = cv.fit(training)

# Extract the best model, i.e. the one fitted with the best-scoring
# hyperparameter combination
best_lr = models.bestModel

In [21]:
# Keep the model selected by cross validation. CrossValidator has already
# refit it on the whole training set with the winning hyperparameters, so
# calling lr.fit(training) here would replace it with an untuned
# default-parameter model and throw the grid search away.

# Print best_lr
print(best_lr)

LogisticRegressionModel: uid=LogisticRegression_047b2fa6aca6, numClasses=2, numFeatures=81


In [22]:
# Score the held-out test set with the selected model.
test_results = best_lr.transform(test)

# Report the evaluator's metric for those predictions.
test_metric = evaluator.evaluate(test_results)
print(test_metric)

0.6898034847229739
