In [None]:
# Install 
!pip install -r requirements.txt

In [1]:
import os
import urllib.request

data_dir = "data"
# Source CSVs hosted on Google Drive; each is downloaded once into `data_dir`.
list_data = [
    {
        'url': 'https://drive.usercontent.google.com/u/0/uc?id=1uynxymjHuqunDjLeT1FqSfVDFC-jWyXx&export=download',
        'filename': 'cms_articles.csv'
    },
    {
        'url': 'https://drive.usercontent.google.com/u/0/uc?id=1ava925zhJFtxlzfvo7CnBiieLH2u0r0-&export=download',
        'filename': 'ga4_events.csv'
    },
    {
        'url': 'https://drive.usercontent.google.com/u/0/uc?id=1Dig4PqGiFBRhNTyZsj8Et0AFZ9zJXU2m&export=download',
        'filename': 'gam_delivery.csv'
    }
]

# Download the data from Google Drive, skipping files that are already present.
os.makedirs(data_dir, exist_ok=True)
for data in list_data:
    file_path = os.path.join(data_dir, data['filename'])
    if os.path.exists(file_path):
        print(f"INF skip, {file_path} already exists!")
        continue
    # Download under a temporary name and rename only after success, so an
    # interrupted download cannot leave a truncated file behind that the
    # existence check above would then skip on every later run.
    tmp_path = f"{file_path}.part"
    try:
        urllib.request.urlretrieve(data['url'], tmp_path)
        os.replace(tmp_path, file_path)
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

In [2]:
# Credentials
# Connection settings come from the standard libpq environment variables so
# secrets do not have to live in the notebook; the fallbacks reproduce the
# previous hard-coded local-development values.
import os

db_creds = {
    'hostname': os.environ.get('PGHOST', 'localhost'),
    'port': int(os.environ.get('PGPORT', '5432')),
    'username': os.environ.get('PGUSER', 'admin'),
    'password': os.environ.get('PGPASSWORD', 'admin'),
    'database': os.environ.get('PGDATABASE', 'public'),
}

In [3]:
from scripts.postgresql import (
    postgresql_connection,
    execute_query
)

# Creating schemas: `bronze` receives the raw ingested rows and `gold` the
# derived daily marts / views built further down.
print("INF creating schema ...", end='')
connection = postgresql_connection(
    db_creds['hostname'],
    db_creds['port'],
    db_creds['username'],
    db_creds['password'],
    db_creds['database']
)
try:
    execute_query(
        connection,
        "create schema if not exists bronze; create schema if not exists gold;"
    )
finally:
    # Release the connection even if the DDL fails.
    connection.close()
print(' [done]')

INF creating schema ... [done]


In [4]:
import pandas as pd
import json
from datetime import timedelta, datetime
from scripts.postgresql import (
    postgresql_connection,
    dict_to_postgresql_schema,
    create_table,
    insert_data_batch
)
from scripts.dataframe import (
    read_data_from_dataframe
)

# Silence pandas' SettingWithCopyWarning globally (presumably raised inside
# the dataframe helpers — confirm).
pd.options.mode.chained_assignment = None

# Read data
cms_articles_df = pd.read_csv(f"{data_dir}/cms_articles.csv")

# Create connection
connection = postgresql_connection(
    db_creds['hostname'],
    db_creds['port'],
    db_creds['username'],
    db_creds['password'],
    db_creds['database']
)

try:
    # Generate the destination column types from the first row.
    # NOTE(review): a null in row 0 may mis-type that column — confirm how
    # dict_to_postgresql_schema handles None values.
    dst_table_schema = dict_to_postgresql_schema(json.loads(cms_articles_df.iloc[0].to_json()))

    # Based on min max date in publish_timestamp.
    # NOTE(review): read_csv does not parse dates here, so min()/max() compare
    # strings; that matches chronological order only for uniformly formatted
    # ISO-8601 values — confirm the source format.
    start = pd.to_datetime(cms_articles_df['publish_timestamp'].min())
    end = pd.to_datetime(cms_articles_df['publish_timestamp'].max())

    # Walk every calendar day from the first to the last timestamp, inclusive.
    # Anchoring at midnight and comparing with `<=` keeps the final day even
    # when its latest timestamp is earlier in the day than the very first one
    # (always the case for bare dates), which `current < end` used to drop.
    current = start.normalize()
    last_day = end.normalize()
    while current <= last_day:
        print(f"INF partition {current.strftime('%Y-%m-%d')}")
        # Get data by partition
        data_partition_day = read_data_from_dataframe(cms_articles_df, current, timestamp_key_name='publish_timestamp')
        if len(data_partition_day) == 0:
            # No rows for this day: nothing to load, and `[0]` below would fail.
            print(f"INF skip, no rows for {current.strftime('%Y-%m-%d')}")
        else:
            # Create table (first record of the partition is passed as the template)
            create_table(connection, 'bronze', 'cms_articles', data_partition_day[0])
            # Insert data
            insert_data_batch(
                connection,
                data_partition_day,
                'bronze',
                'cms_articles',
                dst_table_schema,
                '_hashrow'
            )
        current += timedelta(days=1)
