<a href="https://colab.research.google.com/github/mnpoliakov/MGMT467_Team7/blob/main/Final_Project/pipeline/Streaming/New_weather_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 🛠️ **Step 1: Setup & Authentication**
In this step, we install the necessary libraries and authenticate with Google Cloud.

> **Note:** This notebook acts as the **Producer** (standing in for a Cloud Function). Ingestion into BigQuery is handled by the **Pub/Sub → BigQuery subscription** we created in the Cloud Console.

In [None]:
# Install libraries
!pip install google-cloud-pubsub google-cloud-bigquery requests -q

import requests
import json
import logging
import time
from datetime import datetime, timezone
from google.cloud import pubsub_v1, bigquery
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from google.colab import auth

# Configure root logging for the whole notebook. force=True removes any handlers
# already attached to the root logger so this format/level actually takes effect.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    datefmt='%H:%M:%S',
    force=True,
)
logger = logging.getLogger(__name__)

# Authenticate this Colab runtime with Google Cloud so the Google Cloud clients
# used later can run as the signed-in user. A failure is reported, not raised.
print("üîê Authenticating...")
try:
    auth.authenticate_user()
except Exception as auth_error:
    print(f"‚ùå Authentication failed: {auth_error}")
else:
    print("‚úÖ Authenticated successfully!")

[?25l   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/321.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[91m‚ï∏[0m [32m317.4/321.3 kB[0m [31m17.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m321.3/321.3 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[?25hüîê Authenticating...
‚úÖ Authenticated successfully!


# ⚙️ **Step 2: Configuration**
Define your project constants.
* **Project ID:** `finalproject-480220`
* **Topic ID:** `weatherstack-data` (Must match the topic connected to your BigQuery subscription).

In [None]:
# --- CONFIGURATION ---
import os

# Weatherstack access key. The WEATHERSTACK_API_KEY environment variable takes
# precedence so the key does not have to live in the notebook; the literal below is
# only a backward-compatible fallback.
# TODO(review): this key is published with the notebook on GitHub — rotate it and
# remove the fallback literal.
API_KEY = (os.environ.get("WEATHERSTACK_API_KEY") or "3985f95c6616907409036d8b8dbef10f").strip()
PROJECT_ID = 'finalproject-480220'
TOPIC_ID = 'weatherstack-data'
# Note: We do NOT need SUB_ID here because GCP handles the subscription now.

DATASET_ID = 'weather_data_dataset'
TABLE_ID = 'new_weather_data'

# Cities queried on every producer run (one API call per city).
LOCATIONS = [
    "New York", "London", "Tokyo", "Paris", "Berlin", "Sydney", "Mumbai",
    "Chicago", "Toronto", "Singapore", "Dubai", "Los Angeles"
]

# 🏗️ **Step 3: Infrastructure Check**
This step creates the Pub/Sub topic if it is missing and checks that the BigQuery table exists.

> **Safe Mode:** This step never deletes or overwrites anything. It creates the topic only when it does not already exist, and it only *checks* for the table (logging a warning if it is missing). Your existing table and data are left untouched.

In [None]:
# --- STEP 3: INFRASTRUCTURE (PRODUCER ONLY) ---
def setup_infrastructure():
    publisher = pubsub_v1.PublisherClient()
    bq_client = bigquery.Client(project=PROJECT_ID)
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

    # 1. Create Topic (if missing)
    try:
        publisher.create_topic(request={"name": topic_path})
        logger.info(f"‚úÖ Topic ready: {TOPIC_ID}")
    except Exception:
        logger.info(f"‚ÑπÔ∏è Topic already exists (Good).")

    # 2. BigQuery Dataset & Table Check
    # We just want to make sure the destination exists so we don't send data to nowhere
    table_ref = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    try:
        bq_client.get_table(table_ref)
        logger.info(f"‚úÖ Target table found: {TABLE_ID}")
    except Exception:
        logger.warning(f"‚ö†Ô∏è Warning: Table '{TABLE_ID}' not found. Verify you created it!")

# Run the topic creation / table check once when this cell executes.
setup_infrastructure()

15:33:33 - ‚ÑπÔ∏è Topic already exists (Good).
15:33:33 - ‚úÖ Target table found: new_weather_data


# ☁️ **Step 4: The Producer (Cloud Function Logic)**
This class mimics the behavior of a **Cloud Function (2nd Gen)**.
1. It fetches weather data from the API.
2. It normalizes the JSON.
3. It publishes the payload to Pub/Sub.

> **Note:** We removed the ingestion code because Google Cloud is now handling that for us!

In [None]:
class WeatherProducer:
    """Fetch current weather for each city in ``LOCATIONS`` and publish it to Pub/Sub.

    Mimics a Cloud Function producer: one HTTP call to the Weatherstack
    ``/current`` endpoint per city, a flattening step (``_transform``), and a
    JSON-encoded publish to ``TOPIC_ID``. Delivery into BigQuery happens outside
    this class, via the topic's subscription.
    """

    def __init__(self, publish_timeout=30.0):
        """Create the Pub/Sub publisher and a retrying HTTP session.

        Args:
            publish_timeout: Seconds to wait for Pub/Sub to acknowledge each
                publish before it is logged as failed.
        """
        self.publisher = pubsub_v1.PublisherClient()
        self.topic_path = self.publisher.topic_path(PROJECT_ID, TOPIC_ID)
        self.publish_timeout = publish_timeout
        self.session = requests.Session()
        # Retry transient 5xx responses up to 3 times with exponential backoff.
        retries = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
        adapter = HTTPAdapter(max_retries=retries)
        # Mount for both schemes so moving the endpoint to HTTPS keeps the retries.
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    @staticmethod
    def _redact(message):
        """Return *message* as text with the API key masked.

        requests error messages embed the full request URL, including the
        ``access_key`` query parameter.
        """
        text = str(message)
        return text.replace(API_KEY, "***") if API_KEY else text

    def fetch_weather(self, city_name):
        """Return the flattened weather record for *city_name*, or None on failure.

        Transport/HTTP errors, non-JSON bodies, Weatherstack error payloads and
        malformed fields are each logged (with the API key redacted) and
        swallowed, so one bad city does not stop the run.
        """
        url = "http://api.weatherstack.com/current"
        params = {'access_key': API_KEY, 'query': city_name}
        try:
            r = self.session.get(url, params=params, timeout=10)
            r.raise_for_status()
        except requests.RequestException as e:
            logger.error(f"‚ùå Network Error for {city_name}: {self._redact(e)}")
            return None

        try:
            data = r.json()
        except ValueError as e:
            logger.error(f"‚ùå Invalid JSON for {city_name}: {self._redact(e)}")
            return None
        if not isinstance(data, dict):
            logger.error(f"‚ùå Unexpected response for {city_name}: {type(data).__name__}")
            return None

        # Weatherstack signals API-level errors in the JSON body via success=false.
        if data.get('success') is False:
            logger.warning(f"‚ö†Ô∏è API Error for {city_name}: {(data.get('error') or {}).get('info')}")
            return None

        try:
            return self._transform(data)
        except (AttributeError, TypeError, ValueError) as e:
            logger.error(f"‚ùå Unexpected payload for {city_name}: {e}")
            return None

    def _transform(self, data):
        """Flatten a Weatherstack ``/current`` response into one flat record.

        Numeric fields are coerced to float (``weather_code`` to int). A numeric
        field that is missing OR explicitly null falls back to 0, as missing keys
        already did. Note that 0 is also a legitimate reading (e.g. 0 °C).
        List fields are comma-joined (null -> ""). ``timestamp`` is the UTC time
        at which this record was built.
        """
        # `or {}` also covers sections that are present but null.
        req = data.get('request') or {}
        loc = data.get('location') or {}
        cur = data.get('current') or {}

        def list_to_str(val):
            if val is None:
                return ""
            return ",".join(map(str, val)) if isinstance(val, list) else str(val)

        def num(key, cast=float):
            value = cur.get(key)
            return cast(0 if value is None else value)

        return {
            "request_type": "Weatherstack",
            "query": req.get('query'),
            "location_name": loc.get('name'),
            "country": loc.get('country'),
            "region": loc.get('region'),
            "observation_time": cur.get('observation_time'),
            "temperature": num('temperature'),
            "weather_code": num('weather_code', int),
            "weather_icons": list_to_str(cur.get('weather_icons', [])),
            "weather_descriptions": list_to_str(cur.get('weather_descriptions', [])),
            "wind_speed": num('wind_speed'),
            "wind_degree": num('wind_degree'),
            "wind_dir": cur.get('wind_dir'),
            "pressure": num('pressure'),
            "precip": num('precip'),
            "humidity": num('humidity'),
            "cloudcover": num('cloudcover'),
            "feelslike": num('feelslike'),
            "uv_index": num('uv_index'),
            "visibility": num('visibility'),
            "is_day": cur.get('is_day'),
            "lat": loc.get('lat'),
            "lon": loc.get('lon'),
            "timezone_id": loc.get('timezone_id'),
            "localtime": loc.get('localtime'),
            "utc_offset": loc.get('utc_offset'),
            "timestamp": datetime.now(timezone.utc).isoformat()
        }

    def publish(self, data):
        """JSON-encode *data* and publish it to the topic, blocking until acknowledged.

        Empty/None records (failed fetches) are skipped. Publish errors, including
        no acknowledgement within ``publish_timeout`` seconds, are logged, not raised.
        """
        if not data:
            return
        try:
            future = self.publisher.publish(self.topic_path, json.dumps(data).encode('utf-8'))
            future.result(timeout=self.publish_timeout)
            logger.info(f"üì§ PUBLISHED to Pub/Sub: {data.get('location_name')}")
        except Exception as e:
            logger.error(f"‚ùå Publish failed: {e}")

    def run(self):
        """Fetch and publish every city in ``LOCATIONS``, one at a time.

        Pauses 1.1 s between cities to avoid rate limits. There is no pause after
        the last city.
        """
        logger.info(f"üöÄ Producer starting for {len(LOCATIONS)} cities...")
        for i, loc in enumerate(LOCATIONS):
            if i:
                time.sleep(1.1)  # Sequential to avoid Rate Limits
            self.publish(self.fetch_weather(loc))

# 🟢 **Step 5: Run the Producer**
Running the cell below will:
1. Fetch data for all cities.
2. Send it to the Pub/Sub Topic.
3. The **Pub/Sub → BigQuery subscription** will automatically pick up each message and stream it into your BigQuery table.

In [None]:
if __name__ == "__main__":
    # One producer pass: fetch every city in LOCATIONS and publish each record.
    producer = WeatherProducer()
    producer.run()

    # BigQuery ingestion happens asynchronously in the subscription, so pause
    # before the validation step queries the table.
    wait_seconds = 15
    print("\n‚è≥ Data sent! Waiting 15 seconds for Google Cloud Pipeline to process...")
    time.sleep(wait_seconds)

15:33:33 - üöÄ Producer starting for 12 cities...
15:33:34 - üì§ PUBLISHED to Pub/Sub: New York
15:33:36 - üì§ PUBLISHED to Pub/Sub: London
15:33:37 - üì§ PUBLISHED to Pub/Sub: Tokyo
15:33:39 - üì§ PUBLISHED to Pub/Sub: Paris
15:33:40 - üì§ PUBLISHED to Pub/Sub: Berlin
15:33:42 - üì§ PUBLISHED to Pub/Sub: Sydney
15:33:43 - üì§ PUBLISHED to Pub/Sub: Mumbai
15:33:44 - üì§ PUBLISHED to Pub/Sub: Chicago
15:33:46 - üì§ PUBLISHED to Pub/Sub: Toronto
15:33:47 - üì§ PUBLISHED to Pub/Sub: Singapore
15:33:49 - üì§ PUBLISHED to Pub/Sub: Dubai
15:33:50 - üì§ PUBLISHED to Pub/Sub: Los Angeles



‚è≥ Data sent! Waiting 15 seconds for Google Cloud Pipeline to process...


# 🔎 **Step 6: Validate Real-Time Data**
We query BigQuery to confirm the data arrived.
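* **Timestamp Check:** The "LAG" column shows how long ago each row was produced (query time minus the producer's `timestamp`). It includes the 15-second wait from Step 5, so it is an upper bound on pipeline delivery latency rather than a direct measurement of it.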

In [None]:
def validate_data():
    bq_client = bigquery.Client(project=PROJECT_ID)

    query = f"""
    SELECT location_name, temperature, timestamp
    FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
    ORDER BY timestamp DESC
    LIMIT 10
    """

    print(f"üîé Checking {TABLE_ID}...\n")
    try:
        results = bq_client.query(query).result()
        print(f"{'TIMESTAMP (UTC)':<25} | {'CITY':<15} | {'TEMP':<10} | {'LAG'}")
        print("-" * 75)

        current_time = datetime.now(timezone.utc)
        for row in results:
            lag = f"{(current_time - row.timestamp).seconds}s ago"
            print(f"{str(row.timestamp):<25} | {row.location_name:<15} | {row.temperature}¬∞C      | {lag}")

    except Exception as e:
        print(f"‚ùå Validation failed: {e}")

# Query BigQuery once when this cell executes and print the most recent rows.
validate_data()

üîé Checking new_weather_data...

TIMESTAMP (UTC)           | CITY            | TEMP       | LAG
---------------------------------------------------------------------------
2025-12-12 15:33:50.649138+00:00 | Los Angeles     | 12.0¬∞C      | 17s ago
2025-12-12 15:33:49.118919+00:00 | Dubai           | 27.0¬∞C      | 18s ago
2025-12-12 15:33:47.599879+00:00 | Singapore       | 26.0¬∞C      | 20s ago
2025-12-12 15:33:46.072124+00:00 | Toronto         | -2.0¬∞C      | 21s ago
2025-12-12 15:33:44.780895+00:00 | Chicago         | -4.0¬∞C      | 23s ago
2025-12-12 15:33:43.491412+00:00 | Mumbai          | 27.0¬∞C      | 24s ago
2025-12-12 15:33:42.187951+00:00 | Sydney          | 19.0¬∞C      | 25s ago
2025-12-12 15:33:40.647032+00:00 | Berlin          | 5.0¬∞C      | 27s ago
2025-12-12 15:33:39.122089+00:00 | Paris           | 12.0¬∞C      | 28s ago
2025-12-12 15:33:37.393182+00:00 | Tokyo           | 5.0¬∞C      | 30s ago
