# Data Pipeline

In [1]:
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np 
import joblib
import os
import yaml

In [2]:
params_dir = "config/params.yaml"

In [3]:
def load_params(param_dir):
    with open(param_dir, 'r') as file:
        params = yaml.safe_load(file)
        
    return params

In [4]:
params = load_params(params_dir)

In [5]:
params

{'dataset_dir': 'data/raw/',
 'datetime_columns': ['tanggal'],
 'int32_columns': ['pm10', 'pm25', 'so2', 'co', 'o3', 'no2', 'max'],
 'label': 'categori',
 'label_categories': ['BAIK', 'SEDANG', 'TIDAK SEHAT'],
 'label_categories_new': ['BAIK', 'TIDAK BAIK'],
 'missing_value_co': 11,
 'missing_value_no2': 18,
 'missing_value_o3': 29,
 'missing_value_pm10': {'BAIK': 28, 'TIDAK BAIK': 55},
 'missing_value_pm25': {'BAIK': 38, 'TIDAK BAIK': 82},
 'missing_value_so2': 35,
 'object_columns': ['stasiun', 'critical', 'categori'],
 'predictors': ['stasiun', 'pm10', 'pm25', 'so2', 'co', 'o3', 'no2'],
 'range_co': [-1, 100],
 'range_no2': [-1, 100],
 'range_o3': [-1, 160],
 'range_pm10': [-1, 800],
 'range_pm25': [-1, 400],
 'range_so2': [-1, 500],
 'range_stasiun': ['DKI1 (Bunderan HI)',
  'DKI2 (Kelapa Gading)',
  'DKI3 (Jagakarsa)',
  'DKI4 (Lubang Buaya)',
  'DKI5 (Kebon Jeruk) Jakarta Barat']}

## 1. Data Collection

In [6]:
# function memuat file dan menggabungkan dataset
def read_dataset(dataset_dir):
    dataset = pd.DataFrame()

    for i in tqdm(os.listdir(dataset_dir)):
        dataset = pd.concat([pd.read_csv(dataset_dir + i), dataset])
    
    return dataset

In [7]:
# read path dataset yg ada di file config
dataset = read_dataset(params["dataset_dir"])

100%|█████████████████████████████████████████████████████████████████████████████████| 12/12 [00:00<00:00, 223.45it/s]


In [15]:
%ls data\raw

 Volume in drive C is OS
 Volume Serial Number is AE91-FD4F

 Directory of C:\Users\ASUS\Intro to ML & ML Process\Data Pipeline\data\raw

29/09/2023  22:35    <DIR>          .
29/09/2023  19:44    <DIR>          ..
29/09/2023  19:34            10.597 indeks-standar-pencemar-udara-di-spku-bulan-agustus-tahun-2021.csv
29/09/2023  19:34            10.234 indeks-standar-pencemar-udara-di-spku-bulan-april-tahun-2021.csv
29/09/2023  19:34            10.458 indeks-standar-pencemar-udara-di-spku-bulan-desember-tahun-2021.csv
29/09/2023  19:34             9.304 indeks-standar-pencemar-udara-di-spku-bulan-februari-tahun-2021.csv
29/09/2023  19:34            10.018 indeks-standar-pencemar-udara-di-spku-bulan-januari-tahun-2021.csv
29/09/2023  22:35            10.842 indeks-standar-pencemar-udara-di-spku-bulan-juli-tahun-2021.csv
29/09/2023  22:35            10.205 indeks-standar-pencemar-udara-di-spku-bulan-juni-tahun-2021.csv
29/09/2023  19:34            10.498 indeks-standar-pencemar-udara-di-s

