In [None]:
import os
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
from sqlalchemy import BigInteger, create_engine, Table, Column, ForeignKey, MetaData, String, Integer, Date

# SOURCE(S)

In [None]:
# Source database connection settings. Each value can be overridden through an
# environment variable of the same name so real credentials do not have to be
# stored in the notebook; the fallbacks are the original local-development values.
SOURCE_MYSQL_USERNAME = os.environ.get("SOURCE_MYSQL_USERNAME", "root")
SOURCE_MYSQL_PASSWORD = os.environ.get("SOURCE_MYSQL_PASSWORD", "123456")
SOURCE_MYSQL_DATABASE = os.environ.get("SOURCE_MYSQL_DATABASE", "accounting_db2")

# User and password are URL-escaped so characters such as '@', ':' or '/' in a
# credential cannot corrupt the connection URL.
source_engine = create_engine(
    f"mysql+pymysql://{quote_plus(SOURCE_MYSQL_USERNAME)}:{quote_plus(SOURCE_MYSQL_PASSWORD)}"
    f"@localhost:3306/{SOURCE_MYSQL_DATABASE}"
)

In [None]:
def extract_table(table_name, engine=source_engine):
    """Read every row and column of ``table_name`` into a DataFrame.

    ``table_name`` is interpolated directly into the SQL text, so it must be a
    trusted identifier and never user-supplied input.
    """
    select_all = f"SELECT * FROM {table_name}"
    return pd.read_sql(select_all, engine)

# EXTRACT

In [None]:
def extract_dim_account():
    """Build the raw account dimension from the source tables.

    Reads ``accounting_account`` and ``account_type``, prefixes their columns with
    ``account_`` and ``type_`` respectively, inner-joins them on the account-type
    key, drops the duplicated key from the type side, and renames the two id
    columns to ``account_id`` and ``type_id``.
    """
    accounts = extract_table("accounting_account").add_prefix("account_")
    account_types = extract_table("account_type").add_prefix("type_")

    id_renames = {
        'account_acc_id': 'account_id',
        'account_acctype_id': 'type_id',
    }

    return (
        accounts
        .merge(
            account_types,
            how="inner",
            left_on="account_acctype_id",   # key on the accounting_account side
            right_on="type_acctype_id",     # key on the account_type side
        )
        .drop(columns=['type_acctype_id'])  # same values as account_acctype_id after the join
        .rename(columns=id_renames)
    )

In [None]:
# Extract the raw account dimension from the source database and display it.
dim_account = extract_dim_account()
dim_account

In [None]:
def extract_fact_entry():
    """Build the raw entry fact table from the source tables.

    Inner-joins ``journal_entry`` (left) with ``journal_transaction`` (right) on
    ``trans_id``, drops the journal/period/supplier/customer id columns, and
    renames the transaction and account columns to their warehouse names.
    """
    transactions = extract_table("journal_transaction")
    entries = extract_table("journal_entry")

    unused_columns = ["journal_id", "period_id", "supplier_id", "customer_id"]
    warehouse_names = {
        'trans_id': 'transaction_id',
        'acc_id': 'account_id',
        'trans_date': 'transaction_date',
    }

    return (
        entries
        .merge(transactions, how="inner", on="trans_id")
        .drop(columns=unused_columns)
        .rename(columns=warehouse_names)
    )

In [None]:
# Extract the raw entry fact table from the source database and display it.
fact_entry = extract_fact_entry()
fact_entry

# TRANSFORMATION

### Dim Account

In [None]:
# Inspect column dtypes and non-null counts of the raw dimension before transforming it.
dim_account.info()

In [None]:
def change_dtype_da(dim_account):
    """Cast the identifier columns of ``dim_account`` to whitespace-stripped strings.

    The frame is modified in place and the same object is returned.
    """
    for id_column in ('account_id', 'account_code', 'type_id'):
        as_text = dim_account[id_column].astype(str)
        dim_account[id_column] = as_text.str.strip()

    return dim_account

In [None]:
def remove_accounts_da(dim_account, codes_to_remove=('621', '622', '627', '911')):
    """Return a copy of ``dim_account`` without the excluded account codes.

    Parameters
    ----------
    dim_account : pandas.DataFrame
        Must contain an ``account_code`` column.
    codes_to_remove : iterable of str, optional
        Account codes whose rows are dropped. Defaults to the codes the
        pipeline has always excluded.

    Returns
    -------
    pandas.DataFrame
        An independent copy, so later in-place edits of the result neither touch
        the input nor trigger pandas' SettingWithCopyWarning on a filtered slice.
    """
    keep_mask = ~dim_account['account_code'].isin(list(codes_to_remove))
    return dim_account[keep_mask].copy()

