<a href="https://colab.research.google.com/github/raleight1/MGMT467_Team2/blob/main/Final_Project/BatchIngest_Streaming_Model_DIVE/MGMT_467_Final_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Google Cloud Authentication and Setup

!pip install --quiet google-cloud-storage google-cloud-bigquery

from google.colab import auth
auth.authenticate_user()
print("✅ Authenticated to Google Cloud")

✅ Authenticated to Google Cloud


In [None]:
# Attach the user's Google Drive to the Colab runtime filesystem.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'  # directory under which Drive is mounted
drive.mount(DRIVE_MOUNT_POINT)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Initialize Google Cloud Storage and create/get the raw-data bucket

from google.cloud import storage

PROJECT_ID = "mgmt467-project"
BUCKET_NAME = "mgmt467-gym-raw"

client = storage.Client(project=PROJECT_ID)

# lookup_bucket() returns None only when the bucket does not exist. Any other
# failure (missing permissions, network error, bad credentials) propagates
# instead of being mistaken for "bucket missing" and triggering a create call.
bucket = client.lookup_bucket(BUCKET_NAME)
if bucket is None:
    bucket = client.create_bucket(BUCKET_NAME, location="US")
    print("Created bucket:", BUCKET_NAME)
else:
    print("Bucket already exists:", BUCKET_NAME)

Bucket already exists: mgmt467-gym-raw


In [None]:
# Export the source BigQuery table to a file in the Cloud Storage bucket

from google.cloud import bigquery

bq = bigquery.Client(project=PROJECT_ID)

DATASET = "GymDB"
SOURCE_TABLE = "GymData"
DEST_URI = "gs://" + BUCKET_NAME + "/batch/GymData_export.csv"

# Fully qualified id of the table being exported: project.dataset.table
source_table_id = ".".join([PROJECT_ID, DATASET, SOURCE_TABLE])

# Start the extract job, then block until BigQuery reports completion
# (result() raises if the job failed).
extract_job = bq.extract_table(source_table_id, DEST_URI, location="US")
extract_job.result()

print("✅ Exported GymData to:", DEST_URI)

✅ Exported GymData to: gs://mgmt467-gym-raw/batch/GymData_export.csv


In [None]:
# Load raw data from Cloud Storage into BigQuery

RAW_TABLE = "GymData_Raw"
uri = f"gs://{BUCKET_NAME}/batch/GymData_export.csv"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    # The extract job was run with default settings, and a default CSV extract
    # writes a header row. skip_leading_rows=0 would tell autodetect there is
    # no header and ingest the column names as a data row; 1 makes autodetect
    # treat the first row as the header.
    skip_leading_rows=1,
    autodetect=True,
    # Load jobs append by default, so re-running this cell would duplicate
    # every row. Truncate so the cell is idempotent.
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

load_job = bq.load_table_from_uri(
    uri,
    f"{PROJECT_ID}.{DATASET}.{RAW_TABLE}",
    job_config=job_config,
)

load_job.result()  # block until the load finishes; raises on failure
print("✅ Loaded raw table:", f"{PROJECT_ID}.{DATASET}.{RAW_TABLE}")

✅ Loaded raw table: mgmt467-project.GymDB.GymData_Raw


In [None]:
# Rebuild the raw table as a straight copy of the source table.
# NOTE: this replaces `GymDB.GymData_Raw`, i.e. the same table the previous
# cell populated from the Cloud Storage export.

query = """
CREATE OR REPLACE TABLE `mgmt467-project.GymDB.GymData_Raw` AS
SELECT *
FROM `mgmt467-project.GymDB.GymData`;
"""

replace_raw_job = bq.query(query)
replace_raw_job.result()

<google.cloud.bigquery.table._EmptyRowIterator at 0x7ac64dac1a60>

In [None]:
CURATED_TABLE = "GymData_Curated"

# Build the curated table from the raw table. It keeps every raw column and
# adds two derived ones:
#   Tenure_Days - days between Join_Date and Last_Visit_Date
#   Churn_Label - 1 when Churn is TRUE, otherwise 0
# The table is partitioned on Last_Visit_Date.
query = """
CREATE OR REPLACE TABLE `mgmt467-project.GymDB.GymData_Curated`
PARTITION BY Last_Visit_Date AS
SELECT
  *,
  DATE_DIFF(Last_Visit_Date, Join_Date, DAY) AS Tenure_Days,
  IF(Churn = TRUE, 1, 0) AS Churn_Label
FROM `mgmt467-project.GymDB.GymData_Raw`;
"""

