In [4]:
import os
import time
import pandas as pd
os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask
import dask.dataframe as dd
import modin.pandas as mpd
import yaml

In [5]:
file_path = 'Combined_Flights_2021.csv'


def _timed(label, loader):
    """Run ``loader()`` once, print how long it took, and return ``(result, seconds)``.

    ``time.perf_counter()`` is used instead of ``time.time()`` because it is a
    monotonic, high-resolution clock intended for measuring durations; the wall
    clock can jump (NTP sync, DST) while a long read is in progress.
    """
    started = time.perf_counter()
    result = loader()
    elapsed = time.perf_counter() - started
    print(f"Time taken by {label}:", elapsed, "seconds")
    return result, elapsed


def _read_with_dask():
    """Build the Dask frame and force a full read so the timing is comparable."""
    lazy_frame = dd.read_csv(file_path)
    # dd.read_csv only builds a task graph; compute() actually reads the file.
    # The materialised result is discarded, only the elapsed time matters here.
    lazy_frame.compute()
    return lazy_frame


# Method 1: Pandas
df_pandas, time_pandas = _timed("Pandas", lambda: pd.read_csv(file_path))

# Method 2: Dask
df_dask, time_dask = _timed("Dask", _read_with_dask)

# Method 3: Modin
# NOTE(review): the recorded output shows a `distributed.Client` hint emitted
# during this step, so the Modin figure may include one-time engine start-up
# cost - confirm before drawing conclusions from a single run.
df_modin, time_modin = _timed("Modin", lambda: mpd.read_csv(file_path))

# Conclusion: Dask showed the fastest computational speed

Time taken by Pandas: 36.29462122917175 seconds
Time taken by Dask: 24.996678829193115 seconds



    from distributed import Client

    client = Client()



Time taken by Modin: 40.94792652130127 seconds


In [6]:
# Perform basic validation on data columns

# Read the CSV file into a DataFrame
df = pd.read_csv(file_path)

# Normalise the column names in three steps:
#   1. strip leading/trailing whitespace FIRST - if the replace ran first, that
#      whitespace would already have become '_' and strip() would be a no-op;
#   2. collapse every run of characters other than ASCII letters/digits into a
#      single underscore. `regex=True` is passed explicitly: pandas >= 2.0
#      treats the pattern as a literal string by default, which would silently
#      leave the names unchanged;
#   3. lowercase the result.
df.columns = (
    df.columns
    .str.strip()
    .str.replace(r'[^a-zA-Z0-9]+', '_', regex=True)
    .str.lower()
)

# Per-column count of missing values: isnull() yields a boolean frame of the
# same shape as `df`, and sum() counts the True entries column by column.
missing_values = df.isnull().sum()

# Rows that are exact repeats of an earlier row: duplicated() returns a boolean
# Series, which is used as a mask to select those rows from `df`.
duplicates = df[df.duplicated()]




In [7]:
# Collect every column name of the DataFrame into a plain Python list
all_cols = list(df.columns)

# Show the column names
print(all_cols)

['flightdate', 'airline', 'origin', 'dest', 'cancelled', 'diverted', 'crsdeptime', 'deptime', 'depdelayminutes', 'depdelay', 'arrtime', 'arrdelayminutes', 'airtime', 'crselapsedtime', 'actualelapsedtime', 'distance', 'year', 'quarter', 'month', 'dayofmonth', 'dayofweek', 'marketing_airline_network', 'operated_or_branded_code_share_partners', 'dot_id_marketing_airline', 'iata_code_marketing_airline', 'flight_number_marketing_airline', 'operating_airline', 'dot_id_operating_airline', 'iata_code_operating_airline', 'tail_number', 'flight_number_operating_airline', 'originairportid', 'originairportseqid', 'origincitymarketid', 'origincityname', 'originstate', 'originstatefips', 'originstatename', 'originwac', 'destairportid', 'destairportseqid', 'destcitymarketid', 'destcityname', 'deststate', 'deststatefips', 'deststatename', 'destwac', 'depdel15', 'departuredelaygroups', 'deptimeblk', 'taxiout', 'wheelsoff', 'wheelson', 'taxiin', 'crsarrtime', 'arrdelay', 'arrdel15', 'arrivaldelaygroups'

In [8]:
# Validate the number of columns and the column names of the ingested file
# against a YAML schema. The schema is expected to be a mapping with a
# 'columns' key holding the ordered list of column names.

import yaml

# Load the YAML schema from the file
with open('file.yaml', 'r') as f:
    schema = yaml.safe_load(f)

# safe_load returns None for an empty file and may return a list or scalar for
# other documents; only a mapping can carry a 'columns' key, so anything else
# is reported the same way as a missing key instead of raising TypeError.
if not isinstance(schema, dict) or 'columns' not in schema:
    print("Error: Invalid schema file. 'columns' key is missing.")
# The value of 'columns' must be a list
elif not isinstance(schema['columns'], list):
    print("Error: Invalid schema file. 'columns' value should be a list.")
else:
    expected_cols = schema['columns']
    actual_cols = list(df.columns)

    # The column count must match before a positional comparison makes sense
    if len(actual_cols) != len(expected_cols):
        print("Error: Number of columns in the file doesn't match the schema.")
    else:
        # 1-based positions at which the DataFrame name differs from the schema
        mismatched_positions = [
            position
            for position, (actual, expected)
            in enumerate(zip(actual_cols, expected_cols), start=1)
            if actual != expected
        ]
        for position in mismatched_positions:
            print(f"Error: Column {position} doesn't match the schema.")

        # Report success explicitly so a clean run is distinguishable from a
        # cell that was never executed.
        if not mismatched_positions:
            print("Validation passed: column count and names match the schema.")

In [9]:
# Writing the full frame takes too long, so only the first 1000 rows are
# exported: pipe-separated, gzip-compressed, without the index column.
subset_df = df.head(1000)
subset_df.to_csv(
    'output_file.csv.gz',
    index=False,
    sep='|',
    compression='gzip',
)

In [10]:
# Size on disk of the gzip file written from `subset_df`
file_size = os.path.getsize('output_file.csv.gz')

# Dimensions of the full ingested DataFrame
num_rows, num_cols = df.shape

# Print the summary. The row/column totals describe the full DataFrame, while
# the file size belongs to the exported subset; the extra line states how many
# rows that file holds so the two figures are not mistaken for one another.
print(f"Total number of rows: {num_rows}")
print(f"Total number of columns: {num_cols}")
print(f"File size: {file_size} bytes")
print(f"Rows written to output file: {len(subset_df)}")

Total number of rows: 6311871
Total number of columns: 61
File size: 57771 bytes
