## Importing Libraries

In [None]:
pip install meteostat

Collecting meteostat
  Downloading meteostat-1.6.8-py3-none-any.whl.metadata (4.6 kB)
Downloading meteostat-1.6.8-py3-none-any.whl (31 kB)
Installing collected packages: meteostat
Successfully installed meteostat-1.6.8


In [None]:
# Importing modules

from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType, IntegerType, DateType, FloatType
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, when, countDistinct
from pyspark.sql import SparkSession

from datetime import datetime

from meteostat import Daily, Stations

from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import udf
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import count, sum, when

import math

## Data Loading

In [None]:
# Mount Google Drive so the project CSV files under MyDrive are readable from this Colab runtime.
from google.colab import drive
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [None]:
# Root folder of the project data on the mounted Drive; the input CSVs are read relative to it.
path = (
    "/content/drive/MyDrive/Colab Notebooks/"
    "Distributed Data Analysis and Mining/Project/data"
)

In [None]:
# Local Spark session using every core of the runtime.
# With master "local[*]" the executor runs inside the driver JVM, so the driver
# memory setting is the one that bounds the heap; the executor setting is kept as-is.
# 200 is also Spark's default value for spark.sql.shuffle.partitions.
# getOrCreate() returns an already-running session if there is one, in which case
# the memory settings are not re-applied.
spark = (
    SparkSession.builder
    .appName("DDAM")
    .master("local[*]")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

In [None]:
# Create the DataFrames and infer the schemas.
# The population file is semicolon-separated and read without schema inference,
# so all of its columns (including pop_2018) arrive as strings and are cleaned in the next cell.

df_flights = spark.read.csv(f"{path}/2018.csv", header=True, inferSchema=True)
df_airports = spark.read.csv(f"{path}/airports.csv", header=True, inferSchema=True)
df_population = spark.read.csv(f"{path}/population by cities.csv", header=True, sep=";")

In [None]:
# Clean the population table before it is joined on (city, state):
#  - strip the commas (presumably thousands separators) from pop_2018 and cast it to integer;
#  - remove the " city" text from the city names;
#  - rename the key columns to city_orig / state_orig, the names the population join uses.
# NOTE: not idempotent - re-running this cell fails because "city" has already been renamed.
df_population = (
    df_population
    .withColumn("pop_2018", regexp_replace(col("pop_2018"), ",", "").cast("integer"))
    .withColumn("city", regexp_replace(col("city"), " city", ""))
    .withColumnRenamed("city", "city_orig")
    .withColumnRenamed("state", "state_orig")
)

## Joining Datasets

In [None]:
# Keep only the airport attributes used downstream: the airport code (join key),
# its coordinates, and its city/state.

df_airports_selected = df_airports.select(
    col("code"),
    col("latitude"),
    col("longitude"),
    col("city"),
    col("state"),
)

### Airports

In [None]:
# Perform the first left join (BY ORIGIN)

df = df_flights.join(df_airports_selected, df_flights.ORIGIN == df_airports_selected.code, "left") \
    .drop(df_airports_selected.code) \
    .withColumnRenamed("latitude", "latitude_origin") \
    .withColumnRenamed("longitude", "longitude_origin") \
    .withColumnRenamed("city", "city_origin") \
    .withColumnRenamed("state", "state_origin")

In [None]:
# Perform the second left join (BY DESTINATION)

df = df.join(df_airports_selected, df.DEST == df_airports_selected.code, "left") \
    .drop(df_airports_selected.code) \
    .withColumnRenamed("latitude", "latitude_dest") \
    .withColumnRenamed("longitude", "longitude_dest") \
    .withColumnRenamed("city", "city_dest") \
    .withColumnRenamed("state", "state_dest")

### Population

In [None]:
df = df.join(df_population, ["city_orig", "state_orig"], "left") \
    .drop(df_population.city_orig) \
    .drop(df_population.state_orig) \
    .withColumnRenamed("pop_2018", "population_origin_ok")

### Renaming and Adding Variables

In [None]:
# Define the Haversine distance, which the greatest circle distance between two points on the Earth

def haversine_distance(lat1, lon1, lat2, lon2):
    lon1, lat1, lon2, lat2 = map(math.radians, [lon1, lat1, lon2, lat2]) # Map to radians

    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
    c = 2 * math.asin(math.sqrt(a))
    r = 6371

    return c * r # Haversine formula

haversine_udf = udf(haversine_distance, DoubleType())

In [None]:
# Apply Haversine distance to the data frame

df = df.withColumn(
    "distance_km",
    haversine_udf(
        col("latitude_origin"),
        col("longitude_origin"),
        col("latitude_dest"),
        col("longitude_dest")
    )
)

In [None]:
# Binary target: 1 when the flight arrived more than 15 minutes late, otherwise 0.
# NOTE(review): a null ARR_DELAY (presumably cancelled/diverted flights) makes the
# condition null, so it falls into otherwise(0) and counts as "not delayed".
# BTS's own ArrDel15 indicator uses ">= 15 minutes"; confirm the strict ">" is intended.

df = df.withColumn("Delayed_status", when(col("ARR_DELAY") > 15, 1).otherwise(0))

In [None]:
# Replace the upper-case source column codes with descriptive names.
# The renames are applied one at a time, in the order listed.

readable_column_names = {
    "FL_DATE": "Flight_Date",
    "OP_CARRIER": "Operating_Carrier",
    "OP_CARRIER_FL_NUM": "Flight_Number",
    "ORIGIN": "Origin_Airport",
    "DEST": "Destination_Airport",
    "CRS_DEP_TIME": "Scheduled_Departure_Time",
    "DEP_TIME": "Actual_Departure_Time",
    "DEP_DELAY": "Departure_Delay_Minutes",
    "TAXI_OUT": "Taxi_Out_Time",
    "WHEELS_OFF": "Takeoff_Time",
    "WHEELS_ON": "Landing_Time",
    "TAXI_IN": "Taxi_In_Time",
    "CRS_ARR_TIME": "Scheduled_Arrival_Time",
    "ARR_TIME": "Actual_Arrival_Time",
    "ARR_DELAY": "Arrival_Delay_Minutes",
    "CANCELLED": "Flight_Cancelled",
    "CANCELLATION_CODE": "Cancellation_Reason_Code",
    "DIVERTED": "Flight_Diverted",
    "CRS_ELAPSED_TIME": "Scheduled_Flight_Duration",
    "ACTUAL_ELAPSED_TIME": "Actual_Flight_Duration",
    "AIR_TIME": "Airborne_Time",
    "DISTANCE": "Flight_Distance",
    "CARRIER_DELAY": "Carrier_Delay_Minutes",
    "WEATHER_DELAY": "Weather_Delay_Minutes",
    "NAS_DELAY": "NAS_Delay_Minutes",
    "SECURITY_DELAY": "Security_Delay_Minutes",
    "LATE_AIRCRAFT_DELAY": "Late_Aircraft_Delay_Minutes",
}

for source_name, readable_name in readable_column_names.items():
    df = df.withColumnRenamed(source_name, readable_name)

### Weather

In [None]:
# Collect the distinct (airport coordinates, date) pairs whose weather must be fetched.
# Both endpoints are included because the destination-weather join further down
# reuses these same rows: with origin-only keys, an airport that only received
# flights on a given day would get no destination weather. The destination
# coordinates are aliased to the *_orig names that fetch_weather reads.
# Rows with a missing coordinate or date are skipped: a null key never matches
# in the equality joins below, so looking it up would only produce error rows.

df_unique = (
    df.select("latitude_orig", "longitude_orig", "Flight_Date")
    .union(
        df.select(
            col("latitude_dest").alias("latitude_orig"),
            col("longitude_dest").alias("longitude_orig"),
            col("Flight_Date"),
        )
    )
    .dropna()
    .distinct()
)

In [None]:
# Get weather data

def fetch_weather(row, fill_value=0.0):
    """Look up the daily weather at the station nearest to one airport on one date.

    Intended to be mapped over the rows of ``df_unique``.

    Args:
        row: object exposing ``latitude_orig``, ``longitude_orig`` and ``Flight_Date``.
            ``Flight_Date`` may be a ``datetime`` or any value whose ``str()`` is
            ``YYYY-MM-DD`` (e.g. a ``datetime.date``).
        fill_value: value used for a weather variable that could not be retrieved
            (no nearby station, no observation that day, a missing/NaN reading,
            or any error during the lookup). Defaults to 0.0.

    Returns:
        Row(latitude, longitude, date, tavg, wspd, wdir, pres), in that field order.
        On error, coordinates that could not be read fall back to 0.0 and the
        date to None.
    """
    # Bound before the try so the fallback below can always reference them, even
    # when the failure happens while reading the row or parsing its date.
    lat = lon = date_obj = None

    def reading(observation, field):
        # Missing readings in the frame returned by Daily(...).fetch() appear as
        # NaN, which a plain .get default does not catch; map both cases to fill_value.
        value = observation.get(field)
        if value is None:
            return fill_value
        value = float(value)
        return fill_value if math.isnan(value) else value

    try:
        lat, lon, date = row.latitude_orig, row.longitude_orig, row.Flight_Date  # Extract coordinates and date
        date_obj = date if isinstance(date, datetime) else datetime.strptime(str(date), "%Y-%m-%d")

        tavg = wspd = wdir = pres = fill_value

        station = Stations().nearby(lat, lon).fetch(1)  # Nearest station only

        if not station.empty:
            data = Daily(station, date_obj, date_obj).fetch()
            if not data.empty:
                observation = data.iloc[0]
                tavg = reading(observation, "tavg")
                wspd = reading(observation, "wspd")
                wdir = reading(observation, "wdir")
                pres = reading(observation, "pres")

        return Row(latitude = float(lat),
                   longitude = float(lon),
                   date = date_obj.date(),
                   tavg = tavg,
                   wspd = wspd,
                   wdir = wdir,
                   pres = pres
        )
    except Exception as e:
        print(f"Error fetching data for row {row}: {e}")

        return Row(latitude = float(lat) if lat else 0.0,
                   longitude = float(lon) if lon else 0.0,
                   date = date_obj.date() if date_obj else None,
                   tavg = fill_value,
                   wspd = fill_value,
                   wdir = fill_value,
                   pres = fill_value
        )  # In case of error, every weather variable is fill_value

In [None]:
weather_rdd = df_unique.rdd.map(fetch_weather) # Get weather into an RDD (one lookup per distinct key)

weather_schema = StructType([
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("date", DateType(), True),
    StructField("tavg", DoubleType(), True),
    StructField("wspd", DoubleType(), True),
    StructField("wdir", DoubleType(), True),
    StructField("pres", DoubleType(), True),
]) # Same field order as the Rows returned by fetch_weather

# Persist the fetched weather. Without this, every Spark action on the joined frames
# would re-run fetch_weather (one network lookup per key) for each join that reads
# weather_df, and a transient lookup failure could give different values between runs.
weather_df = spark.createDataFrame(weather_rdd, schema = weather_schema).cache()

weather_df.count()  # Materialise the cache once, here, instead of inside later actions

In [None]:
# Join origin weather to the main dataframe.
# The weather variables get an "_orig" suffix and the key columns take the names
# they have in df; renames are applied in the order listed. The broadcast hint
# ships weather_df (one row per distinct key, presumably far smaller than df) to
# every task.

origin_weather_names = {
    "tavg": "tavg_orig",
    "wspd": "wspd_orig",
    "wdir": "wdir_orig",
    "pres": "pres_orig",
    "latitude": "latitude_orig",
    "longitude": "longitude_orig",
    "date": "Flight_Date",
}
for old_name, new_name in origin_weather_names.items():
    weather_df = weather_df.withColumnRenamed(old_name, new_name)

df = df.join(
    broadcast(weather_df),
    on=["latitude_orig", "longitude_orig", "Flight_Date"],
    how="left",
)

In [None]:
# Join destination weather to the main dataframe.
# weather_df still carries the *_orig names assigned in the previous cell; switch
# them to *_dest (Flight_Date keeps its name) and join on the destination keys.

dest_weather_names = {
    "tavg_orig": "tavg_dest",
    "wspd_orig": "wspd_dest",
    "wdir_orig": "wdir_dest",
    "pres_orig": "pres_dest",
    "latitude_orig": "latitude_dest",
    "longitude_orig": "longitude_dest",
}
for old_name, new_name in dest_weather_names.items():
    weather_df = weather_df.withColumnRenamed(old_name, new_name)

df = df.join(
    broadcast(weather_df),
    on=["latitude_dest", "longitude_dest", "Flight_Date"],
    how="left",
)

## Final Schema

In [None]:
# Final column types, cast one column at a time in the order listed.
# NOTE(review): if the source columns are doubles (as inferSchema typically yields),
# the IntegerType casts truncate fractional values, and the FloatType casts of the
# coordinates narrow them from double precision - confirm both are intended.
final_column_types = {
    "Flight_Date": DateType(),
    "Flight_Number": IntegerType(),
    "Scheduled_Departure_Time": IntegerType(),
    "Actual_Departure_Time": IntegerType(),
    "Departure_Delay_Minutes": IntegerType(),
    "Taxi_Out_Time": FloatType(),
    "Takeoff_Time": FloatType(),
    "Landing_Time": FloatType(),
    "Taxi_In_Time": FloatType(),
    "Scheduled_Arrival_Time": IntegerType(),
    "Actual_Arrival_Time": IntegerType(),
    "Arrival_Delay_Minutes": IntegerType(),
    "Flight_Cancelled": IntegerType(),
    "Flight_Diverted": IntegerType(),
    "Scheduled_Flight_Duration": FloatType(),
    "Actual_Flight_Duration": FloatType(),
    "Airborne_Time": FloatType(),
    "Flight_Distance": FloatType(),
    "Carrier_Delay_Minutes": FloatType(),
    "Weather_Delay_Minutes": FloatType(),
    "NAS_Delay_Minutes": FloatType(),
    "Security_Delay_Minutes": FloatType(),
    "Late_Aircraft_Delay_Minutes": FloatType(),
    "latitude_orig": FloatType(),
    "longitude_orig": FloatType(),
    "latitude_dest": FloatType(),
    "longitude_dest": FloatType(),
}

for column_name, column_type in final_column_types.items():
    df = df.withColumn(column_name, col(column_name).cast(column_type))