finally:
    # Release the connection even if a partition fails to load.
    connection.close()

INF partition 2025-09-01
INF partition 2025-09-02
INF partition 2025-09-03
INF partition 2025-09-04
INF partition 2025-09-05
INF partition 2025-09-06
INF partition 2025-09-07
INF partition 2025-09-08
INF partition 2025-09-09
INF partition 2025-09-10
INF partition 2025-09-11
INF partition 2025-09-12
INF partition 2025-09-13
INF partition 2025-09-14
INF partition 2025-09-15
INF partition 2025-09-16
INF partition 2025-09-17
INF partition 2025-09-18
INF partition 2025-09-19
INF partition 2025-09-20
INF partition 2025-09-21
INF partition 2025-09-22
INF partition 2025-09-23
INF partition 2025-09-24
INF partition 2025-09-25
INF partition 2025-09-26
INF partition 2025-09-27
INF partition 2025-09-28
INF partition 2025-09-29
INF partition 2025-09-30
INF partition 2025-10-01
INF partition 2025-10-02
INF partition 2025-10-03
INF partition 2025-10-04
INF partition 2025-10-05
INF partition 2025-10-06
INF partition 2025-10-07
INF partition 2025-10-08
INF partition 2025-10-09
INF partition 2025-10-10


# Case Study 1: GA4

## Insert Raw Data to Staging Area / Bronze Layer

In [5]:
import pandas as pd
import json
from datetime import timedelta
from scripts.postgresql import (
    postgresql_connection,
    dict_to_postgresql_schema,
    create_table,
    insert_data_batch
)
from scripts.dataframe import (
    read_data_from_dataframe
)

# Silence pandas' SettingWithCopyWarning globally (presumably raised inside
# the dataframe helpers — confirm).
pd.options.mode.chained_assignment = None

# Read source data
ga4_events_df = pd.read_csv(f"{data_dir}/ga4_events.csv")

# Create connection
connection = postgresql_connection(
    db_creds['hostname'],
    db_creds['port'],
    db_creds['username'],
    db_creds['password'],
    db_creds['database']
)

try:
    # Generate the destination column types from the first row.
    # NOTE(review): a null in row 0 may mis-type that column — confirm how
    # dict_to_postgresql_schema handles None values.
    dst_table_schema = dict_to_postgresql_schema(json.loads(ga4_events_df.iloc[0].to_json()))

    # Based on min max date in event_timestamp.
    # NOTE(review): read_csv does not parse dates here, so min()/max() compare
    # strings; that matches chronological order only for uniformly formatted
    # ISO-8601 values — confirm the source format.
    start = pd.to_datetime(ga4_events_df['event_timestamp'].min())
    end = pd.to_datetime(ga4_events_df['event_timestamp'].max())

    # Walk every calendar day from the first to the last timestamp, inclusive.
    # Anchoring at midnight and comparing with `<=` keeps the final day even
    # when its latest timestamp is earlier in the day than the very first one
    # (always the case for bare dates), which `current < end` used to drop.
    current = start.normalize()
    last_day = end.normalize()
    while current <= last_day:
        print(f"INF partition {current.strftime('%Y-%m-%d')}")
        # Get data by partition
        data_partition_day = read_data_from_dataframe(ga4_events_df, current, timestamp_key_name='event_timestamp')
        if len(data_partition_day) == 0:
            # No rows for this day: nothing to load, and `[0]` below would fail.
            print(f"INF skip, no rows for {current.strftime('%Y-%m-%d')}")
        else:
            # Create table (first record of the partition is passed as the template)
            create_table(connection, 'bronze', 'ga4', data_partition_day[0])
            # Insert data
            insert_data_batch(
                connection,
                data_partition_day,
                'bronze',
                'ga4',
                dst_table_schema,
                '_hashrow'
            )
        current += timedelta(days=1)
