In [0]:
# Databricks notebook source
from pymongo import MongoClient
from urllib.parse import quote_plus

user = quote_plus("dinhtrongquynh240")
password = quote_plus("Quynh@27")
uri = f"mongodb+srv://{user}:{password}@quynhdata.cxcihb8.mongodb.net/airlines?retryWrites=true&w=majority"

# COMMAND ----------
#+==========================
# CELL 1: Import Libraries
#========================================
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pymongo import MongoClient, ReplaceOne
import json
from datetime import datetime

print(" Libraries imported successfully")

# COMMAND ----------
#========
# CELL 2: Load CSV Files
#==================================
BASE_PATH = "/Volumes/workspace/default/data/flight data/"

# Load airlines
airlines_df = spark.read.csv(
    f"{BASE_PATH}airlines.csv",
    header=True,
    inferSchema=True
)

# Load airports  
airports_df = spark.read.csv(
    f"{BASE_PATH}airports.csv",
    header=True,
    inferSchema=True
)

# Load flights
flights_df = spark.read.csv(
    f"{BASE_PATH}flights.csv",
    header=True,
    inferSchema=True
)

print(" CSV files loaded successfully")
print(f"Airlines: {airlines_df.count()} rows")
print(f"Airports: {airports_df.count()} rows")
print(f"Flights: {flights_df.count()} rows")

# COMMAND ----------

# CELL 3: Explore Data
print("\n=== AIRLINES DATA ===")
display(airlines_df.limit(5))

print("\n=== AIRPORTS DATA ===")
display(airports_df.limit(5))

print("\n=== FLIGHTS DATA (Sample) ===")
display(flights_df.select(
    "AIRLINE", "FLIGHT_NUMBER", "ORIGIN_AIRPORT", "DESTINATION_AIRPORT",
    "DEPARTURE_DELAY", "ARRIVAL_DELAY", "CANCELLED"
).limit(5))

# COMMAND ----------
# ===============================================
#  CELL 4: Data Cleaning và Transformation 
# ==============================

print("\n=== 🧹 DATA CLEANING & TRANSFORMATION ===")


# 1 Kiểm tra giá trị null

print("\n Kiểm tra giá trị null.")
null_counts = flights_df.select([
    count(when(col(c).isNull(), c)).alias(c) for c in flights_df.columns
])
display(null_counts)

# 2 Loại bỏ dòng không có mã hãng bay

flights_clean = flights_df.filter(col("AIRLINE").isNotNull())

# 3 Chuyển đổi kiểu dữ liệu

flights_clean = (
    flights_clean
    .withColumn("YEAR", col("YEAR").cast(IntegerType()))
    .withColumn("MONTH", col("MONTH").cast(IntegerType()))
    .withColumn("DAY", col("DAY").cast(IntegerType()))
    .withColumn("DAY_OF_WEEK", col("DAY_OF_WEEK").cast(IntegerType()))
    .withColumn("FLIGHT_NUMBER", col("FLIGHT_NUMBER").cast(IntegerType()))
    .withColumn("SCHEDULED_DEPARTURE", col("SCHEDULED_DEPARTURE").cast(IntegerType()))
    .withColumn("DEPARTURE_TIME", col("DEPARTURE_TIME").cast(IntegerType()))
    .withColumn("SCHEDULED_ARRIVAL", col("SCHEDULED_ARRIVAL").cast(IntegerType()))
    .withColumn("ARRIVAL_TIME", col("ARRIVAL_TIME").cast(IntegerType()))
    .withColumn("DEPARTURE_DELAY", col("DEPARTURE_DELAY").cast(DoubleType()))
    .withColumn("ARRIVAL_DELAY", col("ARRIVAL_DELAY").cast(DoubleType()))
    .withColumn("TAXI_OUT", col("TAXI_OUT").cast(DoubleType()))
    .withColumn("TAXI_IN", col("TAXI_IN").cast(DoubleType()))
    .withColumn("AIR_TIME", col("AIR_TIME").cast(DoubleType()))
    .withColumn("DISTANCE", col("DISTANCE").cast(DoubleType()))
    .withColumn("DIVERTED", col("DIVERTED").cast(IntegerType()))
    .withColumn("CANCELLED", col("CANCELLED").cast(IntegerType()))
    .withColumn("AIR_SYSTEM_DELAY", col("AIR_SYSTEM_DELAY").cast(DoubleType()))
    .withColumn("SECURITY_DELAY", col("SECURITY_DELAY").cast(DoubleType()))
    .withColumn("AIRLINE_DELAY", col("AIRLINE_DELAY").cast(DoubleType()))
    .withColumn("LATE_AIRCRAFT_DELAY", col("LATE_AIRCRAFT_DELAY").cast(DoubleType()))
    .withColumn("WEATHER_DELAY", col("WEATHER_DELAY").cast(DoubleType()))
)

