In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, lit
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

In [8]:
# starting spark session

# spark = SparkSession.builder.appName("Flight Delay Prediction").getOrCreate()
spark = (SparkSession.builder.appName("Flight Delay Prediction") \
         .config("spark.executor.memory", "4g") \
         .config("spark.driver.memory", "4g") \
         .getOrCreate())

# Loading the airline dataset
df = spark.read.csv("Airlines.csv",header=True,inferSchema=True)
df.show(5)

+---+-------+------+-----------+---------+---------+----+------+-----+
| id|Airline|Flight|AirportFrom|AirportTo|DayOfWeek|Time|Length|Delay|
+---+-------+------+-----------+---------+---------+----+------+-----+
|  1|     CO|   269|        SFO|      IAH|        3|  15|   205|    1|
|  2|     US|  1558|        PHX|      CLT|        3|  15|   222|    1|
|  3|     AA|  2400|        LAX|      DFW|        3|  20|   165|    1|
|  4|     AA|  2466|        SFO|      DFW|        3|  20|   195|    1|
|  5|     AS|   108|        ANC|      SEA|        3|  30|   202|    0|
+---+-------+------+-----------+---------+---------+----+------+-----+
only showing top 5 rows



In [10]:
# Define a function to standardize airline codes
def standardize_airline_codes(df):
    return df.withColumn("Airline", 
                         when(col("Airline").isin(["AS", "ASA"]), "AS")
                         .when(col("Airline").isin(["AA", "AAL"]), "AA")
                         .when(col("Airline").isin(["AC", "ACA"]), "AC")
                         .when(col("Airline").isin(["AM", "AMX"]), "AM")
                         .when(col("Airline").isin(["CO", "COA"]), "CO")
                         .when(col("Airline").isin(["DL", "DAL"]), "DL")
                         .when(col("Airline").isin(["FX", "FDX"]), "FX")
                         .when(col("Airline").isin(["HA", "HAL"]), "HA")
                         .when(col("Airline").isin(["NW", "NWA"]), "NW")
                         .when(col("Airline").isin(["PO", "PAC"]), "PO")
                         .when(col("Airline").isin(["SW", "SWA"]), "SW")
                         .when(col("Airline").isin(["UA", "UAL"]), "UA")
                         .when(col("Airline") == "5X", "5X")
                         .when(col("Airline").isin(["VS", "VIR"]), "VS")
                         .when(col("Airline").isin(["VB", "VIV"]), "VB")
                         .when(col("Airline").isin(["WS", "WJ"]), "WS")
                         .otherwise(col("Airline")))

# Apply the function to standardize airline codes
df_standardized = standardize_airline_codes(df)

# Show the updated DataFrame
df_standardized.show(5)

+---+-------+------+-----------+---------+---------+----+------+-----+
| id|Airline|Flight|AirportFrom|AirportTo|DayOfWeek|Time|Length|Delay|
+---+-------+------+-----------+---------+---------+----+------+-----+
|  1|     CO|   269|        SFO|      IAH|        3|  15|   205|    1|
|  2|     US|  1558|        PHX|      CLT|        3|  15|   222|    1|
|  3|     AA|  2400|        LAX|      DFW|        3|  20|   165|    1|
|  4|     AA|  2466|        SFO|      DFW|        3|  20|   195|    1|
|  5|     AS|   108|        ANC|      SEA|        3|  30|   202|    0|
+---+-------+------+-----------+---------+---------+----+------+-----+
only showing top 5 rows



In [11]:
# Any duplicate values? replace
duplicate_rows = df.groupBy(df.columns)\
    .count()\
    .where(col('count') > 1)

# Show duplicate rows
duplicate_rows.show()
df = df.dropDuplicates()

+---+-------+------+-----------+---------+---------+----+------+-----+-----+
| id|Airline|Flight|AirportFrom|AirportTo|DayOfWeek|Time|Length|Delay|count|
+---+-------+------+-----------+---------+---------+----+------+-----+-----+
+---+-------+------+-----------+---------+---------+----+------+-----+-----+



In [12]:

# Calculate the number of missing values for each column
missing_values = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

# Show the count of missing values per column
missing_values.show()

+---+-------+------+-----------+---------+---------+----+------+-----+
| id|Airline|Flight|AirportFrom|AirportTo|DayOfWeek|Time|Length|Delay|
+---+-------+------+-----------+---------+---------+----+------+-----+
|  0|      0|     0|          0|        0|        0|   0|     0|    0|
+---+-------+------+-----------+---------+---------+----+------+-----+



