In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

import numpy as np

In [2]:
# Initialize Spark Session
spark = SparkSession.builder.appName("Spark-Ml").getOrCreate()

24/09/15 01:52:49 WARN Utils: Your hostname, Rizwan.local resolves to a loopback address: 127.0.0.1; using 192.168.31.253 instead (on interface en0)
24/09/15 01:52:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/15 01:52:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load data with NA replacement
df_flights = spark.read.csv("../../datasets/datasets-01/flights_small.csv", header="true", inferSchema="true").replace("NA", None)
df_planes = spark.read.csv("../../datasets/datasets-01/planes.csv", header="true", inferSchema="true").replace("NA", None)

In [4]:
# Clean and prepare data
df_planes = df_planes.drop("speed").dropna()
df_flights = df_flights.dropna()

In [5]:
# Rename columns for clarity
df_planes = df_planes.withColumnRenamed("year", "manuf_year")

In [6]:
# Join flights and planes data
model_data = df_flights.join(df_planes, on="tailnum", how="leftouter")

In [7]:
# Cast relevent columns to integer
columns_to_cast = ["dep_time", "dep_delay", "arr_time", "air_time", "arr_delay", "hour", "minute", "manuf_year"]
for column in columns_to_cast:
    model_data = model_data.withColumn(column, col(column).cast("int"))

In [8]:
# Create new features
model_data = model_data.withColumn("plane_age", col("year") - col("manuf_year"))
model_data = model_data.withColumn("is_late", col("arr_delay") > 0)
model_data = model_data.withColumn("label", col("is_late").cast("int"))     # target col

In [9]:
# Remove from where key features are missing
required_columns = ["arr_delay", "dep_delay", "arr_time", "manuf_year"]
model_data = model_data.dropna(subset=required_columns)

In [10]:
# # Optimized method to count nulls in each column
# null_counts = model_data.select(
#     [count(when(col(c).isNull(), c)).alias(c) for c in model_data.columns]
# )
# # Show the null counts
# null_counts.show()

In [11]:
# Define categorical columns and their stages for the pipeline 
categorical_cols = ["carrier", "dest"]
stages = []

for column in categorical_cols:
    indexer = StringIndexer(inputCol=column, outputCol=column+"_index")                 
    encoder = OneHotEncoder(inputCol=column+"_index", outputCol=column+"_encoded")      
    stages += [indexer, encoder]

In [12]:
# Assemble features into a single vector
features_cols = ["dep_delay", "arr_delay", "arr_time", "month", "plane_age"]

assembler_inputs = [column+"_encoded" for column in categorical_cols] + features_cols       
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")               
stages += [assembler]

In [13]:
# Create a feature engineering pipeline
feature_pipleline = Pipeline(stages=stages)

In [14]:
# Fit the pipeline on the model data
fitted_pipeline = feature_pipleline.fit(model_data)

                                                                                

In [15]:
# Transform the model data using fitted pipeline (for training or testing)
model_data_transformed = fitted_pipeline.transform(model_data)

In [16]:
model_data_transformed.select("carrier", "carrier_index", "carrier_encoded").distinct().show()

+-------+-------------+---------------+
|carrier|carrier_index|carrier_encoded|
+-------+-------------+---------------+
|     US|          5.0| (10,[5],[1.0])|
|     OO|          2.0| (10,[2],[1.0])|
|     WN|          1.0| (10,[1],[1.0])|
|     F9|          8.0| (10,[8],[1.0])|
|     AA|         10.0|     (10,[],[])|
|     AS|          0.0| (10,[0],[1.0])|
|     HA|          9.0| (10,[9],[1.0])|
|     B6|          6.0| (10,[6],[1.0])|
|     VX|          7.0| (10,[7],[1.0])|
|     UA|          4.0| (10,[4],[1.0])|
|     DL|          3.0| (10,[3],[1.0])|
+-------+-------------+---------------+



In [17]:
model_data_transformed.select("features", "label").show(3, truncate=False)

