In [145]:
# Cell 1 Imports + connect to the MySQL server running in Docker.
import os

import pandas as pd
from pymysql.constants import CLIENT
from sqlalchemy import create_engine
from sqlalchemy import text

# Establish the connection. The URL can be overridden with the ADS507_DB_URL
# environment variable so credentials need not be committed with the notebook;
# the fallback is the local Docker MySQL instance on port 3307.
# "local_infile" / CLIENT.LOCAL_FILES enable the client side of
# LOAD DATA LOCAL INFILE, which the pipeline uses to stage the CSV.
engine = create_engine(
    os.environ.get(
        "ADS507_DB_URL",
        "mysql+pymysql://appuser:apppass@127.0.0.1:3307/ads507",
    ),
    connect_args={
        "local_infile": 1,
        "client_flag": CLIENT.LOCAL_FILES,
    },
)

# Engine check: a trivial round-trip query proves the connection works.
pd.read_sql("SELECT 1;", engine)


Unnamed: 0,1
0,1


In [146]:
# Cell 2 Confirm the server allows LOAD DATA LOCAL INFILE (the pipeline depends on it).
local_infile_setting = pd.read_sql("SHOW GLOBAL VARIABLES LIKE 'local_infile';", engine)

# Fail fast here instead of deep inside the pipeline's LOAD DATA step.
assert local_infile_setting["Value"].eq("ON").any(), (
    "Server-side local_infile is not ON; enable it (e.g. SET GLOBAL local_infile = 1) "
    "before running the pipeline."
)

local_infile_setting


Unnamed: 0,Variable_name,Value
0,local_infile,ON


In [147]:
# Cell 3 Reset All Pipeline Tables
#
# Drops every pipeline table for a clean slate, plus two older tables
# (raw_data, raw_metro) that this schema no longer creates. It then
# recreates the tables the pipeline uses:
#   - stg_metro:   landing table; the raw CSV is bulk-loaded here on each run.
#   - metro_data:  final table holding rows after transformation (parsed timestamp).
#   - etl_run_log: one row per pipeline run, used for monitoring.
#   - etl_control: per-dataset watermark (last loaded source_id) that makes loads incremental.

