<a href="https://colab.research.google.com/github/samson12193/Ingestion_and_pipeline/blob/main/ingestion_and_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data Ingestion & Pipeline

Task:
This project takes a CSV file of 2+ GB, reads it using Dask, Modin, Ray, and pandas, and presents the findings in terms of computational efficiency. It performs basic validation on the data columns: removing special characters and white space from the column names.

It creates a YAML file, writes the column names to it, and validates the number of columns and the column names of the ingested file against the YAML.

It writes the file as a pipe-separated (|) text file in gz format and creates a summary of the file: total number of rows, total number of columns, and file size.

In [1]:
pip install ray



In [2]:
pip install modin



In [3]:
pip install --upgrade modin




In [4]:
pip install --upgrade pandas==2.2.*




In [5]:
pip install dask[dataframe]



# Load libraries

In [6]:
# Mount Google Drive
# Colab-only step: attaches the user's Google Drive at /content/drive, the
# location the dataset path in the following cells points into.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [7]:
 # Importing libraries
import numpy as np
import pandas as pd
import modin.pandas as pdm
import time
import os
import io
from dask import dataframe as df1
import dask.dataframe as dd
import logging
import subprocess
import datetime
import gc
import re


# Mount Google Drive

# Initialize Google Path

In [8]:
# Location of the dataset on the mounted Google Drive.
# The file name is kept relative to MyDrive so that os.path.join really builds
# the path: an absolute last component makes join discard everything before it.
GOOGLE_DRIVE_PATH_AFTER_MYDRIVE = 'Combined_Flights_2019.csv'
GOOGLE_DRIVE_PATH = os.path.join('/content/drive/MyDrive', GOOGLE_DRIVE_PATH_AFTER_MYDRIVE)


# Data Ingestion

1. Dask

In [9]:
# Read the file using Dask.
# time.perf_counter() is a monotonic clock intended for measuring intervals;
# time.time() is wall-clock time and can jump if the system clock is adjusted.
# NOTE(review): the recorded duration (~0.03 s for a multi-GB file) suggests
# dd.read_csv only sets up the read lazily - confirm before comparing this
# number with the eager readers below.
dask_time = time.perf_counter()
df_dask = dd.read_csv(GOOGLE_DRIVE_PATH)
e_time_dask = time.perf_counter()

# Calculate file size
file_stats = os.stat(GOOGLE_DRIVE_PATH)
print("File size in GB is {:.2f}".format(file_stats.st_size / (1024 * 1024 * 1024)))

# Data ingestion time
print("Time taken to read with dask: ", round(e_time_dask - dask_time, 3), "seconds")

# Display the first few rows of the DataFrame
display(df_dask.head(5))


File size in GB is 2.63
Time taken to read with dask:  0.034 seconds


Unnamed: 0,FlightDate,Airline,Origin,Dest,Cancelled,Diverted,CRSDepTime,DepTime,DepDelayMinutes,DepDelay,...,WheelsOff,WheelsOn,TaxiIn,CRSArrTime,ArrDelay,ArrDel15,ArrivalDelayGroups,ArrTimeBlk,DistanceGroup,DivAirportLandings
0,2019-04-01,Envoy Air,LIT,ORD,False,False,1212,1209.0,0.0,-3.0,...,1219.0,1342.0,8.0,1405,-15.0,0.0,-1.0,1400-1459,3,0
1,2019-04-02,Envoy Air,LIT,ORD,False,False,1212,1200.0,0.0,-12.0,...,1210.0,1339.0,9.0,1405,-17.0,0.0,-2.0,1400-1459,3,0
2,2019-04-03,Envoy Air,LIT,ORD,False,False,1212,1203.0,0.0,-9.0,...,1214.0,1336.0,6.0,1405,-23.0,0.0,-2.0,1400-1459,3,0
3,2019-04-04,Envoy Air,LIT,ORD,False,False,1212,1435.0,143.0,143.0,...,1452.0,1615.0,6.0,1405,136.0,1.0,9.0,1400-1459,3,0
4,2019-04-05,Envoy Air,LIT,ORD,False,False,1212,1216.0,4.0,4.0,...,1234.0,1357.0,13.0,1405,5.0,0.0,0.0,1400-1459,3,0


2. Pandas

