In [23]:
import os
from datetime import datetime, timedelta
from getpass import getpass
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine, text

In [2]:
# Database connection configuration.
# Every setting can be overridden with an environment variable so that no
# credentials have to be stored in the notebook itself:
#   DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, SOURCE_DB, TARGET_DB
user = os.environ.get('DB_USER', 'root')
host = os.environ.get('DB_HOST', 'localhost')
port = int(os.environ.get('DB_PORT', 3306))
source_db = os.environ.get('SOURCE_DB', 'sales_transactions_dw')
target_db = os.environ.get('TARGET_DB', 'rental_star')

# The password is never hardcoded: it comes from DB_PASSWORD or, when that is
# not set, from an interactive prompt whose input is not echoed.
password = os.environ.get('DB_PASSWORD') or getpass(f'MySQL password for {user}@{host}: ')

# Credentials are URL-escaped so characters such as '@', ':' or '/' inside the
# user name or password cannot corrupt the connection URL.
_server_url = f'mysql+mysqlconnector://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}'

# Create SQLAlchemy engines for source and target databases
source_engine = create_engine(f'{_server_url}/{source_db}')
target_engine = create_engine(f'{_server_url}/{target_db}')


In [None]:
def etl_table(query, target_table, key_columns=None, foreign_key_checks=None, drop_na=True, if_exists='append'):
    """
    Extract rows from the source DB, clean them, and load the new ones into a target table.

    Cleaning steps, applied in this order:
    1. drop rows that are identical in every column;
    2. optionally drop rows with NA in any of the key columns;
    3. keep only the first row for each key, so two source rows that share a key
       but differ in other columns cannot both be loaded;
    4. drop rows whose foreign key value is not present in the referenced
       dimension table of the target DB;
    5. drop rows whose key already exists in the target table.

    Parameters:
    - query (str): SQL query to extract data from source
    - target_table (str): target table name in destination DB
    - key_columns (list[str]): columns that identify a row; used for the NA check,
      the per-key deduplication and the "already loaded" check
    - foreign_key_checks (dict): { 'fk_column': 'dimension_table' } for filtering invalid foreign keys;
      the dimension table must expose a column with the same name as fk_column
    - drop_na (bool): whether to drop rows with NA in key_columns
    - if_exists (str): pandas to_sql if_exists option (default 'append')

    Returns:
    - DataFrame with the rows that were loaded (empty when nothing was loaded)
    """
    print(f"Extracting data for '{target_table}'...")
    df = pd.read_sql(query, source_engine)

    print("Removing duplicates...")
    df = df.drop_duplicates()

    if drop_na and key_columns:
        print(f" Dropping rows with NA in key columns: {key_columns} ...")
        df = df.dropna(subset=key_columns)

    # Deduplicate on the key itself. Rows with an incomplete key are left alone
    # here (they are only removed when drop_na is set), because NA keys would
    # otherwise be treated as equal to each other and collapsed into one row.
    if key_columns:
        has_full_key = df[key_columns].notna().all(axis=1)
        repeated_key = df.duplicated(subset=key_columns, keep='first') & has_full_key
        if repeated_key.any():
            print(f" Dropping {int(repeated_key.sum())} rows with duplicated keys {key_columns} ...")
            df = df[~repeated_key]

    # Filter rows with invalid foreign keys if specified
    if foreign_key_checks:
        for fk_col, dim_table in foreign_key_checks.items():
            print(f" Checking foreign key '{fk_col}' against dimension '{dim_table}'...")
            valid_keys = pd.read_sql(f"SELECT DISTINCT {fk_col} FROM {dim_table}", target_engine)
            df = df[df[fk_col].isin(valid_keys[fk_col])]

    # Avoid inserting duplicates by excluding keys already in target table
    # (anti-join: keep only rows that exist on the left side of the merge).
    if key_columns:
        existing_keys_query = f"SELECT DISTINCT {', '.join(key_columns)} FROM {target_table}"
        existing_keys_df = pd.read_sql(existing_keys_query, target_engine)
        df = df.merge(existing_keys_df, on=key_columns, how='left', indicator=True)
        df = df[df['_merge'] == 'left_only'].drop(columns=['_merge'])

    if df.empty:
        print(f"No new clean data to load into '{target_table}'. Skipping load.\n")
        return df

    print(f"Loading {len(df)} records into '{target_table}'...")
    df.to_sql(target_table, target_engine, if_exists=if_exists, index=False)
    print(f" Loaded table '{target_table}' successfully.\n")
    return df

