In [2]:
# Install once (ok to re-run)
!pip install -q psycopg2-binary sqlalchemy pandas numpy faker scikit-learn plotly joblib python-dotenv

import os, math, time, datetime, pathlib
from dataclasses import dataclass
from typing import List, Tuple

import numpy as np
import pandas as pd
from faker import Faker
from joblib import dump, load

from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
from psycopg2.extras import execute_values

from sklearn.ensemble import IsolationForest
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

In [3]:
# PostgreSQL connection settings.
# Every value can be overridden through an environment variable of the same name;
# the defaults target a local development server. The password is deliberately
# NOT stored in the notebook: it is read from PG_PASS, or asked for interactively.
import getpass

PG_HOST = os.environ.get("PG_HOST", "localhost")
PG_PORT = int(os.environ.get("PG_PORT", "5432"))
PG_DB   = os.environ.get("PG_DB", "smart_aqms")
PG_USER = os.environ.get("PG_USER", "postgres")
PG_PASS = os.environ.get("PG_PASS") or getpass.getpass(
    f"PostgreSQL password for {PG_USER}@{PG_HOST}: "
)

# Credentials are passed through connect_args instead of being embedded in the
# URL, so special characters in the password need no escaping.
engine = create_engine(
    "postgresql+psycopg2://",
    connect_args={
        "host": PG_HOST,
        "port": PG_PORT,
        "dbname": PG_DB,
        "user": PG_USER,
        "password": PG_PASS
    },
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True  # validate pooled connections before handing them out
)
print("✅ Engine ready.")


✅ Engine ready.


