In [5]:
# ===================================================================
# === SCRIPT: DATA FUSION (CORRECTED DATE FORMAT & JOIN) =========
# ===================================================================

# === Imports ===
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, hour, date_format, to_timestamp

# === Step 1: Create Spark Session ===
spark = SparkSession.builder.appName("FlightPulse-DataFusion-Corrected").getOrCreate()
print("--- SparkSession Created ---")

# === Step 2: Load and Preprocess Flight Data ===
# (This part remains the same, assuming FLIGHT_TIME processing worked)
try:
    df_flights_raw = spark.read.csv("flights.csv", header=True, inferSchema=True)
    print("--- Flight dataset loaded successfully. ---")
    df_flights = df_flights_raw.withColumn(
        "departure_date",
        to_date(col("FLIGHT_DATE"), "dd-MM-yyyy")
    ).withColumn(
        "departure_hour", # Keep hour for potential analysis later
        hour(to_timestamp(col("FLIGHT_TIME"), "yyyy-MM-dd HH:mm:ss")) # Assumes this format worked
    ).filter(col("DEPARTURE_STATION_CD") == 'LHR')
    print("Flight Data Schema after preprocessing:")
    df_flights.printSchema()
except Exception as e:
    print(f"Error processing flight data: {e}")
    exit()

# === Step 3: Load and Preprocess Weather Data (Corrected) ===
try:
    df_weather_raw = spark.read.csv("london_weather.csv", header=True, inferSchema=True)
    print("\n--- Weather dataset loaded successfully. ---")
except Exception as e:
    print(f"Error loading london_weather.csv: {e}")
    exit()

# --- Preprocess Weather Data (CORRECTED FORMAT & NO HOUR) ---
# Convert the 'date' integer column (yyyyMMdd) to a proper DateType
df_weather = df_weather_raw.withColumn(
    "weather_date",
    to_date(col("date").cast("string"), "yyyyMMdd") # <-- CORRECT FORMAT
)
# We remove timestamp and hour extraction as the data is daily
print("Weather Data Schema after preprocessing:")
df_weather.printSchema()
df_weather.select("date", "weather_date", "mean_temp", "precipitation").show(5) # Show relevant columns

# === Step 4: Perform the Join (Corrected - Date Only) ===
print("\n--- Performing Join on Date Only ---")
# Join flights departing from LHR with LHR weather based only on the date
df_merged = df_flights.join(
    df_weather,
    df_flights["departure_date"] == df_weather["weather_date"], # <-- JOIN ON DATE ONLY
    "inner"
)
print("--- Join Complete ---")

# === Step 5: Show Result ===
print("\n--- Schema of Merged Data ---")
df_merged.printSchema()

print("\n--- Sample of Merged Data ---")
df_merged.select(
    "FLIGHT_DATE", "FLIGHT_TIME", "DEPARTURE_STATION_CD", "ARRIVAL_STATION_CD",
    "departure_date", "weather_date", "mean_temp", "precipitation", "cloud_cover" # Added weather cols
).show(10)

print(f"\nTotal flights in original dataset: {df_flights_raw.count()}")
print(f"Total LHR departures: {df_flights.count()}")
print(f"Total weather records: {df_weather.count()}")
print(f"Total records after join: {df_merged.count()}")

--- SparkSession Created ---
--- Flight dataset loaded successfully. ---
Flight Data Schema after preprocessing:
root
 |-- FLIGHT_DATE: string (nullable = true)
 |-- FLIGHT_TIME: timestamp (nullable = true)
 |-- TIME_OF_DAY: string (nullable = true)
 |-- AIRLINE_CD: string (nullable = true)
 |-- FLIGHT_NO: string (nullable = true)
 |-- DEPARTURE_STATION_CD: string (nullable = true)
 |-- ARRIVAL_STATION_CD: string (nullable = true)
 |-- ARRIVAL_COUNTRY: string (nullable = true)
 |-- ARRIVAL_REGION: string (nullable = true)
 |-- HAUL: string (nullable = true)
 |-- AIRCRAFT_TYPE: string (nullable = true)
 |-- FIRST_CLASS_SEATS: integer (nullable = true)
 |-- BUSINESS_CLASS_SEATS: integer (nullable = true)
 |-- ECONOMY_SEATS: integer (nullable = true)
 |-- TIER1_ELIGIBLE_PAX: integer (nullable = true)
 |-- TIER2_ELIGIBLE_PAX: integer (nullable = true)
 |-- TIER3_ELIGIBLE_PAX: integer (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- departure_hour: integer (nullable = true