## Import libraries

In [1]:
import pandas as pd
import os
import boto3
from io import BytesIO, StringIO
from botocore.exceptions import NoCredentialsError, ClientError
import seaborn as sns
import time
import warnings
from pyspark.sql import SparkSession
from pyspark.ml.classification import (
    LogisticRegression,
    RandomForestClassifier,
    GBTClassifier,
    DecisionTreeClassifier
)

# For Pipeline and Feature Transformation
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import (
    VectorAssembler,
    StringIndexer,
    OneHotEncoder,
    StandardScaler,
    MinMaxScaler,
    RobustScaler
)

# For Model Evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# For SQL and DataFrame Operations
from pyspark.sql.functions import col, monotonically_increasing_id
from pyspark.sql.types import DoubleType

In [2]:
pd.set_option("display.max_columns", None)

warnings.filterwarnings("ignore")

sns.set(style="whitegrid")

## Create SparkSession

In [3]:
spark = SparkSession.builder \
    .appName("Classification") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "16g") \
    .config("spark.memory.offHeap.enabled", True) \
    .config("spark.memory.offHeap.size", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.task.maxFailures", "4") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/01/10 01:40:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark.sparkContext.setLogLevel("ERROR")

## Extract Historical Data from S3 Bucket

Defines a function `read_csv_from_s3_as_df` to fetch a CSV file from the specified S3 bucket and key, reads its contents into a pandas DataFrame, and prints the DataFrame or an error message if retrieval or parsing fails.

In [5]:
def read_csv_from_s3_as_df(bucket, key):
    try:
        # Create an S3 client
        s3 = boto3.client('s3')

        # Get the object from S3
        obj = s3.get_object(Bucket=bucket, Key=key)

        # Read the contents of the file into a pandas DataFrame
        df_pre_clean = pd.read_csv(BytesIO(obj['Body'].read()), header=0)

        return df_pre_clean
    except NoCredentialsError:
        print("Credentials not available")
    except ClientError as e:
        print(f"An error occurred: {e}")
    except Exception as e:
        print(f"An error occurred during DataFrame conversion: {e}")


bucket = 'big-data-team1-bucket'
key = 'cleaned-data/historical_data.csv'
historical_df = read_csv_from_s3_as_df(bucket, key)
if historical_df is not None:
    print(historical_df)
else:
    print("No data returned or error occurred")

         flightdate  day_of_week          airline tail_number dep_airport  \
0        2023-01-02            1     Endeavor Air      N605LR         BDL   
1        2023-01-03            2     Endeavor Air      N605LR         BDL   
2        2023-01-04            3     Endeavor Air      N331PQ         BDL   
3        2023-01-05            4     Endeavor Air      N906XJ         BDL   
4        2023-01-06            5     Endeavor Air      N337PQ         BDL   
...             ...          ...              ...         ...         ...   
6743368  2023-12-31            7  JetBlue Airways      N903JB         SJU   
6743369  2023-12-31            7  JetBlue Airways      N535JB         MCO   
6743370  2023-12-31            7  JetBlue Airways      N354JB         PHL   
6743371  2023-12-31            7  JetBlue Airways      N768JB         PBI   
6743372  2023-12-31            7  JetBlue Airways      N547JB         BDL   

                           dep_cityname deptime_label  dep_delay  \
0      

Defines a function `sample_flights` that samples a fixed number (`n_flights`) of flights per departure airport and date from the provided DataFrame, grouping by specified columns, and handles groups with fewer rows by enabling replacement during sampling.

In [6]:
def sample_flights(dataframe, dep_airport_col='dep_airport', date_col='flightdate', n_flights=10):
    dataframe[date_col] = pd.to_datetime(dataframe[date_col])

    # Group by dep_airport and date
    grouped = dataframe.groupby([dep_airport_col, date_col])

    # Sample 10 flights from each group, allowing replacement if a group has fewer rows
    sampled_data = grouped.apply(lambda x: x.sample(n=n_flights, replace=len(x) < n_flights)).reset_index(drop=True)

    return sampled_data

Samples 10 flights per departure airport and flight date from the `historical_df` DataFrame and outputs the shape of the resulting sampled DataFrame to verify the operation.

In [7]:
historical_df = sample_flights(historical_df, dep_airport_col='dep_airport', date_col='flightdate', n_flights=10)

# Verify the sampling
historical_df.shape

(1192190, 32)

Converts the Pandas DataFrame `historical_df` into a PySpark DataFrame for distributed processing and scalability.

In [8]:
historical_df = spark.createDataFrame(historical_df)

## Train Model (Classification)

Defines a function `prepare_data` to preprocess a PySpark DataFrame by adding an ID column, encoding categorical and target columns using label or one-hot encoding, assembling features into a vector, and applying transformations via a pipeline for training or testing datasets.

In [10]:
def prepare_data(dataframe, target_column="label", encoder_type="label", is_train=True, fitted_pipeline=None):
    # Add an ID column if not present
    dataframe = dataframe.withColumn("id", monotonically_increasing_id())

    # Encode categorical columns
    categorical_columns = [coll for coll, dtype in dataframe.dtypes if dtype == "string"]
    stages = []

    # Add encoders for categorical columns
    for coll in categorical_columns:
        if encoder_type == "label":
            indexer = StringIndexer(inputCol=coll, outputCol=f"{coll}_indexed", handleInvalid="keep")
            stages.append(indexer)
        elif encoder_type == "onehot":
            indexer = StringIndexer(inputCol=coll, outputCol=f"{coll}_indexed", handleInvalid="keep")
            encoder = OneHotEncoder(inputCol=f"{coll}_indexed", outputCol=f"{coll}_encoded")
            stages.append(indexer)
            stages.append(encoder)

    # Encode the target column
    target_indexer = StringIndexer(inputCol=target_column, outputCol="label_indexed", handleInvalid="keep")
    stages.append(target_indexer)

    # Assemble features
    encoded_categorical_columns = [
        f"{coll}_encoded" if encoder_type == "onehot" else f"{coll}_indexed"
        for coll in categorical_columns
    ]
    numeric_columns = [
        coll for coll, dtype in dataframe.dtypes if dtype in ["double", "bigint"] and coll != target_column
    ]
    feature_columns = encoded_categorical_columns + numeric_columns
    vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    stages.append(vector_assembler)

    # Create a Pipeline
    pipeline = Pipeline(stages=stages)

    if is_train:
        # Fit the pipeline on training data
        fitted_pipeline = pipeline.fit(dataframe)
        dataframe = fitted_pipeline.transform(dataframe)
        return dataframe, fitted_pipeline
    else:
        # Use fitted pipeline for test data
        dataframe = fitted_pipeline.transform(dataframe)
        return dataframe


Defines the `scale_data` function to scale features in training and testing datasets using the specified scaler (`standard`, `minmax`, or `robust`), returning the scaled data.

In [11]:
def scale_data(X_train, X_test, scaler_type='standard'):
    # Step 1: Choose the appropriate scaler
    if scaler_type == 'standard':
        scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
    elif scaler_type == 'minmax':
        scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
    elif scaler_type == 'robust':
        scaler = RobustScaler(inputCol="features", outputCol="scaled_features")
    else:
        raise ValueError("Invalid scaler_type. Choose from 'standard', 'minmax', 'robust'.")

    # Step 2: Create a Pipeline
    pipeline = Pipeline(stages=[scaler])

    # Step 3: Fit the pipeline on the training data
    pipeline_model = pipeline.fit(X_train)

    # Step 4: Transform both train and test data
    X_train_scaled = pipeline_model.transform(X_train).drop("features").withColumnRenamed("scaled_features", "features")
    X_test_scaled = pipeline_model.transform(X_test).drop("features").withColumnRenamed("scaled_features", "features")

    return X_train_scaled, X_test_scaled

Defines the `evaluate_classification_models` function to train, evaluate, and save the best classification model from a given list using various metrics such as accuracy, F1 score, precision, and recall, while logging the performance results.

In [12]:
def evaluate_classification_models(X_train, y_train, X_test, y_test, models, save_path="best_model"):
    # Combine features and labels into single DataFrames
    train_data = X_train.join(y_train, "id")
    test_data = X_test.join(y_test, "id")

    model_results = []
    trained_models = {}
    best_model = None
    best_accuracy = 0

    for model in models:
        start_time = time.time()

        # Train the model using a pipeline
        pipeline = Pipeline(stages=[model])
        trained_model = pipeline.fit(train_data)
        trained_models[model.__class__.__name__] = trained_model

        # Make predictions on the test dataset
        predictions = trained_model.transform(test_data)

        # Define evaluators for metrics
        evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="label_indexed", predictionCol="prediction", metricName="accuracy")
        evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label_indexed", predictionCol="prediction", metricName="f1")
        evaluator_precision = MulticlassClassificationEvaluator(labelCol="label_indexed", predictionCol="prediction", metricName="weightedPrecision")
        evaluator_recall = MulticlassClassificationEvaluator(labelCol="label_indexed", predictionCol="prediction", metricName="weightedRecall")
    

        # Calculate metrics for test data
        accuracy = evaluator_accuracy.evaluate(predictions)
        f1_score = evaluator_f1.evaluate(predictions)
        precision = evaluator_precision.evaluate(predictions)
        recall = evaluator_recall.evaluate(predictions)
        inference_time = time.time() - start_time  # Inference time in seconds

        # Log results
        print(f"{model.__class__.__name__} is ready")

        model_results.append({
            "Model-Name": model.__class__.__name__,
            "Test_Accuracy": accuracy,
            "F1_Score": f1_score,
            "Precision": precision,
            "Recall": recall,
            "Inference Time (ms)": inference_time * 1000
        })

        # Update the best model
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_model = trained_model

    # Save the best model
    if best_model:
        if not os.path.exists(save_path):
            best_model.save(save_path)
        print(f"Best model saved at {save_path}")

    # Convert results to a pandas DataFrame
    models_df = pd.DataFrame(model_results)
    models_df = models_df.set_index("Model-Name")

    return models_df.sort_values("Test_Accuracy", ascending=False), trained_models

