In [1]:
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd

In [2]:
# Get the active SparkSession, or create one if none exists yet.
spark = SparkSession.builder.getOrCreate()

### Import data

In [3]:
# Load the three source tables. The "header" option makes Spark take the
# column names from the first row of each CSV file. No schema is supplied,
# so numeric columns have to be cast explicitly before they are used.
flights = spark.read.option("header", True).csv("data/flights_small.csv")
planes = spark.read.option("header", True).csv("data/planes.csv")
airports = spark.read.option("header", True).csv("data/airports.csv")

### Munge data

In [4]:
# Give the planes "year" column a distinct name so it stays distinguishable
# from the other "year" column once the two tables are joined.
planes = planes.withColumnRenamed("year", "plane_year")

# Attach the plane attributes to each flight by tail number. A left outer
# join keeps flights whose tailnum has no matching row in planes.
model_data = flights.join(planes, on="tailnum", how="leftouter")

# Cast the columns that must be numeric to integers, one after another.
for numeric_col in ("arr_delay", "air_time", "month", "plane_year"):
    model_data = model_data.withColumn(
        numeric_col, model_data[numeric_col].cast("integer")
    )

### Define features

In [6]:
# Derive the modelling columns in a single chain:
#   plane_age - the flight's year minus plane_year
#   is_late   - True when arr_delay is greater than zero
# and finally keep only the rows in which none of the listed columns is NULL.
model_data = (
    model_data
    .withColumn("plane_age", model_data.year - model_data.plane_year)
    .withColumn("is_late", model_data.arr_delay > 0)
    .filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")
)

### Prepare features for model input

In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Cast the is_late target to an integer and store it in a column named "label".
model_data = model_data.withColumn("label", model_data["is_late"].cast("integer"))

# carrier: string value -> numeric index -> one-hot encoded vector
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

# dest: string value -> numeric index -> one-hot encoded vector
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")

# Combine every predictor into a single "features" vector column.
vec_assembler = VectorAssembler(
    inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"],
    outputCol="features",
)

# Chain the stages: dest encoding, then carrier encoding, then assembly.
flights_pipe = Pipeline(
    stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler]
)

# Fit the pipeline on model_data, then apply the fitted pipeline to that
# same DataFrame.
pipeline_model = flights_pipe.fit(model_data)
piped_data = pipeline_model.transform(model_data)

### Create model and evaluation metric

In [8]:
from pyspark.ml.classification import LogisticRegression
import pyspark.ml.evaluation as evals

# Metric used to score candidate models: area under the ROC curve.
evaluator = evals.BinaryClassificationEvaluator(
    metricName="areaUnderROC",
)

# Logistic regression estimator, created with its default parameters; the
# hyperparameters are supplied later through the parameter grid.
lr = LogisticRegression()

### Set up train/test split and cross-validation

In [11]:
import pyspark.ml.tuning as tune

# Fixed seed shared by the train/test split and the cross-validation folds.
# Without it both are drawn differently on every run, so the metric printed
# at the end of the notebook cannot be reproduced.
SEED = 42

# Split the data into training (weight 0.6) and test (weight 0.4) sets
training, test = piped_data.randomSplit([.6, .4], seed=SEED)

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameters: regParam from 0 up to (not including) 0.1 in steps
# of 0.01, crossed with the two elasticNetParam values 0.0 and 1.0
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0.0, 1.0]) # This has to be a double, an int will fail

# Build the grid
grid = grid.build()

# Create the CrossValidator; the seed fixes how rows are assigned to folds
cv = tune.CrossValidator(estimator=lr, estimatorParamMaps=grid,
    evaluator=evaluator, seed=SEED)

### Perform optimization

In [18]:
# Run the cross-validator (not a plain lr.fit) on the training set, so every
# parameter combination in the grid is evaluated. Despite its name, best_lr
# holds the fitted cross-validation model returned by cv.fit.
best_lr = cv.fit(training)

### Evaluate

In [13]:
# Score the held-out test set with the fitted model.
test_results = best_lr.transform(test)

# Compute the evaluator's metric on those predictions and print it.
test_metric = evaluator.evaluate(test_results)
print(test_metric)

0.6977557486873057
