In [1]:
!pip install pyaml
!pip install dask
!pip install modin
!pip install vaex



In [2]:
import modin.pandas as mpd
import pandas as pd
import dask.dataframe as dd
import vaex
import timeit
import os
import warnings

warnings.filterwarnings('ignore')

In [3]:
file_path = 'simple_dataset.csv'

### File Size

In [4]:
file_size_bytes = os.path.getsize(file_path)
file_size_gb = file_size_bytes / (10**9)
print(f"File size: {file_size_gb:.2f} GB")


File size: 2.72 GB


### Execution time

In [28]:
# Measure execution time using pandas
pandas_time = timeit.timeit(lambda: pd.read_csv(file_path), number=1)

# Measure execution time using Dask
dask_time = timeit.timeit(lambda: dd.read_csv(file_path).compute(), number=1)

# Measure execution time using Modin
modin_time = timeit.timeit(lambda: mpd.read_csv(file_path), number=1)

# Measure execution time using Vaex
vaex_time = timeit.timeit(lambda: vaex.from_csv(file_path), number=1)



print(f'\n\nPandas execution time: {round(pandas_time,3)}s')
print(f'Dask execution time: {round(dask_time,3)}s')
print(f'Modin execution time: {round(pandas_time,3)}s')
print(f'Vaex execution time: {round(vaex_time,3)}s')




Pandas execution time: 77.081s
Dask execution time: 57.815s
Modin execution time: 77.081s
Vaex execution time: 114.403s


### Memory Usage

In [6]:
def load_with_pandas():
    return pd.read_csv(file_path)

def load_with_dask():
    return dd.read_csv(file_path).compute()

def load_with_modin():
    return mpd.read_csv(file_path)

def load_with_vaex():
    return vaex.from_csv(file_path)

In [7]:
from memory_profiler import memory_usage

pandas_mem = memory_usage(load_with_pandas)
print(f'Pandas memory usage: {round((max(pandas_mem) - min(pandas_mem)),3)} bytes')

dask_mem = memory_usage(load_with_dask)
print(f'Dask memory usage: {round((max(dask_mem) - min(dask_mem)),3)} bytes')

modin_mem = memory_usage(load_with_modin)
print(f'Modin memory usage: {round((max(modin_mem) - min(modin_mem)),3)} bytes')

vaex_mem = memory_usage(load_with_vaex)
print(f'Vaex memory usage: {round((max(vaex_mem) - min(vaex_mem)),3)} bytes')

Pandas memory usage: 7334.582 bytes
Dask memory usage: 9135.191 bytes
Modin memory usage: 1175.598 bytes
Vaex memory usage: 10594.441 bytes


### Disk I/O Usage

In [8]:
import psutil

disk_io_before = psutil.disk_io_counters()
df = pd.read_csv(file_path)
disk_io_after = psutil.disk_io_counters()
print(f"Disk I/O usage for Pandas: {((disk_io_after.read_bytes - disk_io_before.read_bytes)/ (10**9))} GB")

disk_io_before = psutil.disk_io_counters()
df_dd = dd.read_csv(file_path).compute()
disk_io_after = psutil.disk_io_counters()
print(f"Disk I/O usage for Dask: {((disk_io_after.read_bytes - disk_io_before.read_bytes)/ (10**9))} GB")

disk_io_before = psutil.disk_io_counters()
df_mpd = mpd.read_csv(file_path)
disk_io_after = psutil.disk_io_counters()
print(f"Disk I/O usage for Modin: {((disk_io_after.read_bytes - disk_io_before.read_bytes)/ (10**9))} GB")

disk_io_before = psutil.disk_io_counters()
df_vaex = vaex.from_csv(file_path)
disk_io_after = psutil.disk_io_counters()
print(f"Disk I/O usage for Vaex: {((disk_io_after.read_bytes - disk_io_before.read_bytes)/ (10**9))} GB")



