In [79]:
# =================== 1. Setup Spark and Import Libraries ===================
from pyspark.sql import SparkSession
from pyspark.ml.regression import RandomForestRegressor, DecisionTreeRegressor, LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor, DecisionTreeRegressor, LinearRegression, GBTRegressor, GeneralizedLinearRegression, IsotonicRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
import argparse
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import *
import traceback
import pickle
import shutil

from pyspark import keyword_only
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml import Pipeline, Transformer, Estimator, PipelineModel
from pyspark.ml.feature import *
from pyspark.sql import functions as F
import json
from pyspark.sql.types import IntegerType, DoubleType, FloatType, StructField, Row
from pyspark.sql.functions import col, sum
from math import pi, cos, sin
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.context import SparkContext as sc
import os

# Initialize Spark Session
# spark = SparkSession.builder.appName("MachineLearningProject").getOrCreate()


In [80]:
# =================== 1. Data Reading ===================
def load_data(spark, input_path, mode):
    """
    Load the input dataset, drop forbidden columns, and validate its structure.

    Args:
        spark (SparkSession): The active Spark session.
        input_path (str): Path to the input CSV file.
        mode (str): Mode of operation ("train" or "predict").
    
    Returns:
        DataFrame: Processed Spark DataFrame.
    """
    forbidden_columns = [
        "ArrTime", "ActualElapsedTime", "AirTime", "TaxiIn", "Diverted",
        "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay"
    ]

    try:
        # Read the dataset
        data = spark.read.csv(input_path, header=True, inferSchema=True)

        # Drop forbidden columns
        data = data.drop(*forbidden_columns)

        # Check if the dataset is empty
        if data.count() == 0:
            raise ValueError("The dataset is empty.")

        # Validate the presence of the target variable for training
        if mode == "train" and "ArrDelay" not in data.columns:
            raise ValueError("The target variable 'ArrDelay' is missing.")
    
    except Exception as e:
        print(f"Error reading the dataset: {e}")
        spark.stop()
        raise
    
    return data


In [81]:
# =================== 2. Exploratory Data Analysis (EDA) ===================
def eda(data):
    """
    Perform exploratory data analysis on the dataset, including univariate and multivariate analysis.

    Args:
        data (DataFrame): Spark DataFrame to analyze.

    Returns:
        DataFrame: DataFrame with a feature vector column added for further processing.
    """
    # Univariate analysis: Display statistical summary
    print("Statistical Summary:")
    data.describe().show()

    # Multivariate analysis: Correlations and patterns
    # Select numeric columns for correlation analysis
    numeric_cols = [col for col, dtype in data.dtypes if dtype in ('int', 'double')]

    if numeric_cols:
        # Assemble numeric columns into a single feature vector
        vector_col = "features_vector"
        assembler = VectorAssembler(inputCols=numeric_cols, outputCol=vector_col)
        data = assembler.transform(data)

        # Display correlation matrix for the numeric features
        from pyspark.ml.stat import Correlation
        try:
            correlation_matrix = Correlation.corr(data, vector_col).head()[0]
            print(f"Correlation matrix:\n{correlation_matrix}")
        except Exception as e:
            print(f"Error calculating correlations: {e}")
    else:
        print("No numeric columns available for correlation analysis.")

    return data


In [102]:
TARGET_COLUMN = "ArrDelay"
# Path to Parquet file
FLIGHT_PARQUET_PATH = './data/flights.parquet'
PLANES_PARQUET_PATH = './data/planes.parquet'
PROCESSING_DIR = "data/processing/"
# Path to schema file
PLANE_SCHEMA_PATH = './data/plane-schema.json'
FLIGHT_SCHEMA_PATH = './data/flight-schema.json'
# Load paths
FLIGHT_RAW_PATH = './data/*.csv.bz2'
PLANE_RAW_PATH = './data/plane-data.csv'
# Result paths
PROCESSED_DIR = './data/processed/'
PROCESSED_TRAIN_PARQUET = os.path.join(PROCESSED_DIR, "train.parquet")
PROCESSED_TEST_PARQUET = os.path.join(PROCESSED_DIR, "test.parquet")
PROCESSED_SCHEMA = os.path.join(PROCESSED_DIR, "schema.json")


def load_csv_save_parquet(spark, raw_path, parquet_path, schema_path) -> DataFrame:
    # Read csv
    df = spark.read.csv(
        raw_path,
        header=True,
        inferSchema=True
    )

    schema = df.schema
    schema_json = schema.json()

    # Write the schema JSON to a file
    with open(schema_path, 'w') as f:
        f.write(schema_json)

    # Save DataFrame as Parquet for future use
    df.repartition(1)
    df.write.parquet(parquet_path)

    df = spark.read.parquet(parquet_path, schema=schema)
    return df


def load_parquet(spark, parquet_path, schema_file) -> DataFrame:
    with open(schema_file, 'r') as f:
        schema_json = f.read()

    # Deserialize the JSON string back into a StructType object
    schema_from_file = StructType.fromJson(json.loads(schema_json))

    df = spark.read.parquet(parquet_path, schema=schema_from_file)
    return df