In [9]:
# cek kondisi dataset
dataset

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
0,2021-09-01,DKI1 (Bunderan HI),63,88,29,15,24,38,88,PM25,SEDANG
1,2021-09-02,DKI1 (Bunderan HI),60,83,29,11,30,28,83,PM25,SEDANG
2,2021-09-03,DKI1 (Bunderan HI),60,82,27,11,37,30,82,PM25,SEDANG
3,2021-09-04,DKI1 (Bunderan HI),58,77,26,10,31,28,77,PM25,SEDANG
4,2021-09-05,DKI1 (Bunderan HI),63,85,27,11,28,28,85,PM25,SEDANG
...,...,...,...,...,...,...,...,...,...,...,...
150,2021-08-27,DKI5 (Kebon Jeruk) Jakarta Barat,61,96,34,8,29,15,96,PM25,SEDANG
151,2021-08-28,DKI5 (Kebon Jeruk) Jakarta Barat,63,100,31,8,44,12,100,PM25,SEDANG
152,2021-08-29,DKI5 (Kebon Jeruk) Jakarta Barat,67,111,32,10,36,13,111,PM25,TIDAK SEHAT
153,2021-08-30,DKI5 (Kebon Jeruk) Jakarta Barat,83,126,35,16,32,29,126,PM25,TIDAK SEHAT


In [10]:
# reset index untuk mengatasi poin pertama
dataset.reset_index(inplace = True, drop = True)

In [11]:
dataset

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
0,2021-09-01,DKI1 (Bunderan HI),63,88,29,15,24,38,88,PM25,SEDANG
1,2021-09-02,DKI1 (Bunderan HI),60,83,29,11,30,28,83,PM25,SEDANG
2,2021-09-03,DKI1 (Bunderan HI),60,82,27,11,37,30,82,PM25,SEDANG
3,2021-09-04,DKI1 (Bunderan HI),58,77,26,10,31,28,77,PM25,SEDANG
4,2021-09-05,DKI1 (Bunderan HI),63,85,27,11,28,28,85,PM25,SEDANG
...,...,...,...,...,...,...,...,...,...,...,...
1820,2021-08-27,DKI5 (Kebon Jeruk) Jakarta Barat,61,96,34,8,29,15,96,PM25,SEDANG
1821,2021-08-28,DKI5 (Kebon Jeruk) Jakarta Barat,63,100,31,8,44,12,100,PM25,SEDANG
1822,2021-08-29,DKI5 (Kebon Jeruk) Jakarta Barat,67,111,32,10,36,13,111,PM25,TIDAK SEHAT
1823,2021-08-30,DKI5 (Kebon Jeruk) Jakarta Barat,83,126,35,16,32,29,126,PM25,TIDAK SEHAT


In [12]:
# simpan dataset yang telah digabungkan
joblib.dump(dataset, "data/processed/dataset.pkl")

['data/processed/dataset.pkl']

In [13]:
!ls data/processed

dataset.pkl


## 2. Data Definition

## 3. Data Validation

### 3.1. Tipe Data

In [16]:
# cek tipe data
dataset.dtypes

tanggal     object
stasiun     object
pm10        object
pm25        object
so2         object
co          object
o3          object
no2         object
max         object
critical    object
categori    object
dtype: object

### 3.2. Range

In [17]:
# pengecekan cakupan data menjadi kacau jika tipe data tidak sesuai
dataset.describe()

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
count,1825,1825,1825,1763,1825,1825,1825,1825,1825,1809,1824
unique,365,5,130,192,121,54,121,96,187,6,4
top,2021-09-01,DKI1 (Bunderan HI),51,77,---,9,---,13,77,PM25,SEDANG
freq,5,365,69,45,114,163,68,81,50,1630,1349


### 3.3. Dimensi Data

In [18]:
# dimensi data kemungkinan besar tidak terpengaruh, namun nanti kita kembali lagi
dataset.shape

(1825, 11)

### 3.4. Handling Columns Error:

### 3.5. Handing Column "Tanggal"

In [19]:
# cek tipe data pada kolom tanggal
dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1825 entries, 0 to 1824
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   tanggal   1825 non-null   object
 1   stasiun   1825 non-null   object
 2   pm10      1825 non-null   object
 3   pm25      1763 non-null   object
 4   so2       1825 non-null   object
 5   co        1825 non-null   object
 6   o3        1825 non-null   object
 7   no2       1825 non-null   object
 8   max       1825 non-null   object
 9   critical  1809 non-null   object
 10  categori  1824 non-null   object