# 4 Bổ sung xử lý cột lý do hủy chuyến

cancellation_map = {
    "A": "Airline/Carrier",
    "B": "Weather",
    "C": "National Air System",
    "D": "Security"
}

@udf(StringType())
def map_cancellation_reason(code):
    if code is None:
        return None
    return cancellation_map.get(code, "Unknown")

flights_clean = flights_clean.withColumn(
    "CANCELLATION_REASON_TEXT", map_cancellation_reason(col("CANCELLATION_REASON"))
)


# 5 Xử lý giá trị null còn lại

flights_clean = flights_clean.fillna({
    "DEPARTURE_DELAY": 0.0,
    "ARRIVAL_DELAY": 0.0,
    "AIR_TIME": 0.0,
    "DISTANCE": 0.0,
    "CANCELLED": 0,
    "DIVERTED": 0,
    "AIR_SYSTEM_DELAY": 0.0,
    "SECURITY_DELAY": 0.0,
    "AIRLINE_DELAY": 0.0,
    "LATE_AIRCRAFT_DELAY": 0.0,
    "WEATHER_DELAY": 0.0
})

# 6 Chuẩn hóa thời gian (HHmm → timestamp)

def convert_time_to_minutes(colname):
    """Chuyển HHMM (vd: 1345) thành tổng phút"""
    return when(col(colname).isNotNull(),
                (floor(col(colname) / 100) * 60) + (col(colname) % 100)
            ).otherwise(None)

flights_clean = flights_clean.withColumn("SCHED_DEP_MIN", convert_time_to_minutes("SCHEDULED_DEPARTURE"))
flights_clean = flights_clean.withColumn("DEP_MIN", convert_time_to_minutes("DEPARTURE_TIME"))
flights_clean = flights_clean.withColumn("SCHED_ARR_MIN", convert_time_to_minutes("SCHEDULED_ARRIVAL"))
flights_clean = flights_clean.withColumn("ARR_MIN", convert_time_to_minutes("ARRIVAL_TIME"))


# 7 Tạo cột phân loại thời gian

flights_clean = (
    flights_clean
    .withColumn("IS_WEEKEND", when(col("DAY_OF_WEEK").isin([6, 7]), 1).otherwise(0))
    .withColumn("DEP_HOUR", floor(col("SCHEDULED_DEPARTURE") / 100))
    .withColumn("TIME_OF_DAY",
        when((col("DEP_HOUR") >= 5) & (col("DEP_HOUR") < 12), "Morning")
        .when((col("DEP_HOUR") >= 12) & (col("DEP_HOUR") < 18), "Afternoon")
        .when((col("DEP_HOUR") >= 18) & (col("DEP_HOUR") < 24), "Evening")
        .otherwise("Night")
    )
    .withColumn("PEAK_PERIOD",
        when((col("DEP_HOUR").between(7,9)) | (col("DEP_HOUR").between(16,19)), "Peak").otherwise("Off-Peak")
    )
)


# 8 Cột IS_DELAYED và phân loại delay