+--------------------------------------------------------------+-----+
|features                                                      |label|
+--------------------------------------------------------------+-----+
|(83,[7,11,78,79,80,81,82],[1.0,1.0,-7.0,-5.0,935.0,12.0,12.0])|0    |
|(83,[0,29,78,79,80,81,82],[1.0,1.0,5.0,5.0,1505.0,1.0,17.0])  |1    |
|(83,[7,10,78,79,80,81,82],[1.0,1.0,-2.0,2.0,1652.0,3.0,12.0]) |1    |
+--------------------------------------------------------------+-----+
only showing top 3 rows



In [18]:
# Optonally cache the transformed data to speed up future operations
model_data_transformed.cache()

24/09/15 01:54:02 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[tailnum: string, year: int, month: int, day: int, dep_time: int, dep_delay: int, arr_time: int, arr_delay: int, carrier: string, flight: int, origin: string, dest: string, air_time: int, distance: int, hour: int, minute: int, manuf_year: int, type: string, manufacturer: string, model: string, engines: int, seats: int, engine: string, plane_age: int, is_late: boolean, label: int, carrier_index: double, carrier_encoded: vector, dest_index: double, dest_encoded: vector, features: vector]

In [19]:
# Split the data into training and test sets
training, test = model_data_transformed.randomSplit([0.6, 0.4], seed=42)

In [20]:
# Define the Logistic Regression model
model = LogisticRegression(featuresCol="features", labelCol="label")
# model.fit(training)

In [21]:
# Hyperparameter Tuning with Cross Validation

# Define hyperparameter grid for tuning
param_grid = ParamGridBuilder()\
    .addGrid(model.regParam, np.arange(0, 0.1, 0.01))\
    .addGrid(model.elasticNetParam, [0, 1])\
    .build()

In [22]:
# Define model evaluator
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")

In [23]:
# Set up CrossValidator
crossval = CrossValidator(
    estimator=model,                    
    estimatorParamMaps=param_grid,      
    evaluator=evaluator,                
    numFolds=5,                     
    parallelism=4                     
)

In [24]:
# Train the model using CrossValidator
cv_model = crossval.fit(training)               # ML Model

24/09/15 01:54:36 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/09/15 01:54:51 WARN BlockManager: Asked to remove block broadcast_2029_piece0, which does not exist


In [25]:
# Define paths to save the pipeline and model
# fitted_pipeline_path = "model_artifacts/fitted_pipeline_model"   
# cv_model_path = "model_artifacts/cv_model"

In [26]:
# Save the fitted pipeline model to a path
fitted_pipeline.write().overwrite().save("model-artifacts-01/fitted_pipeline_model")

                                                                                

In [28]:
# Save the cross-validated model to a path
cv_model.write().overwrite().save("model-artifacts-01/cv_model")

In [29]:
# Model Performance

In [30]:
# Get the best model from CrossValidator
best_model = cv_model.bestModel

# Make prediction on the training data
training_predictions = best_model.transform(training) 

# Evaluate training accuracy
training_evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")
training_accuracy = training_evaluator.evaluate(training_predictions) 
print(f"Training Area Under ROC : {training_accuracy}")


# Make prediction on the test data
test_predictions = best_model.transform(test)  
    
# Evaluate tesing accuracy
test_evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")
testing_accuracy = test_evaluator.evaluate(test_predictions) 
print(f"Testing Area Under ROC : {testing_accuracy}")

Training Area Under ROC : 1.0
Testing Area Under ROC : 1.0


In [31]:
# Initilize evaluators
accuracy_eveluator = MulticlassClassificationEvaluator(metricName="accuracy", labelCol="label")
precision_eveluator = MulticlassClassificationEvaluator(metricName="weightedPrecision", labelCol="label")
recall_eveluator = MulticlassClassificationEvaluator(metricName="weightedRecall", labelCol="label")
f1_eveluator = MulticlassClassificationEvaluator(metricName="f1", labelCol="label")

# Calculate metrices for training data 
training_accuracy = training_evaluator.evaluate(training_predictions) 
training_precision = precision_eveluator.evaluate(training_predictions) 
training_recall = recall_eveluator.evaluate(training_predictions) 
training_f1 = f1_eveluator.evaluate(training_predictions) 