Defines a list of classification models, including Logistic Regression, Random Forest, Gradient-Boosted Trees, and Decision Tree classifiers, configured with specific parameters for training and evaluation.

In [13]:
classification_models = [
    LogisticRegression(featuresCol="features", labelCol="label_indexed", maxIter=10, regParam=0.01),

    RandomForestClassifier(featuresCol="features", labelCol="label_indexed", numTrees=50, seed=42),

    GBTClassifier(featuresCol="features", labelCol="label_indexed", maxIter=10, seed=42),
    
    DecisionTreeClassifier(featuresCol="features", labelCol="label_indexed", maxDepth=5, seed=42)
]

Filters the historical dataset to retain only relevant columns, sets the target column (`dep_delay_tag`) as the label by casting it to `DoubleType`, and removes the original target column to avoid duplication.

In [14]:
# Combine X and y into a single DataFrame
data = historical_df.select(
    *[col(c) for c in historical_df.columns if c not in [
        'dep_delay', "flightdate", "tail_number", "deptime_label",
        "dep_airport", "dep_cityname", "arr_airport", "arr_cityname", "tmin", "tmax", "day_of_week",
        "delay_carrier", "delay_nas", "delay_security", "delay_lastaircraft", "delay_weather"
    ]],  # Keep only desired columns
).withColumn("label", col("dep_delay_tag").cast(DoubleType()))  # Set 'label' column as target variable