dtypes: object(11)
memory usage: 157.0+ KB


In [20]:
# casting tipe data ke datetime
dataset.tanggal = pd.to_datetime(dataset.tanggal)

In [21]:
# cek perubahan tipe data untuk kolom tanggal
dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1825 entries, 0 to 1824
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1825 non-null   datetime64[ns]
 1   stasiun   1825 non-null   object        
 2   pm10      1825 non-null   object        
 3   pm25      1763 non-null   object        
 4   so2       1825 non-null   object        
 5   co        1825 non-null   object        
 6   o3        1825 non-null   object        
 7   no2       1825 non-null   object        
 8   max       1825 non-null   object        
 9   critical  1809 non-null   object        
 10  categori  1824 non-null   object        
dtypes: datetime64[ns](1), object(10)
memory usage: 157.0+ KB


### 3.6. Handling Column "PM10"

In [22]:
# terlihat tidak ada masalah pada kolom pm10
dataset

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
0,2021-09-01,DKI1 (Bunderan HI),63,88,29,15,24,38,88,PM25,SEDANG
1,2021-09-02,DKI1 (Bunderan HI),60,83,29,11,30,28,83,PM25,SEDANG
2,2021-09-03,DKI1 (Bunderan HI),60,82,27,11,37,30,82,PM25,SEDANG
3,2021-09-04,DKI1 (Bunderan HI),58,77,26,10,31,28,77,PM25,SEDANG
4,2021-09-05,DKI1 (Bunderan HI),63,85,27,11,28,28,85,PM25,SEDANG
...,...,...,...,...,...,...,...,...,...,...,...
1820,2021-08-27,DKI5 (Kebon Jeruk) Jakarta Barat,61,96,34,8,29,15,96,PM25,SEDANG
1821,2021-08-28,DKI5 (Kebon Jeruk) Jakarta Barat,63,100,31,8,44,12,100,PM25,SEDANG
1822,2021-08-29,DKI5 (Kebon Jeruk) Jakarta Barat,67,111,32,10,36,13,111,PM25,TIDAK SEHAT
1823,2021-08-30,DKI5 (Kebon Jeruk) Jakarta Barat,83,126,35,16,32,29,126,PM25,TIDAK SEHAT


In [23]:
# namun kolom pm10 bukanlah bertipe integer
dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1825 entries, 0 to 1824
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1825 non-null   datetime64[ns]
 1   stasiun   1825 non-null   object        
 2   pm10      1825 non-null   object        
 3   pm25      1763 non-null   object        
 4   so2       1825 non-null   object        
 5   co        1825 non-null   object        
 6   o3        1825 non-null   object        
 7   no2       1825 non-null   object        
 8   max       1825 non-null   object        
 9   critical  1809 non-null   object        
 10  categori  1824 non-null   object        
dtypes: datetime64[ns](1), object(10)
memory usage: 157.0+ KB


In [26]:
# sanity check
dataset[(dataset.eq("-1").any(1)) | (dataset.eq(-1).any(1))] 

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori


In [27]:
# replace value pm10 yang bertipe string dengan -1
dataset.pm10 = dataset.pm10.replace("---", -1).astype(int)

In [28]:
# cek tipe data
dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1825 entries, 0 to 1824
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1825 non-null   datetime64[ns]
 1   stasiun   1825 non-null   object        
 2   pm10      1825 non-null   int32         
 3   pm25      1763 non-null   object        
 4   so2       1825 non-null   object        
 5   co        1825 non-null   object        
 6   o3        1825 non-null   object        
 7   no2       1825 non-null   object        
 8   max       1825 non-null   object        
 9   critical  1809 non-null   object        
 10  categori  1824 non-null   object        
dtypes: datetime64[ns](1), int32(1), object(9)
memory usage: 149.8+ KB


### 3.7. Handling Column "PM25"