In [13]:
from pyspark.sql.functions import hour, minute, dayofweek
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Assuming 'Time' is in HHMM integer format
df = df.withColumn("Hour", (col("Time") / 100).cast("integer"))
df = df.withColumn("Minute", (col("Time") % 100).cast("integer"))

# Encode categorical variables
indexer = StringIndexer(inputCols=["Airline", "AirportFrom", "AirportTo"], outputCols=["Airline_Index", "AirportFrom_Index", "AirportTo_Index"])
df = indexer.fit(df).transform(df)

encoder = OneHotEncoder(inputCols=["Airline_Index", "AirportFrom_Index", "AirportTo_Index"], outputCols=["Airline_Encoded", "AirportFrom_Encoded", "AirportTo_Encoded"])
df = encoder.fit(df).transform(df)

# Assemble all features into one vector column for modeling
assembler = VectorAssembler(inputCols=["Hour", "Minute", "Airline_Encoded", "AirportFrom_Encoded", "AirportTo_Encoded"], outputCol="features")
df = assembler.transform(df)

df.show(5)

# Show the processed DataFrame
df.select("features", "Delay").show(5)


                                                                                

+---+-------+------+-----------+---------+---------+----+------+-----+----+------+-------------+-----------------+---------------+---------------+-------------------+-----------------+--------------------+
| id|Airline|Flight|AirportFrom|AirportTo|DayOfWeek|Time|Length|Delay|Hour|Minute|Airline_Index|AirportFrom_Index|AirportTo_Index|Airline_Encoded|AirportFrom_Encoded|AirportTo_Encoded|            features|
+---+-------+------+-----------+---------+---------+----+------+-----+----+------+-------------+-----------------+---------------+---------------+-------------------+-----------------+--------------------+
|176|     9E|  3955|        LBB|      MEM|        3| 355|   119|    0|   3|    55|         11.0|            108.0|           25.0|(17,[11],[1.0])|  (292,[108],[1.0])| (292,[25],[1.0])|(603,[0,1,13,127,...|
|526|     B6|   742|        SJU|      MCO|        3| 361|   184|    0|   3|    61|         12.0|             60.0|           11.0|(17,[12],[1.0])|   (292,[60],[1.0])| (292,[11]

24/04/15 22:42:15 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


+---+-------+------+-----------+---------+---------+----+------+-----+----+------+-------------+-----------------+---------------+---------------+-------------------+-----------------+--------------------+
| id|Airline|Flight|AirportFrom|AirportTo|DayOfWeek|Time|Length|Delay|Hour|Minute|Airline_Index|AirportFrom_Index|AirportTo_Index|Airline_Encoded|AirportFrom_Encoded|AirportTo_Encoded|            features|
+---+-------+------+-----------+---------+---------+----+------+-----+----+------+-------------+-----------------+---------------+---------------+-------------------+-----------------+--------------------+
|176|     9E|  3955|        LBB|      MEM|        3| 355|   119|    0|   3|    55|         11.0|            108.0|           25.0|(17,[11],[1.0])|  (292,[108],[1.0])| (292,[25],[1.0])|(603,[0,1,13,127,...|
|526|     B6|   742|        SJU|      MCO|        3| 361|   184|    0|   3|    61|         12.0|             60.0|           11.0|(17,[12],[1.0])|   (292,[60],[1.0])| (292,[11]

In [14]:
# Data Splitting

(trainingData, testData) = df.randomSplit([0.7, 0.3], seed=42)

print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

                                                                                

Training Dataset Count: 377864
Test Dataset Count: 161519


In [8]:
ffrom pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Initialize the classifiers
classifiers = [
    LogisticRegression(featuresCol='features', labelCol='Delay'),
    DecisionTreeClassifier(featuresCol='features', labelCol='Delay'),
    RandomForestClassifier(featuresCol='features', labelCol='Delay'),
    GBTClassifier(featuresCol='features', labelCol='Delay')
]

# Prepare an evaluator for different metrics
evaluator = MulticlassClassificationEvaluator(labelCol='Delay', predictionCol="prediction")

results = []

for classifier in classifiers:
    # Set up the parameter grid
    paramGrid = ParamGridBuilder().build()  # Customize grids for each model if needed

    # Set up cross-validation
    cv = CrossValidator(estimator=classifier,
                        estimatorParamMaps=paramGrid,
                        evaluator=evaluator,
                        numFolds=5)

    # Fit model
    cvModel = cv.fit(trainingData)

    # Predict on test data
    predictions = cvModel.transform(testData)

    # Calculate metrics
    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

    # Store results
    results.append((classifier.__class__.__name__, accuracy, f1, precision, recall))

# Output results
for result in results:
    print(f"Model: {result[0]}, Accuracy: {result[1]:.2f}, F1: {result[2]:.2f}, Precision: {result[3]:.2f}, Recall: {result[4]:.2f}")


24/04/15 22:42:29 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/04/15 22:42:29 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

In [12]:
#LLOGISTIC REGRESSION

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Binary Classification Evaluator for ROC-AUC or area under PR
binaryEvaluator = BinaryClassificationEvaluator(labelCol='Delay')

print("Logistic Regression")

predictions = cvModel.transform(testData)
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy: ", accuracy)

# Multiclass Classification Evaluator for F1, Precision, Recall
multiEvaluator = MulticlassClassificationEvaluator(labelCol='Delay', metricName='f1')
f1_score = multiEvaluator.evaluate(predictions)
print("F1 Score: ", f1_score)

# Changing the metric name for precision and recall
precision = multiEvaluator.setMetricName('weightedPrecision').evaluate(predictions)
recall = multiEvaluator.setMetricName('weightedRecall').evaluate(predictions)
print("Precision: ", precision)
print("Recall: ", recall)


Logistic Regression


                                                                                

Test Accuracy:  0.6868242171859389
F1 Score:  0.633762688272908
Precision:  0.6422725744686333
Recall:  0.6433670342188844


In [17]:
#Solving the same thing using pandas

In [15]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Load your dataset
df = pd.read_csv('Airlines.csv')

# Handling missing values by imputation or removal
df = df.fillna(method='ffill')  # Forward fill for simplicity

# Removing duplicates if any
df = df.drop_duplicates()

# Split data into training and testing sets
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)


Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd
  df = df.fillna(method='ffill')  # Forward fill for simplicity


In [16]:
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

# Selecting features and target
features = ['Airline', 'Flight', 'AirportFrom', 'AirportTo', 'DayOfWeek', 'Time']  # example features
target = 'Delay'  # Target variable

# Creating a pipeline for transforming data
categorical_features = ['Airline', 'AirportFrom', 'AirportTo']
numeric_features = ['DayOfWeek', 'Time']  # Assuming these are the only numeric features

# Preprocessing for categorical data
categorical_transformer = Pipeline(steps=[
    ('encoder', OneHotEncoder(handle_unknown='ignore'))
])

# Preprocessing for numerical data
numeric_transformer = Pipeline(steps=[
    ('scaler', StandardScaler())
])

# Combining preprocessing steps
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])

# Applying preprocessing to training data
X_train = preprocessor.fit_transform(train_df[features])
y_train = train_df[target].values

# Applying preprocessing to testing data
X_test = preprocessor.transform(test_df[features])
y_test = test_df[target].values


In [6]:



from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, precision_score, recall_score

# Define the classifiers
classifiers = {
    'Logistic Regression': LogisticRegression(),
    'Random Forest': RandomForestClassifier(n_estimators=100),
    'Gradient Boosting': GradientBoostingClassifier(n_estimators=100)
}

# Training and evaluating the classifiers
results = {}
for name, clf in classifiers.items():
    # Create a pipeline with the preprocessor and classifier
    pipeline = Pipeline(steps=[('preprocessor', preprocessor),
                               ('classifier', clf)])
    
    # Train the model
    pipeline.fit(train_df[features], train_df[target])
    
    # Make predictions
    y_pred = pipeline.predict(test_df[features])
    
    # Evaluate the model
    results[name] = {
        'Accuracy': accuracy_score(test_df[target], y_pred),
        'F1 Score': f1_score(test_df[target], y_pred, average='weighted'),
        'ROC AUC': roc_auc_score(test_df[target], pipeline.predict_proba(test_df[features])[:, 1]),
        'Precision': precision_score(test_df[target], y_pred, average='weighted'),
        'Recall': recall_score(test_df[target], y_pred, average='weighted')
    }

# Print results
for result in results:
    print(f"{result} Performance: {results[result]}")


Logistic Regression Performance: {'Accuracy': 0.6458559285111748, 'F1 Score': 0.637416749335298, 'ROC AUC': 0.6909410382872345, 'Precision': 0.6439162365744134, 'Recall': 0.6458559285111748}
Random Forest Performance: {'Accuracy': 0.6177683843636734, 'F1 Score': 0.6169814437827008, 'ROC AUC': 0.6545554382762797, 'Precision': 0.6165126457561193, 'Recall': 0.6177683843636734}
Gradient Boosting Performance: {'Accuracy': 0.6491559832030924, 'F1 Score': 0.626767416598852, 'ROC AUC': 0.6947004780612609, 'Precision': 0.658614737293002, 'Recall': 0.6491559832030924}
