# Data Pipeline Demo

## 0. Load Required Libraries

In [1]:
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import pandas as pd
import numpy as np 
import joblib
import os
import yaml
import src.util as util

## 1. Load Configuration File

In [2]:
config_data = util.load_config()

## 2. Data Collection

In [3]:
def read_raw_data(config: dict) -> pd.DataFrame:
    # Create variable to store raw dataset
    raw_dataset = pd.DataFrame()

    # Raw dataset dir
    raw_dataset_dir = config["raw_dataset_dir"]

    # Look and load add CSV files
    for i in tqdm(os.listdir(raw_dataset_dir)):
        raw_dataset = pd.concat([pd.read_csv(raw_dataset_dir + i), raw_dataset])
    
    # Return raw dataset
    return raw_dataset

In [4]:
raw_dataset = read_raw_data(config_data)

100%|██████████| 1/1 [00:00<00:00, 26.60it/s]


In [5]:
# Check our data
raw_dataset

Unnamed: 0,app_id,due_date,paid_date,due_amount,paid_amount,monthly_income,housing_type,num_of_dependent,lama_bekerja,otr,status_pernikahan,pekerjaan,tenor,dp
0,1,2018-04-07,2018-04-06,Rp0.0,Rp12000000.0,Rp9295094.0,milik sendiri,3 orang,2 tahun,Rp800000000.0,Belum Nikah,Karyawan Swasta,60,Rp80000000.0
1,2,2018-02-07,2018-02-07,Rp0.0,Rp12200000.0,Rp9295094.0,milik sendiri,3 orang,2 tahun,Rp800000000.0,Belum Nikah,Karyawan Swasta,60,Rp68000000.0
2,3,2018-03-07,2018-03-06,Rp0.0,Rp12000000.0,Rp9295094.0,milik sendiri,3 orang,2 tahun,Rp800000000.0,Belum Nikah,Karyawan Swasta,60,Rp80000000.0
3,4,2018-05-07,2018-05-05,Rp0.0,Rp12000000.0,Rp9295094.0,milik sendiri,3 orang,2 tahun,Rp800000000.0,Belum Nikah,Karyawan Swasta,60,Rp80000000.0
4,5,2018-06-07,2018-06-06,Rp0.0,Rp12000000.0,Rp9295094.0,milik sendiri,3 orang,2 tahun,Rp800000000.0,Belum Nikah,Karyawan Swasta,60,Rp80000000.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9213,9214,2019-02-01,,Rp5230000.0,Rp0.0,Rp4250000.0,milik sendiri,3 orang,0 tahun,Rp200000000.0,Nikah,Wiraswasta,12,Rp200000000.0
9214,9215,2018-09-01,,Rp8140000.0,Rp0.0,Rp4250000.0,milik sendiri,3 orang,0 tahun,Rp200000000.0,Nikah,Wiraswasta,12,Rp200000000.0
9215,9216,2019-01-01,,Rp5780000.0,Rp0.0,Rp4250000.0,milik sendiri,3 orang,0 tahun,Rp200000000.0,Nikah,Wiraswasta,12,Rp200000000.0
9216,9217,2018-11-01,,Rp6880000.0,Rp0.0,Rp4250000.0,milik sendiri,3 orang,0 tahun,Rp200000000.0,Nikah,Wiraswasta,12,Rp200000000.0


In [6]:
# We found something here, at a glance:
# 1. Index only ranged from 0 to 154, we know that there are 1830 rows
# 2. Date only ranged from month 8 to 9, we know that our data ranged from month 1 to 12 

# We need more investigation about this

In [7]:
# Try to reset the index to solve first problem
raw_dataset.reset_index(inplace = True, drop = True)

In [8]:
# Now check the result
raw_dataset