flights_clean = (
    flights_clean
    .withColumn("IS_DELAYED", when(col("DEPARTURE_DELAY") > 15, 1).otherwise(0))
    .withColumn(
        "DELAY_CATEGORY",
        when(col("DEPARTURE_DELAY") <= 0, "On Time/Early")
        .when(col("DEPARTURE_DELAY") <= 15, "Slight Delay")
        .when(col("DEPARTURE_DELAY") <= 60, "Moderate Delay")
        .when(col("DEPARTURE_DELAY") <= 180, "Severe Delay")
        .otherwise("Extreme Delay")
    )
)

# 9 Tổng delay theo nguyên nhân

flights_clean = flights_clean.withColumn(
    "TOTAL_DELAY",
    col("AIR_SYSTEM_DELAY") + col("SECURITY_DELAY") + col("AIRLINE_DELAY") +
    col("LATE_AIRCRAFT_DELAY") + col("WEATHER_DELAY")
)

print(f" Dữ liệu gốc: {flights_df.count()} → Sau làm sạch: {flights_clean.count()} dòng")

#  Hiển thị mẫu dữ liệu
display(flights_clean.select(
    "YEAR","MONTH","DAY","DAY_OF_WEEK","AIRLINE","FLIGHT_NUMBER",
    "ORIGIN_AIRPORT","DESTINATION_AIRPORT","DEPARTURE_DELAY","ARRIVAL_DELAY",
    "CANCELLED","CANCELLATION_REASON_TEXT","TIME_OF_DAY","PEAK_PERIOD",
    "TOTAL_DELAY"
).limit(10))

# COMMAND ----------

# CELL 5: Join với Airlines và Airports
print("\n=== JOINING DATA ===")

# Join với airlines
flights_with_airline = flights_clean.join(
    airlines_df,
    flights_clean["AIRLINE"] == airlines_df["IATA_CODE"],
    "left"
).select(
    flights_clean["*"],
    airlines_df["AIRLINE"].alias("AIRLINE_NAME")
)
print(" Joined with airlines")

# Join với origin airports - GIỮ LẠI CỘT ORIGIN_AIRPORT
flights_with_origin = flights_with_airline.join(
    airports_df.alias("origin"),
    flights_with_airline["ORIGIN_AIRPORT"] == col("origin.IATA_CODE"),
    "left"
).select(
    flights_with_airline["*"],  # Giữ tất cả cột gốc bao gồm ORIGIN_AIRPORT
    col("origin.AIRPORT").alias("ORIGIN_AIRPORT_NAME"),
    col("origin.CITY").alias("ORIGIN_CITY"),
    col("origin.STATE").alias("ORIGIN_STATE"),
    col("origin.COUNTRY").alias("ORIGIN_COUNTRY")
)
print(" Joined with origin airports")

# Join với destination airports - GIỮ LẠI CỘT DESTINATION_AIRPORT
flights_complete = flights_with_origin.join(
    airports_df.alias("dest"),
    flights_with_origin["DESTINATION_AIRPORT"] == col("dest.IATA_CODE"),
    "left"
).select(
    flights_with_origin["*"],  # Giữ tất cả cột gốc bao gồm DESTINATION_AIRPORT
    col("dest.AIRPORT").alias("DEST_AIRPORT_NAME"),
    col("dest.CITY").alias("DEST_CITY"),
    col("dest.STATE").alias("DEST_STATE"),
    col("dest.COUNTRY").alias("DEST_COUNTRY")
)
print(" Joined with destination airports")

# Show sample - KIỂM TRA CÁC CỘT CÓ SẴN
print("\n Columns available after join:")
print(flights_complete.columns)

display(flights_complete.select(
    "AIRLINE", "AIRLINE_NAME",
    "ORIGIN_AIRPORT", "ORIGIN_CITY", "ORIGIN_STATE",
    "DESTINATION_AIRPORT", "DEST_CITY", "DEST_STATE",
    "DEPARTURE_DELAY", "CANCELLATION_REASON_TEXT"
).limit(5))

