In [1]:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

def etl_pipeline():
    print("EXTRACT")
    
    
    base_path = "../data/" 
    
    symbols = pd.read_csv(base_path + "symbols.csv", sep=';')
    account = pd.read_csv(base_path + "account-statement-1-1-2024-12-31-2024.csv", sep=';')
    country = pd.read_csv(base_path + "country.csv")
    

    # Standardize country names for reliable joins.
    # Data harmonization at the start is crucial to avoid mismatches during merging.
    country['name'] = country['name'].replace({
        'Taiwan, Province of China': 'Taiwan',
        'Türkiye': 'Turkey'   
    })
    country.loc[(country['name'] == 'Taiwan') & (country['region'].isnull()), 'region'] = 'Asia'

    # Clean column names to prevent future join errors.
    # Stripping whitespace prevents many hard-to-debug errors, as underlined in lecture.
    for df in (symbols, account, country):
        df.columns = df.columns.str.strip()

    # Drop any accidental "Unnamed" columns.
    # This avoids contaminating the schema if the csvs have been opened in Excel or similar.
    account = account.drop(columns=[c for c in account.columns if "Unnamed" in c], errors='ignore')

    print("TRANSFORM")
    # Filter to 2024 and keep only 'BUY'/'SELL'.
    # Even though the file covers only 2024, explicitly filtering is good practice, clarifies project scope, and guards against possible future schema changes.
    # I drop 'DIVIDEND' to align with the focus on actual trading activity.
    account['Date'] = pd.to_datetime(account['Date'], dayfirst=True, errors='coerce')
    acct_2024 = account[
        (account['Date'].dt.year == 2024) & (account['TransactionType'].isin(['BUY', 'SELL']))].copy()

    # Derive Quarter and Year from Date.
    # This step anticipates analysis needs and avoids costly on-the-fly date computations in dashboard queries.
    acct_2024['Quarter'] = acct_2024['Date'].dt.quarter

    # Construct time dimension (dim_time).
    # Hierarchical time dimension (Date→Quarter) provides maximum flexibility for analytical slicing (daily, quarterly).
    dim_time = (acct_2024[['Date', 'Quarter']]
                .drop_duplicates()
                .reset_index(drop=True))
    dim_time['TimeKey'] = dim_time.index + 1

    # Build symbol dimension (dim_symbol).
    # Surrogate keys (SymbolKey) are required for a robust star schema.
    dim_symbol = (symbols[['symbol', 'sector', 'industry', 'country']]
                  .drop_duplicates()
                  .reset_index(drop=True))
    dim_symbol['SymbolKey'] = dim_symbol.index + 1
    dim_symbol = dim_symbol.rename(columns={'symbol':'Symbol','country':'Country'})

    # Build geography dimension (dim_geography).
    # Including both Country and Region enables higher-level aggregation.
    dim_geo = (country[['name','region']]
               .drop_duplicates()
               .reset_index(drop=True))
    dim_geo['GeographyKey'] = dim_geo.index + 1
    dim_geo = dim_geo.rename(columns={'name':'Country','region':'Region'})

    # Build transaction type dimension (dim_transaction_type).
    # This allows easy filtering and analysis by transaction type, even though only BUY/SELL remain.
    dim_trans = (acct_2024[['TransactionType']]
                 .drop_duplicates()
                 .reset_index(drop=True))
    dim_trans['TransactionTypeKey'] = dim_trans.index + 1

    # Merge in all foreign keys to build the fact table.
    # Here is the heart of star schema, linking facts to descriptive dimensions with surrogate keys for performance, clarity, and consistency.
    fact = acct_2024.copy()
    fact = fact.merge(dim_time[['TimeKey', 'Date', 'Quarter']],
                      on=['Date','Quarter'], how='left')
    fact = fact.merge(dim_symbol[['SymbolKey', 'Symbol']],
                      left_on='Symbol', right_on='Symbol', how='left')
    fact = fact.merge(dim_symbol[['Symbol', 'Country']],
                      on='Symbol', how='left')
    fact = fact.merge(dim_geo[['GeographyKey', 'Country']],
                      on='Country', how='left')
    fact = fact.merge(dim_trans[['TransactionTypeKey', 'TransactionType']],
                      on='TransactionType', how='left')

    # Finalize fact table: select keys and measures, rename for consistency.
    # Each transaction row counts as one, so TransactionCount = 1 is assigned.
    fact_final = fact[['TimeKey','SymbolKey','GeographyKey','TransactionTypeKey','TransactionType','Unit']].copy()
    fact_final = fact_final.rename(columns={'Unit':'Units'})
    fact_final['FactTransactionKey'] = fact_final.index + 1
    fact_final['TransactionCount'] = 1  # Each row = 1 transaction (simplifies sum/count queries).

    # Drop rows missing keys, to guarantee referential integrity.
    print("Fact table rows before dropna:", len(fact_final))
    fact_final = fact_final.dropna(subset=['TimeKey','SymbolKey','GeographyKey','TransactionTypeKey'])
    print("Fact table rows after dropna:", len(fact_final))

    print("ETL complete: dimensions and fact built in memory")
    return {
        'fact': fact_final,
        'dim_time': dim_time,
        'dim_symbol': dim_symbol,
        'dim_geography': dim_geo,
        'dim_transaction_type': dim_trans
    }