finally:
    # Release the connection even if a partition fails to load.
    connection.close()

INF partition 2025-10-01
INF partition 2025-10-02
INF partition 2025-10-03
INF partition 2025-10-04
INF partition 2025-10-05
INF partition 2025-10-06
INF partition 2025-10-07
INF partition 2025-10-08
INF partition 2025-10-09
INF partition 2025-10-10
INF partition 2025-10-11
INF partition 2025-10-12
INF partition 2025-10-13
INF partition 2025-10-14
INF partition 2025-10-15
INF partition 2025-10-16
INF partition 2025-10-17
INF partition 2025-10-18
INF partition 2025-10-19
INF partition 2025-10-20
INF partition 2025-10-21
INF partition 2025-10-22
INF partition 2025-10-23
INF partition 2025-10-24
INF partition 2025-10-25
INF partition 2025-10-26
INF partition 2025-10-27
INF partition 2025-10-28
INF partition 2025-10-29
INF partition 2025-10-30
INF partition 2025-10-31
INF partition 2025-11-01


## Generate Daily Mart

In [6]:
from scripts.postgresql import (
    postgresql_connection,
    create_table,
    execute_query
)
from datetime import datetime

# Column template for gold.ga4_daily_summary, written as sample values;
# presumably create_table infers each column's SQL type from the Python type
# of its sample (date / int / float) — confirm against scripts.postgresql.
ga4_daily_summary_schema = {
    'event_timestamp': datetime(2025, 10, 1).date(),
    'content_id': 1000,
    'cnt_pageviews': 1000.0,
    'cnt_session': 1000.0,
    # NOTE(review): "engagment" is misspelled, but the name presumably has to
    # match sql/gold_ga4_daily_summary.sql — rename both together or not at all.
    'cnt_user_engagment': 1000.0
}

print("INF generate ga4 daily mart ...", end='')
# Read the mart SQL first so a missing/unreadable file fails before any DB work.
with open('sql/gold_ga4_daily_summary.sql', 'r', encoding='utf-8') as file:
    mart_sql = file.read()

# Create connection
connection = postgresql_connection(
    db_creds['hostname'],
    db_creds['port'],
    db_creds['username'],
    db_creds['password'],
    db_creds['database']
)
try:
    create_table(connection, 'gold', 'ga4_daily_summary', ga4_daily_summary_schema)
    execute_query(connection, mart_sql)
finally:
    # Release the connection even if table creation or the query fails.
    connection.close()
print(" [done]")

INF generate ga4 daily mart ... [done]


# Case Study 2: GAM

## Insert Raw Data to Staging Area / Bronze Layer

In [7]:
import pandas as pd
import json
from datetime import timedelta
from scripts.postgresql import (
    postgresql_connection,
    dict_to_postgresql_schema,
    create_table,
    insert_data_batch
)
from scripts.dataframe import (
    read_data_from_dataframe
)

# Silence pandas' SettingWithCopyWarning globally (presumably raised inside
# the dataframe helpers — confirm).
pd.options.mode.chained_assignment = None

# Read data
gam_events_df = pd.read_csv(f"{data_dir}/gam_delivery.csv")

# Create connection
connection = postgresql_connection(
    db_creds['hostname'],
    db_creds['port'],
    db_creds['username'],
    db_creds['password'],
    db_creds['database']
)

