# Data Pipeline

## 0. Load Required Libraries

In [1]:
from tqdm import tqdm
import pandas as pd
import numpy as np
import yfinance as yf
from datetime import date
import joblib
import os
import yaml
import src.util as util

## 1. Load Configuration File and Raw Data

In [2]:
def read_raw_data(config: dict) -> pd.DataFrame:
    """Download the adjusted closing price of every ticker listed in the IDX workbook.

    Parameters
    ----------
    config : dict
        Settings read by this function:

        * ``raw_dataset_dir``: path handed to ``pd.read_excel``; the sheet
          must contain a ``Kode`` column with the stock codes.
        * ``ticker_ext``: suffix appended to each code to form the yfinance symbol.
        * ``start_date``: first date requested from yfinance.
        * ``interval_date``: bar interval passed to yfinance.

        The end date is always today's date.

    Returns
    -------
    pd.DataFrame
        One column per ticker symbol holding its ``Adj Close`` values. The
        index is the outer join of the indexes of the individual downloads and
        is not sorted here.

    Raises
    ------
    KeyError
        If the downloaded data has no ``Adj Close`` field at all.
    """
    # Build the yfinance symbols (stock code + exchange suffix) from the workbook
    stock_list = pd.read_excel(config['raw_dataset_dir'])
    ticker_list = (stock_list['Kode'] + config['ticker_ext']).tolist()

    # Define the date range parameters
    start_date = config['start_date']
    end_date = date.today()
    interval = config['interval_date']

    # Download each ticker separately so one failing symbol does not abort the rest
    stock_data = {}
    for ticker in tqdm(ticker_list, desc='Downloading stock data'):
        stock_data[ticker] = yf.download(
            ticker,
            start=start_date,
            end=end_date,
            interval=interval,
            progress=False,
            # Ask for the separate 'Adj Close' column explicitly rather than
            # relying on the library default.
            # NOTE(review): newer yfinance releases default auto_adjust to True,
            # which drops 'Adj Close' - confirm against the pinned version.
            auto_adjust=False,
        )

    # Side-by-side concat: outer level = ticker (dict key), next level = price field
    dataset = pd.concat(stock_data, axis=1)

    # Select the price field by NAME. A positional stride (every 6th column
    # starting at 4) silently returns the wrong field as soon as one frame has
    # a different column count or order.
    price_field = 'Adj Close'
    try:
        dataset = dataset.xs(price_field, axis=1, level=1)
    except KeyError as exc:
        raise KeyError(
            f"'{price_field}' was not found in the downloaded data; "
            "check the yfinance version and its auto_adjust setting"
        ) from exc

    # If the downloads carried extra column levels, keep only the ticker level
    if isinstance(dataset.columns, pd.MultiIndex):
        dataset = dataset.set_axis(dataset.columns.get_level_values(0), axis=1)

    return dataset

In [3]:
# Read the project settings first; the downloader takes them as its only input
config_data = util.load_config()
raw_dataset = read_raw_data(config=config_data)

Downloading stock data:   4%|▍         | 32/853 [00:03<01:34,  8.65it/s]


1 Failed download:
- TRIL.JK: No timezone found, symbol may be delisted


Downloading stock data:   9%|▉         | 81/853 [00:10<01:27,  8.83it/s]


1 Failed download:
- ASMI.JK: No data found for this date range, symbol may be delisted


Downloading stock data:  28%|██▊       | 236/853 [00:33<01:27,  7.07it/s]


1 Failed download:
- HDTX.JK: No timezone found, symbol may be delisted


Downloading stock data:  44%|████▎     | 373/853 [00:54<01:04,  7.45it/s]


1 Failed download:
- NIPS.JK: No timezone found, symbol may be delisted


Downloading stock data:  55%|█████▌    | 471/853 [01:09<00:49,  7.64it/s]


1 Failed download:
- SUGI.JK: No timezone found, symbol may be delisted


Downloading stock data:  57%|█████▋    | 487/853 [01:12<00:56,  6.49it/s]


1 Failed download:
- TRIO.JK: No timezone found, symbol may be delisted


