<a href="https://colab.research.google.com/github/ipeirotis-org/datasets/blob/main/Flight_Stats/Load_DB1B_Market_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [14]:
# @title Load DB1B Market Data (Origin & Destination Survey)
# This script loads raw ticket data, which is much larger and more detailed than Table 6.

# https://www.transtats.bts.gov/DatabaseInfo.asp?QO_VQ=EFI&Yv0x=D

import pandas as pd
import requests
import io
import zipfile
import gc  # Garbage Collector
import os
from google.cloud import bigquery
from google.colab import auth

# ---------------------------------------------------------------------------
# CONFIGURATION
# ---------------------------------------------------------------------------
# BigQuery destination, addressed as <PROJECT_ID>.<DATASET_ID>.<TABLE_NAME>
PROJECT_ID = "nyu-datasets"
DATASET_ID = "flights"
TABLE_NAME = "raw_db1b_market"

# Local staging directory for the downloaded zip archives
DOWNLOAD_DIR = "./db1b_downloads"

# Survey periods to load: every quarter of 2018 through 2024 (both ends included)
YEARS_TO_LOAD = [2018, 2019, 2020, 2021, 2022, 2023, 2024]
QUARTERS_TO_LOAD = list(range(1, 5))

# BTS archive URL template; the two placeholders are filled positionally
# with (year, quarter) when the download URL is built.
BASE_URL = (
    "https://transtats.bts.gov/PREZIP/"
    "Origin_and_Destination_Survey_DB1BMarket_{}_{}.zip"
)


In [29]:

# ---------------------------------------------------------------------------
# COLUMN DESCRIPTIONS (OFFICIAL DOT METADATA)
# ---------------------------------------------------------------------------
# We keep the original BTS column names and map them to their official descriptions.
# Keys are the BTS column names with spaces removed (the same normalization the
# scan and load steps apply to the CSV headers). Insertion order defines the
# column order of the BigQuery table, and each value becomes that column's
# description in the table schema.
COLUMN_DESCRIPTIONS = {
    "ItinID": "Itinerary ID. Identification number assigned to identify an itinerary. Foreign key to DB1BTicket.",
    "MktID": "Market ID. Identification number assigned to identify a market. Foreign key to DB1BMarket.",
    "MktCoupons": "Number of Coupons in the Market. The number of flight segments in the market.",
    "Year": "Year of the survey.",
    "Quarter": "Quarter of the survey (1-4).",
    "Origin": "Origin Airport Code (e.g., JFK, ORD).",
    "OriginAirportID": "Origin Airport ID. A unique numeric code assigned by US DOT to the origin airport.",
    "OriginAirportSeqID": "Origin Airport Sequence ID.",
    "OriginCityMarketID": "Origin City Market ID. Use this field to consolidate airports serving the same city market.",
    "OriginCountry": "Origin Country Code.",
    "OriginStateFips": "Origin State FIPS Code.",
    "OriginState": "Origin State Code.",
    "OriginStateName": "Origin State Name.",
    "OriginWac": "Origin World Area Code (WAC). Geographic area code for the origin.",
    "Dest": "Destination Airport Code (e.g., LAX, SFO).",
    "DestAirportID": "Destination Airport ID. A unique numeric code assigned by US DOT to the destination airport.",
    "DestAirportSeqID": "Destination Airport Sequence ID.",
    "DestCityMarketID": "Destination City Market ID. Use this field to consolidate airports serving the same city market.",
    "DestCountry": "Destination Country Code.",
    "DestStateFips": "Destination State FIPS Code.",
    "DestState": "Destination State Code.",
    "DestStateName": "Destination State Name.",
    "DestWac": "Destination World Area Code (WAC). Geographic area code for the destination.",
    "AirportGroup": "Airport Group. Sequence of airports in the market.",
    "WacGroup": "World Area Code Group. Sequence of WACs in the market.",
    "TkCarrier": "Ticketing Carrier. The airline that sold the ticket.",
    "TkCarrierChange": "Ticketing Carrier Change Indicator. 1 if the ticketing carrier changes within the market; 0 otherwise.",
    "OpCarrier": "Operating Carrier. The airline that actually operated the flight.",
    "OpCarrierChange": "Operating Carrier Change Indicator. 1 if the operating carrier changes within the market; 0 otherwise.",
    "RPCarrier": "Reporting Carrier. The airline that submitted the data to DOT.",
    "TkCarrierGroup": "Ticketing Carrier Group. Sequence of ticketing carriers.",
    "OpCarrierGroup": "Operating Carrier Group. Sequence of operating carriers.",
    "Passengers": "Number of Passengers. 10% sample count (multiply by 10 for total estimate).",
    "MktFare": "Market Fare. The prorated fare for this specific market (one-way portion of the trip).",
    "MktDistance": "Market Distance. Distance including ground transport.",
    "MktMilesFlown": "Market Miles Flown. The distance flown for this market.",
    "NonStopMiles": "Non-Stop Miles.",
    "MktDistanceGroup": "Market Distance Group. Distance interval group.",
    "BulkFare": "Bulk Fare Indicator. 1 if the fare is a bulk/consolidator fare (price may be hidden or zero); 0 otherwise.",
    # DB1B geography-type codes distinguish contiguous from non-contiguous
    # domestic travel; they are not a domestic/international flag.
    "MktGeoType": "Market Geography Type. 1=Non-contiguous domestic (includes Hawaii, Alaska and territories), 2=Contiguous domestic (lower 48 states only).",
    "ItinGeoType": "Itinerary Geography Type.",
    # pandas names the header-less trailing column "Unnamed: 41"; the space is
    # stripped by the same normalization applied to every other header.
    "Unnamed:41": "Artifact column (usually empty)."
}


