# Config

In [None]:
#!python config

# TAHAP 1 : LENGKAPI İSİ TABELNYA ! DAN SUDAH LENGKAP :)

oltp_conn_string = 'postgresql://spiderweb:spiderweb@47.237.106.32:5432/spiderweb_oltp'
warehouse_conn_string = 'postgresql://spiderweb:spiderweb@47.237.106.32:5432/spiderweb_dwh'

oltp_tables = {
    "users": "tb_users",
    "payments": "tb_payments",
    "shippers": "tb_shippers",
    "ratings": "tb_ratings",
    "vouchers": "tb_voucher", # klo di DWH itu dim_voucher, bukan dim_vouchers
    "orders": "tb_orders",
    "product_category" : "tb_product_category",
    "products" : "tb_products",
    "order_items" : "tb_order_items"
}

warehouse_tables = {
    "users": "dim_users", # diubah dari dim_user ke dim_users ( sesuai ERD kelompok 5)
    "payments": "dim_payments", # diubah dari dim_payment ke dim_payments ( sesuai ERD kelompok 5)
    "shippers": "dim_shippers", # diubah dari dim_shipper ke dim_shippers ( sesuai ERD kelompok 5)
    "ratings": "dim_ratings", # diubah dari dim_rating ke dim_ratings ( sesuai ERD kelompok 5)
    "vouchers": "dim_voucher",
    "orders": "fact_orders",
    "product_category" : "dim_product_category",
    "products" : "dim_products",
    "order_items" : "fact_order_items"
}

# data OLTP yang ada tabel user, connect-kan di DWH dim_user. 2 tabel salig berhubungan
# payments di OLTP, klo di DWH itu dim_payment
# tipe datanya adalah dictionary agar logiccodingannya lebih mudah

# pastikan nama-nama kolomnya benar !

# PERUBAHAN
# 1. kolom user_birthday di dim_users diubah menjadi user_birthdate
# 2. menambahkan kolom dim_product_category, dim_products, dan fact_order_items

dimension_columns = {
    "dim_users": ["user_id", "user_first_name", "user_last_name", "user_gender", "user_address", "user_birthdate", "user_join"],
    "dim_payments": ["payment_id", "payment_name", "payment_status"],
    "dim_shippers": ["shipper_id", "shipper_name"],
    "dim_ratings": ["rating_id", "rating_level", "rating_status"],
    "dim_voucher": ["voucher_id", "voucher_name", "voucher_price", "voucher_created","user_id"],
    "fact_orders": ['order_id', 'order_date', 'user_id', 'payment_id', 'shipper_id', 'order_price','order_discount', 'voucher_id', 'order_total', 'rating_id'],
    "dim_product_category" : [ "product_category_id", "product_category_name" ],
    "dim_products" : [ "product_id", "product_category_id", "product_name", "product_created", "product_price", "product_discount"],
    "fact_order_items" : [ 'order_item_id',  'order_id', 'product_id', 'order_item_quantity', 'product_discount', 'product_subdiscount', 'product_price', 'product_subprice' ]

    }


# perubahan sesuai Query DDL
ddl_statements = {
    "dim_users": """
       CREATE TABLE IF NOT EXISTS dim_users (
            user_id INT NOT NULL PRIMARY KEY,
            user_first_name VARCHAR(255) NOT NULL,
            user_last_name VARCHAR(255) NOT NULL,
            user_gender VARCHAR(50) NOT NULL,
            user_address VARCHAR(255),
            user_birthdate DATE NOT NULL,
            user_join DATE NOT NULL
        );
    """,
    "dim_payments": """
        CREATE TABLE IF NOT EXISTS dim_payments (
            payment_id INT NOT NULL PRIMARY KEY,
            payment_name VARCHAR(255) NOT NULL,
            payment_status BOOLEAN NOT NULL
        );
    """,
    "dim_shippers": """
        CREATE TABLE IF NOT EXISTS dim_shippers (
            shipper_id INT NOT NULL PRIMARY KEY,
            shipper_name VARCHAR(255) NOT NULL
        );
    """,
    "dim_ratings": """
       CREATE TABLE IF NOT EXISTS dim_ratings (
            rating_id INT NOT NULL PRIMARY KEY,
            rating_level INT NOT NULL,
            rating_status VARCHAR(255) NOT NULL
        );
    """,
    "dim_voucher": """
       CREATE TABLE IF NOT EXISTS dim_voucher (
            voucher_id INT NOT NULL PRIMARY KEY,
            voucher_name VARCHAR(255) NOT NULL,
            voucher_price INT NOT NULL,
            voucher_created DATE NOT NULL,
            user_id INT NOT NULL
        );
    """,
    "fact_orders": """
       CREATE TABLE IF NOT EXISTS fact_orders (
            order_id INT NOT NULL PRIMARY KEY,
            order_date DATE NOT NULL,
            user_id INT NOT NULL,
            payment_id INT NOT NULL,
            shipper_id INT NOT NULL,
            order_price INT NOT NULL,
            order_discount INT,
            voucher_id INT NOT NULL,
            order_total INT NOT NULL,
            rating_id INT NOT NULL,
            FOREIGN KEY (user_id) REFERENCES dim_users(user_id),
            FOREIGN KEY (payment_id) REFERENCES dim_payments(payment_id),
            FOREIGN KEY (shipper_id) REFERENCES dim_shippers(shipper_id),
            FOREIGN KEY (voucher_id) REFERENCES dim_voucher(voucher_id),
            FOREIGN KEY (rating_id) REFERENCES dim_ratings(rating_id)
        );
    """,
     "dim_product_category": """
      CREATE TABLE IF NOT EXISTS dim_product_category (
            product_category_id INT NOT NULL PRIMARY KEY,
            product_category_name VARCHAR(255) NOT NULL
        );
    """,
     "dim_products": """
      CREATE TABLE IF NOT EXISTS dim_products (
            product_id INT NOT NULL PRIMARY KEY,
            product_category_id INT NOT NULL,
            product_name VARCHAR(255) NOT NULL,
            product_created DATE NOT NULL,
            product_price INT NOT NULL,
            product_discount INT,
            FOREIGN KEY (product_category_id) REFERENCES dim_product_category(product_category_id)
        );
    """,
     "fact_order_items": """
      CREATE TABLE IF NOT EXISTS fact_order_items (
            order_item_id INT NOT NULL PRIMARY KEY,
            order_id INT NOT NULL,
            product_id INT NOT NULL,
            order_item_quantity INT,
            product_discount INT,
            product_subdiscount INT,
            product_price INT NOT NULL,
            product_subprice INT NOT NULL,
            FOREIGN KEY (order_id) REFERENCES fact_orders(order_id),
            FOREIGN KEY (product_id) REFERENCES dim_products(product_id)
        );
    """

}

## 1. ) Data Marts ( dm_sales )

In [1]:
ddl_marts_1 = {
    """
       CREATE TABLE IF NOT EXISTS dm_sales (
           month_sales CHAR(7) NOT NULL,
           total_sales INT NOT NULL,
           PRIMARY KEY (month_sales)
       );

      INSERT INTO dm_sales (month_sales, total_sales )
        SELECT
          TO_CHAR(order_date, 'MM') AS month_sales,
          SUM(oi.order_item_quantity * oi.product_price) AS total_sales
        FROM
          fact_orders o
        JOIN
          fact_order_items oi ON o.order_id = oi.order_id
        GROUP BY
          TO_CHAR(order_date, 'MM')
        ORDER BY
          month_sales;
    """
}

## 2.) Data Marts ( dm_product_sales_2 )

In [None]:
ddl_marts_2 = {
    """
       CREATE TABLE IF NOT EXISTS dm_product_sales_2 (
              product_category_id INT NOT NULL,
              product_category_name VARCHAR(255) NOT NULL,
              total_sales INT NOT NULL,
              PRIMARY KEY (product_category_id),
              FOREIGN KEY (product_category_id) REFERENCES dim_product_category(product_category_id)
            );


        INSERT INTO dm_product_sales_2 (product_category_id, product_category_name, total_sales)
        SELECT
          p.product_category_id,
          c.product_category_name,
          SUM(oi.order_item_quantity * oi.product_price) AS total_sales
        FROM
          dim_products p
        JOIN
          dim_product_category c ON p.product_category_id = c.product_category_id
        JOIN
          fact_order_items oi ON p.product_id = oi.product_id
        GROUP BY
          p.product_category_id, c.product_category_name;
    """
}

## 3.) Data Marts ( dm_payment_sales_3 )

In [None]:
ddl_marts_3 = {
    """
       CREATE TABLE IF NOT EXISTS dm_payment_sales_3 (
            payment_id INT NOT NULL,
            payment_name VARCHAR(255) NOT NULL,
            total_sales INT NOT NULL,
            PRIMARY KEY (payment_id),
            FOREIGN KEY (payment_id) REFERENCES dim_payments(payment_id)
          );


        INSERT INTO dm_payment_sales_3 (payment_id, payment_name, total_sales)
        SELECT
          o.payment_id,
          p.payment_name,
          SUM(o.order_total) AS total_sales
        FROM
          fact_orders o
        JOIN
          dim_payments p ON o.payment_id = p.payment_id
        GROUP BY
          o.payment_id, p.payment_name;
    """
}