def load(spark, parquet_path, schema_file_path, wildcard_path) -> (DataFrame, DataFrame):
    if os.path.exists(parquet_path):
        # If Parquet exists, load it using the schema files
        df = load_parquet(spark, parquet_path, schema_file_path)
        df_planes = load_parquet(spark, PLANES_PARQUET_PATH, PLANE_SCHEMA_PATH)
    else:
        # If Parquet file does not exist, read CSV files and save as Parquet
        df = load_csv_save_parquet(spark, wildcard_path, parquet_path, schema_file_path)
        df_planes = load_csv_save_parquet(spark, PLANE_RAW_PATH, PLANES_PARQUET_PATH, PLANE_SCHEMA_PATH)
    return df, df_planes


def custom_polar_time_encode(df):
    print(f"Transforming Month, DayofMonth and DayofWeek to polar coordinates.")

    def polar_encoding(value, max_value):
        frac = value / max_value
        circle = 2 * pi
        angle = frac * circle
        return cos(angle), sin(angle)

    # Register UDF for polar encoding
    polar_udf = udf(polar_encoding, "struct<cos:double, sin:double>")

    # Apply polar encoding on 'Month', 'DayofMonth', 'DayOfWeek'
    df = df.withColumn("Month_polar", polar_udf(col("Month"), lit(12))) \
        .withColumn("DayofMonth_polar", polar_udf(col("DayofMonth"),
                                                  when(col("Month") == 2, lit(28))  # February (can adjust for leap year
                                                  # if needed)
                                                  .when(col("Month").isin([4, 6, 9, 11]),
                                                        lit(30))  # Months with 30 days
                                                  .otherwise(lit(31)))) \
        .withColumn("DayOfWeek_polar", polar_udf(col("DayOfWeek"), lit(7)))
    # df = df.drop(*["DayofMonth", "DayOfWeek", "Month"])

    # Subdivide feature pairs into two columns each
    df = df.withColumn("Month_cos", col("Month_polar.cos")) \
        .withColumn("Month_sin", col("Month_polar.sin")) \
        .withColumn("DayofMonth_cos", col("DayofMonth_polar.cos")) \
        .withColumn("DayofMonth_sin", col("DayofMonth_polar.sin")) \
        .withColumn("DayOfWeek_cos", col("DayOfWeek_polar.cos")) \
        .withColumn("DayOfWeek_sin", col("DayOfWeek_polar.sin"))

    df = df.drop(*["DayofMonth_polar", "DayOfWeek_polar", "Month_polar"])

    return df


def static_preprocess(df, df_planes):
    df_planes = df_planes.withColumnRenamed("tailnum", "TailNum")
    df = df.join(df_planes, on="TailNum", how="inner")

    print("Schema before static preprocessing")
    df.printSchema()

    forbidden_cols = [
        "ArrTime",
        "ActualElapsedTime",
        "AirTime",
        "TaxiIn",
        "Diverted",
        "CarrierDelay",
        "WeatherDelay",
        "NASDelay",
        "SecurityDelay",
        "LateAircraftDelay"
    ]
    df = df.drop(*forbidden_cols)

    target_column = "ArrDelay"

    # List of Ordinal features
    cyclic_ordinal_time = [
        'Month',
        'DayofMonth',
        'DayOfWeek'
    ]
    non_cyclic_ordinal_time = ['Year', 'PlaneIssueYear']

    # List of Time features
    quant_time_features = [
        'DepTime',
        'CRSDepTime',
        'CRSArrTime'
    ]

    # List of Quantitative features
    quantitative_features = [
        'CRSElapsedTime',
        'DepDelay',
        'Distance',
        'TaxiOut'
    ]

    # List of Nominal features
    nominal_features = [
        'UniqueCarrier',
        'FlightNum',
        'TailNum',
        'Origin',
        'Dest',
        'Cancelled',
        'CancellationCode',
        'EngineType',
        'AircraftType',
        'Manufacturer',
        'Model',
        "issue_date", "status",
        "type"
    ]

    # WE ARE PREDICTING DELAY. REMOVE CANCELLED FLIGHTS
    df = df.filter("Cancelled != 1")

    # DROP NOMINALS WITH TOO MANY GROUPS OR THAT ARE USELESS
    useless_fea = ["TailNum", "FlightNum", "UniqueCarrier", "CancellationCode", "Cancelled", "issue_date", "status",
                   "type"]
    for fea in useless_fea:
        print(f"Discarding {fea}.")
        nominal_features.remove(fea)
    df = df.drop(*useless_fea)

    # RENAME VARIABLES
    df = df.withColumnRenamed("year", "PlaneIssueYear")
    df = df.withColumnRenamed("engine_type", "EngineType")
    df = df.withColumnRenamed("aircraft_type", "AircraftType")
    df = df.withColumnRenamed("model", "Model")
    df = df.withColumnRenamed("manufacturer", "Manufacturer")

    # CAST QUANTITATIVE COLUMNS TO NUMERIC, SOME ARE STRINGS
    for column in quantitative_features + [target_column]:
        print(f"Forcing {column} to be read as integer.")
        df = df.withColumn(column, col(column).cast(IntegerType()))
    df = df.dropna(subset=[target_column])
    null_count = df.filter(col(target_column).isNull()).count()
    print(f"Number of nulls in {target_column}: {null_count}")

    # CAST HHMM COLUMNS TO MINUTE QUANTITIES
    for column in quant_time_features:  # They are strings hhmm
        print(f"Casting {column} from hhmm to minutes (integer).")
        df = df.withColumn(
            column + "_minutes",
            (F.col(column).substr(1, 2).cast("int") * 60 + F.col(column).substr(3, 2).cast("int"))
        )
        quantitative_features.append(column + "_minutes")
    df = df.drop(*quant_time_features)

    df = custom_polar_time_encode(df)
    ordinal_features = []
    ordinal_features += [fea + "_sin" for fea in cyclic_ordinal_time]
    ordinal_features += [fea + "_cos" for fea in cyclic_ordinal_time]

    return df, quantitative_features, ordinal_features, nominal_features + non_cyclic_ordinal_time


