# Problem definition

In order to evaluate PySpark framework, we want to perform an entire pipeline of steps, on a large dataset, and compare the same analysis using an usual framework, like scikit-learn, without paralel computation.

We will work on a classification problem, based on flights informations from North America. This dataset is available in Databricks datasets, and it contains 1,391,578 rows and 5 columns. It has no null values.

Our workflow will consist of feature engineering, cross-validation, hyperparameter tuning and simple versus ensemble model.

In [0]:
# Import libraries
import time
from pyspark.sql.functions import concat, hour, lit, to_timestamp
from pyspark.ml import Pipeline
from pyspark.ml.feature import Bucketizer, StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# 1. Feature Engineering

Since PySpark Machine Learning algorithms require only numeric variables, we need to perform some transformations on the data.

First we will create a new column named label, representing our target variable, delayed flight or not.<br>
After that, we will implement steps of feature engineering to our pipeline.<br>
We will be transforming the date column into readable datetime format and bucketizing it into intervals of 3 hours along 24 hours.<br>
We will also be implementing an indexer for the categorical features (origin and destinantion) and one-hot encoder for them.

## 1.1. Target variable label

The Federal Aviation Administration (FAA) considers a flight to be "delayed" when it arrives 15 minutes or more after its scheduled time. Thus we will be creating the boolean label, accordingly to FAA definition. After that we will convert the boolean column into numerical.

In [0]:
data = spark.read.csv('dbfs:/databricks-datasets/flights/departuredelays.csv', header=True, inferSchema=True)

In [0]:
df = data
df = df.withColumn('label', df.delay >= 15)
df.groupBy('label').count().show()

+-----+-------+
|label|  count|
+-----+-------+
| true| 314474|
|false|1077104|
+-----+-------+



We can see that the data is unbalanced, having 3 times more non delayed flights.<br>
However we will work with the data without changing its distribution, in order to evaluate both models performance.

In [0]:
# Transform boolean into numerical
df = df.withColumn('label', df.label.cast('integer'))

In [0]:
df.display()

date,delay,distance,origin,destination,label
1011245,6,602,ABE,ATL,0
1020600,-8,369,ABE,DTW,0
1021245,-2,602,ABE,ATL,0
1020605,-4,602,ABE,ATL,0
1031245,-4,602,ABE,ATL,0
1030605,0,602,ABE,ATL,0
1041243,10,602,ABE,ATL,0
1040605,28,602,ABE,ATL,1
1051245,88,602,ABE,ATL,1
1050605,9,602,ABE,ATL,0


In [0]:
# Get number of records
print("The data contain %d records." % df.count())

The data contain 1391578 records.


In [0]:
# Print schema of DataFrame
df.printSchema()

root
 |-- date: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- label: integer (nullable = true)



## 1.2. Column departure

We will first convert the time column into string, so next we can convert it into timestamp.<br>
Then we will declare the bucketizer step for intervals of 3 hours.

In [0]:
# Transform int to string
df = df.withColumn('departure', concat(lit('20140'), df.date.cast('string')))

# Transform string to timestamp
df = df.withColumn('departure', to_timestamp('departure', 'yyyyMMddHHmm'))

# Get hour from departure time
df = df.withColumn('hour', hour(df.departure))

In [0]:
# Define Bucketizer
bucketizer = Bucketizer(splits=[0,3,6,9,12,15,18,21,24], inputCol='hour', outputCol='departure_bucket')

## 1.3. Categorical features

The categorical features origin and destination contain the IATA code for airports of North America. Since there are around 300 different airports in the dataset, we will replace them by their state, reducing to a total of 65 states.

In order to do this, we will need to import a second database, containing the airports informations. We will perform a join between the two tables, to get the corresponding states.

After this, we will implement an indexer and a one-hot encoder for each categorical feature.<br>
The buckets from previous section will need one-hot encoding also, since they are categorical values now.<br>
These steps will be gathered in the pipeline.