## 4.) Data Marts ( dm_shipper_sales_4 )

In [None]:
ddl_marts_4 = {
    """
       CREATE TABLE IF NOT EXISTS dm_shipper_sales_4 (
            shipper_id INT NOT NULL,
            shipper_name VARCHAR(255) NOT NULL,
            total_sales INT NOT NULL,
            PRIMARY KEY (shipper_id),
            FOREIGN KEY (shipper_id) REFERENCES dim_shippers(shipper_id)
          );

        INSERT INTO dm_shipper_sales_4 (shipper_id, shipper_name, total_sales)
        SELECT
          o.shipper_id,
          s.shipper_name,
          SUM(o.order_total) AS total_sales
        FROM
          fact_orders o
        JOIN
          dim_shippers s ON o.shipper_id = s.shipper_id
        GROUP BY
          o.shipper_id, s.shipper_name;

    """
}

## 5.) Data Marts ( dm_user_sales_5  )

In [None]:
ddl_marts_5 = {
    """
       CREATE TABLE IF NOT EXISTS dm_user_sales_5 (
            user_id INT NOT NULL,
            user_first_name VARCHAR(255) NOT NULL,
            user_last_name VARCHAR(255) NOT NULL,
            total_sales INT NOT NULL,
            PRIMARY KEY (user_id),
            FOREIGN KEY (user_id) REFERENCES dim_users(user_id)
          );


        INSERT INTO dm_user_sales_5 (user_id, user_first_name, user_last_name, total_sales)
        SELECT
          u.user_id,
          u.user_first_name,
          u.user_last_name,
          SUM(o.order_total) AS total_sales
        FROM
          dim_users u
        JOIN
          fact_orders o ON u.user_id = o.user_id
        GROUP BY
          u.user_id, u.user_first_name, u.user_last_name;

    """
}

## 6.) Data Marts ( dm_discount_voucher_trend_6 )

In [None]:
ddl_marts_6 = {
    """
       CREATE TABLE IF NOT EXISTS dm_discount_voucher_trend_6 (
            trend_id SERIAL PRIMARY KEY,
            trend_date DATE NOT NULL,
            discount_amount INT NOT NULL,
            voucher_amount INT NOT NULL
          );


        INSERT INTO dm_discount_voucher_trend_6 (trend_date, discount_amount, voucher_amount)
        SELECT
          DATE_TRUNC('month', o.order_date) AS trend_date,
          SUM(o.order_discount) AS discount_amount,
          SUM(voucher.voucher_price) AS voucher_amount
        FROM
          fact_orders o
        LEFT JOIN
          dim_voucher voucher ON o.voucher_id = voucher.voucher_id
        GROUP BY
          trend_date;

    """
}

## 7.) Data Marts ( dm_sales_performance_by_region_7 )

In [None]:
ddl_marts_7 = {
    """
       CREATE TABLE IF NOT EXISTS dm_sales_performance_by_region_7 (
            region VARCHAR(255) NOT NULL,
            total_sales INT NOT NULL,
            PRIMARY KEY (region)
          );

        INSERT INTO dm_sales_performance_by_region_7 (region, total_sales)
        SELECT
          u.user_address AS region,
          SUM(o.order_total) AS total_sales
        FROM
          fact_orders o
        JOIN
          dim_users u ON o.user_id = u.user_id
        GROUP BY
          u.user_address;

    """
}

## 8.) Data Marts ( dm_profit_margin_per_category_8 )

In [None]:
ddl_marts_8 = {
    """
       CREATE TABLE IF NOT EXISTS dm_profit_margin_per_category_8 (
            category_id INT NOT NULL PRIMARY KEY,
            category_name VARCHAR(255) NOT NULL,
            total_sales INT NOT NULL,
            total_cost INT NOT NULL,
            total_profit INT NOT NULL
          );

        INSERT INTO dm_profit_margin_per_category_8 (category_id, category_name, total_sales, total_cost, total_profit)
        SELECT
          pc.product_category_id AS category_id,
          pc.product_category_name AS category_name,
          SUM(oi.product_price * oi.order_item_quantity) AS total_sales,
          SUM(oi.product_subprice * oi.order_item_quantity) AS total_cost,
          SUM((oi.product_price - oi.product_subprice) * oi.order_item_quantity) AS total_profit
        FROM
          fact_order_items oi
        JOIN
          dim_products p ON oi.product_id = p.product_id
        JOIN
          dim_product_category pc ON p.product_category_id = pc.product_category_id
        GROUP BY
          pc.product_category_id, pc.product_category_name;

    """
}

## 9.) Data Marts ( dm_average_order_value_per_user_9 )

In [None]:
ddl_marts_9= {
    """
       CREATE TABLE IF NOT EXISTS dm_average_order_value_per_user_9 (
            user_id INT NOT NULL PRIMARY KEY,
            user_first_name VARCHAR(255) NOT NULL,
            user_last_name VARCHAR(255) NOT NULL,
            average_order_value NUMERIC(10, 2) NOT NULL
          );

        INSERT INTO dm_average_order_value_per_user_9 (user_id, user_first_name, user_last_name, average_order_value)
        SELECT
          u.user_id,
          u.user_first_name,
          u.user_last_name,
          AVG(fo.order_total) AS average_order_value
        FROM
          dim_users u
        JOIN
          fact_orders fo ON u.user_id = fo.user_id
        GROUP BY
          u.user_id, u.user_first_name, u.user_last_name;

    """
}

## 10.) Data Marts ( dm_voucher_conversion_rate_10  )

In [None]:
ddl_marts_9= {
    """
       CREATE TABLE IF NOT EXISTS dm_voucher_conversion_rate_10 (
            voucher_id INT NOT NULL PRIMARY KEY,
            voucher_name VARCHAR(255) NOT NULL,
            total_orders_with_voucher INT NOT NULL,
            total_orders_without_voucher INT NOT NULL,
            conversion_rate NUMERIC(5, 2) NOT NULL
          );


        INSERT INTO dm_voucher_conversion_rate_10 (voucher_id, voucher_name, total_orders_with_voucher, total_orders_without_voucher, conversion_rate)
        SELECT
          v.voucher_id,
          v.voucher_name,
          COALESCE(COUNT(fo.order_id), 0) AS total_orders_with_voucher,
          (SELECT COUNT(*) FROM fact_orders WHERE voucher_id IS NULL) AS total_orders_without_voucher,
          CASE
              WHEN COUNT(fo.order_id) > 0 THEN (COUNT(fo.order_id)::NUMERIC / (COUNT(fo.order_id) + (SELECT COUNT(*) FROM fact_orders WHERE voucher_id IS NULL))::NUMERIC) * 100
              ELSE 0
          END AS conversion_rate
        FROM
          dim_voucher v
        LEFT JOIN
          fact_orders fo ON v.voucher_id = fo.voucher_id
        GROUP BY
          v.voucher_id, v.voucher_name;


    """
}

# **Load Library**

In [None]:
import pandas as pd
import sqlalchemy as sa

# **Function ETL**

### create table

In [None]:
# Buat tabel DWH fact dan dim nya
def create_tables():
    """Create tables in the data warehouse if they do not exist."""
    #Menjelaskan bahwa fungsi ini bertugas untuk membuat tabel di data warehouse jika tabel-tabel tersebut belum ada.

    engine = sa.create_engine(warehouse_conn_string) # create table baru
    #sa.create_engine(warehouse_conn_string): Menggunakan SQLAlchemy (sa kemungkinan adalah alias dari sqlalchemy)
    #untuk membuat engine yang akan digunakan untuk koneksi ke database data warehouse.

    # warehouse_conn_string adalah string koneksi yang berisi informasi tentang bagaimana menghubungkan ke database data warehouse.


    with engine.connect() as conn: # tes koneksi
    #with engine.connect() as conn: Membuka koneksi ke database menggunakan engine yang baru dibuat.
    #Blok with memastikan bahwa koneksi akan ditutup secara otomatis setelah blok selesai dieksekusi.
    #conn: Objek koneksi yang digunakan untuk berinteraksi dengan database.

        for ddl in ddl_statements.values():
          # ddl_statements: Koleksi dari pernyataan DDL (Data Definition Language) yang berisi SQL untuk membuat tabel.
          # Ini bisa berupa dictionary di mana setiap nilai adalah pernyataan SQL.

          # for ddl in ddl_statements.values(): Melakukan iterasi melalui setiap pernyataan DDL dalam koleksi.

            conn.execute(ddl) # di running tablenya
            # conn.execute(ddl): Menjalankan pernyataan DDL menggunakan koneksi yang telah dibuka. Ini akan membuat tabel di database data warehouse sesuai dengan pernyataan SQL yang ada dalam ddl_statements.

Potongan kode ini memastikan bahwa semua tabel yang diperlukan di data warehouse dibuat jika belum ada, mempersiapkan data warehouse untuk proses ETL selanjutnya.

Fungsi create_tables bertujuan untuk membuat tabel di data warehouse jika tabel-tabel tersebut belum ada. Fungsi ini bekerja dengan cara:

1. Membuat engine untuk koneksi ke data warehouse menggunakan string koneksi.
2. Membuka koneksi ke database.
3. Melakukan iterasi melalui sekumpulan pernyataan DDL.
4. Menjalankan setiap pernyataan DDL untuk membuat tabel yang diperlukan.

In [None]:
# Check dengan running
for i in ddl_statements:
  print(i)

dim_users
dim_payments
dim_shippers
dim_ratings
dim_voucher
fact_orders
dim_product_category
dim_products
fact_order_items


### ekstrak data

In [None]:
# Cek OLTP tables
oltp_tables

{'users': 'tb_users',
 'payments': 'tb_payments',
 'shippers': 'tb_shippers',
 'ratings': 'tb_ratings',
 'vouchers': 'tb_voucher',
 'orders': 'tb_orders',
 'product_category': 'tb_product_category',
 'products': 'tb_products',
 'order_items': 'tb_order_items'}

In [None]:
# OLTP tables kolom 'user'
oltp_tables['users']

'tb_users'

In [None]:
# Cek warehouse tables
warehouse_tables

{'users': 'dim_users',
 'payments': 'dim_payments',
 'shippers': 'dim_shippers',
 'ratings': 'dim_ratings',
 'vouchers': 'dim_voucher',
 'orders': 'fact_orders',
 'product_category': 'dim_product_category',
 'products': 'dim_products',
 'order_items': 'fact_order_items'}

In [None]:
# warehouse tables kolom 'user'
warehouse_tables['users']

'dim_users'

In [None]:
def extract_data(table_name):
  # Menjelaskan bahwa fungsi ini bertugas untuk mengekstrak data dari sebuah tabel di database OLTP (Online Transaction Processing).
    """Extract data from a table in the OLTP database."""
    engine = sa.create_engine(oltp_conn_string)
    # sa.create_engine(oltp_conn_string): Menggunakan SQLAlchemy (sa kemungkinan adalah alias dari sqlalchemy) untuk membuat engine yang akan digunakan untuk koneksi ke database OLTP.
    # oltp_conn_string adalah string koneksi yang berisi informasi tentang bagaimana menghubungkan ke database OLTP.


    query = f"SELECT * FROM {oltp_tables[table_name]}"
    # Membuat Query SQL: Menggunakan f-string untuk membentuk query SQL yang akan mengekstrak semua data (SELECT *) dari tabel yang namanya diambil dari oltp_tables[table_name].
    # oltp_tables: Diasumsikan sebagai dictionary atau mapping yang berisi nama-nama tabel di database OLTP. table_name adalah kunci untuk mendapatkan nama tabel yang benar.

    df = pd.read_sql(query, engine)
    # pd.read_sql(query, engine): Menggunakan fungsi read_sql dari Pandas (pd) untuk menjalankan query SQL dan mengembalikan hasilnya sebagai DataFrame (df).
    # query adalah query SQL yang telah dibuat, dan engine adalah engine SQLAlchemy yang digunakan untuk koneksi ke database.

    print(f'Extract Data {oltp_tables[table_name]} Success')
    # Mencetak Pesan: Menggunakan f-string untuk mencetak pesan sukses yang menyatakan bahwa data dari tabel yang dimaksud telah berhasil diekstraksi.
    return df
    # Mengembalikan DataFrame: Mengembalikan DataFrame (df) yang berisi data yang diekstraksi dari tabel di database OLTP.

Fungsi extract_data bertujuan untuk mengekstrak data dari tabel tertentu di database OLTP dan mengembalikannya sebagai DataFrame Pandas. Berikut adalah langkah-langkah yang dilakukan oleh fungsi ini:

1. Membuat Engine Koneksi: Membuat engine koneksi ke database OLTP menggunakan string koneksi.
2. Membuat Query SQL: Membuat query SQL untuk memilih semua data dari tabel yang ditentukan.
3. Menjalankan Query: Menjalankan query menggunakan Pandas dan menyimpan hasilnya dalam DataFrame.
4. Mencetak Pesan Sukses: Mencetak pesan yang menunjukkan bahwa data telah berhasil diekstraksi.
5. Mengembalikan Data: Mengembalikan DataFrame yang berisi data yang diekstraksi.

Fungsi ini adalah bagian penting dari proses ETL (Extract, Transform, Load) yang bertugas untuk mengekstrak data dari sumber OLTP sebelum data tersebut ditransformasi dan dimuat ke dalam data warehouse.

### transform data

#### a.) Dimensional

In [None]:
def transform_data(df, target_table):
    """Transform the extracted data to match the schema of the target dimension table."""
    # Menjelaskan bahwa fungsi ini bertujuan untuk mentransformasi data yang diekstrak agar sesuai dengan skema tabel dimensi target.

    columns = dimension_columns.get(target_table)
    # dimension_columns.get(target_table): Mengambil daftar kolom yang diperlukan untuk tabel dimensi target dari dimension_columns,
    # yang diasumsikan sebagai dictionary yang memetakan nama tabel ke daftar kolom yang dibutuhkan.

    if columns:
    # if columns:: Memeriksa apakah kolom yang dibutuhkan tersedia (tidak None).

        df = df[columns]
        # df = df[columns]: Jika kolom tersedia, menyaring DataFrame (df) agar hanya berisi kolom yang ditentukan dalam columns.
        # Ini memastikan bahwa DataFrame yang dikembalikan hanya memiliki kolom yang sesuai dengan skema tabel dimensi target.

    print(f'Transform Data {target_table} Success')
    # Mencetak Pesan: Menggunakan f-string untuk mencetak pesan sukses yang menunjukkan bahwa data untuk tabel dimensi target telah berhasil ditransformasi.

    return df
    # Mengembalikan DataFrame: Mengembalikan DataFrame (df) yang telah ditransformasi agar sesuai dengan skema tabel dimensi target.

Fungsi ini merupakan bagian penting dari proses ETL (Extract, Transform, Load) yang bertugas untuk mentransformasi data setelah diekstrak dan sebelum dimuat ke dalam tabel dimensi di data warehouse.

Fungsi transform_data bertujuan untuk mentransformasi data yang diekstrak agar sesuai dengan skema tabel dimensi target. Berikut adalah langkah-langkah yang dilakukan oleh fungsi ini:

1. Mengambil Kolom yang Dibutuhkan: Mengambil daftar kolom yang diperlukan untuk tabel dimensi target dari dimension_columns.
2. Menyaring Kolom DataFrame: Jika kolom yang dibutuhkan tersedia, menyaring DataFrame agar hanya berisi kolom yang sesuai dengan skema tabel dimensi target.
3. Mencetak Pesan Sukses: Mencetak pesan yang menunjukkan bahwa data telah berhasil ditransformasi.
4. Mengembalikan Data: Mengembalikan DataFrame yang berisi data yang telah ditransformasi.

Jadi, transform yang transform_data untuk yang dimensional, sedangkan yang dibawah, yang transform_fact_orders untuk yang fact table aja

#### b.) fact_orders

In [None]:
def transform_fact_orders():
    """Transform data for the fact_orders table."""
    # Menjelaskan bahwa fungsi ini bertujuan untuk mentransformasi data yang diperlukan untuk tabel fact_orders.

    dataframes = {table: extract_data(table) for table in oltp_tables.keys()}
    # {table: extract_data(table) for table in oltp_tables.keys()}: Membuat dictionary dataframes yang berisi DataFrame hasil ekstraksi data dari setiap tabel dalam oltp_tables.
    # Ini menggunakan dictionary comprehension untuk mengiterasi melalui kunci-kunci tabel dalam oltp_tables dan mengekstrak data untuk setiap tabel menggunakan fungsi extract_data

    df_orders = dataframes['orders']
    df_orders = df_orders.merge(dataframes['users'], on='user_id')
    df_orders = df_orders.merge(dataframes['payments'], on='payment_id')
    df_orders = df_orders.merge(dataframes['shippers'], on='shipper_id')
    df_orders = df_orders.merge(dataframes['ratings'], on='rating_id')
    df_orders = df_orders.merge(dataframes['vouchers'], how='left', on='voucher_id')
    # Penggabungan Data: DataFrame df_orders digabungkan dengan DataFrame lain yang diekstrak sebelumnya. Ini termasuk tabel users, payments, shippers, ratings, dan vouchers.
    # Penggabungan dilakukan berdasarkan kolom-kolom yang sesuai seperti user_id, payment_id, shipper_id, rating_id, dan voucher_id.


    df_orders.rename(columns={'user_id_x': 'user_id'}, inplace=True)
    # Mengganti Nama Kolom: Karena setelah penggabungan terdapat kolom-kolom yang berasal dari tabel yang sama, perlu dilakukan penamaan ulang agar tidak terjadi duplikasi nama kolom.
    # Di sini, kolom yang berasal dari users diubah namanya dari user_id_x menjadi user_id.

    fact_orders_columns = dimension_columns.get('fact_orders') # karena tabel targetnya adalah afct order
    return df_orders[fact_orders_columns]
    # Memilih Kolom Tertentu: Mengambil kolom-kolom yang sesuai dengan skema tabel fact_orders dari DataFrame df_orders.
    # Kolom-kolom yang dipilih sesuai dengan yang telah ditentukan sebelumnya dalam dimension_columns.