def train_preprocess(df, nominal_features, ordinal_features, quantitative_features, dir_save_params,
                     cardinality_threshold, frequency_threshold, high_cardinality_strategy):
    spark = SparkSession.builder.getOrCreate()
    # -------------------------------- IMPUTER --------------------------------
    # This should be the column, the values considered nulls, and the value to be used to fill

    print("Analyzing medians")
    imputer_maps = {
        fea: {'extra_nulls': [],
              'fill_value': df.approxQuantile(col=fea, probabilities=[0.5], relativeError=0.025)[0]} for fea in
        quantitative_features
    }
    print("Current imputing dictionary: ")
    print(imputer_maps)
    print("Analyzing modes")
    imputer_maps.update({
        fea: {'extra_nulls': ['None'],
              'fill_value': df.groupby(fea).count().orderBy("count", ascending=False).first()[0]} for fea in
        ordinal_features + nominal_features
    })
    print("Filling dictionary: ")
    print(imputer_maps)
    # Convert to JSON and save it
    json_data = json.dumps(imputer_maps, indent=4)

    # Save to a file
    with open(os.path.join(dir_save_params, 'imputer_maps.json'), 'w') as f:
        f.write(json_data)

    # ----------------------------- NOMINAL ENCODER ----------------------------

    def get_sufficiently_frequent(df, fea, frequency_threshold=frequency_threshold):
        total_count = df.count()

        # Group by the column and calculate the normalized frequency
        proportions = df.groupBy(fea).agg(
            (F.count("*") / total_count).alias(f"{fea}_frequency")
        )
        result = proportions.filter(F.col(f"{fea}_frequency") > frequency_threshold).select(fea).collect()
        result = [row[fea] for row in result]
        return result

    feature_to_sufficiently_frequent = {
        fea: get_sufficiently_frequent(df, fea) for fea in nominal_features
    }
    print("Sufficiently frequent values per feature: ")
    print(feature_to_sufficiently_frequent)

    # Map between feature and the encoder and new column name
    nominal_encode_type = {}
    nominal_encoders = {}
    new_nominal = []
    for fea in nominal_features:
        elems_to_preserve = feature_to_sufficiently_frequent[fea]
        df = df.withColumn(
            f"{fea}_aggregated",
            (F.when(~F.col(fea).isin(elems_to_preserve), lit("Other")).otherwise(F.col(fea)))
        )

        if len(elems_to_preserve) + 1 <= cardinality_threshold:
            print(f"Performing One-Hot-Encoding to feature {fea}")
            indexer = StringIndexer(inputCol=f"{fea}_aggregated", outputCol=f"{fea}_index", handleInvalid='keep')
            encoder = OneHotEncoder(inputCol=f"{fea}_index", outputCol=f"{fea}_binary", handleInvalid='keep',
                                    dropLast=True)
            pipeline = Pipeline(stages=[indexer, encoder])
            pipeline_model = pipeline.fit(df)
            nominal_encode_type[f"{fea}_aggregated"] = "binary"
            new_nominal.append(f"{fea}_binary")
            pipeline_model.save(os.path.join(dir_save_params, f'{fea}_aggregated_encoder'))
        elif high_cardinality_strategy == "ignore":
            print(f"Ignoring feature {fea}")
        elif high_cardinality_strategy == "mean":
            print(f"Performing Mean-Target-Encoding to feature {fea}")
            mapping_df = df.groupBy(f"{fea}_aggregated").agg(F.avg("ArrDelay").alias(f"{fea}_mean_enc"))
            if "Other" not in mapping_df.select(f"{fea}_aggregated").distinct().collect():
                mean = float(df.groupBy(TARGET_COLUMN).agg(F.avg("ArrDelay")).collect()[0][0])
                print(mean)
                new_row = Row(f"{fea}_aggregated", f"{fea}_mean_enc")("Other", mean)
                print(new_row)
                # Convert the new row to a DataFrame with the same schema as mapping_df
                new_row_df = spark.createDataFrame([new_row], mapping_df.schema)
                print(new_row_df.show())
                mapping_df = mapping_df.union(new_row_df)
                print(mapping_df.show())
            mapping_df.write.csv(os.path.join(dir_save_params, f'{fea}_aggregated_encoder.csv'), header=True)
            new_nominal.append(f"{fea}_mean_enc")
            nominal_encode_type[f"{fea}_aggregated"] = "mean"
        else:
            raise NotImplementedError(f"Not implemented strategy {high_cardinality_strategy}")

    print("Feature to encoder types:")
    print(nominal_encode_type)
    print("Final nominal variables:")
    print(new_nominal)

    # Convert to JSON and save it
    json_data = json.dumps(nominal_encode_type, indent=4)
    with open(os.path.join(dir_save_params, 'encode_types.json'), 'w') as f:
        f.write(json_data)

    json_data = json.dumps(feature_to_sufficiently_frequent, indent=4)
    with open(os.path.join(dir_save_params, 'non_aggregated.json'), 'w') as f:
        f.write(json_data)

    # -------------------------------- VECTORIZER --------------------------------
    # Quantitative feature assembly
    quant_assembler = VectorAssembler(
        inputCols=quantitative_features,
        outputCol="quant_features_vector"
    )

    # Assemble encoded nominal features
    nominal_assembler = VectorAssembler(
        inputCols=new_nominal,
        outputCol="nominal_features_vector"
    )

    ordinal_assembler = VectorAssembler(
        inputCols=ordinal_features,
        outputCol="ordinal_features_vector"
    )

    # Final feature vector
    final_assembler = VectorAssembler(
        inputCols=["quant_features_vector", "nominal_features_vector", "ordinal_features_vector"],
        outputCol="features"
    )

    # Create a pipeline
    pipeline = Pipeline(stages=[ordinal_assembler,
                                quant_assembler,
                                nominal_assembler,
                                final_assembler
                                ])
    vectorizer = pipeline.fit(df)
    vectorizer.save(os.path.join(dir_save_params, 'vectorizer'))
    # -------------------------------- VECTORIZER --------------------------------