curated_job = bq.query(query)
curated_job.result()  # wait for the DDL statement to finish

curated_table_id = ".".join((PROJECT_ID, DATASET, CURATED_TABLE))
print("✅ Created curated table:", curated_table_id)

✅ Created curated table: mgmt467-project.GymDB.GymData_Curated


In [None]:
# Data-quality summary of the curated table: total row count, NULL counts for
# the key columns, and the observed Age range.
dq_query = """
SELECT
  COUNT(*) AS total_rows,
  COUNTIF(Member_ID IS NULL) AS missing_member_id,
  COUNTIF(Last_Visit_Date IS NULL) AS missing_last_visit_date,
  COUNTIF(Join_Date IS NULL) AS missing_join_date,
  COUNTIF(Age IS NULL) AS missing_age,
  MIN(Age) AS min_age,
  MAX(Age) AS max_age
FROM `mgmt467-project.GymDB.GymData_Curated`;
"""

dq_job = bq.query(dq_query)
dq_result = dq_job.result().to_dataframe()

print("===== DATA QUALITY CHECK RESULTS =====", dq_result, sep="\n")

===== DATA QUALITY CHECK RESULTS =====
   total_rows  missing_member_id  missing_last_visit_date  missing_join_date  \
0         150                  0                        0                  9   

   missing_age  min_age  max_age  
0           13     18.0     54.0  


In [None]:
# Preview of the transformation logic on ten curated rows. Tenure_Days and
# Churn_Label are recomputed here from the base columns so the derivation is
# visible next to its inputs (the curated table already stores both columns).
transform_query = """
SELECT
  Member_ID,
  Join_Date,
  Last_Visit_Date,
  DATE_DIFF(Last_Visit_Date, Join_Date, DAY) AS Tenure_Days,
  Churn,
  IF(Churn = TRUE, 1, 0) AS Churn_Label
FROM `mgmt467-project.GymDB.GymData_Curated`
LIMIT 10;
"""

transform_job = bq.query(transform_query)
transform_result = transform_job.result().to_dataframe()

print("===== TRANSFORMATION LOGIC PREVIEW =====", transform_result, sep="\n")

===== TRANSFORMATION LOGIC PREVIEW =====
   Member_ID   Join_Date Last_Visit_Date  Tenure_Days  Churn  Churn_Label
0         41  2023-08-28      2024-01-02          127  False            0
1        128  2023-05-27      2023-08-16           81   True            1
2        120  2023-07-28      2023-08-16           19  False            0
3        124         NaT      2022-09-23         <NA>  False            0
4         99  2022-05-10      2022-12-07          211  False            0
5         14  2024-08-23      2025-06-29          310  False            0
6         58         NaT      2025-02-22         <NA>   True            1
7         31  2024-07-17      2025-02-22          220   True            1
8         44  2024-03-16      2025-03-10          359  False            0
9         92  2024-01-16      2024-04-13           88  False            0


In [None]:
# Re-authenticate and rebuild the BigQuery client for the weather section.
from google.colab import auth
from google.cloud import bigquery

auth.authenticate_user()

PROJECT_ID = "mgmt467-project"

# Later cells refer to the client by the name `bq`; keep that name and do not
# rebind it to anything else.
bq = bigquery.Client(project=PROJECT_ID)

print("BigQuery client reset. Connected to:", PROJECT_ID)

BigQuery client reset. Connected to: mgmt467-project


In [None]:
# Start from a clean slate: drop both weather tables if they exist.
for stale_table in ("Weather_Raw_Streaming", "Weather_Monthly_Avg"):
    bq.query("DROP TABLE IF EXISTS `mgmt467-project.GymDB." + stale_table + "`;").result()

# Raw streaming table: one row per weather reading, partitioned by event day.
raw_streaming_ddl = """
CREATE TABLE `mgmt467-project.GymDB.Weather_Raw_Streaming`
(
  event_timestamp TIMESTAMP,
  temperature_c FLOAT64,
  wind_speed_kph FLOAT64,
  relative_humidity FLOAT64,
  is_day BOOL
)
PARTITION BY DATE(event_timestamp)
"""

