In [1]:
# %pip installs into the environment of the running kernel (!pip may target a different interpreter).
%pip install modin

Defaulting to user installation because normal site-packages is not writeable


In [2]:
# The "dataframe" extra is needed for dask.dataframe, and "distributed" provides the
# `distributed.Client` used by the Modin-on-Dask cell; plain `dask` installs neither.
%pip install "dask[dataframe,distributed]"

Defaulting to user installation because normal site-packages is not writeable


In [3]:
# %pip installs into the environment of the running kernel (!pip may target a different interpreter).
%pip install ray

Defaulting to user installation because normal site-packages is not writeable


In [4]:
# Explicit %pip: a bare `pip install` only works while IPython automagic is enabled.
%pip install pyarrow

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [5]:
# Pandas (baseline for the read-time comparison)
import pandas as pd
# Modin: pandas-compatible API backed by a parallel engine (Ray by default here, Dask below)
# NOTE(review): `modin`, `mpd` and `ray` are not used by the visible cells. modin.pandas is
# imported before MODIN_ENGINE is set in the "Modin with Dask" cell, so Modin may already
# have fixed its engine by then — confirm.
import modin
import modin.pandas as mpd
import ray
# Dask (CPU) dataframe API
import dask.dataframe as dd

In [None]:
%%time
# %%time runs the cell once (%%timeit would re-read the multi-GB file at least 7 times)
# and, unlike %%timeit, leaves `df` defined in the notebook namespace.
df = pd.read_csv("large_example_file.csv")

In [None]:
%%time
# dd.read_csv is lazy: it only builds a task graph (sampling the file to infer dtypes).
# len() forces a full pass over the file so the timing is comparable to pandas.read_csv.
ddf = dd.read_csv("large_example_file.csv")
ddf_rows = len(ddf)

In [None]:
# Modin with Dask
import os
os.environ["MODIN_ENGINE"] = "dask"  # may be ignored if Modin has already read its engine setting
from distributed import Client
import modin.config as modin_cfg
import modin.pandas as dask_pd

# Start the Dask cluster before Modin initialises its engine; Modin's Dask engine
# is documented to reuse an already-running distributed Client.
client = Client(memory_limit='8GB')
# modin.pandas was already imported in the imports cell, so the environment variable
# alone may be read too late; select the engine explicitly through Modin's config API.
modin_cfg.Engine.put("Dask")
%time  mdask_df = dask_pd.read_csv("large_example_file.csv")

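***Dask appeared to outperform pandas by a big margin: in the recorded run, pandas took around 1 minute to read the big file, while `dd.read_csv` returned in less than 1 second. Caveat: `dd.read_csv` is lazy — it only builds a task graph and samples the start of the file to infer dtypes — so a sub-second time does not mean the data was loaded. A fair comparison must force a full pass over the data, e.g. `len(ddf)` or `ddf.compute()`.***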

### Utility file
### Config file 
### Data Ingestion Pipeline

In [None]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml 
import pandas as pd
import datetime
import gc
import re

#################
# File Reading #
#################

def read_config_file(filepath):
  '''
  Load a YAML configuration file with yaml.safe_load.

  Parameters
  ----------
  filepath : str or path-like
    Path of the YAML file to read.

  Returns
  -------
  object or None
    The parsed document (a dict for a mapping-style config such as
    file.yaml), or None when the file is not valid YAML; in that case the
    parse error is logged at ERROR level.
  '''
  # Explicit UTF-8: without it, open() uses the platform locale's default encoding.
  with open(filepath, "r", encoding="utf-8") as stream:
    try:
      return yaml.safe_load(stream)
    except yaml.YAMLError as exc:
      logging.error(exc)
      return None

def replacer(string, char):
  '''
  Collapse every run of two or more consecutive ``char`` into a single ``char``.

  ``char`` is matched literally (it is regex-escaped), so "." or "|" collapse
  only themselves instead of acting as regex operators, and a multi-character
  ``char`` is repeated as a whole. An empty ``char`` leaves ``string`` unchanged.

  Example: replacer("a__b___c", "_") -> "a_b_c"
  '''
  if not char:
    return string
  pattern = "(?:" + re.escape(char) + "){2,}"
  # A callable replacement inserts ``char`` verbatim (no backslash-escape processing).
  return re.sub(pattern, lambda _match: char, string)

def col_header_val(df, table_config):
  '''
  Standardize the column names of ``df`` and validate them against the config.

  Each column name is converted to text and lower-cased. Every non-word
  character (anything other than a letter, digit or underscore) is replaced
  with "_". Leading/trailing "_" are stripped, and runs of "_" are collapsed
  to one. The normalized names are assigned back to ``df.columns``, so the
  caller's DataFrame is renamed in place (column order is unchanged).

  Parameters
  ----------
  df : pandas.DataFrame
    Frame whose header is validated.
  table_config : dict
    Parsed YAML config; ``table_config["columns"]`` lists the expected names
    (compared case-insensitively).

  Returns
  -------
  int
    1 when the normalized names equal the expected names, ignoring order;
    otherwise 0. The outcome is printed, and on failure the names missing on
    either side are printed as well.
  '''
  normalized = (
    df.columns.astype(str)
    .str.lower()
    # r'[^\w]' (with the backslash) keeps letters, digits and "_"; without it,
    # '[^w]' would replace every character except the letter "w".
    .str.replace(r'[^\w]', '_', regex=True)
    .str.strip('_')
    .str.replace(r'_{2,}', '_', regex=True)
  )
  df.columns = list(normalized)

  expected_col = sorted(col.lower() for col in table_config["columns"])
  # Compare sorted copies instead of reindexing df: reindex raises on duplicate
  # labels, which normalization can produce (e.g. "A B" and "a_b").
  actual_col = sorted(df.columns)
  if actual_col == expected_col:
    print("column name and column length validation passed")
    return 1
  print("column name and columns length validation failed")
  mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
  print("Following File columns are not in the YAML file", mismatched_columns_file)
  missing_YAML_file = sorted(set(expected_col).difference(actual_col))
  print("Following YAML columns are not in the file uploaded", missing_YAML_file)
  logging.info(f'expected columns: {expected_col}')
  return 0

## Write YAML File

In [None]:
%%writefile file.yaml
# Ingestion config, loaded in the notebook via testutility.read_config_file (yaml.safe_load).
# NOTE: the "delimeter" spelling is part of the key names the notebook reads; do not rename.
# The notebook builds the source path as "./" + file_name + "." + file_type.
file_type: csv
# Not read by the notebook cells shown; presumably descriptive metadata — confirm.
dataset_name: testfile
file_name: large_example_file
# Not read by the notebook cells shown; presumably the target table — confirm.
table_name: edsurv
# Delimiter passed to pandas.read_csv when loading the source file.
inbound_delimeter: ","
# Not read by the notebook cells shown; presumably the delimiter for an output file — confirm.
outbound_delimeter: "|"
# Not read by the notebook cells shown (read_csv is called without skiprows).
skip_leading_rows: 1
# Expected column names, compared case-insensitively and order-insensitively by
# testutility.col_header_val.
# TODO: these are empty placeholders; fill in the real column names of
# large_example_file.csv, otherwise header validation will not match a normal header.
columns:
  - ""
  - ""
  - ""
  - ""
  - ""
  - ""
  - ""
  - ""

In [None]:
# Read Config file
import importlib

import testutility as util

# `import` is cached per kernel session: reload so edits written by the
# %%writefile cell take effect without restarting the kernel.
util = importlib.reload(util)
config_data = util.read_config_file("file.yaml")
# read_config_file returns None (after logging the error) when file.yaml is not valid YAML.
assert config_data is not None, "file.yaml could not be parsed; see the logged YAML error"

In [None]:
# Display the configured base file name (the extension comes from file_type).
config_data["file_name"]

In [None]:
# Display the full parsed configuration.
config_data

In [None]:
# Read the source file described by the config.
file_type = config_data["file_type"]
source_file = f"./{config_data['file_name']}.{file_type}"
# Pass the delimiter by keyword: positional arguments to read_csv (other than the path)
# are deprecated and become keyword-only in pandas 3.0.
df = pd.read_csv(source_file, sep=config_data["inbound_delimeter"])
# read_csv already returns a fresh RangeIndex; reset_index(drop=True) would only make
# an extra full copy of the multi-GB frame, so it is omitted.

In [None]:
# (rows, columns) of the loaded frame.
df.shape

In [None]:
# Validate the header of the file against config_data["columns"].
# Side effect: col_header_val assigns normalized names to df.columns, so df is renamed in place.
util.col_header_val(df, config_data)

In [None]:
# Show the file's header next to the column list expected by the YAML config.
file_columns = df.columns
print("columns of the file are:", file_columns)
yaml_columns = config_data["columns"]
print("columns of the YAML are:", yaml_columns)

In [None]:
# Gate the rest of the pipeline on the header check (col_header_val returns 1 or 0).
header_is_valid = util.col_header_val(df, config_data) != 0
if header_is_valid:
  print("col validation passed")
  # Code to perform further action in the pipeline
else:
  print("Validation Failed")

In [None]:
def summary(df, file_path=None):
    """Print the row count, column count and size of a DataFrame.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to describe.
    file_path : str or path-like, optional
        Source file of ``df``. When given, its on-disk size is printed;
        otherwise the in-memory size of ``df`` is printed, computed with
        ``memory_usage(deep=True)`` so string contents are counted.

    Sizes are reported in GB, where 1 GB = 1024**3 bytes, rounded to 2 decimals.
    """
    import os  # local import keeps the function self-contained

    rows = len(df)
    columns = len(df.columns)
    print(f"Number of Rows: {rows}")
    print(f"Number of Columns: {columns}")
    bytes_per_gb = 1024 ** 3
    # Computed instead of the former hard-coded "4.23 GB", which was wrong for any other input.
    if file_path is not None:
        print(f"Size on disk: {os.path.getsize(file_path) / bytes_per_gb:.2f} GB")
    else:
        print(f"In-memory size: {df.memory_usage(deep=True).sum() / bytes_per_gb:.2f} GB")

In [None]:
# Print a short summary (row and column counts, size) of the loaded frame.
summary(df)

In [None]:
import gzip
import shutil

input_file = "data_ingestion_pipeline.ipynb"
output_file = "data_ingestion_sample.gz"
# Stream the copy in chunks rather than reading the whole file into memory.
# NOTE: this compresses the notebook as last saved to disk; save it first to include recent edits.
with open(input_file, 'rb') as f_in, gzip.open(output_file, 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)

In [None]:
import zipfile

# Deliverables bundled into final.zip.
list_files = ['file.yaml', 'testutility.py', 'data_ingestion_pipeline.ipynb']

# An archive-wide default compression means every member is written DEFLATE-compressed.
with zipfile.ZipFile('final.zip', 'w', compression=zipfile.ZIP_DEFLATED) as archive:
    for member in list_files:
        archive.write(member)