def dynamic_preprocess(df, nominal_features, ordinal_features, quantitative_features, dir_load_params):
    # -------------------------------- IMPUTER --------------------------------
    with open(os.path.join(dir_load_params, 'imputer_maps.json'), 'r') as f:
        imputer_maps = json.load(f)

    for fea in quantitative_features + ordinal_features + nominal_features:
        if len(imputer_maps[fea]['extra_nulls']) > 0:
            df = df.withColumn(fea, when(df[fea].isin(imputer_maps[fea]['extra_nulls']), lit(None)).otherwise(df[fea]))

        if df.filter(col(fea).isNull()).count() > 0:
            value = imputer_maps[fea]['fill_value']
            print(f"Imputing {fea} with {value}")
            df = df.fillna(value, subset=fea)

    # ----------------------------- NOMINAL ENCODER ----------------------------
    with open(os.path.join(dir_load_params, 'encode_types.json'), 'r') as f:
        encode_types = json.load(f)
    with open(os.path.join(dir_load_params, 'non_aggregated.json'), 'r') as f:
        fea_2_non_aggregated = json.load(f)

    for fea, non_aggregated in fea_2_non_aggregated.items():
        df = df.withColumn(
            f"{fea}_aggregated",
            (F.when(~F.col(fea).isin(non_aggregated), lit("Other")).otherwise(F.col(fea)))
        )

    for fea, encode_type in encode_types.items():
        if encode_type == 'binary':
            encoder = PipelineModel.load(os.path.join(dir_load_params, f'{fea}_encoder'))
            df = encoder.transform(df)
        elif encode_type == 'mean':
            encoder = SparkSession.builder.getOrCreate().read.csv(os.path.join(dir_load_params, f'{fea}_encoder.csv'),
                                                                  header=True, inferSchema=True)
            df = df.join(encoder, on=fea, how='left')
            new_var = f"{fea}_mean_enc".replace("_aggregated", "")
            imput_value = encoder.filter(encoder[fea] == "Other").select(new_var).collect()[0][0]
            print(f"Using the following encoder for {fea}")
            print(encoder.show(10))
            print(f"Imputing unrecognized values in {fea} with 'Other'->{imput_value}")
            df = df.fillna(imput_value, subset=new_var)
        else:
            raise NotImplementedError(f"Not implemented encode type {encode_type}")

    # ------------------------------ VECTORIZER --------------------------------
    vectorizer = PipelineModel.load(os.path.join(dir_load_params, 'vectorizer'))
    df = vectorizer.transform(df)
    return df


def assure_existence_directory(directory_path):
    # Check if the directory exists
    if not os.path.exists(directory_path):
        os.makedirs(directory_path)


def preprocess_fit_and_transform(df, df_planes, dir_save_params="./data/"):
    df, quantitative_features, ordinal_features, nominal_features = static_preprocess(df, df_planes)

    if len(os.listdir(dir_save_params)) == 0:
        print("TRAINING DYNAMIC PREPROCESSING PARAMETERS")
        train_preprocess(df, nominal_features, ordinal_features, quantitative_features, dir_save_params,
                         cardinality_threshold=10, frequency_threshold=0.02, high_cardinality_strategy="mean")
    else:
        print("DYNAMIC PREPROCESSING PARAMETERS FOUND. SKIPPING LEARNING.")
    df = dynamic_preprocess(df, nominal_features, ordinal_features, quantitative_features, dir_save_params)
    return df