In [0]:
df_air = spark.read.option("header", "true").option("delimiter", "\t").csv('dbfs:/databricks-datasets/flights/airport-codes-na.txt')

In [0]:
df_air.display()

City,State,Country,IATA
Abbotsford,BC,Canada,YXX
Aberdeen,SD,USA,ABR
Abilene,TX,USA,ABI
Akron,OH,USA,CAK
Alamosa,CO,USA,ALS
Albany,GA,USA,ABY
Albany,NY,USA,ALB
Albuquerque,NM,USA,ABQ
Alexandria,LA,USA,AEX
Allentown,PA,USA,ABE


In [0]:
# Join on origin
df_origin = df_air.withColumnRenamed('IATA', 'origin')
df_origin = df_origin.withColumnRenamed('State', 'origin_state')
df_origin = df_origin.select('origin_state', 'origin')
df = df.join(df_origin, on='origin', how='leftouter')

In [0]:
# Join on destination
df_dest = df_air.withColumnRenamed('IATA', 'destination')
df_dest = df_dest.withColumnRenamed('State', 'dest_state')
df_dest = df_dest.select('dest_state', 'destination')
df = df.join(df_dest, on='destination', how='leftouter')

In [0]:
df = df.dropna()

In [0]:
# Define Indexer
indexer = StringIndexer(inputCols=['origin_state', 'dest_state'],
                        outputCols=['origin_index', 'dest_index'])

In [0]:
# Define One-Hot Encoder
encoder = OneHotEncoder(inputCols=['departure_bucket', 'origin_index', 'dest_index'],
                        outputCols=['departure_dummy', 'origin_dummy', 'dest_dummy'])

## 1.4. Train-test split

Before applying any fitting or prediction, we will split the data into training and test sets. This will ensure that data leakage does not occur during all process.

In [0]:
# Split the data
train, test = df.randomSplit([.7, .3])

# 2. Pipeline

Now we will define a pipeline to run all previous declared steps.

## 2.1. Vector Assembler

This step is required for PySpark, since it requires all features into one vector, in sparse representation.

In [0]:
# Assembling columns
assembler = VectorAssembler(inputCols=['delay', 'distance', 'departure_dummy', 'origin_dummy', 'dest_dummy'],
                            outputCol='features')

## 2.2. Pipeline

Now we can declare and build our pipeline with all previous steps.<br>
By applying the pipeline, we can properly fit with the training data and transform both the training and test sets.

In [0]:
# Create the Pipeline
pipeline = Pipeline(stages=[bucketizer, indexer, encoder, assembler])

In [0]:
pipeline = pipeline.fit(train)

In [0]:
train = pipeline.transform(train)

In [0]:
test = pipeline.transform(test)

In [0]:
# Displaying the training dataset after the pipeline
train.display()

