In [1]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
#Installing the required libraries
!pip install pyaml
!pip install dask
!pip install modin
!pip install ray

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


## Importing the required libraries

In [2]:
import pandas as pd
import ray
import modin.pandas as mpd
import dask.dataframe as dd
import os
import timeit
import csv

## Making a sample large csv file

In [10]:
num_rows = 30_000_000
header = ['id', 'name', 'age', 'city', 'country'] # Column names

data = []
for i in range(num_rows):
    row = [f'ID{i+1}', f'first_name{i+1} middle_name{i+1} last_name{i+1}', 20+i%10, f'city{i%5+1}, state{i%20+1}', 'country1']
    data.append(row)

file_path = 'large_csv.csv'
with open(file_path, 'w', newline='') as csv_file:
    writer = csv.writer(csv_file)
    writer.writerow(header)
    writer.writerows(data)

# Check file size
file_size = os.path.getsize(file_path)
print(f"Generated file size: {file_size/1024/1024} MB")

Generated file size: 2748.542417526245 MB


## Checking time taken to read the csv file through different methods

In [6]:
file_path = "large_csv.csv"

In [6]:
print(f"Pandas: {timeit.timeit(lambda: pd.read_csv(file_path), number=1)} seconds")
print(f"Dask: {timeit.timeit(lambda: dd.read_csv(file_path), number=1)} seconds")
print(f"Modin: {timeit.timeit(lambda: mpd.read_csv(file_path), number=1)} seconds")
ray.shutdown() #Shutting down modin ray instance

Pandas: 133.68586099999993 seconds
Dask: 0.14022990000012214 seconds
Modin: 137.4125736000001 seconds


In [None]:
@ray.remote
def ray_read_csv(file_path):
    return pd.read_csv(file_path)
ray.init()
print(f"Ray: {timeit.timeit(lambda: ray.get(ray_read_csv.remote(file_path)), number=1)} seconds")
ray.shutdown()

2023-04-01 21:29:37,891	ERROR services.py:1169 -- Failed to start the dashboard 
2023-04-01 21:29:37,891	ERROR services.py:1194 -- Error should be written to 'dashboard.log' or 'dashboard.err'. We are printing the last 20 lines for you. See 'https://docs.ray.io/en/master/ray-observability/ray-logging.html#logging-directory-structure' to find where the log file is.
2023-04-01 21:29:37,891	ERROR services.py:1204 -- Couldn't read dashboard.log file. Error: [Errno 2] No such file or directory: 'C:\\Users\\yashd\\AppData\\Local\\Temp\\ray\\session_2023-04-01_21-29-13_887448_3648\\logs\\dashboard.log'. It means the dashboard is broken even before it initializes the logger (mostly dependency issues). Reading the dashboard.err file which contains stdout/stderr.
2023-04-01 21:29:37,891	ERROR services.py:1238 -- Failed to read dashboard.err file: cannot mmap an empty file. It is unexpected. Please report an issue to Ray github. https://github.com/ray-project/ray/issues
2023-04-01 21:29:44,527	IN

#### Dask seems to be the most efficient for our case

## Writing yaml and utility file

In [8]:
%%writefile file_metadata.yaml
file_type: csv
dataset_name: person_details
file_name: large_csv
inbound_delimiter: ","
outbound_delimiter: "|"
columns:
    - ID
    - Name
    - Age
    - City
    - Country

Overwriting file_metadata.yaml


In [9]:
%%writefile utility.py
import yaml
import re
def read_config_file(path):
    with open(path,'r') as file:
        try:
            return yaml.safe_load(stream=file)
        except Exception as e:
            return e

def header_preprocessor(header_list):
    header_preprocessed_list=[]
    for header in header_list:
        header_preprocessed = header.lower() # Converting header to lower case
        header_preprocessed = header_preprocessed.strip() # Removing extra spaces
        header_preprocessed = re.sub("[\s_@]+","_",header_preprocessed) # Substituting special characters
        header_preprocessed_list.append(header_preprocessed) # Appending preprocessed header to list
    return header_preprocessed_list

def column_header_validation(df_columns,config_columns):
    df_cols_preprocessed = header_preprocessor(df_columns)
    config_cols_preprocessed = header_preprocessor(config_columns)

    if df_cols_preprocessed==config_cols_preprocessed:
        return "Column validation passed"
    elif len(df_cols_preprocessed)==len(config_cols_preprocessed):
        print("Length of columns is same but column names are different")
        print(f"Following column headers not present in config file: {[x for x in df_cols_preprocessed if x not in config_cols_preprocessed]}")
        print(f"Following columns of config file not found in dataframe: {[x for x in config_cols_preprocessed if x not in df_cols_preprocessed]}")
    else:
        print("Entire column validation failed (both names and length are different)")
        print(f"Following columns of dataframe not found in config file: {[x for x in df_cols_preprocessed if x not in config_cols_preprocessed]}")
        print(f"Following columns of config file not found in dataframe: {[x for x in config_cols_preprocessed if x not in df_cols_preprocessed]}")