In [None]:

# ---------------------------------------------------------------------------
# 1. AUTHENTICATE
# ---------------------------------------------------------------------------
# Interactive Colab sign-in; must run before any Google Cloud client is used.
print("Authenticating User...")
auth.authenticate_user()
# Module-level BigQuery client bound to PROJECT_ID; the load step and the
# main-flow cell read this global rather than receiving it as an argument.
client = bigquery.Client(project=PROJECT_ID)
print("Authenticated.")


Authenticating User...
Authenticated.


In [30]:
from google.cloud import storage

def download_all_files():
    """
    Step 1: Stage every requested DB1B Market archive on local disk.

    For each (year, quarter) in YEARS_TO_LOAD x QUARTERS_TO_LOAD:
      1. Check GCS bucket 'bts_datasets' for the file.
      2. If it exists in GCS, download it to DOWNLOAD_DIR unless a local copy
         is already present.
      3. If not in GCS, download from the BTS website, save locally, then
         upload to GCS.

    Files are first written to a '<name>.part' path and renamed only once
    complete, so an interrupted transfer is never mistaken for a finished
    local copy on the next run.

    A failure while fetching a quarter from BTS (non-200 status, response that
    is not a zip archive, network or upload error) is printed and that quarter
    is skipped; the remaining quarters are still processed. Errors talking to
    GCS while checking or reading an existing blob propagate to the caller.

    Returns:
        list[tuple[int, int, str]]: (year, quarter, local_zip_path) for every
        archive that is available locally.
    """
    # Configuration for GCS
    BUCKET_NAME = "bts_datasets"
    # (connect, read) timeouts in seconds, so a stalled BTS connection cannot
    # hang the cell indefinitely.
    REQUEST_TIMEOUT = (30, 300)

    os.makedirs(DOWNLOAD_DIR, exist_ok=True)

    downloaded_files = []

    # Initialize Storage Client
    storage_client = storage.Client(project=PROJECT_ID)

    # Access or create the bucket. lookup_bucket() returns None only when the
    # bucket does not exist; permission or network errors propagate instead of
    # being misread as "missing" and triggering a create attempt.
    bucket = storage_client.lookup_bucket(BUCKET_NAME)
    if bucket is None:
        print(f"Bucket {BUCKET_NAME} not found. Creating...")
        bucket = storage_client.create_bucket(BUCKET_NAME, location="US")
    else:
        print(f"Accessed bucket: {BUCKET_NAME}")

    print(f"\n--- STEP 1: CHECKING GCS & DOWNLOADING FILES ---")
    for year in YEARS_TO_LOAD:
        for quarter in QUARTERS_TO_LOAD:
            filename = f"DB1B_Market_{year}_{quarter}.zip"
            filepath = os.path.join(DOWNLOAD_DIR, filename)
            partial_path = filepath + ".part"
            blob = bucket.blob(filename)

            # 1. Check if file exists in GCS
            if blob.exists():
                print(f"Found in GCS: {filename}")
                # Ensure we have it locally for processing
                if not os.path.exists(filepath):
                    print(f"  -> Downloading to local: {filepath}")
                    blob.download_to_filename(partial_path)
                    os.replace(partial_path, filepath)
                else:
                    print(f"  -> Local copy exists.")

                downloaded_files.append((year, quarter, filepath))
                continue

            # 2. Not in GCS -> Download from BTS
            download_url = BASE_URL.format(year, quarter)
            print(f"Missing in GCS. Fetching from BTS: {download_url}")

            try:
                # TLS certificate verification stays enabled (verify=True).
                # The context manager releases the streamed connection.
                with requests.get(
                    download_url,
                    verify=True,
                    stream=True,
                    timeout=REQUEST_TIMEOUT,
                ) as response:
                    if response.status_code != 200:
                        print(f"Skipping {year} Q{quarter} (Status {response.status_code})")
                        continue

                    with open(partial_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)

                # A 200 response can still be an HTML error page. Reject it
                # here so a bad file is never cached in GCS, where it would be
                # treated as "found" on every later run.
                if not zipfile.is_zipfile(partial_path):
                    os.remove(partial_path)
                    print(f"Skipping {year} Q{quarter} (downloaded file is not a valid zip archive)")
                    continue

                os.replace(partial_path, filepath)

                # 3. Upload to GCS
                print(f"  -> Uploading to GCS: {filename}")
                blob.upload_from_filename(filepath)

                downloaded_files.append((year, quarter, filepath))
            except Exception as e:
                print(f"Error processing {year} Q{quarter}: {e}")

    return downloaded_files

