### **Load Library**

In [1]:
import pandas as pd
import sqlalchemy as sa

print(pd.__version__)
print(sa.__version__)

from config import oltp_conn_string, warehouse_conn_string, oltp_tables, warehouse_tables, dimension_columns, ddl_statements, ddl_marts

2.1.4
1.4.39


In [2]:
%pip install sqlalchemy==1.4.39

Note: you may need to restart the kernel to use updated packages.


In [3]:
# Re-print the library versions after the pinned install to confirm what the kernel has loaded.
for loaded_module in (pd, sa):
    print(loaded_module.__version__)

2.1.4
1.4.39


### **Function ETL**

In [4]:
def create_tables():
    """Run every DDL statement in ``ddl_statements`` against the data warehouse.

    The statements are executed in the dict's iteration order on a single
    connection built from ``warehouse_conn_string``.
    NOTE(review): the DDL itself lives in ``config``; it is presumably written
    as ``CREATE TABLE IF NOT EXISTS`` so that re-running is harmless — confirm.
    """
    engine = sa.create_engine(warehouse_conn_string)
    try:
        # engine.begin() opens a transaction and commits it when the block
        # exits (rolling back on error), so the DDL no longer depends on
        # SQLAlchemy 1.x's deprecated statement-sniffing autocommit.
        with engine.begin() as conn:
            for ddl in ddl_statements.values():
                conn.execute(ddl)
    finally:
        # Release the pooled connection; a new engine is built on every call.
        engine.dispose()
            
def extract_data(table_name):
    """Read all rows of one OLTP table into a DataFrame.

    Args:
        table_name: Key into ``oltp_tables``; the mapped value is the physical
            table name that gets queried.

    Returns:
        A pandas DataFrame holding the result of ``SELECT *`` on that table.

    Raises:
        KeyError: If ``table_name`` is not a key of ``oltp_tables``.
    """
    oltp_engine = sa.create_engine(oltp_conn_string)
    physical_table = oltp_tables[table_name]
    extracted = pd.read_sql(f"SELECT * FROM {physical_table}", oltp_engine)
    print(f'Extract Data {physical_table} Success')
    return extracted

def transform_data(df, target_table):
    """Project a DataFrame onto the columns configured for ``target_table``.

    Args:
        df: The extracted DataFrame.
        target_table: Key looked up in ``dimension_columns``.

    Returns:
        ``df`` restricted to the configured columns, or ``df`` unchanged when
        ``dimension_columns`` has no (or an empty) entry for ``target_table``.
    """
    wanted_columns = dimension_columns.get(target_table)
    # A missing or empty column list means "pass the frame through as-is".
    projected = df[wanted_columns] if wanted_columns else df
    print(f'Transform Data {target_table} Success')
    return projected

def transform_fact_orders():
    """Build the fact_orders DataFrame by joining orders to its lookup tables.

    Orders are inner-joined to users, payments, shippers and ratings, then
    left-joined to vouchers (so orders without a voucher are kept). The result
    is restricted to the columns listed under ``dimension_columns['fact_orders']``.

    Returns:
        A pandas DataFrame with the configured fact_orders columns.

    Raises:
        KeyError: If ``dimension_columns`` has no ``'fact_orders'`` entry, or a
            required source table / join column is missing.
    """
    # Only the tables joined below are extracted; pulling every OLTP table
    # would just add unused round-trips to the source database.
    needed_tables = ('orders', 'users', 'payments', 'shippers', 'ratings', 'vouchers')
    dataframes = {table: extract_data(table) for table in needed_tables}

    df_orders = (
        dataframes['orders']
        .merge(dataframes['users'], on='user_id')
        .merge(dataframes['payments'], on='payment_id')
        .merge(dataframes['shippers'], on='shipper_id')
        .merge(dataframes['ratings'], on='rating_id')
        .merge(dataframes['vouchers'], how='left', on='voucher_id')
        # If a later join brought in a second user_id column, pandas suffixes
        # the left one as user_id_x; restore the plain name. No-op otherwise.
        .rename(columns={'user_id_x': 'user_id'})
    )

    fact_orders_columns = dimension_columns.get('fact_orders')
    if fact_orders_columns is None:
        raise KeyError("dimension_columns has no entry for 'fact_orders'")
    return df_orders[fact_orders_columns]