# COMMAND ----------
#=============
# CELL 6: Tính toán KPIs cho Airlines
#=====================================
airline_performance = flights_complete.groupBy(
    "AIRLINE",
    "AIRLINE_NAME"
).agg(
    # Basic counts
    count("*").alias("total_flights"),
    sum("CANCELLED").alias("cancelled_flights"),
    sum("DIVERTED").alias("diverted_flights"),
    sum(when(col("DEPARTURE_DELAY") <= 15, 1).otherwise(0)).alias("on_time_flights"),
    
    # Delay metrics
    avg(when(col("DEPARTURE_DELAY") > 0, col("DEPARTURE_DELAY"))).alias("avg_departure_delay"),
    avg(when(col("ARRIVAL_DELAY") > 0, col("ARRIVAL_DELAY"))).alias("avg_arrival_delay"),
    max("DEPARTURE_DELAY").alias("max_departure_delay"),
    max("ARRIVAL_DELAY").alias("max_arrival_delay"),
    
    # Cancellation reasons
    sum(when(col("CANCELLATION_REASON") == "A", 1).otherwise(0)).alias("cancelled_airline_reason"),
    sum(when(col("CANCELLATION_REASON") == "B", 1).otherwise(0)).alias("cancelled_weather_reason"),
    sum(when(col("CANCELLATION_REASON") == "C", 1).otherwise(0)).alias("cancelled_nas_reason"),
    sum(when(col("CANCELLATION_REASON") == "D", 1).otherwise(0)).alias("cancelled_security_reason"),
    
    # Delay categories
    sum(when((col("DEPARTURE_DELAY") > 15) & (col("DEPARTURE_DELAY") <= 60), 1).otherwise(0)).alias("moderate_delays"),
    sum(when(col("DEPARTURE_DELAY") > 60, 1).otherwise(0)).alias("severe_delays"),
    
    # Operational metrics
    avg("DISTANCE").alias("avg_distance"),
    avg("AIR_TIME").alias("avg_air_time"),
    avg("TAXI_OUT").alias("avg_taxi_out"),
    avg("TAXI_IN").alias("avg_taxi_in"),
    
    # Airport info
    countDistinct("ORIGIN_AIRPORT").alias("unique_origin_airports"),
    countDistinct("DESTINATION_AIRPORT").alias("unique_dest_airports"),
    
    # Most common routes -
    first("ORIGIN_AIRPORT").alias("most_common_origin"),
    first("DESTINATION_AIRPORT").alias("most_common_destination")
)

print(" Aggregated airline performance")

# COMMAND ----------
# ======================================================
# CELL 7: Tính toán tỉ lệ, thêm các cột thống kê và top 10 sân bay
# ======================================================

from pyspark.sql import Window
from pyspark.sql.functions import (
    col, round, when, lit, coalesce, current_timestamp,
    count, row_number, collect_list, concat_ws
)

print("\n Đang tính toán các tỉ lệ và top 10 sân bay cho mỗi hãng ")

# --- Bước 1: Tính các tỉ lệ và chỉ số cơ bản ---
airline_performance = (
    airline_performance
    .withColumn(
        "on_time_rate",
        round((col("on_time_flights") / col("total_flights")) * 100, 2)
    )
    .withColumn(
        "cancellation_rate",
        round((col("cancelled_flights") / col("total_flights")) * 100, 2)
    )
    .withColumn(
        "diversion_rate",
        round((col("diverted_flights") / col("total_flights")) * 100, 2)
    )
    .withColumn(
        "avg_delay_minutes",
        round(coalesce(col("avg_departure_delay"), lit(0.0)), 2)
    )
    .withColumn(
        "avg_arrival_delay_minutes",
        round(coalesce(col("avg_arrival_delay"), lit(0.0)), 2)
    )
    .withColumn(
        "cancellation_airline_pct",
        round(when(col("cancelled_flights") > 0,
            (col("cancelled_airline_reason") / col("cancelled_flights")) * 100
        ).otherwise(0.0), 2)
    )
    .withColumn(
        "cancellation_weather_pct",
        round(when(col("cancelled_flights") > 0,
            (col("cancelled_weather_reason") / col("cancelled_flights")) * 100
        ).otherwise(0.0), 2)
    )
    .withColumn(
        "cancellation_nas_pct",
        round(when(col("cancelled_flights") > 0,
            (col("cancelled_nas_reason") / col("cancelled_flights")) * 100
        ).otherwise(0.0), 2)
    )
    .withColumn(
        "cancellation_security_pct",
        round(when(col("cancelled_flights") > 0,
            (col("cancelled_security_reason") / col("cancelled_flights")) * 100
        ).otherwise(0.0), 2)
    )
)

