# File ingestion and Validation

In [13]:
import gzip
import logging
import re
import time

import dask.dataframe as dd
import modin.pandas as mpd
import pandas as pd
import ray
import yaml

def read_config_file(filepath):
    """Load a YAML configuration file and return its parsed content.

    Parameters
    ----------
    filepath : str or path-like
        Location of the YAML file to read.

    Returns
    -------
    object or None
        Whatever ``yaml.safe_load`` produces for the document (a ``dict`` when
        the document is a mapping). ``None`` is returned when the YAML cannot
        be parsed; the parse error is logged rather than raised.

    Raises
    ------
    OSError
        If the file cannot be opened.
    """
    # Decode explicitly as UTF-8 rather than relying on the platform's default encoding.
    with open(filepath, 'r', encoding='utf-8') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Include the path so the log line identifies which config failed.
            logging.error("Could not parse YAML config %s: %s", filepath, exc)
            return None

# Choose a file to read
input_file = "train.csv"


def _timed_read(label, reader, path):
    """Read ``path`` with ``reader``, print how long it took, and return the result.

    Parameters
    ----------
    label : str
        Library name used in the printed message.
    reader : callable
        Function taking the file path and returning the loaded frame.
    path : str
        File to read.

    Returns
    -------
    The object returned by ``reader(path)``.
    """
    # perf_counter is a monotonic clock meant for measuring durations;
    # wall-clock timestamps can jump if the system clock is adjusted.
    started = time.perf_counter()
    frame = reader(path)
    elapsed = time.perf_counter() - started
    print(f"{label} took {elapsed:.6f} seconds to read the file.")
    return frame


# Each benchmark rebinds `df`, so the previous library's copy can be garbage-collected.
# NOTE(review): the readers run back to back on the same file, so later readers
# may benefit from OS file caching -- treat single-run numbers as indicative only.
df = _timed_read("Pandas", pd.read_csv, input_file)

# dd.read_csv is lazy, so .compute() is part of the timed work (as in the original measurement).
df = _timed_read("Dask", lambda path: dd.read_csv(path).compute(), input_file)

# After this line `df` holds the Modin result; later cells that use `df` see that frame.
df = _timed_read("Modin", mpd.read_csv, input_file)

# Read using Ray and measure the time taken
#ray.init()
#@ray.remote
#def read_csv_ray(input_file):
#    return pd.read_csv(input_file)
#start_time = pd.Timestamp.now()
#df_id = read_csv_ray.remote(input_file)
#df = ray.get(df_id)
#end_time = pd.Timestamp.now()
#ray.shutdown()
#print(f"Ray took {(end_time-start_time).total_seconds()} seconds to read the file.")

#Data Validation functions
def validate_columns(df, expected_cols):
    """Check that a DataFrame's columns match the expected column names.

    Both sides are normalised before comparison: surrounding whitespace is
    stripped, names are lower-cased, and every character other than
    ``a-z``, ``A-Z`` and ``0-9`` is removed. Order is ignored.

    Parameters
    ----------
    df : DataFrame
        Frame whose header is validated. NOTE: ``df.columns`` is replaced
        in place with the normalised names (side effect kept from the
        original implementation).
    expected_cols : iterable of str
        Column names the frame is expected to have.

    Returns
    -------
    bool
        ``True`` when the normalised column sets are equal, otherwise
        ``False``. On mismatch, missing columns are logged at ERROR level and
        extra columns at WARNING level.
    """
    df.columns = df.columns.str.strip().str.lower().str.replace('[^a-zA-Z0-9]', '', regex=True)

    # Plain Python strings have no regex-aware replace(); the original call
    # passed regex=True to str.replace and raised TypeError. Use re.sub instead.
    expected = {re.sub('[^a-zA-Z0-9]', '', col.strip().lower()) for col in expected_cols}
    actual = set(df.columns)

    if actual == expected:
        return True

    missing_cols = expected - actual
    extra_cols = actual - expected
    if missing_cols:
        logging.error(f"The following expected columns are missing: {missing_cols}")
    if extra_cols:
        logging.warning(f"The following extra columns are present: {extra_cols}")
    return False

Pandas took 5.493535 seconds to read the file.
Dask took 6.284257 seconds to read the file.
Modin took 4.981606 seconds to read the file.


### Analysing the efficiency of file reading