# Drop the original target column to avoid duplication
data = data.drop("dep_delay_tag")

Splits the data into training and testing sets, applies data preparation steps (including encoding and feature assembly) on the training data using a pipeline, and transforms the test data using the fitted pipeline for consistent preprocessing.

In [16]:
# Split the data into training and testing sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Prepare the training data
train_data_prepared, pipeline_model = prepare_data(train_data, target_column="label", encoder_type="onehot", is_train=True)

# Prepare the test data using the fitted pipeline
test_data_prepared = prepare_data(test_data, target_column="label", encoder_type="onehot", is_train=False, fitted_pipeline=pipeline_model)

                                                                                

Splits the prepared training and testing data into separate feature and label DataFrames, associating features and indexed labels with unique IDs for model evaluation.

In [17]:
# Split prepared data into features and labels
X_train = train_data_prepared.select("id", "features")
y_train = train_data_prepared.select("id", "label_indexed")

X_test = test_data_prepared.select("id", "features")
y_test = test_data_prepared.select("id", "label_indexed")

Evaluates multiple classification models on the training and testing datasets, calculates performance metrics, and saves the best-performing model to the specified path.

In [18]:
models_class_no_s, trained_no_s = evaluate_classification_models(X_train, y_train, X_test,
                                                                 y_test, classification_models,
                                                                 save_path="goodmodel")

                                                                                