# --- Bước 2: Tính top 10 sân bay đi và đến của mỗi hãng -------
print(" Tính top 10 sân bay xuất phát và đến cho mỗi hãng bay...")

origin_counts = (
    flights_complete.groupBy("AIRLINE", "ORIGIN_AIRPORT")
    .agg(count("*").alias("origin_count"))
)
destination_counts = (
    flights_complete.groupBy("AIRLINE", "DESTINATION_AIRPORT")
    .agg(count("*").alias("destination_count"))
)

# Top 10 sân bay xuất phát
origin_window = Window.partitionBy("AIRLINE").orderBy(col("origin_count").desc())
top_origin_airports = (
    origin_counts
    .withColumn("rank", row_number().over(origin_window))
    .filter(col("rank") <= 10)
    .groupBy("AIRLINE")
    .agg(
        collect_list("ORIGIN_AIRPORT").alias("top_origin_list"),
        collect_list("origin_count").alias("top_origin_count_list")
    )
    .withColumn("top_origin", concat_ws(", ", col("top_origin_list")))
    .withColumn("top_origin_count", concat_ws(", ", col("top_origin_count_list")))
    .drop("top_origin_list", "top_origin_count_list")
)

# Top 10 sân bay đến
destination_window = Window.partitionBy("AIRLINE").orderBy(col("destination_count").desc())
top_destination_airports = (
    destination_counts
    .withColumn("rank", row_number().over(destination_window))
    .filter(col("rank") <= 10)
    .groupBy("AIRLINE")
    .agg(
        collect_list("DESTINATION_AIRPORT").alias("top_destination_list"),
        collect_list("destination_count").alias("top_destination_count_list")
    )
    .withColumn("top_destination", concat_ws(", ", col("top_destination_list")))
    .withColumn("top_destination_count", concat_ws(", ", col("top_destination_count_list")))
    .drop("top_destination_list", "top_destination_count_list")
)

#  Bước 3: Gộp vào airline_performance 
airline_performance = (
    airline_performance
    .join(top_origin_airports, on="AIRLINE", how="left")
    .join(top_destination_airports, on="AIRLINE", how="left")
    .withColumn("top_origin", coalesce(col("top_origin"), lit("Unknown")))
    .withColumn("top_destination", coalesce(col("top_destination"), lit("Unknown")))
    .withColumn("top_origin_count", coalesce(col("top_origin_count"), lit("0")))
    .withColumn("top_destination_count", coalesce(col("top_destination_count"), lit("0")))
    .withColumn("last_updated", current_timestamp())
)

print(" Hoàn tất tính toán và gộp top 10 sân bay cho mỗi hãng!")

final_performance = airline_performance.select(
    "AIRLINE", "AIRLINE_NAME", "total_flights",
    "on_time_rate", "cancellation_rate", "diversion_rate",
    "avg_delay_minutes", "avg_arrival_delay_minutes",
    "max_departure_delay", "max_arrival_delay",
    "cancelled_flights", "diverted_flights", "on_time_flights",
    "moderate_delays", "severe_delays",
    "cancelled_airline_reason", "cancelled_weather_reason",
    "cancelled_nas_reason", "cancelled_security_reason",
    "cancellation_airline_pct", "cancellation_weather_pct",
    "cancellation_nas_pct", "cancellation_security_pct",
    "avg_distance", "avg_air_time", "avg_taxi_out", "avg_taxi_in",
    "unique_origin_airports", "unique_dest_airports",
    "most_common_origin", "most_common_destination",
    "top_origin", "top_destination","top_origin_count",
    "top_destination_count",
    "last_updated"
)


# =======================================
# CELL 10: Transform to MongoDB Format
# ======================================
import json
from datetime import datetime

