# Task: File Ingestion and Schema validation

- Take any csv/text file of 2+ GB of your choice. --- (You can do this assignment on Google colab)

- Read the file ( Present approach of reading the file )

- Try different methods of file reading eg: Dask, Modin, Ray, pandas and present your findings in term of computational efficiency.

- Perform basic validation on data columns : eg: remove special character , white spaces from the col name.

- As you already know the schema hence create a YAML file and write the column name in YAML file. --define separator of read and write file, column name in YAML.

- Validate number of columns and column name of ingested file with YAML.

- Write the file in pipe separated text file (|) in gz format.

- Create a summary of the file:

  Total number of rows,

  total number of columns

  file size


In [41]:
import os
import time

In [2]:
file_path = 'C:/Users/Murat Kiran/Desktop/DG/W6/price_paid_records.csv'

In [3]:
# Size of the file
###################
os.path.getsize(file_path)

2405685902

## Reading the file 

In [4]:
####### 
#Pandas 
#######

import pandas as pd
start = time.time()
df = pd.read_csv('C:/Users/Murat Kiran/Desktop/DG/W6/price_paid_records.csv')
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  1843.375968694687 sec


In [7]:
####### 
#Dask 
#######

from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('C:/Users/Murat Kiran/Desktop/DG/W6/price_paid_records.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  1.6645479202270508 sec


- **Dask is better than Pandas**

In [16]:
from dask import dataframe as dd
df = dd.read_csv('C:/Users/Murat Kiran/Desktop/DG/W6/price_paid_records.csv',delimiter=',')

In [17]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 11 entries, Transaction unique identifier to Record Status - monthly file only
dtypes: object(10), int64(1)

In [8]:
#No. of Rows
len(df.index)

22489348

In [18]:
def clean_column_names(df):
    # Remove special characters
    df.columns = df.columns.str.replace('[^a-zA-Z0-9]+', '_')
    
    # Remove specific characters like '#', '@', '&'
    df.columns = df.columns.str.replace('[#@&]', '')

    # Replace whitespaces with '_'
    df.columns = df.columns.str.replace(' ', '_')

    # Convert column names to lowercase
    df.columns = df.columns.str.strip().str.lower()

    return df


df = clean_column_names(df)

In [19]:
df.columns

Index(['transaction_unique_identifier', 'price', 'date_of_transfer',
       'property_type', 'old/new', 'duration', 'town/city', 'district',
       'county', 'ppdcategory_type', 'record_status_-_monthly_file_only'],
      dtype='object')

## Validation

In [20]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


In [21]:
%%writefile file.yaml
file_type: csv
dataset_name: housingPricesPaid 
file_name: price_paid_records
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - transaction_unique_identifier
    - price
    - date_of_transfer
    - property_type
    - old/new
    - duration
    - town/city
    - district
    - county
    - ppdcategory_type
    - record_status_-_monthly_file_only

Writing file.yaml


In [23]:
# Reading config file
import utility as util
config_data = util.read_config_file("file.yaml")

In [24]:
config_data['inbound_delimiter']

','

In [25]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'housingPricesPaid',
 'file_name': 'price_paid_records',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['transaction_unique_identifier',
  'price',
  'date_of_transfer',
  'property_type',
  'old/new',
  'duration',
  'town/city',
  'district',
  'county',
  'ppdcategory_type',
  'record_status_-_monthly_file_only']}

In [26]:
# Reading process of the file using Dask
from dask import dataframe as dd
df_sample = dd.read_csv('C:/Users/Murat Kiran/Desktop/DG/W6/price_paid_records.csv',delimiter=',')
df_sample.head()