In [10]:
# Time the pandas read with time.perf_counter(), a monotonic clock intended for
# measuring intervals (time.time() can jump if the system clock is adjusted).
s_time_pandas = time.perf_counter()

# Read the whole CSV file into memory with pandas
df_pandas = pd.read_csv(GOOGLE_DRIVE_PATH)

e_time_pandas = time.perf_counter()

# Report the elapsed time, rounded to three decimal places
print("Time taken to read with pandas: ", round(e_time_pandas - s_time_pandas, 3), "seconds")


Time taken to read with pandas:  52.75 seconds


3. Modin


In [11]:
# Time the Modin read with time.perf_counter(), a monotonic clock intended for
# measuring intervals (time.time() can jump if the system clock is adjusted).
s_time_modin = time.perf_counter()

# Read the CSV file using Modin's read_csv function
# GOOGLE_DRIVE_PATH should be the path to your CSV file on Google Drive
df_modin = pdm.read_csv(GOOGLE_DRIVE_PATH)

e_time_modin = time.perf_counter()

# Report the elapsed time, rounded to three decimal places
print("Time taken to read with modin: ", round(e_time_modin - s_time_modin, 3), "seconds")

2024-05-17 16:58:59,055	INFO worker.py:1749 -- Started a local Ray instance.


Time taken to read with modin:  19.23 seconds


4. Ray

In [14]:
# Initialize Ray
import ray
# NOTE(review): the Modin cell's log shows a local Ray instance was started
# there; shutting Ray down here presumably invalidates df_modin - confirm that
# df_modin is no longer needed after this point.
ray.shutdown()
ray.init()  # This sets up the Ray runtime environment

# Time the Ray read with time.perf_counter(), a monotonic clock intended for
# measuring intervals (time.time() can jump if the system clock is adjusted).
s_time_ray = time.perf_counter()

# Read the CSV file using Ray Datasets
ds = ray.data.read_csv(GOOGLE_DRIVE_PATH)

# Convert Ray Dataset to Pandas DataFrame (included in the measured interval)
df_ray = ds.to_pandas()

e_time_ray = time.perf_counter()

# Report the elapsed time, rounded to three decimal places
print("Time taken to read with Ray: ", round(e_time_ray - s_time_ray, 3), "seconds")


2024-05-17 17:00:34,010	INFO worker.py:1749 -- Started a local Ray instance.


Read progress 0:   0%|          | 0/1 [00:00<?, ?it/s]

Read progress 0:   0%|          | 0/1 [00:00<?, ?it/s]

Time taken to read with Ray:  50.489 seconds


Getting column list

In [15]:
# Column labels of the pandas-loaded frame as a plain Python list
df_pandas.columns.tolist()