Downloading stock data: 100%|██████████| 853/853 [01:58<00:00,  7.17it/s]


In [4]:
# Display the downloaded table as the cell output
raw_dataset

Unnamed: 0_level_0,PACK.JK,VAST.JK,CHIP.JK,HALO.JK,KING.JK,PGEO.JK,FUTR.JK,HILL.JK,BDKR.JK,PTMP.JK,...,CBPE.JK,SUNI.JK,CBRE.JK,WINE.JK,BMBL.JK,PEVE.JK,LAJU.JK,FWCT.JK,NAYZ.JK,IRSX.JK
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2023-02-08,146.0,101.0,176.0,117.0,,,,,,,...,148.0,296.0,90.0,372.0,68.0,204.0,173.0,112.0,73.0,101.0
2023-02-09,132.0,96.0,193.0,118.0,,,,,,,...,149.0,298.0,84.0,368.0,64.0,200.0,206.0,128.0,75.0,100.0
2023-02-10,124.0,120.0,212.0,131.0,,,,,,,...,147.0,306.0,88.0,344.0,65.0,200.0,244.0,130.0,72.0,93.0
2023-02-13,112.0,112.0,232.0,122.0,,,,,,,...,152.0,300.0,85.0,324.0,65.0,199.0,228.0,144.0,76.0,94.0
2023-02-14,102.0,108.0,230.0,119.0,,,,,,,...,149.0,312.0,80.0,304.0,65.0,208.0,214.0,134.0,81.0,95.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2021-07-20,,,,,,,,,,,...,,,,,,,,,,
2022-05-03,,,,,,,,,,,...,,,,,,,,,,
2022-05-04,,,,,,,,,,,...,,,,,,,,,,
2022-05-05,,,,,,,,,,,...,,,,,,,,,,


In [5]:
# Order the rows newest-first. The index is only sorted, not reset, so the
# dates stay in the index.
# Then drop every row and every column that is entirely NaN, and fill the
# remaining gaps with 0.
# NOTE(review): zero-filling makes "no quote on that day" indistinguishable
# from a real price of 0 in later statistics - confirm this is intended.
raw_dataset = (
    raw_dataset
    .sort_index(ascending=False)
    .dropna(axis=0, how='all')
    .dropna(axis=1, how='all')
    .fillna(0)
)

# Show the cleaned table
raw_dataset

Unnamed: 0,Date,PACK.JK,VAST.JK,CHIP.JK,HALO.JK,KING.JK,PGEO.JK,FUTR.JK,HILL.JK,BDKR.JK,...,CBPE.JK,SUNI.JK,CBRE.JK,WINE.JK,BMBL.JK,PEVE.JK,LAJU.JK,FWCT.JK,NAYZ.JK,IRSX.JK
0,2023-04-06,162.0,110.0,1315.0,300.0,260.0,650.0,102.0,1745.0,256.0,...,148.0,298.0,60.0,660.0,53.0,181.0,328.0,96.0,110.0,130.0
1,2023-04-05,160.0,112.0,1460.0,286.0,258.0,665.0,109.0,1875.0,256.0,...,151.0,302.0,57.0,705.0,58.0,173.0,332.0,97.0,113.0,139.0
2,2023-04-04,164.0,114.0,1460.0,260.0,256.0,660.0,115.0,1990.0,258.0,...,149.0,296.0,56.0,565.0,64.0,182.0,312.0,99.0,105.0,140.0
3,2023-04-03,154.0,114.0,1460.0,220.0,264.0,655.0,121.0,2030.0,258.0,...,149.0,294.0,54.0,585.0,71.0,157.0,318.0,95.0,100.0,140.0
4,2023-03-31,155.0,112.0,1460.0,212.0,252.0,695.0,109.0,2020.0,256.0,...,150.0,292.0,55.0,550.0,72.0,161.0,326.0,98.0,96.0,119.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
837,2020-01-08,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
838,2020-01-07,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
839,2020-01-06,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
840,2020-01-03,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [6]:
# Save the cleaned raw dataset to the location configured as 'raw_dataset_path'
util.pickle_dump(raw_dataset, config_data['raw_dataset_path'])