Unnamed: 0,Transaction unique identifier,Price,Date of Transfer,Property Type,Old/New,Duration,Town/City,District,County,PPDCategory Type,Record Status - monthly file only
0,{81B82214-7FBC-4129-9F6B-4956B4A663AD},25000,1995-08-18 00:00,T,N,F,OLDHAM,OLDHAM,GREATER MANCHESTER,A,A
1,{8046EC72-1466-42D6-A753-4956BF7CD8A2},42500,1995-08-09 00:00,S,N,F,GRAYS,THURROCK,THURROCK,A,A
2,{278D581A-5BF3-4FCE-AF62-4956D87691E6},45000,1995-06-30 00:00,T,N,F,HIGHBRIDGE,SEDGEMOOR,SOMERSET,A,A
3,{1D861C06-A416-4865-973C-4956DB12CD12},43150,1995-11-24 00:00,T,N,F,BEDFORD,NORTH BEDFORDSHIRE,BEDFORDSHIRE,A,A
4,{DD8645FD-A815-43A6-A7BA-4956E58F1874},18899,1995-06-23 00:00,S,N,F,WAKEFIELD,LEEDS,WEST YORKSHIRE,A,A


In [34]:
# Read the file using config file
#####################################

file_type = config_data['file_type']
source_file = "C:/Users/Murat Kiran/Desktop/DG/W6/" + config_data['file_name'] + f'.{file_type}'
df = pd.read_csv(source_file, delimiter=config_data['inbound_delimiter'])
df.head()

Unnamed: 0,Transaction unique identifier,Price,Date of Transfer,Property Type,Old/New,Duration,Town/City,District,County,PPDCategory Type,Record Status - monthly file only
0,{81B82214-7FBC-4129-9F6B-4956B4A663AD},25000,1995-08-18 00:00,T,N,F,OLDHAM,OLDHAM,GREATER MANCHESTER,A,A
1,{8046EC72-1466-42D6-A753-4956BF7CD8A2},42500,1995-08-09 00:00,S,N,F,GRAYS,THURROCK,THURROCK,A,A
2,{278D581A-5BF3-4FCE-AF62-4956D87691E6},45000,1995-06-30 00:00,T,N,F,HIGHBRIDGE,SEDGEMOOR,SOMERSET,A,A
3,{1D861C06-A416-4865-973C-4956DB12CD12},43150,1995-11-24 00:00,T,N,F,BEDFORD,NORTH BEDFORDSHIRE,BEDFORDSHIRE,A,A
4,{DD8645FD-A815-43A6-A7BA-4956E58F1874},18899,1995-06-23 00:00,S,N,F,WAKEFIELD,LEEDS,WEST YORKSHIRE,A,A


In [35]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['old_new', 'town_city', 'record_status_monthly_file_only']
Following YAML columns are not in the file uploaded ['record_status_-_monthly_file_only', 'town/city', 'old/new']


0

In [36]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['transaction_unique_identifier', 'price', 'date_of_transfer',
       'property_type', 'old_new', 'duration', 'town_city', 'district',
       'county', 'ppdcategory_type', 'record_status_monthly_file_only'],
      dtype='object')
columns of YAML are: ['transaction_unique_identifier', 'price', 'date_of_transfer', 'property_type', 'old/new', 'duration', 'town/city', 'district', 'county', 'ppdcategory_type', 'record_status_-_monthly_file_only']


In [37]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation failed
Following File columns are not in the YAML file ['old_new', 'town_city', 'record_status_monthly_file_only']
Following YAML columns are not in the file uploaded ['record_status_-_monthly_file_only', 'town/city', 'old/new']
validation failed


## Write the file in gz. format

In [39]:
df.to_csv('price_paid_records.csv.gz', sep='|', compression='gzip', index=False)

### Summary

In [42]:
# Create a summary of the file
file_path = 'price_paid_records.csv.gz'

# Read the DataFrame without using 'with'
df = pd.read_csv(file_path, sep='|', compression='gzip')

# Calculate the total number of rows and columns
num_rows, num_columns = df.shape

# Get the file size (in bytes)
file_size = os.path.getsize(file_path)

# Print the summary information
print(f"Total Number of Rows: {num_rows}")
print(f"Total Number of Columns: {num_columns}")
print(f"File Size: {file_size}")

Total Number of Rows: 22489348
Total Number of Columns: 11
File Size: 751249222
