# Installing dependencies

In [None]:
# Install the Python packages that later cells import (kagglehub, pyspark).
!pip install kagglehub
!pip install pyspark
# Download the hadoop-aws and AWS Java SDK bundle JARs into the working directory.
# NOTE(review): no visible cell adds these JARs to the Spark classpath — confirm they are still needed.
!wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar
!wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar


In [None]:
# for deleting files from colab
import shutil

# Remove Parquet output left over from a previous run so the cells below
# start from an empty directory. On a fresh runtime the directory does not
# exist yet; that is not an error, there is simply nothing to delete.
try:
    shutil.rmtree('./airline_parquet/')
except FileNotFoundError:
    pass

# Login to Kaggle

In [None]:
import kagglehub

# Authenticate with Kaggle before the dataset downloads in the cells below.
# NOTE(review): presumably prompts for credentials interactively — confirm against the kagglehub docs.
kagglehub.login()

## Downloading datasets from Kaggle

In [None]:
# Download the 2009-2018 dataset; `path_2009` holds the value returned by
# kagglehub and is used below as the directory containing the files.
path_2009 = kagglehub.dataset_download("yuanyuwendymu/airline-delay-and-cancellation-data-2009-2018")

print("Path to dataset files:", path_2009)
# Download the 2019-2023 dataset; `path_2019` is used below the same way.
path_2019 = kagglehub.dataset_download("patrickzel/flight-delay-and-cancellation-dataset-2019-2023")

print("Path to dataset files:", path_2019)

Listing the files that have been collected from kaggle.

In [None]:
import os

# File names (not full paths) inside each downloaded dataset directory.
# `files_2009` is iterated later when the post-2008 CSVs are converted to Parquet.
files_2009 = os.listdir(path_2009)
print("first files downloaded:", files_2009)
# `files_2019` is only printed; the later cell refers to its CSV by a fixed name.
files_2019 = os.listdir(path_2019)
print("second Files downloaded:", files_2019)

The column names differ between the three Kaggle datasets, so we create a mapping that renames the columns of the two newer datasets to match the original airline.csv.

In [None]:
# Maps column names used by the second (2009-2018) and third (2019-2023)
# datasets to the names used by the first dataset (airline.csv).
# Keys are source column names; values are the names they are renamed to.
column_mapping = {
    # Second dataset -> first dataset
    'FL_DATE': 'Date',
    'OP_CARRIER': 'UniqueCarrier',
    'OP_CARRIER_FL_NUM': 'FlightNum',
    'ORIGIN': 'Origin',
    'DEST': 'Dest',
    'CRS_DEP_TIME': 'CRSDepTime',
    'DEP_TIME': 'DepTime',
    'DEP_DELAY': 'DepDelay',
    'TAXI_OUT': 'TaxiOut',
    'WHEELS_OFF': 'WheelsOff',
    'WHEELS_ON': 'WheelsOn',
    'TAXI_IN': 'TaxiIn',
    'CRS_ARR_TIME': 'CRSArrTime',
    'ARR_TIME': 'ArrTime',
    'ARR_DELAY': 'ArrDelay',
    'CANCELLED': 'Cancelled',
    'CANCELLATION_CODE': 'CancellationCode',
    'DIVERTED': 'Diverted',
    'CRS_ELAPSED_TIME': 'CRSElapsedTime',
    'ACTUAL_ELAPSED_TIME': 'ActualElapsedTime',
    'AIR_TIME': 'AirTime',
    'DISTANCE': 'Distance',
    'CARRIER_DELAY': 'CarrierDelay',
    'WEATHER_DELAY': 'WeatherDelay',
    'NAS_DELAY': 'NASDelay',
    'SECURITY_DELAY': 'SecurityDelay',
    'LATE_AIRCRAFT_DELAY': 'LateAircraftDelay',

    # Third dataset -> first dataset (only the names that differ from the second dataset's)
    # NOTE(review): only two third-dataset names are listed; verify whether its
    # delay-cause columns use names that are not covered by this mapping.
    'AIRLINE_CODE': 'UniqueCarrier',  # same target as 'OP_CARRIER'; a file with both columns would get a duplicate name
    'FL_NUMBER': 'FlightNum',
}