LogisticRegression is ready


                                                                                

RandomForestClassifier is ready


                                                                                

GBTClassifier is ready


                                                                                

DecisionTreeClassifier is ready


                                                                                

Best model saved at goodmodel


Displays all columns except the last one from the classification model evaluation results DataFrame.

In [19]:
models_class_no_s.iloc[:, :-1]

Unnamed: 0_level_0,Test_Accuracy,F1_Score,Precision,Recall
Model-Name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
GBTClassifier,0.88329,0.87632,0.887891,0.88329
DecisionTreeClassifier,0.881242,0.873269,0.887953,0.881242
RandomForestClassifier,0.873855,0.863551,0.884591,0.873855
LogisticRegression,0.873214,0.862819,0.883926,0.873214


Standardizes the feature data in `X_train` and `X_test` using the `StandardScaler`, ensuring that the features have a mean of 0 and a standard deviation of 1, and assigns the scaled datasets to `X_train_ss` and `X_test_ss`.

In [20]:
X_train_ss, X_test_ss = scale_data(X_train, X_test, scaler_type="standard")

                                                                                

Trains and evaluates the classification models using scaled feature data (`X_train_ss` and `X_test_ss`) and their corresponding labels (`y_train` and `y_test`), and displays the evaluation metrics (excluding the inference time) for comparison.

In [21]:
models_class_ss, trained_ss = evaluate_classification_models(X_train_ss, y_train, X_test_ss,
                                                             y_test, classification_models,
                                                             save_path="goodmodel_ss")

models_class_ss.iloc[:, :-1]

                                                                                

LogisticRegression is ready


                                                                                

RandomForestClassifier is ready


                                                                                

GBTClassifier is ready


                                                                                

DecisionTreeClassifier is ready
Best model saved at goodmodel_ss


Unnamed: 0_level_0,Test_Accuracy,F1_Score,Precision,Recall
Model-Name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
GBTClassifier,0.883357,0.876843,0.886812,0.883357
DecisionTreeClassifier,0.878302,0.869213,0.887229,0.878302
RandomForestClassifier,0.873662,0.863511,0.883806,0.873662
LogisticRegression,0.873214,0.862819,0.883926,0.873214


Scales the training (`X_train`) and testing (`X_test`) feature data using Min-Max scaling and assigns the scaled datasets to `X_train_mm` and `X_test_mm` respectively.

In [22]:
X_train_mm, X_test_mm = scale_data(X_train, X_test, scaler_type="minmax")

                                                                                

Trains and evaluates classification models using Min-Max scaled features (`X_train_mm` and `X_test_mm`) and their corresponding labels (`y_train` and `y_test`), saving the best model as `goodmodel_mm`, and displays the evaluation metrics excluding inference time.

In [23]:
models_class_mm, trained_mm = evaluate_classification_models(X_train_mm, y_train, X_test_mm,
                                                             y_test, classification_models,
                                                             save_path="goodmodel_mm")

models_class_mm.iloc[:, :-1]

                                                                                

LogisticRegression is ready


                                                                                