In [5]:
def generate_date_dim(start_date, end_date):
    """
    Generate a date dimension DataFrame with one row per calendar day from
    start_date to end_date, both inclusive.

    Only the calendar day of each bound is used: any time-of-day component is
    discarded, so the end day is always included even when start_date carries a
    later time-of-day than end_date.

    Parameters:
    - start_date: first day of the range (anything pd.to_datetime accepts as a single value)
    - end_date: last day of the range (same accepted types)

    Returns:
    - DataFrame with columns: date_id (YYYYMMDD int), full_date, day, month, year.
      The frame is empty (but still has these columns) when a bound is missing
      or start_date is after end_date.
    """
    columns = ['date_id', 'full_date', 'day', 'month', 'year']

    first_day = pd.to_datetime(start_date)
    last_day = pd.to_datetime(end_date)
    if pd.isna(first_day) or pd.isna(last_day):
        return pd.DataFrame(columns=columns)

    # normalize() truncates to midnight so the comparison is by calendar day.
    calendar = pd.date_range(first_day.normalize(), last_day.normalize(), freq='D')
    rows = [
        {
            'date_id': int(day.strftime('%Y%m%d')),
            'full_date': day.date(),
            'day': day.day,
            'month': day.month,
            'year': day.year,
        }
        for day in calendar
    ]
    # Passing columns explicitly keeps the schema stable for an empty range.
    return pd.DataFrame(rows, columns=columns)


In [6]:
def load_dim_date():
    """
    Extract min and max payment dates from source and generate/load date dimension.

    The generated calendar runs from the first day of the month of the earliest
    payment to the last day of the month of the latest payment. Only dates whose
    date_id is not yet present in dim_date are inserted, so the function can be
    re-run without duplicating rows.

    Assumes the dim_date table already exists in the target DB (it is queried
    for its current date_id values before loading).

    Returns:
    - DataFrame with the full generated calendar (including dates that were
      already present in dim_date); empty when the source has no payments.
    """
    query_dates = "SELECT MIN(payment_date) AS min_date, MAX(payment_date) AS max_date FROM payment"
    date_range = pd.read_sql(query_dates, source_engine)

    min_date = pd.to_datetime(date_range['min_date'].iloc[0])
    max_date = pd.to_datetime(date_range['max_date'].iloc[0])
    if pd.isna(min_date) or pd.isna(max_date):
        # MIN/MAX return NULL when the payment table is empty.
        print("No payment dates found in source. Skipping date dimension load.\n")
        return pd.DataFrame(columns=['date_id', 'full_date', 'day', 'month', 'year'])

    # Truncate to midnight: replace(day=1) and MonthEnd(0) both keep the
    # time-of-day, and a start time later than the end time would make a
    # timestamp-based day loop stop one day short of the month end.
    start_date = min_date.normalize().replace(day=1)
    end_date = (max_date + pd.offsets.MonthEnd(0)).normalize()

    print("Generating and loading date dimension...")
    df_dates = generate_date_dim(start_date, end_date)

    # Load only the dates that are not in the dimension yet.
    existing_ids = pd.read_sql("SELECT date_id FROM dim_date", target_engine)['date_id']
    new_dates = df_dates[~df_dates['date_id'].isin(existing_ids)]
    if new_dates.empty:
        print("Date dimension already up to date. Skipping load.\n")
        return df_dates

    new_dates.to_sql("dim_date", target_engine, if_exists='append', index=False)
    print("Date dimension loaded successfully.\n")
    return df_dates