# Monthly aggregate table: one row per (year, month) with averaged readings.
monthly_avg_ddl = """
CREATE TABLE `mgmt467-project.GymDB.Weather_Monthly_Avg`
(
  year INT64,
  month INT64,
  avg_temperature_c FLOAT64,
  avg_wind_speed_kph FLOAT64,
  avg_relative_humidity FLOAT64
)
"""

# Run each CREATE statement in turn, waiting for it to complete.
for ddl in (raw_streaming_ddl, monthly_avg_ddl):
    bq.query(ddl).result()

print("Tables recreated successfully.")

Tables recreated successfully.


In [None]:
# Authenticate again and bind the BigQuery client to the project.
from google.colab import auth
from google.cloud import bigquery

auth.authenticate_user()

# Use the project ID string (the name), not the numeric project number.
PROJECT_ID = "mgmt467-project"
DATASET_ID = "GymDB"

bq = bigquery.Client(project=PROJECT_ID)

print("Authenticated project:", bq.project)

Authenticated project: mgmt467-project


In [None]:
# Recreate the raw weather streaming table through the BigQuery client API.
from google.cloud import bigquery
from google.cloud.exceptions import NotFound

PROJECT_ID = "mgmt467-project"
DATASET_ID = "GymDB"
TABLE_ID = "Weather_Raw_Streaming"

full_table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"

# -----------------------
# 1) Ensure dataset exists
# -----------------------
dataset_ref = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID}")
dataset = bq.create_dataset(dataset_ref, exists_ok=True)  # no-op if already present
print("Dataset verified:", DATASET_ID)

# -----------------------
# 2) Delete table if exists
# -----------------------
# Only "table not found" is an expected outcome here. Permission or network
# errors must surface instead of being reported as "no existing table".
try:
    bq.delete_table(full_table_id)
    print("Old table deleted.")
except NotFound:
    print("No existing table, skipping delete.")

# -----------------------
# 3) Recreate table with schema
# -----------------------
schema = [
    bigquery.SchemaField("event_timestamp", "TIMESTAMP"),
    bigquery.SchemaField("temperature_c", "FLOAT"),
    bigquery.SchemaField("wind_speed_kph", "FLOAT"),
    bigquery.SchemaField("relative_humidity", "FLOAT"),
    bigquery.SchemaField("is_day", "BOOL"),
]

table_ref = bigquery.Table(full_table_id, schema=schema)

# Match the SQL definitions of this table elsewhere in the notebook, which
# declare `PARTITION BY DATE(event_timestamp)` (daily partitions).
table_ref.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="event_timestamp",
)

table = bq.create_table(table_ref)
print("Table created:", table.full_table_id)

Dataset verified: GymDB
Old table deleted.
Table created: mgmt467-project:GymDB.Weather_Raw_Streaming


In [None]:
# Fetch one year of hourly historical weather from the Open-Meteo archive API
# and shape it into the schema of the raw weather table.
import requests
import pandas as pd
from datetime import datetime, timedelta, timezone

LAT = 40.4237
LON = -86.9212
# IANA time zone for the coordinates above (Tippecanoe County, Indiana).
# Used only to decide whether a reading falls in local daytime.
LOCAL_TZ = "America/Indiana/Indianapolis"

# datetime.utcnow() is deprecated; take "today" from a timezone-aware UTC clock.
end_date = datetime.now(timezone.utc).date()
start_date = end_date - timedelta(days=365)

url = "https://archive-api.open-meteo.com/v1/archive"

params = {
    "latitude": LAT,
    "longitude": LON,
    "start_date": start_date.isoformat(),
    "end_date": end_date.isoformat(),
    "hourly": "temperature_2m,relativehumidity_2m,windspeed_10m",
    "timezone": "UTC"
}

# Bound the wait and fail loudly on an HTTP error instead of hitting a
# confusing KeyError on data["hourly"] further down.
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
data = response.json()

df_hist = pd.DataFrame({
    "event_timestamp": data["hourly"]["time"],
    "temperature_c": data["hourly"]["temperature_2m"],
    "wind_speed_kph": data["hourly"]["windspeed_10m"],
    "relative_humidity": data["hourly"]["relativehumidity_2m"],
})

# Timestamps are requested in UTC, so the 06:00-18:00 daytime window has to be
# evaluated on the local hour at LAT/LON, not on the UTC hour.
local_hour = (
    pd.to_datetime(df_hist["event_timestamp"], utc=True)
    .dt.tz_convert(LOCAL_TZ)
    .dt.hour
)
df_hist["is_day"] = local_hour.between(6, 18)