def transform_fact_orders_items():
    """Build the fact_orders_items DataFrame from order items, orders and products.

    Order items are inner-joined to orders on ``order_id`` and to products on
    ``product_id``; the result is restricted to the columns listed under
    ``dimension_columns['fact_orders_items']``.

    Returns:
        A pandas DataFrame with the configured fact_orders_items columns.

    Raises:
        KeyError: If ``dimension_columns`` has no ``'fact_orders_items'`` entry,
            or a required source table / join column is missing.
    """
    # Only the tables joined below are extracted.
    needed_tables = ('orders_items', 'orders', 'products')
    dataframes = {table: extract_data(table) for table in needed_tables}

    df_orders_items = (
        dataframes['orders_items']
        .merge(dataframes['orders'], on='order_id')
        # products is joined exactly once, on its key. Joining it again on
        # product_discount / product_price (non-key values) would suffix the
        # already-present product columns with _x/_y and match unrelated rows
        # that merely share a price or discount.
        # NOTE(review): assumes product_price / product_discount come from
        # the products table — confirm against the OLTP schema.
        .merge(dataframes['products'], on='product_id')
        # Restore the plain name if a join suffixed order_id; no-op otherwise.
        .rename(columns={'order_id_x': 'order_id'})
    )

    fact_orders_items_columns = dimension_columns.get('fact_orders_items')
    if fact_orders_items_columns is None:
        raise KeyError("dimension_columns has no entry for 'fact_orders_items'")
    return df_orders_items[fact_orders_items_columns]

def load_data(df, table_name):
    """Append the rows of ``df`` that are not yet in the warehouse table.

    The table's unique key column is read from the warehouse, rows of ``df``
    whose key is already present are dropped, and the remainder is appended.

    Args:
        df: Transformed DataFrame whose columns match the target table.
        table_name: Warehouse table to load; must be known to ``get_unique_key``.

    Raises:
        ValueError: If ``table_name`` has no registered unique key.
    """
    # Resolve the key first so an unknown table fails before any connection is opened.
    unique_key = get_unique_key(table_name)  # e.g. user_id for dim_user

    engine = sa.create_engine(warehouse_conn_string)
    try:
        # engine.begin() wraps the read and the append in one transaction that
        # is committed on exit (rolled back on error). With a bare connect()
        # the commit depended on legacy SQLAlchemy 1.x behaviour.
        with engine.begin() as conn:
            # Fetch the keys already loaded.
            existing_data = pd.read_sql(f"SELECT {unique_key} FROM {table_name}", conn)

            # Drop rows whose key already exists in the warehouse.
            new_rows = deduplicate_data(df, existing_data, unique_key)

            # Insert only the new rows.
            new_rows.to_sql(table_name, conn, index=False, if_exists='append', method='multi')
    finally:
        # Release the pooled connection; a new engine is built on every call.
        engine.dispose()
    # Reported only after the transaction has been committed.
    print(f'Load Data {table_name} Success')
        
def deduplicate_data(new_data, existing_data, unique_key):
    """Return the rows of ``new_data`` whose key is absent from ``existing_data``.

    Args:
        new_data: Candidate rows to load.
        existing_data: DataFrame containing the ``unique_key`` column of the
            rows already stored.
        unique_key: Name of the key column present in both frames.

    Returns:
        The subset of ``new_data`` (original index preserved) whose
        ``unique_key`` value does not occur in ``existing_data``.
    """
    known_keys = existing_data[unique_key].tolist()
    already_loaded = new_data[unique_key].isin(known_keys)
    return new_data[~already_loaded]

def get_unique_key(table_name):
    """Return the name of the unique-key column for a warehouse table.

    Args:
        table_name: Warehouse table name, e.g. ``'dim_user'``.

    Returns:
        The key column name as a string.

    Raises:
        ValueError: If ``table_name`` is not one of the known tables.
    """
    # (table, key column) pairs; extend this list when new tables are added.
    key_columns = (
        ('dim_user', 'user_id'),
        ('dim_payment', 'payment_id'),
        ('dim_shipper', 'shipper_id'),
        ('dim_rating', 'rating_id'),
        ('dim_voucher', 'voucher_id'),
        ('fact_orders', 'order_id'),
        ('dim_product_category', 'product_category_id'),
        ('dim_product', 'product_id'),
        ('fact_orders_items', 'order_item_id'),
    )
    for known_table, key_column in key_columns:
        if table_name == known_table:
            return key_column
    raise ValueError("Table name not recognized.")

### **Function Data Mart**