In [31]:
# 1. Fetch: stage every requested archive on local disk; the result is a list
#    of (year, quarter, filepath) tuples consumed by the later steps.
local_files = download_all_files()

Accessed bucket: bts_datasets

--- STEP 1: CHECKING GCS & DOWNLOADING FILES ---
Found in GCS: DB1B_Market_2018_1.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2018_2.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2018_3.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2018_4.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2019_1.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2019_2.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2019_3.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2019_4.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2020_1.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2020_2.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2020_3.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2020_4.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2021_1.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2021_2.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2021_3.zip
  -> Local copy exists.
Found

In [32]:
def analyze_schema(file_list):
    """
    Step 2: Define Unified Schema based on Official Column Descriptions.

    The master schema is the key order of COLUMN_DESCRIPTIONS. Each archive's
    CSV header is then compared against it and any drift (extra or missing
    columns) is reported. The scan is informational only: the returned list is
    always the full master column list, regardless of what the files contain.

    Args:
        file_list: iterable of (year, quarter, filepath) tuples, where
            filepath points at a zip archive; the first member whose name ends
            in '.csv' is inspected.

    Returns:
        list[str]: the master column names, in COLUMN_DESCRIPTIONS order.
    """
    print(f"\n--- STEP 2: DEFINING UNIFIED SCHEMA ---")

    # 1. Define Master Schema
    master_columns = list(COLUMN_DESCRIPTIONS.keys())
    master_set = set(master_columns)  # built once; reused for every file
    print(f"Standardized Schema defined with {len(master_columns)} columns.")

    # 2. Scan files for drift (Extra or Missing columns)
    # This is for information only; the load process will strictly enforce master_columns.
    all_extra = set()
    all_missing = set()
    scanned_count = 0
    unscanned = []  # files whose header could not be checked

    print("Scanning files for schema consistency...")
    for year, quarter, filepath in file_list:
        try:
            with zipfile.ZipFile(filepath, 'r') as z:
                csv_files = [f for f in z.namelist() if f.lower().endswith('.csv')]
                if not csv_files:
                    print(f"Warning: No CSV file found inside {filepath}")
                    unscanned.append(filepath)
                    continue

                with z.open(csv_files[0]) as f:
                    # Read header only
                    header_df = pd.read_csv(f, nrows=0)

            # Normalize file columns (remove spaces)
            file_cols = {c.replace(' ', '') for c in header_df.columns}

            print(year, quarter, file_cols)

            # Check vs Master
            all_extra.update(file_cols - master_set)
            all_missing.update(master_set - file_cols)
            scanned_count += 1
        except Exception as e:
            print(f"Warning: Could not scan {filepath}: {e}")
            unscanned.append(filepath)

    # Examples are sorted so the reported sample is stable between runs.
    if all_extra:
        print(f"  Note: {len(all_extra)} extra columns found in raw files will be IGNORED (e.g., {sorted(all_extra)[:3]}).")
    if all_missing:
        print(f"  Note: {len(all_missing)} standard columns are MISSING in some files and will be NULL (e.g., {sorted(all_missing)[:3]}).")
    if unscanned:
        print(f"  Note: {len(unscanned)} file(s) could not be scanned; their schema is UNVERIFIED.")
    # Only claim success when at least one header was checked and nothing was skipped.
    if scanned_count and not all_extra and not all_missing and not unscanned:
        print("  Success: All files match the standardized schema perfectly.")

    return master_columns

