# Incremental ETL Pipeline

This notebook orchestrates the flow of data from the transactional source database (sakila) to the analytical data warehouse (sakila_star).

Key Features:
- Incremental Loading: Uses a "Watermark" strategy to fetch only records created or modified since the last successful run.
- Star Schema Transformation: Denormalizes data (e.g., joining address, city, and country into dimension tables).
- Upsert Logic: Uses ON DUPLICATE KEY UPDATE to handle modifications to existing records.
- Deletion Sync: Identifies and removes records in the warehouse that have been hard-deleted from the source. \[WIP\]

<br>

In [257]:
import os
import time

import pandas as pd
from sqlalchemy import create_engine, text
from tqdm.notebook import tqdm

# Connection URLs can be overridden through environment variables so that
# credentials do not have to be edited into the notebook. The defaults keep
# the original local development settings.
SRC_DB_URL = os.environ.get(
    "SAKILA_SRC_URL", "mysql+pymysql://app:app_password@db:3306/sakila"
)
TGT_DB_URL = os.environ.get(
    "SAKILA_TGT_URL", "mysql+pymysql://app:app_password@db:3306/sakila_star"
)

# Source: Transactional Database
src_engine = create_engine(SRC_DB_URL)

# Target: Data Warehouse
tgt_engine = create_engine(TGT_DB_URL)


<br>

### Check schemas (Optional)

Run the two cells below to confirm that both databases are reachable; each should return a list of tables.


In [None]:
# List the tables of the source database
pd.read_sql(sql="SHOW FULL TABLES;", con=src_engine)

In [None]:
# List the tables of the target data warehouse
pd.read_sql(sql="SHOW FULL TABLES;", con=tgt_engine)

<br>

## Core ETL Functions:

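These helper functions are shared by every pipeline in this notebook: they read and update the watermarks stored in `etl_state`, optionally reset the ETL state and clear target tables, and run the extract-and-upsert step for a single table.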

In [None]:
def get_watermark(pipeline_name, conn):
    """Retrieve the last successful timestamp from sakila_star.etl_state.

    Args:
        pipeline_name (str): Value matched against etl_state.pipeline_name.
        conn: An open connection to the database that holds etl_state.

    Returns:
        The stored last_success_ts, or '1970-01-01 00:00:00' when the pipeline
        has no row yet or the stored value is NULL.
    """
    default_ts = '1970-01-01 00:00:00'

    query = text("SELECT last_success_ts FROM etl_state WHERE pipeline_name = :p_name")
    row = conn.execute(query, {"p_name": pipeline_name}).fetchone()

    # A missing row means the pipeline never ran. A NULL timestamp is treated
    # the same way, because comparing against NULL in the extract query would
    # match no rows at all.
    if row is None or row[0] is None:
        return default_ts
    return row[0]

# Smoke test: read the current watermark of one pipeline from the target DB
with tgt_engine.connect() as conn:
    watermark_value = get_watermark(pipeline_name="fact_rental", conn=conn)
    print(watermark_value)



def update_watermark(pipeline_names, new_ts, conn):
    """
    Update the watermark for a list of pipelines.

    The previous watermark of each pipeline is read first so it can be shown
    in the log line. This function does not commit; the caller owns the
    transaction on `conn`.

    Args:
        pipeline_names (list | str): A list of pipeline names
            (e.g. ['dim_customer']). A single name string is also accepted.
        new_ts (datetime/str): The timestamp to set (e.g. '2025-01-01 12:00:00')
        conn: The active database connection
    """
    # A bare string would otherwise be iterated character by character and
    # upsert one bogus row per letter.
    if isinstance(pipeline_names, str):
        pipeline_names = [pipeline_names]

    query = text("""
        INSERT INTO etl_state (pipeline_name, last_success_ts) 
        VALUES (:p_name, :ts) 
        ON DUPLICATE KEY UPDATE last_success_ts = VALUES(last_success_ts)
    """)

    for name in pipeline_names:
        # Fetched only for the log message below
        old_watermark = get_watermark(name, conn)

        conn.execute(query, {"p_name": name, "ts": new_ts})
        print(f"[{name}] Updated watermark from '{old_watermark}' to '{new_ts}' in 'etl_state' table")
    

def test_select_query(query, engine, watermark_value='1970-01-01 00:00:00'):    
    return (
        pd.read_sql(
            query, 
            tgt_engine, 
            params={"watermark": watermark_value}
        )
    )

def _initialise_etl_state(etl_state_list=None, ts='1970-01-01 00:00:00'):
    """Set the watermark of the given pipelines to `ts` and commit.

    When no pipeline list is supplied (or it is empty), all known pipelines
    are reset. Uses the global `tgt_engine`.
    """
    print("\n\n >> INITIALISING ETL STATE ...\n")

    all_pipelines = ["fact_rental", "dim_film", "dim_customer", "dim_staff", "dim_actor", "bridge_actor", "dim_store"]
    pipelines = etl_state_list or all_pipelines

    with tgt_engine.connect() as state_conn:
        update_watermark(pipelines, ts, state_conn)
        state_conn.commit()

def _clear_table_data(table_names, engine, force=False):
    """
    Delete all rows from the specified tables.

    Each table is cleared on its own connection and committed separately. A
    failure on one table is printed and the remaining tables are still
    processed.

    Args:
        table_names (list | str): The tables to clear (e.g. ['dim_store']).
            A single table name string is also accepted.
        engine: The SQLAlchemy engine to use
        force (bool): If True, disables Foreign Key checks to force deletion.
    """
    # A bare string would otherwise be iterated character by character
    if isinstance(table_names, str):
        table_names = [table_names]

    print(f"\n\n >> CLEARING DATA FROM TABLES {table_names} ...\n")
    for table_name in table_names:
        with engine.connect() as conn:
            try:
                if force:
                    conn.execute(text("SET FOREIGN_KEY_CHECKS = 0"))

                try:
                    # Delete all rows. Identifiers cannot be bound as
                    # parameters, so the name is interpolated; only pass
                    # trusted table names.
                    result = conn.execute(text(f"DELETE FROM {table_name}"))
                finally:
                    # Restore the session setting even when the DELETE fails,
                    # so a pooled connection is not handed back with foreign
                    # key checks still disabled.
                    if force:
                        conn.execute(text("SET FOREIGN_KEY_CHECKS = 1"))

                conn.commit()
                print(f"Success: Deleted {result.rowcount} rows from {table_name}.")

            except Exception as e:
                print(f"Error clearing {table_name}: {e}")




def run_incremental_load(pipeline_name, extract_sql, load_sql, src_engine, tgt_engine):
    """
    Generic function to run ETL for a single table.

    Steps:
        1. Read the pipeline's watermark from the target database.
        2. Extract the source rows newer than the watermark.
        3. Upsert the rows into the target and advance the watermark inside a
           single transaction, so both succeed or both roll back.

    Args:
        pipeline_name (str): Name of the pipeline in etl_state.
        extract_sql: Query run on the source; receives the :watermark bind
            parameter and must return a `src_last_update` column.
        load_sql: Upsert statement run on the target; its bind parameters are
            filled from the extracted columns.
        src_engine: Engine for the source database.
        tgt_engine: Engine for the target database.

    Returns:
        None. Extraction errors are printed and end the run for this pipeline
        without raising; load errors propagate after the transaction rolls back.
    """

    # 1. Get Watermark (Target)
    with tgt_engine.connect() as tgt_conn:
        watermark = get_watermark(pipeline_name, tgt_conn)

    print(f"[{pipeline_name}] Checking for updates since {watermark}...")

    # 2. Extract Data (Source)
    # We use the watermark to only fetch new data
    try:
        df = pd.read_sql(extract_sql, src_engine, params={"watermark": watermark})
    except Exception as e:
        print(f"[{pipeline_name}] Extraction Error: {e}")
        return

    if df.empty:
        print(f"[{pipeline_name}] No new records found.")
        return

    print(f"[{pipeline_name}] Found {len(df)} rows. Loading...")

    # Calculate the new max timestamp up front, so a missing column fails
    # before anything is written to the target.
    new_ts = df['src_last_update'].max()

    # pandas represents SQL NULL as NaN/NaT in numeric and datetime columns.
    # Convert those back to None so they are bound as NULL.
    records = [
        {col: (None if pd.isna(val) else val) for col, val in rec.items()}
        for rec in df.to_dict(orient="records")
    ]

    # 3. Load Data (Target)
    with tgt_engine.begin() as tgt_conn:  # .begin() auto-commits on success
        # A list of parameter dicts runs the upsert as one executemany call
        # instead of one round trip per row.
        tgt_conn.execute(load_sql, records)

        # 4. Update Watermark (Target), in the same transaction as the load
        update_watermark([pipeline_name], new_ts, tgt_conn)

    print(f"[{pipeline_name}] Success! Watermark updated to {new_ts}")

def upsert_data(upsert_list, src_engine, tgt_engine):
    """Run the incremental load for every job in `upsert_list`, in order.

    Each job is a dict with the keys "table_name", "extract_sql" and
    "load_sql"; the values are handed to run_incremental_load together with
    the two engines.
    """
    print("\n\n >> UPDATING / INSERTING DATA ...\n")

    job_keys = ("table_name", "extract_sql", "load_sql")
    for job in upsert_list:
        pipeline_name, extract_sql, load_sql = (job[key] for key in job_keys)
        run_incremental_load(pipeline_name, extract_sql, load_sql, src_engine, tgt_engine)


<br>

## Extract and Load SQL queries

In [None]:
# ===== EXTRACT STATEMENTS (read from the source database) ===== #
# Each query takes a :watermark bind parameter and exposes the driving
# table's last_update as `src_last_update`, which run_incremental_load uses
# to compute the next watermark.


# Get 'dim_store' data from src: one denormalized row per store with its
# address, city and country joined in.
# NOTE(review): only s.last_update is compared with the watermark, so a change
# made solely to a joined address/city/country row would not be re-extracted —
# confirm this is intended.
dim_store_extract_sql = text("""
    SELECT 
        s.store_id, 
        a.address, 
        a.address2, 
        a.district, 
        c.city, 
        co.country, 
        a.postal_code, 
        a.phone, 
        s.last_update as src_last_update
    FROM sakila.store s
    JOIN sakila.address a ON s.address_id = a.address_id
    JOIN sakila.city c ON a.city_id = c.city_id
    JOIN sakila.country co ON c.country_id = co.country_id
    WHERE s.last_update > :watermark
""")

# Get 'dim_customer' data from src: one denormalized row per customer with
# address, city and country joined in. The source column c.active is selected
# twice, once as `activebool` and once as `active`, to feed both target columns.
# NOTE(review): only c.last_update is compared with the watermark, so a change
# made solely to a joined address/city/country row would not be re-extracted —
# confirm this is intended.
dim_customer_extract_sql = text("""
    SELECT 
        c.customer_id, 
        c.store_id, 
        c.first_name, 
        c.last_name, 
        c.email, 
        c.active as activebool,  -- Map source 'active' to target 'activebool'
        c.active,                -- Also keep 'active' for the second column
        c.create_date, 
        a.address, 
        a.address2,              
        a.district,              
        ci.city, 
        co.country, 
        a.postal_code,          
        a.phone,                 
        c.last_update as src_last_update
    FROM sakila.customer c
    JOIN sakila.address a ON c.address_id = a.address_id
    JOIN sakila.city ci ON a.city_id = ci.city_id
    JOIN sakila.country co ON ci.country_id = co.country_id
    WHERE c.last_update > :watermark;
""")

# test_select_query(dim_customer_extract_sql, src_engine)

In [None]:
# ===== LOAD STATEMENTS (upsert into the target warehouse) ===== #
# The bind parameter names match the column names returned by the
# corresponding extract query.


# Define the Load SQL (Upsert) for 'dim_store': insert a new row, or overwrite
# every non-key column when the row already exists.
# Presumably the duplicate key is store_id — verify against the dim_store DDL.
# NOTE(review): VALUES() inside ON DUPLICATE KEY UPDATE is deprecated in newer
# MySQL 8.0 releases — confirm against the target server version.
dim_store_load_sql = text("""
    INSERT INTO dim_store (
        store_id, 
        address, 
        address2, 
        district, 
        city, 
        country, 
        postal_code, 
        phone, 
        src_last_update
    ) VALUES (
        :store_id, 
        :address, 
        :address2, 
        :district, 
        :city, 
        :country, 
        :postal_code, 
        :phone, 
        :src_last_update
    )
    ON DUPLICATE KEY UPDATE
        address         = VALUES(address),
        address2        = VALUES(address2),      -- Added
        district        = VALUES(district),      -- Added
        city            = VALUES(city),
        country         = VALUES(country),       -- Added
        postal_code     = VALUES(postal_code),   -- Added
        phone           = VALUES(phone),
        src_last_update = VALUES(src_last_update);
""")

# Load SQL (Upsert) for 'dim_customer': insert a new row, or overwrite the
# listed columns when the row already exists.
# create_date is inserted for new rows but is not part of the
# ON DUPLICATE KEY UPDATE list, so existing rows keep their stored value.
# Presumably the duplicate key is customer_id — verify against the
# dim_customer DDL.
dim_customer_load_sql = text("""
    INSERT INTO dim_customer (
        customer_id, 
        store_id, 
        first_name, 
        last_name, 
        email, 
        activebool, 
        active, 
        create_date, 
        address, 
        address2, 
        district, 
        city, 
        country, 
        postal_code, 
        phone, 
        src_last_update
    ) VALUES (
        :customer_id, 
        :store_id, 
        :first_name, 
        :last_name, 
        :email, 
        :activebool, 
        :active, 
        :create_date, 
        :address, 
        :address2, 
        :district, 
        :city, 
        :country, 
        :postal_code, 
        :phone, 
        :src_last_update
    )
    ON DUPLICATE KEY UPDATE
        store_id        = VALUES(store_id),
        first_name      = VALUES(first_name),
        last_name       = VALUES(last_name),
        email           = VALUES(email),
        activebool      = VALUES(activebool),
        active          = VALUES(active),
        address         = VALUES(address),
        address2        = VALUES(address2),
        district        = VALUES(district),
        city            = VALUES(city),
        country         = VALUES(country),
        postal_code     = VALUES(postal_code),
        phone           = VALUES(phone),
        src_last_update = VALUES(src_last_update);
""")

# test_select_query("DESCRIBE dim_customer", tgt_engine)

In [None]:
# Register the pipelines to run: each job pairs a target table name with its
# extract (source) statement and its load (upsert) statement.
upsert_list = [
    dict(
        table_name="dim_store",
        extract_sql=dim_store_extract_sql,
        load_sql=dim_store_load_sql,
    ),
    dict(
        table_name="dim_customer",
        extract_sql=dim_customer_extract_sql,
        load_sql=dim_customer_load_sql,
    ),
]

<br>

## Run Incremental Load for all data

In [None]:
# ===== PERFORM INCREMENTAL LOAD ===== #

# Optional reset for a full reload: uncomment to set every watermark back to
# the epoch start and to empty the target tables before loading.
# _initialise_etl_state()
# _clear_table_data(["dim_customer", "dim_store"], tgt_engine, force=False)

upsert_data(upsert_list=upsert_list, src_engine=src_engine, tgt_engine=tgt_engine)

