In [2]:
#pip install --upgrade pandas


Note: you may need to restart the kernel to use updated packages.


In [2]:
#pip install dask==2023.2.0


Collecting dask==2023.2.0
  Downloading dask-2023.2.0-py3-none-any.whl (1.2 MB)
     ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
     -- ------------------------------------- 0.1/1.2 MB 2.0 MB/s eta 0:00:01
     -------- ------------------------------- 0.3/1.2 MB 2.6 MB/s eta 0:00:01
     ------------------------------ --------- 0.9/1.2 MB 6.1 MB/s eta 0:00:01
     ---------------------------------------  1.1/1.2 MB 6.6 MB/s eta 0:00:01
     ---------------------------------------- 1.2/1.2 MB 5.6 MB/s eta 0:00:00
Installing collected packages: dask
  Attempting uninstall: dask
    Found existing installation: dask 2021.10.0
    Uninstalling dask-2021.10.0:
      Successfully uninstalled dask-2021.10.0
Successfully installed dask-2023.2.0
Note: you may need to restart the kernel to use updated packages.


ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
distributed 2021.10.0 requires dask==2021.10.0, but you have dask 2023.2.0 which is incompatible.


In [1]:
#pip install ray

Note: you may need to restart the kernel to use updated packages.


In [2]:
#pip install pandas dask modin[ray] pyyaml

Note: you may need to restart the kernel to use updated packages.


In [6]:
# Import necessary libraries
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import ray
import yaml
import re
import os

# Define file path (the single CSV that every reader below loads)
file_path = 'Manufacturer_Car_data.csv'

# Start Ray only when this kernel has not already initialized it,
# so re-running the cell does not attempt a second ray.init().
ray_already_running = ray.is_initialized()
if not ray_already_running:
    ray.init()

# Baseline: read the file eagerly with pandas and record the wall-clock duration
pandas_start_time = pd.Timestamp.now()
df_pandas = pd.read_csv(file_path)
pandas_end_time = pd.Timestamp.now()
pandas_execution_time = pandas_end_time - pandas_start_time

# Reading file using Dask with explicit dtype specification.
# The dtype mapping is built before the clock starts so only the read is timed.
dtype_specification = {'Fuel efficiency': 'float64', 'Horsepower': 'float64'}
dask_start_time = pd.Timestamp.now()
# dd.read_csv is lazy: on its own it only builds a task graph, so timing it alone
# would not measure the load. persist() executes the read (with the default local
# scheduler it blocks until the partitions are in memory) while keeping df_dask a
# Dask DataFrame, so the later df_dask.compute() call keeps working.
df_dask = dd.read_csv(file_path, dtype=dtype_specification).persist()
dask_end_time = pd.Timestamp.now()
dask_execution_time = dask_end_time - dask_start_time


# Reading file using Modin
modin_start_time = pd.Timestamp.now()
df_modin = mpd.read_csv(file_path)
modin_end_time = pd.Timestamp.now()
modin_execution_time = modin_end_time - modin_start_time

# Reading file using Ray (parallelized operations)
@ray.remote
def read_csv_ray(file_path):
    """Read the CSV at ``file_path`` with pandas as a Ray remote function.

    Invoke with ``read_csv_ray.remote(path)`` and resolve the returned
    reference with ``ray.get`` to obtain the pandas DataFrame.
    """
    loaded_frame = pd.read_csv(file_path)
    return loaded_frame

# Time the Ray read end to end: .remote() submits the task and hands back a
# reference, and ray.get() waits for the resulting DataFrame, so the measured
# interval covers both submission and retrieval.
ray_start_time = pd.Timestamp.now()
df_ray_remote = read_csv_ray.remote(file_path)
df_ray = ray.get(df_ray_remote)
ray_end_time = pd.Timestamp.now()
ray_execution_time = ray_end_time - ray_start_time

# Basic validation on data columns
def clean_column_names(df):
    """Normalise the column labels of ``df`` in place and return ``df``.

    Each label is converted to text, stripped of leading/trailing whitespace,
    and every run of non-word characters (regex ``\\W+``) is replaced by a
    single underscore.

    Parameters
    ----------
    df : DataFrame-like
        Object with an iterable, assignable ``columns`` attribute.

    Returns
    -------
    The same object that was passed in, with its ``columns`` reassigned.
    """
    # str() lets non-string labels (e.g. integer labels from a header-less read)
    # through instead of failing with AttributeError on .strip().
    df.columns = [re.sub(r'\W+', '_', str(col).strip()) for col in df.columns]
    return df

# Clean the headers of every loaded frame. The Dask frame is materialised with
# .compute() first so that the cleaned result is a concrete DataFrame.
df_pandas_cleaned, df_dask_cleaned, df_modin_cleaned, df_ray_cleaned = (
    clean_column_names(loaded_frame)
    for loaded_frame in (df_pandas, df_dask.compute(), df_modin, df_ray)
)

# Create YAML file with column names
yaml_file_path = 'columns.yaml'
columns_yaml = {'columns': df_pandas_cleaned.columns.tolist(), 'separator': ','}
with open(yaml_file_path, 'w') as yaml_file:
    yaml.dump(columns_yaml, yaml_file, default_flow_style=False)

# Validate number of columns and column names with YAML.
# The schema is re-read from disk so the check exercises the file that was
# actually written rather than the in-memory dict it was built from.
with open(yaml_file_path) as yaml_file:
    loaded_schema = yaml.safe_load(yaml_file)
expected_columns = loaded_schema['columns']

# Every reader loaded the same CSV, so each cleaned frame must expose exactly the
# expected columns. Comparing ordered lists checks count, names, order and
# duplicates in one step.
frames_to_validate = {
    'pandas': df_pandas_cleaned,
    'dask': df_dask_cleaned,
    'modin': df_modin_cleaned,
    'ray': df_ray_cleaned,
}
mismatched_readers = [
    reader_name
    for reader_name, frame in frames_to_validate.items()
    if list(frame.columns) != expected_columns
]
if not mismatched_readers:
    print("Validation successful.")
else:
    print("Validation failed.")
    print("Readers with unexpected columns:", mismatched_readers)

# Write the file in pipe-separated text file (|) in gz format
output_file_path = 'output_file.txt.gz'
df_pandas_cleaned.to_csv(output_file_path, index=False, sep='|', compression='gzip')

# Create a summary of the file
row_count, column_count = df_pandas_cleaned.shape
bytes_per_gb = 1024**3
summary = {
    'Total number of rows': row_count,
    'Total number of columns': column_count,
    # Size of the compressed output on disk, expressed in GB
    'File size': os.path.getsize(output_file_path) / bytes_per_gb,
}

print("Summary:", summary)

# Shutdown Ray, but only if it is still initialized in this kernel
ray_still_running = ray.is_initialized()
if ray_still_running:
    ray.shutdown()


Validation successful.
Summary: {'Total number of rows': 157, 'Total number of columns': 8, 'File size': 3.041699528694153e-06}
