# Table of Contents

* [Part 1 - Read the file ( Present approach of reading the file )](#1)
  - [1.1 Read the data with Pandas](#1.1)
  - [1.2 Read the data with Ray & Modin](#1.2)
  - [1.3 Read the data with Dask](#1.3)

* [Part 2 - Perform basic validation on data columns : eg: remove special character , white spaces from the col name](#2)

* [Part 3 - Create a YAML file and write the column name in YAML file. --define separator of read and write file, column name in YAML](#3)

* [Part 4 - Validate number of columns and column name of ingested file with YAML](#4)

* [Part 5 - Write the file in pipe separated text file (|) in gz format](#5)

* [Part 6 - Create a summary of the file: total number of rows, total number of columns, file size](#6)

# Part 1 - Read the file (Present approach of reading the file) <a id="1"></a>

In [2]:
import os
import time

#### 1.1 Read the data with Pandas <a id="1.1"></a>

In [3]:
import pandas as pd
start = time.time()
df_pandas = pd.read_csv('taxi.csv')
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  347.62111139297485 sec


#### 1.2 Read the data with Ray & Modin <a id="1.2"></a>

In [5]:
import modin.pandas as pd
import ray
ray.init()
ray.shutdown()
start = time.time()
df_modin = pd.read_csv('taxi.csv')
end = time.time()
print("Read csv with modin and ray: ",(end-start),"sec")

2023-02-15 17:57:20,508	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


Read csv with modin and ray:  231.0076470375061 sec


#### 1.3 Read the data with Dask <a id="1.4"></a>

In [2]:
from dask import dataframe as dd
start = time.time()
df_dask = dd.read_csv('taxi.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.053991079330444336 sec


#### Dask performed better than Pandas and Modin & Ray in terms of time, with only 0.04078841209411621 seconds to compute.

# Part 2 - Perform basic validation on data columns : eg: remove special character , white spaces from the col name <a id="2"></a>

In [3]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


# Part 3 - Create a YAML file and write the column name in YAML file. --define separator of read and write file, column name in YAML <a id="3"></a>

In [4]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: taxi
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - key
    - fare_amount
    - pickup_datetime
    - pickup_longitude
    - pickup_latitude
    - dropoff_longitude
    - dropoff_latitude
    - passenger_count

Overwriting file.yaml


In [6]:
df_dask.columns

Index(['key', 'fare_amount', 'pickup_datetime', 'pickup_longitude',
       'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
       'passenger_count'],
      dtype='object')

In [7]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [8]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'taxi',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['key',
  'fare_amount',
  'pickup_datetime',
  'pickup_longitude',
  'pickup_latitude',
  'dropoff_longitude',
  'dropoff_latitude',
  'passenger_count']}

In [9]:
# Reading process of the file using Dask
from dask import dataframe as dd
df_dask = dd.read_csv('taxi.csv',delimiter=',')
df_dask.head()

Unnamed: 0,key,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,2009-06-15 17:26:21.0000001,4.5,2009-06-15 17:26:21 UTC,-73.844311,40.721319,-73.84161,40.712278,1
1,2010-01-05 16:52:16.0000002,16.9,2010-01-05 16:52:16 UTC,-74.016048,40.711303,-73.979268,40.782004,1
2,2011-08-18 00:35:00.00000049,5.7,2011-08-18 00:35:00 UTC,-73.982738,40.76127,-73.991242,40.750562,2
3,2012-04-21 04:30:42.0000001,7.7,2012-04-21 04:30:42 UTC,-73.98713,40.733143,-73.991567,40.758092,1
4,2010-03-09 07:51:00.000000135,5.3,2010-03-09 07:51:00 UTC,-73.968095,40.768008,-73.956655,40.783762,1


In [23]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

  df = pd.read_csv(source_file,config_data['inbound_delimiter'])


Unnamed: 0,key,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,2009-06-15 17:26:21.0000001,4.5,2009-06-15 17:26:21 UTC,-73.844311,40.721319,-73.84161,40.712278,1
1,2010-01-05 16:52:16.0000002,16.9,2010-01-05 16:52:16 UTC,-74.016048,40.711303,-73.979268,40.782004,1
2,2011-08-18 00:35:00.00000049,5.7,2011-08-18 00:35:00 UTC,-73.982738,40.76127,-73.991242,40.750562,2
3,2012-04-21 04:30:42.0000001,7.7,2012-04-21 04:30:42 UTC,-73.98713,40.733143,-73.991567,40.758092,1
4,2010-03-09 07:51:00.000000135,5.3,2010-03-09 07:51:00 UTC,-73.968095,40.768008,-73.956655,40.783762,1


# Part 4 - Validate number of columns and column name of ingested file with YAML <a id="4"></a>

In [55]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [56]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    
else:
    print("col validation passed")
    

column name and column length validation passed
col validation passed


# Part 5 - Write the file in pipe separated text file (|) in gz format <a id="5"></a>

In [58]:
import datetime
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('taxi.csv',delimiter=',')

# Write csv in gz format in pipe separated text file (|)
df.to_csv('taxi.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          lineterminator='\n')

['C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\00.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\01.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\02.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\03.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\04.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\05.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\06.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\07.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\08.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\09.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\10.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\11.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\12.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\13.part',
 'C:\\Users\\antho\\Data-Glacier\\Week 6\\taxi.csv.gz\\14.part',
 'C:\\Users\\antho\\Data-

# Part 6 - Create a summary of the file: total number of rows, total number of columns, file size <a id="6"></a>

In [15]:
import os
import pandas as pd

# set the file path
file_path = 'taxi.csv.gz'

# get the file size
file_size = os.path.getsize(file_path)

# read the file using pandas
#df = pd.read_csv(file_path, delimiter='|')

# get the total number of rows and columns
#num_rows = df.shape[0]
#num_cols = df.shape[1]

# print the summary
print('File summary:')
#print(f'Total number of rows: {num_rows}')
#print(f'Total number of columns: {num_cols}')
print(f'File size: {file_size} bytes')


File summary:
File size: 40960 bytes