Overwriting utility.py


## Importing sample data and validating it

In [10]:
#Defining config file path
config_file_path = 'file_metadata.yaml'

In [11]:
import utility
#Reading configuration file
config_file = utility.read_config_file(config_file_path)

In [12]:
config_file

{'file_type': 'csv',
 'dataset_name': 'person_details',
 'file_name': 'large_csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'columns': ['ID', 'Name', 'Age', 'City', 'Country']}

In [18]:
# Defining data file path
data_path = "D:/DataspellProjects/Data Science/Data Glacier Internship/"+config_file['file_name']+f".{config_file['file_type']}"

In [19]:
# Reading the csv file using dask
df = dd.read_csv(data_path)

In [20]:
# Validating the imported dataframe with config data
utility.column_header_validation(list(df.columns.values),list(config_file['columns']))

'Column validation passed'

## Printing Data Ingestion Summary

In [21]:
print("Data Ingestion Summary")
print(f"Total number of rows: {df.shape[0].compute()}")
print(f"Total number of columns: {df.shape[1]}")
print(f"File size: {os.path.getsize(data_path)/(1024*1024*1024)} GB")
print(f"File type: {config_file['file_type']}")

Data Ingestion Summary
Total number of rows: 30000000
Total number of columns: 5
File size: 2.6841234546154737 GB
File type: csv


## Saving the dataframe in pipe separated text file (|) in gz format

In [22]:
df.to_csv('large_gz.gz',sep=config_file['outbound_delimiter'],header=True,index=False,compression='gzip')

['D:\\DataspellProjects\\Data Science\\Data Glacier Internship\\Data-Glacier-Intern-Projects-Week-6\\large_gz.gz\\00.part',
 'D:\\DataspellProjects\\Data Science\\Data Glacier Internship\\Data-Glacier-Intern-Projects-Week-6\\large_gz.gz\\01.part',
 'D:\\DataspellProjects\\Data Science\\Data Glacier Internship\\Data-Glacier-Intern-Projects-Week-6\\large_gz.gz\\02.part',
 'D:\\DataspellProjects\\Data Science\\Data Glacier Internship\\Data-Glacier-Intern-Projects-Week-6\\large_gz.gz\\03.part',
 'D:\\DataspellProjects\\Data Science\\Data Glacier Internship\\Data-Glacier-Intern-Projects-Week-6\\large_gz.gz\\04.part',
 'D:\\DataspellProjects\\Data Science\\Data Glacier Internship\\Data-Glacier-Intern-Projects-Week-6\\large_gz.gz\\05.part',
 'D:\\DataspellProjects\\Data Science\\Data Glacier Internship\\Data-Glacier-Intern-Projects-Week-6\\large_gz.gz\\06.part',
 'D:\\DataspellProjects\\Data Science\\Data Glacier Internship\\Data-Glacier-Intern-Projects-Week-6\\large_gz.gz\\07.part',
 'D:\\Da

In [23]:
#Getting number of files and total folder size
file_count=0
for file in os.listdir('large_gz.gz'):
    file_count+=1
print(f"Total .gz file count: {file_count}")
print(f".gz folder size: {os.path.getsize('large_gz.gz')/(1024)} KB")

Total .gz file count: 45
.gz folder size: 8.0 KB


## Experimenting with column validation

In [29]:
df_copy = df.copy()
df_copy = df_copy.rename(columns={'name':' name '})

In [30]:
utility.column_header_validation(df_copy.columns.values,config_file['columns'])

'Column validation passed'

In [32]:
df_copy_columns = list(df_copy.columns.values)
df_copy_columns.append('test col 1')

In [33]:
utility.column_header_validation(df_copy_columns,config_file['columns'])

Entire column validation failed (both names and length are different)
Following columns of dataframe not found in config file: ['test_col_1']
Following columns of config file not found in dataframe: []


In [39]:
df_copy = df_copy.rename(columns={' name ':'__name__ '})
utility.column_header_validation(df_copy.columns.values,config_file['columns'])

Length of columns is same but column names are different
Following column headers not present in config file: ['_name_']
Following columns of config file not found in dataframe: ['name']
