# Task
---

- Build an ETL pipeline using Luigi
- The proposed ETL pipeline design is as follows

<center>
<img src="https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-intro-to-data-eng/15-01.png" width=70%>
</center>

- The tasks to complete are:
    1. Build an Extract pipeline from the following sources:
        - [API Payment Data](https://shandytepe.github.io/payment.json)
        - Hotel database
    
       After a successful extract, save the `.csv` output to the `live_class_w8/data/raw` folder
    
    2. Build a Validation pipeline on top of the extracted data. The validation steps are:
        - Check Data Shape
        - Check Data Types
        - Check Missing Values

    3. Build a Transform pipeline on top of the extracted data. The steps are:
        - Join data from several sources into one new table
            - The sources used to build the new table are:
                - `payment` (API Payment Data)
                - `reservation` (DB Hotel)
                - `customer` (DB Hotel)


<center>
<img src="https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-intro-to-data-eng/15-02.png" width=80%>
</center>


- Clean the data

    After a successful transform, save the `.csv` output to the `live_class_w8/data/transform` folder

    4. Build a Load pipeline on top of the Transform pipeline. Load the new table into the Data Warehouse that has already been set up. For instructions on creating the Data Warehouse, see the documentation at https://github.com/shandytepe/live-class-w8-intro-to-data-eng/tree/main/docker-db

    5. Combine all pipelines into one using Luigi

    6. Schedule the pipeline with `cron` so that it runs every 2 minutes. The `cron` syntax is as follows

        ```bash
        */2 * * * * /path/to/pipeline.sh
        ```

    7. Perform an `UPSERT` to check the robustness of the pipeline
---
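
Item 7 asks for an `UPSERT` check. As a minimal sketch of the idea, assuming a warehouse table keyed on `payment_id`: new rows are inserted and existing rows are updated in one statement, so re-running the load does not create duplicates. The example uses Python's built-in `sqlite3` so it runs without a server. PostgreSQL accepts the same `INSERT ... ON CONFLICT ... DO UPDATE` form, but the table name `payment_dwh`, its two columns, and the helper `upsert_payments` are hypothetical and not the course's warehouse schema.

```python
import sqlite3

# In-memory database standing in for the warehouse (assumption for the demo).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE payment_dwh (payment_id INTEGER PRIMARY KEY, payment_status TEXT)"
)

# Insert a row, or update the status when the payment_id already exists.
UPSERT_SQL = """
INSERT INTO payment_dwh (payment_id, payment_status)
VALUES (?, ?)
ON CONFLICT (payment_id) DO UPDATE
SET payment_status = excluded.payment_status
"""


def upsert_payments(connection, rows):
    """Apply the upsert to every (payment_id, payment_status) pair."""
    with connection:  # commits on success, rolls back on error
        connection.executemany(UPSERT_SQL, rows)


# First load, then a second load that changes id 1 and adds id 3.
upsert_payments(conn, [(1, "Failed"), (2, "Success")])
upsert_payments(conn, [(1, "Success"), (3, "Success")])

result = conn.execute(
    "SELECT payment_id, payment_status FROM payment_dwh ORDER BY payment_id"
).fetchall()
print(result)
```

Running the second load leaves three rows rather than four, which is the property a robustness check on the pipeline would look for.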

## Extract
---

In [1]:
# import library

import pandas as pd
import requests
import luigi
import json
from urllib import request

In [2]:
payment_url = 'https://shandytepe.github.io/payment.json'
response = requests.get(payment_url)
response

<Response [200]>

In [3]:
response = request.urlopen(payment_url)
payment_response = json.loads(response.read().decode('utf-8'))
payment_response

{'payment_data': [{'payment_id': 1,
   'reservation_id': 1,
   'provider': 'Ovo',
   'account_number': 38137149,
   'payment_status': 'Success',
   'payment_date': '2020-10-20 17:07:04.406780',
   'expire_date': '2020-10-28 18:10:37.175108'},
  {'payment_id': 2,
   'reservation_id': 2,
   'provider': 'BCA',
   'account_number': 42103729,
   'payment_status': 'Success',
   'payment_date': '2015-02-28 16:44:56.703511',
   'expire_date': '2015-03-08 05:27:33.443131'},
  {'payment_id': 3,
   'reservation_id': 3,
   'provider': 'Permata',
   'account_number': 58689635,
   'payment_status': 'Success',
   'payment_date': '2022-04-03 13:27:30.811447',
   'expire_date': '2022-04-06 14:20:57.846492'},
  {'payment_id': 4,
   'reservation_id': 4,
   'provider': 'BNI',
   'account_number': 107161965,
   'payment_status': 'Success',
   'payment_date': '2019-10-04 08:39:56.034805',
   'expire_date': '2019-10-08 02:01:21.209415'},
  {'payment_id': 5,
   'reservation_id': 5,
   'provider': 'BSI',
   'a

In [4]:
response = requests.get(payment_url)
payment_json = response.json()
payment_data = payment_json['payment_data']
payment_df = pd.DataFrame(payment_data)

payment_df

| | payment_id | reservation_id | provider | account_number | payment_status | payment_date | expire_date |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | Ovo | 38137149 | Success | 2020-10-20 17:07:04.406780 | 2020-10-28 18:10:37.175108 |
| 1 | 2 | 2 | BCA | 42103729 | Success | 2015-02-28 16:44:56.703511 | 2015-03-08 05:27:33.443131 |
| 2 | 3 | 3 | Permata | 58689635 | Success | 2022-04-03 13:27:30.811447 | 2022-04-06 14:20:57.846492 |
| 3 | 4 | 4 | BNI | 107161965 | Success | 2019-10-04 08:39:56.034805 | 2019-10-08 02:01:21.209415 |
| 4 | 5 | 5 | BSI | 14334131 | Failed | | 2020-06-14 12:02:55.669148 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 396 | 397 | 397 | Permata | 87836019 | Success | 2018-08-05 09:22:05.983116 | 2018-08-27 12:44:56.532991 |
| 397 | 398 | 398 | Mandiri | 50997930 | Failed | | 2022-04-19 12:35:51.127022 |
| 398 | 399 | 399 | Ovo | 44305033 | Success | 2015-12-12 22:35:15.569310 | 2015-12-19 19:28:08.809478 |
| 399 | 400 | 400 | BNI | 84875963 | Success | 2021-07-29 16:38:06.052542 | 2021-08-02 22:27:28.713637 |


In [5]:
from sqlalchemy import create_engine

In [7]:
def db_source_postgres_engine():
    db_username = 'postgres'
    db_password = 'cobapassword'
    db_host = 'localhost:5433'
    db_name = 'pachotel'

    engine_str = f"postgresql://{db_username}:{db_password}@{db_host}/{db_name}"
    engine = create_engine(engine_str)

    return engine
    

In [8]:
source_engine = db_source_postgres_engine()
source_engine

Engine(postgresql://postgres:***@localhost:5433/pachotel)
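
The engine above hardcodes its credentials. One possible alternative, sketched here under the assumption that the settings are supplied as environment variables, builds the same connection string without putting the password in the notebook. The variable names (`HOTEL_DB_USER`, `HOTEL_DB_PASSWORD`, `HOTEL_DB_HOST`, `HOTEL_DB_NAME`) and the helper `build_postgres_url` are hypothetical. The defaults mirror the values used in `db_source_postgres_engine`.

```python
import os
from urllib.parse import quote


def build_postgres_url(env=None):
    """Build a PostgreSQL URL from environment-style settings."""
    env = os.environ if env is None else env
    user = env.get("HOTEL_DB_USER", "postgres")
    # Percent-encode the password so characters like '@' or ' ' stay valid in a URL.
    password = quote(env["HOTEL_DB_PASSWORD"], safe="")
    host = env.get("HOTEL_DB_HOST", "localhost:5433")
    name = env.get("HOTEL_DB_NAME", "pachotel")
    return f"postgresql://{user}:{password}@{host}/{name}"


# A plain dict stands in for os.environ in this demo.
example_url = build_postgres_url({"HOTEL_DB_PASSWORD": "p@ss word"})
print(example_url)
```

The resulting string can be passed to `create_engine` in the same way as `engine_str` above.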

In [9]:
query = """
SELECT table_name 
FROM information_schema.tables
WHERE table_schema = 'public'
"""
tables_df = pd.read_sql_query(query, source_engine)
table_names = tables_df['table_name'].tolist()  # List of table names

print("Tables found:", table_names)

Tables found: ['customer', 'reservation', 'reservation_room', 'room', 'payment']


In [14]:
def get_table_data(table_name):
    query = f"SELECT * FROM {table_name}"
    df = pd.read_sql(query, source_engine)
    return df

In [15]:
customer_df = get_table_data('customer')
reservation_df = get_table_data('reservation')
reservation_room_df = get_table_data('reservation_room')
room_df = get_table_data('room')
payment_db_df = get_table_data('payment')

In [25]:
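class ExtractAPIPaymentData(luigi.Task):
    def requires(self):
        # No upstream task: this is the start of the pipeline
        return []

    def run(self):
        payment_url = 'https://shandytepe.github.io/payment.json'

        # Fetch the payload and fail loudly on a non-2xx response
        response = requests.get(payment_url)
        response.raise_for_status()
        payment_json = response.json()
        payment_data = payment_json['payment_data']
        payment_df = pd.DataFrame(payment_data)

        # Write to the path declared in output() so Luigi can detect completion
        payment_df.to_csv(self.output().path, index = False)

    def output(self):
        return luigi.LocalTarget("data_extract/extract_payment_data.csv")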
        

In [27]:
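class ExtractDBHotelData(luigi.Task):
    def requires(self):
        return []
    
    def run(self):
        source_engine = db_source_postgres_engine()

        # List every table in the public schema
        query = """
        SELECT table_name 
        FROM information_schema.tables
        WHERE table_schema = 'public'
        """
        tables_df = pd.read_sql_query(query, source_engine)
        table_names = tables_df['table_name'].tolist()
        
        # Dump each table to its own CSV file
        for table in table_names:
            query = f"SELECT * FROM {table}"
            df = pd.read_sql(query, source_engine)
            
            df.to_csv(f"data_extract/{table}.csv", index = False)

        # Write the status file last. Without it the output() target never
        # exists, so Luigi treats the task as incomplete and reruns it every time
        with self.output().open("w") as status_file:
            status_file.write("\n".join(table_names))
    
    def output(self):
        return luigi.LocalTarget("data_extract/extract_status.txt")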

In [28]:
luigi.build([ExtractAPIPaymentData(), ExtractDBHotelData()], local_scheduler = True)

DEBUG: Checking if ExtractAPIPaymentData() is complete
INFO: Informed scheduler that task   ExtractAPIPaymentData__99914b932b   has status   DONE
DEBUG: Checking if ExtractDBHotelData() is complete
INFO: Informed scheduler that task   ExtractDBHotelData__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 22052] Worker Worker(salt=3619855634, workers=1, host=zueible, username=LENOVO, pid=22052) running   ExtractDBHotelData()
INFO: [pid 22052] Worker Worker(salt=3619855634, workers=1, host=zueible, username=LENOVO, pid=22052) done      ExtractDBHotelData()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   ExtractDBHotelData__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=3619855634, workers=1, host=zueible, username=LENOV

True

---

## Validation
---

In [31]:
payment_df.head()

| | payment_id | reservation_id | provider | account_number | payment_status | payment_date | expire_date |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | Ovo | 38137149 | Success | 2020-10-20 17:07:04.406780 | 2020-10-28 18:10:37.175108 |
| 1 | 2 | 2 | BCA | 42103729 | Success | 2015-02-28 16:44:56.703511 | 2015-03-08 05:27:33.443131 |
| 2 | 3 | 3 | Permata | 58689635 | Success | 2022-04-03 13:27:30.811447 | 2022-04-06 14:20:57.846492 |
| 3 | 4 | 4 | BNI | 107161965 | Success | 2019-10-04 08:39:56.034805 | 2019-10-08 02:01:21.209415 |
| 4 | 5 | 5 | BSI | 14334131 | Failed | | 2020-06-14 12:02:55.669148 |


In [34]:
n_column = payment_df.shape[1]
n_row = payment_df.shape[0]
print(f"The payment table has {n_row} rows and {n_column} columns")

The payment table has 401 rows and 7 columns


In [41]:
for column in payment_df:
    # Look up the dtype of the current column, not always 'payment_id'
    print(f"Column {column} has data type {str(payment_df[column].dtype)}")

Column payment_id has data type int64
Column reservation_id has data type int64
Column provider has data type object
Column account_number has data type int64
Column payment_status has data type object
Column payment_date has data type object
Column expire_date has data type object
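
The date columns arrive from the JSON payload as strings, so a type check alone will not confirm that they hold valid timestamps. As a minimal sketch, assuming the timestamp format seen in the sample rows, `pd.to_datetime` converts them, and a missing `payment_date` becomes `NaT` rather than raising an error. The small `sample_df` copies values from the first and fifth rows of the payment output shown earlier.

```python
import pandas as pd

# Two rows taken from the payment data: one successful, one failed payment.
sample_df = pd.DataFrame({
    "payment_id": [1, 5],
    "payment_date": ["2020-10-20 17:07:04.406780", None],
    "expire_date": ["2020-10-28 18:10:37.175108", "2020-06-14 12:02:55.669148"],
})

# Convert the string columns to datetime; missing values turn into NaT.
for date_column in ["payment_date", "expire_date"]:
    sample_df[date_column] = pd.to_datetime(sample_df[date_column])

# Collect the resulting dtype of every column for inspection.
dtype_report = {column: str(dtype) for column, dtype in sample_df.dtypes.items()}
print(dtype_report)
```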


In [53]:
for column in payment_df:
    # Share of rows in which this column is null
    sum_null = payment_df[column].isnull().sum()
    sum_payment = len(payment_df)

    percentage = (sum_null / sum_payment) * 100
    print(f"Column {column} has {percentage:.0f}% missing values")

Column payment_id has 0% missing values
Column reservation_id has 0% missing values
Column provider has 0% missing values
Column account_number has 0% missing values
Column payment_status has 0% missing values
Column payment_date has 21% missing values
Column expire_date has 0% missing values
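
The three checks above (shape, data types, missing values) can be gathered into one function so that the same report is produced for every extracted table. This is a sketch of one possible layout, not a prescribed design. The function name `validate_frame`, the report keys, and the four-row `check_df` are hypothetical.

```python
import pandas as pd


def validate_frame(df):
    """Return shape, dtypes, and missing-value percentages for a DataFrame."""
    n_rows, n_columns = df.shape
    return {
        "n_rows": n_rows,
        "n_columns": n_columns,
        "dtypes": {column: str(dtype) for column, dtype in df.dtypes.items()},
        # isnull().mean() is the fraction of nulls per column
        "missing_pct": (df.isnull().mean() * 100).round(2).to_dict(),
    }


# Invented demo frame: one of four payment dates is missing.
check_df = pd.DataFrame({
    "payment_id": [1, 2, 3, 4],
    "payment_date": ["2020-10-20", None, "2022-04-03", "2019-10-04"],
})

report = validate_frame(check_df)
print(report)
```

Returning a dictionary instead of printing makes it straightforward for a Luigi validation task to write the report to its `output()` target.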
