<a href="https://colab.research.google.com/github/nimasha1228/Data_Ingestion/blob/main/Data_ingestion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Download the Dataset 

The dataset comes from the G-Research Crypto Forecasting competition. The CSV file is hosted on Google Drive and can be downloaded with the following code.

In [None]:
from google_drive_downloader import GoogleDriveDownloader as gdd

# Shareable link: https://drive.google.com/file/d/1-sZwK6sq5BgI99hH-WNgJpKz42cgEcme/view?usp=sharing
DRIVE_FILE_ID = "1-sZwK6sq5BgI99hH-WNgJpKz42cgEcme"
FILE_PATH = "/content/Data/train.csv.zip"

# Fetch the zipped CSV and unpack it; later cells read the extracted
# /content/Data/train.csv.
gdd.download_file_from_google_drive(
    file_id=DRIVE_FILE_ID,
    dest_path=FILE_PATH,
    unzip=True,
)

Downloading 1-sZwK6sq5BgI99hH-WNgJpKz42cgEcme into /content/Data/train.csv.zip... Done.
Unzipping...Done.


In [None]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_dataset(filepath, method='pandas', **kwargs):
    '''
    Load a delimited text file into a DataFrame.

    Parameters
    ----------
    filepath : str or path-like
        Location of the file to read.
    method : str, default 'pandas'
        Reader backend. Only 'pandas' (``pd.read_csv``) is supported.
    **kwargs
        Extra keyword arguments forwarded to ``pd.read_csv``
        (for example ``sep`` taken from the YAML ``inbound_delimiter``).

    Returns
    -------
    pandas.DataFrame

    Raises
    ------
    ValueError
        If ``method`` is not a supported backend (previously an unknown
        method silently returned None).
    '''
    if method != 'pandas':
        raise ValueError(f"Unsupported read method {method!r}; expected 'pandas'")
    return pd.read_csv(filepath, **kwargs)


def read_config_file(filepath):
    '''
    Parse a YAML configuration file with ``yaml.safe_load``.

    Returns the parsed document. If the file is not valid YAML, the parse
    error is logged with ``logging.error`` and None is returned instead of
    raising; errors opening ``filepath`` still propagate.
    '''
    with open(filepath, 'r') as config_stream:
        try:
            config = yaml.safe_load(config_stream)
        except yaml.YAMLError as parse_error:
            logging.error(parse_error)
            config = None
    return config


def replacer(string, char):
    '''
    Collapse each run of two or more consecutive ``char`` into a single ``char``.

    ``char`` is matched literally (it is regex-escaped), so regex
    metacharacters such as '.', '|' or '*' and multi-character tokens work,
    e.g. replacer('a..b', '.') -> 'a.b'. An empty ``char`` leaves
    ``string`` unchanged.
    '''
    if not char:
        return string
    # Group the escaped token so '{2,}' repeats the whole token, not just its last character.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement inserts ``char`` verbatim (no backslash/group-reference expansion).
    return re.sub(pattern, lambda _match: char, string)

def _normalize_column_name(name):
    '''Lower-case, map non-word characters to '_', then trim and collapse underscores.'''
    name = re.sub(r'\W', '_', str(name).lower())
    name = name.strip('_')
    return re.sub(r'_{2,}', '_', name)


