In [40]:
import pandas as pd
import json
import polars as pl
from datetime import datetime

In [41]:
# for testing the nesting only
# bookings_list = []
# passengers_list = []
# products_list = []

# with open("./data/bookings/booking.json", 'r') as f:
#     for i, line in enumerate(f):
#         data = json.loads(line)
#         event_time = data.get('timestamp')
#         tr = data.get('event', {}).get('DataElement', {}).get('travelrecord', {})
        
#         bookings_list.append({
#             'booking_id': i,
#             'timestamp': event_time,
#             'creation_date': tr.get('creationDate'),
#             'nb_passengers': tr.get('nbPassengers')
#         })
        
#         for p in tr.get('passengersList', []):
#             p_row = p.copy()
#             p_row['booking_id'] = i  # Foreign Key to Booking
#             passengers_list.append(p_row)
            
#         for prod in tr.get('productsList', []):
#             prod_row = prod.copy()
#             # If the product has its own nested 'flight' info, flatten it too
#             if 'flight' in prod_row:
#                 flight = prod_row.pop('flight')
#                 for key, val in flight.items():
#                     prod_row[f'flight_{key}'] = val
            
#             prod_row['booking_id'] = i  # Foreign Key to Booking
#             products_list.append(prod_row)

# df_bookings = pd.DataFrame(bookings_list)
# df_passengers = pd.DataFrame(passengers_list)
# df_products = pd.DataFrame(products_list)

# # df_bookings.head()
# # df_passengers.head()
# # df_products.head()

In [42]:
import polars as pl
from datetime import datetime

def process_bookings(input_path, airport_path, start_date, end_date):
    columns = [
        'Airport ID', 'Name', 'City', 'Country', 'IATA', 'ICAO', 
        'Latitude', 'Longitude', 'Altitude', 'Timezone', 'DST', 
        'Tz Database time zone', 'Type', 'Source'
    ]
    
    airports_df = (
        pl.read_csv(
            airport_path, 
            has_header=False, 
            new_columns=columns,
            infer_schema_length=0,
            quote_char='"',
            truncate_ragged_lines=True
        )
        .select(["IATA", "Country"])
        .with_columns([
            pl.col("IATA").replace("\\N", None).str.strip_chars(),
            pl.col("Country").replace("\\N", None)
        ])
        .filter(pl.col("IATA").is_not_null())
        .lazy()
    )

    # Main scan
    df = pl.scan_ndjson(input_path)
    
    df = df.select([
        pl.col("event").struct.field("DataElement").struct.field("travelrecord")
    ]).with_columns([
        pl.col("travelrecord").struct.field("passengersList").alias("passengers"),
        pl.col("travelrecord").struct.field("productsList").alias("products")
    ])

    df = df.explode("products").explode("passengers")
    
    df = df.with_columns([
        pl.col("products").struct.field("bookingStatus").alias("status"),
        pl.col("products").struct.field("flight").alias("flight"),
        pl.col("passengers").struct.field("uci").alias("p_uci"),
    ])

    df = df.filter(
        (pl.col("status") == "CONFIRMED") &
        (pl.col("flight").struct.field("marketingAirline") == "KL") &
        (pl.col("flight").struct.field("originAirport").is_in(["AMS", "RTM", "EIN"]))
    )

    # Date
    df = df.with_columns([
        pl.col("flight").struct.field("departureDate")
            .str.replace("Z", "")  #had to remove to get a real date
            .str.replace("T", " ") # had to remove to get a real date
            .str.to_datetime()
            .dt.date()
            .alias("d_date"), # Using d_date here
        pl.col("flight").struct.field("destinationAirport").alias("dest") # Using dest here
    ])

    df = df.filter(pl.col("d_date").is_between(start_date, end_date))

    # seasons
    df = df.with_columns([
        pl.col("d_date").dt.weekday().alias("DayOfWeek"),
        pl.col("d_date").dt.month().alias("m"),
    ]).with_columns(
        pl.when(pl.col("m").is_in([3, 4, 5])).then(pl.lit("spring"))
        .when(pl.col("m").is_in([6, 7, 8])).then(pl.lit("summer"))
        .when(pl.col("m").is_in([9, 10, 11])).then(pl.lit("autumn"))
        .otherwise(pl.lit("wintr")).alias("Season")
    )

    # join
    final = (
        df.join(airports_df, left_on="dest", right_on="IATA", how="left")
        .group_by(["Country", "DayOfWeek", "Season"])
        .agg(pl.col("p_uci").n_unique().alias("passenger_count"))
        .sort(["Season", "DayOfWeek", "passenger_count"], descending=[False, False, True])
    )

    return final.collect()

In [43]:
df_result = process_bookings(
    "./data/bookings/booking.json", 
    "./data/airports/airports.dat", 
    datetime(2019, 1, 1).date(), 
    datetime(2020, 1, 1).date()
)

df_result

Country,DayOfWeek,Season,passenger_count
str,i8,str,u32
"""Indonesia""",1,"""autumn""",10
"""Brazil""",1,"""autumn""",5
"""China""",1,"""autumn""",3
"""Italy""",1,"""autumn""",3
"""France""",1,"""autumn""",2
…,…,…,…
"""Norway""",7,"""wintr""",1
"""United States""",7,"""wintr""",1
"""China""",7,"""wintr""",1
"""Netherlands Antilles""",7,"""wintr""",1
