### Imports

In [12]:
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import ray
import time

### Util File

In [13]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

# Supplied by Data Glacier
def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def col_header_val(df, table_config):
    # sort, strip leading and trailing spaces, and replace space with _
    df_columns = sorted([col.strip().lower().replace(' ', '_') for col in df.columns])
    yaml_columns = sorted([col.strip().lower().replace(' ', '_') for col in table_config['columns']])

    if df_columns == yaml_columns:
        return True
    else:
        # Find the mismatched columns
        mismatched_columns = set(df_columns) ^ set(yaml_columns)
        print(f"Mismatched columns: {list(mismatched_columns)}")
        return False

Overwriting testutility.py


### YAML File

In [6]:
%%writefile file.yaml
file_type: csv
dataset_name: parking_data
file_path: data/parking_tickets_2017
table_name: parking
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - Summons Number
    - Plate ID
    - Registration State
    - Plate Type
    - Issue Date
    - Violation Code
    - Vehicle Body Type
    - Vehicle Make
    - Issuing Agency
    - Street Code1
    - Street Code2
    - Street Code3
    - Vehicle Expiration Date
    - Violation Location
    - Violation Precinct
    - Issuer Precinct
    - Issuer Code
    - Issuer Command
    - Issuer Squad
    - Violation Time
    - Time First Observed
    - Violation County
    - Violation In Front Of Or Opposite
    - House Number
    - Street Name
    - Intersecting Street
    - Date First Observed
    - Law Section
    - Sub Division
    - Violation Legal Code
    - Days Parking In Effect
    - From Hours In Effect
    - To Hours In Effect
    - Vehicle Color
    - Unregistered Vehicle?
    - Vehicle Year
    - Meter Number
    - Feet From Curb
    - Violation Post Code
    - Violation Description
    - No Standing or Stopping Violation
    - Hydrant Violation
    - Double Parking Violation

Overwriting file.yaml


In [7]:
# Read config
import testutility as util

config_data = util.read_config_file("file.yaml")

config_data

{'file_type': 'csv',
 'dataset_name': 'parking_data',
 'file_path': 'data/parking_tickets_2017',
 'table_name': 'parking',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['Summons Number',
  'Plate ID',
  'Registration State',
  'Plate Type',
  'Issue Date',
  'Violation Code',
  'Vehicle Body Type',
  'Vehicle Make',
  'Issuing Agency',
  'Street Code1',
  'Street Code2',
  'Street Code3',
  'Vehicle Expiration Date',
  'Violation Location',
  'Violation Precinct',
  'Issuer Precinct',
  'Issuer Code',
  'Issuer Command',
  'Issuer Squad',
  'Violation Time',
  'Time First Observed',
  'Violation County',
  'Violation In Front Of Or Opposite',
  'House Number',
  'Street Name',
  'Intersecting Street',
  'Date First Observed',
  'Law Section',
  'Sub Division',
  'Violation Legal Code',
  'Days Parking In Effect',
  'From Hours In Effect',
  'To Hours In Effect',
  'Vehicle Color',
  'Unregistered Vehicle?',
  'Vehicle Year',
  'Meter Numbe

### Comparing reading speed of pandas, dask, modin, and ray

In [8]:
# Get file path
file_path = f"./{config_data['file_path']}.{config_data['file_type']}"
file_path

'./data/parking_tickets_2017.csv'

In [9]:
# Using Pandas
start_time = time.time()
df_pandas = pd.read_csv(file_path, delimiter=config_data['inbound_delimiter'])
pandas_time = time.time() - start_time

print(f"Pandas Reading Time: {pandas_time} seconds")
# Pandas Reading Time: 50.59611439704895 seconds



Pandas Reading Time: 50.75420165061951 seconds


In [None]:
# Using Dask
start_time = time.time()
df_dask = dd.read_csv(
    file_path,
    delimiter=config_data['inbound_delimiter'],
    dtype={"House Number": "object", "Time First Observed": "object"}
)
df_dask_computed = df_dask.compute()  # This forces the actual read
dask_time = time.time() - start_time

print(f"Dask Reading Time: {dask_time} seconds")
# Dask Reading Time: 42.42091226577759 seconds

In [None]:
# Using Modin
start_time = time.time()
df_modin = mpd.read_csv(file_path, delimiter=config_data["inbound_delimiter"])
modin_time = time.time() - start_time

print(f"Modin Reading Time: {modin_time} seconds")
# Modin Reading Time: 38.331472635269165 seconds

In [None]:
# Using Ray
if not ray.is_initialized():
    ray.init(ignore_reinit_error=True)

@ray.remote
def read_csv(file_path):
    return pd.read_csv(file_path, delimiter=config_data["inbound_delimiter"])

start_time = time.time()
future = read_csv.remote(file_path)
df = ray.get(future)
ray_time = time.time() - start_time

print(f"Ray Reading Time: {ray_time} seconds")
# Ray Reading Time: 168.3240203857422 seconds

In [None]:
pd.set_option("display.max_columns", None)
df_pandas.info(max_cols=50)
df_pandas.head()

### Data validation

In [10]:
print(f"Columns from {config_data['file_path']}.{config_data['file_type']}:")
print(df_pandas.columns)

print("Columns from file.yaml:")
print(config_data['columns'])

Columns from data/parking_tickets_2017.csv:
Index(['Summons Number', 'Plate ID', 'Registration State', 'Plate Type',
       'Issue Date', 'Violation Code', 'Vehicle Body Type', 'Vehicle Make',
       'Issuing Agency', 'Street Code1', 'Street Code2', 'Street Code3',
       'Vehicle Expiration Date', 'Violation Location', 'Violation Precinct',
       'Issuer Precinct', 'Issuer Code', 'Issuer Command', 'Issuer Squad',
       'Violation Time', 'Time First Observed', 'Violation County',
       'Violation In Front Of Or Opposite', 'House Number', 'Street Name',
       'Intersecting Street', 'Date First Observed', 'Law Section',
       'Sub Division', 'Violation Legal Code', 'Days Parking In Effect    ',
       'From Hours In Effect', 'To Hours In Effect', 'Vehicle Color',
       'Unregistered Vehicle?', 'Vehicle Year', 'Meter Number',
       'Feet From Curb', 'Violation Post Code', 'Violation Description',
       'No Standing or Stopping Violation', 'Hydrant Violation',
       'Double Parkin

In [15]:
if util.col_header_val(df_pandas, config_data) == True:
    print("Column Validation passed!")
else:
    print("Column Validation failed!")

Column Validation passed!


### Output CSV with | Delimiter

In [None]:
pandas_df.to_csv()