### **Load Library**

In [6]:
import pandas as pd
import sqlalchemy as sa

from config_project1 import oltp_conn_string, warehouse_conn_string, oltp_tables, warehouse_tables, dimension_columns, ddl_statements, ddl_marts

### **Function ETL**

In [2]:
def test_connection():
    """Test the database connection."""
    try:
        engine = sa.create_engine(warehouse_conn_string)
        with engine.connect() as conn:
            print("Connection to database succeeded.")
    except Exception as e:
        print(f"Failed to connect to the database: {e}")

def create_tables():
    """Create tables in the data warehouse if they do not exist."""
    engine = sa.create_engine(warehouse_conn_string)
    try:
        with engine.begin() as conn:
            for table_name, ddl in ddl_statements.items():
                try:
                    conn.execute(sa.text(ddl))
                    print(f"Table {table_name} created successfully.")
                except Exception as e:
                    print(f"Error creating table {table_name}: {e}")
    except Exception as e:
        print(f"Failed to connect to the database: {e}")
            
def extract_data(table_name):
    """Extract data from a table in the OLTP database."""
    engine = sa.create_engine(oltp_conn_string)
    query = f"SELECT * FROM {oltp_tables[table_name]}"
    df = pd.read_sql(query, engine)
    print(f'Extract Data {oltp_tables[table_name]} Success')
    return df

def transform_data(df, target_table):
    """Transform the extracted data to match the schema of the target dimension table."""
    columns = dimension_columns.get(target_table)
    if columns:
        df = df[columns]
    print(f'Transform Data {target_table} Success')
    return df

def transform_fact_orders():
    """Transform data for the fact_orders table."""
    dataframes = {table: extract_data(table) for table in oltp_tables.keys()}

    df_orders = dataframes['orders']
    df_orders = df_orders.merge(dataframes['users'], on='user_id')
    df_orders = df_orders.merge(dataframes['payments'], on='payment_id')
    df_orders = df_orders.merge(dataframes['shippers'], on='shipper_id')
    df_orders = df_orders.merge(dataframes['ratings'], on='rating_id')
    df_orders = df_orders.merge(dataframes['vouchers'], how='left', on='voucher_id')
    df_orders.rename(columns={'user_id_x': 'user_id'}, inplace=True)
    
    fact_orders_columns = dimension_columns.get('fact_orders')
    return df_orders[fact_orders_columns]


def load_data(df, table_name):
    """Load the transformed data into the target table in the data warehouse."""
    engine = sa.create_engine(warehouse_conn_string)
    with engine.begin() as conn:
        # Cek kunci unique
        unique_key = get_unique_key(table_name)  # Misalnya user_id untuk tabel dim_user
        existing_data = pd.read_sql(f"SELECT {unique_key} FROM {table_name}", conn)
        
        # Deduplikasi data
        df = deduplicate_data(df, existing_data, unique_key)
        
        # Masukkan data baru
        df.to_sql(table_name, conn, index=False, if_exists='append', method='multi')
        print(f'Load Data {table_name} Success')
        
def deduplicate_data(new_data, existing_data, unique_key):
    """Remove duplicates from new data based on existing data."""
    existing_keys = existing_data[unique_key].tolist()
    unique_rows = new_data[~new_data[unique_key].isin(existing_keys)]
    return unique_rows

def get_unique_key(table_name):
    """Retrieve the unique key of the table."""
    if table_name == 'dim_user':
        return 'user_id'
    elif table_name == 'dim_payment':
        return 'payment_id'
    elif table_name == 'dim_shipper':
        return 'shipper_id'
    elif table_name == 'dim_rating':
        return 'rating_id'
    elif table_name == 'dim_voucher':
        return 'voucher_id'
    elif table_name == 'fact_orders':
        return 'order_id'
    elif table_name == 'dim_product_category':
        return 'product_category_id'
    elif table_name == 'dim_product':
        return 'product_id'
    elif table_name == 'fact_order_items':
        return 'order_item_id'
    else:
        raise ValueError("Table name not recognized.")

### **Function Data Mart**

In [3]:
def create_and_insert_dm_sales():
    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string)
    with engine.begin() as conn:
        # Create dm_sales table
        conn.execute(sa.text(ddl_marts['dm_sales']))

        # Insert data into dm_sales table
        conn.execute(sa.text(ddl_marts['insert_dm_sales']))

        # Create dm_sales_detail table
        conn.execute(sa.text(ddl_marts['dm_sales_detail']))

        # Insert data into dm_sales_detail
        conn.execute(sa.text(ddl_marts['insert_dm_sales_detail']))
    print(f'Data Mart Has Create Success')

### **Function Run**