In [5]:
def create_and_insert_dm_sales():
    """Create the dm_sales data mart table and populate it.

    Executes ``ddl_marts['dm_sales']`` followed by
    ``ddl_marts['insert_dm_sales']`` on the data warehouse.
    """
    engine = sa.create_engine(warehouse_conn_string)
    try:
        # One explicit transaction for both statements: committed on exit,
        # rolled back if either fails, instead of relying on SQLAlchemy 1.x's
        # deprecated autocommit.
        with engine.begin() as conn:
            # Create the dm_sales table
            conn.execute(ddl_marts['dm_sales'])

            # Insert data into the dm_sales table
            conn.execute(ddl_marts['insert_dm_sales'])
    finally:
        # Release the pooled connection; a new engine is built on every call.
        engine.dispose()
    print('Data Mart Has Create Success')

In [6]:
def create_and_insert_dm_order_product():
    """Create the dm_order_product data mart table and populate it.

    Executes ``ddl_marts['dm_order_product']`` followed by
    ``ddl_marts['insert_dm_order_product']`` on the data warehouse.
    """
    engine = sa.create_engine(warehouse_conn_string)
    try:
        # One explicit transaction for both statements: committed on exit,
        # rolled back if either fails, instead of relying on SQLAlchemy 1.x's
        # deprecated autocommit.
        with engine.begin() as conn:
            # Create the dm_order_product table
            conn.execute(ddl_marts['dm_order_product'])

            # Insert data into the dm_order_product table
            conn.execute(ddl_marts['insert_dm_order_product'])
    finally:
        # Release the pooled connection; a new engine is built on every call.
        engine.dispose()
    print('Data Mart order product Has Create Success')

### **Function Run**

In [7]:
def etl_process():
    """Run the whole pipeline: create tables, load each warehouse table, build the marts.

    ``warehouse_tables`` maps a source key to its warehouse table name. Every
    entry is extracted, projected with ``transform_data`` and loaded, except
    the key ``'fact_orders'``, which is built by ``transform_fact_orders``.
    """
    # Step 1: make sure the warehouse tables exist.
    create_tables()

    # Step 2: load every configured table.
    for dim_table, target_table in warehouse_tables.items():
        # NOTE(review): this compares the warehouse_tables *key* with
        # 'fact_orders'. The recorded run loads fact_orders through the
        # generic path from key 'orders', which suggests this branch is never
        # taken — confirm whether the check should use target_table.
        if dim_table == 'fact_orders':
            load_data(transform_fact_orders(), target_table)
            continue

        extracted = extract_data(dim_table)
        load_data(transform_data(extracted, dim_table), target_table)

    # Step 3: build the data mart tables.
    create_and_insert_dm_sales()
    create_and_insert_dm_order_product()

### **Run ETL**

In [8]:
# Run the full ETL: create the warehouse tables, load every configured table,
# then build the dm_sales and dm_order_product data marts.
etl_process()

Extract Data tb_users Success
Transform Data users Success
Load Data dim_user Success
Extract Data tb_payments Success
Transform Data payments Success
Load Data dim_payment Success
Extract Data tb_shippers Success
Transform Data shippers Success
Load Data dim_shipper Success
Extract Data tb_ratings Success
Transform Data ratings Success
Load Data dim_rating Success
Extract Data tb_vouchers Success
Transform Data vouchers Success
Load Data dim_voucher Success
Extract Data tb_orders Success
Transform Data orders Success
Load Data fact_orders Success
Extract Data tb_product_category Success
Transform Data product_category Success
Load Data dim_product_category Success
Extract Data tb_products Success
Transform Data products Success
Load Data dim_product Success
Extract Data tb_order_items Success
Transform Data orders_items Success
Load Data fact_orders_items Success
Data Mart Has Create Success
Data Mart order product Has Create Success


### **Run Testing**

In [9]:
# Step-by-step check of the pipeline on a single table.
# Make sure the warehouse tables exist first.
create_tables()

# Extract the source table registered under the 'users' key of oltp_tables.
source_table = 'users'
df = extract_data(source_table)
df

Extract Data tb_users Success


Unnamed: 0,user_id,user_first_name,user_last_name,user_gender,user_address,user_birthday,user_join
0,100101,Budi,Gunawan,Male,"Jl. Pondok Indah No.1, Kecamatan Pondok Labu, ...",1998-09-12,2022-01-13
1,100102,Eva,Susanti,Female,"Jl. Timur Raya No. 13, Kramat Jaya, Jakarta Ti...",1997-02-16,2022-01-29
2,100103,Dana,Pradana,Male,"Jl. Pahlawan, Surabaya, Jawa Timur",1999-07-19,2022-02-11
3,100104,Rahmat,Hidayat,Male,"Jl. Amil Abas, Jakarta Timur, DKI Jakarta",2000-02-14,2022-03-22
4,100105,Dodo,Andriano,Male,"Jl. Pakuan Selatan No. 177, Magelang, Jawa Tengah",2000-09-06,2022-04-03
5,100106,Caca,Kumala,Female,"Jl. Bunga Raya, Kota Tanggerang, Banten",1998-11-05,2022-05-20
6,100107,Andi,Kurniawan,Male,"Jl. Mawar Indah No. 25, Jakarta Barat, DKI Jak...",2001-03-14,2022-05-24
7,100108,Fanny,Utami,Female,"Jl. Kilometer Panjang No. 210, Jakarta Utara, ...",2002-01-27,2022-06-02
8,100109,Gagah,Prakasa,Male,"Jl. Timur Asri No. 10, Denpasar, Bali",2001-08-05,2022-07-14
9,100110,Anita,Friska,Female,"Jl. Tembung Raya, Kota Medan Timur, Sumatera U...",2000-11-04,2022-07-21