In [None]:
def remove_cols_da(dim_account):
    """Return ``dim_account`` without the ``type_id`` column.

    A new frame is returned and the input is left unchanged. The previous in-place
    drop mutated the caller's frame and, when given a boolean-filtered slice,
    could raise pandas' SettingWithCopyWarning.
    """
    cols_to_remove = ['type_id']
    return dim_account.drop(columns=cols_to_remove)

In [None]:
def transform_dim_account(dim_account):
    """Run the account-dimension cleaning steps on a copy of ``dim_account``.

    Steps, in order: cast id columns to stripped strings, remove the excluded
    account codes, drop the ``type_id`` column.
    """
    return (
        dim_account.copy()  # work on a copy so the caller's frame is not modified
        .pipe(change_dtype_da)
        .pipe(remove_accounts_da)
        .pipe(remove_cols_da)
    )

In [None]:
# Apply the dimension transform and check the resulting columns, dtypes and non-null counts.
transformed_dim_account = transform_dim_account(dim_account)
transformed_dim_account.info()

### Fact Entry

In [None]:
# Inspect column dtypes and non-null counts of the raw fact table before transforming it.
fact_entry.info()

In [None]:
def change_dtype_fe(fact_entry):
    """Normalise the key and date columns of ``fact_entry``.

    ``entry_id``, ``transaction_id`` and ``account_id`` become whitespace-stripped
    strings; ``transaction_date`` is parsed and reduced to ``datetime.date``
    values. The frame is modified in place and the same object is returned.
    """
    for key_column in ('entry_id', 'transaction_id', 'account_id'):
        as_text = fact_entry[key_column].astype(str)
        fact_entry[key_column] = as_text.str.strip()

    for date_column in ('transaction_date',):
        parsed = pd.to_datetime(fact_entry[date_column])
        fact_entry[date_column] = parsed.dt.date

    return fact_entry

In [None]:
def fillna_amount_fe(fact_entry):
    """Replace missing ``debit_amount`` / ``credit_amount`` values with 0.

    The frame is modified in place and the same object is returned.
    """
    amount_columns = ['debit_amount', 'credit_amount']
    filled_amounts = fact_entry[amount_columns].fillna(0)
    fact_entry.loc[:, amount_columns] = filled_amounts
    return fact_entry

In [None]:
def join_dim_account(fact_entry, dim_account):
    """Attach ``account_code`` and ``type_name`` to each fact row.

    Performs an inner join on ``account_id``, so fact rows whose account is not
    present in ``dim_account`` are dropped.
    """
    lookup_columns = ['account_id', 'account_code', 'type_name']
    account_lookup = dim_account[lookup_columns]
    return pd.merge(fact_entry, account_lookup, how='inner', on='account_id')

In [None]:
def remove_accounts_fe(fact_entry, codes_to_remove=('621', '622', '627', '911')):
    """Return a copy of ``fact_entry`` without rows for the excluded account codes.

    Parameters
    ----------
    fact_entry : pandas.DataFrame
        Must contain an ``account_code`` column.
    codes_to_remove : iterable of str, optional
        Account codes whose rows are dropped. Defaults to the codes the
        pipeline has always excluded.

    Returns
    -------
    pandas.DataFrame
        An independent copy, so later in-place edits of the result neither touch
        the input nor trigger pandas' SettingWithCopyWarning on a filtered slice.
    """
    keep_mask = ~fact_entry['account_code'].isin(list(codes_to_remove))
    return fact_entry[keep_mask].copy()

In [None]:
def add_is_closing_entry(fact_entry):
    """Return a copy of ``fact_entry`` with an integer ``is_closing_entry`` flag.

    A row is flagged (1) when it matches one of these patterns, otherwise 0:

    * ``Revenue`` account other than code ``521`` with a debit-only amount;
    * ``Revenue`` account with code ``521`` and a credit-only amount;
    * ``Expenses`` account with a credit-only amount.

    "Debit-only" means ``debit_amount > 0`` and ``credit_amount == 0``;
    "credit-only" is the mirror image. Code 521 is handled with the opposite side
    to the other Revenue accounts (presumably a contra-revenue account - confirm).

    The input frame is not modified. Assigning the column in place, as before,
    mutated the caller's frame and could raise SettingWithCopyWarning when the
    input was a filtered slice.
    """
    is_revenue = fact_entry['type_name'] == 'Revenue'
    is_expense = fact_entry['type_name'] == 'Expenses'
    is_521 = fact_entry['account_code'] == '521'
    not_521 = fact_entry['account_code'] != '521'

    debit_only = (fact_entry['debit_amount'] > 0) & (fact_entry['credit_amount'] == 0)
    credit_only = (fact_entry['debit_amount'] == 0) & (fact_entry['credit_amount'] > 0)

    is_closing = (
        (is_revenue & not_521 & debit_only)
        | (is_revenue & is_521 & credit_only)
        | (is_expense & credit_only)
    )

    return fact_entry.assign(is_closing_entry=is_closing.astype(int))

