<a href="https://colab.research.google.com/github/sbpravallika/File-ingestion-and-schema-validation/blob/main/File_ingestion_and_schema_validation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Task: File Ingestion and Schema Validation
Take any CSV/text file of 2+ GB of your choice (you can do this assignment on Google Colab).

Read the file (present your approach to reading the file).

Try different methods of reading the file, e.g. Dask, Modin, Ray, and pandas, and present your findings in terms of computational efficiency.

Perform basic validation on the data columns, e.g. remove special characters and white spaces from the column names.

Since the schema is already known, create a YAML file that lists the column names and defines the separators used to read and write the file.

Validate the number of columns and the column names of the ingested file against the YAML file.

Write the file as a pipe-separated (|) text file in gz format.

Create a summary of the file:

Total number of rows

Total number of columns

File size

In [1]:
%%writefile testutility.py
import logging
import re

import yaml


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    '''Collapse runs of two or more `char` into a single `char`.'''
    pattern = re.escape(char) + '{2,}'
    return re.sub(pattern, char, string)

def col_header_val(df, table_config):
    '''
    Standardize the column names of df in place (lowercase, replace
    whitespace and special characters with "_", trim and collapse "_"),
    then validate their number and names against table_config['columns'].
    Returns 1 if validation passes, 0 otherwise.
    '''
    # Standardize the file's column names (this mutates the caller's df)
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(r'[^\w]', '_', regex=True)
    df.columns = [x.strip('_') for x in df.columns]
    df.columns = [replacer(x, '_') for x in df.columns]
    # YAML column names are only lowercased, so "City name" stays "city name"
    expected_col = sorted(x.lower() for x in table_config['columns'])
    # Compare in sorted order so that column position does not matter
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(df.columns) == expected_col:
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file", mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded", missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing testutility.py


### Write YAML file

In [6]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: test_data
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - City name
    - rights
    - network

Overwriting file.yaml


In [7]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [8]:
config_data['inbound_delimiter']

','

In [9]:
# Inspect the contents of the config file
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'test_data',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['City name', 'rights', 'network']}
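Since `read_config_file` only logs a YAML error and then returns `None`, it can help to fail fast when the loaded config is unusable. A minimal sketch, assuming the pipeline needs the keys used in this notebook and that delimiters are single characters; `validate_config` and `REQUIRED_KEYS` are hypothetical names, not part of `testutility.py`:

```python
# Hypothetical list of keys this notebook's pipeline relies on
REQUIRED_KEYS = ("inbound_delimiter", "outbound_delimiter", "columns")


def validate_config(config):
    """Return a list of problems found in a loaded YAML config (empty list if OK)."""
    if not isinstance(config, dict):
        # read_config_file returns None when the YAML cannot be parsed
        return ["config is not a mapping (did yaml.safe_load fail?)"]
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in config]
    columns = config.get("columns")
    if columns is not None and (not isinstance(columns, list) or not columns):
        problems.append("'columns' must be a non-empty list")
    for key in ("inbound_delimiter", "outbound_delimiter"):
        value = config.get(key)
        # Assumption: only single-character separators are allowed
        if value is not None and (not isinstance(value, str) or len(value) != 1):
            problems.append(f"{key} must be a single character")
    return problems


sample_config = {
    "file_type": "csv",
    "inbound_delimiter": ",",
    "outbound_delimiter": "|",
    "columns": ["City name", "rights", "network"],
}
print(validate_config(sample_config))  # []
print(validate_config({"columns": []}))
```

In the notebook, this would be called as `validate_config(config_data)` right after loading the YAML.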

In [10]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [31]:
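from dask import dataframe as dd
# dd.read_csv is lazy: it reads only a small sample to infer dtypes and builds a
# task graph; the full file is read when .compute() (or len(), head(), ...) is called
dask_df = dd.read_csv('/content/drive/MyDrive/dataset/OlympicTV.csv')
print("Read csv with dask: ")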

Read csv with dask: 


In [None]:
!pip install modin

In [None]:
!pip install ray

Read the data with Modin and Ray

In [34]:
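# Note: this rebinds `pd` to Modin's pandas-compatible API
import modin.pandas as pd
import ray
ray.shutdown()  # stop any previously started Ray instance
ray.init()      # start a local Ray instance for Modin to run on
df = pd.read_csv('/content/drive/MyDrive/dataset/OlympicTV.csv')
print("Read csv with modin and ray: ")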

2023-06-21 19:05:33,892	INFO worker.py:1636 -- Started a local Ray instance.


Read csv with modin and ray: 


In [36]:
#No. of Rows
len(df.index)

10

In [37]:
# No. of columns
len(df.columns)

3
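`len(df.index)` needs the whole file loaded as a DataFrame. For a 2+ GB file, the row and column counts can also be obtained in one streaming pass. A minimal sketch using only the standard library; `count_rows_and_columns` is a hypothetical helper, and `csv.reader` is used instead of counting newlines so that a quoted field containing a line break still counts as one row:

```python
import csv
import os
import tempfile


def count_rows_and_columns(path, delimiter=",", has_header=True):
    """Stream a delimited file once; return (data_row_count, column_count)."""
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f, delimiter=delimiter)
        first = next(reader, None)
        if first is None:
            return 0, 0  # empty file
        n_columns = len(first)  # taken from the first (header) row
        n_rows = sum(1 for _ in reader)
    if not has_header:
        n_rows += 1  # the first row was data, not a header
    return n_rows, n_columns


with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.csv")
    with open(demo_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["City name", "rights", "network"])
        writer.writerow(["Rome", 0.394, "CBS"])
        writer.writerow(["Mexico\nCity", 4.5, "ABC"])  # quoted field with a newline
        writer.writerow(["Tokyo", 1.5, "NBC"])
    counts = count_rows_and_columns(demo_path)

print(counts)  # (3, 3)
```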

In [38]:
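# Remove special characters (#, @, &); regex=True is required in pandas >= 2.0,
# where str.replace treats the pattern as a literal string by default
df.columns = df.columns.str.replace('[#@&]', '', regex=True)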



In [39]:
# Remove white space from the column names
df.columns = df.columns.str.replace(' ', '')

In [40]:
data=df.columns
data

Index(['Cityname', 'rights', 'network'], dtype='object')
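The two cleaning steps above can be chained on a column index. A minimal sketch on a made-up index (the names `rights#` and `@net&work` are hypothetical, chosen so the special-character step has something to remove); `regex=True` is passed explicitly because pandas 2.0 changed the default of `str.replace` to a literal match, and `\s+` also removes tabs and repeated spaces, not just single spaces:

```python
import pandas as pd

# Hypothetical messy column names for illustration
columns = pd.Index(["City name", "rights#", "@net&work"])

cleaned = (
    columns.str.strip()
    .str.replace(r"[#@&]", "", regex=True)  # drop selected special characters
    .str.replace(r"\s+", "", regex=True)    # drop all remaining whitespace
)
print(list(cleaned))  # ['Cityname', 'rights', 'network']
```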

In [11]:
# Normal reading process of the file
import pandas as pd
df_sample = pd.read_csv("/content/drive/MyDrive/dataset/OlympicTV.csv",delimiter=',')
df_sample.head()

|   | City name | rights | network |
|---|---|---|---|
| 0 | Rome | 0.394 | CBS |
| 1 | Tokyo | 1.5 | NBC |
| 2 | Mexico City | 4.5 | ABC |
| 3 | Munich | 7.5 | ABC |
| 4 | Montreal | 25.0 | ABC |


In [12]:
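# Read the file using the delimiter from the config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'  # "./test_data.csv"; not used below
# Pass the delimiter as a keyword: positional `sep` is deprecated and rejected in pandas >= 2.0
df = pd.read_csv('/content/drive/MyDrive/dataset/OlympicTV.csv', sep=config_data['inbound_delimiter'])
df.head()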



|   | City name | rights | network |
|---|---|---|---|
| 0 | Rome | 0.394 | CBS |
| 1 | Tokyo | 1.5 | NBC |
| 2 | Mexico City | 4.5 | ABC |
| 3 | Munich | 7.5 | ABC |
| 4 | Montreal | 25.0 | ABC |


In [13]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['city_name']
Following YAML columns are not in the file uploaded ['city name']


0

In [14]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['city_name', 'rights', 'network'], dtype='object')
columns of YAML are: ['City name', 'rights', 'network']
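The validation fails because `col_header_val` fully normalizes the file's column names (`City name` becomes `city_name`) but only lowercases the YAML names (`City name` becomes `city name`). One fix is to write `city_name` in the YAML; another is to normalize both sides with the same function. A minimal sketch of the second option, mirroring the steps in `testutility.py`; `normalize_column` and `validate_columns` are hypothetical helpers:

```python
import re


def normalize_column(name):
    """Lowercase, replace non-word characters with '_', collapse repeats, trim '_'."""
    return re.sub(r"_+", "_", re.sub(r"[^\w]", "_", name.lower())).strip("_")


def validate_columns(file_columns, yaml_columns):
    """Compare normalized names; return (passed, extra_in_file, missing_from_file)."""
    file_cols = {normalize_column(c) for c in file_columns}
    yaml_cols = {normalize_column(c) for c in yaml_columns}
    extra = sorted(file_cols - yaml_cols)
    missing = sorted(yaml_cols - file_cols)
    # The length check catches duplicates that the set comparison would hide
    passed = len(file_columns) == len(yaml_columns) and not extra and not missing
    return passed, extra, missing


print(validate_columns(["city_name", "rights", "network"], ["City name", "rights", "network"]))
# (True, [], [])
```

In the notebook, the call would be `validate_columns(list(df.columns), config_data['columns'])`.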


In [15]:
if util.col_header_val(df, config_data) == 0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write code to perform further actions
    # in the pipeline

column name and column length validation failed
Following File columns are not in the YAML file ['city_name']
Following YAML columns are not in the file uploaded ['city name']
validation failed


In [16]:
pd.read_csv("/content/drive/MyDrive/dataset/OlympicTV.csv")

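|   | City name | rights | network |
|---|---|---|---|
| 0 | Rome | 0.394 | CBS |
| 1 | Tokyo | 1.5 | NBC |
| 2 | Mexico City | 4.5 | ABC |
| 3 | Munich | 7.5 | ABC |
| 4 | Montreal | 25.0 | ABC |
| 5 | Moscow | 87.0 | NBC |
| 6 | Los Angeles | 225.0 | ABC |
| 7 | Seoul | 300.0 | NBC |
| 8 | Barcelona | 401.0 | NBC |
| 9 | Atlanta | 456.0 | NBC |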


In [17]:
df

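|   | city_name | rights | network |
|---|---|---|---|
| 0 | Rome | 0.394 | CBS |
| 1 | Tokyo | 1.5 | NBC |
| 2 | Mexico City | 4.5 | ABC |
| 3 | Munich | 7.5 | ABC |
| 4 | Montreal | 25.0 | ABC |
| 5 | Moscow | 87.0 | NBC |
| 6 | Los Angeles | 225.0 | ABC |
| 7 | Seoul | 300.0 | NBC |
| 8 | Barcelona | 401.0 | NBC |
| 9 | Atlanta | 456.0 | NBC |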


In [18]:
### Creating test file for this demo:
testdata = {
    'City name' : ['Rome', 'Tokyo', 'Mexico City','Munich','Montreal','Moscow','Los Angeles','Seoul','Barcelona', 'Atlanta'],
    'rights' : [0.394,1.5,4.5,7.5,25,87,225,300,401,456],
    'network' : ['CBS','NBC','ABC','ABC','ABC','NBC','ABC','NBC','NBC','NBC']
}
import pandas as pd
df = pd.DataFrame(testdata, columns=['City name', 'rights','network'])
df.to_csv("/content/drive/MyDrive/dataset/OlympicTV.csv",index=False)
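The demo file above has only 10 rows, while the task calls for a 2+ GB file. One way to produce a file of that size for testing is to repeat the sample rows until a byte target is reached. A minimal sketch; `make_large_csv` is a hypothetical helper, and the demo writes about 1 MB to a temporary directory rather than 2 GB:

```python
import csv
import io
import os
import tempfile


def make_large_csv(path, header, rows, target_bytes):
    """Write `header` once, then repeat `rows` until the file holds at least target_bytes.

    Returns the number of data rows written.
    """
    if not rows:
        raise ValueError("rows must not be empty")

    def encode(records):
        # Serialize with the csv module so quoting stays correct
        buf = io.StringIO()
        csv.writer(buf, lineterminator="\n").writerows(records)
        return buf.getvalue().encode("utf-8")

    head, block = encode([header]), encode(rows)
    size, written = len(head), 0
    with open(path, "wb") as f:
        f.write(head)
        while size < target_bytes:
            f.write(block)  # write the same pre-encoded block repeatedly
            size += len(block)
            written += len(rows)
    return written


header = ["City name", "rights", "network"]
sample_rows = [("Rome", 0.394, "CBS"), ("Tokyo", 1.5, "NBC"), ("Mexico City", 4.5, "ABC")]

with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "OlympicTV_large.csv")
    n_written = make_large_csv(demo_path, header, sample_rows, target_bytes=1_000_000)
    demo_size = os.path.getsize(demo_path)

print(n_written, demo_size)
```

For the assignment, `target_bytes=2 * 1024**3` would give a file of at least 2 GiB. Highly repetitive rows compress far better than real data, so gzip sizes measured on such a file will not be representative.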

In [19]:
df

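|   | City name | rights | network |
|---|---|---|---|
| 0 | Rome | 0.394 | CBS |
| 1 | Tokyo | 1.5 | NBC |
| 2 | Mexico City | 4.5 | ABC |
| 3 | Munich | 7.5 | ABC |
| 4 | Montreal | 25.0 | ABC |
| 5 | Moscow | 87.0 | NBC |
| 6 | Los Angeles | 225.0 | ABC |
| 7 | Seoul | 300.0 | NBC |
| 8 | Barcelona | 401.0 | NBC |
| 9 | Atlanta | 456.0 | NBC |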


In [20]:
testdata


{'City name': ['Rome',
  'Tokyo',
  'Mexico City',
  'Munich',
  'Montreal',
  'Moscow',
  'Los Angeles',
  'Seoul',
  'Barcelona',
  'Atlanta'],
 'rights': [0.394, 1.5, 4.5, 7.5, 25, 87, 225, 300, 401, 456],
 'network': ['CBS',
  'NBC',
  'ABC',
  'ABC',
  'ABC',
  'NBC',
  'ABC',
  'NBC',
  'NBC',
  'NBC']}

In [23]:
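import csv

from dask import dataframe as dd
df = dd.read_csv('/content/drive/MyDrive/dataset/OlympicTV.csv', delimiter=',')

# Write a gzip-compressed, pipe-separated (|) text file.
# Because the name contains no '*' and single_file is not set, Dask treats the
# path as a directory and writes one part file per partition (e.g. 0.part).
df.to_csv('OlympicTV.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          lineterminator='\n')  # `line_terminator` was renamed `lineterminator` in pandas 1.5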



['/content/OlympicTV.csv.gz/0.part']

In [25]:
import os
entries = os.listdir('/content/OlympicTV.csv.gz/')
for entry in entries:
    print(entry)

0.part
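Dask produced a directory (`OlympicTV.csv.gz/0.part`) rather than a single `.gz` file. Dask's `to_csv` also accepts `single_file=True` (check the Dask docs for your version); for a frame that fits in memory, pandas can write one file directly. A minimal sketch on a two-row stand-in DataFrame (`demo_df` and the file name `OlympicTV.txt.gz` are hypothetical), reading the result back through `gzip` to confirm the delimiter and header:

```python
import csv
import gzip
import os
import tempfile

import pandas as pd

demo_df = pd.DataFrame(
    {"City name": ["Rome", "Tokyo"], "rights": [0.394, 1.5], "network": ["CBS", "NBC"]}
)

with tempfile.TemporaryDirectory() as tmp:
    out_path = os.path.join(tmp, "OlympicTV.txt.gz")
    # One output file; sep could come from config_data['outbound_delimiter']
    demo_df.to_csv(out_path, sep="|", index=False, quoting=csv.QUOTE_ALL,
                   compression="gzip", lineterminator="\n")
    # Read back through gzip to confirm the delimiter and header
    with gzip.open(out_path, "rt", newline="") as f:
        rows_back = list(csv.reader(f, delimiter="|"))

print(rows_back[0])  # ['City name', 'rights', 'network']
```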


In [35]:
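# Note: on a directory, os.path.getsize returns the size of the directory entry
# itself (typically 4096 bytes on Linux), not the total size of the files inside it
os.path.getsize('/content/OlympicTV.csv.gz/')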


4096
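The 4096 above is the directory entry, not the written data, and the task also asks for row and column counts. A minimal sketch of the summary step, assuming each part file is gzip-compressed, pipe-separated, and starts with its own header row (as the `0.part` output above should, since it was written with `header=True`); `output_size_bytes` and `summarize_gz_output` are hypothetical helpers, demonstrated on two small part files in a temporary directory:

```python
import csv
import gzip
import os
import tempfile


def output_size_bytes(path):
    """Size of a file, or the total size of all files under a directory."""
    if os.path.isfile(path):
        return os.path.getsize(path)
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total


def summarize_gz_output(path, delimiter="|"):
    """Summarize gzip-compressed delimited output: one file or a directory of parts."""
    if os.path.isfile(path):
        parts = [path]
    else:
        parts = sorted(os.path.join(path, name) for name in os.listdir(path))
    n_rows, n_columns = 0, 0
    for part in parts:
        with gzip.open(part, "rt", newline="") as f:
            reader = csv.reader(f, delimiter=delimiter)
            header = next(reader, None)  # assumed: every part has a header row
            if header is None:
                continue
            n_columns = len(header)
            n_rows += sum(1 for _ in reader)
    return {"rows": n_rows, "columns": n_columns, "size_bytes": output_size_bytes(path)}


with tempfile.TemporaryDirectory() as tmp:
    out_dir = os.path.join(tmp, "OlympicTV.csv.gz")
    os.makedirs(out_dir)
    parts_data = [
        [["City name", "rights", "network"], ["Rome", "0.394", "CBS"], ["Tokyo", "1.5", "NBC"]],
        [["City name", "rights", "network"], ["Seoul", "300.0", "NBC"]],
    ]
    for i, records in enumerate(parts_data):
        with gzip.open(os.path.join(out_dir, f"{i}.part"), "wt", newline="") as f:
            csv.writer(f, delimiter="|").writerows(records)
    expected_size = sum(os.path.getsize(os.path.join(out_dir, n)) for n in os.listdir(out_dir))
    summary = summarize_gz_output(out_dir)

print(summary)
```

In the notebook, the same call would be `summarize_gz_output('/content/OlympicTV.csv.gz')`.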