In [6]:
# One-shot (re)build of the scaqms schema. The script, in order:
#   1. drops scaqms.air_quality if it exists (stored readings are lost on re-run),
#   2. creates the schema, the severity enum and the stations table if missing,
#   3. recreates air_quality as a table range-partitioned on ts,
#   4. creates alerts / predictions if missing (keyed by record_id, no FK to air_quality),
#   5. defines scaqms.ensure_month_partition() and calls it for the current and next month.
# NOTE(review): alerts and predictions are neither dropped nor truncated here, while
# air_quality (and presumably its BIGSERIAL sequence) is recreated — leftover rows could
# then match unrelated new record_ids. Confirm whether they should be cleared as well.
# NOTE(review): only the current and next month get partitions and no default partition
# is declared; rows with ts outside that range presumably need ensure_month_partition()
# to be called first — confirm against the insert paths.
ddl_fix = """
-- remove any old table definition
DO $$
BEGIN
  IF EXISTS (SELECT 1 FROM pg_tables WHERE schemaname='scaqms' AND tablename='air_quality') THEN
    EXECUTE 'DROP TABLE scaqms.air_quality CASCADE;';
  END IF;
END $$;

CREATE SCHEMA IF NOT EXISTS scaqms;

DO $$ BEGIN
  CREATE TYPE scaqms.severity AS ENUM ('Low','Moderate','High','Critical');
EXCEPTION WHEN duplicate_object THEN NULL; END $$;

CREATE TABLE IF NOT EXISTS scaqms.stations (
  station_id SERIAL PRIMARY KEY,
  city_zone  VARCHAR(100) NOT NULL,
  latitude   NUMERIC(9,6) NOT NULL,
  longitude  NUMERIC(9,6) NOT NULL,
  installation_date DATE DEFAULT CURRENT_DATE,
  is_active BOOLEAN DEFAULT TRUE,
  CONSTRAINT uq_station UNIQUE (city_zone, latitude, longitude)
);

-- ✅ parent partitioned table without a PK
CREATE TABLE scaqms.air_quality (
  record_id BIGSERIAL,
  station_id INT NOT NULL REFERENCES scaqms.stations(station_id) ON DELETE CASCADE,
  ts TIMESTAMPTZ NOT NULL DEFAULT now(),
  co2_ppm NUMERIC(10,2) CHECK (co2_ppm >= 0),
  pm25 NUMERIC(10,2) CHECK (pm25 >= 0),
  temperature_c NUMERIC(5,2),
  humidity NUMERIC(5,2) CHECK (humidity BETWEEN 0 AND 100),
  wind_speed NUMERIC(5,2) CHECK (wind_speed >= 0),
  status VARCHAR(20) DEFAULT 'Normal',
  aqi_bucket TEXT GENERATED ALWAYS AS (
    CASE
      WHEN pm25 IS NULL THEN NULL
      WHEN pm25 <= 12 THEN 'Good'
      WHEN pm25 <= 35 THEN 'Moderate'
      WHEN pm25 <= 55 THEN 'Unhealthy'
      ELSE 'Hazardous'
    END
  ) STORED,
  -- use a composite UNIQUE constraint including partition key
  CONSTRAINT uq_record UNIQUE (record_id, ts)
) PARTITION BY RANGE (ts);

CREATE TABLE IF NOT EXISTS scaqms.alerts (
  alert_id SERIAL PRIMARY KEY,
  record_id BIGINT NOT NULL,
  alert_type VARCHAR(50),
  severity scaqms.severity,
  message TEXT,
  created_at TIMESTAMPTZ DEFAULT now(),
  CONSTRAINT uq_alert UNIQUE (record_id, alert_type)
);

CREATE TABLE IF NOT EXISTS scaqms.predictions (
  prediction_id SERIAL PRIMARY KEY,
  record_id BIGINT NOT NULL,
  aqi_pred TEXT,
  proba_good NUMERIC(6,5),
  proba_moderate NUMERIC(6,5),
  proba_unhealthy NUMERIC(6,5),
  proba_hazardous NUMERIC(6,5),
  predicted_at TIMESTAMPTZ DEFAULT now(),
  CONSTRAINT uq_pred UNIQUE (record_id)
);

-- indexes for performance
CREATE INDEX IF NOT EXISTS idx_aq_station ON scaqms.air_quality (station_id);
CREATE INDEX IF NOT EXISTS idx_aq_ts_desc ON scaqms.air_quality (ts DESC);

-- partition creation helper
CREATE OR REPLACE FUNCTION scaqms.ensure_month_partition(month_start date)
RETURNS void AS $$
DECLARE
  part_name text := 'air_quality_' || to_char(month_start, 'YYYYMM');
  start_ts timestamptz := month_start::timestamptz;
  end_ts   timestamptz := (month_start + INTERVAL '1 month')::timestamptz;
BEGIN
  IF NOT EXISTS (
      SELECT 1 FROM pg_class c
      JOIN pg_namespace n ON n.oid = c.relnamespace
      WHERE n.nspname='scaqms' AND c.relname=part_name
  ) THEN
    EXECUTE format('CREATE TABLE scaqms.%I PARTITION OF scaqms.air_quality
                    FOR VALUES FROM (%L) TO (%L);',
                    part_name, start_ts, end_ts);
    EXECUTE format('CREATE INDEX IF NOT EXISTS %I_station ON scaqms.%I (station_id);', part_name, part_name);
    EXECUTE format('CREATE INDEX IF NOT EXISTS %I_ts ON scaqms.%I (ts DESC);', part_name, part_name);
  END IF;
END $$ LANGUAGE plpgsql;

-- create current + next month partitions
SELECT scaqms.ensure_month_partition(date_trunc('month', now())::date);
SELECT scaqms.ensure_month_partition((date_trunc('month', now()) + INTERVAL '1 month')::date);
"""
# The whole script is sent as one statement inside a single transaction;
# engine.begin() commits when the block exits without an exception.
with engine.begin() as conn:
    conn.execute(text(ddl_fix))
print("✅ Partitioned schema created successfully.")


✅ Partitioned schema created successfully.


In [7]:
# Idempotent insert: a station that already exists (same zone + coordinates)
# is skipped thanks to the uq_station constraint.
seed_sql = """
INSERT INTO scaqms.stations (city_zone, latitude, longitude)
VALUES (:z, :lat, :lon)
ON CONFLICT (city_zone, latitude, longitude) DO NOTHING;
"""

# (city_zone, latitude, longitude) for each monitoring station.
stations = [
    ("Downtown", 40.7128, -74.0060),
    ("Uptown", 40.7870, -73.9754),
    ("Industrial", 40.6782, -73.9442),
    ("Harbor", 40.7003, -74.0122),
    ("Park", 40.7712, -73.9742),
]

# Bind parameters, one dict per station, in the order listed above.
station_params = [
    {"z": zone, "lat": latitude, "lon": longitude}
    for zone, latitude, longitude in stations
]

with engine.begin() as conn:
    seed_stmt = text(seed_sql)
    for params in station_params:
        conn.execute(seed_stmt, params)
print("✅ Stations seeded successfully.")

✅ Stations seeded successfully.


In [15]:
import random, datetime
from faker import Faker
from psycopg2.extras import execute_values