Disk I/O usage for Pandas: 2.87569664 GB
Disk I/O usage for Dask: 4.649304576 GB
Disk I/O usage for Modin: 12.814777344 GB
Disk I/O usage for Vaex: 4.12378112 GB


Getting the shape of dataframe loaded by each method 

Reason : Dask uses lazy evaluation, which means that computations are only performed when the results are actually needed. If dataframe through dask is loaded correctly it will give us shape like 
**(Delayed('int-24cd3fef-2071-4a2e-9969-934ac021adbb'), 8)** 

In [9]:
print('The Shape of Dataframe loaded by Pandas is',df.shape)
print('The Shape of Dataframe loaded by Dask is',df_dd.shape)
print('The Shape of Dataframe loaded by Modin is',df_mpd.shape)
print('The Shape of Dataframe loaded by Vaex is',df_vaex.shape)

The Shape of Dataframe loaded by Pandas is (35000000, 8)
The Shape of Dataframe loaded by Dask is (35000000, 8)
The Shape of Dataframe loaded by Modin is (35000000, 8)
The Shape of Dataframe loaded by Vaex is (35000000, 8)


* Dask appears to be the most effective in terms of time and disk I/O usage. Modin, on the other hand, appears to be the most effective in terms of memory usage.It's true that Dask is designed for distributed computing and can handle large datasets more efficiently than pandas or Modin. Dask can parallelize the computations and distribute them across multiple cores or machines, which makes it more effective in terms of time and disk I/O usage.

* However, the memory usage of Dask can be high, especially when dealing with large datasets, as Dask creates a task graph to manage the computation. Modin, on the other hand, uses a different approach by leveraging the pandas API and using distributed computing frameworks like Dask or Ray under the hood. This allows Modin to utilize the available memory more efficiently than Dask.

* Therefore, the choice of the tool depends on the specific requirements of the project. If the dataset is very large and the computation needs to be distributed across multiple machines or cores, Dask may be the better option. If the dataset can fit into memory but needs to be processed more efficiently than pandas, Modin could be a better choice.

### Loading File with dask

In [10]:
df_dask = dd.read_csv(file_path)

### Basic Validation

In [11]:
df_dask.columns = df_dask.columns.str.replace('[^a-zA-Z0-9]+', '_').str.strip()

In [12]:
df_dask.columns

Index(['ID', 'Name', 'Age', 'Address', 'City', 'State', 'Country', 'Postcode'], dtype='object')

### Creatin YAML File

In [13]:
%%writefile schema.yaml
file_type: csv
file_name: simple_dataset
outbound_delimiter: "|"
columns:
    - ID
    - Name
    - Age
    - Address
    - City
    - State
    - Country
    - Postcode

Writing schema.yaml


### Creating Utility File

In [14]:
%%writefile utility.py
import yaml
import re

def read_config_file(path):
    with open(path,'r') as file:
        try:
            return yaml.safe_load(stream=file)
        except Exception as e:
            return e

def preprocessing(list):
    return [x.title() for x in list]

def validate_schema(dataframe,config_columns):

    dataframe = preprocessing(dataframe)
    config_columns =  preprocessing(config_columns)

    # Validate the number of columns
    if len(dataframe) != len(config_columns):
        print("Number of columns does not match schema.\n")

        for i in range(len(dataframe)):
            if config_columns.__contains__(dataframe[i]) == False:
                print(f"The dataframe has a column name \'{dataframe[i]}\' which does not match columns name in schema.\n")

    if len(dataframe) == len(config_columns):
        if all(col in config_columns for col in dataframe):
            if all(col in dataframe for col in config_columns):
                print('The validation process for the column schema defined in the YAML file was successful.')

    for i in range(len(config_columns)):
        if dataframe.__contains__(config_columns[i]) == False:
            print(f"The schema entry for \'{config_columns[i]}\' does not match any of the column name in dataframe.")
  

Writing utility.py


### Validation

In [15]:
import utility as util
config_data = util.read_config_file('schema.yaml')
config_data