def safe_int(x):
    try:
        if isinstance(x, list):
            return [int(i) for i in x]
        if isinstance(x, str) and "," in x:
            return [int(i.strip()) for i in x.split(",") if i.strip().isdigit()]
        return int(x)
    except:
        return 0

def safe_float(x):
    try:
        return float(x)
    except:
        return 0.0

def safe_str(x):
    try:
        return str(x) if x is not None else "Unknown"
    except:
        return "Unknown"

def transform_to_mongo_format(row):
    return {
        "airline_code": safe_str(row.AIRLINE),
        "airline_name": safe_str(row.AIRLINE_NAME),
        "metrics": {
            "total_flights": safe_int(row.total_flights),
            "on_time_rate": safe_float(row.on_time_rate),
            "cancellation_rate": safe_float(row.cancellation_rate),
            "diversion_rate": safe_float(row.diversion_rate),
            "avg_delay_minutes": safe_float(row.avg_delay_minutes),
            "avg_arrival_delay_minutes": safe_float(row.avg_arrival_delay_minutes),
            "max_departure_delay": safe_float(row.max_departure_delay),
            "max_arrival_delay": safe_float(row.max_arrival_delay),
            "flight_counts": {
                "cancelled": safe_int(row.cancelled_flights),
                "diverted": safe_int(row.diverted_flights),
                "on_time": safe_int(row.on_time_flights),
                "moderate_delays": safe_int(row.moderate_delays),
                "severe_delays": safe_int(row.severe_delays),
            },
            "cancellation_reasons": {
                "airline": {
                    "count": safe_int(row.cancelled_airline_reason),
                    "percentage": safe_float(row.cancellation_airline_pct),
                },
                "weather": {
                    "count": safe_int(row.cancelled_weather_reason),
                    "percentage": safe_float(row.cancellation_weather_pct),
                },
                "national_air_system": {
                    "count": safe_int(row.cancelled_nas_reason),
                    "percentage": safe_float(row.cancellation_nas_pct),
                },
                "security": {
                    "count": safe_int(row.cancelled_security_reason),
                    "percentage": safe_float(row.cancellation_security_pct),
                },
            },
            "operational": {
                "avg_distance_miles": safe_float(row.avg_distance),
                "avg_air_time_minutes": safe_float(row.avg_air_time),
                "avg_taxi_out_minutes": safe_float(row.avg_taxi_out),
                "avg_taxi_in_minutes": safe_float(row.avg_taxi_in),
            },
            "airport_info": {
                "unique_origin_airports": safe_int(row.unique_origin_airports),
                "unique_dest_airports": safe_int(row.unique_dest_airports),
                "most_common_origin": safe_str(row.most_common_origin),
                "most_common_destination": safe_str(row.most_common_destination),
                "top_origin": [x.strip() for x in safe_str(row.top_origin).split(",")],
                "top_destination": [x.strip() for x in safe_str(row.top_destination).split(",")],
                "top_origin_count": safe_int(row.top_origin_count),
                "top_destination_count": safe_int(row.top_destination_count),
            },
        },
        "last_updated": safe_str(getattr(row, "last_updated", datetime.utcnow().isoformat())),
    }


performance_documents = [
    transform_to_mongo_format(row)
    for row in final_performance.collect()
]

print(f"Prepared {len(performance_documents)} documents")
print(json.dumps(performance_documents[0], indent=2))

# COMMAND ----------

# CELL 11: Connect to MongoDB
user = quote_plus("dinhtrongquynh240")
password = quote_plus("Quynh@27")
mongo_connection_string = f"mongodb+srv://{user}:{password}@quynhdata.cxcihb8.mongodb.net/airlines?retryWrites=true&w=majority"

try:
    client = MongoClient(mongo_connection_string, serverSelectionTimeoutMS=5000)
    client.admin.command('ping')
    print(" Connected to MongoDB")
    
    db = client["flight_analytics"]
    collection = db["airline_performance"]
    print(f" Database: {db.name}, Collection: {collection.name}")
except Exception as e:
    print(f" Connection failed: {e}")

# COMMAND ----------