# Backfill the last 24 hours with 100 random readings.
fake = Faker()

# Use the station ids that actually exist instead of assuming the SERIAL
# column handed out exactly 1..5.
with engine.connect() as conn:
    station_ids = [r[0] for r in conn.execute(text("SELECT station_id FROM scaqms.stations"))]
if not station_ids:
    raise RuntimeError("No stations found — run the station seeding cell first.")

rows = []
for _ in range(100):
    station_id = random.choice(station_ids)
    ts = fake.date_time_between(start_date='-1d', end_date='now', tzinfo=datetime.timezone.utc)
    co2 = round(random.uniform(350, 900), 2)
    pm25 = round(random.uniform(10, 160), 2)
    temp = round(random.uniform(10, 40), 2)
    humidity = round(random.uniform(30, 90), 2)
    wind = round(random.uniform(0.5, 8.0), 2)
    status = "Alert" if pm25 > 100 or co2 > 800 else "Normal"
    rows.append((station_id, ts, co2, pm25, temp, humidity, wind, status))

# Shared by the streaming generator defined later in the notebook.
insert_sql = """
INSERT INTO scaqms.air_quality
(station_id, ts, co2_ppm, pm25, temperature_c, humidity, wind_speed, status)
VALUES %s;
"""

# Timestamps reach back one day, so on the first day of a month some rows belong
# to the previous month. Let the database create whichever monthly partitions the
# oldest and newest rows need (a no-op when they already exist).
ensure_partition_sql = (
    "SELECT scaqms.ensure_month_partition(date_trunc('month', %s::timestamptz)::date);"
)
ts_values = [r[1] for r in rows]

raw = engine.raw_connection()
try:
    with raw.cursor() as cur:
        for ts_edge in (min(ts_values), max(ts_values)):
            cur.execute(ensure_partition_sql, (ts_edge,))
        execute_values(cur, insert_sql, rows)
    raw.commit()
finally:
    raw.close()

print(f"✅ {len(rows)} rows of air quality data inserted.")

✅ 100 rows of air quality data inserted.


In [10]:
import math, time, numpy as np

def diurnal(hour_f):
    """Map an hour of day to a smooth daily cycle in the range [0, 1].

    The curve is a sine wave with a 24-hour period, shifted so that it is 0 at
    hour 0, 0.5 at hours 6 and 18, and 1 at hour 12.

    Args:
        hour_f: Hour of day, fractional values allowed.

    Returns:
        Float between 0 and 1.
    """
    phase = 2 * math.pi * hour_f / 24 - math.pi / 2
    return (1 + math.sin(phase)) / 2

def sample_row(station_id, t):
    """Generate one synthetic air-quality reading for a station at time ``t``.

    PM2.5, CO2, temperature and humidity follow the ``diurnal`` cycle with
    Gaussian noise; PM2.5 and CO2 are boosted by 25 % during rush hours
    (07-09 and 17-19) and occasionally spiked (2 % / 1 % of the calls).

    Args:
        station_id: Station identifier, copied unchanged into the result.
        t: Object exposing ``hour`` and ``minute`` (e.g. a ``datetime``); copied
            unchanged into the result as the reading's timestamp.

    Returns:
        Tuple ``(station_id, t, co2, pm25, temp, hum, wind, status)`` in the
        column order expected by the ``air_quality`` INSERT statement. All five
        measurements are plain Python ``float`` rounded to 2 decimals;
        ``status`` is ``"Alert"`` when pm25 > 100 or co2 > 800, else ``"Normal"``.
    """
    hour_f = t.hour + t.minute / 60
    rush = 1.25 if (7 <= t.hour <= 9 or 17 <= t.hour <= 19) else 1.0

    base_pm = 12 + 25 * diurnal(hour_f) + np.random.normal(0, 3)
    base_co2 = 400 + 120 * diurnal((hour_f + 2) % 24) + np.random.normal(0, 15)
    # Rare pollution spikes.
    if np.random.rand() < 0.02:
        base_pm *= np.random.uniform(2.5, 6.0)
    if np.random.rand() < 0.01:
        base_co2 *= np.random.uniform(1.6, 2.5)

    # Everything is converted to a built-in float before rounding: NumPy scalars
    # survive round()/max(), and psycopg2 renders floats through repr(), which
    # for NumPy >= 2 yields "np.float64(...)" and produces invalid SQL.
    pm25 = round(float(max(0.0, base_pm * rush)), 2)
    co2 = round(float(max(0.0, base_co2 * rush)), 2)
    temp = round(float(10 + 15 * diurnal((hour_f + 6) % 24) + np.random.normal(0, 1.2)), 2)
    # Rounded like the other fields (the target column is NUMERIC(5,2)).
    hum = round(float(np.clip(55 + 20 * diurnal((hour_f + 10) % 24) + np.random.normal(0, 5), 15, 98)), 2)
    wind = round(float(max(0.0, np.random.lognormal(mean=0.8, sigma=0.5))), 2)

    status = "Alert" if (pm25 > 100 or co2 > 800) else "Normal"
    return (station_id, t, co2, pm25, temp, hum, wind, status)