In [13]:
def load_fact_monthly_payment():
    """
    ETL process for fact_monthly_payment table:
    - Extract payment data from the source DB and dim_staff, dim_rent, dim_date from the target DB
    - Keep only payments whose staff_id, rental_id and payment day exist in those dimensions
    - Aggregate the payment amount per staff, rental and calendar month
    - Attach the date_id of the first day of each month
    - Load only (date_id, staff_id, rent_id) combinations that are not already in
      fact_monthly_payment, numbering them after the highest payment_id already loaded

    Re-running the function therefore does not duplicate rows. Rows that are
    already loaded are NOT updated if the source amounts changed afterwards.

    Returns:
    - None
    """
    print("Extracting payment data and related dimensions...")
    dim_staff = pd.read_sql('SELECT staff_id FROM dim_staff', target_engine)
    dim_rent = pd.read_sql('SELECT rent_id FROM dim_rent', target_engine)
    dim_date = pd.read_sql('SELECT date_id, full_date FROM dim_date', target_engine)

    dim_date['full_date'] = pd.to_datetime(dim_date['full_date']).dt.normalize()
    # A repeated calendar day in dim_date would multiply payments in the merge
    # below and inflate the monthly sums.
    dim_date = dim_date.drop_duplicates(subset='full_date')

    payment = pd.read_sql('SELECT payment_id, staff_id, rental_id, amount, payment_date FROM payment', source_engine)
    payment['payment_date_only'] = pd.to_datetime(payment['payment_date']).dt.normalize()

    payment = payment.merge(dim_date, left_on='payment_date_only', right_on='full_date', how='left')

    # Keep only payments with a valid staff, a valid rental and a known date
    is_valid = (
        payment['staff_id'].isin(dim_staff['staff_id'])
        & payment['rental_id'].isin(dim_rent['rent_id'])
        & payment['full_date'].notna()
    )
    payment = payment[is_valid]
    if payment.empty:
        print("No valid payments to aggregate. Skipping load.\n")
        return

    # Aggregate monthly amount by staff_id and rental_id
    fact_agg = (
        payment
        .assign(year=payment['full_date'].dt.year, month=payment['full_date'].dt.month)
        .groupby(['staff_id', 'rental_id', 'year', 'month'])
        .agg({'amount': 'sum'})
        .reset_index()
    )

    # Create a date representing the first day of the month for joining with date dimension
    fact_agg['first_day_of_month'] = pd.to_datetime(fact_agg[['year', 'month']].assign(day=1))
    fact_agg = fact_agg.merge(dim_date, left_on='first_day_of_month', right_on='full_date', how='left')

    # A month whose first day is missing from dim_date has no date_id to point at.
    fact_agg = fact_agg.dropna(subset=['date_id'])

    fact_final = (
        fact_agg[['date_id', 'staff_id', 'rental_id', 'amount']]
        .rename(columns={'rental_id': 'rent_id'})
        .astype({'date_id': 'int64'})  # the left merge can turn date_id into float
    )

    # Skip combinations that were loaded by a previous run
    grain = ['date_id', 'staff_id', 'rent_id']
    loaded = pd.read_sql('SELECT payment_id, date_id, staff_id, rent_id FROM fact_monthly_payment', target_engine)
    if not loaded.empty:
        fact_final = fact_final.merge(loaded[grain].drop_duplicates(), on=grain, how='left', indicator=True)
        fact_final = fact_final[fact_final['_merge'] == 'left_only'].drop(columns=['_merge'])

    if fact_final.empty:
        print("No new monthly payments to load into 'fact_monthly_payment'. Skipping load.\n")
        return

    # Continue the payment_id sequence after the ids already in the table
    first_id = 1 if loaded.empty else int(loaded['payment_id'].max()) + 1
    fact_final = fact_final.assign(payment_id=range(first_id, first_id + len(fact_final)))

    # Reorder columns for final fact table
    fact_final = fact_final[['payment_id', 'date_id', 'staff_id', 'rent_id', 'amount']]

    print("Loading aggregated monthly payments into 'fact_monthly_payment'...")
    fact_final.to_sql("fact_monthly_payment", target_engine, if_exists='append', index=False)
    print("Loaded fact_monthly_payment successfully.\n")


