In [0]:
import requests
import os

# Download every quarterly DB1B Market archive from BTS into the Volume.
# The target directory is the one the files are actually written to, so it is
# the one that must exist before the first download.
output_dir = "/Volumes/workspace/airline_origin_dest_data/d1b1"
os.makedirs(output_dir, exist_ok=True)

for year in range(1993, 2026):   # DB1B starts in 1993
    for q in [1, 2, 3, 4]:
        url = f"https://transtats.bts.gov/PREZIP/Origin_and_Destination_Survey_DB1BMarket_{year}_{q}.zip"
        filename = f"db1b_market_{year}_q{q}.zip"

        # Stream the response so a whole archive is never held in memory, and
        # bound connect/read waits so one stalled request cannot hang the loop.
        with requests.get(url, stream=True, timeout=(10, 300)) as r:
            # A non-200 reply carries an error page, not an archive. Saving it
            # under a .zip name would only fail later during extraction, so
            # skip the quarter (e.g. one that is not available) instead.
            if r.status_code != 200:
                print("Skipping", filename, "- HTTP status", r.status_code)
                continue

            with open(f"{output_dir}/{filename}", "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)

        print("Downloaded", filename)

In [0]:
import zipfile
import io

# For every quarter: unzip the downloaded archive inside the Volume, read the
# extracted CSV with Spark, and save it as its own Delta table.
volume_base = "/Volumes/workspace/airline_origin_dest_data/d1b1"

for year in range(1993, 2026):
    for q in [1, 2, 3, 4]:
        zip_path_volume = f"{volume_base}/db1b_market_{year}_q{q}.zip"
        extract_to = f"{volume_base}/unzipped_{year}_q{q}"
        csv_name = f"Origin_and_Destination_Survey_DB1BMarket_{year}_{q}.csv"

        # is_zipfile returns False both for a missing file and for a file that
        # is not a valid archive, so one check covers quarters that were never
        # downloaded and downloads that are not real zips. Skip those rather
        # than aborting the whole loop.
        if not zipfile.is_zipfile(zip_path_volume):
            print(f"Skipping {year} Q{q}: {zip_path_volume} is missing or not a zip archive.")
            continue

        dbutils.fs.mkdirs(extract_to)

        # Open the archive straight from its path instead of buffering the
        # whole file in memory. extractall creates any nested directories and
        # sanitises member names (no absolute paths or ".." components).
        with zipfile.ZipFile(zip_path_volume) as z:
            members = z.namelist()
            z.extractall(extract_to)

        print("Extraction complete.")

        # Only hand the path to Spark when the archive really contained the
        # expected CSV; otherwise the load below would fail on a missing path.
        if csv_name not in members:
            print(f"Skipping {year} Q{q}: {csv_name} not found in archive (members: {members}).")
            continue

        csv_path = f"{extract_to}/{csv_name}"

        df = (
            spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load(csv_path)
        )

        table_name = f"workspace.airline_origin_dest_data.airline_data_table_{year}_{q}"
        df.write.format("delta").mode("overwrite").saveAsTable(table_name)

        # Report progress with one line per quarter; rendering every quarterly
        # DataFrame inside the loop floods the notebook output.
        print(f"Saved {table_name}")

In [0]:
from functools import reduce

from pyspark.sql import DataFrame

# Combine every per-quarter Delta table in the schema into one DataFrame,
# expose it as a temp view, and persist it as a single Delta table.

# Define your catalog and schema
catalog_name = "workspace"
schema_name = "airline_origin_dest_data"
table_pattern = "airline_data_table_*"  # Pattern matching every per-quarter table
# listTables returns plain names, so match on the literal prefix of the pattern.
table_prefix = table_pattern.rstrip("*")

# Get the list of table names (sorted so the union order is deterministic)
tables = spark.catalog.listTables(f"{catalog_name}.{schema_name}")
filtered_tables = sorted(t.name for t in tables if t.name.startswith(table_prefix))

if not filtered_tables:
    print("No tables found matching the pattern.")
else:
    # Create a list of DataFrames
    dfs = [spark.table(f"{catalog_name}.{schema_name}.{table_name}") for table_name in filtered_tables]

    # Union all DataFrames by column name. allowMissingColumns=True must be
    # passed explicitly: without it unionByName raises as soon as two quarters
    # differ in their column sets; with it, absent columns are filled with nulls.
    combined_df = reduce(
        lambda left, right: left.unionByName(right, allowMissingColumns=True),
        dfs,
    )

    # Register as a temporary view for SQL access if needed
    combined_df.createOrReplaceTempView("combined_quarterly_data")

    # Show the result or perform further analysis
    print(f"Successfully combined {len(dfs)} tables.")
    display(combined_df)
    combined_df.write.format("delta").mode("overwrite").saveAsTable("workspace.airline_origin_dest_data.combined_quarterly_data")