df_hist.head(), df_hist.shape


  end_date = datetime.utcnow().date()


(    event_timestamp  temperature_c  wind_speed_kph  relative_humidity  is_day
 0  2024-12-09T00:00            8.5            17.1                 83   False
 1  2024-12-09T01:00            9.2            16.5                 75   False
 2  2024-12-09T02:00            9.1            19.6                 79   False
 3  2024-12-09T03:00            9.3            18.8                 85   False
 4  2024-12-09T04:00            9.5            20.2                 89   False,
 (8784, 5))

In [None]:
# Authenticate, then rebuild the raw weather table from scratch with SQL DDL.
from google.colab import auth
from google.cloud import bigquery

auth.authenticate_user()

PROJECT_ID = "mgmt467-project"
DATASET = "GymDB"
RAW_TABLE = "Weather_Raw_Streaming"

bq = bigquery.Client(project=PROJECT_ID)

# Fully qualified table id, shared by the DROP and CREATE statements below.
raw_table_fqn = f"{PROJECT_ID}.{DATASET}.{RAW_TABLE}"

# Remove any previous version of the table.
drop_raw_sql = f"DROP TABLE IF EXISTS `{raw_table_fqn}`;"
bq.query(drop_raw_sql).result()

# Define the raw table again, partitioned by the day of event_timestamp.
create_raw_sql = f"""
CREATE TABLE `{raw_table_fqn}`
(
  event_timestamp TIMESTAMP,
  temperature_c FLOAT64,
  wind_speed_kph FLOAT64,
  relative_humidity FLOAT64,
  is_day BOOL
)
PARTITION BY DATE(event_timestamp)
"""
bq.query(create_raw_sql).result()

print("Weather_Raw_Streaming table recreated.")



Weather_Raw_Streaming table recreated.


In [None]:
# Push the historical weather rows into the raw table in fixed-size batches.
from google.cloud import bigquery

PROJECT_ID = "mgmt467-project"
DATASET_ID = "GymDB"
TABLE_NAME = "Weather_Raw_Streaming"

# Address the destination with explicit reference objects rather than a
# dotted string id.
dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_ID)
table_ref = bigquery.TableReference(dataset_ref, TABLE_NAME)

# One dict per DataFrame row, keyed by column name.
rows = df_hist.to_dict(orient="records")

batch_size = 500
errors_total = []

start = 0
while start < len(rows):
    batch = rows[start:start + batch_size]

    # insert_rows_json returns a list of per-row errors; empty means success.
    errors = bq.insert_rows_json(table_ref, batch)
    if errors:
        print("Batch insertion errors example:", errors[:3])
    errors_total.extend(errors)

    start += batch_size

print("Done! Total insertion errors:", len(errors_total))


Done! Total insertion errors: 0


In [None]:
MONTHLY_TABLE = "Weather_Monthly_Avg"

# NOTE: RAW_TABLE and DATASET come from earlier cells. RAW_TABLE is assigned
# more than once in this notebook, so this cell reads whichever value was set
# most recently (expected here: the raw weather table).
monthly_table_fqn = f"{PROJECT_ID}.{DATASET}.{MONTHLY_TABLE}"
raw_source_fqn = f"{PROJECT_ID}.{DATASET}.{RAW_TABLE}"

# Remove the previous aggregate table, if any.
drop_monthly_sql = f"DROP TABLE IF EXISTS `{monthly_table_fqn}`;"
bq.query(drop_monthly_sql).result()

# Rebuild it: one row per (year, month) with the mean of each measurement.
create_monthly_sql = f"""
CREATE TABLE `{monthly_table_fqn}` AS
SELECT
  EXTRACT(YEAR FROM event_timestamp) AS year,
  EXTRACT(MONTH FROM event_timestamp) AS month,
  AVG(temperature_c) AS avg_temperature_c,
  AVG(wind_speed_kph) AS avg_wind_speed_kph,
  AVG(relative_humidity) AS avg_relative_humidity
FROM `{raw_source_fqn}`
GROUP BY year, month
ORDER BY year, month
"""
bq.query(create_monthly_sql).result()

print("Monthly averages computed.")


Monthly averages computed.