RandomForestClassifier is ready


                                                                                

GBTClassifier is ready


                                                                                

DecisionTreeClassifier is ready
Best model saved at goodmodel_mm


Unnamed: 0_level_0,Test_Accuracy,F1_Score,Precision,Recall
Model-Name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
GBTClassifier,0.883478,0.876245,0.888864,0.883478
DecisionTreeClassifier,0.881196,0.872927,0.888784,0.881196
RandomForestClassifier,0.873654,0.863549,0.883645,0.873654
LogisticRegression,0.873214,0.862819,0.883926,0.873214


Applies Robust Scaler to scale the features of training (`X_train`) and testing (`X_test`) datasets, producing scaled datasets (`X_train_rs` and `X_test_rs`).

In [24]:
X_train_rs, X_test_rs = scale_data(X_train, X_test, scaler_type="robust")

                                                                                

Evaluated multiple classification models with robust scaling on training and testing datasets to identify the best performing model.

In [25]:
models_class_rs, trained_rs = evaluate_classification_models(X_train_rs, y_train, X_test_rs,
                                                             y_test, classification_models,
                                                             save_path="goodmodel_rs")

models_class_rs.iloc[:, :-1]

                                                                                

LogisticRegression is ready


                                                                                

RandomForestClassifier is ready


                                                                                

GBTClassifier is ready


                                                                                

DecisionTreeClassifier is ready
Best model saved at goodmodel_rs


Unnamed: 0_level_0,Test_Accuracy,F1_Score,Precision,Recall
Model-Name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
GBTClassifier,0.863921,0.859699,0.861033,0.863921
DecisionTreeClassifier,0.860831,0.856296,0.857792,0.860831
LogisticRegression,0.845382,0.83163,0.851635,0.845382
RandomForestClassifier,0.724735,0.625055,0.765354,0.724735


Consolidated evaluation results from all scaling techniques and sorted the models based on their test accuracy to compare performance.

In [26]:
models_class_no_s["Scaler"] = "No Scaling"
models_class_ss["Scaler"] = "Standard Scaler"
models_class_mm["Scaler"] = "MinMax Scaler"
models_class_rs["Scaler"] = "Robust Scaler"


all_models = pd.concat([models_class_no_s, models_class_ss, models_class_mm, models_class_rs], axis=0)
all_models = all_models.sort_values(by="Test_Accuracy", ascending=False)
all_models

Unnamed: 0_level_0,Test_Accuracy,F1_Score,Precision,Recall,Inference Time (ms),Scaler
Model-Name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
GBTClassifier,0.883478,0.876245,0.888864,0.883478,90700.886726,MinMax Scaler
GBTClassifier,0.883357,0.876843,0.886812,0.883357,88798.323393,Standard Scaler
GBTClassifier,0.88329,0.87632,0.887891,0.88329,89242.838621,No Scaling
DecisionTreeClassifier,0.881242,0.873269,0.887953,0.881242,77189.481497,No Scaling
DecisionTreeClassifier,0.881196,0.872927,0.888784,0.881196,74924.170971,MinMax Scaler
DecisionTreeClassifier,0.878302,0.869213,0.887229,0.878302,73110.928774,Standard Scaler
RandomForestClassifier,0.873855,0.863551,0.884591,0.873855,141983.590841,No Scaling
RandomForestClassifier,0.873662,0.863511,0.883806,0.873662,110478.057861,Standard Scaler
RandomForestClassifier,0.873654,0.863549,0.883645,0.873654,111692.800522,MinMax Scaler
LogisticRegression,0.873214,0.862819,0.883926,0.873214,97643.759012,No Scaling


## Import Streaming Data

Loaded real-time data from the specified S3 bucket and key into a pandas DataFrame for further processing.

In [27]:
bucket = 'big-data-team1-bucket'
key = 'cleaned-data/realtime_data.csv'
streaming_df = read_csv_from_s3_as_df(bucket, key)

Converted the real-time data from a pandas DataFrame to a PySpark DataFrame for distributed processing.

