<a href="https://colab.research.google.com/github/rgs8890/Data-Glacier-Internship/blob/main/Week%206/Data_Glacier_Week_6_File_Ingestion_%26_Schema_Validation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# File Ingestion and Schema Validation

In [98]:
!pip install utility

Collecting utility
  Downloading utility-1.0.tar.gz (3.1 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: utility
  Building wheel for utility (setup.py) ... done
  Created wheel for utility: filename=utility-1.0-py3-none-any.whl size=3804 sha256=ee67d852fb06c6d3c6dc9efaddae899fbb63625f29585a655fa4db0c8c0bddf1
  Stored in directory: /root/.cache/pip/wheels/e5/f5/50/6624c9be2f958eca2fa2dc9fc635dedf9b1ec5b8ddbb46bb02
Successfully built utility
Installing collected packages: utility
Successfully installed utility-1.0


In [113]:
import pandas as pd

import time
import os
import logging
import subprocess
import yaml
import gc
import re
import gzip
import datetime

from dask import dataframe as dd
import modin as mpd
import ray

In [62]:
print("Modin version:", mpd.__version__)
print("Pandas version:", pd.__version__)

Modin version: 0.24.1
Pandas version: 1.5.3


## Importing the Data into Python Using Dask, Pandas, Ray and Modin

In [63]:
start_pandas = time.time()
gfp_data_pandas = pd.read_csv(r"/content/global_food_prices.csv")
end_pandas = time.time()
time_pandas = end_pandas - start_pandas
print("Read the csv with pandas: ",time_pandas, "sec")



Read the csv with pandas:  7.172436475753784 sec


In [64]:
gfp_data_pandas

Unnamed: 0,adm0_id,adm0_name,adm1_id,adm1_name,mkt_id,mkt_name,cm_id,cm_name,cur_id,cur_name,pt_id,pt_name,um_id,um_name,mp_month,mp_year,mp_price,mp_commoditysource
0,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,0.0,AFN,15,Retail,5,KG,1,2014,50.0000,
1,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,0.0,AFN,15,Retail,5,KG,2,2014,50.0000,
2,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,0.0,AFN,15,Retail,5,KG,3,2014,50.0000,
3,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,0.0,AFN,15,Retail,5,KG,4,2014,50.0000,
4,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,0.0,AFN,15,Retail,5,KG,5,2014,50.0000,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2050633,271.0,Zimbabwe,3444,Midlands,5594,Mbilashaba,432,Beans (sugar) - Retail,0.0,ZWL,15,Retail,5,KG,6,2021,233.3333,
2050634,271.0,Zimbabwe,3444,Midlands,5594,Mbilashaba,539,Toothpaste - Retail,0.0,ZWL,15,Retail,116,100 ML,6,2021,112.5000,
2050635,271.0,Zimbabwe,3444,Midlands,5594,Mbilashaba,540,Laundry soap - Retail,0.0,ZWL,15,Retail,5,KG,6,2021,114.0000,
2050636,271.0,Zimbabwe,3444,Midlands,5594,Mbilashaba,541,Handwash soap - Retail,0.0,ZWL,15,Retail,66,250 G,6,2021,59.5000,


In [65]:
%time gfp_data_pandas = pd.read_csv(r"/content/global_food_prices.csv")



CPU times: user 3.89 s, sys: 729 ms, total: 4.62 s
Wall time: 4.7 s


In [66]:
ray.shutdown()
ray.init()
start_ray = time.time()
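# NOTE: `pd` is plain pandas, so this cell times pandas again rather than Modin on Ray;
# a Modin read would go through `modin.pandas.read_csv` instead.
%time gfp_data_ray = pd.read_csv(r"/content/global_food_prices.csv")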
end_ray = time.time()
time_ray = end_ray - start_ray
print("Read csv with ray: ",time_ray,"sec")

2023-10-17 19:11:13,637	INFO worker.py:1642 -- Started a local Ray instance.


CPU times: user 3.39 s, sys: 1.14 s, total: 4.53 s
Wall time: 4.6 s
Read csv with ray:  4.6022233963012695 sec


In [67]:
start_dask = time.time()
%time gfp_data_dask = dd.read_csv(r"/content/global_food_prices.csv")
end_dask = time.time()
time_dask = end_dask - start_dask
print("Read the csv with dask: ",time_dask, "sec")


CPU times: user 10.6 ms, sys: 1.77 ms, total: 12.4 ms
Wall time: 13.2 ms
Read the csv with dask:  0.013916492462158203 sec


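Dask returns in about 0.014 seconds, far sooner than pandas or the cell labelled Ray. However, `dd.read_csv` is lazy: it only samples the file to infer dtypes and builds a task graph, and the rows are actually read when a computation is triggered. The figure above is therefore not a like-for-like read time.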
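Because the Dask call defers the real read, a fairer comparison has to force the computation, and repeating each measurement reduces noise (the two pandas reads above took 7.17 s and 4.7 s). The following is a minimal sketch of such a timing helper; `best_of` is a hypothetical name, and the stand-in workload is only there so the snippet runs without the CSV file.

```python
import time


def best_of(fn, repeats=3):
    """Return the fastest wall-clock time in seconds over `repeats` calls of fn."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()  # monotonic, higher resolution than time.time()
        fn()
        timings.append(time.perf_counter() - start)
    return min(timings)


# Stand-in workload so the sketch runs anywhere. In the notebook this would be, e.g.:
#   best_of(lambda: pd.read_csv(path))
#   best_of(lambda: dd.read_csv(path).compute())  # .compute() forces Dask to read the data
elapsed = best_of(lambda: sum(range(100_000)))
print(f"best of 3: {elapsed:.6f} sec")
```

Taking the minimum rather than the mean is a design choice: it approximates the cost of the work itself and discards runs slowed by other activity on the machine.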

## Comparison of Read Times for Each Library

In [68]:
computational_efficiency = {
    "Pandas": time_pandas,
    "Ray": time_ray,
    "Dask": time_dask
}

computational_efficiency_data = pd.DataFrame(computational_efficiency.items(), columns=["Method", "Efficiency"])

In [69]:
computational_efficiency_data

Unnamed: 0,Method,Efficiency
0,Pandas,7.172436
1,Ray,4.602223
2,Dask,0.013916


## Inspecting the DataFrame

In [70]:
def data_inspection(df):
    null = df.isnull().sum()
    duplicate = df.duplicated().sum()
    percentage_null = 100*(null/len(df))
    data_type = df.dtypes
    unique = df.nunique()
    return pd.DataFrame({"Duplicated Values": duplicate,
                         "Null Values": null,
                         "Percentage of Null Values": percentage_null,
                         "Unique Values": unique,
                         "Data Type": data_type})

In [71]:
data_inspection(gfp_data_pandas)

Unnamed: 0,Duplicated Values,Null Values,Percentage of Null Values,Unique Values,Data Type
adm0_id,0,0,0.0,98,float64
adm0_name,0,0,0.0,98,object
adm1_id,0,0,0.0,894,int64
adm1_name,0,611016,29.796385,617,object
mkt_id,0,0,0.0,3266,int64
mkt_name,0,0,0.0,3235,object
cm_id,0,0,0.0,636,int64
cm_name,0,0,0.0,838,object
cur_id,0,0,0.0,1,float64
cur_name,0,0,0.0,84,object


## Data Pre-Processing

In [73]:
gfp_data_ray
gfp_data_pandas
gfp_data_dask

npartitions=3,adm0_id,adm0_name,adm1_id,adm1_name,mkt_id,mkt_name,cm_id,cm_name,cur_id,cur_name,pt_id,pt_name,um_id,um_name,mp_month,mp_year,mp_price,mp_commoditysource
,float64,object,int64,object,int64,object,int64,object,float64,object,int64,object,int64,object,int64,int64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [75]:
gfp_data_pandas.dropna(thresh=len(gfp_data_pandas) * 0.3, axis=1, inplace=True)
gfp_data_ray.dropna(thresh=len(gfp_data_ray) * 0.3, axis=1, inplace=True)

In [77]:
gfp_data_pandas.columns = gfp_data_pandas.columns.str.replace('[_01]', '', regex=True).str.strip()
gfp_data_ray.columns = gfp_data_ray.columns.str.replace('[_01]', '', regex=True).str.strip()
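One side effect of the character class `[_01]` is worth noting: it removes every underscore, `0` and `1`, so `adm0_id` and `adm1_id` collapse to the same name, `admid`. That would explain the repeated `Country_ID` and `Country` labels in the later output. The sketch below reproduces the collision on a shortened column list using only the standard library; the underscore-only variant is one possible alternative, not what the notebook does.

```python
import re
from collections import Counter

# A shortened sample of the raw headers from the CSV
raw = ["adm0_id", "adm0_name", "adm1_id", "adm1_name", "mkt_id", "mp_price"]

# Same pattern as the notebook: strips underscores AND the digits 0 and 1
stripped = [re.sub(r"[_01]", "", name).strip() for name in raw]
dupes = [name for name, count in Counter(stripped).items() if count > 1]
print(stripped)
print("collisions:", dupes)

# Alternative (an assumption, not the notebook's choice): drop underscores only,
# which keeps the adm0/adm1 distinction intact
safer = [name.replace("_", "") for name in raw]
print(safer)
```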

In [78]:
gfp_data_pandas = gfp_data_pandas.loc[:, gfp_data_pandas.nunique() > 1]
gfp_data_ray = gfp_data_ray.loc[:, gfp_data_ray.nunique() > 1]
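The two filters above do different jobs: `dropna(thresh=..., axis=1)` keeps a column only if it has at least `thresh` non-null values, and the `nunique() > 1` mask then drops columns that hold a single constant value (such as `cur_id`). A small sketch on made-up data shows the effect of each step; the frame and its column names are illustrative only, and the threshold is computed with integer arithmetic here to avoid floating-point rounding.

```python
import pandas as pd

# Ten illustrative rows; only the null pattern matters here
df = pd.DataFrame({
    "price": [float(i) for i in range(10)],   # fully populated, varying
    "source": [None] * 10,                    # entirely null
    "region": ["a", "b"] + [None] * 8,        # 2 non-null values (below 30%)
    "market": ["x", "y", "z"] + [None] * 7,   # 3 non-null values (exactly 30%)
    "cur_id": [0.0] * 10,                     # constant
})

min_non_null = len(df) * 3 // 10              # 30% of the rows, as an integer
kept = df.dropna(thresh=min_non_null, axis=1) # drop columns with too few values
varying = kept.loc[:, kept.nunique() > 1]     # drop constant columns

print(list(kept.columns))
print(list(varying.columns))
```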

In [86]:
gfp_data_pandas

Unnamed: 0,Country_ID,Country,Country_ID.1,Country.1,City_ID,City_Name,Food_ID,Food_Name,Country_Code,Sector_ID,Sector_Name,Weight_ID,Weight_Name,Purchase_Month,Purchase_Year,Purchase_Price
0,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,AFN,15,Retail,5,KG,1,2014,50.0000
1,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,AFN,15,Retail,5,KG,2,2014,50.0000
2,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,AFN,15,Retail,5,KG,3,2014,50.0000
3,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,AFN,15,Retail,5,KG,4,2014,50.0000
4,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,AFN,15,Retail,5,KG,5,2014,50.0000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2050633,271.0,Zimbabwe,3444,Midlands,5594,Mbilashaba,432,Beans (sugar) - Retail,ZWL,15,Retail,5,KG,6,2021,233.3333
2050634,271.0,Zimbabwe,3444,Midlands,5594,Mbilashaba,539,Toothpaste - Retail,ZWL,15,Retail,116,100 ML,6,2021,112.5000
2050635,271.0,Zimbabwe,3444,Midlands,5594,Mbilashaba,540,Laundry soap - Retail,ZWL,15,Retail,5,KG,6,2021,114.0000
2050636,271.0,Zimbabwe,3444,Midlands,5594,Mbilashaba,541,Handwash soap - Retail,ZWL,15,Retail,66,250 G,6,2021,59.5000


In [80]:
gfp_data_pandas = gfp_data_pandas.rename(columns = {"admid": "Country_ID",
                                                    "admname": "Country",
                                                    "mktid": "City_ID",
                                                    "mktname": "City_Name",
                                                    "cmid": "Food_ID",
                                                    "cmname": "Food_Name",
                                                    "curname": "Country_Code",
                                                    "ptid": "Sector_ID",
                                                    "ptname": "Sector_Name",
                                                    "umid": "Weight_ID",
                                                    "umname": "Weight_Name",
                                                    "mpmonth": "Purchase_Month",
                                                    "mpyear": "Purchase_Year",
                                                    "mpprice": "Purchase_Price"})

gfp_data_ray = gfp_data_ray.rename(columns = {"admid": "Country_ID",
                                              "admname": "Country",
                                              "mktid": "City_ID",
                                              "mktname": "City_Name",
                                              "cmid": "Food_ID",
                                              "cmname": "Food_Name",
                                              "curname": "Country_Code",
                                              "ptid": "Sector_ID",
                                              "ptname": "Sector_Name",
                                              "umid": "Weight_ID",
                                              "umname": "Weight_Name",
                                              "mpmonth": "Purchase_Month",
                                              "mpyear": "Purchase_Year",
                                              "mpprice": "Purchase_Price"})


In [55]:
data_inspection(gfp_data_pandas)

Unnamed: 0,Duplicated Values,Null Values,Percentage of Null Values,Unique Values,Data Type
CountryID,0,0,0.0,98,float64
Country,0,0,0.0,98,object
CountryID,0,0,0.0,894,int64
Country,0,611016,29.796385,617,object
MarketID,0,0,0.0,3266,int64
MarketName,0,0,0.0,3235,object
FoodID,0,0,0.0,636,int64
FoodName,0,0,0.0,838,object
curid,0,0,0.0,1,float64
CountryName,0,0,0.0,84,object


## Data Validation Functions

In [111]:
def read_parse_file(filepath):
    with open(filepath, "r") as yaml_file:
        try:
            return yaml.load(yaml_file, Loader=yaml.FullLoader)
        except yaml.YAMLError as exc:
            logging.error(exc)

def write_file(df, filepath):
    column_names = df.columns.tolist()
    with open(filepath, "w") as yaml_file:
        try:
            yaml.dump(column_names, yaml_file, default_style='"', default_flow_style=False)
        except yaml.YAMLError as exc:
            logging.error(exc)

def validate_file(df, yaml_file_path):
    with open(yaml_file_path, 'r') as yaml_file:
        expected_column_names = yaml.safe_load(yaml_file)

    if len(df.columns) == len(expected_column_names):
        print("The number of columns is the same.")
    else:
        print("The number of columns is not the same.")

    if all(col in df.columns for col in expected_column_names):
        print("Column names match.")
        return 0
    else:
        print("Column names do not match.")
        return 1

def column_header_validation(df, yaml_data):
    # Apply the same header clean-up used earlier so both sides are comparable
    df = df.dropna(thresh=len(df) * 0.3, axis=1)
    df.columns = df.columns.str.replace('[_01]', '', regex=True).str.strip().str.lower()
    # Accept either the parsed YAML dict (its "columns" list) or a DataFrame
    expected = yaml_data["columns"] if isinstance(yaml_data, dict) else yaml_data.columns
    expected_col = [col.lower() for col in expected]
    actual_col = list(df.columns)
    if expected_col == actual_col:
        print("The columns match in the dataframe and the yaml_file. Success!")
        return 1
    else:
        print("The columns do not match in the dataframe and the yaml_file. Failure!")
        # Present in the dataframe but absent from the expected list
        extra_in_df = list(set(actual_col).difference(expected_col))
        print("Following columns are in the dataframe but not in the YAML file:", extra_in_df)
        # Present in the expected list but absent from the dataframe
        missing_from_df = list(set(expected_col).difference(actual_col))
        print("Following columns are in the YAML file but not in the dataframe: ", missing_from_df)
        logging.info(f'Dataframe columns: {actual_col}')
        logging.info(f'YAML columns: {expected_col}')
        return 0

### Writing and Checking the YAML File

In [105]:
%%writefile global_food_yaml
file_type: csv
dataset_name: file
file_name: global_food_prices
table_name: global_food_prices_table
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - admid
    - admname
    - mktid
    - mktname
    - cmid
    - cmname
    - curname
    - ptid
    - ptname
    - umid
    - umname
    - mpmonth
    - mpyear
    - mpprice

Overwriting global_food_yaml


In [106]:
yaml_data = read_parse_file("/content/global_food_yaml")
yaml_data

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'global_food_prices',
 'table_name': 'global_food_prices_table',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['admid',
  'admname',
  'mktid',
  'mktname',
  'cmid',
  'cmname',
  'curname',
  'ptid',
  'ptname',
  'umid',
  'umname',
  'mpmonth',
  'mpyear',
  'mpprice']}

In [107]:
# Reading the file using the configuration file method
file_type = yaml_data["file_type"]
source_file = "/content/" + yaml_data["file_name"] + f'.{file_type}'
print(source_file)

/content/global_food_prices.csv
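Since the path is assembled from values in the configuration, it can help to confirm that the expected keys are present before using them. The sketch below is one way to do that; `REQUIRED_KEYS` and `source_path` are hypothetical names, and the choice of which keys are mandatory is an assumption.

```python
from pathlib import PurePosixPath

# Assumed minimum set of keys the ingestion step relies on
REQUIRED_KEYS = {"file_type", "file_name", "inbound_delimiter", "columns"}


def source_path(config, base_dir="/content"):
    """Build the source file path from the config, failing early on missing keys."""
    missing = REQUIRED_KEYS - set(config)
    if missing:
        raise KeyError(f"config is missing keys: {sorted(missing)}")
    return str(PurePosixPath(base_dir) / f'{config["file_name"]}.{config["file_type"]}')


config = {
    "file_type": "csv",
    "file_name": "global_food_prices",
    "inbound_delimiter": ",",
    "columns": ["admid", "admname"],
}
path = source_path(config)
print(path)
```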


In [109]:
yaml_df = pd.read_csv(source_file, sep=yaml_data["inbound_delimiter"])
yaml_df



Unnamed: 0,adm0_id,adm0_name,adm1_id,adm1_name,mkt_id,mkt_name,cm_id,cm_name,cur_id,cur_name,pt_id,pt_name,um_id,um_name,mp_month,mp_year,mp_price,mp_commoditysource
0,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,0.0,AFN,15,Retail,5,KG,1,2014,50.0000,
1,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,0.0,AFN,15,Retail,5,KG,2,2014,50.0000,
2,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,0.0,AFN,15,Retail,5,KG,3,2014,50.0000,
3,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,0.0,AFN,15,Retail,5,KG,4,2014,50.0000,
4,1.0,Afghanistan,272,Badakhshan,266,Fayzabad,55,Bread - Retail,0.0,AFN,15,Retail,5,KG,5,2014,50.0000,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2050633,271.0,Zimbabwe,3444,Midlands,5594,Mbilashaba,432,Beans (sugar) - Retail,0.0,ZWL,15,Retail,5,KG,6,2021,233.3333,
2050634,271.0,Zimbabwe,3444,Midlands,5594,Mbilashaba,539,Toothpaste - Retail,0.0,ZWL,15,Retail,116,100 ML,6,2021,112.5000,
2050635,271.0,Zimbabwe,3444,Midlands,5594,Mbilashaba,540,Laundry soap - Retail,0.0,ZWL,15,Retail,5,KG,6,2021,114.0000,
2050636,271.0,Zimbabwe,3444,Midlands,5594,Mbilashaba,541,Handwash soap - Retail,0.0,ZWL,15,Retail,66,250 G,6,2021,59.5000,


In [112]:
x = column_header_validation(gfp_data_pandas, yaml_df)

The columns do not match in the dataframe and the yaml_file. Failure!
Following columns are not in the original file: ['cityid', 'country', 'weightid', 'foodname', 'purchaseprice', 'weightname', 'foodid', 'cityname', 'countrycode', 'countryid', 'purchasemonth', 'sectorid', 'sectorname', 'purchaseyear']
Following columns are not within the YAML file:  ['cm_id', 'adm0_id', 'pt_name', 'cur_name', 'mp_price', 'pt_id', 'mp_month', 'mkt_id', 'mp_commoditysource', 'cur_id', 'adm1_name', 'um_name', 'um_id', 'adm0_name', 'mkt_name', 'mp_year', 'adm1_id', 'cm_name']
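The failure reported here appears to have two causes. The second argument is `yaml_df`, the freshly read CSV, so the comparison runs against the raw CSV headers rather than the `columns` list in the configuration; and `gfp_data_pandas` had already been renamed to `Country_ID`, `City_ID` and so on. A minimal sketch of comparing headers directly against a YAML-style column list follows; `compare_headers` is a hypothetical helper and the lists are shortened for illustration.

```python
def compare_headers(actual, expected):
    """Compare two header lists case-insensitively.

    Returns (match, missing, unexpected), where `missing` are expected names
    absent from `actual` and `unexpected` are actual names not in `expected`.
    """
    actual_norm = [col.strip().lower() for col in actual]
    expected_norm = [col.strip().lower() for col in expected]
    missing = [col for col in expected_norm if col not in actual_norm]
    unexpected = [col for col in actual_norm if col not in expected_norm]
    return actual_norm == expected_norm, missing, unexpected


expected = ["admid", "admname", "mktid", "mpprice"]      # shortened YAML column list
actual = ["Country_ID", "Country", "mktid", "mpprice"]   # headers after renaming

ok, missing, unexpected = compare_headers(actual, expected)
print("match:", ok)
print("missing from dataframe:", missing)
print("not in YAML:", unexpected)
```

Comparing the normalised lists with `==` also checks column order, which a set difference alone would not catch.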


### Saving the DataFrame as a Gzip-Compressed File

In [87]:
# Writing the file in GZ Format
gfp_data_pandas.to_csv("global_food_pricesgz.csv.gz", sep = "|", compression = "gzip", index = False)

### Summary of the File

In [114]:
file_size_mb = os.path.getsize("/content/global_food_pricesgz.csv.gz") / (1024 * 1024)
file_size_bytes = os.path.getsize("/content/global_food_pricesgz.csv.gz")
total_rows = len(gfp_data_pandas)
total_columns = len(gfp_data_pandas.columns)
print("Total number of rows:", total_rows)
print("Total number of columns:", total_columns)
print("File size (bytes):", file_size_bytes)
print("File size (megabytes)", file_size_mb)

Total number of rows: 2050638
Total number of columns: 16
File size (bytes): 11231545
File size (megabytes) 10.711236000061035
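The row and column counts above come from the in-memory DataFrame rather than from the file on disk. Reading the compressed file back is one way to confirm that what was written matches. The sketch below does this with the standard library on a tiny pipe-delimited sample written to a temporary directory; the sample rows and file name are illustrative.

```python
import csv
import gzip
import os
import tempfile

rows = [
    ["Country", "Purchase_Price"],
    ["Afghanistan", "50.0"],
    ["Zimbabwe", "59.5"],
]

# Write a small pipe-delimited, gzip-compressed CSV
path = os.path.join(tempfile.mkdtemp(), "sample.csv.gz")
with gzip.open(path, "wt", newline="", encoding="utf-8") as fh:
    csv.writer(fh, delimiter="|").writerows(rows)

# Read it back and count what is actually on disk
with gzip.open(path, "rt", newline="", encoding="utf-8") as fh:
    reader = csv.reader(fh, delimiter="|")
    header = next(reader)
    n_rows = sum(1 for _ in reader)

print("columns:", len(header), "rows:", n_rows)
print("size (bytes):", os.path.getsize(path))
```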


### Another Way of Using YAML

In [116]:
import yaml

schema = {
    'separator': '|',
    'columns': list(gfp_data_pandas.columns)
}

with open('schema.yaml', 'w') as yaml_file:
    yaml.dump(schema, yaml_file, default_flow_style=False)

In [120]:
# Validating this Schema
with open('schema.yaml', 'r') as yaml_file:
    schema = yaml.load(yaml_file, Loader=yaml.FullLoader)

if len(gfp_data_pandas.columns) != len(schema['columns']):
    print("Number of columns in the file does not match the schema.")

if not all(col in gfp_data_pandas.columns for col in schema['columns']):
    print("Column names in the file do not match the schema.")