## Using SPARK to process the data
Here we are loading the csv files as spark dataframes to save some space and compute.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.types import LongType, IntegerType, DoubleType, StringType
from pyspark.sql.functions import col


# create Spark session
spark = SparkSession.builder.appName("airline_processing").getOrCreate()

# Column types to enforce on the post-2008 Kaggle CSVs after their columns
# have been renamed. "Date" is kept as a string for these files.
schema_corrections = {
  "Date": StringType(),
  "CRSArrTime": LongType(),
  "CRSDepTime": LongType(),
  "FlightNum": IntegerType(),
  "Cancelled": DoubleType()
}

# Private snapshot of the mapping above. A later cell rebinds the global name
# `schema_corrections` to a different mapping (Date -> LongType) before
# load_and_rename is called; reading the global at call time would therefore
# cast Date with the wrong type. The snapshot pins the intended defaults.
_POST_2008_SCHEMA_CORRECTIONS = dict(schema_corrections)


# Code for processing post 2008 kaggle datasets
def load_and_rename(file_path, column_mapping, type_corrections=None):
  """Load one CSV into a Spark DataFrame with unified column names and types.

  Args:
    file_path: Path of the CSV file to read (header row expected, schema inferred).
    column_mapping: Dict of source column name -> target column name. Only
      columns present in the file are renamed.
    type_corrections: Optional dict of column name -> Spark data type to cast
      to after renaming. Defaults to the post-2008 corrections defined in
      this cell, regardless of later changes to the global
      `schema_corrections`.

  Returns:
    The Spark DataFrame with "Unnamed" columns dropped, columns renamed and
    the listed columns cast to the requested types.
  """
  if type_corrections is None:
    type_corrections = _POST_2008_SCHEMA_CORRECTIONS

  # Load CSV directly into Spark DataFrame
  spark_df = spark.read.csv(file_path, header=True, inferSchema=True)

  # Remove unnamed columns (any column name containing "Unnamed").
  # The loop variable is deliberately not called `col`, to avoid confusion
  # with pyspark.sql.functions.col used below.
  unnamed_columns = [name for name in spark_df.columns if "Unnamed" in name]
  spark_df = spark_df.drop(*unnamed_columns)

  # Rename columns based on mapping
  for old_col, new_col in column_mapping.items():
    if old_col in spark_df.columns:
      spark_df = spark_df.withColumnRenamed(old_col, new_col)

  # Cast the columns that are present to their corrected types. The former
  # special case for "Diverted" was unreachable (it is not a key of the
  # corrections mapping) and has been removed.
  for column_name, desired_type in type_corrections.items():
    if column_name in spark_df.columns:
      spark_df = spark_df.withColumn(
        column_name,
        col(column_name).cast(desired_type)
      )

  return spark_df

This is the code for accessing and processing the original Kaggle dataset (airline.csv), which covers 1987 to 2008.

In [None]:
import os

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, IntegerType, LongType

# code for processing original, pre 2008 kaggle dataset
# Reuse (or create) the notebook's Spark session
spark = SparkSession.builder \
    .appName("airline_processing") \
    .getOrCreate()

# Define your large CSV file path
csv_file_path = "airline.csv"

# Define the output directory for intermediate Parquet files
intermediate_path = "airline_parquet/"

# Parameters for chunk processing
chunk_size = 100000  # Number of rows per chunk

# Type corrections for the pre-2008 chunks. This uses its own name on purpose:
# rebinding the global `schema_corrections` here would change the types that
# load_and_rename applies to the post-2008 files.
pre_2008_schema_corrections = {
  "Date": LongType(),
  "CRSArrTime": LongType(),
  "CRSDepTime": LongType(),
  "FlightNum": IntegerType(),
  "Cancelled": DoubleType()
}


def _clean_date_part(series, width=0):
  """Return a date component as digit strings, zero-padded to `width`.

  Infinite and missing values become 0 before the integer conversion so the
  conversion cannot fail on them.
  """
  digits = (
    series.replace([float('inf'), -float('inf')], pd.NA)
    .fillna(0)
    .astype(int)
    .astype(str)
  )
  return digits.str.zfill(width) if width else digits


# Index of the next Parquet output; it only advances after a successful write,
# so the files on disk are numbered without gaps.
chunk_number = 0
failed_chunks = 0

