In [11]:
from glob import glob
import pandas as pd
import numpy as np
import datetime as dt
import jdatetime as jt
from persiantools import characters
from joblib import Parallel, delayed
import chardet

In [2]:
# Directory that is searched for raw CSV files.
path = 'C:/Users/Sarv/Desktop/vs_code/Git/ETL-pipeline/data/raw/stock/'
# Directory that is searched for trade-history HDF5 (.h5) files.
trade_history_h5_path = 'C:/Users/Sarv/Desktop/vs_code/Git/ETL-pipeline/data/processed/trade_history_h5/'
# Backward-compatible alias: this was the only name assigned originally, while
# the glob below read `trade_history_h5_path` and raised NameError.
trade_history_parquet_path = trade_history_h5_path

# Both directory strings already end with '/', so the pattern is appended
# without another separator. Sorting makes `trade_history_files[0]`
# deterministic, because glob() returns entries in arbitrary order.
files = sorted(glob(path + '*.csv'))
trade_history_files = sorted(glob(trade_history_h5_path + '*.h5'))

In [3]:
def clean_cols(df):
    """Drop unused raw columns and relabel the rest, mutating ``df`` in place.

    A ``trade_id`` column filled with ``None`` is inserted at position 0, the
    columns listed in ``unused`` are removed, and the remaining columns are
    renamed positionally, so exactly eight columns must be left at that point.

    Returns the same ``df`` object so the function can be chained with ``.pipe``.
    """
    unused = [
        'nTran', 'insCode', 'dEven', 'qTitNgJ', 'iSensVarP',
        'pPhSeaCotJ', 'pPbSeaCotJ', 'iAnuTran',
        'xqVarPJDrPRf', 'canceledCompony',
    ]
    final_names = [
        'trade_id', 'time', 'vol', 'price',
        'national_id', 'date', 'canceled', 'namad',
    ]

    df.insert(0, 'trade_id', None)
    df.drop(unused, axis=1, inplace=True)
    # Positional rename: relies on the surviving columns arriving in this order.
    df.columns = final_names
    return df

In [4]:
def ar_to_fa(df):
    """Run every value of the text columns through ``characters.ar_to_fa``.

    ``characters`` is the persiantools helper imported at the top of the
    notebook; judging by its name it normalises Arabic code points to their
    Persian equivalents (confirm against the persiantools documentation).

    The listed columns are overwritten in place and the same ``df`` is returned
    so the function can be chained with ``.pipe``.
    """
    fa_cols = ['namad']
    for col in fa_cols:
        # Series.map is used per column instead of DataFrame.applymap, which is
        # deprecated (FutureWarning since pandas 2.1); the result is identical.
        df[col] = df[col].map(characters.ar_to_fa)
    return df

In [5]:
def str_to_date(df):
    """Convert the ``date`` column from ``YYYYMMDD`` values to ``datetime.date``.

    The values are cast to ``str`` and parsed with an explicit ``%Y%m%d``
    format. The earlier implementation rebuilt a ``DD-MM-YYYY`` string and let
    pandas infer the layout; the default inference is month-first, so day and
    month were swapped whenever the day was 12 or less (20200201 became
    2020-01-02).

    ``df`` is mutated in place: the old column is dropped and the parsed one is
    re-inserted at position 5, so the frame needs at least five other columns.
    Returns the same ``df`` for use with ``.pipe``.

    Raises:
        ValueError: if a value does not match ``YYYYMMDD``.
    """
    # Parse before touching the frame so a bad value leaves ``df`` unchanged.
    parsed = pd.to_datetime(df['date'].astype(str), format='%Y%m%d').dt.date
    df.drop(columns='date', inplace=True)
    df.insert(5, 'date', parsed)
    return df

In [6]:
def date_to_jdate(df):
    """Add a ``jdate`` column holding each ``date`` as Jalali ``YYYY-MM-DD`` text.

    Every value of ``df['date']`` is passed to ``jdatetime.date.fromgregorian``
    and formatted with ``strftime``. The new column is inserted at position 6
    of ``df`` (in place) and the same ``df`` is returned for use with ``.pipe``.
    """
    def _as_jalali_text(gregorian):
        # One Gregorian date in, one formatted Jalali string out.
        return jt.date.fromgregorian(date=gregorian).strftime("%Y-%m-%d")

    jalali_text = df['date'].apply(_as_jalali_text)
    df.insert(6, 'jdate', jalali_text)
    return df