In [33]:
# 2. Examine: build the master column list and print any header drift found
#    in the staged archives.
master_schema = analyze_schema(local_files)


--- STEP 2: DEFINING UNIFIED SCHEMA ---
Standardized Schema defined with 42 columns.
Scanning files for schema consistency...
2018 1 {'OriginStateName', 'DestStateFips', 'DestCityMarketID', 'RPCarrier', 'MktDistanceGroup', 'DestAirportSeqID', 'Unnamed:41', 'DestWac', 'MktCoupons', 'OriginCountry', 'BulkFare', 'DestState', 'MktDistance', 'OriginCityMarketID', 'OriginState', 'OriginStateFips', 'TkCarrierChange', 'MktFare', 'MktGeoType', 'DestAirportID', 'ItinGeoType', 'WacGroup', 'ItinID', 'AirportGroup', 'MktMilesFlown', 'OriginWac', 'Year', 'TkCarrierGroup', 'Dest', 'OriginAirportSeqID', 'OriginAirportID', 'MktID', 'DestStateName', 'DestCountry', 'Passengers', 'NonStopMiles', 'OpCarrierChange', 'Origin', 'Quarter', 'OpCarrier', 'TkCarrier', 'OpCarrierGroup'}
2018 2 {'OriginStateName', 'DestStateFips', 'DestCityMarketID', 'RPCarrier', 'MktDistanceGroup', 'DestAirportSeqID', 'Unnamed:41', 'DestWac', 'MktCoupons', 'OriginCountry', 'BulkFare', 'DestState', 'MktDistance', 'OriginCityMarket

In [41]:

# ---------------------------------------------------------------------------
# 2. HELPER FUNCTIONS
# ---------------------------------------------------------------------------

def upload_files(file_list, master_columns, table_id):
    """
    Step 3: Load data into BigQuery.

    DESTRUCTIVE: the destination table is always dropped (if it exists) and
    recreated with an explicit schema (types + column descriptions) before any
    data is loaded. Each archive is then streamed in chunks and appended.

    A failure while loading one archive is printed and the loop moves on to the
    next archive, so the table may end up partially loaded; a summary of the
    archives that failed is printed at the end.

    Uses the module-level `client` (BigQuery client) and COLUMN_DESCRIPTIONS.

    Args:
        file_list: iterable of (year, quarter, filepath) tuples pointing at the
            staged zip archives.
        master_columns: ordered column names defining the table schema. Spaces
            are stripped to obtain the BigQuery field names.
        table_id: fully qualified destination table, 'project.dataset.table'.

    Returns:
        None
    """
    print(f"\n--- STEP 3: UPLOADING TO BIGQUERY ---")

    # 1. Define Explicit Types for BigQuery
    # Default to STRING for codes/names, specify others.
    type_map = {
        # IDs and Sequences -> INTEGER
        "ItinID": "INTEGER",
        "MktID": "INTEGER",
        "MktCoupons": "INTEGER",
        "OriginAirportID": "INTEGER",
        "OriginAirportSeqID": "INTEGER",
        "OriginCityMarketID": "INTEGER",
        "DestAirportID": "INTEGER",
        "DestAirportSeqID": "INTEGER",
        "DestCityMarketID": "INTEGER",

        # Time -> INTEGER
        "Year": "INTEGER",
        "Quarter": "INTEGER",

        # Metrics -> FLOAT (Safe for calculations and currency)
        "Passengers": "FLOAT",
        "MktFare": "FLOAT",
        "MktDistance": "FLOAT",
        "MktMilesFlown": "FLOAT",
        "NonStopMiles": "FLOAT",

        # Flags/Categorical Indices -> INTEGER
        "MktDistanceGroup": "INTEGER",
        "TkCarrierChange": "INTEGER",
        "OpCarrierChange": "INTEGER",
        "BulkFare": "INTEGER",
        "MktGeoType": "INTEGER",
        "ItinGeoType": "INTEGER",

        # Everything else (Carriers, Origin/Dest Codes, States, etc.) -> STRING
    }

    # 2. Construct BigQuery Schema
    # Field names are the master column names with spaces removed, matching the
    # normalization applied to each chunk's header below.
    # NOTE(review): "Unnamed:41" contains a colon; confirm the destination
    # accepts it as a column name.
    clean_columns = [col.replace(' ', '') for col in master_columns]
    schema = [
        bigquery.SchemaField(
            name=col_clean,
            field_type=type_map.get(col_clean, "STRING"),
            mode="NULLABLE",
            description=COLUMN_DESCRIPTIONS.get(col, "No description available."),
        )
        for col, col_clean in zip(master_columns, clean_columns)
    ]

    # 3. Drop and Create Table explicitly
    print(f"Dropping table {table_id} (if exists) and creating new schema...")
    client.delete_table(table_id, not_found_ok=True)

    table = bigquery.Table(table_id, schema=schema)
    client.create_table(table)
    print("  -> Table created successfully with explicit schema.")

    # 4. Configure Load Job
    # No skip_leading_rows here: load_table_from_dataframe serializes the
    # DataFrame to CSV without a header row, so skipping one row would silently
    # discard the first data row of every chunk.
    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND", # Append to the table we just created
        schema=schema,                    # Enforce the explicit schema
        source_format=bigquery.SourceFormat.CSV,
    )

    # 5. Load Files in Chunks
    chunk_size = 1_000_000
    failed = []  # "<year> Q<quarter>" labels of archives that did not load fully
    for year, quarter, filepath in file_list:
        print(f"Processing {year} Q{quarter}...")
        try:
            with zipfile.ZipFile(filepath, 'r') as z:
                csv_files = [f for f in z.namelist() if f.lower().endswith('.csv')]
                if not csv_files:
                    print(f"  -> No CSV file found inside {filepath}; skipping.")
                    failed.append(f"{year} Q{quarter}")
                    continue

                with z.open(csv_files[0]) as f:
                    # Stream the CSV so only one chunk is held in memory at a time.
                    chunk_iter = pd.read_csv(f, chunksize=chunk_size)

                    for i, df_chunk in enumerate(chunk_iter):
                        # Clean Columns (remove spaces to match Schema names)
                        df_chunk.columns = df_chunk.columns.str.replace(' ', '')

                        # Keep exactly the schema columns, in schema order:
                        # extra columns are dropped and absent ones are added
                        # as all-missing.
                        df_chunk = df_chunk.reindex(columns=clean_columns)

                        # Upload Chunk
                        job = client.load_table_from_dataframe(df_chunk, table_id, job_config=job_config)
                        job.result()
                        print(f"    -> Uploaded chunk {i+1} ({len(df_chunk)} rows).")

                        # Release the chunk before reading the next one.
                        del df_chunk
                        gc.collect()

        except Exception as e:
            print(f"  -> Error uploading {year} Q{quarter}: {e}")
            failed.append(f"{year} Q{quarter}")
            # Optional: Raise to stop execution if strict on errors
            # raise e

    if failed:
        print(f"WARNING: {len(failed)} archive(s) were not fully loaded: {', '.join(failed)}")