# Process CSV in chunks using Pandas
for chunk in pd.read_csv(csv_file_path, chunksize=chunk_size):
  try:
    # Replace NaNs or inf in date-related columns. No combined "Date" column
    # is built; Year/Month/DayofMonth are stored as strings.
    chunk['Year'] = _clean_date_part(chunk['Year'])
    chunk['Month'] = _clean_date_part(chunk['Month'], width=2)
    chunk['DayofMonth'] = _clean_date_part(chunk['DayofMonth'], width=2)

    # Convert Pandas DataFrame to Spark DataFrame
    spark_df = spark.createDataFrame(chunk)

    # Remove unnamed columns
    unnamed_columns = [name for name in spark_df.columns if "Unnamed" in name]
    spark_df = spark_df.drop(*unnamed_columns)

    # Apply schema corrections to the columns that exist in this chunk. The
    # former "Diverted" special case was unreachable (not a key of the
    # mapping) and would have written its result into "Date"; it is removed.
    for column_name, desired_type in pre_2008_schema_corrections.items():
      if column_name in spark_df.columns:
        spark_df = spark_df.withColumn(
          column_name,
          col(column_name).cast(desired_type)
        )

    # Save each chunk as a separate Parquet file
    output_path = os.path.join(intermediate_path, f"airline_file_{chunk_number}.parquet")
    spark_df.write.mode("overwrite").parquet(output_path)

    chunk_number += 1

  except Exception as e:
    # Best effort: report the failure and carry on with the next chunk. The
    # failed chunk's rows are NOT written, so keep count of what was lost.
    failed_chunks += 1
    print(f"Error processing chunk {chunk_number}: {e}")
    continue

if failed_chunks:
  print(f"{failed_chunks} chunk(s) failed and were skipped; {chunk_number} written.")

In [None]:
# sanity check
import os
import re


def next_chunk_number(parquet_dir):
    """Return the first unused N for `airline_file_N.parquet` inside parquet_dir.

    Returns 0 when the directory does not exist or holds no such entries.
    Entries whose names do not match the pattern exactly are ignored.
    """
    if not os.path.isdir(parquet_dir):
        return 0
    pattern = re.compile(r"airline_file_(\d+)\.parquet")
    matches = (pattern.fullmatch(entry) for entry in os.listdir(parquet_dir))
    used = [int(match.group(1)) for match in matches if match]
    return max(used) + 1 if used else 0


# Derive the counter from what is actually on disk instead of hard-coding it.
# A fixed value can collide with chunk files that already exist and is wrong
# whenever the number of chunks changes. This also works after a kernel
# restart without re-running the chunking cell.
chunk_number = next_chunk_number("airline_parquet/")
chunk_number

This is the code for accessing and processing the 2 newer kaggle datasets ranging from 2009 to 2023.

In [None]:
# process files
import os

intermediate_path = "airline_parquet/"
sparks_dfs = []  # not used by any visible cell; kept in case another cell reads it

# Every file of the 2009-2018 download, followed by the single CSV used from
# the 2019-2023 download. Sorting makes the output numbering reproducible,
# because os.listdir returns names in arbitrary order.
csv_paths = [os.path.join(path_2009, file_name) for file_name in sorted(files_2009)]
csv_paths.append(path_2019 + r'/flights_sample_3m.csv')

for csv_path in csv_paths:
    spark_df = load_and_rename(csv_path, column_mapping)
    # Continue the numbering of the pre-2008 chunk files. "overwrite" matches
    # the pre-2008 writer; with the default mode the write raises when the
    # target path already exists (for example when this cell is re-run).
    output_path = os.path.join(intermediate_path, f"airline_file_{chunk_number}.parquet")
    spark_df.write.mode("overwrite").parquet(output_path)
    # Advance after every write, including the last one, so the counter
    # always names the next free file.
    chunk_number += 1

In [None]:
# Display the output-file counter after the post-2008 files have been written.
chunk_number

We are correcting some datatype differences here due to having taken info from 3 different sources.

In [None]:
from pyspark.sql.types import IntegerType, LongType, DoubleType
from pyspark.sql.functions import col

# Read every Parquet output written by the cells above into one DataFrame.
# NOTE(review): the pre-2008 chunks and the post-2008 files are written with
# different column sets; confirm that this read exposes all expected columns
# (Spark's Parquet "mergeSchema" option may be relevant).
combined_df = spark.read.parquet("./airline_parquet/*.parquet")