Fungsi transform_fact_orders adalah bagian dari proses ETL yang bertugas untuk mentransformasi data yang diekstrak dari tabel-tabel sumber menjadi format yang sesuai dengan skema tabel fact_orders dalam data warehouse. Langkah-langkah yang dilakukan termasuk penggabungan data dari berbagai tabel sumber, penggantian nama kolom, dan pemilihan kolom tertentu sesuai dengan skema tabel target.

Jadi, fungsi ini melakukan transformasi data yang diperlukan untuk tabel fact_orders sehingga data tersebut siap untuk dimuat ke dalam data warehouse.

In [None]:
# ngecek tipe data
dimension_columns

{'dim_users': ['user_id',
  'user_first_name',
  'user_last_name',
  'user_gender',
  'user_address',
  'user_birthdate',
  'user_join'],
 'dim_payments': ['payment_id', 'payment_name', 'payment_status'],
 'dim_shippers': ['shipper_id', 'shipper_name'],
 'dim_ratings': ['rating_id', 'rating_level', 'rating_status'],
 'dim_voucher': ['voucher_id',
  'voucher_name',
  'voucher_price',
  'voucher_created',
  'user_id'],
 'fact_orders': ['order_id',
  'order_date',
  'user_id',
  'payment_id',
  'shipper_id',
  'order_price',
  'order_discount',
  'voucher_id',
  'order_total',
  'rating_id'],
 'dim_product_category': ['product_category_id', 'product_category_name'],
 'dim_products': ['product_id',
  'product_category_id',
  'product_name',
  'product_created',
  'product_price',
  'product_discount'],
 'fact_order_items': ['order_item_id',
  'order_id',
  'product_id',
  'order_item_quantity',
  'product_discount',
  'product_subdiscount',
  'product_price',
  'product_subprice']}

misal kita mau ambil isi dari dim_users

In [None]:
dimension_columns['dim_users']

['user_id',
 'user_first_name',
 'user_last_name',
 'user_gender',
 'user_address',
 'user_birthdate',
 'user_join']

In [None]:
fact_orders_columns = dimension_columns.get('fact_orders')
fact_orders_columns

['order_id',
 'order_date',
 'user_id',
 'payment_id',
 'shipper_id',
 'order_price',
 'order_discount',
 'voucher_id',
 'order_total',
 'rating_id']

#### c.) fact_order_items

In [None]:
def transform_fact_order_items():
    """Transform data for the fact_orders table."""
    dataframes = {table: extract_data(table) for table in oltp_tables.keys()}

    df_order_items = dataframes['order_items']
    df_order_items = df_orders.merge(dataframes['product_category'], on='product_category_id')
    df_order_items = df_orders.merge(dataframes['products'], on='product_id')

    fact_order_items_columns = dimension_columns.get('fact_order_items') # karena tabel targetnya adalah afct order
    return df_order_items[fact_order_items_columns]


In [None]:
# ngecek tipe data
dimension_columns

{'dim_users': ['user_id',
  'user_first_name',
  'user_last_name',
  'user_gender',
  'user_address',
  'user_birthdate',
  'user_join'],
 'dim_payments': ['payment_id', 'payment_name', 'payment_status'],
 'dim_shippers': ['shipper_id', 'shipper_name'],
 'dim_ratings': ['rating_id', 'rating_level', 'rating_status'],
 'dim_voucher': ['voucher_id',
  'voucher_name',
  'voucher_price',
  'voucher_created',
  'user_id'],
 'fact_orders': ['order_id',
  'order_date',
  'user_id',
  'payment_id',
  'shipper_id',
  'order_price',
  'order_discount',
  'voucher_id',
  'order_total',
  'rating_id'],
 'dim_product_category': ['product_category_id', 'product_category_name'],
 'dim_products': ['product_id',
  'product_category_id',
  'product_name',
  'product_created',
  'product_price',
  'product_discount'],
 'fact_order_items': ['order_item_id',
  'order_id',
  'product_id',
  'order_item_quantity',
  'product_discount',
  'product_subdiscount',
  'product_price',
  'product_subprice']}

In [None]:
dimension_columns['dim_product_category']

['product_category_id', 'product_category_name']

In [None]:
fact_order_items_columns = dimension_columns.get('fact_order_items')
fact_order_items_columns

['order_item_id',
 'order_id',
 'product_id',
 'order_item_quantity',
 'product_discount',
 'product_subdiscount',
 'product_price',
 'product_subprice']

### Load Data

Load Fact_order

In [None]:
def load_data(df, table_name):
    """Load the transformed data into the target table in the data warehouse."""
    # Menjelaskan bahwa fungsi ini bertujuan untuk memuat data yang telah ditransformasi ke dalam tabel target di dalam data warehouse.

    engine = sa.create_engine(warehouse_conn_string)
    # sa.create_engine(warehouse_conn_string): Menggunakan SQLAlchemy (sa kemungkinan adalah alias dari sqlalchemy) untuk membuat engine yang akan digunakan untuk koneksi ke database data warehouse.
    # warehouse_conn_string adalah string koneksi yang berisi informasi tentang bagaimana menghubungkan ke database data warehouse.


    with engine.connect() as conn:
        # Cek kunci unique
        unique_key = get_unique_key(table_name)  # Misalnya user_id untuk tabel dim_user
        existing_data = pd.read_sql(f"SELECT {unique_key} FROM {table_name}", conn)
        # Mengecek Kunci Unik: Menggunakan fungsi get_unique_key untuk mendapatkan kunci unik dari tabel target yang dijadikan acuan. Ini bisa misalnya user_id untuk tabel dimensi dim_user.
        # Membaca Data yang Sudah Ada: Membaca data yang sudah ada di dalam tabel target dengan memilih hanya kolom kunci unik. Data ini akan digunakan untuk deduplikasi data yang baru dimuat.

        # Deduplikasi data
        df = deduplicate_data(df, existing_data, unique_key)
        # Deduplikasi Data: Memanggil fungsi deduplicate_data untuk melakukan deduplikasi data baru (df) dengan data yang sudah ada di dalam tabel target.
        # Ini bertujuan untuk memastikan bahwa tidak ada data yang sama di dalam tabel target.

        # Masukkan data baru
        df.to_sql(table_name, conn, index=False, if_exists='append', method='multi')
        # Memasukkan Data Baru: Memasukkan data baru yang telah ditransformasi (df) ke dalam tabel target di dalam database data warehouse. Pengaturan yang digunakan adalah:
        # a. table_name: Nama tabel target di dalam data warehouse.
        # b. conn: Koneksi ke database.
        # c. index=False: Tidak menyertakan indeks DataFrame dalam tabel yang dimuat.
        # d. if_exists='append': Data baru akan ditambahkan ke dalam tabel yang sudah ada.
        # e. method='multi': Metode yang digunakan untuk memasukkan data, dalam hal ini menggunakan metode multi-baris untuk meningkatkan efisiensi memuat data.


        print(f'Load Data {table_name} Success')

Fungsi load_data bertanggung jawab untuk memuat data yang telah ditransformasi ke dalam tabel target di dalam data warehouse. Langkah-langkah yang dilakukan meliputi memeriksa kunci unik, deduplikasi data, memasukkan data baru ke dalam tabel target, dan mencetak pesan sukses.

Jadi, fungsi ini merupakan bagian penting dari proses ETL (Extract, Transform, Load) yang bertugas untuk memastikan data yang telah diolah dimuat dengan benar ke dalam data warehouse.

Function untuk cheklist duplikat

Sudah melengkapi bagian dim_product_category, dim_products, fact_order_items!

In [None]:
def deduplicate_data(new_data, existing_data, unique_key):
    """Remove duplicates from new data based on existing data."""
    # Docstring: Menjelaskan bahwa fungsi ini bertujuan untuk menghapus duplikasi data dari data baru berdasarkan data yang sudah ada.

    existing_keys = existing_data[unique_key].tolist()
    unique_rows = new_data[~new_data[unique_key].isin(existing_keys)]
    return unique_rows
    # Menghapus Duplikasi: Fungsi ini bekerja dengan mengambil kolom kunci unik dari data yang sudah ada dan mengonversinya menjadi daftar.
    # Selanjutnya, ia membandingkan kolom kunci unik dari data baru dengan daftar kunci yang ada, dan mengembalikan baris-baris yang tidak memiliki kunci yang sama dengan data yang sudah ada.
    # Dengan cara ini, duplikasi dihilangkan dan hanya baris unik yang dipertahankan.