Unnamed: 0,app_id,due_date,paid_date,due_amount,paid_amount,monthly_income,housing_type,num_of_dependent,lama_bekerja,otr,status_pernikahan,pekerjaan,tenor,dp
0,1,2018-04-07,2018-04-06,Rp0.0,Rp12000000.0,Rp9295094.0,milik sendiri,3 orang,2 tahun,Rp800000000.0,Belum Nikah,Karyawan Swasta,60,Rp80000000.0
1,2,2018-02-07,2018-02-07,Rp0.0,Rp12200000.0,Rp9295094.0,milik sendiri,3 orang,2 tahun,Rp800000000.0,Belum Nikah,Karyawan Swasta,60,Rp68000000.0
2,3,2018-03-07,2018-03-06,Rp0.0,Rp12000000.0,Rp9295094.0,milik sendiri,3 orang,2 tahun,Rp800000000.0,Belum Nikah,Karyawan Swasta,60,Rp80000000.0
3,4,2018-05-07,2018-05-05,Rp0.0,Rp12000000.0,Rp9295094.0,milik sendiri,3 orang,2 tahun,Rp800000000.0,Belum Nikah,Karyawan Swasta,60,Rp80000000.0
4,5,2018-06-07,2018-06-06,Rp0.0,Rp12000000.0,Rp9295094.0,milik sendiri,3 orang,2 tahun,Rp800000000.0,Belum Nikah,Karyawan Swasta,60,Rp80000000.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9213,9214,2019-02-01,,Rp5230000.0,Rp0.0,Rp4250000.0,milik sendiri,3 orang,0 tahun,Rp200000000.0,Nikah,Wiraswasta,12,Rp200000000.0
9214,9215,2018-09-01,,Rp8140000.0,Rp0.0,Rp4250000.0,milik sendiri,3 orang,0 tahun,Rp200000000.0,Nikah,Wiraswasta,12,Rp200000000.0
9215,9216,2019-01-01,,Rp5780000.0,Rp0.0,Rp4250000.0,milik sendiri,3 orang,0 tahun,Rp200000000.0,Nikah,Wiraswasta,12,Rp200000000.0
9216,9217,2018-11-01,,Rp6880000.0,Rp0.0,Rp4250000.0,milik sendiri,3 orang,0 tahun,Rp200000000.0,Nikah,Wiraswasta,12,Rp200000000.0


In [9]:
# Seems like problem number 1 has benn fixed

In [10]:
# Save raw dataset to file
util.pickle_dump(raw_dataset, config_data["raw_dataset_path"])

## 2. Data Definition

In [11]:
# Define data type, range of data and some explanation out data for each variable

## Penjelasan tiap kolom data:

1. **app_id:** ID aplikasi atau nomor identifikasi unik untuk setiap aplikasi pinjaman. (not used)

   data_type  = int

2. **due_date:** Tanggal jatuh tempo atau batas waktu pembayaran pinjaman.

   data_type  = datetime


3. **paid_date:** Tanggal ketika pembayaran berdasarkan `paid_amount` telah diterima oleh pemberi pinjaman.

   data_type  = datetime


4. **due_amount:** Sisa jumlah pinjaman uang yang belum dibayar.

   data_type  = int


5. **paid_amount:** Jumlah uang yang dibayarkan oleh peminjam pada tanggal pembayaran atau `paid_date`.

   data_type  = Object


6. **monthly_income:** Pendapatan bulanan dari peminjam.

   data_type  = int


7. **housing_type:** Tipe kepemilikan tempat tinggal oleh peminjam .

   data_type  = Object


8. **num_of_dependent:** Jumlah orang tanggungan atau anggota keluarga peminjam yang bergantung pada pendapatannya.

   data_type  = int


9.  **lama_bekerja:** Lama waktu (dalam tahun) peminjam telah bekerja di pekerjaan saat ini.


   data_type  = int

10. **otr:** atau "On The Road," yaitu harga total mobil termasuk semua biaya yang diperlukan untuk membuat mobil tersebut siap digunakan di jalan raya.

   data_type  = int


11. **status_pernikahan:** Status pernikahan peminjam.

   data_type  = int


12. **pekerjaan:** Jenis pekerjaan atau profesi peminjam.

   data_type  = int


13. **tenor:** Jangka waktu atau durasi pinjaman dalam bulan, menunjukkan berapa lama peminjam akan membayar pinjaman tersebut.

   data_type  = int


14. **dp:** "Down Payment" adalah jumlah uang muka yang telah dibayarkan oleh peminjam pada saat mengambil pinjaman.

   data_type  = int


## 3. Data Validation

### 3.1. Tipe Data

In [12]:
# Check data type each variable
raw_dataset.dtypes

app_id                int64
due_date             object
paid_date            object
due_amount           object
paid_amount          object
monthly_income       object
housing_type         object
num_of_dependent     object
lama_bekerja         object
otr                  object
status_pernikahan    object
pekerjaan            object
tenor                 int64
dp                   object
dtype: object

In [13]:
# The result shows us that all variable data type is object, which is not true. We need more investigation about it later

### 3.2. Range

In [14]:
# Check the range of data for each variable
raw_dataset.describe()

Unnamed: 0,app_id,tenor
count,9218.0,9218.0
mean,4609.5,33.315687
std,2661.151724,15.960389
min,1.0,12.0
25%,2305.25,24.0
50%,4609.5,36.0
75%,6913.75,36.0
max,9218.0,60.0


In [16]:
# This is messed up because the data type don't match with actual values for each variabel

### 3.3. Dimensi Data

In [17]:
# It will not be affected
raw_dataset.shape

(9218, 14)

### 3.4. Handling Variables Error

### 3.5. Handling Variabel "paid_date" dan "due_date"


In [18]:
# Mengubah kolom paid_date dan due_date ke tipe datetime
raw_dataset['paid_date'] = pd.to_datetime(raw_dataset['paid_date'] )
raw_dataset['due_date'] = pd.to_datetime(raw_dataset['due_date'] )