destination,origin,date,delay,distance,label,departure,hour,origin_state,dest_state,departure_bucket,origin_index,dest_index,departure_dummy,origin_dummy,dest_dummy,features
ABE,ATL,1012039,-4,602,0,2014-01-01T20:39:00.000+0000,20,GA,PA,6.0,3.0,12.0,"Map(vectorType -> sparse, length -> 7, indices -> List(6), values -> List(1.0))","Map(vectorType -> sparse, length -> 48, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 49, indices -> List(12), values -> List(1.0))","Map(vectorType -> sparse, length -> 106, indices -> List(0, 1, 8, 12, 69), values -> List(-4.0, 602.0, 1.0, 1.0, 1.0))"
ABE,ATL,1021016,-7,602,0,2014-01-02T10:16:00.000+0000,10,GA,PA,3.0,3.0,12.0,"Map(vectorType -> sparse, length -> 7, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 48, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 49, indices -> List(12), values -> List(1.0))","Map(vectorType -> sparse, length -> 106, indices -> List(0, 1, 5, 12, 69), values -> List(-7.0, 602.0, 1.0, 1.0, 1.0))"
ABE,ATL,1022039,0,602,0,2014-01-02T20:39:00.000+0000,20,GA,PA,6.0,3.0,12.0,"Map(vectorType -> sparse, length -> 7, indices -> List(6), values -> List(1.0))","Map(vectorType -> sparse, length -> 48, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 49, indices -> List(12), values -> List(1.0))","Map(vectorType -> sparse, length -> 106, indices -> List(1, 8, 12, 69), values -> List(602.0, 1.0, 1.0, 1.0))"
ABE,ATL,1032039,0,602,0,2014-01-03T20:39:00.000+0000,20,GA,PA,6.0,3.0,12.0,"Map(vectorType -> sparse, length -> 7, indices -> List(6), values -> List(1.0))","Map(vectorType -> sparse, length -> 48, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 49, indices -> List(12), values -> List(1.0))","Map(vectorType -> sparse, length -> 106, indices -> List(1, 8, 12, 69), values -> List(602.0, 1.0, 1.0, 1.0))"
ABE,ATL,1041015,-3,602,0,2014-01-04T10:15:00.000+0000,10,GA,PA,3.0,3.0,12.0,"Map(vectorType -> sparse, length -> 7, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 48, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 49, indices -> List(12), values -> List(1.0))","Map(vectorType -> sparse, length -> 106, indices -> List(0, 1, 5, 12, 69), values -> List(-3.0, 602.0, 1.0, 1.0, 1.0))"
ABE,ATL,1051016,84,602,1,2014-01-05T10:16:00.000+0000,10,GA,PA,3.0,3.0,12.0,"Map(vectorType -> sparse, length -> 7, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 48, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 49, indices -> List(12), values -> List(1.0))","Map(vectorType -> sparse, length -> 106, indices -> List(0, 1, 5, 12, 69), values -> List(84.0, 602.0, 1.0, 1.0, 1.0))"
ABE,ATL,1060930,-4,602,0,2014-01-06T09:30:00.000+0000,9,GA,PA,3.0,3.0,12.0,"Map(vectorType -> sparse, length -> 7, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 48, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 49, indices -> List(12), values -> List(1.0))","Map(vectorType -> sparse, length -> 106, indices -> List(0, 1, 5, 12, 69), values -> List(-4.0, 602.0, 1.0, 1.0, 1.0))"
ABE,ATL,1061502,82,602,1,2014-01-06T15:02:00.000+0000,15,GA,PA,5.0,3.0,12.0,"Map(vectorType -> sparse, length -> 7, indices -> List(5), values -> List(1.0))","Map(vectorType -> sparse, length -> 48, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 49, indices -> List(12), values -> List(1.0))","Map(vectorType -> sparse, length -> 106, indices -> List(0, 1, 7, 12, 69), values -> List(82.0, 602.0, 1.0, 1.0, 1.0))"
ABE,ATL,1062115,0,602,0,2014-01-06T21:15:00.000+0000,21,GA,PA,7.0,3.0,12.0,"Map(vectorType -> sparse, length -> 7, indices -> List(), values -> List())","Map(vectorType -> sparse, length -> 48, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 49, indices -> List(12), values -> List(1.0))","Map(vectorType -> sparse, length -> 106, indices -> List(1, 12, 69), values -> List(602.0, 1.0, 1.0))"
ABE,ATL,1070930,159,602,1,2014-01-07T09:30:00.000+0000,9,GA,PA,3.0,3.0,12.0,"Map(vectorType -> sparse, length -> 7, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 48, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 49, indices -> List(12), values -> List(1.0))","Map(vectorType -> sparse, length -> 106, indices -> List(0, 1, 5, 12, 69), values -> List(159.0, 602.0, 1.0, 1.0, 1.0))"


# 3. Baseline Model