def get_unique_key(table_name):
    """Retrieve the unique key of the table."""
    # Docstring: Menjelaskan bahwa fungsi ini bertujuan untuk mengambil kunci unik dari tabel yang ditentukan.

    if table_name == 'dim_user':
        return 'user_id'
    elif table_name == 'dim_payment':
        return 'payment_id'
    elif table_name == 'dim_shipper':
        return 'shipper_id'
    elif table_name == 'dim_rating':
        return 'rating_id'
    elif table_name == 'dim_voucher':
        return 'voucher_id'
    elif table_name == 'fact_orders':
        return 'order_id'
    elif table_name == 'dim_product_category':
        return 'product_category_id'
    elif table_name == 'dim_products':
        return 'product_id'
    elif table_name == 'fact_order_items':
        return 'order_item_id'
        # Tambahkan kondisi lain jika ada tabel lain
    else:
        raise ValueError("Table name not recognized.")

    # Kunci Unik Tabel: Fungsi ini mengembalikan kunci unik yang sesuai untuk tabel yang diberikan.
    # Ini dilakukan dengan memeriksa nama tabel yang diberikan dan mengembalikan kunci unik yang sesuai berdasarkan tabel tersebut. Jika nama tabel tidak dikenali, fungsi akan memunculkan ValueError.



Potongan kode yang diberikan mengandung dua fungsi yang saling terkait untuk menangani penghapusan duplikasi data dan untuk mendapatkan kunci unik dari tabel.

Kedua fungsi ini bekerja bersama-sama dalam proses ETL (Extract, Transform, Load) untuk menghapus duplikasi data dan menentukan kunci unik dari tabel yang berbeda. Fungsi deduplicate_data bertanggung jawab untuk menghapus duplikasi data berdasarkan kunci unik yang ada, sementara fungsi get_unique_key bertanggung jawab untuk memberikan kunci unik yang sesuai dengan tabel yang diberikan.

### FULL FUNCTION

LENGKAPIN YANG get_unique_key(table_name) !

In [None]:
# Buat tabel DWH fact dan dim nya
#def create_tables():
#    """Create tables in the data warehouse if they do not exist."""
#    engine = sa.create_engine(warehouse_conn_string) # create table baru
#    with engine.connect() as conn: # tes koneksi
#        for ddl in ddl_statements.values():
#            conn.execute(ddl) # di running tablenya
#
#def extract_data(table_name):
#    """Extract data from a table in the OLTP database."""
#    engine = sa.create_engine(oltp_conn_string)
#    query = f"SELECT * FROM {oltp_tables[table_name]}"
#    df = pd.read_sql(query, engine)
#    print(f'Extract Data {oltp_tables[table_name]} Success')
#    return df

#def transform_data(df, target_table):
#    """Transform the extracted data to match the schema of the target dimension table."""
#    columns = dimension_columns.get(target_table)
#    if columns:
#        df = df[columns]
#    print(f'Transform Data {target_table} Success')
#    return df

#def transform_fact_orders():
#    """Transform data for the fact_orders table."""
#    dataframes = {table: extract_data(table) for table in oltp_tables.keys()}

#    df_orders = dataframes['orders']
#    df_orders = df_orders.merge(dataframes['users'], on='user_id')
#    df_orders = df_orders.merge(dataframes['payments'], on='payment_id')
#    df_orders = df_orders.merge(dataframes['shippers'], on='shipper_id')
#    df_orders = df_orders.merge(dataframes['ratings'], on='rating_id')
#    df_orders = df_orders.merge(dataframes['vouchers'], how='left', on='voucher_id')
#    df_orders.rename(columns={'user_id_x': 'user_id'}, inplace=True)

#    fact_orders_columns = dimension_columns.get('fact_orders')
#    return df_orders[fact_orders_columns]


#def load_data(df, table_name):
#    """Load the transformed data into the target table in the data warehouse."""
#    engine = sa.create_engine(warehouse_conn_string)
#    with engine.connect() as conn:
#        # Cek kunci unique
#        unique_key = get_unique_key(table_name)  # Misalnya user_id untuk tabel dim_user
#        existing_data = pd.read_sql(f"SELECT {unique_key} FROM {table_name}", conn)

        # Deduplikasi data
#        df = deduplicate_data(df, existing_data, unique_key)

        # Masukkan data baru
#        df.to_sql(table_name, conn, index=False, if_exists='append', method='multi')
#        print(f'Load Data {table_name} Success')

#def deduplicate_data(new_data, existing_data, unique_key):
#    """Remove duplicates from new data based on existing data."""
#    existing_keys = existing_data[unique_key].tolist()
#    unique_rows = new_data[~new_data[unique_key].isin(existing_keys)]
#    return unique_rows

#def get_unique_key(table_name):
#    """Retrieve the unique key of the table."""
#    if table_name == 'dim_user':
#        return 'user_id'
#    elif table_name == 'dim_payment':
#        return 'payment_id'
#    elif table_name == 'dim_shipper':
#        return 'shipper_id'
#    elif table_name == 'dim_rating':
#        return 'rating_id'
#    elif table_name == 'dim_voucher':
#        return 'voucher_id'
#    elif table_name == 'fact_orders':
#        return 'order_id'
    # Tambahkan kondisi lain jika ada tabel lain
#    else:
#        raise ValueError("Table name not recognized.")

### **Function Data Mart**

Potongan kode ini adalah fungsi Python yang bertanggung jawab untuk membuat tabel dm_sales di dalam data warehouse dan memasukkan data ke dalamnya

Fungsi create_and_insert_dm_sales bertanggung jawab untuk membuat tabel dm_sales di dalam data warehouse dan memasukkan data ke dalamnya. Langkah-langkah yang dilakukan meliputi membuat koneksi ke data warehouse, membuat tabel dm_sales, memasukkan data ke dalamnya, dan mencetak pesan sukses setelah proses selesai. Jadi, fungsi ini merupakan bagian penting dari proses ETL (Extract, Transform, Load) yang bertugas untuk mempersiapkan data dalam bentuk yang sesuai untuk analisis di dalam data warehouse.

#### 1.) dm_sales

In [None]:
def create_and_insert_dm_sales():
  # Docstring: Menjelaskan bahwa fungsi ini bertujuan untuk membuat tabel dm_sales dan memasukkan data ke dalamnya di dalam data warehouse.

    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string)
    # Membuat Engine: Menggunakan SQLAlchemy (sa kemungkinan adalah alias dari sqlalchemy) untuk membuat engine yang akan digunakan untuk koneksi ke database data warehouse.
    # warehouse_conn_string adalah string koneksi yang berisi informasi tentang bagaimana menghubungkan ke database data warehouse.


    with engine.connect() as conn:
        # Create dm_sales table
        conn.execute(ddl_marts_1['dim_sales'])
        # Membuat Tabel: Menggunakan koneksi (conn) untuk mengeksekusi pernyataan DDL (Data Definition Language) yang disimpan dalam variabel ddl_marts['dim_sales'].
        # Ini bertujuan untuk membuat tabel dm_sales di dalam database data warehouse.

        # Insert data into dm_sales table
        conn.execute(ddl_marts_1['insert_dm_sales'])
        # Memasukkan Data: Setelah tabel dm_sales dibuat, data dimasukkan ke dalamnya dengan mengeksekusi pernyataan DML (Data Manipulation Language) yang disimpan dalam variabel ddl_marts['insert_dm_sales'].


    print(f'Data Mart Has Create Success')

#### 2.) dm_product_sales_2

In [None]:
def create_and_insert_dm_product_sales_2():
  # Docstring: Menjelaskan bahwa fungsi ini bertujuan untuk membuat tabel dm_sales dan memasukkan data ke dalamnya di dalam data warehouse.

    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string)
    # Membuat Engine: Menggunakan SQLAlchemy (sa kemungkinan adalah alias dari sqlalchemy) untuk membuat engine yang akan digunakan untuk koneksi ke database data warehouse.
    # warehouse_conn_string adalah string koneksi yang berisi informasi tentang bagaimana menghubungkan ke database data warehouse.


    with engine.connect() as conn:
        # Create dm_sales table
        conn.execute(ddl_marts_2['dm_product_sales_2'])
        # Membuat Tabel: Menggunakan koneksi (conn) untuk mengeksekusi pernyataan DDL (Data Definition Language) yang disimpan dalam variabel ddl_marts['dim_sales'].
        # Ini bertujuan untuk membuat tabel dm_sales di dalam database data warehouse.

        # Insert data into dm_sales table
        conn.execute(ddl_marts_2['insert_dm_product_sales_2'])
        # Memasukkan Data: Setelah tabel dm_sales dibuat, data dimasukkan ke dalamnya dengan mengeksekusi pernyataan DML (Data Manipulation Language) yang disimpan dalam variabel ddl_marts['insert_dm_sales'].


    print(f'Data Mart Has Create Success')

#### 3.) dm_payment_sales_3

In [None]:
def create_and_insert_dm_payment_sales_3():
  # Docstring: Menjelaskan bahwa fungsi ini bertujuan untuk membuat tabel dm_sales dan memasukkan data ke dalamnya di dalam data warehouse.

    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string)
    # Membuat Engine: Menggunakan SQLAlchemy (sa kemungkinan adalah alias dari sqlalchemy) untuk membuat engine yang akan digunakan untuk koneksi ke database data warehouse.
    # warehouse_conn_string adalah string koneksi yang berisi informasi tentang bagaimana menghubungkan ke database data warehouse.


    with engine.connect() as conn:
        # Create dm_sales table
        conn.execute(ddl_marts_3['dm_payment_sales_3'])
        # Membuat Tabel: Menggunakan koneksi (conn) untuk mengeksekusi pernyataan DDL (Data Definition Language) yang disimpan dalam variabel ddl_marts['dim_sales'].
        # Ini bertujuan untuk membuat tabel dm_sales di dalam database data warehouse.

        # Insert data into dm_sales table
        conn.execute(ddl_marts_3['insert_dm_payment_sales_3'])
        # Memasukkan Data: Setelah tabel dm_sales dibuat, data dimasukkan ke dalamnya dengan mengeksekusi pernyataan DML (Data Manipulation Language) yang disimpan dalam variabel ddl_marts['insert_dm_sales'].


    print(f'Data Mart Has Create Success')