def diagnostic(dw):
    print("\n DIAGNOSTIC ")
    # Print out table sizes to verify successful load and dimensionality.
    fact = dw['fact']
    print(f"Fact table rows: {len(fact)}")
    for name in ['dim_time','dim_symbol','dim_geography','dim_transaction_type']:
        df = dw[name]
        print(f"{name} rows: {len(df)}")
    # Check for missing foreign keys, which would indicate data integrity issues.
    fks = ['TimeKey','SymbolKey','GeographyKey','TransactionTypeKey']
    for fk in fks:
        nulls = fact[fk].isna().sum()
        print(f"Nulls in {fk}: {nulls}")
    # Show TransactionType breakdown to confirm filtering.
    if 'TransactionTypeKey' in fact.columns:
        print("\nTransaction Type distribution:")
        print(fact['TransactionTypeKey'].value_counts())
    # Print sector/region details for manual inspection for validation.
    if 'Sector' in dw['dim_symbol'].columns:
        print("\nSectors in dim_symbol:", dw['dim_symbol']['sector'].nunique())
        print(dw['dim_symbol']['sector'].value_counts().head())
    if 'Region' in dw['dim_geography'].columns:
        print("\nRegions in dim_geography:", dw['dim_geography']['Region'].nunique())
        print(dw['dim_geography']['Region'].value_counts().head())
    # Print date coverage, to ensure full 2024 coverage and correct date parsing.
    if 'Date' in dw['dim_time'].columns:
        min_date = dw['dim_time']['Date'].min()
        max_date = dw['dim_time']['Date'].max()
        print(f"\nDate range in dim_time: {min_date.date()} to {max_date.date()}")