### 3.6 Handling Variabel `due_amount`, `paid_amount`, `monthly_income`, `otr`, `dp`, `num_of_dependent` dan `lama_bekerja` ke bentuk int


In [19]:
#Mengubah kolom `due_amount`, `paid_amount`, `monthly_income`, `otr`, `dp`, `num_of_dependent` dan `lama_bekerja` ke bentuk int
raw_dataset['num_of_dependent'] = raw_dataset['num_of_dependent'].str.replace(' orang', '').str.replace('.','').astype('int64')
raw_dataset['lama_bekerja'] = raw_dataset['lama_bekerja'].str.replace(' tahun', '').str.replace('.','').astype('int64')
raw_dataset['due_amount'] = raw_dataset['due_amount'].str.replace('Rp', '').str.replace('.','').astype('int64')
raw_dataset['paid_amount'] = raw_dataset['paid_amount'].str.replace('Rp', '').str.replace('.','').astype('int64')
raw_dataset['monthly_income'] = raw_dataset['monthly_income'].str.replace('Rp', '').str.replace('.','').astype('int64')
raw_dataset['otr'] = raw_dataset['otr'].str.replace('Rp', '').str.replace('.','').astype('int64')
raw_dataset['dp'] = raw_dataset['dp'].str.replace('Rp', '').str.replace('.','').astype('int64')

raw_dataset.info()



<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9218 entries, 0 to 9217
Data columns (total 14 columns):
 #   Column             Non-Null Count  Dtype         
---  ------             --------------  -----         
 0   app_id             9218 non-null   int64         
 1   due_date           9218 non-null   datetime64[ns]
 2   paid_date          8190 non-null   datetime64[ns]
 3   due_amount         9218 non-null   int64         
 4   paid_amount        9218 non-null   int64         
 5   monthly_income     9218 non-null   int64         
 6   housing_type       9218 non-null   object        
 7   num_of_dependent   9218 non-null   int64         
 8   lama_bekerja       9218 non-null   int64         
 9   otr                9218 non-null   int64         
 10  status_pernikahan  9218 non-null   object        
 11  pekerjaan          9218 non-null   object        
 12  tenor              9218 non-null   int64         
 13  dp                 9218 non-null   int64         
dtypes: datet

### 3.7 Handling data labelling


In [20]:
 #Pertama kita coba buat kolom selisih hari yaitu `due_date - paid_date `
raw_dataset['selisih_hari'] = (raw_dataset['due_date'] - raw_dataset['paid_date']).dt.days

    # Membuat kolom baru dengan nilai 1 jika NPl dan 0 jika tidak
raw_dataset['NPL'] =  'Tidak'
raw_dataset.loc[raw_dataset['due_amount'] > 0, 'NPL'] = 'Ya'
raw_dataset.loc[(raw_dataset['due_amount'] == 0) & (raw_dataset['selisih_hari'] <= -90), 'NPL'] = 'Ya'
    # Remove column ['app_id','due_date','paid_date','due_amount','paid_amount','selisih_hari']
raw_dataset = raw_dataset.drop(columns=['app_id','due_date','paid_date','due_amount','paid_amount','selisih_hari'],axis=1)



### 3.8 Handling data labelling


In [21]:
# 6. # Handling skewed data with log Transformation
for col in config_data['columns_being_logtransformed']:
    col_log = f'{col}_log'
    raw_dataset[col_log] = raw_dataset[col].apply(lambda x: np.log(x+1))

### 3.9 Handling Outlier


In [22]:
def remove_outliers(df, col):
    """
    Menghapus outlier dari DataFrame berdasarkan metode IQR.
    
    Parameters:
        df (DataFrame): DataFrame yang akan dihapus outlier-nya.
    
    Returns:
        DataFrame: DataFrame baru yang telah dihapus outlier-nya.
    """
    Q1 = df[col].quantile(0.25)
    Q3 = df[col].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    df_no_outlier = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]
    return df_no_outlier

In [23]:
# 7. Handling Outlier
list_df = []
for col in raw_dataset.columns:
    if raw_dataset[col].dtype != 'O' and col != 'dp' and  col not in config_data['columns_being_logtransformed']:
        outlier_removed = remove_outliers(raw_dataset, col)
        list_df.append(outlier_removed)

# Ambil indeks dari DataFrame pertama dalam list_df
common_index = list_df[0].index

# Iterasi melalui DataFrame lainnya dalam list_df untuk mendapatkan indeks yang bersamaan
for df in list_df[1:]:
    common_index = common_index.intersection(df.index)

## 4. Data Defense