#### 4.) dm_shipper_sales_4

In [None]:
def create_and_insert_dm_shipper_sales_4 ():
  # Docstring: Menjelaskan bahwa fungsi ini bertujuan untuk membuat tabel dm_sales dan memasukkan data ke dalamnya di dalam data warehouse.

    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string)
    # Membuat Engine: Menggunakan SQLAlchemy (sa kemungkinan adalah alias dari sqlalchemy) untuk membuat engine yang akan digunakan untuk koneksi ke database data warehouse.
    # warehouse_conn_string adalah string koneksi yang berisi informasi tentang bagaimana menghubungkan ke database data warehouse.


    with engine.connect() as conn:
        # Create dm_sales table
        conn.execute(ddl_marts_4['dm_shipper_sales_4'])
        # Membuat Tabel: Menggunakan koneksi (conn) untuk mengeksekusi pernyataan DDL (Data Definition Language) yang disimpan dalam variabel ddl_marts['dim_sales'].
        # Ini bertujuan untuk membuat tabel dm_sales di dalam database data warehouse.

        # Insert data into dm_sales table
        conn.execute(ddl_marts_4['insert_dm_shipper_sales_4'])
        # Memasukkan Data: Setelah tabel dm_sales dibuat, data dimasukkan ke dalamnya dengan mengeksekusi pernyataan DML (Data Manipulation Language) yang disimpan dalam variabel ddl_marts['insert_dm_sales'].


    print(f'Data Mart Has Create Success')

#### 5.) dm_user_sales_5

In [None]:
def create_and_insert_dm_user_sales_5():
  # Docstring: Menjelaskan bahwa fungsi ini bertujuan untuk membuat tabel dm_sales dan memasukkan data ke dalamnya di dalam data warehouse.

    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string)
    # Membuat Engine: Menggunakan SQLAlchemy (sa kemungkinan adalah alias dari sqlalchemy) untuk membuat engine yang akan digunakan untuk koneksi ke database data warehouse.
    # warehouse_conn_string adalah string koneksi yang berisi informasi tentang bagaimana menghubungkan ke database data warehouse.


    with engine.connect() as conn:
        # Create dm_sales table
        conn.execute(ddl_marts_5['dm_user_sales_5'])
        # Membuat Tabel: Menggunakan koneksi (conn) untuk mengeksekusi pernyataan DDL (Data Definition Language) yang disimpan dalam variabel ddl_marts['dim_sales'].
        # Ini bertujuan untuk membuat tabel dm_sales di dalam database data warehouse.

        # Insert data into dm_sales table
        conn.execute(ddl_marts_5['insert_dm_user_sales_5'])
        # Memasukkan Data: Setelah tabel dm_sales dibuat, data dimasukkan ke dalamnya dengan mengeksekusi pernyataan DML (Data Manipulation Language) yang disimpan dalam variabel ddl_marts['insert_dm_sales'].


    print(f'Data Mart Has Create Success')

#### 6.) dm_discount_voucher_trend_6

In [None]:
def create_and_insert_dm_discount_voucher_trend_6():
  # Docstring: Menjelaskan bahwa fungsi ini bertujuan untuk membuat tabel dm_sales dan memasukkan data ke dalamnya di dalam data warehouse.

    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string)
    # Membuat Engine: Menggunakan SQLAlchemy (sa kemungkinan adalah alias dari sqlalchemy) untuk membuat engine yang akan digunakan untuk koneksi ke database data warehouse.
    # warehouse_conn_string adalah string koneksi yang berisi informasi tentang bagaimana menghubungkan ke database data warehouse.


    with engine.connect() as conn:
        # Create dm_sales table
        conn.execute(ddl_marts_6['dm_discount_voucher_trend_6'])
        # Membuat Tabel: Menggunakan koneksi (conn) untuk mengeksekusi pernyataan DDL (Data Definition Language) yang disimpan dalam variabel ddl_marts['dim_sales'].
        # Ini bertujuan untuk membuat tabel dm_sales di dalam database data warehouse.

        # Insert data into dm_sales table
        conn.execute(ddl_marts_6['insert_dm_discount_voucher_trend_6'])
        # Memasukkan Data: Setelah tabel dm_sales dibuat, data dimasukkan ke dalamnya dengan mengeksekusi pernyataan DML (Data Manipulation Language) yang disimpan dalam variabel ddl_marts['insert_dm_sales'].


    print(f'Data Mart Has Create Success')

#### 7.) dm_sales_performance_by_region_7

In [None]:
def create_and_insert_dm_sales_performance_by_region_7():
  # Docstring: Menjelaskan bahwa fungsi ini bertujuan untuk membuat tabel dm_sales dan memasukkan data ke dalamnya di dalam data warehouse.

    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string)
    # Membuat Engine: Menggunakan SQLAlchemy (sa kemungkinan adalah alias dari sqlalchemy) untuk membuat engine yang akan digunakan untuk koneksi ke database data warehouse.
    # warehouse_conn_string adalah string koneksi yang berisi informasi tentang bagaimana menghubungkan ke database data warehouse.


    with engine.connect() as conn:
        # Create dm_sales table
        conn.execute(ddl_marts_7['dm_sales_performance_by_region_7'])
        # Membuat Tabel: Menggunakan koneksi (conn) untuk mengeksekusi pernyataan DDL (Data Definition Language) yang disimpan dalam variabel ddl_marts['dim_sales'].
        # Ini bertujuan untuk membuat tabel dm_sales di dalam database data warehouse.

        # Insert data into dm_sales table
        conn.execute(ddl_marts_7['insert_dm_sales_performance_by_region_7'])
        # Memasukkan Data: Setelah tabel dm_sales dibuat, data dimasukkan ke dalamnya dengan mengeksekusi pernyataan DML (Data Manipulation Language) yang disimpan dalam variabel ddl_marts['insert_dm_sales'].


    print(f'Data Mart Has Create Success')

#### 8.) dm_profit_margin_per_category_8

In [None]:
def create_and_insert_dm_profit_margin_per_category_8():
  # Docstring: Menjelaskan bahwa fungsi ini bertujuan untuk membuat tabel dm_sales dan memasukkan data ke dalamnya di dalam data warehouse.

    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string)
    # Membuat Engine: Menggunakan SQLAlchemy (sa kemungkinan adalah alias dari sqlalchemy) untuk membuat engine yang akan digunakan untuk koneksi ke database data warehouse.
    # warehouse_conn_string adalah string koneksi yang berisi informasi tentang bagaimana menghubungkan ke database data warehouse.


    with engine.connect() as conn:
        # Create dm_sales table
        conn.execute(ddl_marts_8['dm_profit_margin_per_category_8'])
        # Membuat Tabel: Menggunakan koneksi (conn) untuk mengeksekusi pernyataan DDL (Data Definition Language) yang disimpan dalam variabel ddl_marts['dim_sales'].
        # Ini bertujuan untuk membuat tabel dm_sales di dalam database data warehouse.

        # Insert data into dm_sales table
        conn.execute(ddl_marts_8['insert_dm_profit_margin_per_category_8'])
        # Memasukkan Data: Setelah tabel dm_sales dibuat, data dimasukkan ke dalamnya dengan mengeksekusi pernyataan DML (Data Manipulation Language) yang disimpan dalam variabel ddl_marts['insert_dm_sales'].


    print(f'Data Mart Has Create Success')

#### 9.) dm_average_order_value_per_user_9

In [None]:
def create_and_insert_dm_average_order_value_per_user_9():
  # Docstring: Menjelaskan bahwa fungsi ini bertujuan untuk membuat tabel dm_sales dan memasukkan data ke dalamnya di dalam data warehouse.

    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string)
    # Membuat Engine: Menggunakan SQLAlchemy (sa kemungkinan adalah alias dari sqlalchemy) untuk membuat engine yang akan digunakan untuk koneksi ke database data warehouse.
    # warehouse_conn_string adalah string koneksi yang berisi informasi tentang bagaimana menghubungkan ke database data warehouse.


    with engine.connect() as conn:
        # Create dm_sales table
        conn.execute(ddl_marts_9['dm_average_order_value_per_user_9'])
        # Membuat Tabel: Menggunakan koneksi (conn) untuk mengeksekusi pernyataan DDL (Data Definition Language) yang disimpan dalam variabel ddl_marts['dim_sales'].
        # Ini bertujuan untuk membuat tabel dm_sales di dalam database data warehouse.

        # Insert data into dm_sales table
        conn.execute(ddl_marts_9['insert_dm_average_order_value_per_user_9'])
        # Memasukkan Data: Setelah tabel dm_sales dibuat, data dimasukkan ke dalamnya dengan mengeksekusi pernyataan DML (Data Manipulation Language) yang disimpan dalam variabel ddl_marts['insert_dm_sales'].


    print(f'Data Mart Has Create Success')