We will apply two classification models: one simple logistic regression and one ensemble method, Random Forest.<br>
Since the logistic regression is more simple, we will use it as baseline model.<br>
We will perform hyperparameter tuning on it, with grid-search cross-validation and evaluate the best model on the test set.<br>
We will apply two evaluations for both models: ROC-AUC and Confusion Matrix.

## 3.1. Evaluators

Since we will be evaluating both models with the same roc_auc metric, we will define it first.<br>
We will also define a function to calculate the Confusion Matrix as well as accuracy, precision, recall and F1-Score.

In [0]:
# Define the evaluator
evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')

In [0]:
# Confusion Matrix
def confusion_matrix(prediction):
  prediction.groupBy('label', 'prediction').count().show()
  
  # Calculate the elements of the confusion matrix
  TN = prediction.filter('prediction = 0 AND label = prediction').count()
  TP = prediction.filter('prediction = 1 AND label = prediction').count()
  FN = prediction.filter('prediction = 0 AND label <> prediction').count()
  FP = prediction.filter('prediction = 1 AND label <> prediction').count()
  
  # Accuracy, precision and recall metrics
  accuracy = (TP+TN)/(TP+TN+FP+FN)
  precision = TP/(TP+FP)
  recall = TP/(TP+FN)
  f1 = 2*(precision*recall)/(precision+recall)
  
  print('Accuracy: ', accuracy)
  print('Precision: ', precision)
  print('Recall: ', recall)
  print('F1-Score: ', f1)

## 3.2. Logistic Regression

The Logistic Regression will be our baseline model, for comparison.<br>
For its hyperparameters we will search for 'regParam' wich is the Regularization λ and 'elasticNetParam' which is the Regularization α (L1/L2).

In [0]:
# Create Logistic Regression model
lr = LogisticRegression()

In [0]:
# Make a grid for grid-search
params_lr = ParamGridBuilder()

# Add the hyperparameters
params_lr = params_lr.addGrid(lr.regParam, [1, .1, .01])
params_lr = params_lr.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
params_lr = params_lr.build()

## 3.3. Hyperparameter Tuning

Now through grid-search cross-validation, we may search for the best hyperparameters, on the training set, and use the best model to predict the test set, in order to evaluate the results.

In [0]:
# Create the CrossValidator
cv_lr = CrossValidator(estimator=lr,
                       estimatorParamMaps=params_lr,
                       evaluator=evaluator,
                       numFolds=3)

In [0]:
# Train the model and time it
t_start = time.time()
cv_lr = cv_lr.fit(train)
t_total = time.time() - t_start
print('Cross-validation training with Logistic Regression took around {:.2f} min.'.format(t_total/60))

Cross-validation training with Logistic Regression took around 7.91 min.


## 3.4. Cross-Validation Best Model

In [0]:
# Extract the best model
best_lr = cv_lr.bestModel

# Print best_lr
print(best_lr)

LogisticRegressionModel: uid=LogisticRegression_34e3dd8a08f2, numClasses=2, numFeatures=106


In [0]:
best_lr.explainParam('regParam')

Out[77]: 'regParam: regularization parameter (>= 0). (default: 0.0, current: 0.1)'

In [0]:
best_lr.explainParam('elasticNetParam')

Out[78]: 'elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 1.0)'

## 3.5. Model Evaluation

In [0]:
# Testing the data
predictions_lr = cv_lr.transform(test)

In [0]:
# Evaluate the predictions
print('roc_auc score: ', evaluator.evaluate(predictions_lr))

confusion_matrix(predictions_lr)

roc_auc score:  1.0
+-----+----------+------+
|label|prediction| count|
+-----+----------+------+
|    1|       0.0| 54001|
|    0|       0.0|300984|
|    1|       1.0| 34712|
+-----+----------+------+

Accuracy:  0.8614282378360624
Precision:  1.0
Recall:  0.3912842537170426
F1-Score:  0.562479238403889


# 4. Ensemble Random Forest

Now we will apply a Random Forest model, also searching for its best hyperparameters, through grid-search cross-validation.

## 4.1. Model