def run_stream(minutes=5, per_station_per_sec=1.0):
    """Stream synthetic readings into ``scaqms.air_quality`` for a fixed duration.

    Each iteration builds one reading per active station (all sharing the same
    UTC timestamp), inserts them in a single transaction and then sleeps.
    Relies on the notebook-level ``engine`` and ``insert_sql``.

    Args:
        minutes: How long to keep streaming, in minutes.
        per_station_per_sec: Batches per second; the pause between batches is
            ``1 / per_station_per_sec`` seconds. Must be positive.

    Raises:
        ValueError: If ``per_station_per_sec`` is not positive.
    """
    if per_station_per_sec <= 0:
        raise ValueError("per_station_per_sec must be positive")

    # Engine.execute() no longer exists in SQLAlchemy 2.x; run the query on an
    # explicit connection instead (works on 1.4 as well).
    with engine.connect() as conn:
        result = conn.execute(text("SELECT station_id FROM scaqms.stations WHERE is_active"))
        ids = [r[0] for r in result]
    if not ids:
        print("⚠️ No active stations — nothing to stream.")
        return

    # Creates the monthly partition for a timestamp when it is missing, so the
    # stream keeps working after the partitions made at schema creation run out.
    ensure_partition_sql = (
        "SELECT scaqms.ensure_month_partition(date_trunc('month', %s::timestamptz)::date);"
    )
    pause = 1 / per_station_per_sec
    end_time = time.time() + minutes * 60

    while time.time() < end_time:
        # Timezone-aware "now" in UTC (datetime.utcnow() is deprecated).
        t = datetime.datetime.now(datetime.timezone.utc)
        rows = [sample_row(sid, t) for sid in ids]
        raw = engine.raw_connection()
        try:
            with raw.cursor() as cur:
                cur.execute(ensure_partition_sql, (t,))
                execute_values(cur, insert_sql, rows)
            raw.commit()
        finally:
            # Always hand the connection back to the pool.
            raw.close()
        time.sleep(pause)

# run_stream(minutes=3, per_station_per_sec=1)


In [11]:
import pandas as pd
from sklearn.ensemble import IsolationForest

def detect_outliers(window_minutes=60, contamination=0.05):
    """Flag anomalous recent readings with an Isolation Forest and record alerts.

    Loads the readings of the last ``window_minutes`` minutes, fits an
    ``IsolationForest`` on CO2, PM2.5 and temperature, and inserts one
    'Anomaly' / 'Critical' alert per flagged reading. Alerts that already
    exist for a record are skipped by the ``ON CONFLICT`` clause.

    Args:
        window_minutes: Size of the look-back window, in minutes.
        contamination: Expected share of outliers, passed to ``IsolationForest``.

    Returns:
        Number of readings flagged as outliers (0 when there is no usable data
        or nothing was flagged).
    """
    feature_cols = ['co2_ppm', 'pm25', 'temperature_c']
    query = """
        SELECT record_id, station_id, ts, co2_ppm, pm25, temperature_c
        FROM scaqms.air_quality
        WHERE ts >= now() - INTERVAL :win
        ORDER BY ts ASC;
    """
    with engine.begin() as conn:
        df = pd.read_sql(text(query), conn, params={"win": f"{window_minutes} minutes"})
    if df.empty:
        print("⚠️ No recent data.")
        return 0

    # The measurement columns are nullable and a single missing value makes
    # IsolationForest raise, so only complete rows are scored.
    X = df[feature_cols].astype(float)
    complete = X.notna().all(axis=1)
    df, X = df[complete], X[complete]
    if df.empty:
        print("⚠️ No recent data.")
        return 0

    iso = IsolationForest(contamination=contamination, random_state=42)
    flags = iso.fit_predict(X)  # -1 marks an outlier, 1 an inlier
    out = df[flags == -1]
    if out.empty:
        print("✅ No anomalies detected.")
        return 0

    rows = [(int(r.record_id), 'Anomaly', 'Critical',
             f"Abnormal reading at station {r.station_id}: PM2.5={r.pm25}, CO₂={r.co2_ppm}")
            for r in out.itertuples()]

    sql = """
        INSERT INTO scaqms.alerts (record_id, alert_type, severity, message)
        VALUES %s
        ON CONFLICT (record_id, alert_type) DO NOTHING;
    """
    raw = engine.raw_connection()
    try:
        with raw.cursor() as cur:
            execute_values(cur, sql, rows)
        raw.commit()
    finally:
        raw.close()

    print(f"🚨 Inserted {len(out)} alerts.")
    return len(out)