ddl = """
DROP TABLE IF EXISTS etl_run_log;
DROP TABLE IF EXISTS etl_control;
DROP TABLE IF EXISTS stg_metro;
DROP TABLE IF EXISTS metro_data;
DROP TABLE IF EXISTS raw_data;
DROP TABLE IF EXISTS raw_metro;

CREATE TABLE etl_control (
  dataset_name VARCHAR(100) PRIMARY KEY,
  last_source_id BIGINT NOT NULL DEFAULT -1,
  last_run_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE etl_run_log (
  run_id BIGINT AUTO_INCREMENT PRIMARY KEY,
  dataset_name VARCHAR(100) NOT NULL,
  started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  ended_at TIMESTAMP NULL,
  status VARCHAR(20) NOT NULL,
  rows_staged BIGINT DEFAULT 0,
  rows_loaded BIGINT DEFAULT 0,
  last_source_before BIGINT NULL,
  last_source_after BIGINT NULL,
  error_message TEXT NULL
);

CREATE TABLE stg_metro (
  source_id BIGINT,
  ts_str VARCHAR(19),
  TP2 DOUBLE, TP3 DOUBLE, H1 DOUBLE, DV_pressure DOUBLE,
  Reservoirs DOUBLE, Oil_temperature DOUBLE, Motor_current DOUBLE,
  COMP DOUBLE, DV_eletric DOUBLE, Towers DOUBLE, MPG DOUBLE, LPS DOUBLE,
  Pressure_switch DOUBLE, Oil_level DOUBLE, Caudal_impulses DOUBLE
);

CREATE TABLE metro_data (
  source_id BIGINT PRIMARY KEY,
  event_ts DATETIME,
  TP2 DOUBLE, TP3 DOUBLE, H1 DOUBLE, DV_pressure DOUBLE,
  Reservoirs DOUBLE, Oil_temperature DOUBLE, Motor_current DOUBLE,
  COMP DOUBLE, DV_eletric DOUBLE, Towers DOUBLE, MPG DOUBLE, LPS DOUBLE,
  Pressure_switch DOUBLE, Oil_level DOUBLE, Caudal_impulses DOUBLE,
  ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""

# Execute each ';'-separated statement on its own. Empty fragments, such as
# the one after the final ';', are dropped before anything is sent to MySQL.
ddl_statements = [chunk.strip() for chunk in ddl.split(";") if chunk.strip()]

with engine.begin() as conn:
    for statement in ddl_statements:
        conn.execute(text(statement))

pd.read_sql("SHOW TABLES;", engine)


Unnamed: 0,Tables_in_ads507
0,etl_control
1,etl_run_log
2,metro_data
3,stg_metro


In [148]:
# Cell 4 Initialize Watermark
#
# Seeds etl_control with a watermark of -1 for this dataset. The pipeline
# inserts rows WHERE source_id > watermark, so the first run takes everything.
# If the row already exists, the self-assignment in ON DUPLICATE KEY UPDATE
# leaves the stored watermark unchanged.

DATASET = "metro"

seed_watermark_sql = text("""
        INSERT INTO etl_control (dataset_name, last_source_id)
        VALUES (:ds, -1)
        ON DUPLICATE KEY UPDATE last_source_id = last_source_id;
    """)

with engine.begin() as conn:
    conn.execute(seed_watermark_sql, {"ds": DATASET})

pd.read_sql("SELECT * FROM etl_control;", engine)


Unnamed: 0,dataset_name,last_source_id,last_run_at
0,metro,-1,2026-02-07 21:22:36


In [152]:
# Cell 5 Incremental ETL Pipeline

from sqlalchemy import text

DATASET = "metro"

# Source CSV for the pipeline. Set the METRO_CSV_PATH environment variable to
# point elsewhere; the default is the absolute path this notebook was built with.
CSV_PATH = os.environ.get(
    "METRO_CSV_PATH",
    "/Users/calynguyen/Documents/GitHub/ADS507_Finalproject/data/MetroPT3(AirCompressor).csv",
)

def _sql_string_literal(value):
    """Return ``value`` quoted as a single-quoted MySQL string literal.

    Used for the LOAD DATA file name, which is spliced into the statement text.
    Single quotes are doubled so a path containing ``'`` cannot end the literal
    early. Backslashes are doubled because MySQL treats them as escape characters
    unless NO_BACKSLASH_ESCAPES is set.
    """
    return "'" + value.replace("\\", "\\\\").replace("'", "''") + "'"


def _read_watermark(conn):
    """Return the current ``last_source_id`` watermark for ``DATASET``.

    Raises:
        RuntimeError: if ``etl_control`` has no row for ``DATASET`` (the
            watermark has not been initialised).
    """
    row = conn.execute(text("""
        SELECT last_source_id
        FROM etl_control
        WHERE dataset_name = :ds
    """), {"ds": DATASET}).fetchone()
    if row is None:
        # Without this check the failure surfaces as an opaque
        # "'NoneType' object is not subscriptable" TypeError.
        raise RuntimeError(
            f"etl_control has no row for dataset {DATASET!r}; "
            "initialise the watermark before running the pipeline."
        )
    return row[0]


def _stage_csv():
    """Reload ``stg_metro`` from ``CSV_PATH`` and return the number of staged rows.

    Column 1 of the CSV becomes ``source_id`` (cast to an unsigned integer).
    Column 2 is kept as the raw timestamp string ``ts_str``. The header line is skipped.
    """
    load_sql = f"""
        LOAD DATA LOCAL INFILE {_sql_string_literal(CSV_PATH)}
        INTO TABLE stg_metro
        FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
        LINES TERMINATED BY '\\n'
        IGNORE 1 LINES
        (@id, @ts, TP2, TP3, H1, DV_pressure,
         Reservoirs, Oil_temperature, Motor_current,
         COMP, DV_eletric, Towers, MPG, LPS,
         Pressure_switch, Oil_level, Caudal_impulses)
        SET
          source_id = CAST(@id AS UNSIGNED),
          ts_str = @ts;
        """

    with engine.begin() as conn:
        # Staging is a full reload on every run. Incrementality comes from the
        # watermark filter applied when copying into metro_data.
        conn.execute(text("TRUNCATE TABLE stg_metro;"))
        conn.execute(text(load_sql))
        rows_staged = conn.execute(
            text("SELECT COUNT(*) FROM stg_metro;")
        ).fetchone()[0]
    return rows_staged


def _load_new_rows(last_before, rows_staged, run_id):
    """Copy staged rows above the watermark into ``metro_data`` and close the run.

    In one transaction this function:
    - inserts rows with ``source_id > last_before``, parsing ``ts_str`` into ``event_ts``;
    - advances ``etl_control`` to the new maximum ``source_id``;
    - marks ``run_id`` as SUCCESS in ``etl_run_log``.

    Returns:
        ``(rows_loaded, last_after)``: rows inserted by this run and the new watermark.
    """
    with engine.begin() as conn:
        inserted = conn.execute(text("""
            INSERT INTO metro_data (
              source_id, event_ts,
              TP2, TP3, H1, DV_pressure,
              Reservoirs, Oil_temperature, Motor_current,
              COMP, DV_eletric, Towers, MPG, LPS,
              Pressure_switch, Oil_level, Caudal_impulses
            )
            SELECT
              source_id,
              STR_TO_DATE(ts_str, '%Y-%m-%d %H:%i:%s'),
              TP2, TP3, H1, DV_pressure,
              Reservoirs, Oil_temperature, Motor_current,
              COMP, DV_eletric, Towers, MPG, LPS,
              Pressure_switch, Oil_level, Caudal_impulses
            FROM stg_metro
            WHERE source_id > :lb;
        """), {"lb": last_before})
        # The INSERT's affected-row count is exactly what this run loaded.
        # No separate COUNT(*) pass over the staging table is needed.
        rows_new = inserted.rowcount

        # COALESCE keeps the previous watermark while metro_data is still empty,
        # e.g. an empty CSV on the first run. MAX() alone would yield NULL, which
        # the NOT NULL last_source_id column cannot store.
        last_after = conn.execute(
            text("SELECT COALESCE(MAX(source_id), :lb) FROM metro_data;"),
            {"lb": last_before},
        ).fetchone()[0]

        conn.execute(text("""
            UPDATE etl_control
            SET last_source_id=:la,
                last_run_at=CURRENT_TIMESTAMP
            WHERE dataset_name=:ds
        """), {"la": last_after, "ds": DATASET})

        conn.execute(text("""
            UPDATE etl_run_log
            SET status='SUCCESS',
                ended_at=CURRENT_TIMESTAMP,
                rows_staged=:rs,
                rows_loaded=:rl,
                last_source_after=:la
            WHERE run_id=:rid
        """), {
            "rs": rows_staged,
            "rl": rows_new,
            "la": last_after,
            "rid": run_id,
        })
    return rows_new, last_after


def _mark_run_failed(run_id, exc):
    """Best-effort: store ``repr(exc)`` and a FAILED status on the run's log row."""
    try:
        with engine.begin() as conn:
            conn.execute(text("""
                UPDATE etl_run_log
                SET status='FAILED',
                    ended_at=CURRENT_TIMESTAMP,
                    error_message=:err
                WHERE run_id=:rid
            """), {"err": repr(exc), "rid": run_id})
    except Exception as log_exc:
        # The caller re-raises the pipeline error. A second failure while
        # logging (e.g. the database is unreachable) must not replace it.
        print(f"Warning: could not mark run {run_id} as FAILED: {log_exc!r}")