I read the same file with three libraries (pandas, Dask and Modin), following the common practice of driving ingestion from a configuration file. In this run Modin was the fastest (about 4.98 s), pandas was next (about 5.49 s), and Dask was the slowest (about 6.28 s). Modin therefore gave the best read performance for this file.

In [27]:
%%writefile train.yaml
# Ingestion settings for train.csv; the notebook parses this file with yaml.safe_load.
# NOTE(review): file_type, inbound_delimiter, outbound_delimiter and skip_leading_rows
# are not read by any cell visible in this notebook -- confirm they are still needed.
file_type: csv
# Stem of the output file: the export cell appends '.txt.gz' to this value.
file_name: train
inbound_delimiter: ','
outbound_delimiter: '|'
skip_leading_rows: 1
# Column names the validation cell compares against the DataFrame header.
# NOTE(review): the preview of train.csv shows Star / Review / Description, so
# these placeholder names make the validation fail -- confirm the intended list.
columns:
  - book
  - laptop
  - bag

Overwriting train.yaml


In [28]:
# Read the YAML configuration file through the read_config_file helper
# instead of duplicating its open/safe_load logic here.
config_data = read_config_file('train.yaml')

# read_config_file returns None when parsing fails, and an empty YAML file also
# loads as None; fail here with a clear message rather than with an obscure
# TypeError when a later cell indexes config_data.
if not isinstance(config_data, dict):
    raise ValueError("train.yaml did not load as a mapping; check the file's contents")

In [29]:
# Inspect the parsed configuration (displayed as the cell's output)
config_data

{'file_type': 'csv',
 'file_name': 'train',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['book', 'laptop', 'bag']}

In [24]:
# Plain pandas read of the file, used to preview the first rows.
# pandas is already imported in the first cell, so it is not re-imported here;
# the delimiter comes from the YAML configuration instead of being hard-coded.
df_sample = pd.read_csv('train.csv', delimiter=config_data['inbound_delimiter'])
df_sample.head()

Unnamed: 0,Star,Review,Description
0,2,The best soundtrack ever to anything.,I'm reading a lot of reviews saying that this ...
1,2,Amazing!,This soundtrack is my favorite music of all ti...
2,2,Excellent Soundtrack,I truly like this soundtrack and I enjoy video...
3,2,"Remember, Pull Your Jaw Off The Floor After He...","If you've played the game, you know how divine..."
4,2,an absolute masterpiece,I am quite sure any of you actually taking the...


In [25]:
# Validate the DataFrame header against the YAML configuration
expected_columns = config_data['columns']

if len(df.columns) != len(expected_columns):
    # Validate number of columns
    print('Error: Number of columns in the file does not match with the YAML configuration')
    print('Validation failed')
else:
    # Validate column names: collect every column the configuration does not list
    unexpected_columns = [col for col in df.columns if col not in expected_columns]
    for col in unexpected_columns:
        print(f'Error: Column "{col}" is not present in the YAML configuration')

    # Report failure only when a mismatch was actually found; the original
    # printed 'Validation failed' unconditionally in this branch.
    if unexpected_columns:
        print('Validation failed')
    else:
        print('Validation passed')

Error: Column "Star" is not present in the YAML configuration
Error: Column "Review" is not present in the YAML configuration
Error: Column "Description" is not present in the YAML configuration
Validation failed


In [26]:
import csv

# Write the DataFrame rows to a delimiter-separated text file in gz format.
output_file = config_data['file_name'] + '.txt.gz'
# Take the delimiter from the configuration; fall back to the pipe used previously.
outbound_delimiter = config_data.get('outbound_delimiter', '|')

# Stream straight into the gzip file in text mode. This produces the same
# decompressed bytes as the earlier temp-file approach (same encoding and
# newline handling) without the intermediate 'temp.txt' or the os.remove call,
# which failed on a fresh kernel because `os` was never imported.
# NOTE(review): only the data rows are written -- no header line, as before.
with gzip.open(output_file, 'wt', newline='', encoding='utf-8-sig') as file_out:
    writer = csv.writer(file_out, delimiter=outbound_delimiter)
    writer.writerows(df.values)

print(f'{output_file} file has been generated successfully')

train.txt.gz file has been generated successfully


### Summary of the file:

Dataset: train.csv

Total number of rows: 1048576

Total number of columns: 3

File size: 2.3 GB