In [1]:
import pandas as pd
from tabulate import tabulate
import warnings 
  
warnings.filterwarnings('ignore')

## 1. Data Source

### 1.1. Data Source I : API
- Data `city` : https://raw.githubusercontent.com/rahilpacmann/case-data-wrangling-api/main/city.csv

In [2]:
city_raw = "https://raw.githubusercontent.com/rahilpacmann/case-data-wrangling-api/main/city.csv"

### 1.2. Data Source II : API
- Data `country` : https://raw.githubusercontent.com/rahilpacmann/case-data-wrangling-api/main/country.csv

In [3]:
country_raw = "https://raw.githubusercontent.com/rahilpacmann/case-data-wrangling-api/main/country.csv"

### 1.3. Data Source III : Database
- Jika sudah mempersiapkan data source 3 buat koneksi ke database tersebut.
    - dbname = "dvdrental"
    - user = "postgres"
    - password = "qwerty123"
    - host = "localhost"
    - port = "5433"    

In [13]:
# Your code here
# Buat koneksi ke database
import psycopg2
from sqlalchemy import create_engine

dbname = "dvdrental"
user = "postgres"
password = "qwerty123"
host = "localhost"
port = "5433"

engine_str = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
engine = create_engine(engine_str)

### 1.4. Requirements (JSON)
- Requirements :
    - https://rahilpacmann.github.io/case-data-wrangling-api/requirements_table.json
- Requirements tersebut meliputi :
    - Nama tabel
    - Nama kolom pada setiap tabel
    - Tipe data pada setiap tabel

In [5]:
requirements_table_url = 'https://rahilpacmann.github.io/case-data-wrangling-api/requirements_table.json'

# Your code here
import urllib.request, json 

with urllib.request.urlopen(requirements_table_url) as url:
    requirements_table = json.load(url)
    
print(requirements_table)