In [None]:
def remove_closing_entries(fact_entry):
    """Return a copy of ``fact_entry`` keeping only rows with ``is_closing_entry == 0``.

    An explicit copy is returned so that later steps can add or modify columns on
    the result without affecting the input or triggering pandas'
    SettingWithCopyWarning on a filtered slice.
    """
    is_regular_entry = fact_entry['is_closing_entry'] == 0
    return fact_entry[is_regular_entry].copy()

In [None]:
def add_sign(fact_entry):
    """Return a copy of ``fact_entry`` with a ``sign`` column of ``1`` or ``-1``.

    ``sign`` is ``-1`` when any of the following holds, otherwise ``1``:

    * the account type is ``Expenses``;
    * the account type is ``Revenue`` and the account code is ``521``;
    * the account type is ``Assets`` and the row is credit-only
      (``debit_amount == 0`` and ``credit_amount > 0``);
    * the account type is ``Liabilities`` or ``Equity`` and the row is debit-only
      (``debit_amount > 0`` and ``credit_amount == 0``).

    The input frame is not modified. Assigning the column in place, as before,
    mutated the caller's frame and could raise SettingWithCopyWarning when the
    input was a filtered slice.
    """
    type_name = fact_entry['type_name']
    debit_only = (fact_entry['debit_amount'] > 0) & (fact_entry['credit_amount'] == 0)
    credit_only = (fact_entry['debit_amount'] == 0) & (fact_entry['credit_amount'] > 0)

    is_negative = (
        (type_name == 'Expenses')
        | ((type_name == 'Revenue') & (fact_entry['account_code'] == '521'))
        | ((type_name == 'Assets') & credit_only)
        | (type_name.isin(['Liabilities', 'Equity']) & debit_only)
    )

    # Start from +1 everywhere on a fresh frame, then flip the matching rows.
    signed = fact_entry.assign(sign=1)
    signed.loc[is_negative, 'sign'] = -1
    return signed

In [None]:
def add_signed_amount(fact_entry):
    """Return a copy of ``fact_entry`` with an int64 ``signed_amount`` column.

    ``signed_amount`` is ``(debit_amount + credit_amount) * sign`` cast to
    ``numpy.int64``.

    The input frame is not modified. Assigning the column in place, as before,
    mutated the caller's frame and could raise SettingWithCopyWarning when the
    input was a filtered slice.
    """
    magnitude = fact_entry['debit_amount'] + fact_entry['credit_amount']
    # NOTE: for float inputs the integer cast discards any fractional part.
    signed_amount = (magnitude * fact_entry['sign']).astype(np.int64)
    return fact_entry.assign(signed_amount=signed_amount)

In [None]:
def remove_cols_fe(fact_entry):
    """Return ``fact_entry`` without the helper columns used during the transform.

    Drops ``debit_amount``, ``credit_amount``, ``account_code``, ``type_name``,
    ``is_closing_entry`` and ``sign``. A new frame is returned and the input is
    left unchanged. The previous in-place drop mutated the caller's frame and
    could raise SettingWithCopyWarning on a filtered slice.
    """
    cols_to_drop = ['debit_amount', 'credit_amount', 'account_code', 'type_name', 'is_closing_entry', 'sign']
    return fact_entry.drop(columns=cols_to_drop)

In [None]:
def transform_fact_entry(fact_entry, transformed_dim_account):
    """Run the fact-entry transformation steps on copies of both inputs.

    Steps, in order: normalise key/date dtypes, fill missing amounts with 0, join
    the account attributes, remove excluded accounts, flag and remove closing
    entries, derive ``sign`` and ``signed_amount``, and drop the helper columns.
    """
    account_lookup = transformed_dim_account.copy()

    return (
        fact_entry.copy()
        .pipe(change_dtype_fe)
        .pipe(fillna_amount_fe)
        .pipe(join_dim_account, account_lookup)
        # redundant because join with transformed_dim_account that already excluded accounts
        .pipe(remove_accounts_fe)
        .pipe(add_is_closing_entry)
        .pipe(remove_closing_entries)
        .pipe(add_sign)
        .pipe(add_signed_amount)
        .pipe(remove_cols_fe)
    )

In [None]:
# Apply the fact transform and check the resulting columns, dtypes and non-null counts.
transformed_fact_entry = transform_fact_entry(fact_entry, transformed_dim_account)
transformed_fact_entry.info()