In [29]:
# cek tipe data
dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1825 entries, 0 to 1824
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1825 non-null   datetime64[ns]
 1   stasiun   1825 non-null   object        
 2   pm10      1825 non-null   int32         
 3   pm25      1763 non-null   object        
 4   so2       1825 non-null   object        
 5   co        1825 non-null   object        
 6   o3        1825 non-null   object        
 7   no2       1825 non-null   object        
 8   max       1825 non-null   object        
 9   critical  1809 non-null   object        
 10  categori  1824 non-null   object        
dtypes: datetime64[ns](1), int32(1), object(9)
memory usage: 149.8+ KB


In [31]:
dataset.pm25.isna().sum()

62

In [32]:
# replacing NaN dengan -1
dataset.pm25.fillna(-1, inplace = True)

In [33]:
dataset.pm25 = dataset.pm25.replace("---", -1).astype(int)

In [34]:
# cek tipe data
dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1825 entries, 0 to 1824
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1825 non-null   datetime64[ns]
 1   stasiun   1825 non-null   object        
 2   pm10      1825 non-null   int32         
 3   pm25      1825 non-null   int32         
 4   so2       1825 non-null   object        
 5   co        1825 non-null   object        
 6   o3        1825 non-null   object        
 7   no2       1825 non-null   object        
 8   max       1825 non-null   object        
 9   critical  1809 non-null   object        
 10  categori  1824 non-null   object        
dtypes: datetime64[ns](1), int32(2), object(8)
memory usage: 142.7+ KB


### 3.8. Handling Column "SO2"

In [36]:
dataset.so2 = dataset.so2.replace("---", -1).astype(int)

### 3.9. Handling Column "CO"

In [38]:
dataset.co = dataset.co.replace("---", -1).astype(int)

### 3.10. Handling Column "O3"

In [40]:
dataset.o3 = dataset.o3.replace("---", -1).astype(int)

### 3.11. Handling Column "NO2"

In [42]:
dataset.no2 = dataset.no2.replace("---", -1).astype(int)

### 3.12. Handling Column "Max"

In [43]:
dataset["max"] = dataset["max"].astype(int)

ValueError: invalid literal for int() with base 10: 'PM25'

In [44]:
# perlu kita perhatikan lebih detail
dataset[dataset["max"] == "PM25"]

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
1367,2021-12-03,DKI1 (Bunderan HI),49,31,9,19,7,49,PM25,BAIK,


In [45]:
# fix the problem
dataset.loc[1367, "max"] = 49
dataset.loc[1367, "critical"] = "PM10"
dataset.loc[1367, "categori"] = "BAIK"

In [46]:
# cek ulang hasilnya
dataset[dataset["max"] == "PM25"]

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori


In [47]:
dataset.loc[1367]

tanggal     2021-12-03 00:00:00
stasiun      DKI1 (Bunderan HI)
pm10                         49
pm25                         31
so2                           9
co                           19
o3                            7
no2                          49
max                          49
critical                   PM10
categori                   BAIK
Name: 1367, dtype: object

In [48]:
dataset["max"] = dataset["max"].astype(int)

In [49]:
dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1825 entries, 0 to 1824
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1825 non-null   datetime64[ns]
 1   stasiun   1825 non-null   object        
 2   pm10      1825 non-null   int32         
 3   pm25      1825 non-null   int32         
 4   so2       1825 non-null   int32         
 5   co        1825 non-null   int32         
 6   o3        1825 non-null   int32         
 7   no2       1825 non-null   int32         
 8   max       1825 non-null   int32         
 9   critical  1809 non-null   object        
 10  categori  1825 non-null   object        
dtypes: datetime64[ns](1), int32(7), object(3)
memory usage: 107.1+ KB


### 3.13. Handling Column "Critical"

In [50]:
# cek data unik pada kolom kategorik "critical"
dataset.critical.value_counts()

PM25    1630
PM10      64
O3        56
CO        34
SO2       25
Name: critical, dtype: int64

### 3.14. Handling Column "Categori"

In [52]:
# cek data unik untuk kolom kategorik "categori" yang merupakan data label atau dependen variabel
dataset.categori.value_counts()

SEDANG            1349
TIDAK SEHAT        272
BAIK               188
TIDAK ADA DATA      16
Name: categori, dtype: int64