In [8]:
def load_fact_daily_inventory():
    """
    ETL process for fact_daily_inventory table:
    - Extract inventory from the source DB and dim_film, dim_store, dim_date from the target DB
    - Match each inventory row to a date_id through the calendar day of its last_update
    - Keep only rows whose film_id, store_id and date exist in those dimensions
    - Count inventory rows per film_id, store_id and date_id
    - Load only (date_id, film_id, store_id) combinations that are not already in
      fact_daily_inventory, numbering them after the highest inventory_id already loaded

    Re-running the function therefore does not duplicate rows. Rows that are
    already loaded are NOT updated if the source counts changed afterwards.

    Returns:
    - None
    """
    print(" Extracting inventory and dimension tables...")
    dim_film = pd.read_sql('SELECT film_id FROM dim_film', target_engine)
    dim_store = pd.read_sql('SELECT store_id FROM dim_store', target_engine)
    dim_date = pd.read_sql('SELECT date_id, full_date FROM dim_date', target_engine)
    dim_date['full_date'] = pd.to_datetime(dim_date['full_date']).dt.normalize()
    # A repeated calendar day in dim_date would multiply inventory rows in the
    # merge below and inflate the counts.
    dim_date = dim_date.drop_duplicates(subset='full_date')

    inventory = pd.read_sql('SELECT inventory_id, film_id, store_id, last_update FROM inventory', source_engine)
    inventory['last_update_date'] = pd.to_datetime(inventory['last_update']).dt.normalize()

    # Join with date dimension on last_update_date
    inventory = inventory.merge(dim_date, left_on='last_update_date', right_on='full_date', how='left')

    # Keep only inventory with a valid film, a valid store and a known date
    is_valid = (
        inventory['film_id'].isin(dim_film['film_id'])
        & inventory['store_id'].isin(dim_store['store_id'])
        & inventory['date_id'].notna()
    )
    inventory = inventory[is_valid]
    if inventory.empty:
        print("No valid inventory rows to aggregate. Skipping load.\n")
        return

    # Aggregate inventory quantity by film, store, and date
    fact_agg = (
        inventory
        .groupby(['film_id', 'store_id', 'date_id'])
        .agg(inventory_qty=('inventory_id', 'count'))
        .reset_index()
        .astype({'date_id': 'int64'})  # the left merge can turn date_id into float
    )

    # Skip combinations that were loaded by a previous run
    grain = ['date_id', 'film_id', 'store_id']
    loaded = pd.read_sql('SELECT inventory_id, date_id, film_id, store_id FROM fact_daily_inventory', target_engine)
    if not loaded.empty:
        fact_agg = fact_agg.merge(loaded[grain].drop_duplicates(), on=grain, how='left', indicator=True)
        fact_agg = fact_agg[fact_agg['_merge'] == 'left_only'].drop(columns=['_merge'])

    if fact_agg.empty:
        print("No new daily inventory facts to load into 'fact_daily_inventory'. Skipping load.\n")
        return

    # Continue the inventory_id sequence after the ids already in the table
    first_id = 1 if loaded.empty else int(loaded['inventory_id'].max()) + 1
    fact_agg = fact_agg.assign(inventory_id=range(first_id, first_id + len(fact_agg)))

    fact_final = fact_agg[['inventory_id', 'date_id', 'film_id', 'store_id', 'inventory_qty']]

    print("Loading daily inventory facts into 'fact_daily_inventory'...")
    fact_final.to_sql('fact_daily_inventory', target_engine, if_exists='append', index=False)
    print("Loaded fact_daily_inventory successfully.\n")


In [11]:
def run_etl():
    """
    Run the whole pipeline: dimension tables first, then the date dimension,
    then the fact tables. Data is cleaned before it is loaded.

    Suggested way to exercise it:
    1) Run ETL with current clean data.
    2) Add some dirty (not cleaned) data manually.
    3) Run ETL again to check if it correctly handles duplicates, missing values, and invalid foreign keys.
    """
    staff_query = """
        SELECT staff_id, first_name, last_name, email, store_id, active, username, last_update
        FROM staff
    """
    store_query = """
        SELECT store_id, manager_staff_id, address_id, last_update
        FROM store
    """
    rent_query = """
        SELECT rental_id AS rent_id, rental_date, return_date, inventory_id, customer_id, staff_id, last_update
        FROM rental
    """
    film_query = """
        SELECT film_id, title, description, release_year, rental_duration, rental_rate
        FROM film
    """

    # (extract query, target table, key columns, foreign-key checks)
    # dim_staff is loaded before dim_rent because dim_rent is validated against it.
    dimension_jobs = [
        (staff_query, "dim_staff", ["staff_id"], None),
        (store_query, "dim_store", ["store_id"], None),
        (rent_query, "dim_rent", ["rent_id", "rental_date"], {"staff_id": "dim_staff"}),
        (film_query, "dim_film", ["film_id"], None),
    ]
    for extract_sql, table, keys, fk_map in dimension_jobs:
        etl_table(extract_sql, table, key_columns=keys, foreign_key_checks=fk_map, drop_na=True)

    # Date dimension, then the two fact tables that depend on the dimensions.
    load_dim_date()
    load_fact_monthly_payment()
    load_fact_daily_inventory()