In [10]:
# Project the extracted users onto the columns configured for the dimension.
# NOTE(review): etl_process passes the source key (e.g. 'users') to
# transform_data, while this cell passes the warehouse name 'dim_user' —
# confirm which key dimension_columns actually uses; an unknown key returns
# the frame unchanged.
transformed_df = transform_data(df, 'dim_user')
transformed_df

Transform Data dim_user Success


Unnamed: 0,user_id,user_first_name,user_last_name,user_gender,user_address,user_birthday,user_join
0,100101,Budi,Gunawan,Male,"Jl. Pondok Indah No.1, Kecamatan Pondok Labu, ...",1998-09-12,2022-01-13
1,100102,Eva,Susanti,Female,"Jl. Timur Raya No. 13, Kramat Jaya, Jakarta Ti...",1997-02-16,2022-01-29
2,100103,Dana,Pradana,Male,"Jl. Pahlawan, Surabaya, Jawa Timur",1999-07-19,2022-02-11
3,100104,Rahmat,Hidayat,Male,"Jl. Amil Abas, Jakarta Timur, DKI Jakarta",2000-02-14,2022-03-22
4,100105,Dodo,Andriano,Male,"Jl. Pakuan Selatan No. 177, Magelang, Jawa Tengah",2000-09-06,2022-04-03
5,100106,Caca,Kumala,Female,"Jl. Bunga Raya, Kota Tanggerang, Banten",1998-11-05,2022-05-20
6,100107,Andi,Kurniawan,Male,"Jl. Mawar Indah No. 25, Jakarta Barat, DKI Jak...",2001-03-14,2022-05-24
7,100108,Fanny,Utami,Female,"Jl. Kilometer Panjang No. 210, Jakarta Utara, ...",2002-01-27,2022-06-02
8,100109,Gagah,Prakasa,Male,"Jl. Timur Asri No. 10, Denpasar, Bali",2001-08-05,2022-07-14
9,100110,Anita,Friska,Female,"Jl. Tembung Raya, Kota Medan Timur, Sumatera U...",2000-11-04,2022-07-21


In [11]:
# Append the transformed rows to dim_user; load_data skips rows whose user_id
# is already present in the table.
load_data(transformed_df, 'dim_user')

Load Data dim_user Success


### **Script Upload Google Sheets**

In [12]:
%pip install gspread
%pip install oauth2client

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [13]:
import json
import gspread
from oauth2client.service_account import ServiceAccountCredentials

# Load the Google service-account key from a local JSON file.
# NOTE(review): this file holds credentials — keep it out of version control.
with open('digitalskola_key.json','rb') as file:
    key = json.load(file)

# Scopes requested for the credentials, then build an authorized gspread client.
scope = ['https://www.googleapis.com/auth/drive','https://spreadsheets.google.com/feeds']
creds = ServiceAccountCredentials.from_json_keyfile_dict(key, scope)
client = gspread.authorize(creds)

# Add the service-account email
# googledigitalskola@digitalskola-368401.iam.gserviceaccount.com
# to your Google Sheet (i.e. share the sheet with it) so this client can open it.

In [14]:
def fetch_data_from_dwh(query):
    """Run a SQL query against the data warehouse and return the result.

    Args:
        query: SQL text passed to ``pandas.read_sql``.

    Returns:
        A pandas DataFrame with the query result.
    """
    # Connect to the warehouse and materialise the result set as a DataFrame.
    warehouse_engine = sa.create_engine(warehouse_conn_string)
    return pd.read_sql(query, warehouse_engine)

# Pull the whole dm_sales data mart into a DataFrame for export.
df_mart = fetch_data_from_dwh("""SELECT * FROM dm_sales;""")
df_mart