def split_and_preprocess(df, df_planes, train_frac=0.8, dir_save_params="./data/"):
    train_df, test_df = df.randomSplit([train_frac, 1 - train_frac], seed=42)
    train_df = preprocess_fit_and_transform(train_df, df_planes, dir_save_params=dir_save_params)

    print("TESTING DATA PROCESSING")
    test_df, quantitative_features, ordinal_features, nominal_features = static_preprocess(test_df, df_planes)
    test_df = dynamic_preprocess(test_df, nominal_features, ordinal_features, quantitative_features, dir_save_params)
    return train_df, test_df


# THIS IS THE FUNCTION TO USE TO PREPROCESS VALIDATION DATA PASSED THROUGH CONSOLE <-------------------------------------
def validation_preprocess(df, dir_save_params="./data/"):
    spark = SparkSession.builder.appName("MachineLearningProject").getOrCreate()
    df_planes = load_parquet(spark, PLANES_PARQUET_PATH, PLANE_SCHEMA_PATH)
    df, quantitative_features, ordinal_features, nominal_features = static_preprocess(df, df_planes)
    df = dynamic_preprocess(df, nominal_features, ordinal_features, quantitative_features, dir_save_params)
    return df


def load_split_and_preprocess(n_partitions=10, debug=False):
    spark = (SparkSession.builder.appName("MachineLearningProject") # Change this as needed
             .config("spark.executor.memory", "4g")
             .config("spark.driver.memory", "48g")
             .config("spark.memory.fraction", "0.8")
             .config("spark.memory.storageFraction", "0.3")
             .config("spark.driver.maxResultSize", "4g")
             .config("spark.sql.caseSensitive", "true")
             .config("spark.sql.debug.maxToStringFields", "200")
             # .config("spark.local.dir", "./temp/")
             .getOrCreate())

    if not os.path.exists(PROCESSED_TRAIN_PARQUET):
        df, df_planes = load(spark, FLIGHT_PARQUET_PATH, PLANE_SCHEMA_PATH, FLIGHT_RAW_PATH)
        df = df.repartition(n_partitions)

        if debug:
            fraction = 0.01  # Adjust the fraction to select 10% of rows
            df = df.sample(withReplacement=True, fraction=fraction)
            df = df.repartition(1)

        # train_df, test_df = complete_preprocess(df, df_planes, train_frac=0.8)
        assure_existence_directory(PROCESSING_DIR)
        train_df, test_df = split_and_preprocess(df, df_planes, train_frac=0.8, dir_save_params=PROCESSING_DIR)
        print("Finished preprocessing")
        print(train_df.head())
        print(test_df.head())

        print(f"Saving schema to {PROCESSED_SCHEMA}")
        assure_existence_directory(PROCESSED_DIR)
        schema_json = train_df.schema.json()
        with open(PROCESSED_SCHEMA, 'w') as f:
            f.write(schema_json)
        test_df.write.mode('overwrite').parquet(PROCESSED_TEST_PARQUET)
        train_df.write.mode('overwrite').parquet(PROCESSED_TRAIN_PARQUET)
    else:
        with open(PROCESSED_SCHEMA, 'r') as f:
            schema_json = f.read()

        schema = StructType.fromJson(json.loads(schema_json))

        test_df = spark.read.parquet(PROCESSED_TEST_PARQUET, schema=schema)
        train_df = spark.read.parquet(PROCESSED_TRAIN_PARQUET, schema=schema)

        print(test_df.head())
        print(train_df.head())
    spark.stop()

load_split_and_preprocess(debug=False)