{'actor': [{'column_name': 'actor_id', 'data_type': 'int64'}, {'column_name': 'last_update', 'data_type': 'datetime64[ns]'}, {'column_name': 'first_name', 'data_type': 'object'}, {'column_name': 'last_name', 'data_type': 'object'}], 'store': [{'column_name': 'store_id', 'data_type': 'int64'}, {'column_name': 'manager_staff_id', 'data_type': 'int64'}, {'column_name': 'address_id', 'data_type': 'int64'}, {'column_name': 'last_update', 'data_type': 'datetime64[ns]'}], 'address': [{'column_name': 'last_update', 'data_type': 'datetime64[ns]'}, {'column_name': 'city_id', 'data_type': 'int64'}, {'column_name': 'address_id', 'data_type': 'int64'}, {'column_name': 'district', 'data_type': 'object'}, {'column_name': 'phone', 'data_type': 'object'}, {'column_name': 'postal_code', 'data_type': 'object'}, {'column_name': 'address', 'data_type': 'object'}, {'column_name': 'address2', 'data_type': 'object'}], 'category': [{'column_name': 'category_id', 'data_type': 'int64'}, {'column_name': 'last_upd

## 2. Extract 

### 2.1. Extract From Data Source I
- Lakukan ekstrak data dari Data Source I
- Simpan dataframenya dengan nama `city_df`

In [6]:
# Your code here
# Read data
city_df = pd.read_csv(city_raw)
# Tampilkan 5 data teratas
city_df.head()

Unnamed: 0,city_id,city,country
0,1,A Corua (La Corua),Spain
1,2,Abha,Saudi Arabia
2,3,Abu Dhabi,United Arab Emirates
3,4,Acua,Mexico
4,5,Adana,Turkey


### 2.2. Extract From Data Source II
- Lakukan ekstrak data dari Data Source II
- Simpan dataframenya dengan nama `country_df`

In [7]:
# Your code here
# Read data
country_df = pd.read_csv(country_raw)

# Tampilkan 5 data teratas
country_df

Unnamed: 0,country,last_update
0,Afghanistan,2006-02-15 09:44:00
1,Algeria,2006-02-15 09:44:00
2,American Samoa,2006-02-15 09:44:00
3,Angola,2006-02-15 09:44:00
4,Anguilla,2006-02-15 09:44:00
...,...,...
104,Vietnam,2006-02-15 09:44:00
105,"Virgin Islands, U.S.",2006-02-15 09:44:00
106,Yemen,2006-02-15 09:44:00
107,Yugoslavia,2006-02-15 09:44:00


### 2.3. Extract From Data Source III
- Buatlah function untuk mendapatkan tabel pada database
- Parameter :
    - nama tabel
    - engine koneksi ke database
- Return : dataframe

In [8]:
# Your code here
# Buat fungsi untuk mendapatkan tabel pada database
def get_table_data_source_iii(table_name, engine):
    try:
        stmt = f"SELECT * FROM {table_name};"
        df = pd.read_sql(stmt, engine)
    except Exception as e:
        print(f"Error: {e}")
        df = pd.DataFrame()
    return df

- Buat masing-masing dataframe untuk setiap table tersebut

In [14]:
actor_df = get_table_data_source_iii('actor', engine)
store_df = get_table_data_source_iii('store', engine)
address_df = get_table_data_source_iii('address', engine)
category_df = get_table_data_source_iii('category', engine)
customer_df = get_table_data_source_iii('customer', engine)
film_actor_df = get_table_data_source_iii('film_actor', engine)
film_category_df = get_table_data_source_iii('film_category', engine)
inventory_df = get_table_data_source_iii('inventory',engine)
language_df = get_table_data_source_iii('language',engine)
rental_df = get_table_data_source_iii('rental',engine)
staff_df = get_table_data_source_iii('staff',engine)
payment_df = get_table_data_source_iii('payment',engine)
film_df = get_table_data_source_iii('film',engine)

- Tampung semua dataframe dari Data Source 1, 2, dan 3 ke dalam sebuah **Dictionary**.
- Keys:
    - nama tabel
- Value:
    - dataframe

In [16]:
# Your code here
table_dict = {
    'actor': actor_df, 
    'store' : store_df, 
    'address' : address_df, 
    'category' :category_df, 
    'customer' : customer_df, 
    'film_actor' : film_actor_df, 
    'film_category' : film_category_df, 
    'inventory' : inventory_df, 
    'language' : language_df, 
    'rental' : rental_df, 
    'staff' : staff_df, 
    'payment' : payment_df, 
    'film' : film_df, 
    'city' : city_df, 
    'country' : country_df
}

## 3. Data Validations
- Pada bagian ini akan dilakukan beberapa **pengecekan**, yaitu :
    - Check `Table Requirements`
    - Check Data `Shape`
    - Check `Columns`
    - Check Data `Types`
    - Check `Missing Values`
    - Check `Duplicates Data`

### 3.1. Check `Table Requirements`
- Pada bagian ini kita akan melakukan **pengecekan daftar tabel**.
- Pengecekan dilakukan pada :
    - Data Source I
    - Data Source II
    - Data Source III
- Pastikan tabel yang dimiliki dari data source sesuai dengan requirements table.

- Buat fungsi pengecekan Tabel : **check_table_requirements**
- Fungsi ini akan :
    - Memprint tabulate dengan header nama tabel dan hasil pengecekan tabel tersebut. ✓ jika tabelnya ada dan ✗ jika tabelnya tidak ada
- Parameter :
    - actual_table : Dictionary actual table (Data source 1, 2, 3)
    - requirements_table : Requirements JSON
- Return : None

In [18]:
# Your code here
def check_table_requirements(actual_table, requirements_table):
    
    actual_table_name = [table_name for table_name in actual_table]
    requirements_table_name = list(requirements_table.keys())
    table_checking = []

    for table_name in requirements_table_name:
        
        if table_name in actual_table_name:
            table_checking.append([table_name, "✓"])
            
        else:
            table_checking.append([table_name, "✗"])
            
    table_headers = ['Table_name', 'Is_Exist']
    table = tabulate(table_checking, headers = table_headers, tablefmt = 'grid')
    print("=> STEP 1: Check Table")
    print(table)

In [19]:
check_table_requirements(
    actual_table = table_dict, 
    requirements_table = requirements_table
)

=> STEP 1: Check Table
+---------------+------------+
| Table_name    | Is_Exist   |
| actor         | ✓          |
+---------------+------------+
| store         | ✓          |
+---------------+------------+
| address       | ✓          |
+---------------+------------+
| category      | ✓          |
+---------------+------------+
| city          | ✓          |
+---------------+------------+
| country       | ✓          |
+---------------+------------+
| customer      | ✓          |
+---------------+------------+
| film_actor    | ✓          |
+---------------+------------+
| film_category | ✓          |
+---------------+------------+
| inventory     | ✓          |
+---------------+------------+
| language      | ✓          |
+---------------+------------+
| rental        | ✓          |
+---------------+------------+
| staff         | ✓          |
+---------------+------------+
| payment       | ✓          |
+---------------+------------+
| film          | ✓          |
+---------------

### 3.2. Check `Data Shape`
- Buat fungsi pengecekan data shape : **check_shape**
- Fungsi ini akan :
    - Memprint tabulate untuk menampilkan jumlah baris dan jumlah kolom dari setiap table.
- Parameter :
    - actual_table : Dictionary actual table (Data source 1, 2, 3)
    - requirements_table : Requirements JSON
- Return : None

In [20]:
# Your code here
def check_shape(actual_table):
    print("=> STEP 2: Check Data Shape")
    shape_checking = []
    
    for table_name in actual_table:
        n_row = actual_table[table_name].shape[0]
        n_col = actual_table[table_name].shape[1]
        shape_checking.append([table_name, n_row, n_col])
    
    # Print Table
    table_headers = ['Table_name', 'Number of rows', 'Number of columns']
    table = tabulate(shape_checking, headers = table_headers, tablefmt = 'grid')
    print(table)   

In [21]:
check_shape(actual_table = table_dict)

=> STEP 2: Check Data Shape
+---------------+------------------+---------------------+
| Table_name    |   Number of rows |   Number of columns |
| actor         |              200 |                   4 |
+---------------+------------------+---------------------+
| store         |                2 |                   4 |
+---------------+------------------+---------------------+
| address       |              603 |                   8 |
+---------------+------------------+---------------------+
| category      |               16 |                   3 |
+---------------+------------------+---------------------+
| customer      |              599 |                  10 |
+---------------+------------------+---------------------+
| film_actor    |             5462 |                   3 |
+---------------+------------------+---------------------+
| film_category |             1000 |                   3 |
+---------------+------------------+---------------------+
| inventory     |           

### 3.3. Check `Columns`
- Buat fungsi pengecekan nama-nama kolom pada setiap tabel : **check_columns**
- Fungsi ini akan :
    - Memprint tabulate untuk menampilkan tabel-tabel yang kolomnya tidak sesuai dengan requirements
- Parameter :
    - actual_table : Dictionary actual table (Data source 1, 2, 3)
    - requirements_table : Requirements JSON
- Return : None

In [22]:
# Your code here
def check_columns(actual_table, requirements_table):
    print("=> STEP 3: Check Columns")
    
    for table_name in requirements_table:
        result = []
        
        actual_columns = list(actual_table[table_name].columns)
        requirements_columns = []
        
        requirements_table_data = requirements_table[table_name]
        for column in requirements_table_data:
            requirements_columns.append(column['column_name'])

        for column_name in set(actual_columns + requirements_columns):
            in_actual_table = '✔' if column_name in actual_columns else '✘'
            in_requirements_table = '✔' if column_name in requirements_columns else '✘'
            result.append([column_name, in_actual_table, in_requirements_table])

        if set(actual_columns) == set(requirements_columns):
            pass

        else:
            headers = ["column_name", "in_actual_table", "in_requirements_table"]
            print(f"Table : {table_name}")
            print(tabulate(result, headers = headers, tablefmt = "grid"))
            print("\n")

In [23]:
check_columns(actual_table = table_dict,
              requirements_table = requirements_table)

=> STEP 3: Check Columns
Table : city
+---------------+-------------------+-------------------------+
| column_name   | in_actual_table   | in_requirements_table   |
| city_id       | ✔                 | ✔                       |
+---------------+-------------------+-------------------------+
| last_update   | ✘                 | ✔                       |
+---------------+-------------------+-------------------------+
| country_id    | ✘                 | ✔                       |
+---------------+-------------------+-------------------------+
| country       | ✔                 | ✘                       |
+---------------+-------------------+-------------------------+
| city          | ✔                 | ✔                       |
+---------------+-------------------+-------------------------+


Table : country
+---------------+-------------------+-------------------------+
| column_name   | in_actual_table   | in_requirements_table   |
| country_id    | ✘                 | ✔         

### 3.4. Check `Data Types`
- Buat fungsi pengecekan nama-nama kolom pada setiap tabel : **check_data_types**
- Fungsi ini akan :
    - Memprint tabulate untuk menampilkan tabel dan kolom yang tipe datanya tidak sesuai dengan requirements
- Parameter :
    - actual_table : Dictionary actual table (Data source 1, 2, 3)
    - requirements_table : Requirements JSON
- Return : None

In [24]:
# Your code here
def check_data_types(actual_table, requirements_table):
    print("=> STEP 4: Check Data Types")
    summary_data = []

    for table_name, df in actual_table.items():
        if table_name in requirements_table:
            for column_info in requirements_table[table_name]:
                column_name = column_info["column_name"]
                requirements_type = column_info["data_type"]
                
                if column_name in df.columns:
                    actual_type = str(df[column_name].dtype)
                    match = "✔" if actual_type == requirements_type else "✘"
                    summary_data.append([table_name, column_name, actual_type, requirements_type, match])
                else:
                    summary_data.append([table_name, column_name, "N/A", requirements_type, "✘ (Column not found)"])

    headers = ["Table Name", "Column Name", "Actual Type", "Requirements Type", "Match"]

    mismatch_data = [row for row in summary_data if "✘" in row[4]]
    
    if mismatch_data:
        print("\nSummary of Mismatches Data Types:")
        print(tabulate(mismatch_data, headers = headers, tablefmt = "grid"))
    
    else:
        print("All Data Types Match")

In [25]:
check_data_types(actual_table = table_dict, 
                 requirements_table = requirements_table)

=> STEP 4: Check Data Types

Summary of Mismatches Data Types:
+--------------+---------------+---------------+---------------------+----------------------+
| Table Name   | Column Name   | Actual Type   | Requirements Type   | Match                |
| customer     | create_date   | object        | datetime64[ns]      | ✘                    |
+--------------+---------------+---------------+---------------------+----------------------+
| city         | country_id    | N/A           | int64               | ✘ (Column not found) |
+--------------+---------------+---------------+---------------------+----------------------+
| city         | last_update   | N/A           | datetime64[ns]      | ✘ (Column not found) |
+--------------+---------------+---------------+---------------------+----------------------+
| country      | country_id    | N/A           | int64               | ✘ (Column not found) |
+--------------+---------------+---------------+---------------------+---------------------

### 3.5. Check `Missing Values`
- Buat fungsi pengecekan nama-nama kolom pada setiap tabel : **check_missing_values**
- Fungsi ini akan :
    - Memprint tabulate untuk menampilkan tabel dan kolom yang mempunyai missing values. 
    - Menampilkan jumlah dan persentase missing values
- Parameter :
    - actual_table : Dictionary actual table (Data source 1, 2, 3)
    - requirements_table : Requirements JSON
- Return : None

In [26]:
# Your code here
def check_missing_values(actual_table):
    print("=> STEP 5: Check Missing Values")

    missing_summary = []

    for table_name, df in actual_table.items():
        missing_count = df.isnull().sum()
        missing_percentage = (df.isnull().mean() * 100).round(2)
        
        for column_name, count in missing_count.items():
            if count > 0:
                percentage = missing_percentage[column_name]
                missing_summary.append([table_name, column_name, count, percentage])

    if missing_summary:
        print("Missing Value Summary:")
        print(tabulate(missing_summary, headers=["Table Name", "Column Name", "Missing Value Count", "Missing Value Percentage"], tablefmt="grid"))
    
    else:
        print("There's no Missing Values")

In [27]:
check_missing_values(actual_table = table_dict)

=> STEP 5: Check Missing Values
Missing Value Summary:
+--------------+---------------+-----------------------+----------------------------+
| Table Name   | Column Name   |   Missing Value Count |   Missing Value Percentage |
| address      | address2      |                     4 |                       0.66 |
+--------------+---------------+-----------------------+----------------------------+
| rental       | return_date   |                   183 |                       1.14 |
+--------------+---------------+-----------------------+----------------------------+
| staff        | picture       |                     1 |                      50    |
+--------------+---------------+-----------------------+----------------------------+
| city         | city          |                    10 |                       1.48 |
+--------------+---------------+-----------------------+----------------------------+
| city         | country       |                     7 |                       1.03 |

### 3.6. Check `Duplicates Data`
- Buat fungsi pengecekan nama-nama kolom pada setiap tabel : **check_duplicates_data**
- Fungsi ini akan :
    - Memprint tabulate untuk menampilkan tabel yang mempunyai data duplikat. 
    - Menampilkan jumlah data yang duplikat (Semua data yang terindikasi duplikat). 
- Parameter :
    - actual_table : Dictionary actual table (Data source 1, 2, 3)
    - requirements_table : Requirements JSON
- Return : None

In [28]:
# Your code here
def check_duplicates_data(actual_table):
    print("=> STEP 6: Check Duplicates Data")
    duplicate_summary = []

    for table_name, df in actual_table.items():
        try:
            duplicate_rows = df[df.duplicated(keep = False)]
            
            if not duplicate_rows.empty:
                duplicate_summary.append([table_name, len(duplicate_rows)])
        except:
            pass

    if duplicate_summary:
        print("Duplicate Data Summary:")
        print(tabulate(duplicate_summary, headers=["Table Name", "Duplicate Rows Count"], tablefmt="grid"))
    else:
        print("No Duplicate Data Found")

In [29]:
check_duplicates_data(table_dict)

=> STEP 6: Check Duplicates Data
Duplicate Data Summary:
+--------------+------------------------+
| Table Name   |   Duplicate Rows Count |
| city         |                    154 |
+--------------+------------------------+


## 4. Data Transform
- Proses transformasi ini dilakukan untuk **mempersiapkan data untuk diload ke Data Warehouse.**
- **Pertama, lakukanlah cleaning** sebelum datanya diload ke data warehouse **pada database dvdrental_clean**.
<center>

<img src="https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-intro-to-data-eng/10_01.png"/>
<center>

### 4.1. Data Cleansing
- Berdasarkan **hasil validations** diketahui bahwa data yang diperoleh dari beberapa data source **masih belum clean.**
    - **Kolom yang tidak sesuai requirements terdapat pada tabel:** 
	    - city
	    - country
    - **Terdapat missing values pada kolom :**
	    - address
	    - city
	    - rental
	    - staff
    - **Tipe data yang tidak sesuai requirements terdapat pada tabel :**
	    - city
	    - country
	    - customer
    - **Terdapat duplicates data pada kolom :**
	    - city


#### 4.1.1. Handle mismatch columns

**TABLE : country**

- Berikut adalah **data aktual** dari tabel "country"

In [30]:
country_df = table_dict['country']
country_df.head()

Unnamed: 0,country,last_update
0,Afghanistan,2006-02-15 09:44:00
1,Algeria,2006-02-15 09:44:00
2,American Samoa,2006-02-15 09:44:00
3,Angola,2006-02-15 09:44:00
4,Anguilla,2006-02-15 09:44:00


- Berikut adalah **requirements** untuk tabel "country"

In [31]:
requirements_table['country']

[{'column_name': 'country_id', 'data_type': 'int64'},
 {'column_name': 'last_update', 'data_type': 'datetime64[ns]'},
 {'column_name': 'country', 'data_type': 'object'}]

- Pada table country aktual belum mempunyai kolom country_id, maka :
    - **tambahkan colom country_id.**
- Ketentuan :
    - country_id tersebut dimulai dari angka 1 dan seterusnya

In [32]:
# Your code here
# Menambahkan kolom baru 'country_id' di posisi pertama
country_df.insert(0, 'country_id', range(1, len(country_df) + 1))

country_df.head()

Unnamed: 0,country_id,country,last_update
0,1,Afghanistan,2006-02-15 09:44:00
1,2,Algeria,2006-02-15 09:44:00
2,3,American Samoa,2006-02-15 09:44:00
3,4,Angola,2006-02-15 09:44:00
4,5,Anguilla,2006-02-15 09:44:00


- **Rubah dataframe country pada table_dict**

In [33]:
# Your code here
table_dict['country'] = country_df

**TABLE : city**

- Berikut adalah **data aktual** pada tabel "city".

In [34]:
city_df = table_dict['city']

city_df.head()

Unnamed: 0,city_id,city,country
0,1,A Corua (La Corua),Spain
1,2,Abha,Saudi Arabia
2,3,Abu Dhabi,United Arab Emirates
3,4,Acua,Mexico
4,5,Adana,Turkey


- Berikut adalah **requirements** untuk table "city".

In [35]:
requirements_table['city']

[{'column_name': 'city_id', 'data_type': 'int64'},
 {'column_name': 'country_id', 'data_type': 'int64'},
 {'column_name': 'last_update', 'data_type': 'datetime64[ns]'},
 {'column_name': 'city', 'data_type': 'object'}]

- Pada tabel city :
    - **Buat kolom :**
        - country_id
        - last_update
    - **Hapus kolom :**
        - country
- Untuk mencapai tujuan tersebut, lakukan **merge dengan table country** untuk **mendapatkan data country_id dan last_update**.

In [36]:
# Your code here
city_df = city_df.merge(country_df, on = 'country', how = 'left')

city_df = city_df[['city_id', 'country_id', 'city', 'last_update']]

city_df.head()

Unnamed: 0,city_id,country_id,city,last_update
0,1,87.0,A Corua (La Corua),2006-02-15 09:44:00
1,2,82.0,Abha,2006-02-15 09:44:00
2,3,101.0,Abu Dhabi,2006-02-15 09:44:00
3,4,60.0,Acua,2006-02-15 09:44:00
4,5,97.0,Adana,2006-02-15 09:44:00


- **Rubah dataframe city pada table_dict**

In [37]:
# Your code here
table_dict['city'] = city_df

- Setelah menyesuaikan kolom pada data aktual dengan requirements, **validasi** kolomnya kembali. Gunakan fungsi `check_columns`

In [38]:
check_columns(actual_table = table_dict,
              requirements_table = requirements_table)

=> STEP 3: Check Columns


- Hal tersebut menunjukkan kolom-kolom pada data aktual sudah sesuai dengan requirements.

#### 4.1.2. Handle Missing Values

- Pada proses handle missing values ini akan dilakukan dropping missing values

- **Buatlah fungsi** untuk remove missing values : **remove_missing_values**
- Fungsi ini akan :
    - Menghapus missing values pada setiap tabel.
- Parameter :
    - actual_table : Dictionary actual table (Data source 1, 2, 3)
- Return :
    - cleaned_actual_table : Dictionary actual table yang sudah bersih dari missing values

In [39]:
# Your code here
def remove_missing_values(actual_table):
    cleaned_actual_table = {}

    for table_name, df in actual_table.items():
        cleaned_df = df.dropna() 
        cleaned_actual_table[table_name] = cleaned_df

    return cleaned_actual_table

- Update table_dict dengan data yang sudah tidak mempunyai missing values

In [40]:
# Your code here
table_dict = remove_missing_values(actual_table = table_dict)

- Validasi missing valuenya kembali. Gunakan fungsi **check_missing_values**

In [41]:
check_missing_values(actual_table = table_dict)

=> STEP 5: Check Missing Values
There's no Missing Values


- Dapat dilihat bahwa missing values sudah terhandle.

#### 4.1.3. Handle Data Types

- Pada proses handle data types akan dilakukan **casting tipe data** berdasarkan requirements yang telah diberikan.
- **Buatlah fungsi** untuk menghandle Data Types : **adjust_data_types**
- Fungsi ini akan :
    - Melakukan casting tipe data sesuai dengan requirements.
- Parameter :
    - actual_table : Dictionary actual table (Data source 1, 2, 3)
- Return :
    - cleaned_actual_table : Dictionary actual table yang sudah bersih dari missing values

In [42]:
# Your code here
def adjust_data_types(actual_table, requirements_table):
    adjusted_table_dict = {}

    for table_name, df in actual_table.items():
        if table_name in requirements_table:
            table_requirements = requirements_table[table_name]

            for column_info in table_requirements:
                column_name = column_info["column_name"]
                data_type = column_info["data_type"]

                if column_name in df.columns:
                    df[column_name] = df[column_name].astype(data_type)

            adjusted_table_dict[table_name] = df

    return adjusted_table_dict

- Update table_dict dengan data yang sudah sesuai tipe datanya

In [43]:
# Your code here
table_dict = adjust_data_types(actual_table = table_dict, 
                               requirements_table = requirements_table)

- Validasi tipe data dari kolom pada setiap tabel. Gunakan fungsi **check_data_types**

In [44]:
check_data_types(actual_table = table_dict, 
                 requirements_table = requirements_table)

=> STEP 4: Check Data Types
All Data Types Match


- Dapat dilihat bahwa tipe data pada data aktual sudah sesuai.

#### 4.1.4. Handle Duplicates Data

- Pada proses handling data yang duplikat, akan dilakukan **drop data yang duplikat dengan mempertahankan record data yang pertama**.
- Buatlah fungsi untuk menghandle Data Types : **remove_duplicates**
- Fungsi ini akan :
    - Melakukan drop data yang duplikat dengan mempertahankan record data yang pertama.
- Parameter :
    - actual_table : Dictionary actual table (Data source 1, 2, 3)
- Return :
    - cleaned_table_dict : Dictionary actual table yang sudah bersih dari missing values

In [45]:
# Your code here
def remove_duplicates(actual_table):
    cleaned_table_dict = {}

    for table_name, df in actual_table.items():
        if table_name == 'city':
            cleaned_df = df.drop_duplicates(keep = 'first')  # Menghapus data duplikat dengan mempertahankan record data yang pertama
            cleaned_table_dict[table_name] = cleaned_df

        else:
            cleaned_table_dict[table_name] = df

    return cleaned_table_dict

- Update table_dict dengan data yang sudah tidak mempunyai data duplikat

In [46]:
# Your code here
table_dict = remove_duplicates(table_dict)

- Validasi kembali data yang mengalami duplikat. Gunakan fungsi **check_duplicates_data**

In [47]:
check_duplicates_data(table_dict)

=> STEP 6: Check Duplicates Data
No Duplicate Data Found


### 4.2. Data Manipulation & Data Selection

- Pada bagian ini, kita akan coba **generate sebuah tabel** yang merupakan kombinasi dari beberapa tabel.
- Kita akan generate tabel `film_list`
- Tabel tersebut akan disimpan pada database **dvdrental_analysis**
- Tabel tersebut merupakan **kombinasi tabel :**
    - `actor`
    - `film_actor`
    - `film`
    - `film_category`
    - `category`
<center>
<img src="https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-intro-to-data-eng/10_02.png"/>

- Kolom-kolom yang akan kita **generate** adalah :
    - `fid` -> sama dengan film_id
    - `title` -> Judul film
    - `description` -> Deskripsi dari film
    - `category` -> Kategori film
    - `price` -> Harga film
    - `length` -> Durasi film
    - `rating` -> Rating film
    - `actors` -> Nama-nama aktor film tersebut

<center>
<img src="https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-intro-to-data-eng/10_03.png"/>
<center>

- Pertama, **persiapkan dataframe yang diperlukan.**
- Dapatkan dataframenya dari **table_dict**

In [57]:
# Your code here
category = table_dict['category']
film_category = table_dict['film_category']
film = table_dict['film']
film_actor = table_dict['film_actor']
actor = table_dict['actor']

- Gabungkan tabel-tabel yang diperlukan dengan melakukan **merge**.
    - Merge : **Category -> film_category**
    - on : **category_id**
    - Join Type : **left join**
    - Suffixes : ("_x1", "_y1")

In [58]:
# Your code here
# Buat dataframe "film_list" untuk menyimpan hasil merge
film_list = category.merge(film_category, how = 'left', on = 'category_id', suffixes = ("_x1", "_y1"))

# Tampilkan 5 data "film_list" teratas
film_list.head()

Unnamed: 0,category_id,name,last_update_x1,film_id,last_update_y1
0,1,Action,2006-02-15 09:46:27,19,2006-02-15 10:07:09
1,1,Action,2006-02-15 09:46:27,21,2006-02-15 10:07:09
2,1,Action,2006-02-15 09:46:27,29,2006-02-15 10:07:09
3,1,Action,2006-02-15 09:46:27,38,2006-02-15 10:07:09
4,1,Action,2006-02-15 09:46:27,56,2006-02-15 10:07:09


- 
    - Merge : **film_list -> film**
    - on : **film_id**
    - Join Type : **left join**
    - Suffixes : ("_x2", "_y2")

In [59]:
# Your code here
# Update dataframe "film_list" untuk menyimpan hasil merge
film_list = film_list.merge(film, how = 'left', on = 'film_id', suffixes = ("_x2", "_y2"))

# Tampilkan 5 data "film_list" teratas
film_list.head()

Unnamed: 0,category_id,name,last_update_x1,film_id,last_update_y1,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,last_update,special_features,fulltext
0,1,Action,2006-02-15 09:46:27,19,2006-02-15 10:07:09,Amadeus Holy,A Emotional Display of a Pioneer And a Technic...,2006,1,6,0.99,113,20.99,PG,2013-05-26 14:50:58.951,"[Commentaries, Deleted Scenes, Behind the Scenes]",'amadeus':1 'baloon':20 'battl':15 'display':5...
1,1,Action,2006-02-15 09:46:27,21,2006-02-15 10:07:09,American Circus,A Insightful Drama of a Girl And a Astronaut w...,2006,1,3,4.99,129,17.99,R,2013-05-26 14:50:58.951,"[Commentaries, Behind the Scenes]",'administr':17 'american':1 'astronaut':11 'ci...
2,1,Action,2006-02-15 09:46:27,29,2006-02-15 10:07:09,Antitrust Tomatoes,A Fateful Yarn of a Womanizer And a Feminist w...,2006,1,5,2.99,168,11.99,NC-17,2013-05-26 14:50:58.951,"[Trailers, Commentaries, Deleted Scenes]",'administr':17 'ancient':19 'antitrust':1 'dat...
3,1,Action,2006-02-15 09:46:27,38,2006-02-15 10:07:09,Ark Ridgemont,A Beautiful Yarn of a Pioneer And a Monkey who...,2006,1,6,0.99,68,25.99,NC-17,2013-05-26 14:50:58.951,"[Trailers, Commentaries, Deleted Scenes, Behin...",'ark':1 'beauti':4 'desert':20 'explor':16 'mo...
4,1,Action,2006-02-15 09:46:27,56,2006-02-15 10:07:09,Barefoot Manchurian,A Intrepid Story of a Cat And a Student who mu...,2006,1,6,2.99,129,15.99,G,2013-05-26 14:50:58.951,"[Trailers, Commentaries]",'abandon':19 'amus':20 'barefoot':1 'cat':8 'g...


-
    - Merge : **film_list -> film_actor**
    - on : **film_id**
    - Join Type : **inner join**
    - Suffixes : ("_x3", "_y3")

In [60]:
# Your code here
# Update dataframe "film_list" untuk menyimpan hasil merge
film_list = film_list.merge(film_actor, how = 'inner', on = 'film_id', suffixes = ("_x3", "_y3"))

# Tampilkan 5 data "film_list" teratas
film_list.head()

Unnamed: 0,category_id,name,last_update_x1,film_id,last_update_y1,title,description,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,last_update_x3,special_features,fulltext,actor_id,last_update_y3
0,1,Action,2006-02-15 09:46:27,19,2006-02-15 10:07:09,Amadeus Holy,A Emotional Display of a Pioneer And a Technic...,2006,1,6,0.99,113,20.99,PG,2013-05-26 14:50:58.951,"[Commentaries, Deleted Scenes, Behind the Scenes]",'amadeus':1 'baloon':20 'battl':15 'display':5...,5,2006-02-15 10:05:03
1,1,Action,2006-02-15 09:46:27,19,2006-02-15 10:07:09,Amadeus Holy,A Emotional Display of a Pioneer And a Technic...,2006,1,6,0.99,113,20.99,PG,2013-05-26 14:50:58.951,"[Commentaries, Deleted Scenes, Behind the Scenes]",'amadeus':1 'baloon':20 'battl':15 'display':5...,27,2006-02-15 10:05:03
2,1,Action,2006-02-15 09:46:27,19,2006-02-15 10:07:09,Amadeus Holy,A Emotional Display of a Pioneer And a Technic...,2006,1,6,0.99,113,20.99,PG,2013-05-26 14:50:58.951,"[Commentaries, Deleted Scenes, Behind the Scenes]",'amadeus':1 'baloon':20 'battl':15 'display':5...,37,2006-02-15 10:05:03
3,1,Action,2006-02-15 09:46:27,19,2006-02-15 10:07:09,Amadeus Holy,A Emotional Display of a Pioneer And a Technic...,2006,1,6,0.99,113,20.99,PG,2013-05-26 14:50:58.951,"[Commentaries, Deleted Scenes, Behind the Scenes]",'amadeus':1 'baloon':20 'battl':15 'display':5...,43,2006-02-15 10:05:03
4,1,Action,2006-02-15 09:46:27,19,2006-02-15 10:07:09,Amadeus Holy,A Emotional Display of a Pioneer And a Technic...,2006,1,6,0.99,113,20.99,PG,2013-05-26 14:50:58.951,"[Commentaries, Deleted Scenes, Behind the Scenes]",'amadeus':1 'baloon':20 'battl':15 'display':5...,84,2006-02-15 10:05:03


- 
    - Merge : **film_list -> actor**
    - on : **actor_id**
    - Join Type : **inner join**
    - Suffixes : ("_x4", "_y4")

In [61]:
# Your code here
# Update dataframe "film_list" untuk menyimpan hasil merge
film_list = film_list.merge(actor, how = 'inner', on = 'actor_id', suffixes = ("_x4", "_y4"))

# Tampilkan 5 data "film_list" teratas
film_list.head()

Unnamed: 0,category_id,name,last_update_x1,film_id,last_update_y1,title,description,release_year,language_id,rental_duration,...,replacement_cost,rating,last_update_x3,special_features,fulltext,actor_id,last_update_y3,first_name,last_name,last_update
0,1,Action,2006-02-15 09:46:27,19,2006-02-15 10:07:09,Amadeus Holy,A Emotional Display of a Pioneer And a Technic...,2006,1,6,...,20.99,PG,2013-05-26 14:50:58.951,"[Commentaries, Deleted Scenes, Behind the Scenes]",'amadeus':1 'baloon':20 'battl':15 'display':5...,5,2006-02-15 10:05:03,Johnny,Lollobrigida,2013-05-26 14:47:57.620
1,1,Action,2006-02-15 09:46:27,375,2006-02-15 10:07:09,Grail Frankenstein,A Unbelieveable Saga of a Teacher And a Monkey...,2006,1,4,...,17.99,NC-17,2013-05-26 14:50:58.951,"[Commentaries, Deleted Scenes, Behind the Scenes]",'abandon':19 'fight':14 'frankenstein':2 'girl...,5,2006-02-15 10:05:03,Johnny,Lollobrigida,2013-05-26 14:47:57.620
2,1,Action,2006-02-15 09:46:27,732,2006-02-15 10:07:09,Rings Heartbreakers,A Amazing Yarn of a Sumo Wrestler And a Boat w...,2006,1,5,...,17.99,G,2013-05-26 14:50:58.951,"[Trailers, Commentaries, Behind the Scenes]",'amaz':4 'boat':12 'conquer':15 'heartbreak':2...,5,2006-02-15 10:05:03,Johnny,Lollobrigida,2013-05-26 14:47:57.620
3,2,Animation,2006-02-15 09:46:27,865,2006-02-15 10:07:09,Sunrise League,A Beautiful Epistle of a Madman And a Butler w...,2006,1,3,...,19.99,PG-13,2013-05-26 14:50:58.951,[Behind the Scenes],'beauti':4 'butler':11 'crocodil':16 'epistl':...,5,2006-02-15 10:05:03,Johnny,Lollobrigida,2013-05-26 14:47:57.620
4,3,Children,2006-02-15 09:46:27,392,2006-02-15 10:07:09,Hall Cassidy,A Beautiful Panorama of a Pastry Chef And a A ...,2006,1,5,...,13.99,NC-17,2013-05-26 14:50:58.951,"[Commentaries, Behind the Scenes]",'battl':16 'beauti':4 'cassidi':2 'chef':9 'ge...,5,2006-02-15 10:05:03,Johnny,Lollobrigida,2013-05-26 14:47:57.620


- Selanjutnya, **gabungkan** `first_name` dan `last_name` menjadi `full_name`

In [62]:
# Gabungkan nama depan dan belakang aktor
film_list['full_name'] = film_list['first_name'] + ' ' + film_list['last_name']

# Tampilkan kolom "full_name"
film_list['full_name']

0       Johnny Lollobrigida
1       Johnny Lollobrigida
2       Johnny Lollobrigida
3       Johnny Lollobrigida
4       Johnny Lollobrigida
               ...         
5457           Milla Keitel
5458           Milla Keitel
5459           Milla Keitel
5460           Milla Keitel
5461           Milla Keitel
Name: full_name, Length: 5462, dtype: object

- Perlu diketahui bahwa satu film bisa terdiri dari beberapa aktor.
- Oleh karena itu, **gabung nama-nama aktor tersebut di satu film yang sama**.
- Contoh :
    <center>
    <img src="https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-intro-to-data-eng/10_04.png"/>
    <center>

In [63]:
# Your code here
# Gabungkan nama aktor menjadi satu string untuk setiap film
film_list = film_list.groupby(['film_id', 'title', 'description', 'name', 'rental_rate', 'length', 'rating'])['full_name'].apply(lambda x: ', '.join(x))

# Tampilkan 5 data teratas
film_list.head()

film_id  title             description                                                                                                            name         rental_rate  length  rating
1        Academy Dinosaur  A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies                       Documentary  0.99         86      PG        Rock Dukakis, Mena Temple, Sandra Peck, Mary K...
2        Ace Goldfinger    A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China                   Horror       4.99         48      G         Bob Fawcett, Sean Guiness, Chris Depp, Minnie ...
3        Adaptation Holes  A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory                       Documentary  2.99         50      NC-17     Julianne Dench, Nick Wahlberg, Cameron Streep,...
4        Affair Prejudice  A Fanciful Documentary of a Frisbee And a Lumberjack who must Chas

- Hasil tersebut **masih berupa "series"**
- Oleh karena itu, **rubah menjadi dataframe**

In [64]:
# Your code here
# Rubah dari series menjadi dataframe
film_list = pd.DataFrame(film_list).reset_index()

# Tampilkan 5 data teratas
film_list.head()

Unnamed: 0,film_id,title,description,name,rental_rate,length,rating,full_name
0,1,Academy Dinosaur,A Epic Drama of a Feminist And a Mad Scientist...,Documentary,0.99,86,PG,"Rock Dukakis, Mena Temple, Sandra Peck, Mary K..."
1,2,Ace Goldfinger,A Astounding Epistle of a Database Administrat...,Horror,4.99,48,G,"Bob Fawcett, Sean Guiness, Chris Depp, Minnie ..."
2,3,Adaptation Holes,A Astounding Reflection of a Lumberjack And a ...,Documentary,2.99,50,NC-17,"Julianne Dench, Nick Wahlberg, Cameron Streep,..."
3,4,Affair Prejudice,A Fanciful Documentary of a Frisbee And a Lumb...,Horror,2.99,117,G,"Jodie Degeneres, Scarlett Damon, Fay Winslet, ..."
4,5,African Egg,A Fast-Paced Documentary of a Pastry Chef And ...,Family,2.99,130,G,"Dustin Tautou, Matthew Leigh, Gary Phoenix, Ma..."


- Terakhir, **rubah nama kolom**.
    - `film_id` : `fid`
    - `name` : `category`
    - `rental_rate` : `price`
    - `full_name` : `actors`

In [65]:
# Your code here
# Buat mapping perubahan nama kolom
rename_column_map = {
    'film_id' : 'fid',
    'name' : 'category',
    'rental_rate' : 'price',
    'full_name' : 'actors'
}
# Your code here
# Rubah nama kolom
film_list.rename(columns = rename_column_map, inplace = True) 

# Tampilkan 5 data teratas
film_list.head()

Unnamed: 0,fid,title,description,category,price,length,rating,actors
0,1,Academy Dinosaur,A Epic Drama of a Feminist And a Mad Scientist...,Documentary,0.99,86,PG,"Rock Dukakis, Mena Temple, Sandra Peck, Mary K..."
1,2,Ace Goldfinger,A Astounding Epistle of a Database Administrat...,Horror,4.99,48,G,"Bob Fawcett, Sean Guiness, Chris Depp, Minnie ..."
2,3,Adaptation Holes,A Astounding Reflection of a Lumberjack And a ...,Documentary,2.99,50,NC-17,"Julianne Dench, Nick Wahlberg, Cameron Streep,..."
3,4,Affair Prejudice,A Fanciful Documentary of a Frisbee And a Lumb...,Horror,2.99,117,G,"Jodie Degeneres, Scarlett Damon, Fay Winslet, ..."
4,5,African Egg,A Fast-Paced Documentary of a Pastry Chef And ...,Family,2.99,130,G,"Dustin Tautou, Matthew Leigh, Gary Phoenix, Ma..."


## 5. Load Data To Data Warehouse

### 5.1. Setup Connection
- Buat fungsi untuk membuat engine postgres.
    - user = "root"
    - password = "qwerty123"
    - host = "localhost"
    - port = "3000"
- Parameter : Nama database

In [66]:
# Your code here
def dw_postgres_engine(database_name):
    
    # Koneksi ke database
    user = "root"
    password = "qwerty123"
    host = "localhost"
    port = "3000"
    
    engine = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database_name}")

    return engine

### 5.2. Load to "dvdrental_clean" Database

#### 5.2.1. Create table
- Buatlah table untuk database tersebut.
- Gunakan schema berikut :
    ```sql
    -- Membuat skema public jika belum ada
    CREATE SCHEMA IF NOT EXISTS public;

    -- Buat tabel
    CREATE TABLE public.actor (
        actor_id SERIAL PRIMARY KEY,
        last_update TIMESTAMP,
        first_name VARCHAR,
        last_name VARCHAR
    );

    CREATE TABLE public.store (
        store_id SERIAL PRIMARY KEY,
        manager_staff_id INTEGER,
        address_id INTEGER,
        last_update TIMESTAMP
    );

    CREATE TABLE public.address (
        last_update TIMESTAMP,
        city_id INTEGER,
        address_id SERIAL PRIMARY KEY,
        district VARCHAR,
        phone VARCHAR,
        postal_code VARCHAR,
        address VARCHAR,
        address2 VARCHAR
    );

    CREATE TABLE public.category (
        category_id SERIAL PRIMARY KEY,
        last_update TIMESTAMP,
        name VARCHAR
    );

    CREATE TABLE public.city (
        city_id SERIAL PRIMARY KEY,
        country_id INTEGER,
        last_update TIMESTAMP,
        city VARCHAR
    );

    CREATE TABLE public.country (
        country_id SERIAL PRIMARY KEY,
        last_update TIMESTAMP,
        country VARCHAR
    );

    CREATE TABLE public.customer (
        active INTEGER,
        store_id INTEGER,
        create_date TIMESTAMP,
        last_update TIMESTAMP,
        customer_id SERIAL PRIMARY KEY,
        address_id INTEGER,
        activebool BOOLEAN,
        first_name VARCHAR,
        last_name VARCHAR,
        email VARCHAR
    );

    CREATE TABLE public.film_actor (
        actor_id SERIAL PRIMARY KEY,
        film_id INTEGER,
        last_update TIMESTAMP
    );

    CREATE TABLE public.film_category (
        film_id SERIAL PRIMARY KEY,
        category_id INTEGER,
        last_update TIMESTAMP
    );

    CREATE TABLE public.inventory (
        inventory_id SERIAL PRIMARY KEY,
        film_id INTEGER,
        store_id INTEGER,
        last_update TIMESTAMP
    );

    CREATE TABLE public.language (
        language_id SERIAL PRIMARY KEY,
        last_update TIMESTAMP,
        name VARCHAR
    );

    CREATE TABLE public.rental (
        rental_id SERIAL PRIMARY KEY,
        rental_date TIMESTAMP,
        inventory_id INTEGER,
        customer_id INTEGER,
        return_date TIMESTAMP,
        staff_id INTEGER,
        last_update TIMESTAMP
    );

    CREATE TABLE public.staff (
        picture VARCHAR,
        address_id INTEGER,
        store_id INTEGER,
        active BOOLEAN,
        last_update TIMESTAMP,
        staff_id SERIAL PRIMARY KEY,
        first_name VARCHAR,
        last_name VARCHAR,
        password VARCHAR,
        email VARCHAR,
        username VARCHAR
    );

    CREATE TABLE public.payment (
        payment_id SERIAL PRIMARY KEY,
        customer_id INTEGER,
        staff_id INTEGER,
        rental_id INTEGER,
        amount FLOAT,
        payment_date TIMESTAMP
    );

    CREATE TABLE public.film (
        fulltext VARCHAR,
        rating VARCHAR,
        last_update TIMESTAMP,
        film_id SERIAL PRIMARY KEY,
        release_year INTEGER,
        language_id INTEGER,
        rental_duration INTEGER,
        rental_rate FLOAT,
        length INTEGER,
        replacement_cost FLOAT,
        title VARCHAR,
        description VARCHAR,
        special_features VARCHAR
    );
```

#### 5.2.2. Load Data
- Lakukan load data ke "dvdrental_analysis" database

In [67]:
# Your code here
# Buat engine postgres
engine = dw_postgres_engine(database_name = 'dvdrental_clean')

# Iterasi melalui dictionary table_dict
for table_name, df in table_dict.items():
    # Insert data ke table
    df.to_sql(table_name, engine, if_exists = 'replace', index = False)

# Tutup koneksi ke database
engine.dispose()

### 5.3. Load to "dvdrental_analysis" Database

#### 5.3.1. Create Table
- Buatlah table untuk database tersebut.
- Gunakan schema berikut :
    ```sql
    -- Membuat skema public jika belum ada
    CREATE SCHEMA IF NOT EXISTS public;

    -- Membuat tabel film_list
    CREATE TABLE public.film_list (
        fid SERIAL PRIMARY KEY,
        title VARCHAR,
        description VARCHAR,
        category VARCHAR,
        price FLOAT,
        length INTEGER,
        rating VARCHAR,
        actors VARCHAR
    );
    ```

#### 5.3.2. Load Data
- Lakukan load data ke "dvdrental_analysis" database

In [68]:
# Your code here
# Buat engine postgres
engine = dw_postgres_engine(database_name = "dvdrental_analysis")

# Insert data ke table film_list
film_list.to_sql('film_list', engine, if_exists = 'replace', index = False)

# Tutup koneksi ke database
engine.dispose()