def plot_queries(dw):
    fact = dw['fact']
    dt   = dw['dim_time']
    ds   = dw['dim_symbol']
    dg   = dw['dim_geography']

    # For every analytical question, join in descriptive dimension columns to make queries easy to write and interpret.
    
    df = (fact
          .merge(dt[['TimeKey', 'Quarter', 'Date']], on='TimeKey')
          .merge(ds[['SymbolKey', 'Symbol', 'sector', 'industry', 'Country']], on='SymbolKey'))

    # Q1: Rank all quarters of 2024 by total number of transactions, stacked by transaction type.
    # Stacked bar gives more insight.
    q1 = (df.groupby(['Quarter', 'TransactionType'])['TransactionCount']
            .sum().reset_index())
    q1['Quarter'] = q1['Quarter'].astype(int)
    q1 = q1.sort_values('Quarter')
    fig_q1 = px.bar(
        q1, x="Quarter", y="TransactionCount", color="TransactionType",
        barmode="stack", text_auto=True,
        title="Q1: Total Transactions by Quarter (2024) – Stacked BUY & SELL",
        labels={"TransactionCount": "Transactions"},
        template="plotly_white",
        color_discrete_map={'BUY': '#2E8B57', 'SELL': '#4169E1'},
        category_orders={"TransactionType": ["BUY", "SELL"]}
    )
    fig_q1.show()

    # Q2: What are the top 5 sectors in Asia by units traded? (Stacked by transaction type)
    asia = df.merge(dg[['GeographyKey','Region']], on='GeographyKey').query("Region == 'Asia'")
    top_sectors = (asia.groupby('sector')['Units']
                    .sum().nlargest(5).index.tolist())
    asia_top = asia[asia['sector'].isin(top_sectors)]
    sector_order = asia_top.groupby('sector')['Units'].sum().sort_values(ascending=False).index.tolist()
    q2 = (asia_top.groupby(['sector', 'TransactionType'])['Units']
          .sum().reset_index())
    fig_q2 = px.bar(
        q2, x="sector", y="Units", color="TransactionType",
        barmode="stack", text_auto=True,
        title="Q2: Top 5 Sectors in Asia by Units Traded (Stacked BUY/SELL)",
        labels={"Units": "Units"},
        template="plotly_white",
        color_discrete_map={'BUY': '#2E8B57', 'SELL': '#4169E1'},
        category_orders={"sector": sector_order, "TransactionType": ["BUY", "SELL"]}
    )
    fig_q2.update_xaxes(tickangle=30)
    fig_q2.show()

    # Q3: What are the top 5 countries in the Financial Services sector by units traded?
    fin = df[df['sector'] == "Financial Services"]
    top_countries = (fin.groupby('Country')['Units']
                      .sum().nlargest(5).index.tolist())
    fin_top = fin[fin['Country'].isin(top_countries)]
    country_order = fin_top.groupby('Country')['Units'].sum().sort_values(ascending=False).index.tolist()
    q3 = (fin_top.groupby(['Country', 'TransactionType'])['Units']
          .sum().reset_index())
    fig_q3 = px.bar(
        q3, x="Country", y="Units", color="TransactionType",
        barmode="stack", text_auto=True,
        title="Q3: Top 5 Countries in Financial Services by Units Traded (Stacked BUY/SELL)",
        labels={"Units": "Units"},
        template="plotly_white",
        color_discrete_map={'BUY': '#2E8B57', 'SELL': '#4169E1'},
        category_orders={"Country": country_order, "TransactionType": ["BUY", "SELL"]}
    )
    fig_q3.update_xaxes(tickangle=30)
    fig_q3.show()

def save_csv(dw):
    # Export each warehouse table for future work (Streamlit page in our case) or reproducibility.
    # Keeping dimension and fact tables separate is key for star schema principles and enables easy updates if the source changes.
    fact_final = dw['fact']
    dim_time = dw['dim_time']
    dim_symbol = dw['dim_symbol']
    dim_geo = dw['dim_geography']
    dim_trans = dw['dim_transaction_type']
    fact_final.to_csv("fact_transactions.csv", index=False)
    dim_time.to_csv("dim_time.csv", index=False)
    dim_symbol.to_csv("dim_symbol.csv", index=False)
    dim_geo.to_csv("dim_geography.csv", index=False)
    dim_trans.to_csv("dim_transaction_type.csv", index=False)
    print("CSV files saved successfully.")

def main():
    # Full ETL-DIAGNOSTIC-ANALYSIS-SAVE flow for clarity and testability.
    warehouse = etl_pipeline()
    diagnostic(warehouse)
    plot_queries(warehouse)
    save_csv(warehouse)

if __name__ == "__main__":
    main()


EXTRACT
TRANSFORM
Fact table rows before dropna: 2171
Fact table rows after dropna: 1973
ETL complete: dimensions and fact built in memory

 DIAGNOSTIC 
Fact table rows: 1973
dim_time rows: 2144
dim_symbol rows: 3194
dim_geography rows: 249
dim_transaction_type rows: 2
Nulls in TimeKey: 0
Nulls in SymbolKey: 0
Nulls in GeographyKey: 0
Nulls in TransactionTypeKey: 0

Transaction Type distribution:
TransactionTypeKey
2    989
1    984
Name: count, dtype: int64

Regions in dim_geography: 5
Region
Africa      60
Americas    57
Asia        51
Europe      51
Oceania     29
Name: count, dtype: int64

Date range in dim_time: 2024-01-02 to 2024-12-30


CSV files saved successfully.