Row(PlaneIssueYear_aggregated='1999', Model_aggregated='EMB-145', Dest_aggregated='Other', Origin_aggregated='IAH', Year=2008, Month=1, DayofMonth=1, DayOfWeek=2, CRSElapsedTime=122, ArrDelay=-4, DepDelay=0, Origin='IAH', Dest='TYS', Distance=772, TaxiOut=23, Manufacturer='EMBRAER', Model='EMB-145', AircraftType='Fixed Wing Multi-Engine', EngineType='Turbo-Jet', PlaneIssueYear='1999', DepTime_minutes=600, CRSDepTime_minutes=600, CRSArrTime_minutes=782, Month_cos=0.8660254037844387, Month_sin=0.49999999999999994, DayofMonth_cos=0.9795299412524945, DayofMonth_sin=0.20129852008866006, DayOfWeek_cos=-0.22252093395631434, DayOfWeek_sin=0.9749279121818236, EngineType_aggregated='Turbo-Jet', AircraftType_aggregated='Fixed Wing Multi-Engine', Manufacturer_aggregated='EMBRAER', Year_aggregated='2008', Origin_mean_enc=11.221490596320836, Dest_mean_enc=148.0, EngineType_index=1.0, EngineType_binary=SparseVector(8, {1: 1.0}), AircraftType_index=0.0, AircraftType_binary=SparseVector(5, {0: 1.0}), M

In [84]:
def main():
    """
    Main function to execute the pipeline workflow.
    Accepts command-line arguments for dynamic input/output handling.
    """
    parser = argparse.ArgumentParser(description="Flight Delay Prediction Application")
    parser.add_argument("--mode", type=str, required=True, choices=["train", "predict"], help="Mode: train or predict")
    parser.add_argument("--input", type=str, required=True, help="Path to input CSV file")
    parser.add_argument("--model", type=str, required=True, help="Path to save/load the model")
    parser.add_argument("--output", type=str, help="Path to save predictions (required for predict mode)")

    args = parser.parse_args()

    # Start Spark Session
    spark = SparkSession.builder.appName("FlightDelayPipeline").getOrCreate()

    try:
        # Workflow
        data = load_data(spark, args.input, args.mode)  # Load the dataset
        data = eda(data)
        # data = validation_preprocess(data, args.mode)        # Preprocess the dataset
        # pipeline, _ = feature_engineering(data)        # Perform feature engineering

        if args.mode == "train":
            # Train the model, evaluate it, and optionally save it
            metrics = build_and_train_model(data, pipeline, args.model)
            print(f"Training completed. Evaluation metrics: {metrics}")
        elif args.mode == "predict":
            if not args.output:
                raise ValueError("Output path is required for prediction mode.")
            # Use the trained model to generate predictions
            predict(data, args.model, args.output)

    except Exception as e:
        print("An error occurred:", e)
        traceback.print_exc()

# Add this block to execute the script when running it as a standalone script
#if __name__ == "__main__":
#    main()


In [85]:
import json
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import col, isnan
from pyspark.storagelevel import StorageLevel

def test_train_and_predict(processed_schema_path, processed_train_path_parquet, processed_test_path_parquet, model_path):
    """
    Function to test the build_and_train_model and predict functions using train and test datasets.

    Args:
        processed_schema_path (str): Path to the schema JSON file.
        processed_train_path_parquet (str): Path to the train.parquet file.
        processed_test_path_parquet (str): Path to the test.parquet file.
        model_path (str): Path to save or load the trained model.

    Returns:
        None
    """
    

    # Configuración de Spark
    spark = SparkSession.builder \
        .appName("Optimización con recursos limitados") \
        .master("local[*]") \
        .config("spark.executor.memory", "2g") \
        .config("spark.driver.memory", "2g") \
        .config("spark.sql.shuffle.partitions", "100") \
        .config("spark.python.worker.timeout", "600") \
        .config("spark.network.timeout", "600s") \
        .getOrCreate()

        
    pipeline = Pipeline(stages=[])
        
    try:
        # Carga el esquema desde el archivo JSON
        print("Loading schema from JSON...")
        with open(processed_schema_path, 'r') as f:
            schema_json = f.read()

        schema = StructType.fromJson(json.loads(schema_json))

        # Carga los DataFrames de Parquet usando el esquema
        print("Loading train and test datasets from Parquet...")
        train_df = spark.read.parquet(processed_train_path_parquet, schema=schema)
        test_df = spark.read.parquet(processed_test_path_parquet, schema=schema)


        train_df = train_df.sample(fraction=0.1)  # Usa una muestra del 10%
        train_df = train_df.repartition(10)  # Reparticionar para evitar carga de memoria


        # Persistir los DataFrames en disco
        train_df.persist(StorageLevel.DISK_ONLY)
        test_df.persist(StorageLevel.DISK_ONLY)

        # Confirmación de datos cargados
        print("Train and test datasets loaded successfully!")
        print(f"Train dataset count: {train_df.count()}")
        print(f"Test dataset count: {test_df.count()}")

        # train_df = train_df.repartition(200)  # Ajusta el número de particiones según el tamaño de los datos

        # Train the model
        print("Training the model...")
        metrics = build_and_train_model(train_df, pipeline, model_path)
        print(f"Training completed. Metrics: {metrics}")

        # Predict using the trained model
        print("Making predictions on the test dataset...")
        output_path = model_path + "_predictions"
        predict(test_df, model_path, output_path)
        print(f"Predictions saved to: {output_path}")

    except Exception as e:
        print(f"An error occurred: {e}")
        print("An error occurred:", e)
        traceback.print_exc()

# # Rutas de ejemplo
# processed_schema_path = "data/processed/schema.json"  # Ruta al archivo JSON del esquema
# processed_train_path_parquet = "data/processed/train.parquet"  # Ruta al archivo Parquet de train
# processed_test_path_parquet = "data/processed/test.parquet"  # Ruta al archivo Parquet de test
# model_path = "data/models/trained_model"

# # Llamada a la función
# test_train_and_predict(processed_schema_path, processed_train_path_parquet, processed_test_path_parquet, model_path)


In [92]:
def validate(data, model_path, output_path):
    """
    Load the trained model and generate predictions for the given data.

    Args:
        data (DataFrame): Spark DataFrame for predictions.
        model_path (str): Path to load the trained model.
        output_path (str): Path to save predictions (CSV).
    
    Returns:
        None
    """
    # Load the trained model
    
    for model_name in os.listdir(model_path):
        model_folder = os.path.join(model_path, model_name)
        print(f"Loading model: {model_folder}")
        model = PipelineModel.load(model_folder)
        name = model_name
        # # Drop the existing 'prediction' column if it exists
        # if "prediction" in data.columns:
        #     print("Dropping existing 'prediction' column...")
        #     data = data.drop("prediction")
    
        # Make predictions on the input data
        print("Validating model...")
        predictions = model.transform(data)
        rmse_evaluator = RegressionEvaluator(labelCol="ArrDelay", predictionCol="prediction", metricName="rmse")
        metrics = {}
        metrics['rmse'] = rmse_evaluator.evaluate(predictions)
        print(f"{name} - Root Mean Square Error (RMSE) on test data: {metrics['rmse']}")

        # Mean Absolute Error (MAE)
        mae_evaluator = RegressionEvaluator(labelCol="ArrDelay", predictionCol="prediction", metricName="mae")
        metrics['mae'] = mae_evaluator.evaluate(predictions)
        print(f"{name} - Mean Absolute Error (MAE) on test data: {metrics['mae']}")

        # R-Squared (R²)
        r2_evaluator = RegressionEvaluator(labelCol="ArrDelay", predictionCol="prediction", metricName="r2")
        metrics['r2'] = r2_evaluator.evaluate(predictions)
        print(f"{name} - R-Squared (R²) on test data: {metrics['r2']}")
        # Save predictions to the specified output path
        
        out_csv_path = os.path.join(output_path, name + "_pred.csv")
        print(f"Saving predictions to {out_csv_path}")
        old_columns = [col for col in predictions.columns if "_" not in col]
        predictions = predictions.select(old_columns).drop(*["features", "scaledFeatures"])
        os.makedirs(output_path, exist_ok=True)  # Ensure output directory exists
        predictions.write.mode("overwrite").csv(out_csv_path, header=True)
        print(f"Predictions saved to: {output_path}")


In [100]:
def build_and_train_model(train_df, model_save_path=None):
    """
    Build, train, evaluate, and optionally save the model using cross-validation with three models.

    Args:
        data (DataFrame): Spark DataFrame with features and labels..
        model_save_path (str): Path to save the trained model (optional).
    
    Returns:
        dict: Evaluation metrics for the trained model.
    """
    # Split data into training and testing sets
    train_data, test_data = train_df.randomSplit([0.8, 0.2], seed=42)

    if "prediction" in train_data.columns:
        train_data = train_data.drop("prediction")
    
    # Initialize metrics dictionary
    all_metrics = {}
    
    
    # Define the models: RandomForestRegressor, DecisionTreeRegressor, LinearRegression
    scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withMean=True, withStd=True)
    rf = RandomForestRegressor(featuresCol="scaledFeatures", labelCol="ArrDelay")
    dt = DecisionTreeRegressor(featuresCol="scaledFeatures", labelCol="ArrDelay")
    lr = LinearRegression(featuresCol="scaledFeatures", labelCol="ArrDelay")
    
    models = {
        'Linear_Regression': lr, 'Decision_Tree': dt, 'Random_Forest': rf
    }
    model_names = list(models.keys())
    # model_names = ['Linear_Regression']
    best = 0 
    best_model = None 
    for name in model_names:
        model = models[name]
        print(f"Training {name} model...")
        
        # Add the current model to the pipeline
        pipeline = Pipeline(stages=[scaler, model])

        # Hyperparameter tuning with cross-validation for the current model
        param_grid_builder = ParamGridBuilder()

        if isinstance(model, RandomForestRegressor):
            param_grid_builder.addGrid(model.numTrees, [10, 50, 100])
        elif isinstance(model, DecisionTreeRegressor):
            param_grid_builder.addGrid(model.maxDepth, [5, 10, 20])
        elif isinstance(model, LinearRegression):
            param_grid_builder.addGrid(model.regParam, [0.1, 0.3, 0.5])

        # Construir la grilla de parámetros
        param_grid = param_grid_builder.build()


        evaluator = RegressionEvaluator(labelCol="ArrDelay", predictionCol="prediction", metricName="rmse")

        # Set up cross-validation
        cv = CrossValidator(
            estimator=pipeline,
            estimatorParamMaps=param_grid,
            evaluator=evaluator,
            numFolds=5
        )

        # Train the model with cross-validation
        cv_model = cv.fit(train_data)

        if "prediction" in train_df.columns:
            data = data.drop("prediction")

        # Generate predictions on the test dataset
        predictions = cv_model.transform(test_data)

        # Evaluate the model using multiple metrics
        metrics = {}
        # Root Mean Square Error (RMSE)
        rmse_evaluator = RegressionEvaluator(labelCol="ArrDelay", predictionCol="prediction", metricName="rmse")
        metrics['rmse'] = rmse_evaluator.evaluate(predictions)
        print(f"{name} - Root Mean Square Error (RMSE) on test data: {metrics['rmse']}")

        # Mean Absolute Error (MAE)
        mae_evaluator = RegressionEvaluator(labelCol="ArrDelay", predictionCol="prediction", metricName="mae")
        metrics['mae'] = mae_evaluator.evaluate(predictions)
        print(f"{name} - Mean Absolute Error (MAE) on test data: {metrics['mae']}")

        # R-Squared (R²)
        r2_evaluator = RegressionEvaluator(labelCol="ArrDelay", predictionCol="prediction", metricName="r2")
        metrics['r2'] = r2_evaluator.evaluate(predictions)
        print(f"{name} - R-Squared (R²) on test data: {metrics['r2']}")

        # Store model-specific metrics
        all_metrics[name] = metrics

        # Save the best model if a save path is provided
        if model_save_path:
            params = cv_model.bestModel.extractParamMap()
            model = models[name]
            model.setParams(**params)
            print(f"Re-Training {best_model} model with params {params}...")
            # Add the current model to the pipeline
            pipeline = Pipeline(stages=[scaler, model])
            pipeline_model = pipeline.fit(train_df)
            path = f"{model_save_path}/retrained_{name}"
            pipeline_model.write().overwrite().save(path)
            print(f"Selected best model retrained saved to: {path}")
    return all_metrics


