In [14]:
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import yaml
import os
import time

def read_csv(file_path):
    """Read the CSV file with pandas, Dask and Modin, timing each reader.

    Args:
        file_path: Path of the CSV file passed to each library's ``read_csv``.

    Returns:
        A 6-tuple ``(df_pandas, df_dask, df_modin, pandas_time, dask_time,
        modin_time)``; the ``*_time`` entries are elapsed seconds for the
        corresponding ``read_csv`` call.  If any reader raises, the error is
        printed and a tuple of six ``None`` values is returned instead, so
        callers must check for ``None`` before using the frames.
    """

    def _timed(reader):
        # perf_counter is monotonic and high resolution; time.time() follows
        # the wall clock and can jump if the system clock is adjusted.
        started = time.perf_counter()
        frame = reader(file_path)
        return frame, time.perf_counter() - started

    try:
        df_pandas, pandas_time = _timed(pd.read_csv)
        # NOTE(review): dd.read_csv presumably only builds a lazy task graph,
        # so dask_time is likely not comparable to the eager readers' times
        # -- confirm before drawing conclusions from the benchmark.
        df_dask, dask_time = _timed(dd.read_csv)
        df_modin, modin_time = _timed(mpd.read_csv)
    except Exception as e:
        # Deliberate best-effort: report the failure and hand back placeholders.
        print(f"An error occurred: {e}")
        return None, None, None, None, None, None

    return df_pandas, df_dask, df_modin, pandas_time, dask_time, modin_time

def clean_column_names(df):
    """Strip punctuation and surrounding whitespace from the column names.

    Every character that is neither a word character nor whitespace is
    removed from each column name, then leading and trailing whitespace is
    trimmed.

    Args:
        df: Frame whose ``columns`` index exposes the ``.str`` accessor.

    Returns:
        The same ``df`` object; its ``columns`` are reassigned in place.
    """
    # regex=True must be explicit: pandas >= 2.0 treats the pattern as a
    # literal string by default, which would leave every name unchanged.
    # The raw string avoids invalid-escape warnings for \w and \s.
    df.columns = df.columns.str.replace(r"[^\w\s]", "", regex=True).str.strip()
    return df

def write_yaml(df, file_path):
    """Dump the frame's column labels, as a plain list, into a YAML file.

    Args:
        df: Frame whose ``columns`` are exported.
        file_path: Destination path; the file is opened in write mode.
    """
    # Build the list before opening the file so a failure here does not
    # leave a truncated file behind.
    column_labels = df.columns.tolist()
    with open(file_path, "w") as handle:
        yaml.dump(column_labels, stream=handle)

def validate_columns(df, yaml_file):
    """Check that the frame's column names match those listed in a YAML file.

    The comparison ignores order but respects multiplicity: the frame must
    contain exactly the names listed in the file, each the same number of
    times.

    Args:
        df: Frame whose ``columns`` are validated.
        yaml_file: Path of a YAML file holding a list of expected names.

    Returns:
        ``True`` when the names match, ``False`` otherwise (including when
        the YAML content is not a list).
    """
    from collections import Counter

    with open(yaml_file, "r") as file:
        # NOTE(review): yaml.load can construct more than plain data; if this
        # file may come from an untrusted source, yaml.safe_load would be the
        # safer choice for a simple list of names.
        expected_columns = yaml.load(file, Loader=yaml.FullLoader)

    # An empty YAML document loads as None; treat it as "no columns expected".
    if expected_columns is None:
        expected_columns = []
    if not isinstance(expected_columns, list):
        return False

    try:
        # Multiset equality covers both the count check and the name check,
        # and is not fooled by duplicated names the way a membership test is.
        return Counter(df.columns) == Counter(expected_columns)
    except TypeError:
        # Unhashable entries (e.g. nested lists) can never be column names.
        return False

def write_csv(df, file_path, sep="|", compression="gzip"):
    """Export *df* as delimited text, omitting the index.

    Args:
        df: Frame to export via its ``to_csv`` method.
        file_path: Destination path.
        sep: Field delimiter (pipe by default).
        compression: Compression passed through to ``to_csv`` (gzip by default).
    """
    export_options = {
        "sep": sep,
        "index": False,
        "compression": compression,
    }
    df.to_csv(file_path, **export_options)

def get_file_summary(file_path, df=None):
    """Create a summary of a written file and the frame it was written from.

    Args:
        file_path: Path of the file whose on-disk size is reported.
        df: Frame supplying the row and column counts.  When omitted, the
            module-level ``df_pandas`` is used, which keeps existing
            single-argument calls working.

    Returns:
        A dict with the keys ``"Total number of rows"``,
        ``"Total number of columns"`` and ``"File size (bytes)"``.

    Raises:
        NameError: If ``df`` is omitted and no module-level ``df_pandas``
            exists.
        OSError: If ``file_path`` cannot be inspected by ``os.path.getsize``.
    """
    if df is None:
        # Backward-compatible fallback to the script-level frame; prefer
        # passing the frame explicitly so the function works outside the script.
        df = df_pandas

    summary = {
        "Total number of rows": len(df),
        "Total number of columns": len(df.columns),
        "File size (bytes)": os.path.getsize(file_path),
    }
    return summary

if __name__ == "__main__":
    # Script entry point: benchmark three CSV readers, normalise the column
    # names, round-trip them through YAML for validation, export the pandas
    # frame as gzip-compressed pipe-separated text, and print a summary.
    file_path = "/content/Parking_Violations_Issued_-_Fiscal_Year_2017.csv"
    yaml_file = "columns.yaml"
    output_file = "output_file.txt.gz"

    # Read CSV files
    df_pandas, df_dask, df_modin, pandas_time, dask_time, modin_time = read_csv(file_path)

    # read_csv signals failure by returning None placeholders (it has already
    # printed the error); stop here instead of crashing later with an
    # unrelated AttributeError on None.
    if df_pandas is None:
        raise SystemExit(f"Could not read {file_path}; aborting.")

    # Clean column names
    df_pandas = clean_column_names(df_pandas)
    df_dask = clean_column_names(df_dask)
    df_modin = clean_column_names(df_modin)

    # Write column names to YAML
    write_yaml(df_pandas, yaml_file)

    # Validate columns
    is_match = validate_columns(df_pandas, yaml_file)
    if is_match:
        print("All columns match.")
    else:
        print("Not all columns match.")

    # Write CSV file
    write_csv(df_pandas, output_file)

    # Generate file summary (row/column counts come from the module-level
    # df_pandas; the size is that of the compressed output file)
    summary = get_file_summary(output_file)
    print("Summary of the file:")
    print(summary)

    # Print timing information
    print(f"Time taken for pandas: {pandas_time} seconds")
    print(f"Time taken for Dask: {dask_time} seconds")
    print(f"Time taken for Modin: {modin_time} seconds")


All columns match.
Summary of the file:
{'Total number of rows': 1740011, 'Total number of columns': 43, 'File size (bytes)': 108346810}
Time taken for pandas: 13.484307527542114 seconds
Time taken for Dask: 0.028912067413330078 seconds
Time taken for Modin: 19.256351470947266 seconds