In [4]:
def etl_process():
    """Run the entire ETL process."""
    # Create tables
    create_tables()

    # Process dimension tables
    for dim_table, target_table in warehouse_tables.items():
        if dim_table != 'fact_orders':
            source_table = dim_table
            df = extract_data(source_table)
            transformed_df = transform_data(df, dim_table)
            load_data(transformed_df, target_table)
        else:
            # Process fact table
            df_fact_orders = transform_fact_orders()
            load_data(df_fact_orders, target_table)

    # proses mart table
    create_and_insert_dm_sales()

# **Run ETL**

In [9]:
#script running all ETL
etl_process()

Table dim_user created successfully.
Table dim_payment created successfully.
Table dim_shipper created successfully.
Table dim_rating created successfully.
Table dim_voucher created successfully.
Table fact_orders created successfully.
Table dim_product_category created successfully.
Table dim_product created successfully.
Table fact_order_items created successfully.
Extract Data tb_users Success
Transform Data users Success
Load Data dim_user Success
Extract Data tb_payments Success
Transform Data payments Success
Load Data dim_payment Success
Extract Data tb_shippers Success
Transform Data shippers Success
Load Data dim_shipper Success
Extract Data tb_ratings Success
Transform Data ratings Success
Load Data dim_rating Success
Extract Data tb_vouchers Success
Transform Data vouchers Success
Load Data dim_voucher Success
Extract Data tb_orders Success
Transform Data orders Success
Load Data fact_orders Success
Extract Data tb_product_category Success
Transform Data product categories S

### **Run Testing**

In [33]:
source_table = 'users'
df = extract_data(source_table)
df

Extract Data tb_users Success


Unnamed: 0,user_id,user_first_name,user_last_name,user_gender,user_address,user_birthday,user_join
0,100101,Budi,Gunawan,Male,"Jl. Pondok Indah No.1, Kecamatan Pondok Labu, ...",1998-09-12,2022-01-13
1,100102,Eva,Susanti,Female,"Jl. Timur Raya No. 13, Kramat Jaya, Jakarta Ti...",1997-02-16,2022-01-29
2,100103,Dana,Pradana,Male,"Jl. Pahlawan, Surabaya, Jawa Timur",1999-07-19,2022-02-11
3,100104,Rahmat,Hidayat,Male,"Jl. Amil Abas, Jakarta Timur, DKI Jakarta",2000-02-14,2022-03-22
4,100105,Dodo,Andriano,Male,"Jl. Pakuan Selatan No. 177, Magelang, Jawa Tengah",2000-09-06,2022-04-03
5,100106,Caca,Kumala,Female,"Jl. Bunga Raya, Kota Tanggerang, Banten",1998-11-05,2022-05-20
6,100107,Andi,Kurniawan,Male,"Jl. Mawar Indah No. 25, Jakarta Barat, DKI Jak...",2001-03-14,2022-05-24
7,100108,Fanny,Utami,Female,"Jl. Kilometer Panjang No. 210, Jakarta Utara, ...",2002-01-27,2022-06-02
8,100109,Gagah,Prakasa,Male,"Jl. Timur Asri No. 10, Denpasar, Bali",2001-08-05,2022-07-14
9,100110,Anita,Friska,Female,"Jl. Tembung Raya, Kota Medan Timur, Sumatera U...",2000-11-04,2022-07-21


In [25]:
transform_fact_orders()

Extract Data tb_users Success
Extract Data tb_payments Success
Extract Data tb_shippers Success
Extract Data tb_ratings Success
Extract Data tb_vouchers Success
Extract Data tb_orders Success
Extract Data tb_product_category Success
Extract Data tb_products Success
Extract Data tb_order_items Success


Unnamed: 0,order_id,order_date,user_id,payment_id,shipper_id,order_price,order_discount,voucher_id,order_total,rating_id
0,1110001,2022-01-20,100101,1202,60002001,250000,15000,41000101.0,230000,800010003
1,1110002,2022-01-29,100102,1202,60002001,620000,40000,41000102.0,575000,800010003
2,1110003,2022-02-13,100103,1204,60002001,6000000,1000000,41000103.0,4995000,800010001
3,1110004,2022-03-06,100102,1203,60002001,3150000,45000,,3105000,800010005
4,1110005,2022-04-28,100105,1202,60002002,4000000,1000000,41000105.0,2995000,800010001
5,1110006,2022-05-09,100103,1202,60002002,4500000,1030000,,3470000,800010004
6,1110007,2022-05-21,100106,1202,60002001,870000,25000,,845000,800010005
7,1110008,2022-06-02,100108,1204,60002002,2000000,0,41000108.0,1995000,800010004
8,1110009,2022-06-23,100103,1204,60002003,2000000,0,,2000000,800010005
9,1110010,2022-07-01,100102,1204,60002003,1050000,45000,,1005000,800010002


