In [1]:
!pip install dask modin[ray] ray pandas pyyaml datasets

Collecting ray
  Downloading ray-2.37.0-cp310-cp310-manylinux2014_x86_64.whl.metadata (16 kB)
Collecting datasets
  Downloading datasets-3.0.1-py3-none-any.whl.metadata (20 kB)
Collecting modin[ray]
  Downloading modin-0.32.0-py3-none-any.whl.metadata (17 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess (from datasets)
  Downloading multiprocess-0.70.17-py310-none-any.whl.metadata (7.2 kB)
INFO: pip is looking at multiple versions of multiprocess to determine which version is compatible with other requirements. This could take a while.
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Downloading ray-2.37.0-cp310-cp310-manylinux2014_x86_64.whl (65.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.6/65.6 MB[0m [31m7.4 MB/s[0

In [2]:
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import ray
import time
from datasets import load_dataset
import yaml

Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



In [3]:
# Fetch the training split of the synthetic text-similarity dataset from the Hugging Face Hub.
dataset = load_dataset("pszemraj/synthetic-text-similarity", split="train")
print(f"Dataset loaded with {len(dataset)} rows.")

# Destination of the CSV export; `file_path` is the name the reader benchmarks use.
csv_file_name = 'synthetic_text_similarity.csv'
file_path = csv_file_name

# Materialise the dataset as a pandas frame and write it out without the index column.
df = pd.DataFrame(dataset)
df.to_csv(path_or_buf=csv_file_name, index=False)
print(f"Dataset saved as {csv_file_name}.")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/4.02k [00:00<?, ?B/s]

train-00000-of-00006.parquet:   0%|          | 0.00/289M [00:00<?, ?B/s]

train-00001-of-00006.parquet:   0%|          | 0.00/289M [00:00<?, ?B/s]

train-00002-of-00006.parquet:   0%|          | 0.00/292M [00:00<?, ?B/s]

train-00003-of-00006.parquet:   0%|          | 0.00/291M [00:00<?, ?B/s]

train-00004-of-00006.parquet:   0%|          | 0.00/287M [00:00<?, ?B/s]

train-00005-of-00006.parquet:   0%|          | 0.00/290M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/100000 [00:00<?, ? examples/s]

Dataset loaded with 100000 rows.
Dataset saved as synthetic_text_similarity.csv.


In [4]:
# Start the local Ray runtime used by Modin. `ignore_reinit_error=True` makes this
# cell safe to re-run: without it, a second call in the same kernel raises because
# Ray is already initialised.
ray.init(ignore_reinit_error=True)

2024-10-06 17:52:46,973	INFO worker.py:1786 -- Started a local Ray instance.


0,1
Python version:,3.10.12
Ray version:,2.37.0


In [5]:
def _timed_read(label, reader, *args, **kwargs):
    """Run ``reader(*args, **kwargs)``, print how long it took, and return both.

    Args:
        label: Text printed in front of " read time: ..." in the report line.
        reader: Callable that loads the file (e.g. ``pd.read_csv``).
        *args: Positional arguments forwarded to ``reader`` unchanged.
        **kwargs: Keyword arguments forwarded to ``reader`` unchanged.

    Returns:
        A ``(frame, seconds)`` tuple: whatever ``reader`` returned and the
        elapsed wall-clock time in seconds.
    """
    # perf_counter() is monotonic and high-resolution; time.time() can jump if
    # the system clock is adjusted, which makes it unsuitable for intervals.
    started = time.perf_counter()
    frame = reader(*args, **kwargs)
    elapsed = time.perf_counter() - started
    print(f"{label} read time: {elapsed:.2f} seconds")
    return frame, elapsed


# Pandas
df_pandas, pandas_time = _timed_read("Pandas", pd.read_csv, file_path)

# Dask
# NOTE(review): per Dask's documentation dd.read_csv is lazy -- it samples the head
# of the file (the `sample` / `sample_rows` arguments) and builds a task graph
# without parsing the whole file. This timing is therefore NOT comparable to the
# eager pandas / Modin reads; nothing here forces a full parse.
df_dask, dask_time = _timed_read(
    "Dask", dd.read_csv, file_path, sample=5000000, sample_rows=100
)

# Modin with Ray
df_modin, modin_time = _timed_read("Modin (Ray)", mpd.read_csv, file_path)

Pandas read time: 36.99 seconds
Dask read time: 0.27 seconds


[33m(raylet)[0m [2024-10-06 17:54:46,914 E 1338 1338] (raylet) node_manager.cc:3065: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: f859ee2c7b31c53a528d4907556a85b4ae8848dce8ccc0757bb93034, IP: 172.28.0.12) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 172.28.0.12`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


Modin (Ray) read time: 86.60 seconds


In [6]:
def _clean_column_names(columns):
    """Return ``columns`` with every non-word, non-whitespace character removed
    and surrounding whitespace stripped from each name."""
    without_punctuation = columns.str.replace(r'[^\w\s]', '', regex=True)
    return without_punctuation.str.strip()


# Apply the same header normalisation to the pandas, Dask and Modin frames.
for frame in (df_pandas, df_dask, df_modin):
    frame.columns = _clean_column_names(frame.columns)

print(df_pandas.columns)

Index(['text1', 'text2', 'label'], dtype='object')


In [7]:
# Describe the cleaned table: column order plus the delimiter used for the export.
schema = {
    'columns': list(df_pandas.columns),
    'separator': '|'
}

# Serialise once with the safe dumper (the counterpart of safe_load below), then
# write that text to disk and echo the very same text to the cell output.
schema_yaml = yaml.safe_dump(schema, default_flow_style=False)
with open('schema.yaml', 'w') as yaml_file:
    yaml_file.write(schema_yaml)

print(schema_yaml)

# Read the schema back so the checks run against what is actually on disk.
with open('schema.yaml', 'r') as file:
    loaded_schema = yaml.safe_load(file)

# Compare plain lists: an element-wise Index comparison would raise on a length
# mismatch instead of producing a clean assertion message.
actual_columns = list(df_pandas.columns)
assert len(actual_columns) == len(loaded_schema['columns']), "Column count mismatch!"
assert actual_columns == loaded_schema['columns'], "Column names mismatch!"
print("Validation passed!")

# Take the delimiter from the validated schema rather than repeating the literal,
# so the schema file and the exported data cannot drift apart.
df_pandas.to_csv(
    'cleaned_file.csv.gz',
    sep=loaded_schema['separator'],
    index=False,
    compression='gzip',
)
print("File written in pipe-separated format and compressed to .gz")

columns:
- text1
- text2
- label
separator: '|'

Validation passed!
File written in pipe-separated format and compressed to .gz


In [8]:
import os
import gzip

gz_file_path = 'cleaned_file.csv.gz'

# Let pandas decompress the file itself. Opening it with gzip.open(..., 'rt') would
# decode using the locale's default encoding and apply universal-newline
# translation, rewriting '\r' / '\r\n' inside text fields before pandas parses them.
df = pd.read_csv(gz_file_path, sep='|', compression='gzip')

# Shape of the re-loaded table and the compressed file's size in bytes.
total_rows, total_columns = df.shape
file_size = os.path.getsize(gz_file_path)

[33m(raylet)[0m [2024-10-06 18:07:46,948 E 1338 1338] (raylet) node_manager.cc:3065: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: f859ee2c7b31c53a528d4907556a85b4ae8848dce8ccc0757bb93034, IP: 172.28.0.12) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 172.28.0.12`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


In [17]:
# Express the compressed file's byte count in units of 1024**3 bytes.
file_size_gb = file_size / (1 << 30)

# NOTE(review): the recorded run reports 100006 rows here although the dataset was
# loaded with 100000 rows -- confirm whether the CSV round-trip splits some records
# (possibly stray carriage returns inside text fields; unverified).
summary = dict([
    ("Total Rows", total_rows),
    ("Total Columns", total_columns),
    (f"{gz_file_path} File Size (GB)", file_size_gb),
])
summary

{'Total Rows': 100006,
 'Total Columns': 3,
 'cleaned_file.csv.gz File Size (GB)': 1.0210130643099546}

In [16]:
# Report the uncompressed CSV's size in units of 1024**3 bytes. The path comes from
# `csv_file_name` so the label and the measured file can never disagree.
print(f"{csv_file_name} File Size (GB):", os.path.getsize(csv_file_name) / (1024 ** 3))

synthetic_text_similarity.csv File Size (GB): 2.7945711109787226
