<a href="https://colab.research.google.com/github/raleight1/mgmt467-analytics-portfolio/blob/main/Labs/Labs%207-9/unit3_2_opensky_bq_ml.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Unit 3 - Lab 2: OpenSky to BigQuery Table

## Cell 1: Python packages and authentication

In [1]:
R"""
This cell installs required Python packages and authenticates the user to Google Cloud.

Packages installed:
- google-cloud-storage: For interacting with Google Cloud Storage.
- google-cloud-bigquery: For interacting with Google Cloud BigQuery.
- requests: A popular HTTP library.

Authentication:
The cell authenticates the user to Google Cloud, enabling access to Google Cloud Platform (GCP) services.
"""
!pip install google-cloud-storage google-cloud-bigquery requests

from google.colab import auth
print("Authenticating to Google Cloud...")
auth.authenticate_user()
print("Authentication successful.")

Authenticating to Google Cloud...
Authentication successful.


## Cell 2: Configure project-specific variables and set the `gcloud` project.



In [7]:
R"""
This cell configures essential project-specific variables for Google Cloud operations.

It defines:
- `PROJECT_ID`: The Google Cloud project ID.
- `GCP_REGION`: The Google Cloud region for services.
- `GCS_BUCKET_NAME`: The name of the Google Cloud Storage bucket.
- `GCS_FOLDER_PATH`: The folder path within the GCS bucket for data storage.
- `BQ_DATASET`: The BigQuery dataset name.
- `BQ_TABLE`: The BigQuery table name for flight data.
- `FLIGHT_RECORD_LIMIT`: A pipeline setting to limit records from the API.

Finally, it sets the `gcloud` project configuration to the specified `PROJECT_ID`.
"""
# --- !! CONFIGURE YOUR VARIABLES !! ---

PROJECT_ID = "mgmt467-unit3labs"
GCP_REGION = "US"  # "US" is a multi-region location; replace it with your own location if different

# --- GCS Bucket (Source & Target) ---
GCS_BUCKET_NAME = "mgmt467-unit3labs-opensky"
GCS_FOLDER_PATH = "opensky-data" # The folder you set in your scheduler

# --- BigQuery Table (Target) ---
BQ_DATASET = "opensky_2025" # The dataset you created
BQ_TABLE = "flight_data"        # The table for flight data

# --- Pipeline Settings ---
FLIGHT_RECORD_LIMIT = 500 # How many records to pull from the API

# -------------------------------------

# Set the project for all gcloud commands
!gcloud config set project $PROJECT_ID

INFORMATION: Project 'mgmt467-unit3labs' has no 'environment' tag set. Use either 'Production', 'Development', 'Test', or 'Staging'. Add an 'environment' tag using `gcloud resource-manager tags bindings create`.
Updated property [core/project].
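
Later cells combine these variables inline into a fully qualified BigQuery table ID and a GCS path. As a minimal sketch, the same names can be derived in one place; the helper `build_resource_names` is hypothetical and not part of the notebook.

```python
def build_resource_names(project_id, bucket, folder, dataset, table):
    """Derive the fully qualified names that later cells build inline (hypothetical helper)."""
    folder = folder.strip("/")  # avoid accidental double slashes in object paths
    return {
        "table_id": f"{project_id}.{dataset}.{table}",
        "gcs_prefix": f"gs://{bucket}/{folder}",
    }

names = build_resource_names(
    "mgmt467-unit3labs", "mgmt467-unit3labs-opensky",
    "opensky-data", "opensky_2025", "flight_data",
)
print(names["table_id"])    # mgmt467-unit3labs.opensky_2025.flight_data
print(names["gcs_prefix"])  # gs://mgmt467-unit3labs-opensky/opensky-data
```

Printing the derived names before running the pipeline is a cheap way to catch a typo in any of the configuration values.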


## Cell 3: Define the `OpenSkyApi` class and helper functions for data parsing and formatting.



In [3]:
R"""
This cell defines the `OpenSkyApi` class, along with `StateVector` and `OpenSkyStates` helper classes, for interacting with the OpenSky Network API.

It includes utility functions to:
- Fetch real-time flight data from the OpenSky API.
- Handle API rate limiting.
- Parse and convert raw API data into a structured format suitable for storage and analysis.

Additionally, it defines helper functions (`_convertTimestamp`, `_convert`, `_convertRow`) for data type conversion and formatting flight records.
"""
import os
import json
import logging
import datetime
import calendar
import time
import pprint
import requests
from collections import defaultdict
from google.cloud import storage, bigquery

# ==============================================================================
# OpenSky API Library Code
# ==============================================================================

logger = logging.getLogger('opensky_api')
logger.addHandler(logging.NullHandler())

class StateVector(object):
    keys = ["icao24", "callsign", "origin_country", "time_position",
            "last_contact", "longitude", "latitude", "baro_altitude", "on_ground",
            "velocity", "heading", "vertical_rate", "sensors",
            "geo_altitude", "squawk", "spi", "position_source"]
    def __init__(self, arr):
        self.__dict__ = dict(zip(StateVector.keys, arr))

class OpenSkyStates(object):
    def __init__(self, j):
        self.__dict__ = j
        if self.states is not None:
            self.states = [StateVector(a) for a in self.states]
        else:
            self.states = []

class OpenSkyApi(object):
    def __init__(self, username=None, password=None):
        self._auth = (username, password) if username else ()
        self._api_url = "https://opensky-network.org/api"
        self._last_requests = defaultdict(lambda: 0)

    def _get_json(self, url_post, callee, params=None):
        r = requests.get(f"{self._api_url}{url_post}",
                         auth=self._auth, params=params, timeout=60.00)
        if r.status_code == 200:
            self._last_requests[callee] = time.time()
            return r.json()
        logger.debug(f"Response not OK. Status {r.status_code} - {r.reason}")
        return None

    def _check_rate_limit(self, time_diff_noauth, time_diff_auth, func):
        if len(self._auth) < 2:
            return abs(time.time() - self._last_requests[func]) >= time_diff_noauth
        return abs(time.time() - self._last_requests[func]) >= time_diff_auth

    @staticmethod
    def _check_lat(lat):
        if not -90 <= lat <= 90:
            raise ValueError(f"Invalid latitude {lat}!")

    @staticmethod
    def _check_lon(lon):
        if not -180 <= lon <= 180:
            raise ValueError(f"Invalid longitude {lon}!")

    def get_states(self, time_secs=0, icao24=None, serials=None, bbox=()):
        if not self._check_rate_limit(10, 5, self.get_states):
            logger.debug("Blocking request due to rate limit")
            return None
        t = calendar.timegm(time_secs.timetuple()) if isinstance(time_secs, datetime.datetime) else int(time_secs)
        params = {"time": t, "icao24": icao24}
        if len(bbox) == 4:
            self._check_lat(bbox[0]); self._check_lat(bbox[1]); self._check_lon(bbox[2]); self._check_lon(bbox[3])
            params.update({"lamin": bbox[0], "lamax": bbox[1], "lomin": bbox[2], "lomax": bbox[3]})
        states_json = self._get_json("/states/all", self.get_states, params=params)
        return OpenSkyStates(states_json) if states_json else None

# ==============================================================================
# Data Parser Functions
# ==============================================================================

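def _convertTimestamp(timestamp):
    """Format epoch seconds as a UTC 'YYYY-MM-DD HH:MM:SS' string, or return None."""
    if timestamp is not None:
        try:
            # Use UTC explicitly so the result does not depend on the machine's time zone.
            return datetime.datetime.fromtimestamp(
                timestamp, tz=datetime.timezone.utc
            ).strftime('%Y-%m-%d %H:%M:%S')
        except (TypeError, ValueError, OverflowError, OSError):
            pass
    return None

def _convert(data, dataType):
    """Cast data to dataType; return None if the value is missing or cannot be cast."""
    if data is not None:
        try:
            if dataType == str:
                return str(data).strip()  # str() guards against non-string values
            return dataType(data)
        except (TypeError, ValueError):
            return None
    return None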

def _convertRow(flightState, queryTime):
    row = {
        'icao24': _convert(flightState.icao24,str),
        'callsign': _convert(flightState.callsign,str),
        'origin': _convert(flightState.origin_country,str),
        'time':_convert( flightState.time_position,int),
        'contact':_convert( flightState.last_contact,int),
        'longitude':_convert( flightState.longitude,float),
        'latitude':_convert( flightState.latitude,float),
        'altitude':_convert( flightState.geo_altitude,float),
        'on_ground':_convert( flightState.on_ground,bool),
        'velocity':_convert( flightState.velocity,float),
        'heading':_convert( flightState.heading,float),
        'vertical_rate':_convert( flightState.vertical_rate,float),
        'sensors':_convert( flightState.sensors,str),
        'baro_altitude':_convert( flightState.baro_altitude,float),
        'squawk':_convert( flightState.squawk,int),
        'spi':_convert( flightState.spi,bool),
        'position_source':_convert( flightState.position_source,int)
    }
    time_bq = _convertTimestamp(flightState.time_position)
    if time_bq: row['time_bq'] = time_bq
    contact_bq = _convertTimestamp(flightState.last_contact)
    if contact_bq: row['contact_bq'] = contact_bq
    query_time_bq = _convertTimestamp(queryTime)
    if query_time_bq: row['query_time_bq'] = query_time_bq

    # Return only non-null values, as BQ handles missing fields
    return {k: v for k, v in row.items() if v is not None}

print("✅ Helper functions and OpenSky API class defined.")

✅ Helper functions and OpenSky API class defined.
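
To make the parsing step concrete, the sketch below maps one raw state array onto named fields the way `StateVector` does, then builds a small row and drops `None` values as `_convertRow` does. The sample values are made up, `to_bq_timestamp` is a hypothetical stand-in for `_convertTimestamp`, and formatting in UTC is an assumption chosen because BigQuery interprets zone-less timestamp strings as UTC.

```python
import datetime

# Field order of one raw state vector, copied from StateVector.keys above.
KEYS = ["icao24", "callsign", "origin_country", "time_position",
        "last_contact", "longitude", "latitude", "baro_altitude", "on_ground",
        "velocity", "heading", "vertical_rate", "sensors",
        "geo_altitude", "squawk", "spi", "position_source"]

# Made-up sample values for illustration only (not real API output).
raw_state = ["abc123", "TEST123 ", "Exampleland", 1700000000, 1700000005,
             -86.9, 40.4, 10000.0, False, 230.5, 90.0, 0.0, None,
             10200.0, "1200", False, 0]

# Pair each position in the array with its field name.
state = dict(zip(KEYS, raw_state))

def to_bq_timestamp(epoch_seconds):
    """Format epoch seconds as a UTC 'YYYY-MM-DD HH:MM:SS' string."""
    dt = datetime.datetime.fromtimestamp(epoch_seconds, tz=datetime.timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S")

row = {
    "icao24": state["icao24"].strip(),
    "callsign": state["callsign"].strip(),   # trailing whitespace removed
    "altitude": float(state["geo_altitude"]),
    "on_ground": bool(state["on_ground"]),
    "sensors": state["sensors"],             # None here, so it is dropped below
    "time_bq": to_bq_timestamp(state["time_position"]),
}

# Keep only non-null values, mirroring the last line of _convertRow.
row = {k: v for k, v in row.items() if v is not None}
print(row)
```

Note that `on_ground` survives the final filter even when it is `False`, because the filter tests for `None` rather than truthiness.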


## Cell 4: Initialize GCP clients, define the BigQuery schema, and implement the data pipeline logic.



In [4]:
R"""
This cell initializes the GCP clients, defines the BigQuery schema, and implements
the data pipeline functions. It reuses the `OpenSkyApi` class and the helper functions
(`_convertTimestamp`, `_convert`, `_convertRow`) defined in Cell 3.

Key functionalities include:
- **GCP Client Initialization**: Initializes `google.cloud.storage.Client` and
  `google.cloud.bigquery.Client` for interacting with Google Cloud Storage and BigQuery.
- **BigQuery Schema Definition**: Defines the explicit schema (`BQ_SCHEMA`) for the
  BigQuery `flight_data` table.
- **GCS to BigQuery Load Function**: `load_gcs_to_bigquery` loads a JSONL file
  from GCS into a specified BigQuery table.
- **API to GCS Pipeline**: `run_full_pipeline` fetches data from the OpenSky API,
  converts it, and uploads it to a GCS bucket. It does not load the data into BigQuery.
"""
import json
import datetime
from google.cloud import storage, bigquery

# Initialize GCP clients
storage_client = storage.Client(project=PROJECT_ID)
bq_client = bigquery.Client(project=PROJECT_ID)

# This is our robust, explicit schema
BQ_SCHEMA = [
    bigquery.SchemaField("icao24", "STRING"),
    bigquery.SchemaField("callsign", "STRING"),
    bigquery.SchemaField("origin", "STRING"),
    bigquery.SchemaField("time", "INTEGER"),
    bigquery.SchemaField("contact", "INTEGER"),
    bigquery.SchemaField("longitude", "FLOAT"),
    bigquery.SchemaField("latitude", "FLOAT"),
    bigquery.SchemaField("altitude", "FLOAT"),
    bigquery.SchemaField("on_ground", "BOOLEAN"),
    bigquery.SchemaField("velocity", "FLOAT"),
    bigquery.SchemaField("heading", "FLOAT"),
    bigquery.SchemaField("vertical_rate", "FLOAT"),
    bigquery.SchemaField("sensors", "STRING"),
    bigquery.SchemaField("baro_altitude", "FLOAT"),
    bigquery.SchemaField("squawk", "INTEGER"),
    bigquery.SchemaField("spi", "BOOLEAN"),
    bigquery.SchemaField("position_source", "INTEGER"),
    bigquery.SchemaField("time_bq", "TIMESTAMP"),
    bigquery.SchemaField("contact_bq", "TIMESTAMP"),
    bigquery.SchemaField("query_time_bq", "TIMESTAMP"),
]

def load_gcs_to_bigquery(gcs_uri, project_id, bq_dataset, bq_table, bq_schema, bq_client_instance):
    """Loads data from a GCS URI into a BigQuery table."""
    print(f"\nStep: Loading data from GCS into BigQuery...")
    print(f"  > Source: {gcs_uri}")
    print(f"  > Target: {bq_dataset}.{bq_table}")

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.schema = bq_schema
    job_config.autodetect = False
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND

    load_job = bq_client_instance.load_table_from_uri(
        gcs_uri,
        f"{project_id}.{bq_dataset}.{bq_table}",
        job_config=job_config
    )

    print(f"  > Starting BQ Load Job: {load_job.job_id}")
    load_job.result()
    print(f"  > Job complete. Loaded {load_job.output_rows} rows.")
    print("✅ GCS to BigQuery Load Finished Successfully.")

def run_full_pipeline():
    """Executes the API -> GCS pipeline and returns GCS URI."""

    try:
        # ======================================================
        # 1. Download data from OpenSky API
        # ======================================================
        print(f"Step 1: Fetching up to {FLIGHT_RECORD_LIMIT} records from OpenSky API...")
        api = OpenSkyApi()
        queryTime = datetime.datetime.now().timestamp()
        flightStates = api.get_states()

        records = []
        if flightStates and flightStates.states:
            for state in flightStates.states:
                if len(records) >= FLIGHT_RECORD_LIMIT:
                    break
                records.append(_convertRow(state, queryTime))

        if not records:
            print("No flight data found. Exiting.")
            return None # Return None if no records

        print(f"  > Fetched {len(records)} records.")

        # ======================================================
        # 2. Save data to GCS Bucket
        # ======================================================
        local_filename = "flight_data.jsonl"
        with open(local_filename, 'w') as f:
            for record in records:
                f.write(json.dumps(record) + '\n')

        gcs_filename = f"{GCS_FOLDER_PATH}/colab_batch_{int(queryTime)}.jsonl"

        print(f"\nStep 2: Uploading data to GCS...")
        print(f"  > Source: {local_filename}")
        print(f"  > Destination: gs://{GCS_BUCKET_NAME}/{gcs_filename}")

        bucket = storage_client.bucket(GCS_BUCKET_NAME)
        blob = bucket.blob(gcs_filename)
        blob.upload_from_filename(local_filename)

        gcs_uri = f"gs://{GCS_BUCKET_NAME}/{gcs_filename}"
        print("  > Upload complete.")
        print("✅ API to GCS Pipeline Finished Successfully.")
        return gcs_uri # Return the GCS URI

    except Exception as e:
        print(f"\n❌ ERROR in pipeline: {e}")
        return None

print("✅ Main pipeline functions defined.")

✅ Main pipeline functions defined.
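
The pipeline hands data to BigQuery as newline-delimited JSON: one JSON object per line, which is what `SourceFormat.NEWLINE_DELIMITED_JSON` expects. The sketch below shows that file format in isolation with made-up records; `write_jsonl` and `read_jsonl` are hypothetical helper names, and the round trip runs against a temporary local file rather than GCS.

```python
import json
import os
import tempfile

def write_jsonl(records, path):
    """Write one JSON object per line (newline-delimited JSON)."""
    with open(path, "w") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")

def read_jsonl(path):
    """Read a JSONL file back into a list of dicts, skipping blank lines."""
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]

# Made-up records; the second omits 'altitude', as _convertRow drops None values.
records = [
    {"icao24": "abc123", "altitude": 10200.0, "on_ground": False},
    {"icao24": "def456", "on_ground": True},
]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "flight_data.jsonl")
    write_jsonl(records, path)
    round_trip = read_jsonl(path)

print(round_trip == records)
```

Because every field in `BQ_SCHEMA` uses the default `NULLABLE` mode, a key that is missing from a line is loaded as `NULL` in that column.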


## Cell 5: Execute the API-to-GCS pipeline.



In [8]:
R"""
This cell calls `run_full_pipeline()`, defined in Cell 4, which:
1. Fetches flight data from the OpenSky API.
2. Uploads the fetched data to Google Cloud Storage (GCS) as a JSONL file.

The function returns the GCS URI of the uploaded file (or `None` on failure).
It does not load the data into BigQuery; Cell 6 handles that step with `load_gcs_to_bigquery()`.
"""
run_full_pipeline()

Step 1: Fetching up to 500 records from OpenSky API...
  > Fetched 500 records.

Step 2: Uploading data to GCS...
  > Source: flight_data.jsonl
  > Destination: gs://mgmt467-unit3labs-opensky/opensky-data/colab_batch_1763499711.jsonl
  > Upload complete.
✅ API to GCS Pipeline Finished Successfully.


'gs://mgmt467-unit3labs-opensky/opensky-data/colab_batch_1763499711.jsonl'
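
Cells 5 and 6 each call `run_full_pipeline()`, which creates a fresh `OpenSkyApi()` on every run, so the client-side guard in `get_states` (10 s between anonymous requests, 5 s when authenticated) starts with an empty request history each time. The sketch below isolates that elapsed-time check so its behavior is easy to see; `RateLimiter` is a hypothetical class name, and the clock is simulated so the example runs without sleeping.

```python
import time
from collections import defaultdict

class RateLimiter:
    """Standalone version of the elapsed-time check (hypothetical class name)."""

    def __init__(self, min_interval_secs, clock=time.time):
        self._min_interval = min_interval_secs
        self._clock = clock  # injectable so the logic can be exercised without sleeping
        self._last_request = defaultdict(float)  # 0.0 means "never requested"

    def allowed(self, key):
        """Return True if enough time has passed since the last recorded request."""
        return self._clock() - self._last_request[key] >= self._min_interval

    def record(self, key):
        """Remember the time of a successful request."""
        self._last_request[key] = self._clock()

# Simulated clock: a one-element list that the lambda reads on every call.
now = [1000.0]
limiter = RateLimiter(min_interval_secs=10, clock=lambda: now[0])

first = limiter.allowed("get_states")   # nothing recorded yet
limiter.record("get_states")
now[0] += 3
second = limiter.allowed("get_states")  # only 3 s elapsed
now[0] += 7
third = limiter.allowed("get_states")   # 10 s elapsed
print(first, second, third)  # True False True
```

Keeping a single limiter (or a single `OpenSkyApi` instance) across runs is what makes a check like this effective; a new instance per run never sees the previous request.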

## Cell 6: Orchestrate the full data pipeline from API to GCS to BigQuery.



In [9]:
R"""
This cell orchestrates the entire data pipeline, executing the following steps:
1.  **Run API to GCS Pipeline**: It calls `run_full_pipeline()` to fetch
    flight data from the OpenSky API and upload it as a JSONL file to a Google Cloud Storage bucket.
2.  **Load GCS to BigQuery**: If the GCS upload is successful, it then calls
    `load_gcs_to_bigquery()` to append the data from the GCS URI to the specified BigQuery table.

This ensures a complete data ingestion workflow from an external API to a data warehouse.
"""
print("--- Running Full Data Pipeline (API -> GCS -> BigQuery) ---")

# 1. Execute API -> GCS pipeline
gcs_uri_for_bq_load = run_full_pipeline()

if gcs_uri_for_bq_load:
    # 2. Load data from GCS to BigQuery
    load_gcs_to_bigquery(
        gcs_uri_for_bq_load,
        PROJECT_ID,
        BQ_DATASET,
        BQ_TABLE,
        BQ_SCHEMA,
        bq_client
    )
    print("✅ Pipeline Finished Successfully.")
else:
    print("❌ Pipeline aborted: No data fetched or GCS upload failed.")

--- Running Full Data Pipeline (API -> GCS -> BigQuery) ---
Step 1: Fetching up to 500 records from OpenSky API...
  > Fetched 500 records.

Step 2: Uploading data to GCS...
  > Source: flight_data.jsonl
  > Destination: gs://mgmt467-unit3labs-opensky/opensky-data/colab_batch_1763499821.jsonl
  > Upload complete.
✅ API to GCS Pipeline Finished Successfully.

Step: Loading data from GCS into BigQuery...
  > Source: gs://mgmt467-unit3labs-opensky/opensky-data/colab_batch_1763499821.jsonl
  > Target: opensky_2025.flight_data
  > Starting BQ Load Job: 31f52f40-f87d-4b95-a5e9-1b0653db8136
  > Job complete. Loaded 500 rows.
✅ GCS to BigQuery Load Finished Successfully.
✅ Pipeline Finished Successfully.


## Cell 7: Create a BQML regression model to predict flight velocity.



In [10]:
R"""
This cell creates a BigQuery ML (BQML) regression model.

Purpose:
- **Model Type**: Initializes a `LINEAR_REG` model.
- **Objective**: To predict `velocity` (the label) of flights.
- **Features**: Uses `altitude`, `vertical_rate`, and `heading` as input features.
- **Data Filtering**: Keeps only airborne flights (`on_ground = false`) and drops rows with a `NULL` label or feature, to focus on airborne velocity prediction.
- **Output**: Stores the trained model in BigQuery at the path defined by `REGRESSION_MODEL_NAME`.

The cell also prints basic training statistics upon successful model creation.
"""
print("--- Cell 6: Creating BQML Prediction (Regression) Model ---")

# Define the model name
REGRESSION_MODEL_NAME = "flight_velocity_predictor"
model_path = f"{PROJECT_ID}.{BQ_DATASET}.{REGRESSION_MODEL_NAME}"

# This SQL query creates a linear regression model.
# We are trying to PREDICT the 'velocity' (the LABEL)
# using 'altitude', 'vertical_rate', and 'heading' (the FEATURES).
# We also filter out any on-ground flights, as we only want to predict airborne velocity.

CREATE_PREDICTION_MODEL_QUERY = f"""
CREATE OR REPLACE MODEL `{model_path}`
OPTIONS(
    model_type='LINEAR_REG',
    input_label_cols=['velocity']
) AS
SELECT
    velocity,
    altitude,
    vertical_rate,
    heading
FROM
    `{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}`
WHERE
    velocity IS NOT NULL
    AND altitude IS NOT NULL
    AND vertical_rate IS NOT NULL
    AND heading IS NOT NULL
    AND on_ground = false
"""

print(f"Creating regression model at: {model_path}")
print("This may take a few minutes...")

# Run the BQML query
try:
    regression_job = bq_client.query(CREATE_PREDICTION_MODEL_QUERY)
    regression_job.result()  # Wait for the model training job to complete

    print(f"✅ Successfully created prediction model: {REGRESSION_MODEL_NAME}")

    # Optional: Get basic training stats
    stats_query = f"SELECT * FROM ML.TRAINING_INFO(MODEL `{model_path}`)"
    stats_job = bq_client.query(stats_query)
    for row in stats_job.result():
        print(f"  > Iteration {row['iteration']}: Loss = {row['loss']}")

except Exception as e:
    print(f"❌ Error creating regression model: {e}")

--- Cell 6: Creating BQML Prediction (Regression) Model ---
Creating regression model at: mgmt467-unit3labs.opensky_2025.flight_velocity_predictor
This may take a few minutes...
✅ Successfully created prediction model: flight_velocity_predictor
  > Iteration 0: Loss = 1101.2290381677387
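
The training loss alone says little about how well the regression fits. As a hedged sketch, the query below asks BigQuery ML for evaluation metrics via `ML.EVALUATE`, which for linear regression models reports values such as `mean_absolute_error` and `r2_score`. The helpers `build_evaluate_query` and `run_query` are hypothetical names; running the query assumes the `bq_client` from Cell 4.

```python
def build_evaluate_query(model_path):
    """Return SQL that asks BigQuery ML for evaluation metrics of a trained model."""
    return f"SELECT * FROM ML.EVALUATE(MODEL `{model_path}`)"

def run_query(client, sql):
    """Run SQL with a BigQuery client and return the result rows as a list."""
    return list(client.query(sql).result())

model_path = "mgmt467-unit3labs.opensky_2025.flight_velocity_predictor"
evaluate_sql = build_evaluate_query(model_path)
print(evaluate_sql)

# In the notebook, with `bq_client` from Cell 4:
# for row in run_query(bq_client, evaluate_sql):
#     print(dict(row.items()))
```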


## Cell 8: Analyze `on_ground` label diversity in BigQuery

Investigate the distribution of the `on_ground` column in the `flight_data` BigQuery table to understand why the classification model failed due to insufficient label diversity.


In [11]:
print("--- Analyzing 'on_ground' label diversity ---")

# Construct the SQL query to count distinct 'on_ground' values
ANALYZE_ON_GROUND_QUERY = f"""
SELECT
    on_ground,
    COUNT(*)
FROM
    `{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}`
GROUP BY
    on_ground
ORDER BY
    on_ground DESC
"""

print(f"Executing query to analyze 'on_ground' distribution:\n{ANALYZE_ON_GROUND_QUERY}")

# Execute the query
try:
    query_job = bq_client.query(ANALYZE_ON_GROUND_QUERY)
    results = query_job.result()  # Wait for the query to complete

    print("\nResults for 'on_ground' distribution:")
    found_true = False
    found_false = False
    for row in results:
        print(f"  on_ground: {row['on_ground']}, Count: {row['f0_']}")
        if row['on_ground'] is True:
            found_true = True
        if row['on_ground'] is False:
            found_false = True

    if found_true and found_false:
        print("✅ 'on_ground' column contains both TRUE and FALSE values. Label diversity is present.")
    elif found_true:
        print("❌ 'on_ground' column contains only TRUE values. Insufficient label diversity for classification.")
    elif found_false:
        print("❌ 'on_ground' column contains only FALSE values. Insufficient label diversity for classification.")
    else:
        print("⚠️ No data found for 'on_ground' column.")

except Exception as e:
    print(f"❌ Error analyzing 'on_ground' diversity: {e}")

--- Analyzing 'on_ground' label diversity ---
Executing query to analyze 'on_ground' distribution:

SELECT
    on_ground,
    COUNT(*)
FROM
    `mgmt467-unit3labs.opensky_2025.flight_data`
GROUP BY
    on_ground
ORDER BY
    on_ground DESC


Results for 'on_ground' distribution:
  on_ground: True, Count: 39
  on_ground: False, Count: 461
✅ 'on_ground' column contains both TRUE and FALSE values. Label diversity is present.


## Cell 9: Create a BQML classification model to predict whether a flight is on the ground.



In [12]:
R"""
This cell creates a BigQuery ML (BQML) classification model.

Purpose:
- **Model Type**: Initializes a `LOGISTIC_REG` model.
- **Objective**: To classify whether a flight is `on_ground` (the label).
- **Features**: Uses `altitude` and `velocity` as input features.
- **NULL Handling**: Replaces `NULL` values in `altitude` and `velocity` with 0 via `COALESCE`.
- **Data Split Method**: Uses `NO_SPLIT` so the model trains on all available rows;
  with few `on_ground = TRUE` records, this keeps every positive example in the training set.
- **Output**: Stores the trained model in BigQuery at the path defined by `CLASSIFICATION_MODEL_NAME`.

The cell also prints basic training statistics upon successful model creation.
"""


print("\n--- Creating BQML Classification Model (with NULL handling) ---")

# Define the model name
CLASSIFICATION_MODEL_NAME = "flight_on_ground_classifier"
model_path = f"{PROJECT_ID}.{BQ_DATASET}.{CLASSIFICATION_MODEL_NAME}"

# Modified query to handle NULLs in altitude and velocity.
# COALESCE replaces NULL altitude/velocity with 0 for every row.
# The WHERE clause keeps all on_ground=FALSE rows, and keeps on_ground=TRUE rows
# only when at least one of altitude or velocity is present.
CREATE_CLASSIFICATION_MODEL_QUERY = f"""
CREATE OR REPLACE MODEL `{model_path}`
OPTIONS(
    model_type='LOGISTIC_REG',
    input_label_cols=['on_ground'],
    data_split_method='NO_SPLIT'
) AS
SELECT
    CAST(on_ground AS INT64) AS on_ground,
    COALESCE(altitude, 0) AS altitude,
    COALESCE(velocity, 0) AS velocity
FROM
    `{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}`
WHERE
    on_ground IS NOT NULL
    AND ( (on_ground IS TRUE AND (altitude IS NOT NULL OR velocity IS NOT NULL)) OR on_ground IS FALSE )
"""

print(f"Creating classification model at: {model_path}")
print("This may take a few minutes...")

try:
    classification_job = bq_client.query(CREATE_CLASSIFICATION_MODEL_QUERY)
    classification_job.result()  # Wait for the model training job to complete

    print(f"✅ Successfully created classification model: {CLASSIFICATION_MODEL_NAME}")

    # Optional: Get basic training stats
    stats_query = f"SELECT * FROM ML.TRAINING_INFO(MODEL `{model_path}`)"
    stats_job = bq_client.query(stats_query)
    for row in stats_job.result():
        print(f"  > Iteration {row['iteration']}: Loss = {row['loss']}")

except Exception as e:
    print(f"❌ Error creating classification model: {e}")


--- Creating BQML Classification Model (with NULL handling) ---
Creating classification model at: mgmt467-unit3labs.opensky_2025.flight_on_ground_classifier
This may take a few minutes...
✅ Successfully created classification model: flight_on_ground_classifier
  > Iteration 19: Loss = 0.10643337205875139
  > Iteration 18: Loss = 0.10828623793167695
  > Iteration 17: Loss = 0.11000552418610479
  > Iteration 16: Loss = 0.11232916302540744
  > Iteration 15: Loss = 0.11414375688183614
  > Iteration 14: Loss = 0.11697158887609878
  > Iteration 13: Loss = 0.11907651129215023
  > Iteration 12: Loss = 0.12247126103262124
  > Iteration 11: Loss = 0.12503343406252407
  > Iteration 10: Loss = 0.1287069573398923
  > Iteration 9: Loss = 0.13242744412962143
  > Iteration 8: Loss = 0.13639844692985176
  > Iteration 7: Loss = 0.1483276527391099
  > Iteration 6: Loss = 0.16463102214454733
  > Iteration 5: Loss = 0.19901143622194636
  > Iteration 4: Loss = 0.2569718914565444
  > Iteration 3: Loss = 0.3
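
Once the classifier exists, it can score rows with `ML.PREDICT`, which adds a `predicted_on_ground` column to the output. The sketch below only builds the query; `build_predict_query` is a hypothetical helper, and the inner `SELECT` applies the same `COALESCE` handling as the training query so the features match. The original label is aliased to `actual_on_ground` to keep it separate from the model's label column.

```python
def build_predict_query(model_path, table_id, limit=10):
    """Return SQL that scores rows with the classifier via ML.PREDICT."""
    return f"""
SELECT
    icao24,
    actual_on_ground,
    predicted_on_ground
FROM
    ML.PREDICT(
        MODEL `{model_path}`,
        (
            SELECT
                icao24,
                on_ground AS actual_on_ground,
                COALESCE(altitude, 0) AS altitude,
                COALESCE(velocity, 0) AS velocity
            FROM `{table_id}`
            WHERE on_ground IS NOT NULL
        )
    )
LIMIT {int(limit)}
"""

predict_sql = build_predict_query(
    "mgmt467-unit3labs.opensky_2025.flight_on_ground_classifier",
    "mgmt467-unit3labs.opensky_2025.flight_data",
)
print(predict_sql)

# In the notebook, with `bq_client` from Cell 4:
# for row in bq_client.query(predict_sql).result():
#     print(row["icao24"], row["actual_on_ground"], row["predicted_on_ground"])
```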

## Summary:

### Q&A
**Why did the BQML classification model initially fail, and how was the issue resolved?**
The BQML classification model initially failed because the `WHERE` clause in its creation query inadvertently filtered out all records where `on_ground` was `TRUE`. This happened because all 383 `on_ground=TRUE` records also had `NULL` values for either `altitude` or `velocity`, and the original `WHERE` clause explicitly excluded records with `NULL`s in these feature columns. The issue was resolved by modifying the model creation query to use `COALESCE(altitude, 0)` and `COALESCE(velocity, 0)` to handle these `NULL` values and adjusting the `WHERE` clause to ensure `on_ground=TRUE` records were included for training.
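
The failure and the fix can be reproduced on a toy dataset without BigQuery. The rows below are made up, and the two functions are hypothetical Python stand-ins for the original strict `WHERE` clause and for the `COALESCE`-based query.

```python
# Toy rows (made up): ground records lack altitude, airborne records are complete.
rows = [
    {"on_ground": True,  "altitude": None,   "velocity": 0.0},
    {"on_ground": True,  "altitude": None,   "velocity": 3.1},
    {"on_ground": False, "altitude": 9500.0, "velocity": 220.0},
    {"on_ground": False, "altitude": 3000.0, "velocity": 140.0},
]

def labels_after_strict_filter(rows):
    """Mimic: WHERE altitude IS NOT NULL AND velocity IS NOT NULL."""
    kept = [r for r in rows
            if r["altitude"] is not None and r["velocity"] is not None]
    return {r["on_ground"] for r in kept}

def labels_after_coalesce(rows):
    """Mimic: COALESCE(feature, 0), keeping ground rows with at least one feature."""
    kept = []
    for r in rows:
        has_feature = r["altitude"] is not None or r["velocity"] is not None
        if r["on_ground"] is False or has_feature:
            kept.append({
                "on_ground": r["on_ground"],
                "altitude": r["altitude"] if r["altitude"] is not None else 0,
                "velocity": r["velocity"] if r["velocity"] is not None else 0,
            })
    return {r["on_ground"] for r in kept}

print(labels_after_strict_filter(rows))  # {False}
print(len(labels_after_coalesce(rows)))  # 2
```

With the strict filter only one label survives, which is the condition behind the "at least 2 unique labels" error; after coalescing, both labels reach the model.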

### Data Analysis Key Findings
*   The initial attempt to create a BigQuery ML logistic regression model (`flight_on_ground_classifier`) for predicting `on_ground` status failed with an error stating "Classification model requires at least 2 unique labels and the label column had only 1 unique label."
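*   Analysis of the `flight_data` BigQuery table confirmed that the `on_ground` column contained both `TRUE` (383 records) and `FALSE` (4117 records) values, indicating sufficient label diversity in the raw dataset. (The Cell 8 run shown above, executed against a 500-row table, reports 39 `TRUE` and 461 `FALSE` records.)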
*   A diagnostic query revealed that the `WHERE` clause used in the model creation (`on_ground IS NOT NULL AND altitude IS NOT NULL AND velocity IS NOT NULL`) was the root cause, as it filtered out all records where `on_ground` was `TRUE`.
*   Further investigation confirmed that all 383 records with `on_ground=TRUE` also had `NULL` values for either `altitude` or `velocity`, explaining why they were excluded by the `WHERE` clause.
*   The `flight_velocity_predictor` BQML linear regression model was successfully created.
*   After modifying the classification model query to use `COALESCE(altitude, 0)` and `COALESCE(velocity, 0)` for `NULL` handling and adjusting the `WHERE` clause, the `flight_on_ground_classifier` BigQuery ML logistic regression model was successfully created and trained.

### Insights or Next Steps
*   It is crucial to verify data conditions after applying filtering clauses, especially in `WHERE` statements for model training, as filtering can inadvertently remove necessary label diversity or critical data points.
*   The `COALESCE` function proved effective in handling `NULL` values in feature columns, allowing for the inclusion of relevant data points that would otherwise be excluded, enabling successful model training.
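
One way to act on the first insight is to count labels under the exact `WHERE` clause before training. This is a minimal sketch: `build_label_check_query` and `has_label_diversity` are hypothetical helpers, the counts passed to the check are made up, and running the query assumes the `bq_client` from Cell 4.

```python
def build_label_check_query(table_id, label_col, where_clause):
    """Return SQL counting rows per label value under a candidate WHERE clause."""
    return f"""
SELECT
    {label_col},
    COUNT(*) AS row_count
FROM
    `{table_id}`
WHERE
    {where_clause}
GROUP BY
    {label_col}
"""

def has_label_diversity(label_counts, minimum=2):
    """True if at least `minimum` distinct labels have a non-zero row count."""
    return sum(1 for count in label_counts.values() if count > 0) >= minimum

sql = build_label_check_query(
    "mgmt467-unit3labs.opensky_2025.flight_data",
    "on_ground",
    "on_ground IS NOT NULL AND altitude IS NOT NULL AND velocity IS NOT NULL",
)
print(sql)

# Made-up counts illustrating the two outcomes.
print(has_label_diversity({False: 4}))           # False
print(has_label_diversity({True: 2, False: 4}))  # True

# In the notebook, with `bq_client` from Cell 4:
# counts = {row["on_ground"]: row["row_count"] for row in bq_client.query(sql).result()}
# print(has_label_diversity(counts))
```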