Unnamed: 0,order_id,order_date,user_id,user_name,payment_type,shipper_name,order_price,order_discount,voucher_name,order_total,user_address
0,1110001,2022-01-20,100101,Budi Gunawan,Debit,JNE Express,250000,15000,New User,230000,"Jl. Pondok Indah No.1, Kecamatan Pondok Labu, ..."
1,1110002,2022-01-29,100102,Eva Susanti,Debit,JNE Express,620000,40000,New User,575000,"Jl. Timur Raya No. 13, Kramat Jaya, Jakarta Ti..."
2,1110003,2022-02-13,100103,Dana Pradana,Credit,JNE Express,6000000,1000000,New User,4995000,"Jl. Pahlawan, Surabaya, Jawa Timur"
3,1110004,2022-03-06,100102,Eva Susanti,Wallet,JNE Express,3150000,45000,,3105000,"Jl. Timur Raya No. 13, Kramat Jaya, Jakarta Ti..."
4,1110005,2022-04-28,100105,Dodo Andriano,Debit,Sicepat Express,4000000,1000000,New User,2995000,"Jl. Pakuan Selatan No. 177, Magelang, Jawa Tengah"
5,1110006,2022-05-09,100103,Dana Pradana,Debit,Sicepat Express,4500000,1030000,,3470000,"Jl. Pahlawan, Surabaya, Jawa Timur"
6,1110007,2022-05-21,100106,Caca Kumala,Debit,JNE Express,870000,25000,,845000,"Jl. Bunga Raya, Kota Tanggerang, Banten"
7,1110008,2022-06-02,100108,Fanny Utami,Credit,Sicepat Express,2000000,0,New User,1995000,"Jl. Kilometer Panjang No. 210, Jakarta Utara, ..."
8,1110009,2022-06-23,100103,Dana Pradana,Credit,Lazada Express,2000000,0,,2000000,"Jl. Pahlawan, Surabaya, Jawa Timur"
9,1110010,2022-07-01,100102,Eva Susanti,Credit,Lazada Express,1050000,45000,,1005000,"Jl. Timur Raya No. 13, Kramat Jaya, Jakarta Ti..."


In [15]:
# Open the spreadsheet by title — replace 'data' with the name of your own Google Sheets file.
sheet = client.open('data')

# Select the target tab — replace 'order' with the tab name inside your spreadsheet.
# The tab must already exist; the first row written below holds the column names.
export = sheet.worksheet('order')

# Header row first, then every data row converted to strings.
sales_header = df_mart.columns.values.tolist()
sales_rows = df_mart.astype(str).values.tolist()
export.update([sales_header] + sales_rows)

{'spreadsheetId': '1SRPAiD4eIais-3dqN6kU3fYsjDGRw5BBlATtJgODamE',
 'updatedRange': 'order!A1:K13',
 'updatedRows': 13,
 'updatedColumns': 11,
 'updatedCells': 143}

In [16]:
# Pull the whole dm_order_product data mart into a DataFrame for export.
df_mart2 = fetch_data_from_dwh("""SELECT * FROM dm_order_product;""")
df_mart2

Unnamed: 0,order_item_id,order_id,order_item_quantity,product_id,product_name,product_category_id,product_category_name
0,90010001,1110001,1,31110002,Shirt,320001001,Fashion
1,90010002,1110002,1,31110007,Body Soap,320001003,Health & Beauty
2,90010003,1110002,2,31110002,Shirt,320001001,Fashion
3,90010004,1110003,1,31110004,Television,320001002,Electronic
4,90010005,1110003,1,31110005,Headphone,320001002,Electronic
5,90010006,1110004,2,31110001,Bag,320001001,Fashion
6,90010007,1110004,3,31110002,Shirt,320001001,Fashion
7,90010008,1110004,1,31110003,Camera,320001002,Electronic
8,90010009,1110005,1,31110005,Headphone,320001002,Electronic
9,90010010,1110006,1,31110005,Headphone,320001002,Electronic


In [17]:
# Open the spreadsheet by title — replace 'data' with the name of your own Google Sheets file.
sheet = client.open('data')

# Select the target tab — replace 'product' with the tab name inside your spreadsheet.
# The tab must already exist; the first row written below holds the column names.
export = sheet.worksheet('product')

# Header row first, then every data row converted to strings.
product_header = df_mart2.columns.values.tolist()
product_rows = df_mart2.astype(str).values.tolist()
export.update([product_header] + product_rows)

{'spreadsheetId': '1SRPAiD4eIais-3dqN6kU3fYsjDGRw5BBlATtJgODamE',
 'updatedRange': 'product!A1:G23',
 'updatedRows': 23,
 'updatedColumns': 7,
 'updatedCells': 161}