# Objective: File Ingestion and Schema Validation

## Breakdown:

1. Take any CSV/text file of 2+ GB **[✔]**

2. Read the file **[✔]**

3. Try different file-reading methods (e.g. Dask, Modin, Ray, pandas) and present your findings in terms of computational efficiency **[✔]**

4. Perform basic validation on data columns **[✔]**

5. Create a YAML file that lists the column names and defines the separators used to read and write the file **[✔]**

6. Validate the number of columns and the column names of the ingested file against the YAML file **[✔]**

7. Write the file as a pipe-separated (|) text file in gz format **[✔]**

8. Create a summary of the file: **[✔]**
   * Total number of rows
   * Total number of columns
   * File size

In [1]:
!pip install --upgrade "dask[complete]"



In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Step 1: File Saving

## To avoid running out of RAM, the data file used here is about 1/4 of the required size.

In [3]:
csv_file = '/content/drive/MyDrive/CPAT.csv'

In [4]:
!pip install --force-reinstall pandas


In [5]:
!pip install modin --upgrade
!pip install ray



# Steps 2 & 3: Reading the file and comparing computational efficiency

In [6]:
import time
import dask
import modin.pandas as m_pd
import dask.dataframe as dd
import pandas as pd
import numpy as np
import ray

# Dask: dd.read_csv is lazy, so this times only building the task graph,
# not loading the data.
start_time = time.time()
df = dd.read_csv(csv_file)
end_time = time.time()
print("Computational Efficiency | Dask:", round(end_time - start_time,2), "seconds\n")

# Modin on Ray (ray.init() is deliberately outside the timed region)
ray.init()
start_time = time.time()
df = m_pd.read_csv(csv_file)
end_time = time.time()
print("Computational Efficiency | Modin Ray:", round(end_time - start_time,2), "seconds\n")
ray.shutdown()

# pandas (df is left as a plain pandas DataFrame for the later steps)
start_time = time.time()
df = pd.read_csv(csv_file)
end_time = time.time()
print("Computational Efficiency | Pandas:", round(end_time - start_time,2), "seconds\n")

Computational Efficiency | Dask: 0.04 seconds



2023-08-06 04:58:13,763	INFO worker.py:1621 -- Started a local Ray instance.


Computational Efficiency | Modin Ray: 15.31 seconds

Computational Efficiency | Pandas: 7.02 seconds



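**Dask returns almost immediately (0.04 s), against 15.31 s for Modin on Ray and 7.02 s for pandas. The comparison is not like-for-like, though: `dd.read_csv` is lazy and only builds a task graph, while Modin and pandas load the whole file.**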
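Because `dd.read_csv` does not load the data until a result is requested, a fairer timing would force each library to materialize something. The helper below is a minimal sketch of that idea; `time_call` and its `repeats` parameter are hypothetical names introduced here, not part of the original notebook.

```python
import time

def time_call(fn, repeats=1):
    """Call fn() `repeats` times and return (best_seconds, last_result)."""
    best = float("inf")
    result = None
    for _ in range(repeats):
        start = time.perf_counter()   # monotonic, high-resolution clock
        result = fn()
        best = min(best, time.perf_counter() - start)
    return best, result

# Possible usage in this notebook (assumes dd, pd and csv_file from the cells above):
#   dask_secs, n_rows = time_call(lambda: len(dd.read_csv(csv_file)))  # len() triggers the read
#   pandas_secs, frame = time_call(lambda: pd.read_csv(csv_file))
```

Timing `len(...)` on the Dask dataframe makes Dask actually scan the file, which is closer to what the pandas and Modin timings measure.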

# Step 4: Perform basic validation on data columns

## Validation with a custom function

In [7]:
# Fill missing values in these columns with a placeholder.
# Assigning the result back avoids chained in-place fillna on a column selection.
fill_cols = ['number', 'origin', 'destination', 'typecode']
df[fill_cols] = df[fill_cols].fillna('Not found')
df

Unnamed: 0,callsign,number,aircraft_uid,typecode,origin,destination,firstseen,lastseen,day,latitude_1,longitude_1,altitude_1,latitude_2,longitude_2,altitude_2
0,CBJ431,Not found,e73950ce-16b4-4838-9828-f69c7c51668d,A332,YSSY,EGLL,2019-01-31 00:14:53+00:00,2019-02-01 19:38:23+00:00,2019-02-01 00:00:00+00:00,-33.932019,151.172276,0.0,51.477558,-0.488968,220.98
1,CCA839,Not found,9d8c6719-f6f6-486d-b725-cea288b0cb16,A332,YMML,Not found,2019-01-31 00:50:19+00:00,2019-02-01 06:53:38+00:00,2019-02-01 00:00:00+00:00,-37.690796,144.841997,304.8,41.352539,2.462158,1508.76
2,HVN37,Not found,0c72436b-a844-4f0d-bfe7-589414fdc6ff,B789,YSSY,EDDF,2019-01-31 01:11:43+00:00,2019-02-01 05:02:22+00:00,2019-02-01 00:00:00+00:00,-33.926376,151.170978,304.8,50.044258,8.539503,304.80
3,SIA322,Not found,9aa14f29-2d50-45a6-92de-1cd811b0970e,A388,YSSY,EGLL,2019-01-31 01:33:48+00:00,2019-02-01 05:44:51+00:00,2019-02-01 00:00:00+00:00,-33.929169,151.171819,0.0,51.464813,-0.483695,289.56
4,CSN461,Not found,3cdbc44e-0d86-4b9f-9c0d-6bb0e3d92f4b,B77L,KORD,EDDF,2019-01-31 02:13:26+00:00,2019-02-01 11:12:52+00:00,2019-02-01 00:00:00+00:00,41.969403,-87.938275,304.8,50.027985,8.536088,259.08
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2005953,,Not found,7934ed5f-78bb-4598-ba23-d1fe5db81dce,SR22,KHPN,KFRG,2019-02-28 23:46:07+00:00,2019-02-28 23:56:59+00:00,2019-02-28 00:00:00+00:00,41.066615,-73.706651,0.0,40.713364,-73.414124,-7.62
2005954,FZA6743,Not found,40611fae-dd10-468c-a5a7-c6e0230143e6,Not found,Not found,Not found,2019-02-28 23:46:39+00:00,2019-02-28 23:59:49+00:00,2019-02-28 00:00:00+00:00,36.288666,113.100643,6400.8,37.604233,112.667425,6004.56
2005955,NZM353,Not found,f44ef3bd-cfaa-4163-a08c-7bb9e1d3ec74,Not found,Not found,NZCH,2019-02-28 23:46:52+00:00,2019-02-28 23:57:55+00:00,2019-02-28 00:00:00+00:00,-43.198471,173.085895,3352.8,-43.511865,172.505624,45.72
2005956,PXT920,Not found,54a55dc3-c677-4936-ad75-a9c6abcdfead,Not found,KSJC,KOAK,2019-02-28 23:46:56+00:00,2019-02-28 23:57:09+00:00,2019-02-28 00:00:00+00:00,37.362919,-121.926926,0.0,37.726730,-122.210687,-60.96


In [8]:
import re

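def data_validation(value):
    # Remove every character that is not a word character or whitespace,
    # then trim surrounding whitespace.
    return re.sub(r'[^\w\s]', '', value).strip()

# Apply the cleaner to string cells only; other values pass through unchanged.
df_cleaned = df.applymap(lambda x: data_validation(x) if isinstance(x, str) else x)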
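To make the effect of this regex concrete, the sketch below applies the same substitution to a few sample values taken from the table above. `clean_text` is a hypothetical stand-in for `data_validation`. Note that punctuation is stripped from every string, so identifiers and timestamps that are stored as text lose their hyphens and colons.

```python
import re

def clean_text(value):
    """Same substitution as data_validation: keep word characters and whitespace."""
    return re.sub(r'[^\w\s]', '', value).strip()

samples = [
    "CBJ431",
    "  Not found ",
    "e73950ce-16b4-4838-9828-f69c7c51668d",
    "2019-01-31 00:14:53+00:00",
]
cleaned = [clean_text(s) for s in samples]
for before, after in zip(samples, cleaned):
    print(repr(before), "->", repr(after))
```

If that side effect is unwanted, one option (an assumption, not something the notebook does) is to apply the cleaner only to selected free-text columns instead of the whole frame.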

## Validation with the Data Glacier helper functions

In [9]:
%%writefile validation_scripts.py

import logging
import os
import subprocess
import yaml
import datetime
import gc
import re

def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string)
    return string

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)

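def col_header_val(df,table_config):
    # Normalize the file's headers (this renames the caller's columns in place):
    # lowercase, non-word characters to '_', no leading/trailing '_',
    # and runs of '_' collapsed to a single '_'.
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(r'[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    # Sort both name lists so the comparison ignores column order.
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)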
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing validation_scripts.py
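The header clean-up inside `col_header_val` can be easier to follow on a single name. The sketch below mirrors those steps without pandas; `normalize_header` and `compare_headers` are hypothetical helpers written for illustration. Unlike `col_header_val`, the comparison here is set-based, so it reports mismatched names but would not notice duplicate columns.

```python
import re

def normalize_header(name):
    """Mirror the clean-up steps in col_header_val for one column name."""
    name = name.lower()
    name = re.sub(r"[^\w]", "_", name)    # non-word characters become '_'
    name = name.strip("_")                # drop leading/trailing '_'
    return re.sub(r"_{2,}", "_", name)    # collapse runs of '_'

def compare_headers(file_columns, expected_columns):
    """Return (extra_in_file, missing_from_file) as sorted lists."""
    actual = {normalize_header(c) for c in file_columns}
    expected = {c.lower() for c in expected_columns}
    return sorted(actual - expected), sorted(expected - actual)

extra, missing = compare_headers(["Call Sign", "day"], ["call_sign", "number"])
print("not in YAML:", extra)
print("not in file:", missing)
```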


# Step 5: Create a YAML file with the column names and the read/write separators

In [10]:
%%writefile table_config.yaml

file_type: csv
dataset_name: cpat
file_name: config_v1
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - callsign
    - number
    - aircraft_uid
    - typecode
    - origin
    - destination
    - firstseen
    - lastseen
    - day
    - latitude_1
    - longitude_1
    - altitude_1
    - latitude_2
    - longitude_2
    - altitude_2

Overwriting table_config.yaml
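Before relying on the configuration, it can help to sanity-check the parsed dictionary. The function below is a minimal sketch under stated assumptions: `check_table_config` and `REQUIRED_KEYS` are hypothetical names, and the rule that each delimiter must be a single character is an assumption chosen to suit a pandas-style `sep` argument, not a requirement stated in the notebook.

```python
REQUIRED_KEYS = ("inbound_delimiter", "outbound_delimiter", "columns")

def check_table_config(config):
    """Return a list of human-readable problems found in a parsed config dict."""
    problems = []
    for key in REQUIRED_KEYS:
        if key not in config:
            problems.append(f"missing key: {key}")
    # Assumption: delimiters are single-character strings.
    for key in ("inbound_delimiter", "outbound_delimiter"):
        value = config.get(key)
        if value is not None and (not isinstance(value, str) or len(value) != 1):
            problems.append(f"{key} must be a single character")
    columns = config.get("columns") or []
    duplicates = sorted({c for c in columns if columns.count(c) > 1})
    if duplicates:
        problems.append(f"duplicate columns: {duplicates}")
    return problems

# In the notebook this dict would come from read_config_file("table_config.yaml").
example = {"inbound_delimiter": ",", "outbound_delimiter": "|", "columns": ["callsign", "day"]}
print(check_table_config(example))
```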


# Step 6: Validate the number of columns and the column names of the ingested file against the YAML file

In [11]:
import validation_scripts as sc
table_config = sc.read_config_file("table_config.yaml")
sc.col_header_val(df,table_config)

column name and column length validation passed


1

# Step 7: Write the file as a pipe-separated (|) text file in gz format

In [12]:
df.to_csv('new_df.gz', sep='|', compression='gzip', index=False)

# Step 8: Create a summary of the file

In [17]:
import os
file_size = os.path.getsize('new_df.gz')

num_rows = df.shape[0]
num_columns = df.shape[1]

print(f"File Size: {file_size} bytes")
print(f"Number of Rows: {num_rows}")
print(f"Number of Columns: {num_columns}")

File Size: 175062119 bytes
Number of Rows: 2005958
Number of Columns: 15
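The summary above takes the row and column counts from the in-memory DataFrame. As a cross-check, the same figures can be derived from the written file itself by streaming it, which avoids loading it into RAM. This is a minimal standard-library sketch; `summarize_gz` is a hypothetical helper, and it assumes the file is UTF-8, non-empty, and has a header row.

```python
import csv
import gzip
import os

def summarize_gz(path, delimiter="|"):
    """Stream a gzip-compressed delimited text file and return its summary."""
    with gzip.open(path, "rt", newline="", encoding="utf-8") as handle:
        reader = csv.reader(handle, delimiter=delimiter)
        header = next(reader)               # first record holds the column names
        num_rows = sum(1 for _ in reader)   # remaining records are data rows
    return {
        "rows": num_rows,
        "columns": len(header),
        "size_bytes": os.path.getsize(path),
    }

# Possible usage in this notebook: summarize_gz('new_df.gz')
```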