In [None]:
# Read the monthly aggregates back into pandas, in chronological order.
monthly_avg_sql = f"""
SELECT *
FROM `{PROJECT_ID}.{DATASET}.Weather_Monthly_Avg`
ORDER BY year, month
"""
df_monthly = bq.query(monthly_avg_sql).to_dataframe()

df_monthly

Unnamed: 0,year,month,avg_temperature_c,avg_wind_speed_kph,avg_relative_humidity
0,2024,12,2.690399,13.582971,80.458333
1,2025,1,-5.408602,14.267742,71.705645
2,2025,2,-1.235863,13.448958,69.824405
3,2025,3,8.181989,16.437769,61.850806
4,2025,4,11.602778,14.082778,66.873611
5,2025,5,16.089113,12.454301,68.994624
6,2025,6,23.355417,11.228889,71.3625
7,2025,7,25.546774,8.716263,75.069892
8,2025,8,22.34879,9.046237,71.958333
9,2025,9,20.681806,7.891528,61.745833


In [None]:
# Setup for the live-streaming section: imports, client, and destination table.
import time
from datetime import datetime, timezone

import requests
from google.cloud import bigquery

PROJECT_ID = "mgmt467-project"
DATASET_ID = "GymDB"
TABLE_NAME = "Weather_Raw_Streaming"

bq = bigquery.Client(project=PROJECT_ID)

# Reference to the table that live readings are streamed into.
dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_ID)
table_ref = bigquery.TableReference(dataset_ref, TABLE_NAME)

In [None]:
def fetch_weather_live(latitude=40.4237, longitude=-86.9212, timeout=10, session=None):
    """Fetch the current weather from Open-Meteo as a row for the raw weather table.

    Args:
        latitude: Latitude of the location to query. Defaults to the
            notebook's original hard-coded location.
        longitude: Longitude of the location to query.
        timeout: Seconds to wait for the HTTP response before giving up, so a
            stalled connection cannot hang the streaming loop.
        session: Optional object exposing a requests-compatible
            ``get(url, params=..., timeout=...)`` method (for example a
            ``requests.Session``). When omitted, the ``requests`` module
            itself is used.

    Returns:
        dict with keys ``event_timestamp`` (ISO-8601 UTC time at which the
        reading was fetched), ``temperature_c``, ``wind_speed_kph``,
        ``relative_humidity`` (always ``None``; not read from this response)
        and ``is_day`` (bool).

    Raises:
        requests.HTTPError: If the API answers with an error status.
        KeyError: If the response has no ``current_weather`` section.
    """
    http = requests if session is None else session

    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "current_weather": True
    }

    response = http.get(url, params=params, timeout=timeout)
    # Surface HTTP failures here rather than as a KeyError on the payload.
    response.raise_for_status()
    cw = response.json()["current_weather"]

    return {
        # Stamped with the fetch time, not the observation time in the payload.
        "event_timestamp": datetime.now(timezone.utc).isoformat(),
        "temperature_c": cw["temperature"],
        "wind_speed_kph": cw["windspeed"],
        "relative_humidity": None,  # live endpoint does not include humidity in all cases
        "is_day": bool(cw["is_day"])
    }

In [None]:
# Stream live weather readings into the raw table at a fixed interval.
print("Starting live streaming... Press STOP to end.")

NUM_READINGS = 20            # replace the for-loop with `while True` for nonstop streaming
POLL_INTERVAL_SECONDS = 60   # 1 reading per minute

try:
    for i in range(NUM_READINGS):
        try:
            row = fetch_weather_live()
        except (requests.RequestException, KeyError) as exc:
            # A transient API or network failure should cost one reading,
            # not end the whole streaming run.
            print(f"[{i}] Fetch failed, skipping this reading:", exc)
        else:
            errors = bq.insert_rows_json(table_ref, [row])
            print(f"[{i}] Inserted:", row, "| Errors:", errors)

        # No need to wait after the final reading.
        if i < NUM_READINGS - 1:
            time.sleep(POLL_INTERVAL_SECONDS)
except KeyboardInterrupt:
    # Pressing STOP is the documented way to end the run; exit cleanly
    # instead of leaving a traceback in the notebook output.
    print("Streaming stopped by user.")

Starting live streaming... Press STOP to end.
[0] Inserted: {'event_timestamp': '2025-12-09T20:39:15.419571+00:00', 'temperature_c': 4.2, 'wind_speed_kph': 24.7, 'relative_humidity': None, 'is_day': True} | Errors: []


KeyboardInterrupt: 