{'file_type': 'csv',
 'file_name': 'simple_dataset',
 'outbound_delimiter': '|',
 'columns': ['ID',
  'Name',
  'Age',
  'Address',
  'City',
  'State',
  'Country',
  'Postcode']}

In [16]:
util.validate_schema(df_dask.columns.tolist(),config_data.get('columns'))

The validation process for the column schema defined in the YAML file was successful.


### Data Ingestion summary

In [17]:
print("Data Ingestion Summary")
print(f"Total number of rows: {df_dd.shape[0]}")
print(f"Total number of columns: {df_dd.shape[1]}")
print(f"File size: {file_size_gb:.2f} GB")
print(f"File type: {os.path.splitext(file_path)[1]}")

Data Ingestion Summary
Total number of rows: 35000000
Total number of columns: 8
File size: 2.72 GB
File type: .csv


### Saving the dataframe in pipe separated text file (|) in gz format

In [18]:
df_dask.to_csv('dataset.gz',sep='|',header=True,index=False,compression='gzip')

['c:\\Users\\yashj\\Desktop\\DG Internship\\File Ingestion Week6\\dataset.gz\\00.part',
 'c:\\Users\\yashj\\Desktop\\DG Internship\\File Ingestion Week6\\dataset.gz\\01.part',
 'c:\\Users\\yashj\\Desktop\\DG Internship\\File Ingestion Week6\\dataset.gz\\02.part',
 'c:\\Users\\yashj\\Desktop\\DG Internship\\File Ingestion Week6\\dataset.gz\\03.part',
 'c:\\Users\\yashj\\Desktop\\DG Internship\\File Ingestion Week6\\dataset.gz\\04.part',
 'c:\\Users\\yashj\\Desktop\\DG Internship\\File Ingestion Week6\\dataset.gz\\05.part',
 'c:\\Users\\yashj\\Desktop\\DG Internship\\File Ingestion Week6\\dataset.gz\\06.part',
 'c:\\Users\\yashj\\Desktop\\DG Internship\\File Ingestion Week6\\dataset.gz\\07.part',
 'c:\\Users\\yashj\\Desktop\\DG Internship\\File Ingestion Week6\\dataset.gz\\08.part',
 'c:\\Users\\yashj\\Desktop\\DG Internship\\File Ingestion Week6\\dataset.gz\\09.part',
 'c:\\Users\\yashj\\Desktop\\DG Internship\\File Ingestion Week6\\dataset.gz\\10.part',
 'c:\\Users\\yashj\\Desktop\\DG 

In [19]:
print("The total number of files",len(os.listdir('dataset.gz')))
print(f"Total folder size: {os.path.getsize('dataset.gz')/1024:.2f} KB")

The total number of files 42
Total folder size: 16.00 KB


### More schema Validations after saving file in .gz format

In [20]:
df_dask_copy = df_dask.copy()


In [21]:
util.validate_schema(df_dask_copy.columns.tolist(),config_data.get('columns'))

The validation process for the column schema defined in the YAML file was successful.


In [22]:
df_dask_copy = df_dask_copy.drop('Name',axis=1)
df_dask_copy.columns.tolist()

['ID', 'Age', 'Address', 'City', 'State', 'Country', 'Postcode']

In [23]:
util.validate_schema(df_dask_copy.columns.tolist(),config_data.get('columns'))

Number of columns does not match schema.

The schema entry for 'Name' does not match any of the column name in dataframe.


In [24]:
df_dask_copy = df_dask.copy()
df_dask_copy = df_dask_copy.rename(columns={'City':'_City'})


In [25]:
util.validate_schema(df_dask_copy.columns.tolist(),config_data.get('columns'))

The schema entry for 'City' does not match any of the column name in dataframe.


In [26]:
df_dask_copy = df_dask.copy()
df_dask_copy = df_dask_copy.rename(columns={'City':'city'})

In [27]:
util.validate_schema(df_dask_copy.columns.tolist(),config_data.get('columns'))

The validation process for the column schema defined in the YAML file was successful.
