In [1]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    """Load a YAML configuration file and return its parsed contents.

    Parameters
    ----------
    filepath : str or path-like
        Location of the YAML file to read.

    Returns
    -------
    object or None
        Whatever ``yaml.safe_load`` produces for the file. When the YAML
        cannot be parsed, the error is logged and ``None`` is returned.
    """
    try:
        with open(filepath, 'r') as config_handle:
            parsed_config = yaml.safe_load(config_handle)
    except yaml.YAMLError as parse_error:
        # Parse failures are reported through logging rather than raised.
        logging.error(parse_error)
        return None
    return parsed_config


def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into a single one.

    ``char`` is matched literally, so regex metacharacters such as ``.`` or
    ``|`` behave like ordinary text.

    Parameters
    ----------
    string : str
        Text to clean up.
    char : str
        Literal text whose consecutive repeats are collapsed. An empty value
        leaves ``string`` unchanged.

    Returns
    -------
    str
        ``string`` with each run of repeated ``char`` reduced to one ``char``.
    """
    if not char:
        # Nothing can repeat, so there is nothing to collapse.
        return string
    # re.escape keeps metacharacters literal; the non-capturing group makes the
    # quantifier apply to the whole of ``char`` rather than its last character.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement is inserted verbatim, so a backslash in ``char``
    # is not reinterpreted as a replacement-template escape.
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df, table_config):
    '''
    Standardize the column names of ``df`` in place and check that every
    column listed in ``table_config['columns']`` is present.

    Standardization steps, in order: lower-case the name, replace every
    non-word character (anything the regex word class does not match) with
    an underscore, strip leading and trailing underscores, and collapse runs
    of underscores into one. Expected names from the config are only
    lower-cased before the comparison. Columns in ``df`` that are not listed
    in the config are allowed.

    Parameters
    ----------
    df : DataFrame
        Frame whose ``columns`` attribute is read and reassigned.
    table_config : dict
        Parsed configuration; ``table_config['columns']`` lists the expected
        column names.

    Returns
    -------
    int
        1 when every expected column is present, otherwise 0 (the missing
        names are logged as a warning).
    '''
    # Raw strings keep \w a regex class instead of an invalid string escape.
    standardized = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    df.columns = list(standardized)

    present = set(df.columns)
    missing = [
        name
        for name in (col.lower() for col in table_config['columns'])
        if name not in present
    ]
    if missing:
        logging.warning('expected columns missing from the data: %s', missing)
        return 0
    return 1

Overwriting utility.py


In [2]:
%%writefile file.yaml
# Settings for the ingestion notebook; loaded with utility.read_config_file.
file_type: csv
# Directory, relative to the working directory, that holds the input CSV files.
dataset_name: nifty100
file_name: test_data
table_name: edsurv
# Passed as `sep` when the input CSV files are read.
inbound_delimiter: ","
# NOTE(review): presumably the delimiter for exported data - confirm the export step uses it.
outbound_delimiter: "|"
skip_leading_rows: 1
# Columns that must be present after header standardization; also the columns kept for export.
columns: 
    - date
    - open
    - high
    - low
    - close
    - volume

Overwriting file.yaml


In [3]:
import utility

# Parse the pipeline settings written to file.yaml.
config_data = utility.read_config_file("file.yaml")
# read_config_file logs YAML parse errors and returns None; stop here with a
# clear message instead of failing in a later cell with
# "'NoneType' object is not subscriptable".
if config_data is None:
    raise ValueError("file.yaml is empty or could not be parsed; check the logged YAML error")

In [4]:
# Display the parsed settings so the values read by later cells can be checked by eye.
config_data

{'file_type': 'csv',
 'dataset_name': 'nifty100',
 'file_name': 'test_data',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['date', 'open', 'high', 'low', 'close', 'volume']}

In [None]:
import pandas as pd
import os

# Read every CSV in the dataset directory and stack them into one frame.
dataset_dir = os.path.join(os.getcwd(), config_data["dataset_name"])
# os.listdir returns names in arbitrary order; sort so the row order of the
# combined frame is the same on every run.
csv_names = sorted(name for name in os.listdir(dataset_dir) if name.endswith(".csv"))
if not csv_names:
    # pd.concat on an empty list fails with a message that does not name the directory.
    raise FileNotFoundError(f"no .csv files found in {dataset_dir}")
df_list = [
    pd.read_csv(os.path.join(dataset_dir, name), sep=config_data["inbound_delimiter"])
    for name in csv_names
]
# ignore_index gives the combined frame one continuous index instead of
# repeating each file's own row labels.
df = pd.concat(df_list, ignore_index=True)
df.head()

In [11]:
import dask.dataframe as dd

# Read every CSV in the dataset directory as one Dask DataFrame.
# The delimiter comes from the config, matching the other loader cells,
# instead of relying on the reader's default.
df = dd.read_csv(
    config_data["dataset_name"] + "/*.csv",
    sep=config_data["inbound_delimiter"],
)
df.head()

Unnamed: 0,date,open,high,low,close,volume,sma5,sma10,sma15,sma20,...,fastd,fastksr,fastdsr,ULTOSC,WILLR,ATR,Trange,TYPPRICE,HT_DCPERIOD,BETA
0,2015-02-02 14:30:00+05:30,1528.5,1529.95,1526.05,1527.4,4678,1538.82,1543.015,1542.016667,1539.8375,...,4.838951,0.0,0.0,43.346867,-95.063985,5.282946,3.9,1527.8,25.928999,0.479466
1,2015-02-02 14:35:00+05:30,1527.4,1528.0,1516.0,1521.95,10165,1532.81,1540.67,1541.213333,1539.285,...,7.147969,0.0,0.0,41.448445,-84.090909,5.762736,12.0,1521.983333,25.595475,0.200019
2,2015-02-02 14:40:00+05:30,1521.3,1526.7,1521.0,1521.55,8078,1527.52,1538.205,1540.316667,1538.7225,...,12.588612,0.0,0.0,36.648343,-85.160428,5.758254,5.7,1523.083333,25.184555,0.450949
3,2015-02-02 14:45:00+05:30,1520.65,1522.9,1519.8,1520.25,4733,1523.93,1535.725,1538.996667,1538.125,...,17.267679,0.0,0.0,30.139572,-88.636364,5.568379,3.1,1520.983333,25.349728,0.560333
4,2015-02-02 14:50:00+05:30,1521.2,1526.1,1516.25,1526.1,4636,1523.45,1533.44,1537.406667,1537.68,...,36.09846,100.0,33.333333,41.145881,-72.994652,5.874209,9.85,1522.816667,26.308002,-0.058313


In [48]:
import modin.pandas as pd  # NOTE: rebinds `pd` to modin for every cell run after this one
import numpy as np
import os

from distributed import Client

# One client for the whole cell, created before any data is read. Creating it
# inside the file loop would start a new client for every directory entry.
# NOTE(review): assumes modin is configured to use the Dask engine - confirm.
client = Client()

dataset_dir = os.path.join(os.getcwd(), config_data["dataset_name"])
# Sorted so the row order of the combined frame is the same on every run.
csv_names = sorted(name for name in os.listdir(dataset_dir) if name.endswith(".csv"))
if not csv_names:
    raise FileNotFoundError(f"no .csv files found in {dataset_dir}")

# Keep every file: assigning each read straight to `df` would leave only the
# last CSV in the result.
frames = [
    pd.read_csv(os.path.join(dataset_dir, name), sep=config_data["inbound_delimiter"])
    for name in csv_names
]
df = pd.concat(frames, ignore_index=True)



In [64]:
import ray

# Collect the absolute path of every CSV in the dataset directory and hand the
# whole list to Ray in one call. `os` is not imported in this cell; it comes
# from an earlier cell.
dataset_dir = os.path.join(os.getcwd(), config_data["dataset_name"])
file_list = [
    os.path.join(dataset_dir, entry)
    for entry in os.listdir(dataset_dir)
    if entry.endswith(".csv")
]
df = ray.data.read_csv(file_list)



In [7]:
# col_header_val returns 0 when an expected column is missing; any other
# result is reported as a pass.
validation_failed = utility.col_header_val(df, config_data) == 0
print("column validation failed" if validation_failed else "column validation passed")

column validation passed


In [None]:
# utility.col_header_val lower-cases the configured names before comparing them
# with the standardized headers, so select with lower-cased names as well.
output_columns = [name.lower() for name in config_data["columns"]]
df = df[output_columns]
# Write with the delimiter declared in the config rather than the default comma.
# NOTE(review): `single_file` is a Dask to_csv option, so `df` is expected to be
# the Dask DataFrame here - confirm which loader cell was run.
df.to_csv(
    "output.csv",
    sep=config_data["outbound_delimiter"],
    index=False,
    single_file=True,
)