def col_header_val(df, table_config):
    '''
    Validate a DataFrame's header against the columns listed in a config.

    Side effect: ``df.columns`` is overwritten in place with normalised names.
    Normalisation lower-cases each name, replaces every non-word character
    with '_', strips leading/trailing '_' and collapses runs of '_'. Later
    cells therefore see the cleaned header.

    The expected names from ``table_config['columns']`` go through the same
    normalisation, so e.g. 'Asset ID' in the YAML matches an 'Asset ID'
    header. Column order is ignored; both sides are compared as sorted lists.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose header is validated (and renamed in place).
    table_config : dict
        Must contain a 'columns' list of expected column names.

    Returns
    -------
    int
        1 if names and column count match, otherwise 0. On failure the
        differences are printed and both column lists logged at INFO level.
    '''
    df.columns = [_normalize_column_name(col) for col in df.columns]
    file_cols = sorted(df.columns)
    expected_col = sorted(_normalize_column_name(col) for col in table_config['columns'])
    # Sorted-list equality also enforces equal length; comparing names only
    # avoids reindexing (i.e. copying) the entire frame.
    if file_cols == expected_col:
        print("column name and column length validation passed")
        return 1
    print("column name and column length validation failed")
    mismatched_columns_file = list(set(file_cols).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = list(set(expected_col).difference(file_cols))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {file_cols}')
    logging.info(f'expected columns: {expected_col}')
    return 0


Writing testutility.py


**Read the Data File Using Pandas and Modin (Ray and Dask engines)**

**Pandas**

In [None]:
import pandas as pd

In [None]:
%%time
# Baseline: wall-clock cost of reading the full training CSV with plain pandas.
df = pd.read_csv('/content/Data/train.csv')
print(df.head())

    timestamp  Asset_ID  Count  ...       Volume          VWAP    Target
0  1514764860         2   40.0  ...    19.233005   2373.116392 -0.004218
1  1514764860         0    5.0  ...    78.380000      8.530000 -0.014399
2  1514764860         1  229.0  ...    31.550062  13827.062093 -0.014643
3  1514764860         5   32.0  ...  6626.713370      7.657713 -0.013922
4  1514764860         7    5.0  ...   121.087310     25.891363 -0.008264

[5 rows x 10 columns]
CPU times: user 29.5 s, sys: 4.46 s, total: 34 s
Wall time: 33.9 s


--------- End -----------

**Modin**

In [None]:
!pip install "modin[ray]" # Install Modin dependencies and Ray to run on Ray

Collecting modin[ray]
  Downloading modin-0.12.1-py3-none-any.whl (761 kB)
[K     |████████████████████████████████| 761 kB 5.0 MB/s 
[?25hCollecting fsspec
  Downloading fsspec-2022.1.0-py3-none-any.whl (133 kB)
[K     |████████████████████████████████| 133 kB 51.5 MB/s 
[?25hCollecting pandas==1.3.5
  Downloading pandas-1.3.5-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.3 MB)
[K     |████████████████████████████████| 11.3 MB 49.4 MB/s 
Collecting ray[default]>=1.4.0
  Downloading ray-1.9.2-cp37-cp37m-manylinux2014_x86_64.whl (57.6 MB)
[K     |████████████████████████████████| 57.6 MB 1.3 MB/s 
Collecting redis>=3.5.0
  Downloading redis-4.1.0-py3-none-any.whl (171 kB)
[K     |████████████████████████████████| 171 kB 55.7 MB/s 
Collecting aioredis<2
  Downloading aioredis-1.3.1-py3-none-any.whl (65 kB)
[K     |████████████████████████████████| 65 kB 3.4 MB/s 
[?25hCollecting opencensus
  Downloading opencensus-0.8.0-py2.py3-none-any.whl (128 kB)
[K     |████

In [None]:
!pip install "modin[dask]" # Install Modin dependencies and Dask to run on Dask

Collecting dask>=2.22.0
  Downloading dask-2022.1.0-py3-none-any.whl (1.0 MB)
[K     |████████████████████████████████| 1.0 MB 5.0 MB/s 
[?25hCollecting distributed>=2.22.0
  Downloading distributed-2022.1.0-py3-none-any.whl (822 kB)
[K     |████████████████████████████████| 822 kB 49.0 MB/s 
Collecting pyyaml>=5.3.1
  Downloading PyYAML-6.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (596 kB)
[K     |████████████████████████████████| 596 kB 58.8 MB/s 
[?25hCollecting partd>=0.3.10
  Downloading partd-1.2.0-py3-none-any.whl (19 kB)
Collecting cloudpickle>=1.1.1
  Downloading cloudpickle-2.0.0-py3-none-any.whl (25 kB)
Collecting locket
  Downloading locket-0.2.1-py2.py3-none-any.whl (4.1 kB)
Installing collected packages: locket, pyyaml, partd, cloudpickle, dask, distributed
  Attempting uninstall: pyyaml
    Found existing installation: PyYAML 3.13
    Uninstalling PyYAML-3.13:
      Successfully uninstalled PyYAML-3.13
  Attemp

Restart the runtime (so the newly installed packages are loaded), then run the following cells.

In [None]:
import modin.pandas as pd

In [None]:
%%time
# Same full read with Modin; no MODIN_ENGINE is set in this runtime, so Modin uses its default engine (presumably Ray, installed above — confirm).
df = pd.read_csv('/content/Data/train.csv')
print(df.head()) 


    import ray
    ray.init()

[2m[36m(deploy_ray_func pid=1289)[0m tcmalloc: large alloc 1409646592 bytes == 0x56130e660000 @  0x7f99c0ec31e7 0x56130ae4af98 0x56130ae15e27 0x56130aef79be 0x56130af2e802 0x56130ae1b7da 0x56130ae8e18e 0x56130ae879ee 0x56130ae1abda 0x56130ae8cd00 0x56130ae879ee 0x56130ad59e2b 0x56130ae89fe4 0x56130ae879ee 0x56130ad59e2b 0x56130ae89fe4 0x56130ad59d14 0x7f99bdcaebc2 0x7f99bdd5d6fc 0x7f99bdcb5696 0x7f99bde4b3bb 0x7f99bdda6c7f 0x7f99bde72533 0x7f99bde7322a 0x7f99bde7ea7e 0x7f99bde5c25b 0x7f99be056806 0x7f99be42bcfb 0x7f99be42cf51 0x7f99be42d3d0 0x7f99bde576d0
[2m[36m(deploy_ray_func pid=1288)[0m tcmalloc: large alloc 1409646592 bytes == 0x557d4f694000 @  0x7f522c3061e7 0x557d4c317f98 0x557d4c2e2e27 0x557d4c3c49be 0x557d4c3fb802 0x557d4c2e87da 0x557d4c35b18e 0x557d4c3549ee 0x557d4c2e7bda 0x557d4c359d00 0x557d4c3549ee 0x557d4c226e2b 0x557d4c356fe4 0x557d4c3549ee 0x557d4c226e2b 0x557d4c356fe4 0x557d4c226d14 0x7f52290f1bc2 0x7f52291a06fc 0x7f52290f8696 0x7

    timestamp  Asset_ID  Count  ...       Volume          VWAP    Target
0  1514764860         2   40.0  ...    19.233005   2373.116392 -0.004218
1  1514764860         0    5.0  ...    78.380000      8.530000 -0.014399
2  1514764860         1  229.0  ...    31.550062  13827.062093 -0.014643
3  1514764860         5   32.0  ...  6626.713370      7.657713 -0.013922
4  1514764860         7    5.0  ...   121.087310     25.891363 -0.008264

[5 rows x 10 columns]
CPU times: user 4.65 s, sys: 4.2 s, total: 8.84 s
Wall time: 57.1 s


**Ray**

*Restart the runtime, then run the following cells (`MODIN_ENGINE` must be set before `modin.pandas` is imported).*

In [None]:
import os
# Select Ray as Modin's execution engine before modin.pandas is imported.
os.environ.update({"MODIN_ENGINE": "ray"})

In [None]:
import modin.pandas as pd

In [None]:
%%time
# Same full read with Modin on the Ray engine (MODIN_ENGINE="ray" was set before the import).
df = pd.read_csv('/content/Data/train.csv')
print(df.head()) 


    import ray
    ray.init()

[2m[36m(deploy_ray_func pid=885)[0m tcmalloc: large alloc 1409646592 bytes == 0x55685e31e000 @  0x7f6eb88df1e7 0x55685b99ef98 0x55685b969e27 0x55685ba4b9be 0x55685ba82802 0x55685b96f7da 0x55685b9e218e 0x55685b9db9ee 0x55685b96ebda 0x55685b9e0d00 0x55685b9db9ee 0x55685b8ade2b 0x55685b9ddfe4 0x55685b9db9ee 0x55685b8ade2b 0x55685b9ddfe4 0x55685b8add14 0x7f6eb56cabc2 0x7f6eb57796fc 0x7f6eb56d1696 0x7f6eb58673bb 0x7f6eb57c2c7f 0x7f6eb588e533 0x7f6eb588f22a 0x7f6eb589aa7e 0x7f6eb587825b 0x7f6eb5a72806 0x7f6eb5e47cfb 0x7f6eb5e48f51 0x7f6eb5e493d0 0x7f6eb58736d0
[2m[36m(deploy_ray_func pid=884)[0m tcmalloc: large alloc 1409646592 bytes == 0x55a11e39c000 @  0x7f5826f1f1e7 0x55a11b4ddf98 0x55a11b4a8e27 0x55a11b58a9be 0x55a11b5c1802 0x55a11b4ae7da 0x55a11b52118e 0x55a11b51a9ee 0x55a11b4adbda 0x55a11b51fd00 0x55a11b51a9ee 0x55a11b3ece2b 0x55a11b51cfe4 0x55a11b51a9ee 0x55a11b3ece2b 0x55a11b51cfe4 0x55a11b3ecd14 0x7f5823d0abc2 0x7f5823db96fc 0x7f5823d11696 0x7f5

    timestamp  Asset_ID  Count  ...       Volume          VWAP    Target
0  1514764860         2   40.0  ...    19.233005   2373.116392 -0.004218
1  1514764860         0    5.0  ...    78.380000      8.530000 -0.014399
2  1514764860         1  229.0  ...    31.550062  13827.062093 -0.014643
3  1514764860         5   32.0  ...  6626.713370      7.657713 -0.013922
4  1514764860         7    5.0  ...   121.087310     25.891363 -0.008264

[5 rows x 10 columns]
CPU times: user 4.81 s, sys: 4.8 s, total: 9.61 s
Wall time: 1min


------------ End ------------

**Dask**

*Restart the runtime, then run the following cells (`MODIN_ENGINE` must be set before `modin.pandas` is imported).*

In [None]:
import os
# Select Dask as Modin's execution engine before modin.pandas is imported.
os.environ.update({"MODIN_ENGINE": "dask"})

In [None]:
import modin.pandas as pd

In [None]:
%%time
# Same full read with Modin on the Dask engine (MODIN_ENGINE="dask" was set before the import).
df = pd.read_csv('/content/Data/train.csv')
print(df.head())


    from distributed import Client

    client = Client()



    timestamp  Asset_ID  Count  ...       Volume          VWAP    Target
0  1514764860         2   40.0  ...    19.233005   2373.116392 -0.004218
1  1514764860         0    5.0  ...    78.380000      8.530000 -0.014399
2  1514764860         1  229.0  ...    31.550062  13827.062093 -0.014643
3  1514764860         5   32.0  ...  6626.713370      7.657713 -0.013922
4  1514764860         7    5.0  ...   121.087310     25.891363 -0.008264

[5 rows x 10 columns]
CPU times: user 7.51 s, sys: 15.5 s, total: 23 s
Wall time: 1min 3s


In [None]:
# Rich display of the first five rows read by Modin/Dask
df.head(n=5)

Unnamed: 0,timestamp,Asset_ID,Count,Open,High,Low,Close,Volume,VWAP,Target
0,1514764860,2,40.0,2376.58,2399.5,2357.14,2374.59,19.233005,2373.116392,-0.004218
1,1514764860,0,5.0,8.53,8.53,8.53,8.53,78.38,8.53,-0.014399
2,1514764860,1,229.0,13835.194,14013.8,13666.11,13850.176,31.550062,13827.062093,-0.014643
3,1514764860,5,32.0,7.6596,7.6596,7.6567,7.6576,6626.71337,7.657713,-0.013922
4,1514764860,7,5.0,25.92,25.92,25.874,25.877,121.08731,25.891363,-0.008264


----------- End -------------

**YAML configuration file**

In [None]:
%%writefile file.yaml
# Ingestion config for the training data; loaded with testutility.read_config_file.
file_type: csv            # extension of the source file: ./Data/<file_name>.<file_type>
file_name: train
table_name: train
output_file_name: out     # output is written to ./<output_file_name>.<output_file_type>.gz
output_file_type: txt
inbound_delimiter: ","    # separator used to parse the source CSV
outbound_delimiter: "|"   # separator used in the gzipped output file
columns:                  # expected header; col_header_val lower-cases these before comparing
    - timestamp
    - Asset_ID
    - Count
    - Open
    - High
    - Low
    - Close
    - Volume
    - VWAP
    - Target

Overwriting file.yaml


In [None]:
# Load the ingestion settings (file names, delimiters, expected columns) from the YAML config
import testutility as util
config_data = util.read_config_file(filepath="file.yaml")

In [None]:
# Delimiter that will be used to parse the inbound CSV file
config_data['inbound_delimiter']

','

In [None]:
# Inspect the full parsed configuration dictionary
config_data

{'columns': ['timestamp',
  'Asset_ID',
  'Count',
  'Open',
  'High',
  'Low',
  'Close',
  'Volume',
  'VWAP',
  'Target'],
 'file_name': 'train',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'output_file_name': 'out',
 'output_file_type': 'txt',
 'table_name': 'train'}

In [None]:
# Quick look at the raw file with plain pandas. Only the first rows are
# parsed instead of loading all ~24M rows just to display five of them.
import pandas as pd
df_sample = pd.read_csv("/content/Data/train.csv", delimiter=',', nrows=5)
df_sample.head()

Unnamed: 0,timestamp,Asset_ID,Count,Open,High,Low,Close,Volume,VWAP,Target
0,1514764860,2,40.0,2376.58,2399.5,2357.14,2374.59,19.233005,2373.116392,-0.004218
1,1514764860,0,5.0,8.53,8.53,8.53,8.53,78.38,8.53,-0.014399
2,1514764860,1,229.0,13835.194,14013.8,13666.11,13850.176,31.550062,13827.062093,-0.014643
3,1514764860,5,32.0,7.6596,7.6596,7.6567,7.6576,6626.71337,7.657713,-0.013922
4,1514764860,7,5.0,25.92,25.92,25.874,25.877,121.08731,25.891363,-0.008264


In [None]:
# Read the source file using the settings from the config file
file_type = config_data['file_type']
source_file = "./Data/" + config_data['file_name'] + f'.{file_type}'
print(source_file)
# Pass the delimiter by keyword: read_csv arguments after the path are
# keyword-only in pandas 2.x (positional use was deprecated before that).
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()

 ./Data/train.csv




Unnamed: 0,timestamp,Asset_ID,Count,Open,High,Low,Close,Volume,VWAP,Target
0,1514764860,2,40.0,2376.58,2399.5,2357.14,2374.59,19.233005,2373.116392,-0.004218
1,1514764860,0,5.0,8.53,8.53,8.53,8.53,78.38,8.53,-0.014399
2,1514764860,1,229.0,13835.194,14013.8,13666.11,13850.176,31.550062,13827.062093,-0.014643
3,1514764860,5,32.0,7.6596,7.6596,7.6567,7.6576,6626.71337,7.657713,-0.013922
4,1514764860,7,5.0,25.92,25.92,25.874,25.877,121.08731,25.891363,-0.008264


In [None]:
# Validate the file header against the YAML column list (prints the verdict, returns 1/0)
util.col_header_val(df, table_config=config_data)

column name and column length validation passed


1

In [None]:
print(f"columns of files are: {df.columns}")
print(f"columns of YAML are: {config_data['columns']}")

columns of files are: Index(['timestamp', 'asset_id', 'count', 'open', 'high', 'low', 'close',
       'volume', 'vwap', 'target'],
      dtype='object')
columns of YAML are: ['timestamp', 'Asset_ID', 'Count', 'Open', 'High', 'Low', 'Close', 'Volume', 'VWAP', 'Target']


In [None]:
# Gate the rest of the pipeline on the header check (col_header_val returns 1 = pass, 0 = fail)
header_ok = util.col_header_val(df, config_data)
if not header_ok:
    print("validation failed")
    # TODO: reject the file here
else:
    print("col validation passed")
    # TODO: continue with the next steps of the pipeline

column name and column length validation passed
col validation passed


In [None]:
# ./<output_file_name>.<output_file_type>.gz, built from the YAML config
output_file_path = f"./{config_data['output_file_name']}.{config_data['output_file_type']}.gz"
output_file_path

'./out.txt.gz'

In [None]:
# Write the validated frame as a gzip-compressed, pipe-delimited text file.
# index=False keeps pandas' row numbers out of the output, so the file holds
# exactly the validated columns.
df.to_csv(output_file_path, sep=config_data['outbound_delimiter'], compression='gzip', index=False)

**Summary of the file**

In [None]:
# Shape and per-column dtype/memory summary of the ingested frame
num_rows, num_cols = df.shape
print("Number of columns: ",num_cols)
print("Number of Rows: ",num_rows)
df.info(memory_usage=True)


Number of columns:  10
Number of Rows:  24236806
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 24236806 entries, 0 to 24236805
Data columns (total 10 columns):
 #   Column     Dtype  
---  ------     -----  
 0   timestamp  int64  
 1   asset_id   int64  
 2   count      float64
 3   open       float64
 4   high       float64
 5   low        float64
 6   close      float64
 7   volume     float64
 8   vwap       float64
 9   target     float64
dtypes: float64(8), int64(2)
memory usage: 1.8 GB