# Disabled: the type corrections are applied per file before writing instead.
# # Define schema corrections
# schema_corrections = {
#     "Date": LongType(),
#     "CRSArrTime": LongType(),
#     "CRSDepTime": LongType(),
#     "FlightNum": IntegerType(),
#     "Cancelled": DoubleType()
#     # Add more columns and their desired types if needed
# }

# # Apply schema corrections
# for column_name, desired_type in schema_corrections.items():
#     combined_df = combined_df.withColumn(column_name, col(column_name).cast(desired_type))

# Print the resulting schema so the column names and types can be inspected.
combined_df.printSchema()

## Processing Spark dataframe to get Airline Ranking by Number of Delays

In [None]:
# Get unique carriers
unique_carriers = combined_df.select("UniqueCarrier").distinct()

# Collect the distinct carriers once and derive the count from the collected
# rows. Calling count() and then collect() ran the distinct job over the
# whole dataset twice.
unique_carriers_list = unique_carriers.collect()
unique_carriers_count = len(unique_carriers_list)
print(f"Number of unique carriers: {unique_carriers_count}")

# Show the unique carriers
print("Unique carriers:", [row["UniqueCarrier"] for row in unique_carriers_list])


In [None]:
from pyspark.sql import functions as F

# Retrieve Delay rankings
# Flag each flight with 1 when at least one delay-cause column is positive,
# otherwise 0. The column is called "total_delay" but holds a 0/1 flag.
delayed_airlines = combined_df.withColumn(
    "total_delay",
    F.when(
        (F.col("CarrierDelay") > 0) |
        (F.col("WeatherDelay") > 0) |
        (F.col("NASDelay") > 0) |
        (F.col("SecurityDelay") > 0) |
        (F.col("LateAircraftDelay") > 0),
        1
    ).otherwise(0)
)

# Summing the flag per carrier gives the number of delayed flights; carriers
# are ranked by that number, highest first. The result is small (one row per
# carrier), so it is cached: count(), show() and the later write would
# otherwise each recompute the aggregation over the full dataset.
result = delayed_airlines.groupBy("UniqueCarrier").agg(
    F.sum("total_delay").alias("total_delay")
).orderBy(F.desc("total_delay")).cache()

# Show the result (all rows, not only the default first 20)
row_count = result.count()
result.show(row_count)

Saving the results into a zip file.

In [None]:
# Write the ranking as a single Parquet part file. "overwrite" lets this cell
# be re-run; with the default mode the write raises when the path exists.
result.coalesce(1).write.mode("overwrite").parquet("airline_delay_ranking.parquet")

In [None]:
# Bundle the Parquet output written by the previous cell into one zip archive.
!zip -r airline_delay_ranking.zip airline_delay_ranking.parquet

## Saving all Spark data and parquet files to S3

In [None]:
!pip install boto3

In [None]:
from dotenv import load_dotenv
import os

# Read the AWS credentials from environment variables after calling
# load_dotenv(), so the keys are never written into the notebook itself.
# os.environ.get returns None for a variable that is not set.
load_dotenv()
AWS_ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY")
AWS_SECRET_ACCESS_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY")

import boto3
from botocore.exceptions import NoCredentialsError

# Define AWS credentials and initialize S3 client
# NOTE(review): when the variables are unset the client is built with None
# credentials — confirm how boto3 resolves credentials in that case.
s3 = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name='us-east-1'  # Set your region
)

# Disabled: upload of the zipped delay ranking to a separate output bucket.
# Define the file to upload and target S3 bucket details
# local_file_path = './airline_delay_ranking.zip'
# bucket_name = 'is459-project-output-data'
# s3_file_path = 'airline_delay_ranking.zip'

# try:
#     # Upload the file
#     s3.upload_file(local_file_path, bucket_name, s3_file_path)
#     print(f"File uploaded successfully to s3://{bucket_name}/{s3_file_path}")
# except FileNotFoundError:
#     print("The file was not found")
# except NoCredentialsError:
#     print("Credentials not available")


In [None]:
# save combined spark dataframes
import os