# detect_outliers(60, 0.05)


In [12]:
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from joblib import dump, load
import pathlib

# File in which the incrementally trained model is persisted between runs.
MODEL_PATH = "aqi_model.joblib"

# AQI classes; the position in this list is the integer class id.
LABELS = ["Good", "Moderate", "Unhealthy", "Hazardous"]

# Reverse lookup: label name -> integer class id.
label_to_idx = dict(zip(LABELS, range(len(LABELS))))

def load_or_init_model():
    """Return the persisted AQI model, or a freshly bootstrapped one.

    When ``MODEL_PATH`` exists it is loaded with joblib (which unpickles the
    file, so it should only ever point at a trusted file). Otherwise a
    scaler + SGD pipeline is created and its classifier is given one
    ``partial_fit`` call on a single all-zero sample so that every class in
    ``LABELS`` is registered.

    Returns:
        A ``Pipeline`` with the steps ``"scaler"`` and ``"clf"``.
    """
    saved_model = pathlib.Path(MODEL_PATH)
    if saved_model.exists():
        return load(MODEL_PATH)

    classifier = SGDClassifier(
        loss="log_loss",
        max_iter=1,
        learning_rate="optimal",
        tol=None,
        random_state=42,
    )
    pipe = Pipeline(steps=[("scaler", StandardScaler()), ("clf", classifier)])

    # Bootstrap call: one dummy sample with 5 features, labelled class 0.
    all_classes = np.arange(len(LABELS))
    dummy_X = np.zeros((1, 5))
    dummy_y = np.array([0])
    pipe.named_steps["clf"].partial_fit(dummy_X, dummy_y, classes=all_classes)
    return pipe

def features(df):
    """Extract the model input columns as floats, replacing missing values with 0.0.

    Args:
        df: DataFrame containing at least the columns co2_ppm, pm25,
            temperature_c, humidity and wind_speed.

    Returns:
        New float DataFrame holding exactly those five columns, in that order.
    """
    wanted = ['co2_ppm', 'pm25', 'temperature_c', 'humidity', 'wind_speed']
    as_float = df[wanted].astype(float)
    return as_float.fillna(0.0)

def train_and_predict(window_minutes=120):
    """Incrementally train the AQI classifier on recent data and store predictions.

    Loads the readings of the last ``window_minutes`` minutes, updates the
    model on every row that has an ``aqi_bucket`` label, saves the model to
    ``MODEL_PATH`` and upserts one predicted label per loaded reading into
    ``scaqms.predictions``.

    Both pipeline steps are updated incrementally: the scaler's running
    statistics via ``partial_fit`` and then the classifier on the scaled
    features. Previously the scaler was never fitted or applied, so the
    classifier saw raw values on very different scales. A model file saved by
    that older code still loads; its classifier re-adapts over the next updates.

    Args:
        window_minutes: Size of the look-back window, in minutes.

    Returns:
        None. Progress is reported with ``print``.
    """
    model = load_or_init_model()
    scaler = model.named_steps["scaler"]
    clf = model.named_steps["clf"]

    with engine.begin() as conn:
        df = pd.read_sql(text("""
            SELECT record_id, co2_ppm, pm25, temperature_c, humidity, wind_speed, aqi_bucket
            FROM scaqms.air_quality
            WHERE ts >= now() - INTERVAL :mins
        """), conn, params={"mins": f"{window_minutes} minutes"})
    if df.empty:
        print("⚠️ No data for training.")
        return
    train = df.dropna(subset=['aqi_bucket'])
    if train.empty:
        print("⚠️ No AQI labels available.")
        return

    # Plain arrays are used throughout so the classifier always receives the
    # same input type as in its bootstrap call.
    X_train = features(train).to_numpy()
    y_train = train['aqi_bucket'].map(label_to_idx).to_numpy()
    scaler.partial_fit(X_train)
    clf.partial_fit(scaler.transform(X_train), y_train, classes=np.arange(len(LABELS)))
    dump(model, MODEL_PATH)
    print(f"🧠 Model updated on {len(train)} samples.")

    # Predict for every loaded row, including those without a label.
    X_all = scaler.transform(features(df).to_numpy())
    pred_labels = [LABELS[p] for p in clf.predict(X_all)]

    rows = [(int(record_id), label) for record_id, label in zip(df['record_id'], pred_labels)]
    sql = """
        INSERT INTO scaqms.predictions (record_id, aqi_pred)
        VALUES %s
        ON CONFLICT (record_id) DO UPDATE
        SET aqi_pred = EXCLUDED.aqi_pred, predicted_at = now();
    """
    raw = engine.raw_connection()
    try:
        with raw.cursor() as cur:
            execute_values(cur, sql, rows)
        raw.commit()
    finally:
        raw.close()
    print(f"📈 Predictions updated for {len(rows)} rows.")