## 2. Data Definition

## 3. Data Validation

### 3.1 Data type

In [7]:
# Check the data type of each variable
raw_dataset.dtypes

Date       datetime64[ns]
PACK.JK           float64
VAST.JK           float64
CHIP.JK           float64
HALO.JK           float64
                ...      
PEVE.JK           float64
LAJU.JK           float64
FWCT.JK           float64
NAYZ.JK           float64
IRSX.JK           float64
Length: 848, dtype: object

In [8]:
# With this many columns, summarise the dtypes by count instead of listing them.
# The recorded output shows only two dtypes: datetime and float.
raw_dataset.dtypes.value_counts()

float64           847
datetime64[ns]      1
dtype: int64

### 3.2 Data Range

In [9]:
# Per-column summary statistics of the dataset
raw_dataset.describe()

Unnamed: 0,PACK.JK,VAST.JK,CHIP.JK,HALO.JK,KING.JK,PGEO.JK,FUTR.JK,HILL.JK,BDKR.JK,PTMP.JK,...,CBPE.JK,SUNI.JK,CBRE.JK,WINE.JK,BMBL.JK,PEVE.JK,LAJU.JK,FWCT.JK,NAYZ.JK,IRSX.JK
count,842.0,842.0,842.0,842.0,842.0,842.0,842.0,842.0,842.0,842.0,...,842.0,842.0,842.0,842.0,842.0,842.0,842.0,842.0,842.0,842.0
mean,5.920428,4.915677,36.976247,7.295724,6.960808,26.152019,4.11639,54.893112,6.91924,4.150831,...,11.13658,21.565321,6.160333,24.36342,5.239905,11.418052,15.503563,6.061758,4.488124,5.23753
std,27.030957,22.115119,192.523839,34.751938,35.334207,141.7507,22.796327,316.546917,41.342922,25.757486,...,39.571595,77.232365,24.329715,91.596182,20.193698,45.331083,66.149782,25.782839,19.729936,23.674178
min,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
75%,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
max,166.0,120.0,1460.0,300.0,264.0,875.0,163.0,2140.0,268.0,212.0,...,192.0,320.0,195.0,705.0,170.0,244.0,398.0,159.0,113.0,170.0


In [10]:
# The dataset has too many columns to read describe() directly, so the helper
# below condenses it to the extremes of each statistic.
def raw_dataset_describe(dataset):
    """Condense ``dataset.describe()`` into the min and max of each statistic.

    Parameters
    ----------
    dataset : pd.DataFrame
        Frame to summarise. Its index is summarised separately (the dates in
        this notebook).

    Returns
    -------
    pd.DataFrame
        Indexed by statistic name (``count``, ``mean``, ``min``, ...), with:

        * one column with the ``describe()`` summary of the index values,
          named after the index;
        * ``Min``: the smallest value of that statistic across all columns;
        * ``Max``: the largest value of that statistic across all columns.

        Statistics that exist for only one side are filled with NaN.
    """
    # Summarise the index values numerically (min / mean / quantiles / max)
    index_values = pd.Series(dataset.index)
    try:
        # pandas < 2.0 needs this flag to treat datetimes as numeric
        index_stats = index_values.describe(datetime_is_numeric=True)
    except TypeError:
        # pandas >= 2.0 removed the keyword and always behaves this way
        index_stats = index_values.describe()

    column_stats = dataset.describe()

    # Reduce each statistic row to its extremes across all columns
    stat_min = column_stats.min(axis=1).to_frame('Min')
    stat_max = column_stats.max(axis=1).to_frame('Max')

    # Align the three summaries on the statistic name
    return pd.concat([index_stats, stat_min, stat_max], axis=1)

In [11]:
# Show the condensed statistics of the dataset
raw_dataset_describe(raw_dataset)

Unnamed: 0,Date,Min,Max
count,842,842.0,842.0
mean,2021-08-20 12:30:47.030878720,-10646.082446,831649.3
min,2020-01-02 00:00:00,-16000.001953,0.0
25%,2020-10-26 06:00:00,-16000.001953,26631.25
50%,2021-08-21 12:00:00,-16000.001953,35850.0
75%,2022-06-15 18:00:00,-16000.001953,39900.0
max,2023-04-06 00:00:00,50.0,5000000.0
std,,8.970304,1862592.0