def run_pipeline():
    """Run one incremental load of ``CSV_PATH`` into ``metro_data``.

    Steps, each committed in its own transaction:
      1. Read the ``etl_control`` watermark and open a RUNNING ``etl_run_log`` row.
      2. Truncate ``stg_metro`` and bulk-load the whole CSV into it.
      3. Insert staged rows whose ``source_id`` exceeds the watermark, advance
         the watermark and mark the run SUCCESS.

    On any error the run row is marked FAILED (best effort) and the original
    exception is re-raised. Uses the notebook globals ``engine``, ``text``,
    ``DATASET`` and ``CSV_PATH``.

    Raises:
        RuntimeError: if ``etl_control`` has no row for ``DATASET``.
    """
    run_id = None

    try:
        # Start run
        with engine.begin() as conn:
            last_before = _read_watermark(conn)
            res = conn.execute(text("""
                INSERT INTO etl_run_log (dataset_name, status, last_source_before)
                VALUES (:ds, 'RUNNING', :lb)
            """), {"ds": DATASET, "lb": last_before})
            run_id = res.lastrowid

        # Extract
        rows_staged = _stage_csv()

        # Transform + load
        rows_new, last_after = _load_new_rows(last_before, rows_staged, run_id)

        print("Pipeline Success")
        print("Staged:", rows_staged)
        print("New Rows:", rows_new)
        print("Watermark:", last_after)

    except Exception as e:
        # run_id is only set once the RUNNING row was written. Compare with None
        # explicitly rather than relying on truthiness.
        if run_id is not None:
            _mark_run_failed(run_id, e)
        raise


# Execute one pipeline run. Only staged rows with source_id above the stored
# watermark are inserted into metro_data, so re-running over the same CSV
# inserts no new rows.
run_pipeline()


Pipeline Success
Staged: 1516948
New Rows: 0
Watermark: 15169470


In [153]:
# Cell 6 Monitoring Dashboard
#
# Shows three views in order: the stored watermark, the ten most recent
# pipeline runs (newest first), and row count / time range of the final table.

run_history_sql = """
SELECT run_id, status, started_at, ended_at,
       rows_staged, rows_loaded,
       last_source_before, last_source_after
FROM etl_run_log
ORDER BY run_id DESC
LIMIT 10;
"""

final_table_sql = """
SELECT COUNT(*) AS total_rows,
       MIN(event_ts) AS min_ts,
       MAX(event_ts) AS max_ts
FROM metro_data;
"""

monitoring_views = [
    ("Watermark:", "SELECT * FROM etl_control;"),
    ("Run History:", run_history_sql),
    ("Final Table:", final_table_sql),
]

for heading, query in monitoring_views:
    print(heading)
    display(pd.read_sql(query, engine))


Watermark:


Unnamed: 0,dataset_name,last_source_id,last_run_at
0,metro,15169470,2026-02-07 21:29:32


Run History:


Unnamed: 0,run_id,status,started_at,ended_at,rows_staged,rows_loaded,last_source_before,last_source_after
0,3,SUCCESS,2026-02-07 21:29:26,2026-02-07 21:29:32,1516948,0,15169470,15169470
1,2,SUCCESS,2026-02-07 21:29:00,2026-02-07 21:29:07,1516948,0,15169470,15169470
2,1,SUCCESS,2026-02-07 21:22:37,2026-02-07 21:22:49,1516948,1516948,-1,15169470


Final Table:


Unnamed: 0,total_rows,min_ts,max_ts
0,1516948,2020-02-01,2020-09-01 03:59:50