### **Script Upload Google Sheets**

In [13]:
!pip install gspread
!pip install oauth2client




[notice] A new release of pip is available: 23.3.2 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 23.3.2 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [14]:
import json
import gspread
from oauth2client.service_account import ServiceAccountCredentials

with open('digitalskola_key.json','rb') as file:
    key = json.load(file)
    
scope = ['https://www.googleapis.com/auth/drive','https://spreadsheets.google.com/feeds']
creds = ServiceAccountCredentials.from_json_keyfile_dict(key, scope)
client = gspread.authorize(creds)

###tambahkan email googledigitalskola@digitalskola-368401.iam.gserviceaccount.com ke dalam google sheet anda#

In [15]:
def fetch_data_from_dwh(query):
     # Membuat koneksi ke database
    engine = sa.create_engine(warehouse_conn_string)
    
    # Membuat hasil query menjadi Datafrmae
    df = pd.read_sql(query, engine)
    
    return df

df_mart_sales = fetch_data_from_dwh("""SELECT * FROM dm_sales_kel1;""")
df_mart_sales_detail =  fetch_data_from_dwh("""SELECT * FROM dm_sales_detail_kel1;""")

df_mart_sales

Unnamed: 0,order_id,order_date,user_id,user_name,region,payment_type,shipper_name,order_price,order_discount,voucher_name,voucher_price,order_total
0,1110001,2022-01-20,100101,Budi Gunawan,Jawa Barat,Debit,JNE Express,250000,15000,New User,5000.0,230000
1,1110002,2022-01-29,100102,Eva Susanti,DKI Jakarta,Debit,JNE Express,620000,40000,New User,5000.0,575000
2,1110003,2022-02-13,100103,Dana Pradana,Jawa Timur,Credit,JNE Express,6000000,1000000,New User,5000.0,4995000
3,1110004,2022-03-06,100102,Eva Susanti,DKI Jakarta,Wallet,JNE Express,3150000,45000,,,3105000
4,1110005,2022-04-28,100105,Dodo Andriano,Jawa Tengah,Debit,Sicepat Express,4000000,1000000,New User,5000.0,2995000
5,1110006,2022-05-09,100103,Dana Pradana,Jawa Timur,Debit,Sicepat Express,4500000,1030000,,,3470000
6,1110007,2022-05-21,100106,Caca Kumala,Banten,Debit,JNE Express,870000,25000,,,845000
7,1110008,2022-06-02,100108,Fanny Utami,DKI Jakarta,Credit,Sicepat Express,2000000,0,New User,5000.0,1995000
8,1110009,2022-06-23,100103,Dana Pradana,Jawa Timur,Credit,Lazada Express,2000000,0,,,2000000
9,1110010,2022-07-01,100102,Eva Susanti,DKI Jakarta,Credit,Lazada Express,1050000,45000,,,1005000


In [16]:
df_mart_sales_detail

Unnamed: 0,order_item_id,order_id,product_id,product_category_name,order_item_quantity,product_price,product_discount
0,90010001,1110001,31110002,Fashion,1,250000,15000
1,90010002,1110002,31110007,Health & Beauty,1,120000,10000
2,90010003,1110002,31110002,Fashion,2,250000,15000
3,90010004,1110003,31110004,Electronic,1,2000000,0
4,90010005,1110003,31110005,Electronic,1,4000000,1000000
5,90010006,1110004,31110001,Fashion,2,300000,0
6,90010007,1110004,31110002,Fashion,3,250000,15000
7,90010008,1110004,31110003,Electronic,1,1800000,0
8,90010009,1110005,31110005,Electronic,1,4000000,1000000
9,90010010,1110006,31110005,Electronic,1,4000000,1000000


In [17]:
# ganti dengan nama google sheets anda
sheet = client.open('DataMart_Team1')

# ganti sesuai dengan nama sheet didalam google sheets anda
# siapkan nama kolom pada sheet di google sheet anda

export = sheet.worksheet('Sheet1')
export.update([df_mart_sales.columns.values.tolist()] + df_mart_sales.astype(str).values.tolist())

export2 = sheet.worksheet('Sheet2')
export2.update([df_mart_sales_detail.columns.values.tolist()] + df_mart_sales_detail.astype(str).values.tolist())

{'spreadsheetId': '18ubtop847PYSP9enFXCNfuzvKZjsPup6Y0pYA5Y_y7w',
 'updatedRange': 'Sheet2!A1:G23',
 'updatedRows': 23,
 'updatedColumns': 7,
 'updatedCells': 161}