In [53]:
# terdapat data "TIDAK ADA DATA" yang mengindikasikan null value
# bisa langsung hapus (drop)

In [54]:
dataset.drop(index = dataset[dataset.categori == "TIDAK ADA DATA"].index, inplace = True)

In [55]:
dataset.categori.value_counts()

SEDANG         1349
TIDAK SEHAT     272
BAIK            188
Name: categori, dtype: int64

In [56]:
# semua tipe data sudah sesuai, bisa disimpan agar nanti dapat digunakan kembali

In [57]:
joblib.dump(dataset, "data/processed/dataset_clean.pkl")

['data/processed/dataset_clean.pkl']

## 4. Data Defense

In [59]:
def check_data(input_data, params):
    # check data types
    assert input_data.select_dtypes("datetime").columns.to_list() == params["datetime_columns"], "an error occurs in datetime column(s)."
    assert input_data.select_dtypes("object").columns.to_list() == params["object_columns"], "an error occurs in object column(s)."
    assert input_data.select_dtypes("int").columns.to_list() == params["int32_columns"], "an error occurs in int32 column(s)."

    # check range of data
    assert set(input_data.stasiun).issubset(set(params["range_stasiun"])), "an error occurs in stasiun range."
    assert input_data.pm10.between(params["range_pm10"][0], params["range_pm10"][1]).sum() == len(input_data), "an error occurs in pm10 range."
    assert input_data.pm25.between(params["range_pm25"][0], params["range_pm25"][1]).sum() == len(input_data), "an error occurs in pm25 range."
    assert input_data.so2.between(params["range_so2"][0], params["range_so2"][1]).sum() == len(input_data), "an error occurs in so2 range."
    assert input_data.co.between(params["range_co"][0], params["range_co"][1]).sum() == len(input_data), "an error occurs in co range."
    assert input_data.o3.between(params["range_o3"][0], params["range_o3"][1]).sum() == len(input_data), "an error occurs in o3 range."
    assert input_data.no2.between(params["range_no2"][0], params["range_no2"][1]).sum() == len(input_data), "an error occurs in no2 range."

In [60]:
# jika tidak ada error berarti data sudah sesuai dengan desain
check_data(dataset, params)

## 5. Data Splitting

In [61]:
params['label']

'categori'

In [69]:
params["predictors"]

['stasiun', 'pm10', 'pm25', 'so2', 'co', 'o3', 'no2']

In [63]:
# pisahkan data x dan y (x adalah fitur, y adalah label)
x = dataset[params["predictors"]].copy()
y = dataset[params["label"]].copy()

In [64]:
x.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1809 entries, 0 to 1824
Data columns (total 7 columns):
 #   Column   Non-Null Count  Dtype 
---  ------   --------------  ----- 
 0   stasiun  1809 non-null   object
 1   pm10     1809 non-null   int32 
 2   pm25     1809 non-null   int32 
 3   so2      1809 non-null   int32 
 4   co       1809 non-null   int32 
 5   o3       1809 non-null   int32 
 6   no2      1809 non-null   int32 
dtypes: int32(6), object(1)
memory usage: 70.7+ KB


In [65]:
y.value_counts()

SEDANG         1349
TIDAK SEHAT     272
BAIK            188
Name: categori, dtype: int64

In [66]:
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.3, random_state = 42, stratify = y)

In [67]:
# dari data test displit lagi jd validation dan test
x_valid, x_test, y_valid, y_test = train_test_split(x_test, y_test, test_size = 0.5, random_state = 42, stratify = y_test)

In [68]:
# data yg sdh bersih dan displit td disimpan dlm format pkl
joblib.dump(x_train, "data/processed/x_train.pkl")
joblib.dump(y_train, "data/processed/y_train.pkl")
joblib.dump(x_valid, "data/processed/x_valid.pkl")
joblib.dump(y_valid, "data/processed/y_valid.pkl")
joblib.dump(x_test, "data/processed/x_test.pkl")
joblib.dump(y_test, "data/processed/y_test.pkl")

['data/processed/y_test.pkl']