# CELL 12: Write to MongoDB
try:
    operations = [
        ReplaceOne({"airline_code": doc["airline_code"]}, doc, upsert=True)
        for doc in performance_documents
    ]
    
    result = collection.bulk_write(operations)
    print(" Data written successfully!")
    print(f"   Inserted: {result.upserted_count}")
    print(f"   Modified: {result.modified_count}")
except Exception as e:
    print(f" Error: {e}")

# COMMAND ----------

# CELL 13: Create Indexes
try:
    collection.create_index("airline_code", unique=True)
    collection.create_index("last_updated")
    print(" Indexes created")
except Exception as e:
    print(f" Error: {e}")

# COMMAND ----------

# CELL 14: Verify & Close
try:
    doc_count = collection.count_documents({})
    print(f" Total documents: {doc_count}")
    
    sample = collection.find_one()
    print("\n Sample:")
    print(json.dumps(sample, indent=2, default=str))
    
except Exception as e:
    print(f" Error: {e}")

client.close()
print("\n MongoDB connection closed")
print("="*60)
print(" HOÀN THÀNH!")
print("="*60) 

 Libraries imported successfully
 CSV files loaded successfully
Airlines: 14 rows
Airports: 322 rows
Flights: 5819079 rows

=== AIRLINES DATA ===


IATA_CODE,AIRLINE
UA,United Air Lines Inc.
AA,American Airlines Inc.
US,US Airways Inc.
F9,Frontier Airlines Inc.
B6,JetBlue Airways



=== AIRPORTS DATA ===


IATA_CODE,AIRPORT,CITY,STATE,COUNTRY,LATITUDE,LONGITUDE
ABE,Lehigh Valley International Airport,Allentown,PA,USA,40.65236,-75.4404
ABI,Abilene Regional Airport,Abilene,TX,USA,32.41132,-99.6819
ABQ,Albuquerque International Sunport,Albuquerque,NM,USA,35.04022,-106.60919
ABR,Aberdeen Regional Airport,Aberdeen,SD,USA,45.44906,-98.42183
ABY,Southwest Georgia Regional Airport,Albany,GA,USA,31.53552,-84.19447



=== FLIGHTS DATA (Sample) ===


AIRLINE,FLIGHT_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,DEPARTURE_DELAY,ARRIVAL_DELAY,CANCELLED
AS,98,ANC,SEA,-11,-22,0
AA,2336,LAX,PBI,-8,-9,0
US,840,SFO,CLT,-2,5,0
AA,258,LAX,MIA,-5,-9,0
AS,135,SEA,ANC,-1,-21,0



=== 🧹 DATA CLEANING & TRANSFORMATION ===

 Kiểm tra giá trị null.


YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
0,0,0,0,0,0,14721,0,0,0,86153,86153,89047,89047,6,105071,105071,0,92513,92513,0,92513,105071,0,0,5729195,4755640,4755640,4755640,4755640,4755640


 Dữ liệu gốc: 5819079 → Sau làm sạch: 5819079 dòng


YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,DEPARTURE_DELAY,ARRIVAL_DELAY,CANCELLED,CANCELLATION_REASON_TEXT,TIME_OF_DAY,PEAK_PERIOD,TOTAL_DELAY
2015,1,1,4,AS,98,ANC,SEA,-11.0,-22.0,0,,Night,Off-Peak,0.0
2015,1,1,4,AA,2336,LAX,PBI,-8.0,-9.0,0,,Night,Off-Peak,0.0
2015,1,1,4,US,840,SFO,CLT,-2.0,5.0,0,,Night,Off-Peak,0.0
2015,1,1,4,AA,258,LAX,MIA,-5.0,-9.0,0,,Night,Off-Peak,0.0
2015,1,1,4,AS,135,SEA,ANC,-1.0,-21.0,0,,Night,Off-Peak,0.0
2015,1,1,4,DL,806,SFO,MSP,-5.0,8.0,0,,Night,Off-Peak,0.0
2015,1,1,4,NK,612,LAS,MSP,-6.0,-17.0,0,,Night,Off-Peak,0.0
2015,1,1,4,US,2013,LAX,CLT,14.0,-10.0,0,,Night,Off-Peak,0.0
2015,1,1,4,AA,1112,SFO,DFW,-11.0,-13.0,0,,Night,Off-Peak,0.0
2015,1,1,4,DL,1173,LAS,ATL,3.0,-15.0,0,,Night,Off-Peak,0.0