In [101]:
import json
import os
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.ml import Pipeline

def test_train_and_predict(processed_schema_path, processed_train_path_parquet, processed_test_path_parquet, model_path, debug=False, skip_training=False):
    """
    Function to test the build_and_train_model and predict functions using train and test datasets.

    Args:
        processed_schema_path (str): Path to the schema JSON file.
        processed_train_path_parquet (str): Path to the train.parquet file.
        processed_test_path_parquet (str): Path to the test.parquet file.
        model_path (str): Path to save or load the trained model.

    Returns:
        None
    """
    # Spark configuration
    spark = SparkSession.builder \
        .appName("Optimización con recursos limitados") \
        .master("local[*]") \
        .config("spark.executor.memory", "18g") \
        .config("spark.driver.memory", "42g") \
        .config("spark.sql.shuffle.partitions", "10") \
        .config("spark.python.worker.timeout", "600") \
        .config("spark.network.timeout", "600s") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC") \
        .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
        .getOrCreate()
    
    # Load schema from JSON
    print("Loading schema from JSON...")
    with open(processed_schema_path, 'r') as f:
        schema_json = f.read()
    schema = StructType.fromJson(json.loads(schema_json))

    # Load DataFrames from Parquet
    print("Loading train and test datasets from Parquet...")
    train_df = spark.read.schema(schema).parquet(processed_train_path_parquet)
    test_df = spark.read.schema(schema).parquet(processed_test_path_parquet)

    # Sample and repartition data
    
    if debug:
        train_df = train_df.sample(fraction=0.1).repartition(1)
    else:
        train_df = train_df.repartition(10)
        
    # train_df.persist(StorageLevel.DISK_ONLY)
    # test_df.persist(StorageLevel.DISK_ONLY)

    # Confirm data loading
    print("Train and test datasets loaded successfully!")
    print(f"Train dataset count: {train_df.count()}")
    print(f"Test dataset count: {test_df.count()}")

    # Train the model
    print("Training the model...")
    if not skip_training:
        metrics = build_and_train_model(train_df, model_path)
        print(f"Training completed. Metrics: {metrics}")

    # Predict using the trained model
    print("Making predictions on the test dataset...")
    output_path = model_path + "/predictions/"
    
    validate(test_df, model_path, output_path)
        
        