#### 10.) dm_voucher_conversion_rate_10

In [None]:
def create_and_insert_dm_voucher_conversion_rate_10():
  # Docstring: Menjelaskan bahwa fungsi ini bertujuan untuk membuat tabel dm_sales dan memasukkan data ke dalamnya di dalam data warehouse.

    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string)
    # Membuat Engine: Menggunakan SQLAlchemy (sa kemungkinan adalah alias dari sqlalchemy) untuk membuat engine yang akan digunakan untuk koneksi ke database data warehouse.
    # warehouse_conn_string adalah string koneksi yang berisi informasi tentang bagaimana menghubungkan ke database data warehouse.


    with engine.connect() as conn:
        # Create dm_sales table
        conn.execute(ddl_marts_10['dm_voucher_conversion_rate_10'])
        # Membuat Tabel: Menggunakan koneksi (conn) untuk mengeksekusi pernyataan DDL (Data Definition Language) yang disimpan dalam variabel ddl_marts['dim_sales'].
        # Ini bertujuan untuk membuat tabel dm_sales di dalam database data warehouse.

        # Insert data into dm_sales table
        conn.execute(ddl_marts_10['insert_dm_voucher_conversion_rate_10'])
        # Memasukkan Data: Setelah tabel dm_sales dibuat, data dimasukkan ke dalamnya dengan mengeksekusi pernyataan DML (Data Manipulation Language) yang disimpan dalam variabel ddl_marts['insert_dm_sales'].


    print(f'Data Mart Has Create Success')

### **Function Run ETL**

In [None]:
def etl_process():
    """Run the entire ETL process."""
    # Menjelaskan bahwa fungsi ini bertujuan untuk menjalankan keseluruhan proses ETL.

    # Create tables
    create_tables()
    # Membuat Tabel: Memanggil fungsi create_tables yang bertugas untuk membuat tabel-tabel yang diperlukan di dalam data warehouse.

    # Process dimension tables
    for dim_table, target_table in warehouse_tables.items():
        if dim_table != 'fact_orders':
            source_table = dim_table
            df = extract_data(source_table)
            transformed_df = transform_data(df, dim_table)
            load_data(transformed_df, target_table)
        else:
            # Process fact table
            df_fact_orders = transform_fact_orders()
            load_data(df_fact_orders, target_table)

    # Memproses Tabel Dimensi: Melalui loop, fungsi ini melakukan iterasi melalui setiap tabel dimensi yang didefinisikan dalam warehouse_tables.
    # Jika tabel yang sedang diproses bukanlah tabel fakta (fact_orders), maka data diekstrak, ditransformasi, dan dimuat ke dalam data warehouse.
    # Jika tabel yang sedang diproses adalah tabel fakta, maka fungsi transform_fact_orders dipanggil untuk mentransformasi data faktual, dan hasilnya dimuat ke dalam data warehouse.

Potongan kode tersebut adalah sebuah fungsi Python yang bertanggung jawab untuk menjalankan keseluruhan proses ETL (Extract, Transform, Load). Fungsi ini berisi serangkaian langkah-langkah yang diperlukan untuk mempersiapkan dan memuat data dari sumbernya ke dalam data warehouse

Fungsi etl_process adalah inti dari proses ETL yang bertanggung jawab untuk menjalankan semua langkah yang diperlukan dalam mempersiapkan dan memuat data ke dalam data warehouse. Ini mencakup pembuatan tabel, pemrosesan tabel dimensi dan fakta, serta pembuatan dan pengisian data ke dalam tabel Mart. Dengan menjalankan fungsi ini, keseluruhan proses ETL dapat dieksekusi dengan efisien dan konsisten.

#### 1.) Run dm_sales

In [None]:
# proses mart table
create_and_insert_dm_sales()
# Memproses Tabel Mart: Setelah selesai memproses semua tabel dimensi dan fakta,
# fungsi create_and_insert_dm_sales dipanggil untuk membuat dan memasukkan data ke dalam tabel Mart (dm_sales), yang kemungkinan adalah agregasi atau transformasi data untuk tujuan analisis.

#### 2.) Run dm_product_sales_2

In [None]:
# proses mart table
create_and_insert_dm_product_sales_2()
# Memproses Tabel Mart: Setelah selesai memproses semua tabel dimensi dan fakta,
# fungsi create_and_insert_dm_sales dipanggil untuk membuat dan memasukkan data ke dalam tabel Mart (dm_sales), yang kemungkinan adalah agregasi atau transformasi data untuk tujuan analisis.

#### 3.) Run dm_payment_sales_3

In [None]:
# proses mart table
create_and_insert_dm_payment_sales_3()
# Memproses Tabel Mart: Setelah selesai memproses semua tabel dimensi dan fakta,
# fungsi create_and_insert_dm_sales dipanggil untuk membuat dan memasukkan data ke dalam tabel Mart (dm_sales), yang kemungkinan adalah agregasi atau transformasi data untuk tujuan analisis.

#### 4.) Run dm_shipper_sales_4

In [None]:
# proses mart table
create_and_insert_dm_shipper_sales_4()
# Memproses Tabel Mart: Setelah selesai memproses semua tabel dimensi dan fakta,
# fungsi create_and_insert_dm_sales dipanggil untuk membuat dan memasukkan data ke dalam tabel Mart (dm_sales), yang kemungkinan adalah agregasi atau transformasi data untuk tujuan analisis.

#### 5.) Run dm_user_sales_5

In [None]:
# proses mart table
create_and_insert_dm_user_sales_5()
# Memproses Tabel Mart: Setelah selesai memproses semua tabel dimensi dan fakta,
# fungsi create_and_insert_dm_sales dipanggil untuk membuat dan memasukkan data ke dalam tabel Mart (dm_sales), yang kemungkinan adalah agregasi atau transformasi data untuk tujuan analisis.

#### 6.) Run dm_discount_voucher_trend_6

In [None]:
# proses mart table
create_and_insert_dm_discount_voucher_trend_6()
# Memproses Tabel Mart: Setelah selesai memproses semua tabel dimensi dan fakta,
# fungsi create_and_insert_dm_sales dipanggil untuk membuat dan memasukkan data ke dalam tabel Mart (dm_sales), yang kemungkinan adalah agregasi atau transformasi data untuk tujuan analisis.

#### 7.) Run dm_sales_performance_by_region_7

In [None]:
# proses mart table
create_and_insert_dm_sales_performance_by_region_7()
# Memproses Tabel Mart: Setelah selesai memproses semua tabel dimensi dan fakta,
# fungsi create_and_insert_dm_sales dipanggil untuk membuat dan memasukkan data ke dalam tabel Mart (dm_sales), yang kemungkinan adalah agregasi atau transformasi data untuk tujuan analisis.

#### 8.) Run dm_profit_margin_per_category_8

In [None]:
# proses mart table
create_and_insert_dm_profit_margin_per_category_8()
# Memproses Tabel Mart: Setelah selesai memproses semua tabel dimensi dan fakta,
# fungsi create_and_insert_dm_sales dipanggil untuk membuat dan memasukkan data ke dalam tabel Mart (dm_sales), yang kemungkinan adalah agregasi atau transformasi data untuk tujuan analisis.

#### 9.) Run dm_average_order_value_per_user_9

In [None]:
# proses mart table
create_and_insert_dm_average_order_value_per_user_9()
# Memproses Tabel Mart: Setelah selesai memproses semua tabel dimensi dan fakta,
# fungsi create_and_insert_dm_sales dipanggil untuk membuat dan memasukkan data ke dalam tabel Mart (dm_sales), yang kemungkinan adalah agregasi atau transformasi data untuk tujuan analisis.

#### 10.) Run dm_voucher_conversion_rate_10

In [None]:
# proses mart table
create_and_insert_dm_voucher_conversion_rate_10()
# Memproses Tabel Mart: Setelah selesai memproses semua tabel dimensi dan fakta,
# fungsi create_and_insert_dm_sales dipanggil untuk membuat dan memasukkan data ke dalam tabel Mart (dm_sales), yang kemungkinan adalah agregasi atau transformasi data untuk tujuan analisis.

# **Run ETL**

In [None]:
# Buat tabel DWH fact dan dim nya
def create_tables():
    """Create tables in the data warehouse if they do not exist."""
    engine = sa.create_engine(warehouse_conn_string) # create table baru
    with engine.connect() as conn: # tes koneksi
        for ddl in ddl_statements.values():
            conn.execute(ddl) # di running tablenya

def extract_data(table_name):
    """Extract data from a table in the OLTP database."""
    engine = sa.create_engine(oltp_conn_string)
    query = f"SELECT * FROM {oltp_tables[table_name]}"
    df = pd.read_sql(query, engine)
    print(f'Extract Data {oltp_tables[table_name]} Success')
    return df

def transform_data(df, target_table):
    """Transform the extracted data to match the schema of the target dimension table."""
    columns = dimension_columns.get(target_table)
    if columns:
        df = df[columns]
    print(f'Transform Data {target_table} Success')
    return df