### 3.3 Data Dimension

In [12]:
# (number of rows, number of columns)
raw_dataset.shape

(842, 848)

## 4. Data Defense

In [13]:
def check_data(input_data, params, print_errors=True):
    """Validate the price table and list the columns that fail.

    Checks performed:

    * the index dtype must equal ``params['datetime_index']``;
    * every non-datetime column must have dtype ``float64``;
    * every value of such a column must be ``>= 0`` (NaN counts as a failure).

    Datetime-typed columns (for example a ``Date`` column left over from
    ``reset_index``) are skipped rather than assuming the first column is the
    date. A wrong index dtype is reported but no longer prevents the column
    checks from running.

    Parameters
    ----------
    input_data : pd.DataFrame
        Table to validate.
    params : dict
        Must contain ``datetime_index``, the expected dtype of the index.
    print_errors : bool, default True
        When True, print the index problem (if any) and a summary of the
        column problems.

    Returns
    -------
    list
        Names of the columns that failed at least one check, in column order.
    """
    # Index check: recorded, not raised, so the column checks below always run
    index_error = None
    if input_data.index.dtype != params['datetime_index']:
        index_error = 'an error occurs in index format, should be datetime.'

    error_messages = []
    error_stock_tickers = []
    for column in input_data.columns:
        values = input_data[column]

        # Date-like columns are not price data; leave them out of the checks
        if pd.api.types.is_datetime64_any_dtype(values):
            continue

        # Check data type
        if values.dtype != 'float64':
            error_messages.append(f"Column ({column}) has a non-float data type")
            error_stock_tickers.append(column)
            if not pd.api.types.is_numeric_dtype(values):
                # Comparing e.g. strings with 0 would raise; already reported
                continue

        # Check data range: any negative or NaN entry fails the column
        if not (values >= 0).all():
            error_messages.append(f'an error occurs in {column} column')
            if column not in error_stock_tickers:
                error_stock_tickers.append(column)

    if print_errors:
        if index_error is not None:
            print(index_error)
        if error_messages:
            total_errors = len(error_messages)
            error_summary = f"\nTotal errors: {total_errors} errors out of {len(input_data.columns)}\n"
            print(error_summary + "\n".join(error_messages))

    return error_stock_tickers



In [14]:
# List the stock columns that fail validation
check_data(raw_dataset, config_data)


Total errors: 1 errors out of 848
an error occurs in SCPI.JK column


['SCPI.JK']

In [15]:
# The check above flagged SCPI.JK. According to news and the yfinance data it
# has been delisted since 2013, so it is treated as an anomaly and removed.
# The drop is in place: raw_dataset itself loses the flagged columns.
error_stock_tickers = check_data(raw_dataset, config_data, print_errors=False)
raw_dataset.drop(error_stock_tickers, axis=1, inplace=True)

In [16]:
# Re-run the validation; the recorded output shows no remaining issues
check_data(raw_dataset, config_data)

[]

## 5. Data Splitting

In [19]:
# Separate the input features (x) from the target/output column (y).
# The target column name comes from config_data['target']; both parts are
# copies, so later edits do not touch raw_dataset.
x = raw_dataset.drop(columns=config_data['target']).copy()
y = raw_dataset[config_data['target']].copy()

In [24]:
# Structure of the feature table (index, column count, dtypes, memory)
x.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 842 entries, 0 to 841
Columns: 846 entries, Date to IRSX.JK
dtypes: datetime64[ns](1), float64(845)
memory usage: 5.4 MB


In [26]:
# Structure of the target series (name, non-null count, dtype)
y.info()

<class 'pandas.core.series.Series'>
RangeIndex: 842 entries, 0 to 841
Series name: BMRI.JK
Non-Null Count  Dtype  
--------------  -----  
842 non-null    float64
dtypes: float64(1)
memory usage: 6.7 KB


In [None]:
# First Split between train & test with ratio 0.7:0.3