try:
    # Generate the destination column types from the first row.
    # NOTE(review): a null in row 0 may mis-type that column — confirm how
    # dict_to_postgresql_schema handles None values.
    dst_table_schema = dict_to_postgresql_schema(json.loads(gam_events_df.iloc[0].to_json()))

    # Based on min max date in served_at.
    # NOTE(review): read_csv does not parse dates here, so min()/max() compare
    # strings; that matches chronological order only for uniformly formatted
    # ISO-8601 values — confirm the source format.
    start = pd.to_datetime(gam_events_df['served_at'].min())
    end = pd.to_datetime(gam_events_df['served_at'].max())

    # Walk every calendar day from the first to the last timestamp, inclusive.
    # Anchoring at midnight and comparing with `<=` keeps the final day even
    # when its latest timestamp is earlier in the day than the very first one
    # (always the case for bare dates), which `current < end` used to drop.
    current = start.normalize()
    last_day = end.normalize()
    while current <= last_day:
        print(f"INF partition {current.strftime('%Y-%m-%d')}")
        # Get data by partition
        data_partition_day = read_data_from_dataframe(gam_events_df, current, timestamp_key_name='served_at')
        if len(data_partition_day) == 0:
            # No rows for this day: nothing to load, and `[0]` below would fail.
            print(f"INF skip, no rows for {current.strftime('%Y-%m-%d')}")
        else:
            # Create table (first record of the partition is passed as the template)
            create_table(connection, 'bronze', 'gam_delivery', data_partition_day[0])
            # Insert data; the trailing column list is presumably the set of
            # columns that identify a row — confirm against insert_data_batch.
            insert_data_batch(
                connection,
                data_partition_day,
                'bronze',
                'gam_delivery',
                dst_table_schema,
                '_hashrow',
                ['served_at', 'site', 'ad_unit', 'creative', 'content_id']
            )
        current += timedelta(days=1)
finally:
    # Release the connection even if a partition fails to load.
    connection.close()

INF partition 2025-10-01
INF partition 2025-10-02
INF partition 2025-10-03
INF partition 2025-10-04
INF partition 2025-10-05
INF partition 2025-10-06
INF partition 2025-10-07
INF partition 2025-10-08
INF partition 2025-10-09
INF partition 2025-10-10
INF partition 2025-10-11
INF partition 2025-10-12
INF partition 2025-10-13
INF partition 2025-10-14
INF partition 2025-10-15
INF partition 2025-10-16
INF partition 2025-10-17
INF partition 2025-10-18
INF partition 2025-10-19
INF partition 2025-10-20
INF partition 2025-10-21
INF partition 2025-10-22
INF partition 2025-10-23
INF partition 2025-10-24
INF partition 2025-10-25
INF partition 2025-10-26
INF partition 2025-10-27
INF partition 2025-10-28
INF partition 2025-10-29
INF partition 2025-10-30


## Generate Daily Mart

In [8]:
from scripts.postgresql import (
    postgresql_connection,
    create_table,
    execute_query
)
from datetime import datetime

# Column template for gold.gam_daily_summary, written as sample values;
# presumably create_table infers each column's SQL type from the Python type
# of its sample (date / int / float) — confirm against scripts.postgresql.
gam_daily_summary_schema = {
    'served_at': datetime(2025, 10, 1).date(),
    'content_id': 1000,
    'sum_impressions': 1000.0,
    'sum_clicks': 1000.0,
    'sum_revenue_usd': 1000.0
}

print("INF generate gam daily mart ...", end='')
# Read the mart SQL first so a missing/unreadable file fails before any DB work.
with open('sql/gold_gam_daily_summary.sql', 'r', encoding='utf-8') as file:
    mart_sql = file.read()

# Create connection
connection = postgresql_connection(
    db_creds['hostname'],
    db_creds['port'],
    db_creds['username'],
    db_creds['password'],
    db_creds['database']
)
try:
    create_table(connection, 'gold', 'gam_daily_summary', gam_daily_summary_schema)
    execute_query(connection, mart_sql)
finally:
    # Release the connection even if table creation or the query fails.
    connection.close()
print(" [done]")

INF generate gam daily mart ... [done]


In [9]:
from scripts.postgresql import (
    postgresql_connection,
    execute_query
)

print("INF create view daily summary ...", end='')
# Read the view SQL first so a missing/unreadable file fails before any DB work.
with open('sql/gold_daily_summary.sql', 'r', encoding='utf-8') as file:
    view_sql = file.read()

# Create connection
connection = postgresql_connection(
    db_creds['hostname'],
    db_creds['port'],
    db_creds['username'],
    db_creds['password'],
    db_creds['database']
)
try:
    execute_query(connection, view_sql)
finally:
    # Release the connection even if the statement fails.
    connection.close()
print(" [done]")

INF create view daily summary ... [done]