['FlightDate',
 'Airline',
 'Origin',
 'Dest',
 'Cancelled',
 'Diverted',
 'CRSDepTime',
 'DepTime',
 'DepDelayMinutes',
 'DepDelay',
 'ArrTime',
 'ArrDelayMinutes',
 'AirTime',
 'CRSElapsedTime',
 'ActualElapsedTime',
 'Distance',
 'Year',
 'Quarter',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'Marketing_Airline_Network',
 'Operated_or_Branded_Code_Share_Partners',
 'DOT_ID_Marketing_Airline',
 'IATA_Code_Marketing_Airline',
 'Flight_Number_Marketing_Airline',
 'Operating_Airline',
 'DOT_ID_Operating_Airline',
 'IATA_Code_Operating_Airline',
 'Tail_Number',
 'Flight_Number_Operating_Airline',
 'OriginAirportID',
 'OriginAirportSeqID',
 'OriginCityMarketID',
 'OriginCityName',
 'OriginState',
 'OriginStateFips',
 'OriginStateName',
 'OriginWac',
 'DestAirportID',
 'DestAirportSeqID',
 'DestCityMarketID',
 'DestCityName',
 'DestState',
 'DestStateFips',
 'DestStateName',
 'DestWac',
 'DepDel15',
 'DepartureDelayGroups',
 'DepTimeBlk',
 'TaxiOut',
 'WheelsOff',
 'WheelsOn',
 'TaxiIn',
 'CRS

# Validation:
- Lower, remove whitespace and replace special characters by _ in the data file read.

- If the number of columns and their names from the YAML file is the same as that in the data file read, Validation passed.

- Else, point out the different columns in both files

In [16]:
%%writefile testutility.py

# Util.py file Reading function
# This script contains utility functions for reading configuration files
# and validating DataFrame columns.

import yaml  # Import the YAML library for parsing YAML files

def read_config_file(filepath):
    """
    Load a YAML configuration file.

    Parameters:
    filepath (str): Location of the YAML file to read.

    Returns:
    The parsed YAML contents (a dict for a mapping document), or None when
    the file does not exist or cannot be parsed as YAML. In both failure
    cases a message is printed instead of raising.
    """
    parsed = None
    try:
        with open(filepath, 'r') as handle:
            parsed = yaml.safe_load(handle)
    except FileNotFoundError:
        # Missing file: report the path and fall through to return None
        print("File not found:", filepath)
    except yaml.YAMLError as parse_error:
        # Malformed YAML: report the parser's message and return None
        print(parse_error)
    return parsed

# Using the read_config_file function
# This reads the configuration data from the specified YAML file
# This statement sits at module level, so it runs whenever this module is
# imported. If the YAML file is missing or malformed, read_config_file prints
# a message and config_data is None.
config_data = read_config_file("ingested_file_info.yaml")

def col_header_val(df, table_config):
    """
    Standardize a DataFrame's column names in place and validate them
    against the expected column names from the configuration.

    Column names are lowercased, every non-word character is replaced by an
    underscore, and leading/trailing underscores are stripped. The cleaned
    names are written back to ``df.columns`` (the caller's frame is modified;
    column order is kept). The comparison with the configuration is
    case-insensitive and ignores column order.

    Parameters:
    df (pd.DataFrame): DataFrame whose columns are to be validated.
    table_config (dict): Configuration dictionary; its 'columns' entry lists
        the expected column names.

    Returns:
    int: 1 if validation passes, 0 if validation fails.
    """
    # Imported locally: this module does not import logging at the top, so the
    # failure branch below would otherwise raise NameError.
    import logging

    # Lowercase, then replace every non-word character with an underscore
    # (raw string so that \w is a regex class, not a string escape)
    cleaned = df.columns.str.lower().str.replace(r'[^\w]', '_', regex=True)

    # Drop leading/trailing underscores and store the names on the caller's frame
    df.columns = [name.strip('_') for name in cleaned]

    # Compare sorted name lists only; sorting the names avoids copying the
    # whole frame and also works when two cleaned names collide.
    expected_col = sorted(name.lower() for name in table_config['columns'])
    actual_col = sorted(df.columns)

    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")

    # Columns present in the file but absent from the YAML (sorted for stable output)
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file:", mismatched_columns_file)

    # Columns declared in the YAML but absent from the file
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded:", missing_YAML_file)

    # Log detailed information about the columns
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')

    return 0


Writing testutility.py


Prepare the YAML file that parameterizes the data ingestion process



In [17]:
%%writefile ingested_file_info.yaml
# Ingestion parameters for the Combined_Flights_2019 dataset.
# The notebook reads file_type, file_name and inbound_delimiter from this file;
# dataset_name, table_name and skip_leading_rows are not read by any code
# visible in the notebook.
file_type: csv
dataset_name: Combined_Flights_2019
file_name: Combined_Flights_2019
table_name: Combined_Flights_2019
# Delimiter passed to pandas when reading the source file
inbound_delimiter: ","
# Delimiter for the written output file (presumably; the notebook's write step uses "|")
outbound_delimiter: "|"
skip_leading_rows: 1
# Expected column names; col_header_val lowercases and sorts them before
# comparing with the cleaned header of the ingested file.
columns:
  - FlightDate
  - Airline
  - Origin
  - Dest
  - Cancelled
  - Diverted
  - CRSDepTime
  - DepTime
  - DepDelayMinutes
  - DepDelay
  - ArrTime
  - ArrDelayMinutes
  - AirTime
  - CRSElapsedTime
  - ActualElapsedTime
  - Distance
  - Year
  - Quarter
  - Month
  - DayofMonth
  - DayOfWeek
  - Marketing_Airline_Network
  - Operated_or_Branded_Code_Share_Partners
  - DOT_ID_Marketing_Airline
  - IATA_Code_Marketing_Airline
  - Flight_Number_Marketing_Airline
  - Operating_Airline
  - DOT_ID_Operating_Airline
  - IATA_Code_Operating_Airline
  - Tail_Number
  - Flight_Number_Operating_Airline
  - OriginAirportID
  - OriginAirportSeqID
  - OriginCityMarketID
  - OriginCityName
  - OriginState
  - OriginStateFips
  - OriginStateName
  - OriginWac
  - DestAirportID
  - DestAirportSeqID
  - DestCityMarketID
  - DestCityName
  - DestState
  - DestStateFips
  - DestStateName
  - DestWac
  - DepDel15
  - DepartureDelayGroups
  - DepTimeBlk
  - TaxiOut
  - WheelsOff
  - WheelsOn
  - TaxiIn
  - CRSArrTime
  - ArrDelay
  - ArrDel15
  - ArrivalDelayGroups
  - ArrTimeBlk
  - DistanceGroup
  - DivAirportLandings

Writing ingested_file_info.yaml


In [18]:
# Read config file
import testutility as util

config_data = util.read_config_file("ingested_file_info.yaml")

# read_config_file returns None (after printing a message) when the YAML file
# is missing or malformed; stop here rather than failing later on
# config_data[...] with a less helpful TypeError.
if config_data is None:
    raise RuntimeError("Could not load ingested_file_info.yaml; see the message printed above.")

config_data

{'file_type': 'csv',
 'dataset_name': 'Combined_Flights_2019',
 'file_name': 'Combined_Flights_2019',
 'table_name': 'Combined_Flights_2019',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['FlightDate',
  'Airline',
  'Origin',
  'Dest',
  'Cancelled',
  'Diverted',
  'CRSDepTime',
  'DepTime',
  'DepDelayMinutes',
  'DepDelay',
  'ArrTime',
  'ArrDelayMinutes',
  'AirTime',
  'CRSElapsedTime',
  'ActualElapsedTime',
  'Distance',
  'Year',
  'Quarter',
  'Month',
  'DayofMonth',
  'DayOfWeek',
  'Marketing_Airline_Network',
  'Operated_or_Branded_Code_Share_Partners',
  'DOT_ID_Marketing_Airline',
  'IATA_Code_Marketing_Airline',
  'Flight_Number_Marketing_Airline',
  'Operating_Airline',
  'DOT_ID_Operating_Airline',
  'IATA_Code_Operating_Airline',
  'Tail_Number',
  'Flight_Number_Operating_Airline',
  'OriginAirportID',
  'OriginAirportSeqID',
  'OriginCityMarketID',
  'OriginCityName',
  'OriginState',
  'OriginStateFips',
  'OriginSt

In [19]:
# Folder on the mounted Drive that holds the dataset. A separate name is used
# so that GOOGLE_DRIVE_PATH (the full CSV path the ingestion cells above read
# from) is not overwritten with a directory, which would break re-running them.
DRIVE_ROOT = '/content/drive/MyDrive'

# Take the file name, extension and delimiter from the YAML config
file_type = config_data['file_type']
file_name = config_data['file_name']
delimiter = config_data['inbound_delimiter']

# File name relative to MyDrive, built from the config
# ("Combined_Flights_2019.csv" for the YAML written above)
GOOGLE_DRIVE_PATH_AFTER_MYDRIVE = f"{file_name}.{file_type}"

# Construct the full source file path
source_file = os.path.join(DRIVE_ROOT, GOOGLE_DRIVE_PATH_AFTER_MYDRIVE)

print("Source File Path:", source_file)

# Read the file into a DataFrame and preview the first rows
df = pd.read_csv(source_file, delimiter=delimiter)
df.head(5)


Source File Path: /content/drive/MyDrive/Combined_Flights_2019.csv


Unnamed: 0,FlightDate,Airline,Origin,Dest,Cancelled,Diverted,CRSDepTime,DepTime,DepDelayMinutes,DepDelay,...,WheelsOff,WheelsOn,TaxiIn,CRSArrTime,ArrDelay,ArrDel15,ArrivalDelayGroups,ArrTimeBlk,DistanceGroup,DivAirportLandings
0,2019-04-01,Envoy Air,LIT,ORD,False,False,1212,1209.0,0.0,-3.0,...,1219.0,1342.0,8.0,1405,-15.0,0.0,-1.0,1400-1459,3,0
1,2019-04-02,Envoy Air,LIT,ORD,False,False,1212,1200.0,0.0,-12.0,...,1210.0,1339.0,9.0,1405,-17.0,0.0,-2.0,1400-1459,3,0
2,2019-04-03,Envoy Air,LIT,ORD,False,False,1212,1203.0,0.0,-9.0,...,1214.0,1336.0,6.0,1405,-23.0,0.0,-2.0,1400-1459,3,0
3,2019-04-04,Envoy Air,LIT,ORD,False,False,1212,1435.0,143.0,143.0,...,1452.0,1615.0,6.0,1405,136.0,1.0,9.0,1400-1459,3,0
4,2019-04-05,Envoy Air,LIT,ORD,False,False,1212,1216.0,4.0,4.0,...,1234.0,1357.0,13.0,1405,5.0,0.0,0.0,1400-1459,3,0


In [20]:
# validate the header of the file
# col_header_val rewrites df.columns in place (lowercased, non-word characters
# replaced by "_", outer underscores stripped) and returns 1 when the cleaned
# names match the YAML column list, 0 otherwise.
import testutility as util
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [21]:
# Show the cleaned file header next to the column list declared in the YAML
for label, names in (
    ("columns of files are:", df.columns.tolist()),
    ("columns of YAML are:", config_data['columns']),
):
    print(label, names)

columns of files are: ['flightdate', 'airline', 'origin', 'dest', 'cancelled', 'diverted', 'crsdeptime', 'deptime', 'depdelayminutes', 'depdelay', 'arrtime', 'arrdelayminutes', 'airtime', 'crselapsedtime', 'actualelapsedtime', 'distance', 'year', 'quarter', 'month', 'dayofmonth', 'dayofweek', 'marketing_airline_network', 'operated_or_branded_code_share_partners', 'dot_id_marketing_airline', 'iata_code_marketing_airline', 'flight_number_marketing_airline', 'operating_airline', 'dot_id_operating_airline', 'iata_code_operating_airline', 'tail_number', 'flight_number_operating_airline', 'originairportid', 'originairportseqid', 'origincitymarketid', 'origincityname', 'originstate', 'originstatefips', 'originstatename', 'originwac', 'destairportid', 'destairportseqid', 'destcitymarketid', 'destcityname', 'deststate', 'deststatefips', 'deststatename', 'destwac', 'depdel15', 'departuredelaygroups', 'deptimeblk', 'taxiout', 'wheelsoff', 'wheelson', 'taxiin', 'crsarrtime', 'arrdelay', 'arrdel15'

In [22]:
# col_header_val returns 0 on a mismatch and 1 when the header matches the YAML
header_matches = util.col_header_val(df, config_data) != 0
if header_matches:
    print("col validation passed")
    # write the code to perform further action in the pipeline
else:
    print("validation failed")
    # write code to reject the file

column name and column length validation passed
col validation passed


# Writing the data file to zipped (.gz) pipe delimited txt file.


In [23]:
# Write the validated frame as a gzip-compressed text file. The separator comes
# from the YAML config's outbound_delimiter ("|") instead of being hard-coded,
# matching how the inbound delimiter is taken from the config when reading.
df.to_csv('data.txt.gz', sep=config_data['outbound_delimiter'], index=False, compression='gzip')


# Summary of the data file


In [24]:
# Sizes on disk: compressed output in MB, original CSV in GB
gz_size_mb = os.stat('data.txt.gz').st_size / (1024 * 1024)
csv_size_gb = os.stat(source_file).st_size / (1024 * 1024 * 1024)
print("The size of the .gz file = {:.2f} MB \nThe size of original csv file = {:.2f} GB ".format(gz_size_mb, csv_size_gb))

# Shape of the ingested frame
row_count, column_count = df.shape
print("The number of columns = {} \nThe number of rows = {}".format(column_count, row_count))


The size of the .gz file = 384.77 MB 
The size of original csv file = 2.63 GB 
The number of columns = 61 
The number of rows = 8091684


# References:
1. https://datascience.stackexchange.com/questions/85590/how-do-i-transform-a-file-to-txt-file-using-pandas
2. https://www.geeksforgeeks.org/working-with-large-csv-files-in-python/
3. https://www.kaggle.com/code/robikscube/flight-cancellation-dataset-eda