For its hyperparameters we will search for 'featureSubsetStrategy' wich is the number of features to consider and 'maxDepth' which is the maximum depth of the tree.

In [0]:
# Create Random Forest model
rf = RandomForestClassifier()

In [0]:
# Make a grid for grid-search
params_rf = ParamGridBuilder()

# Add the hyperparameters
params_rf = params_rf.addGrid(rf.featureSubsetStrategy, ['all', 'onethird', 'sqrt', 'log2'])
params_rf = params_rf.addGrid(rf.maxDepth, [2, 5, 10])

# Build the grid
params_rf = params_rf.build()

## 4.2. Hyperparameter Tuning

In [0]:
# Create the CrossValidator
cv_rf = CrossValidator(estimator=rf,
                       estimatorParamMaps=params_rf,
                       evaluator=evaluator,
                       numFolds=3)

In [0]:
# Train the model and time it
t_start = time.time()
cv_rf = cv_rf.fit(train)
t_total = time.time() - t_start
print('Cross-validation training with Random Forest took around {:.2f} min.'.format(t_total/60))

Cross-validation training with Random Forest took around 22.73 min.


## 4.3. Cross-Validation Best Model

In [0]:
# Extract the best model
best_rf = cv_rf.bestModel

# Print best_rf
print(best_rf)

RandomForestClassificationModel: uid=RandomForestClassifier_2d8266ad910a, numTrees=20, numClasses=2, numFeatures=106


In [0]:
best_rf.explainParam('featureSubsetStrategy')

Out[86]: "featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use log2(number of features)), 'n' (when n is in the range (0, 1.0], use n * number of features. When n is in the range (1, number of features), use n features). default = 'auto' (default: auto, current: all)"

In [0]:
best_rf.explainParam('maxDepth')

Out[87]: 'maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30]. (default: 5, current: 5)'

## 4.4. Model Evaluation

In [0]:
# Testing the data
predictions_rf = cv_rf.transform(test)

In [0]:
# Evaluate the predictions
print('roc_auc score: ', evaluator.evaluate(predictions_rf))

confusion_matrix(predictions_rf)

roc_auc score:  0.999801222799958
+-----+----------+------+
|label|prediction| count|
+-----+----------+------+
|    1|       0.0|  2685|
|    0|       0.0|300416|
|    1|       1.0| 86028|
|    0|       1.0|   568|
+-----+----------+------+

Accuracy:  0.991652488985032
Precision:  0.9934408055799344
Recall:  0.9697338608772108
F1-Score:  0.98144419282524


# 5. Conclusions

Spark is a framework developed specially for big data processing.<br>
Its default HDFS partition size is 64MB, which means for larger data, Spark process with multiple processors. This way, the processing time does not scalate with the data size.

The pyspark.ml is the Machine Learning library, with Python interface, developed to work properly with all the advantages of the Spark framework.

Analyzing both models evaluation, we can see that, although Random Forest achieved a slightly lower ROC AUC Score, its Confusion Matrix metrics were much better than Logistic Regression.

|Metric|Logistic Regression|Random Forest|
|--|--|--|
|ROC AUC|1.0000|0.9998|
|Accuracy|0.8614|0.9917|
|Precision|1.0000|0.9934|
|Recall|0.3913|0.9697|
|F1-Score|0.5625|0.9814|

The most significant difference is the high Recall and F1-Score for Random Forest, meaning that the Ensemble Model is capable of achieving better predictions of true class (delay). Other diffence is that Logistic Regression is a linear model, while Decision Tree Classifiers are capable of modeling non-linearities from the data.

The unbalanced distribution of the data affects both classification models. However since Random Forest is an ensemble model, composed of multiple different predictors, it was capable of compensate the unbalance. This unbalance was also responsible for Logistic Regression's high accuracy and precision.

For the entire data of 1.4 million rows, the Logistic Regression cross-validation took around 8 min to run, while the Random Forest took around 22 min.