In [None]:
# Show the Python type of the first transaction_date value.
# NOTE(review): this inspects the raw `fact_entry`, which transform_fact_entry() does not
# modify (it works on a copy). To verify the date conversion, `transformed_fact_entry`
# is presumably the frame that was meant here - confirm.
type(fact_entry['transaction_date'].iloc[0])

In [None]:
# Display the transformed fact table.
transformed_fact_entry

# CHECK BALANCE

In [None]:
# Re-attach the account attributes to the transformed facts (inner join on account_id)
# so that signed amounts can be totalled per account type; .shape shows rows x columns.
merged = transformed_fact_entry.merge(transformed_dim_account, on='account_id', how='inner')
merged.shape

In [None]:
# Sum signed amounts for each side of the accounting equation.
total_assets = merged.loc[merged['type_name'] == 'Assets', 'signed_amount'].sum()
total_le = merged.loc[merged['type_name'].isin(['Liabilities', 'Equity']), 'signed_amount'].sum()
difference = total_assets - total_le

# Display results
print("Total Assets:               ", total_assets)
print("Total Liabilities + Equity: ", total_le)

# signed_amount is cast to int64 during the transform, so the totals are exact
# and are compared without a tolerance.
if difference == 0:
    print("✅ Balanced")
else:
    # The format spec must sit inside the braces; previously ".5f" was printed literally.
    print(f"❌ Not balanced (difference = {difference:.5f})")

# LOAD

In [None]:
# Target (warehouse) connection settings. Each value can be overridden through an
# environment variable of the same name so real credentials do not have to be
# stored in the notebook; the fallbacks are the original local-development values.
TARGET_MYSQL_USERNAME = os.environ.get("TARGET_MYSQL_USERNAME", "root")
TARGET_MYSQL_PASSWORD = os.environ.get("TARGET_MYSQL_PASSWORD", "123456")
TARGET_MYSQL_DATABASE = os.environ.get("TARGET_MYSQL_DATABASE", "accounting_dwh")

# User and password are URL-escaped so characters such as '@', ':' or '/' in a
# credential cannot corrupt the connection URL.
target_engine = create_engine(
    f"mysql+pymysql://{quote_plus(TARGET_MYSQL_USERNAME)}:{quote_plus(TARGET_MYSQL_PASSWORD)}"
    f"@localhost:3306/{TARGET_MYSQL_DATABASE}"
)

In [None]:
def create_schema(engine=target_engine):
    """Drop and recreate the ``dim_account`` and ``fact_entry`` warehouse tables.

    This is destructive: any existing data in the two tables is lost.

    Parameters
    ----------
    engine : sqlalchemy.engine.Engine, optional
        Engine for the database in which the tables are created. Defaults to
        ``target_engine``.
    """
    # MetaData is the container that collects the table definitions below.
    metadata = MetaData()

    # Account dimension.
    Table("dim_account", metadata,
        Column("account_id", String(4), primary_key=True),
        Column("account_code", String(3)),
        Column("account_name", String(128)),
        Column("account_normal_balance", String(32)),
        Column("account_subcategory", String(128)),
        Column("account_category", String(128)),
        Column("type_name", String(32)),
        Column("type_normal_balance", String(32)),
    )

    # Entry fact table; account_id references the dimension.
    Table("fact_entry", metadata,
        Column("entry_id", String(7), primary_key=True),
        Column("transaction_id", String(7)),
        Column("account_id", String(4), ForeignKey("dim_account.account_id")),
        # signed_amount is produced as int64 by the transform, so a 64-bit column is
        # used; a 32-bit INTEGER cannot hold values beyond about +/-2.1 billion.
        Column("signed_amount", BigInteger),
        Column("transaction_date", Date),
        Column("description", String(512)),
    )

    # Recreate from scratch: drop the tables if they already exist, then create them.
    metadata.drop_all(engine)
    metadata.create_all(engine)

In [None]:
def load_table(df, table_name, engine=target_engine):
    """Append the rows of ``df`` to ``table_name`` and print progress messages.

    Rows are appended to the table (``if_exists='append'``) and the DataFrame
    index is not written as a column.
    """
    print(f"Loading '{table_name}' into database {TARGET_MYSQL_DATABASE}...")

    df.to_sql(table_name, engine, if_exists='append', index=False)

    print(f"'{table_name}' loaded successfully. Rows inserted: {len(df)}")

In [None]:
# Recreate the warehouse tables, then load the dimension before the fact table:
# fact_entry.account_id has a foreign key to dim_account.account_id.
create_schema()
load_table(transformed_dim_account, "dim_account")
load_table(transformed_fact_entry, "fact_entry")