In [31]:
# Run the full ETL pipeline. The guard is true when this cell is executed
# directly (as a notebook cell or as a script) and false if the code is imported.
if __name__ == "__main__":
    run_etl()

Extracting data for 'dim_staff'...
Removing duplicates...
 Dropping rows with NA in key columns: ['staff_id'] ...
Loading 2 records into 'dim_staff'...
 Loaded table 'dim_staff' successfully.

Extracting data for 'dim_store'...
Removing duplicates...
 Dropping rows with NA in key columns: ['store_id'] ...
No new clean data to load into 'dim_store'. Skipping load.

Extracting data for 'dim_rent'...
Removing duplicates...
 Dropping rows with NA in key columns: ['rent_id', 'rental_date'] ...
🔍 Checking foreign key 'staff_id' against dimension 'dim_staff'...
Loading 16044 records into 'dim_rent'...
 Loaded table 'dim_rent' successfully.

Extracting data for 'dim_film'...
Removing duplicates...
 Dropping rows with NA in key columns: ['film_id'] ...
No new clean data to load into 'dim_film'. Skipping load.

Generating and loading date dimension...
Date dimension loaded successfully.

Extracting payment data and related dimensions...
Loading aggregated monthly payments into 'fact_monthly_paym

In [34]:
def count_rows(table_name, engine):
    """
    Return the number of rows in a table.

    Parameters:
    - table_name (str): table to count; interpolated into the SQL text as-is
    - engine: connection/engine object handed to pd.read_sql

    Returns:
    - the COUNT(*) value of the single result row
    """
    counts = pd.read_sql(f"SELECT COUNT(*) as cnt FROM {table_name}", engine)
    return counts.loc[0, 'cnt']

# Number of rows currently in the source `staff` table, as a baseline to
# compare against what the ETL loads.
before_count = count_rows('staff', source_engine)
print(f"Rows before ETL: {before_count}")

Rows before ETL: 2


In [41]:
from datetime import datetime
from sqlalchemy import text

# Sample "dirty" staff rows inserted into the source DB by insert_dirty_data().
# Each tuple is positional and is read by index in this order:
#   (staff_id, first_name, last_name, address_id, picture,
#    email, store_id, active, username, last_update)
# NOTE(review): the first row presumably reuses an existing staff_id (duplicate
# key) and the second has a missing last_name — confirm against the source table.
dirty_staff_data = [
    (1, 'Mike', 'Hillyer', 1, None, 'mike.hillyer@sakilastaff.com', 1, 1, 'mike', datetime.now()),
    (8, 'John', None, 1, None, 'john.doe@example.com', 1, 1, 'johndoe', datetime.now()),
    (3, 'Fake', 'Staff', 1, None, 'fake.staff@example.com', 1, 1, 'fakestaff', datetime.now()),
]

def insert_dirty_data():
    """
    Insert the rows of dirty_staff_data into the source `staff` table.

    Each tuple is mapped by position onto the insert's named parameters. A None
    in last_name is sent as '', and a None in address_id or store_id is sent
    as 1; every other value is passed through unchanged. When a row's key
    already exists, only its last_update is overwritten.

    The statement runs inside a single transaction; the prepared parameters and
    the reported row count are printed.
    """
    # Parameter names in the same order as the fields of each tuple.
    field_names = (
        'staff_id', 'first_name', 'last_name', 'address_id', 'picture',
        'email', 'store_id', 'active', 'username', 'last_update',
    )
    # Replacement values for fields where None is substituted before inserting
    # (e.g. a missing last_name becomes an empty value).
    none_fallbacks = {'last_name': '', 'address_id': 1, 'store_id': 1}

    def to_params(row):
        # Build one parameter dict, applying the fallback only where the value is None.
        params = {}
        for position, field in enumerate(field_names):
            value = row[position]
            if value is None and field in none_fallbacks:
                value = none_fallbacks[field]
            params[field] = value
        return params

    with source_engine.begin() as connection:
        insert_staff_query = """
            INSERT INTO staff (
                staff_id, first_name, last_name, address_id, picture,
                email, store_id, active, username, last_update
            )
            VALUES (
                :staff_id, :first_name, :last_name, :address_id, :picture,
                :email, :store_id, :active, :username, :last_update
            )
            ON DUPLICATE KEY UPDATE last_update=VALUES(last_update)
        """

        staff_params = [to_params(row) for row in dirty_staff_data]

        print("Prepared staff_params to insert:", staff_params)

        result = connection.execute(text(insert_staff_query), staff_params)
        print(f"Rows affected in staff: {result.rowcount}")