def transform_fact_orders():
    """Transform data for the fact_orders table."""
    dataframes = {table: extract_data(table) for table in oltp_tables.keys()}

    df_orders = dataframes['orders']
    df_orders = df_orders.merge(dataframes['users'], on='user_id')
    df_orders = df_orders.merge(dataframes['payments'], on='payment_id')
    df_orders = df_orders.merge(dataframes['shippers'], on='shipper_id')
    df_orders = df_orders.merge(dataframes['ratings'], on='rating_id')
    df_orders = df_orders.merge(dataframes['vouchers'], how='left', on='voucher_id')
    df_orders.rename(columns={'user_id_x': 'user_id'}, inplace=True)

    fact_orders_columns = dimension_columns.get('fact_orders')
    return df_orders[fact_orders_columns]


def load_data(df, table_name):
    """Load the transformed data into the target table in the data warehouse."""
    engine = sa.create_engine(warehouse_conn_string)
    with engine.connect() as conn:
        # Cek kunci unique
        unique_key = get_unique_key(table_name)  # Misalnya user_id untuk tabel dim_user
        existing_data = pd.read_sql(f"SELECT {unique_key} FROM {table_name}", conn)

        # Deduplikasi data
        df = deduplicate_data(df, existing_data, unique_key)

        # Masukkan data baru
        df.to_sql(table_name, conn, index=False, if_exists='append', method='multi')
        print(f'Load Data {table_name} Success')

def deduplicate_data(new_data, existing_data, unique_key):
    """Remove duplicates from new data based on existing data."""
    existing_keys = existing_data[unique_key].tolist()
    unique_rows = new_data[~new_data[unique_key].isin(existing_keys)]
    return unique_rows

def get_unique_key(table_name):
    """Retrieve the unique key of the table."""
    if table_name == 'dim_user':
        return 'user_id'
    elif table_name == 'dim_payment':
        return 'payment_id'
    elif table_name == 'dim_shipper':
        return 'shipper_id'
    elif table_name == 'dim_rating':
        return 'rating_id'
    elif table_name == 'dim_voucher':
        return 'voucher_id'
    elif table_name == 'fact_orders':
        return 'order_id'
    # Tambahkan kondisi lain jika ada tabel lain
    else:
        raise ValueError("Table name not recognized.")

#script running all ETL
etl_process()

ObjectNotExecutableError: Not an executable object: '\n       CREATE TABLE IF NOT EXISTS dim_users (\n            user_id INT NOT NULL PRIMARY KEY,\n            user_first_name VARCHAR(255) NOT NULL,\n            user_last_name VARCHAR(255) NOT NULL,\n            user_gender VARCHAR(50) NOT NULL,\n            user_address VARCHAR(255),\n            user_birthdate DATE NOT NULL,\n            user_join DATE NOT NULL\n        );\n    '

### **Run Testing**

In [None]:
source_table

In [None]:
create_tables()

source_table = 'users'
df = extract_data(source_table)
df

Extract Data tb_users Success


Unnamed: 0,user_id,user_first_name,user_last_name,user_gender,user_address,user_birthday,user_join
0,100101,Budi,Gunawan,Male,"Jl. Pondok Indah No.1, Kecamatan Pondok Labu, ...",1998-09-12,2022-01-13
1,100102,Eva,Susanti,Female,"Jl. Timur Raya No. 13, Kramat Jaya, Jakarta Ti...",1997-02-16,2022-01-29
2,100103,Dana,Pradana,Male,"Jl. Pahlawan, Surabaya, Jawa Timur",1999-07-19,2022-02-11
3,100104,Rahmat,Hidayat,Male,"Jl. Amil Abas, Jakarta Timur, DKI Jakarta",2000-02-14,2022-03-22
4,100105,Dodo,Andriano,Male,"Jl. Pakuan Selatan No. 177, Magelang, Jawa Tengah",2000-09-06,2022-04-03
5,100106,Caca,Kumala,Female,"Jl. Bunga Raya, Kota Tanggerang, Banten",1998-11-05,2022-05-20
6,100107,Andi,Kurniawan,Male,"Jl. Mawar Indah No. 25, Jakarta Barat, DKI Jak...",2001-03-14,2022-05-24
7,100108,Fanny,Utami,Female,"Jl. Kilometer Panjang No. 210, Jakarta Utara, ...",2002-01-27,2022-06-02
8,100109,Gagah,Prakasa,Male,"Jl. Timur Asri No. 10, Denpasar, Bali",2001-08-05,2022-07-14
9,100110,Anita,Friska,Female,"Jl. Tembung Raya, Kota Medan Timur, Sumatera U...",2000-11-04,2022-07-21


In [None]:
# TESTING SETIAP TABEL

transformed_df = transform_data(df, 'dim_user')
transformed_df

Transform Data dim_user Success


Unnamed: 0,user_id,user_first_name,user_last_name,user_gender,user_address,user_birthday,user_join
0,100101,Budi,Gunawan,Male,"Jl. Pondok Indah No.1, Kecamatan Pondok Labu, ...",1998-09-12,2022-01-13
1,100102,Eva,Susanti,Female,"Jl. Timur Raya No. 13, Kramat Jaya, Jakarta Ti...",1997-02-16,2022-01-29
2,100103,Dana,Pradana,Male,"Jl. Pahlawan, Surabaya, Jawa Timur",1999-07-19,2022-02-11
3,100104,Rahmat,Hidayat,Male,"Jl. Amil Abas, Jakarta Timur, DKI Jakarta",2000-02-14,2022-03-22
4,100105,Dodo,Andriano,Male,"Jl. Pakuan Selatan No. 177, Magelang, Jawa Tengah",2000-09-06,2022-04-03
5,100106,Caca,Kumala,Female,"Jl. Bunga Raya, Kota Tanggerang, Banten",1998-11-05,2022-05-20
6,100107,Andi,Kurniawan,Male,"Jl. Mawar Indah No. 25, Jakarta Barat, DKI Jak...",2001-03-14,2022-05-24
7,100108,Fanny,Utami,Female,"Jl. Kilometer Panjang No. 210, Jakarta Utara, ...",2002-01-27,2022-06-02
8,100109,Gagah,Prakasa,Male,"Jl. Timur Asri No. 10, Denpasar, Bali",2001-08-05,2022-07-14
9,100110,Anita,Friska,Female,"Jl. Tembung Raya, Kota Medan Timur, Sumatera U...",2000-11-04,2022-07-21


In [None]:
load_data(transformed_df, 'dim_user')

Load Data dim_user Success


PROSES SUDAH SELESAI, BISA REFRESH DATANYA DI POSTGRE DWH

RUNNING ULANG SETIAP TABELNYA DAN DILENGKAPIN APA YANG BELUM LENGKAP

### **Script Upload Google Sheets**

In [None]:
import json
import gspread
from oauth2client.service_account import ServiceAccountCredentials

with open('digitalskola_key.json','rb') as file:
    key = json.load(file)

scope = ['https://www.googleapis.com/auth/drive','https://spreadsheets.google.com/feeds']
creds = ServiceAccountCredentials.from_json_keyfile_dict(key, scope)
client = gspread.authorize(creds)

###tambahkan email googledigitalskola@digitalskola-368401.iam.gserviceaccount.com ke dalam google sheet anda#

In [None]:
def fetch_data_from_dwh(query):
     # Membuat koneksi ke database
    engine = sa.create_engine(warehouse_conn_string)

    # Membuat hasil query menjadi Datafrmae
    df = pd.read_sql(query, engine)

    return df

df_mart = fetch_data_from_dwh("""SELECT * FROM dm_sales;""")
df_mart

Unnamed: 0,order_id,order_date,user_id,user_name,payment_type,shipper_name,order_price,order_discount,voucher_name,order_total
0,1110001,2022-01-20,100101,Budi Gunawan,Debit,JNE Express,250000,15000,New User,230000
1,1110002,2022-01-29,100102,Eva Susanti,Debit,JNE Express,620000,40000,New User,575000
2,1110003,2022-02-13,100103,Dana Pradana,Credit,JNE Express,6000000,1000000,New User,4995000
3,1110005,2022-04-28,100105,Dodo Andriano,Debit,Sicepat Express,4000000,1000000,New User,2995000
4,1110008,2022-06-02,100108,Fanny Utami,Credit,Sicepat Express,2000000,0,New User,1995000
5,1110012,2022-07-30,100110,Anita Friska,Debit,JNE Express,490000,35000,Body Soap Promo,445000


In [None]:
# ganti dengan nama google sheets anda
sheet = client.open('Contoh Source Data')

# ganti sesuai dengan nama sheet didalam google sheets anda
# siapkan nama kolom pada sheet di google sheet anda

export = sheet.worksheet('Sheet3')
export.update([df_mart.columns.values.tolist()] + df_mart.astype(str).values.tolist())

{'spreadsheetId': '163IyMV2W_SR_vYg9IOPYcmQwOtVxSfFQzhPPb9RjBA0',
 'updatedRange': 'Sheet3!A1:J7',
 'updatedRows': 7,
 'updatedColumns': 10,
 'updatedCells': 70}