In [None]:

# ---------------------------------------------------------------------------
# 3. MAIN EXECUTION FLOW
# ---------------------------------------------------------------------------
# Fully qualified destination table: project.dataset.table
table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_NAME}"

# 1. Fetch (archives already staged locally or in GCS are reused, not re-downloaded)
local_files = download_all_files()

if not local_files:
    print("No files downloaded. Exiting.")
else:
    # 2. Examine
    master_schema = analyze_schema(local_files)

    # Report what the load step is about to do. upload_files() always drops
    # and recreates the table, so an existing table is REPLACED, not appended to.
    try:
        client.get_table(table_id)
        print(f"Table {table_id} exists. It will be dropped and recreated by the load step.")
    except Exception as e:
        # Catch Exception rather than using a bare except, so KeyboardInterrupt
        # still stops the cell; the error type is shown because a lookup can
        # fail for reasons other than the table being absent.
        print(f"Could not fetch table {table_id} ({type(e).__name__}). It will be created by the load step.")

    # 3. Load
    upload_files(local_files, master_schema, table_id)


Accessed bucket: bts_datasets

--- STEP 1: CHECKING GCS & DOWNLOADING FILES ---
Found in GCS: DB1B_Market_2018_1.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2018_2.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2018_3.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2018_4.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2019_1.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2019_2.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2019_3.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2019_4.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2020_1.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2020_2.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2020_3.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2020_4.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2021_1.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2021_2.zip
  -> Local copy exists.
Found in GCS: DB1B_Market_2021_3.zip
  -> Local copy exists.
Found