=== JOINING DATA ===
 Joined with airlines
 Joined with origin airports
 Joined with destination airports

 Columns available after join:
['YEAR', 'MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'FLIGHT_NUMBER', 'TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'SCHEDULED_DEPARTURE', 'DEPARTURE_TIME', 'DEPARTURE_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'SCHEDULED_TIME', 'ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'WHEELS_ON', 'TAXI_IN', 'SCHEDULED_ARRIVAL', 'ARRIVAL_TIME', 'ARRIVAL_DELAY', 'DIVERTED', 'CANCELLED', 'CANCELLATION_REASON', 'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY', 'LATE_AIRCRAFT_DELAY', 'WEATHER_DELAY', 'CANCELLATION_REASON_TEXT', 'SCHED_DEP_MIN', 'DEP_MIN', 'SCHED_ARR_MIN', 'ARR_MIN', 'IS_WEEKEND', 'DEP_HOUR', 'TIME_OF_DAY', 'PEAK_PERIOD', 'IS_DELAYED', 'DELAY_CATEGORY', 'TOTAL_DELAY', 'AIRLINE_NAME', 'ORIGIN_AIRPORT_NAME', 'ORIGIN_CITY', 'ORIGIN_STATE', 'ORIGIN_COUNTRY', 'DEST_AIRPORT_NAME', 'DEST_CITY', 'DEST_STATE', 'DEST_COUNTRY']


AIRLINE,AIRLINE_NAME,ORIGIN_AIRPORT,ORIGIN_CITY,ORIGIN_STATE,DESTINATION_AIRPORT,DEST_CITY,DEST_STATE,DEPARTURE_DELAY,CANCELLATION_REASON_TEXT
AS,Alaska Airlines Inc.,ANC,Anchorage,AK,SEA,Seattle,WA,-11.0,
AA,American Airlines Inc.,LAX,Los Angeles,CA,PBI,West Palm Beach,FL,-8.0,
US,US Airways Inc.,SFO,San Francisco,CA,CLT,Charlotte,NC,-2.0,
AA,American Airlines Inc.,LAX,Los Angeles,CA,MIA,Miami,FL,-5.0,
AS,Alaska Airlines Inc.,SEA,Seattle,WA,ANC,Anchorage,AK,-1.0,


 Aggregated airline performance

 Đang tính toán các tỉ lệ và top 10 sân bay cho mỗi hãng 
 Tính top 10 sân bay xuất phát và đến cho mỗi hãng bay...
 Hoàn tất tính toán và gộp top 10 sân bay cho mỗi hãng!


  "last_updated": safe_str(getattr(row, "last_updated", datetime.utcnow().isoformat())),


Prepared 14 documents
{
  "airline_code": "AA",
  "airline_name": "American Airlines Inc.",
  "metrics": {
    "total_flights": 725984,
    "on_time_rate": 83.55,
    "cancellation_rate": 1.5,
    "diversion_rate": 0.29,
    "avg_delay_minutes": 34.37,
    "avg_arrival_delay_minutes": 34.15,
    "max_departure_delay": 1988.0,
    "max_arrival_delay": 1971.0,
    "flight_counts": {
      "cancelled": 10919,
      "diverted": 2130,
      "on_time": 606528,
      "moderate_delays": 80466,
      "severe_delays": 38990
    },
    "cancellation_reasons": {
      "airline": {
        "count": 2879,
        "percentage": 26.37
      },
      "weather": {
        "count": 7306,
        "percentage": 66.91
      },
      "national_air_system": {
        "count": 730,
        "percentage": 6.69
      },
      "security": {
        "count": 4,
        "percentage": 0.04
      }
    },
    "operational": {
      "avg_distance_miles": 1041.3392223520077,
      "avg_air_time_minutes": 137.29329985233