In [16]:
import plotly.express as px

# Fetch up to 1000 of the newest readings from the last 24 hours, together with
# the station zone and, where one exists, the predicted AQI label.
with engine.begin() as conn:
    df_latest = pd.read_sql(text("""
        SELECT a.*, s.city_zone, p.aqi_pred
        FROM scaqms.air_quality a
        JOIN scaqms.stations s ON a.station_id = s.station_id
        LEFT JOIN scaqms.predictions p ON a.record_id = p.record_id
        WHERE a.ts >= now() - INTERVAL '24 hours'
        ORDER BY a.ts DESC
        LIMIT 1000;
    """), conn)

if not df_latest.empty:
    # One fixed colour per predicted label; None covers rows without a prediction.
    aqi_colors = {
        "Good": "#00E400",
        "Moderate": "#FFFF00",
        "Unhealthy": "#FF7E00",
        "Hazardous": "#99004C",
        None: "#808080",
    }
    fig = px.scatter(
        df_latest,
        x="ts",
        y="pm25",
        color="aqi_pred",
        color_discrete_map=aqi_colors,
        hover_data=["city_zone", "co2_ppm", "temperature_c", "humidity"],
        title="🌆 PM2.5 vs Time — Predicted AQI (24 h)",
        labels={"ts": "Timestamp", "pm25": "PM2.5 (µg/m³)"},
        width=1000,
        height=600,
    )
    fig.update_traces(
        marker={"size": 9, "line": {"width": 1, "color": "DarkSlateGrey"}},
        opacity=0.85,
    )
    fig.update_layout(
        plot_bgcolor="white",
        paper_bgcolor="white",
        xaxis={"showgrid": True, "gridcolor": "lightgrey"},
        yaxis={"showgrid": True, "gridcolor": "lightgrey"},
        legend={"orientation": "h", "y": 1.05, "x": 1, "xanchor": "right"},
    )
    fig.show()
else:
    print("⚠️ No data to plot — run generator first.")


In [14]:
import time
def monitor_loop(interval_sec=60, max_iterations=None, outlier_window=60,
                 contamination=0.06, train_window=120):
    """Repeatedly run outlier detection followed by model training/prediction.

    An exception raised by either step is printed and the loop carries on with
    the next cycle, so one failing cycle does not stop the monitoring.

    Args:
        interval_sec: Pause between two cycles, in seconds.
        max_iterations: Number of cycles to run. ``None`` (the default) runs
            until interrupted, e.g. by interrupting the kernel.
        outlier_window: Look-back window in minutes passed to ``detect_outliers``.
        contamination: Expected outlier share passed to ``detect_outliers``.
        train_window: Look-back window in minutes passed to ``train_and_predict``.

    Returns:
        None. Only returns when ``max_iterations`` is given.
    """
    completed = 0
    while max_iterations is None or completed < max_iterations:
        try:
            detect_outliers(outlier_window, contamination)
            train_and_predict(train_window)
        except Exception as e:
            print("❌ Monitor loop error:", e)
        completed += 1
        # Skip the pause once the last requested cycle has finished.
        if max_iterations is not None and completed >= max_iterations:
            break
        time.sleep(interval_sec)

# monitor_loop(60)
