In [1]:
import pandas as pd
import psycopg2
import sqlalchemy

In [2]:
connection = sqlalchemy.create_engine(f'postgresql+psycopg2://myusername@0.0.0.0:5432/postgres')

In [3]:
def stage():
    try:
        staging_df = pd.read_csv("./input/transactions-2022-01-03.csv")
        staging_df = staging_df.rename(columns=str.lower)
        staging_df = staging_df.rename(columns={"trade date": "trade_date"})
        staging_df.to_sql('staging_transactions', connection, if_exists='replace')
        return staging_df
    except Exception as e:
        print("Data stage error: " + str(e))  

In [4]:
def extract():
    try:
        staging_df = stage()
        return staging_df
    except Exception as e:
        print("Data extract error: " + str(e))

In [5]:
def transform():
    transform_df = pd.read_sql_query('''select * from staging_transactions''', connection)
    transform_df['date'] =  pd.to_datetime(transform_df['trade_date'], format="%d/%m/%Y")
    transform_df['year'] = transform_df['date'].dt.year
    transform_df['month'] = transform_df['date'].dt.month
    transform_df['day'] = transform_df['date'].dt.day
    return transform_df

In [6]:
def load(transactions):
    trader_dim = transactions[['trader','fund']].drop_duplicates()
    load_dim(trader_dim, 'trader_dim', 'trader_key')
    
    date_dim = transactions[['trade_date','year','month','day']].drop_duplicates()
    load_dim(date_dim, 'date_dim', 'date_key')

    security_dim=transactions['security'].drop_duplicates()
    load_dim(security_dim, 'security_dim', 'security_key')

    load_fact()
    load_market_value_fact()

In [7]:
def load_dim(df, name, key_name):
    try:
        df.to_sql(name, connection, if_exists='replace')
        with connection.connect() as con:
            con.execute(f'ALTER TABLE {name} ADD COLUMN {key_name} SERIAL PRIMARY KEY;')
    except Exception as e:
        print(f"Load dimension table error for {name}: " + str(e))    

In [8]:
def load_fact():
    query_create_table = """CREATE TABLE IF NOT EXISTS transaction_fact (
          total_transaction_cost numeric,
          quantity numeric,
          price numeric,
          date_key integer,
          trader_key integer,
          security_key integer)"""

    query_insert = """INSERT INTO transaction_fact
            (total_transaction_cost, quantity, price, date_key, trader_key, security_key)
            SELECT
                s.quantity * s.price,
                s.quantity,
                s.price,
                COALESCE(d.date_key, 0),
                COALESCE(t.trader_key, 0),
                COALESCE(c.security_key, 0)
            FROM staging_transactions s
                LEFT JOIN trader_dim t ON (s.trader = t.trader and s.fund = t.fund)
                LEFT JOIN security_dim c ON s.security = c.security
                LEFT JOIN date_dim d ON s.trade_date = d.trade_date;"""
    try:
        connection.execute(query_create_table)
        connection.execute(query_insert)
    except Exception as e:
        print(f"Load fact table error for transaction_fact: " + str(e))    

In [9]:
def load_market_value_fact():
    query_create_table = """CREATE TABLE IF NOT EXISTS market_value_fact (
              total_cost numeric,
              date_Key integer)"""

    query_insert = """INSERT INTO market_value_fact
                (total_cost,date_key)
                SELECT
                    sum(s.quantity) as total_cost,
                    COALESCE(d.date_key, 0)
                FROM staging_transactions s
                LEFT JOIN date_dim d ON s.trade_date = d.trade_date
                group by date_key;"""
    try:
        connection.execute(query_create_table)
        connection.execute(query_insert)
    except Exception as e:
        print(f"Load market value fact table error for market_value_fact: " + str(e))    

In [10]:
df=extract()
df.head()

Unnamed: 0,trade_date,fund,trader,security,quantity,price
0,1/3/2022,Equity Fund,Ricky,IBM,20,0.093486
1,1/3/2022,Equity Fund,Ethel,GOOG,2,0.228878
2,1/3/2022,Equity Fund,Ricky,TSLA,58,0.129644
3,1/3/2022,Future Fund,Lucy,EDU8,31,0.626164
4,1/3/2022,Future Fund,Ricky,ZCU23,65,0.798289


In [11]:
df=transform()
df.head()

Unnamed: 0,index,trade_date,fund,trader,security,quantity,price,date,year,month,day
0,0,1/3/2022,Equity Fund,Ricky,IBM,20,0.093486,2022-03-01,2022,3,1
1,1,1/3/2022,Equity Fund,Ethel,GOOG,2,0.228878,2022-03-01,2022,3,1
2,2,1/3/2022,Equity Fund,Ricky,TSLA,58,0.129644,2022-03-01,2022,3,1
3,3,1/3/2022,Future Fund,Lucy,EDU8,31,0.626164,2022-03-01,2022,3,1
4,4,1/3/2022,Future Fund,Ricky,ZCU23,65,0.798289,2022-03-01,2022,3,1


In [12]:
load(df)