In [28]:
streaming_df = spark.createDataFrame(streaming_df)

Selected relevant columns from the real-time data, excluding unnecessary features such as delay details and non-essential attributes, to prepare it for further processing.

In [29]:
# Combine X and y into a single DataFrame
realtime_data = streaming_df.select(
    *[col(c) for c in streaming_df.columns if c not in [
        'dep_delay', "flightdate", "tail_number", "deptime_label",
        "dep_airport", "dep_cityname", "arr_airport", "arr_cityname", "tmin", "tmax", "day_of_week",
        "delay_carrier", "delay_nas", "delay_security", "delay_lastaircraft", "delay_weather", "dep_delay_tag"
    ]],  # Keep only desired columns
)

Obtained the data types of all columns in the `realtime_data` DataFrame to verify the structure and ensure compatibility for further processing.

In [30]:
realtime_data.dtypes

[('airline', 'string'),
 ('dep_delay_type', 'string'),
 ('arr_delay', 'bigint'),
 ('arr_delay_type', 'string'),
 ('flight_duration', 'bigint'),
 ('distance_type', 'string'),
 ('manufacturer', 'string'),
 ('model', 'string'),
 ('aicraft_age', 'bigint'),
 ('tavg', 'double'),
 ('prcp', 'double'),
 ('snow', 'double'),
 ('wdir', 'double'),
 ('wspd', 'double'),
 ('pres', 'double')]

Added an ID column to `realtime_data`, encoded categorical columns using `StringIndexer` and `OneHotEncoder`, combined encoded and numeric columns into a feature vector using `VectorAssembler`, and applied the transformations using a pipeline, resulting in a DataFrame with `id` and `features`.

In [31]:
realtime_data = realtime_data.withColumn("id", monotonically_increasing_id())

categorical_columns = [coll for coll, dtype in realtime_data.dtypes if dtype == "string"]
numeric_columns = [coll for coll, dtype in realtime_data.dtypes if dtype in ["double", "bigint"]]

stages = []  # For storing transformations
encoded_categorical_columns = []  # To store names of new indexed columns

for coll in categorical_columns:
    indexed_col = f"{coll}_indexed"
    encoded_col = f"{coll}_encoded"
    indexer = StringIndexer(inputCol=coll, outputCol=indexed_col, handleInvalid="keep")
    encoder = OneHotEncoder(inputCol=f"{coll}_indexed", outputCol=f"{coll}_encoded")
    stages.append(indexer)
    stages.append(encoder)
    encoded_categorical_columns.append(encoded_col)  # Add to encoded column list

real_feature_columns = numeric_columns + encoded_categorical_columns
vector_assembler = VectorAssembler(inputCols=real_feature_columns, outputCol="features")
stages.append(vector_assembler)

pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(realtime_data)

realtime_data = pipeline_model.transform(realtime_data).select("id", "features")

# Show the resulting DataFrame
realtime_data.show(5, truncate=False)