# Define local directory and S3 bucket details
# `bucket_name` is read as a global by upload_files_to_s3 below.
local_directory = './airline_parquet'  # Directory holding the Parquet output of the cells above
bucket_name = 'is459-project-data'  # Replace with your S3 bucket name
s3_folder = 'kaggle/new'  # Key prefix ("folder") under which the files are stored in S3

# Function to upload files in a directory to S3
def upload_files_to_s3(local_path, s3_path, bucket=None, client=None):
    """Upload every ``.parquet`` file found under ``local_path`` to S3.

    The directory tree is walked recursively. Each file keeps its path
    relative to ``local_path`` and is stored under the ``s3_path`` prefix.

    Args:
        local_path: Local directory to walk.
        s3_path: Key prefix ("folder") inside the bucket.
        bucket: Target bucket name. Defaults to the global ``bucket_name``.
        client: Object with an ``upload_file(local, bucket, key)`` method.
            Defaults to the global ``s3`` client.

    Missing files and missing credentials are reported with a message and
    the walk continues; any other exception propagates to the caller.
    """
    import posixpath

    if bucket is None:
        bucket = bucket_name
    if client is None:
        client = s3

    for root, _dirs, files in os.walk(local_path):
        for file in files:
            # Only upload files with .parquet extension
            if not file.endswith(".parquet"):
                continue

            local_file_path = os.path.join(root, file)
            relative_path = os.path.relpath(local_file_path, local_path)
            # S3 keys always use "/" as separator. Joining with os.path.join
            # would put backslashes into the key on Windows.
            s3_file_path = posixpath.join(s3_path, *relative_path.split(os.sep))

            try:
                # Upload the file to S3
                client.upload_file(local_file_path, bucket, s3_file_path)
                print(f"Uploaded {local_file_path} to s3://{bucket}/{s3_file_path}")
            except FileNotFoundError:
                print(f"The file {file} was not found")
            except NoCredentialsError:
                print("Credentials not available")

# Upload all Parquet files from local directory to S3, keeping each file's
# path relative to `local_directory` under the `s3_folder` prefix.
upload_files_to_s3(local_directory, s3_folder)

# Read the Kaggle Parquet files back from S3

In [None]:
import boto3
import pandas as pd
from io import BytesIO

# Define S3 bucket details
bucket_name = 'is459-project-data'  # Replace with your S3 bucket name
s3_folder = 'kaggle/new/'  # Key prefix to list; same folder the upload cell writes to, with a trailing slash

# Initialize S3 client
# This rebinds the global `s3` to a client created without the explicit keys
# and region used by the earlier client.
# NOTE(review): confirm that credentials are available to this client.
s3 = boto3.client('s3')

def read_parquet_files_from_s3(bucket_name, folder_path, client=None):
    """Load every ``.parquet`` object under an S3 prefix into one DataFrame.

    Args:
        bucket_name: Name of the S3 bucket.
        folder_path: Key prefix to list.
        client: S3 client to use. Defaults to the global ``s3`` client.

    Returns:
        A pandas DataFrame with the rows of all Parquet objects concatenated
        (index reset), or None when no Parquet object was found.
    """
    if client is None:
        client = s3

    # list_objects_v2 returns at most 1000 keys per call. Use the paginator
    # so that folders with more objects are read completely instead of being
    # silently truncated.
    paginator = client.get_paginator('list_objects_v2')
    files_data = []

    for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_path):
        # A page without matching objects has no 'Contents' entry
        for obj in page.get('Contents', []):
            file_key = obj['Key']
            # Check if the file is a Parquet file
            if not file_key.endswith('.parquet'):
                continue

            # Read the file from S3
            s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
            data = s3_object['Body'].read()
            # Load the Parquet file into a DataFrame
            df = pd.read_parquet(BytesIO(data))
            files_data.append(df)

            print(f"Loaded {file_key} into DataFrame")

    # Combine all DataFrames if necessary
    combined_df = pd.concat(files_data, ignore_index=True) if files_data else None
    return combined_df

# Use the function to read all Parquet files in the specified folder.
# The result is a single pandas DataFrame, or None when nothing was found.
df = read_parquet_files_from_s3(bucket_name, s3_folder)

# Display the first rows of the combined DataFrame
if df is not None:
    print(df.head())
else:
    print("No Parquet files found in the specified folder.")