print(f"Training accuracy : {training_accuracy}")
print(f"Training precision : {training_precision}")
print(f"Training recall : {training_recall}")
print(f"Training F1 score : {training_f1}")


# Calculate metrices for testing data 
testing_accuracy = training_evaluator.evaluate(test_predictions) 
testing_precision = precision_eveluator.evaluate(test_predictions) 
testing_recall = recall_eveluator.evaluate(test_predictions) 
testing_f1 = f1_eveluator.evaluate(test_predictions) 

print(f"Testing accuracy : {testing_accuracy}")
print(f"Testing precision : {testing_precision}")
print(f"Testing recall : {testing_recall}")
print(f"Testing F1 score : {testing_f1}")

Training accuracy : 1.0
Training precision : 0.9770655880103198
Training recall : 0.9761904761904762
Training F1 score : 0.9760241170229791
Testing accuracy : 1.0
Testing precision : 0.9736364410612954
Testing recall : 0.9724745389485274
Testing F1 score : 0.9722475814381987


In [32]:
# Classification Metrics

# CM (confusion matrix)

# accuracy = number of crroect predictions / total number of prediction
# precision = true pos / true pos + false pos
# recall = true pos / true pos + false neg
# f1 score = 2 * (precision * recall / precision + recall)

In [33]:
model_data_transformed.select("features", "label").show(2, truncate=False)

+--------------------------------------------------------------+-----+
|features                                                      |label|
+--------------------------------------------------------------+-----+
|(83,[7,11,78,79,80,81,82],[1.0,1.0,-7.0,-5.0,935.0,12.0,12.0])|0    |
|(83,[0,29,78,79,80,81,82],[1.0,1.0,5.0,5.0,1505.0,1.0,17.0])  |1    |
+--------------------------------------------------------------+-----+
only showing top 2 rows



In [34]:
assembler_inputs

['carrier_encoded',
 'dest_encoded',
 'dep_delay',
 'arr_delay',
 'arr_time',
 'month',
 'plane_age']

In [35]:
model_data_transformed.show(2)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+---------+---------+-------+-----+-------------+---------------+----------+---------------+--------------------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|manuf_year|                type|manufacturer|   model|engines|seats|   engine|plane_age|is_late|label|carrier_index|carrier_encoded|dest_index|   dest_encoded|            features|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+---------+---------+-------+-----+-------------+---------------+----------+---------------+--------------------+
| N846VA|2023|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|  

In [36]:
# Input Data or Live Data

In [37]:
from pyspark.ml import PipelineModel
# from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.tuning import CrossValidatorModel

In [38]:
from pyspark.sql import Row

In [39]:
# Load the pipeline model and the cross-validated model

# Load the fitted pipeline model
pipeline_model = PipelineModel.load("model-artifacts-01/fitted_pipeline_model")

# Load the cross-validated model
cv_model = CrossValidatorModel.load("model-artifacts-01/cv_model")

                                                                                

In [40]:
# Example input data or live data 
input_data = Row(
    carrier="AS",
    dest="HNL",
    dep_delay=5,
    arr_delay=5,
    arr_time=1505,
    month=1,
    plane_age=17
)

# Create a Spark DataFrame from the input data
input_df = spark.createDataFrame([input_data])

In [41]:
input_df.show()

+-------+----+---------+---------+--------+-----+---------+
|carrier|dest|dep_delay|arr_delay|arr_time|month|plane_age|
+-------+----+---------+---------+--------+-----+---------+
|     AS| HNL|        5|        5|    1505|    1|       17|
+-------+----+---------+---------+--------+-----+---------+



In [42]:
input_transformed = pipeline_model.transform(input_df)
predictions = cv_model.transform(input_transformed)

In [43]:
predictions.select("features", "prediction", "probability").show(truncate=False)

+------------------------------------------------------------+----------+---------------------------------------+
|features                                                    |prediction|probability                            |
+------------------------------------------------------------+----------+---------------------------------------+
|(83,[0,29,78,79,80,81,82],[1.0,1.0,5.0,5.0,1505.0,1.0,17.0])|1.0       |[0.2226147759073654,0.7773852240926347]|
+------------------------------------------------------------+----------+---------------------------------------+