In [7]:
def str_to_time(df):
    """Turn the ``HHMMSS`` ``time`` column into ``datetime.time`` and add ``datetime``.

    Steps, all applied to ``df`` in place:

    1. ``time`` is cast to ``str`` and zero-padded to six digits
       (``90000`` becomes ``'090000'``), then rewritten as ``HH:MM:SS``.
    2. The old ``time`` column is dropped and the parsed ``datetime.time``
       values are inserted at position 6.
    3. A ``datetime`` column, built from ``date`` plus the clock text, is
       inserted at position 5.

    The clock text is parsed with an explicit ``%H:%M:%S`` format rather than
    relying on format inference for time-only strings, which avoids the
    element-by-element fallback parser on large frames.

    Returns the same ``df`` for use with ``.pipe``.

    Raises:
        ValueError: if a value cannot be read as ``HHMMSS``.
    """
    digits = df['time'].astype(str).str.zfill(6)
    clock = digits.str[:2] + ':' + digits.str[2:4] + ':' + digits.str[4:]
    # Parse before mutating the frame so a bad value leaves ``df`` untouched.
    parsed = pd.to_datetime(clock, format='%H:%M:%S').dt.time

    df.drop(columns='time', inplace=True)
    df.insert(6, 'time', parsed)

    # ``clock`` is the same text that str() of the parsed time objects yields.
    stamp = pd.to_datetime(df['date'].astype(str) + " " + clock)
    df.insert(5, 'datetime', stamp)
    return df

In [8]:
def export_cleaned_files(df,count):
    """Write one cleaned frame to its own HDF5 file.

    Args:
        df: the cleaned DataFrame to store.
        count: batch number; it becomes part of the file name
            ``trade_history_table_b{count}_cleaned.h5``.

    Returns:
        Whatever ``DataFrame.to_hdf`` returns (``None``).

    The HDF5 key is the fixed string ``'trade_history_table_b1_cleaned'`` for
    every batch, so readers can use one key for all files.

    NOTE(review): the output directory is the same one the notebook globs for
    input ``*.h5`` files, so exported files will show up in that listing on the
    next run. Confirm this is intended.
    NOTE(review): ``format='table'`` is restrictive about object-dtype columns;
    verify that the date/time object columns produced upstream serialise.
    """
    path='C:/Users/Sarv/Desktop/vs_code/Git/ETL-pipeline/data/processed/trade_history_h5/'
    # The keyword is `path_or_buf`; `to_hdf` has no `path` parameter, so the
    # previous `path=` call raised TypeError before anything was written.
    return df.to_hdf(path_or_buf=path+f'trade_history_table_b{count}_cleaned.h5',
                     key='trade_history_table_b1_cleaned',
                     mode='a',
                     complevel=5,
                     index=False,format='table')

In [12]:
def pipeline(chunk):
    try:
        chunk.pipe(clean_cols).pipe(ar_to_fa).pipe(str_to_date).pipe(date_to_jdate).pipe(str_to_time)
        chunk.pipe(export_cleaned_files)
        count+=1
    except:
        print('failed')

In [13]:
# with open(trade_history_files[0], 'rb') as f:
#     result=chardet.detect(f.read())
chunk_size=50000
count=1
Parallel(n_jobs=4,verbose=10000)(delayed(pipeline)(chunk) for chunk in np.array_split(pd.read_hdf(trade_history_files[0],chunksize=chunk_size),50))

TypeError: can only use an iterator or chunksize on a table

In [None]:
# NOTE: disabled draft, a sequential single-process variant of the chunk loop.
# Everything below is commented out and never executes. As drafted it passes
# `chunksize` and a chardet-detected `encoding` to pd.read_hdf, and it calls
# export_cleaned_files without its `count` argument.
# with open(trade_history_files[0], 'rb') as f:
#     result=chardet.detect(f.read())
#     chunk_size=50000
#     count=1
#     for chunk in pd.read_hdf(trade_history_files[0],encoding=result['encoding'],chunksize=chunk_size):
#         try:
#             chunk.pipe(clean_cols).pipe(ar_to_fa).pipe(str_to_date).pipe(date_to_jdate).pipe(str_to_time)
#             chunk.pipe(export_cleaned_files)
#             count+=1
#         except:
#             print('failed')