In [105]:
# Install necessary libraries
!pip install --upgrade google-cloud-storage google-cloud-bigquery google-cloud-aiplatform pandas requests



In [106]:
import pandas as pd
import requests
import json
import datetime
from google.cloud import storage, bigquery
import vertexai
from vertexai.generative_models import GenerativeModel
from google.cloud.exceptions import Conflict, NotFound

In [107]:
# --- Configuration ---
PROJECT_ID = "qwiklabs-gcp-01-5f9c2ba2a4cd"
BUCKET_NAME = "airport_alerts"
REGION = "us"
MODEL_NAME = "airport_alerts_model"
DATASET_ID = "airport_weather"
TABLE_ID = "weather_alerts"
SOURCE_CSV_URI = "gs://labs.roitraining.com/data-to-ai-workshop/airports.csv"

In [108]:
# Initialize Google Cloud Clients
storage_client = storage.Client(project=PROJECT_ID)

bq_client = bigquery.Client(project=PROJECT_ID)


In [109]:
%%bash

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us"
export CONNECTION_ID="gemini-alert-conn"

# Enable necessary services
gcloud services enable bigqueryconnection.googleapis.com aiplatform.googleapis.com --quiet

# Check if the connection already exists
if bq show --connection --location=$REGION $PROJECT_ID.$REGION.$CONNECTION_ID > /dev/null 2>&1; then
    echo "Connection '$CONNECTION_ID' already exists in $REGION. Skipping creation."
else
    echo "Connection '$CONNECTION_ID' not found. Creating now..."
    bq mk --connection \
      --location=$REGION \
      --project_id=$PROJECT_ID \
      --connection_type=CLOUD_RESOURCE \
      $CONNECTION_ID
fi

# Retrieve the Service Account Email
SA_EMAIL=$(bq show --format=json --connection $PROJECT_ID.$REGION.$CONNECTION_ID | jq -r '.cloudResource.serviceAccountId')

echo "Your Connection Service Account is: $SA_EMAIL"

# Bind the IAM Role
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$SA_EMAIL" \
    --role="roles/aiplatform.user" \
    --condition=None --quiet > /dev/null

echo "Setup Complete."

Connection 'gemini-alert-conn' already exists in us. Skipping creation.
Your Connection Service Account is: bqcx-268999363694-i7bn@gcp-sa-bigquery-condel.iam.gserviceaccount.com
Setup Complete.


Operation "operations/acat.p2-268999363694-14fae2a2-3874-4155-862c-5865d8291a6b" finished successfully.
Updated IAM policy for project [qwiklabs-gcp-01-5f9c2ba2a4cd].


In [110]:
# Create Dataset if it doesn't exist
dataset = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID}")
dataset.location = "US"

try:
    bq_client.create_dataset(dataset, exists_ok=True)
    print(f"Dataset {DATASET_ID} created.")
except Conflict:
    print(f"Dataset {DATASET_ID} already exists.")


Dataset airport_weather created.


In [111]:
# Load Data into BigQuery and Fetch Large Aiports

table_ref = f"{PROJECT_ID}.{DATASET_ID}.airports"

try:
    # Check if the table exists by attempting to fetch its metadata
    bq_client.get_table(table_ref)
    print(f"Table {table_ref} already exists. Skipping load.")

except NotFound:
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
    )

    print("Loading airports data into BigQuery...")
    load_job = bq_client.load_table_from_uri(
        SOURCE_CSV_URI,
        table_ref,
        job_config=job_config
    )
    load_job.result()
    print("Load job finished.")

# Fetch Large Airports and Weather
fetch_sql_query = f"""
SELECT id, name, latitude_deg, longitude_deg
FROM `{PROJECT_ID}.{DATASET_ID}.airports`
WHERE type = 'large_airport' AND iso_country = 'US'
LIMIT 100
"""

airports_df = bq_client.query(fetch_sql_query).to_dataframe()
print("Fetched data from table.")

Table qwiklabs-gcp-01-5f9c2ba2a4cd.airport_weather.airports already exists. Skipping load.
Fetched data from table.


In [112]:

def run_airport_pipeline(bq_client, project_id, dataset_id, table_id, model_name, airports_df):
    """
    Module: Fetches weather, stages data, and single-pass insert with gemini alert
    """

    # Create Gemini Model if it doesn't exist
    create_model_sql = f"""
    CREATE OR REPLACE MODEL `{dataset_id}.{model_name}`
    REMOTE WITH CONNECTION DEFAULT
    OPTIONS (endpoint = 'gemini-2.0-flash-001');
    """
    print("Checking/Creating Gemini Model...")
    bq_client.query(create_model_sql).result()

    results = []
    headers = {'User-Agent': '(my-weather-app, contact@example.com)'}
    target_table = f"{project_id}.{dataset_id}.{table_id}"
    staging_table = f"{target_table}_staging"

    # Fetch Raw Alert
    print(f"Fetching weather for {len(airports_df)} airports...")
    for _, row in airports_df.iterrows():
        try:
            # Get NWS Point & Forecast
            point_url = f"https://api.weather.gov/points/{row['latitude_deg']},{row['longitude_deg']}"
            point_res = requests.get(point_url, headers=headers).json()
            forecast_url = point_res['properties']['forecast']

            forecast_data = requests.get(forecast_url, headers=headers).json()
            latest_forecast = forecast_data['properties']['periods'][0]['detailedForecast']

            results.append({
                "id": int(row['id']),
                "airport": row['name'],
                "lat": float(row['latitude_deg']),
                "lng": float(row['longitude_deg']),
                "forecast": latest_forecast
            })
        except Exception:
            continue

    if not results: return

    # Create Base table
    create_query = f"""
    CREATE TABLE IF NOT EXISTS `{project_id}.{dataset_id}.{table_id}`
    (id INT64,
    airport STRING,
    lat FLOAT64,
    lng FLOAT64,
    forecast STRING,
    gemini_weather_alert STRING,
    last_updated TIMESTAMP);
    """

    query_job = bq_client.query(create_query)
    query_job.result()

    # Load to Staging
    staging_df = pd.DataFrame(results)
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
    bq_client.load_table_from_dataframe(staging_df, staging_table, job_config=job_config).result()

    # Merge and Gemini Alert
    # Only generate alerts for the updates
    merge_sql = f"""
    MERGE `{target_table}` T
    USING (
      SELECT
        id, airport, lat, lng, forecast, ml_generate_text_llm_result as ai_alert
      FROM ML.GENERATE_TEXT(
        MODEL `{project_id}.{dataset_id}.{model_name}`,
        (
          SELECT id, airport, lat, lng, forecast,
          CONCAT(
            "Role: Aviation Weather Expert. Input: Forecast for ", airport, ": ", forecast, ". ",
            "Task: Return a brief 1-sentence alert. ",
            "Start with [RED] (Extreme/Hazardous), [AMBER] (Delays/Caution), or [GREEN] (Clear). ",
            "Use RED for severe storms/snow, AMBER for rain/high winds, GREEN for fair weather."
            "Return only the alert, nothing else."
            ) as prompt
          FROM `{staging_table}`
        ),
        STRUCT(0.2 AS temperature, TRUE AS flatten_json_output)
      )
    ) S
    ON T.id = S.id AND S.forecast IS NOT NULL
    WHEN MATCHED THEN
      UPDATE SET
        airport = S.airport,
        lat = S.lat,
        lng = S.lng,
        forecast = S.forecast,
        gemini_weather_alert = S.ai_alert,
        last_updated = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED THEN
      INSERT (id, airport, lat, lng, forecast, gemini_weather_alert, last_updated)
      VALUES (S.id, S.airport, S.lat, S.lng, S.forecast, S.ai_alert, CURRENT_TIMESTAMP())
    """

    print("Running Merge and Gemini Alerts...")

    try:
      bq_client.query(merge_sql).result()
      print(f"Done. {len(results)} airports processed.")

      print(f"Pipeline execution completed at (UTC): {datetime.datetime.now()}")

    except Exception as e:
      print(e)





In [113]:
# Initialization & Execution
run_airport_pipeline(bq_client, PROJECT_ID, DATASET_ID, TABLE_ID, MODEL_NAME, airports_df)

Checking/Creating Gemini Model...
Fetching weather for 71 airports...
Running Merge and Gemini Alerts...
Done. 71 airports processed.
Pipeline execution completed at (UTC): 2026-01-16 18:32:10.738029