In [24]:
def check_data(input_data, params, api = False):
    input_data = copy.deepcopy(input_data)
    params = copy.deepcopy(params)

    if not api:
        # Check data types
        assert input_data.select_dtypes("object").columns.to_list() == \
            params["object_columns"], "an error occurs in object column(s)."
        assert input_data.select_dtypes("int").columns.to_list() == \
            params["int64_columns"], "an error occurs in int64_columns column(s)."
        assert input_data.select_dtypes("float").columns.to_list() == \
            params["float_columns"], "an error occurs in float_columns column(s)."
    else:
        # In case checking data from api
        # Predictor that has object dtype only stasiun
        object_columns = params["object_columns"]
        del object_columns[1:]

        # Max column not used as predictor
        int_columns = params["int64_columns"]
        del int_columns[-1]

        # Check data types
        assert input_data.select_dtypes("object").columns.to_list() == \
            object_columns, "an error occurs in object column(s)."
        assert input_data.select_dtypes("int").columns.to_list() == \
            int_columns, "an error occurs in int32 column(s)."

    assert set(input_data.housing_type).issubset(set(params["range_housing_type"])), \
        "an error occurs in housing_type range."
    assert set(input_data.status_pernikahan).issubset(set(params["range_status_pernikahan"])), \
        "an error occurs in status_pernikahan range."
    assert set(input_data.pekerjaan).issubset(set(params["range_pekerjaan"])), \
        "an error occurs in pekerjaan range."
    assert set(input_data.housing_type).issubset(set(params["range_housing_type"])), \
        "an error occurs in housing_type range."
    
    assert input_data.monthly_income.between(params["range_monthly_income"][0], params["range_monthly_income"][1]).sum() == \
        len(input_data), "an error occurs in monthly_income range."
    assert input_data.num_of_dependent.between(params["range_num_of_dependent"][0], params["range_num_of_dependent"][1]).sum() == \
        len(input_data), "an error occurs in num_of_dependent range."
    assert input_data.lama_bekerja.between(params["range_lama_bekerja"][0], params["range_lama_bekerja"][1]).sum() == \
        len(input_data), "an error occurs in lama_bekerja range."
    assert input_data.otr.between(params["range_otr"][0], params["range_otr"][1]).sum() == \
        len(input_data), "an error occurs in otr range."
    assert input_data.tenor.between(params["range_tenor"][0], params["range_tenor"][1]).sum() == \
        len(input_data), "an error occurs in tenor range."
    assert input_data.dp.between(params["range_dp"][0], params["range_dp"][1]).sum() == \
        len(input_data), "an error occurs in dp range."
    assert input_data.monthly_income_log.between(params["range_monthly_income_log"][0], params["range_monthly_income_log"][1]).sum() == \
        len(input_data), "an error occurs in monthly_income_log range."
    assert input_data.num_of_dependent_log.between(params["range_num_of_dependent_log"][0], params["range_num_of_dependent_log"][1]).sum() == \
        len(input_data), "an error occurs in num_of_dependent_log range."
    assert input_data.lama_bekerja_log.between(params["range_lama_bekerja_log"][0], params["range_lama_bekerja_log"][1]).sum() == \
        len(input_data), "an error occurs in lama_bekerja_log range."
    
# 8. Check data definition
    check_data(raw_dataset, config_data)

## 5. Data Splitting

### 5.1 Splitting input output


In [29]:
raw_dataset[config_data['label']].copy

<bound method NDFrame.copy of 0       Tidak
1       Tidak
2       Tidak
3       Tidak
4       Tidak
        ...  
9213       Ya
9214       Ya
9215       Ya
9216       Ya
9217       Ya
Name: NPL, Length: 9218, dtype: object>

In [30]:
# Split input/variable/feature with target/labet/output
x = raw_dataset[config_data["predictors"]].copy()
y = raw_dataset[config_data['label']]
y

0       Tidak
1       Tidak
2       Tidak
3       Tidak
4       Tidak
        ...  
9213       Ya
9214       Ya
9215       Ya
9216       Ya
9217       Ya
Name: NPL, Length: 9218, dtype: object

In [31]:
# First split, splitting train and test set with ratio 0.7:0.3 and do stratify splitting
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.3, random_state = 42, stratify = y)

In [32]:
# Second split, splitting test and valid set with ratio 0.5:0.5 and do stratify splitting
x_valid, x_test, y_valid, y_test = train_test_split(x_test, y_test, test_size = 0.5, random_state = 42, stratify = y_test)

In [33]:
#Save train, valid and test set
util.pickle_dump(x_train, config_data["train_set_path"][0])
util.pickle_dump(y_train, config_data["train_set_path"][1])

util.pickle_dump(x_valid, config_data["valid_set_path"][0])
util.pickle_dump(y_valid, config_data["valid_set_path"][1])

util.pickle_dump(x_test, config_data["test_set_path"][0])
util.pickle_dump(y_test, config_data["test_set_path"][1])