# Example paths
processed_schema_path = "data/processed/schema.json"
processed_train_path_parquet = "data/processed/train.parquet"
processed_test_path_parquet = "data/processed/test.parquet"
model_path = "data/models/trained_model"

# Call the function
test_train_and_predict(processed_schema_path, processed_train_path_parquet, processed_test_path_parquet, model_path, debug=False, skip_training=False)


Loading schema from JSON...
Loading train and test datasets from Parquet...
Train and test datasets loaded successfully!
Train dataset count: 4724323
Test dataset count: 1181675
Training the model...
Training Linear_Regression model...
Linear_Regression - Root Mean Square Error (RMSE) on test data: 10.514251056604795
Linear_Regression - Mean Absolute Error (MAE) on test data: 7.467296493020642
Linear_Regression - R-Squared (R²) on test data: 0.927236775791577
Re-Training None model with params {}...
Selected best model retrained saved to: data/models/trained_model/retrained_Linear_Regression
Training Decision_Tree model...
Decision_Tree - Root Mean Square Error (RMSE) on test data: 10.517368073736392
Decision_Tree - Mean Absolute Error (MAE) on test data: 6.818047361781439
Decision_Tree - R-Squared (R²) on test data: 0.9271936271480895
Re-Training None model with params {}...
Selected best model retrained saved to: data/models/trained_model/retrained_Decision_Tree
Training Random_Fores

In [22]:
# spark-submit notebook.py --mode train --input path/to/train.csv --model path/to/save_model
# spark-submit notebook.py --mode predict --input path/to/test.csv --model path/to/save_model --output path/to/predictions