+---+-------------------------------------------------------------------------------------------------------------+
|id |features                                                                                                     |
+---+-------------------------------------------------------------------------------------------------------------+
|0  |(58,[1,2,3,6,7,8,17,25,28,31,37,42],[87.0,21.0,5.88,310.0,31.5,1022.0,1.0,1.0,1.0,1.0,1.0,1.0])              |
|1  |(58,[0,1,2,3,6,7,8,9,17,25,28,31,37,42],[-16.0,43.0,21.0,5.88,310.0,31.5,1022.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|2  |(58,[0,1,2,3,6,7,8,9,17,25,28,31,37,42],[-15.0,68.0,20.0,5.88,310.0,31.5,1022.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|3  |(58,[0,1,2,3,6,7,8,9,17,25,28,31,37,42],[-14.0,62.0,11.0,5.88,310.0,31.5,1022.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|4  |(58,[0,1,2,3,6,7,8,9,17,27,30,31,37,42],[906.0,55.0,20.0,5.88,310.0,31.5,1022.0,4.0,1.0,1.0,1.0,1.0,1.0,1.0])|
+---+-------------------------------------------------------------------

Displayed the data types of the columns in `realtime_data`, which now include `id` (bigint) and `features` (vector).

In [32]:
realtime_data.dtypes

[('id', 'bigint'), ('features', 'vector')]

Loaded the saved model `goodmodel` for making predictions on new data.

In [33]:
# Load the saved model
loaded_model = PipelineModel.load("goodmodel")



Applied the loaded model `goodmodel` to generate predictions on the `realtime_data`.

In [34]:
predictions = loaded_model.transform(realtime_data)

Displayed the schema of the `predictions` DataFrame to verify the output structure after applying the model.

In [35]:
predictions.printSchema()

root
 |-- id: long (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



Displayed the top 10 rows of the `predictions` DataFrame, including the `features`, `prediction`, and `probability` columns, to analyze the model's output on the real-time data.

In [36]:
predictions.select("features", "prediction", "probability").show(10, truncate=False)

+--------------------------------------------------------------------------------------------------------------+----------+----------------------------------------+
|features                                                                                                      |prediction|probability                             |
+--------------------------------------------------------------------------------------------------------------+----------+----------------------------------------+
|(58,[1,2,3,6,7,8,17,25,28,31,37,42],[87.0,21.0,5.88,310.0,31.5,1022.0,1.0,1.0,1.0,1.0,1.0,1.0])               |1.0       |[0.07804449086734791,0.9219555091326521]|
|(58,[0,1,2,3,6,7,8,9,17,25,28,31,37,42],[-16.0,43.0,21.0,5.88,310.0,31.5,1022.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |1.0       |[0.07804449086734791,0.9219555091326521]|
|(58,[0,1,2,3,6,7,8,9,17,25,28,31,37,42],[-15.0,68.0,20.0,5.88,310.0,31.5,1022.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0]) |1.0       |[0.07804449086734791,0.9219555091326521]|
|(58,[0,1,

Combined the original `streaming_df` DataFrame with the `predictions` DataFrame using a common `id` column to integrate predictions into the original dataset, dropped redundant columns, and converted the final DataFrame into a Pandas DataFrame for further analysis, displaying the last 10 rows.

In [42]:
streaming_df = streaming_df.withColumn("id", monotonically_increasing_id())
predictions = predictions.withColumn("id", monotonically_increasing_id())

streaming_with_predictions = streaming_df.join(predictions.select("id", "prediction"), on="id", how="inner")

streaming_with_predictions = streaming_with_predictions.drop("id")

# Convert PySpark DataFrame to Pandas DataFrame
prediction_df = streaming_with_predictions.toPandas()

prediction_df = prediction_df.drop(columns=["dep_delay_tag"])

# Display the Pandas DataFrame
prediction_df.tail(10)

                                                                                

Unnamed: 0,flightdate,day_of_week,airline,tail_number,dep_airport,dep_cityname,deptime_label,dep_delay,dep_delay_type,arr_airport,arr_cityname,arr_delay,arr_delay_type,flight_duration,distance_type,delay_carrier,delay_weather,delay_nas,delay_security,delay_lastaircraft,manufacturer,model,aicraft_age,tavg,tmin,tmax,prcp,snow,wdir,wspd,pres,prediction
42843,2025-01-13,5,Skywest Airlines Inc.,N122SY,ORD,"Chicago, IL",Morning,-3,Low <5min,AUS,"Austin, TX",-36,Low <5min,149,Short Haul >1500Mi,0,0,0,0,0,EMBRAER,170/175,10,-5.73,-9.72,-1.74,0.0,1.05,296.0,23.4,1024.0,0.0
42844,2025-01-13,5,Skywest Airlines Inc.,N918SW,ORD,"Chicago, IL",Morning,-4,Low <5min,BHM,"Birmingham, AL",9,Low <5min,139,Short Haul >1500Mi,0,0,0,0,0,CANADAIR REGIONAL JET,CRJ,22,-5.73,-9.72,-1.74,0.0,1.05,296.0,23.4,1024.0,0.0
42845,2025-01-13,5,United Air Lines Inc.,N815UA,ORD,"Chicago, IL",Afternoon,-1,Low <5min,DCA,"Washington, DC",8,Low <5min,121,Short Haul >1500Mi,0,0,0,0,0,AIRBUS,A319,26,-5.73,-9.72,-1.74,0.0,1.05,296.0,23.4,1024.0,1.0
42846,2025-01-15,7,American Eagle Airlines Inc.,N253NN,ORD,"Chicago, IL",Morning,-4,Low <5min,OKC,"Oklahoma City, OK",-18,Low <5min,127,Short Haul >1500Mi,0,0,0,0,0,EMBRAER,170/175,7,-5.27,-9.38,-1.16,0.0,0.0,217.0,15.84,1031.0,1.0
42847,2025-01-15,7,American Eagle Airlines Inc.,N619AE,ORD,"Chicago, IL",Morning,-7,Low <5min,ALO,"Waterloo, IA",-17,Low <5min,68,Short Haul >1500Mi,0,0,0,0,0,EMBRAER,135/145,26,-5.27,-9.38,-1.16,0.0,0.0,217.0,15.84,1031.0,1.0
42848,2025-01-15,7,United Air Lines Inc.,N14228,ORD,"Chicago, IL",Evening,5,Low <5min,PHL,"Philadelphia, PA",-5,Low <5min,116,Short Haul >1500Mi,0,0,0,0,0,BOEING,737 NG,25,-5.27,-9.38,-1.16,0.0,0.0,217.0,15.84,1031.0,1.0
42849,2025-01-15,7,United Air Lines Inc.,N873UA,ORD,"Chicago, IL",Afternoon,1,Low <5min,ATL,"Atlanta, GA",7,Low <5min,122,Short Haul >1500Mi,0,0,0,0,0,AIRBUS,A319,21,-5.27,-9.38,-1.16,0.0,0.0,217.0,15.84,1031.0,1.0
42850,2025-01-14,6,Spirit Air Lines,N957NK,ORD,"Chicago, IL",Morning,-8,Low <5min,LAX,"Los Angeles, CA",-9,Low <5min,284,Medium Haul <3000Mi,0,0,0,0,0,AIRBUS,A320,2,-9.12,-11.29,-6.96,0.0,0.0,280.0,18.25,1035.0,1.0
42851,2025-01-14,6,Skywest Airlines Inc.,N618UX,ORD,"Chicago, IL",Evening,7,Low <5min,OKC,"Oklahoma City, OK",7,Low <5min,150,Short Haul >1500Mi,0,0,0,0,0,EMBRAER,170/175,5,-9.12,-11.29,-6.96,0.0,0.0,280.0,18.25,1035.0,0.0
42852,2025-01-14,6,United Air Lines Inc.,N803UA,ORD,"Chicago, IL",Evening,16,Medium >15min,EWR,"Newark, NJ",10,Low <5min,129,Short Haul >1500Mi,0,0,0,0,0,AIRBUS,A319,27,-9.12,-11.29,-6.96,0.0,0.0,280.0,18.25,1035.0,1.0


Defined a function `upload_s3_csv` to upload a Pandas DataFrame as a CSV file to a specified S3 bucket and folder, leveraging the AWS Boto3 library for seamless integration with AWS S3 storage.

In [44]:
s3_resource = boto3.Session().resource('s3')

s3_bucket = 'big-data-team1-bucket'
prediction_data_file = "prediction_data.csv"


def upload_s3_csv(filename, folder, dataframe):
    csv_buffer = StringIO()
    dataframe.to_csv(csv_buffer, header=True, index=False)
    s3_resource.Bucket(s3_bucket).Object(os.path.join(folder, filename)).put(Body=csv_buffer.getvalue())

Uploaded the `prediction_df` DataFrame as a CSV file named `prediction_data.csv` to the `prediction-data` folder in the specified S3 bucket using the `upload_s3_csv` function.

In [45]:
upload_s3_csv(prediction_data_file, "prediction-data", prediction_df)