# Insert the dirty sample rows into the source `staff` table. The guard is true
# when this cell is executed directly and false if the code is imported.
if __name__ == "__main__":
    insert_dirty_data()


Prepared staff_params to insert: [{'staff_id': 1, 'first_name': 'Mike', 'last_name': 'Hillyer', 'address_id': 1, 'picture': None, 'email': 'mike.hillyer@sakilastaff.com', 'store_id': 1, 'active': 1, 'username': 'mike', 'last_update': datetime.datetime(2025, 5, 24, 23, 22, 18, 229568)}, {'staff_id': 8, 'first_name': 'John', 'last_name': '', 'address_id': 1, 'picture': None, 'email': 'john.doe@example.com', 'store_id': 1, 'active': 1, 'username': 'johndoe', 'last_update': datetime.datetime(2025, 5, 24, 23, 22, 18, 229568)}, {'staff_id': 3, 'first_name': 'Fake', 'last_name': 'Staff', 'address_id': 1, 'picture': None, 'email': 'fake.staff@example.com', 'store_id': 1, 'active': 1, 'username': 'fakestaff', 'last_update': datetime.datetime(2025, 5, 24, 23, 22, 18, 229568)}]
Rows affected in staff: 4


In [43]:
# Re-count the source `staff` table after inserting the dirty rows, to see how
# many rows the next ETL run will read.
before_count = count_rows('staff', source_engine)
print(f"Rows before ETL: {before_count}")

Rows before ETL: 4


In [45]:
# Tables emptied before re-running the ETL. They are listed child-first so the
# deletes also succeed when foreign keys are enforced: fact tables before the
# dimensions they point at, and dim_rent (which carries staff_id) before dim_staff.
tables_to_clear = [
    "fact_monthly_payment",
    "fact_daily_inventory",
    "dim_date",
    "dim_rent",
    "dim_staff",
]

# engine.begin() runs all deletes in one transaction: it commits when the block
# finishes and rolls everything back if any delete fails, so the target is
# never left half-cleared.
with target_engine.begin() as conn:
    for table in tables_to_clear:
        print(f"Deleting data from {table} ...")
        conn.execute(text(f"DELETE FROM {table}"))

print("Data cleared from fact and dimension tables.")

Deleting data from fact_monthly_payment ...
Deleting data from fact_daily_inventory ...
Deleting data from dim_date ...
Deleting data from dim_staff ...
Deleting data from dim_rent ...
Data cleared from fact and dimension tables.


In [46]:
# Run the full ETL pipeline again, this time with the dirty rows present in the
# source and the previously loaded target tables cleared.
if __name__ == "__main__":
    run_etl()

Extracting data for 'dim_staff'...
Removing duplicates...
 Dropping rows with NA in key columns: ['staff_id'] ...
Loading 4 records into 'dim_staff'...
 Loaded table 'dim_staff' successfully.

Extracting data for 'dim_store'...
Removing duplicates...
 Dropping rows with NA in key columns: ['store_id'] ...
No new clean data to load into 'dim_store'. Skipping load.

Extracting data for 'dim_rent'...
Removing duplicates...
 Dropping rows with NA in key columns: ['rent_id', 'rental_date'] ...
🔍 Checking foreign key 'staff_id' against dimension 'dim_staff'...
Loading 16044 records into 'dim_rent'...
 Loaded table 'dim_rent' successfully.

Extracting data for 'dim_film'...
Removing duplicates...
 Dropping rows with NA in key columns: ['film_id'] ...
No new clean data to load into 'dim_film'. Skipping load.

Generating and loading date dimension...
Date dimension loaded successfully.

Extracting payment data and related dimensions...
Loading aggregated monthly payments into 'fact_monthly_paym