# Pretraining Data Curation in NeMo Curator

## Table of Contents

1. [Introduction](#introduction)
2. [Getting Started](#get-start)
3. [RedPajama-Data-v2](#rpv2)
4. [Data Preprocessing](#preprocess)
5. [Deduplication](#dedup)
6. [Quality filtering](#filter)

# 1. Introduction
<a id="introduction"></a>

In this tutorial, we will show how to curate large-scale data for LLM pretraining in a distributed environment using NeMo-Curator. Specifically, we will focus on the following modules in NeMo-Curator:

- Language identification and separation
- Text reformatting and cleaning
- Quality filtering
- Document-level deduplication

For demonstration, we will use the [RedPajama-Data-v2](#rpv2) dataset, an open dataset for LLM pretraining.

## 1.1 System Information
Here is the information on the system this notebook was run on:

- **GPU**: 2 A100 nodes (each with 8 A100-SXM4-80GB)

- **CUDA & Nvidia Drivers**: CUDA 12.4 with Driver 535.104.12

- **OS**: Ubuntu 22.04.4 LTS

## 1.2 Running NeMo-Curator

NeMo-Curator comes pre-installed in the NeMo Framework container. This notebook uses the 24.07 release of the NeMo Framework container. You can pull the container by following the steps below:

- Get access to the NeMo Framework container on [NGC](https://catalog.ngc.nvidia.com/orgs/nvidia/containers/nemo)

- Set your docker credentials


    `docker login nvcr.io`

    Username: `$oauthtoken`
    
    Password: `<NGC_API_KEY Key>`
    
- Pull the NeMo Framework Container image
    
    `docker pull nvcr.io/nvidia/nemo:24.07`

Alternatively, NeMo-Curator is also available on [PyPI](https://pypi.org/project/nemo-curator/) and [GitHub](https://github.com/NVIDIA/NeMo-Curator).

# 2. Getting started
<a id="get-start"></a>

NeMo-Curator uses Dask for parallelization. Before we start using Curator, we need to start a Dask cluster. To start a multi-node Dask cluster on Slurm, we can use the `start-distributed-notebook.sh` script in this directory. You will need to change the following variables:

- Slurm job directives
- Device type (`cpu` or `gpu`). Curator has both CPU and GPU modules. Check [here](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/cpuvsgpu.html) to see which modules run on CPU or GPU
- CPU-related parameters if using CPU modules. Configure the number of workers and the memory limit to use the available compute resources efficiently while avoiding out-of-memory errors
- Path to the NeMo Framework container image
- Path to the `container-entrypoint.sh` script, which is responsible for launching the Dask scheduler and workers

Running the script will also launch a JupyterLab session on the rank 0 node and pass the Dask scheduler address as an environment variable, which is used later to connect the Dask client.

The preprocessing modules such as Add ID and Text cleaning are cpu-based so we will start a cpu dask cluster first.

In [2]:
import os
import time
from dask.distributed import Client
import warnings
import dask.dataframe as dd
import dask_cudf
import cudf
import gzip
import json
import dask.bag as db
import glob
from dask.distributed import wait
import numpy as np

from nemo_curator import get_client
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import (
    get_num_workers,
    read_data,
    write_to_disk,
)
from nemo_curator.utils.file_utils import (
    expand_outdir_and_mkdir, 
    get_all_files_paths_under, 
    separate_by_metadata,
    get_batched_files,
)

warnings.filterwarnings('ignore')
base_dir = "/home/neelesh/4_new_c4"



In [3]:
scheduler_address = os.getenv('SCHEDULER_ADDRESS')
# cpu_client = get_client(
#     cluster_type="cpu",
#     n_workers=64,                  # Number of workers, adjust to suit load (64 workers = 2 cores each)
#     threads_per_worker=2,          # Threads per worker for I/O-bound tasks or to maintain high CPU utilization
#     enable_spilling=True,          # Allow Dask to spill to disk when memory is tight
#     protocol="tcp",                # Communication protocol
#     set_torch_to_use_rmm=False,    # Not applicable for CPU, but good to be explicit
#     memory_limit="16GB",           # Memory per worker, allowing flexibility within 1TB
# )
gpu_client = get_client(
    cluster_type="gpu",
    n_workers=4,              # One worker per GPU
    threads_per_worker=1,     # One thread per worker
    rmm_pool_size="48GB",     # Memory pool size per GPU
    protocol="ucx"           # Communication protocol
)
print(f"Num Workers = {get_num_workers(gpu_client)}", flush=True)

[1730894319.421165] [riogrande:2078536:0]          parser.c:2305 UCX  WARN  unused environment variable: UCX_MEMTYPE_CACHE (maybe: UCX_MEMTYPE_CACHE?)
cuDF Spilling is enabled
Num Workers = 4


In [19]:
gpu_client.cluster.close()
gpu_client.shutdown()

# 3. RedPajama-Data-v2
<a id="rpv2"></a>

RedPajama-V2 (rpv2) is an advanced open-source initiative designed to support the development of large language models (LLMs). This dataset, sourced from 84 CommonCrawl snapshots, spans five major languages—English, French, Spanish, German, and Italian—making it one of the largest and most comprehensive public datasets available for LLM training.

The RedPajama-V2 dataset is available on [Huggingface](https://huggingface.co/datasets/togethercomputer/RedPajama-Data-V2).

For this tutorial, we will start with a single snapshot from rpv2 and then scale to multiple snapshots to demonstrate the pre-training data curation workflow.

The raw rpv2 data is stored as compressed JSON. We will first decompress the json.gz files and write them out as jsonl files. For this, we will use the helper function `convert_json_gz_to_jsonl` from `helper.py`.


In [5]:
from helper import convert_json_gz_to_jsonl

input_data_dir = os.path.join(base_dir,"c4/en")
output_data_dir = os.path.join(base_dir,"../clean_c4")

t0 = time.time()
# convert_json_gz_to_jsonl(input_data_dir, output_data_dir)
print(f"Uncompressing data took {time.time()-t0} s")

Uncompressing data took 0.00010585784912109375 s
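
The body of `convert_json_gz_to_jsonl` is not shown in this notebook. As a rough illustration only, a single-file conversion could look like the sketch below. It assumes each compressed file holds one JSON record per line; the function name `gz_to_jsonl` and the sample file names are hypothetical and are not part of `helper.py`.

```python
import gzip
import json
import os
import tempfile


def gz_to_jsonl(src_path: str, dst_path: str) -> int:
    """Decompress one .json.gz file into a .jsonl file; return the record count."""
    count = 0
    with gzip.open(src_path, "rt", encoding="utf-8") as src, \
            open(dst_path, "w", encoding="utf-8") as dst:
        for line in src:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            json.loads(line)  # fail early on a malformed record
            dst.write(line + "\n")
            count += 1
    return count


# Tiny round trip on temporary files (hypothetical sample data).
tmp_dir = tempfile.mkdtemp()
src = os.path.join(tmp_dir, "sample.json.gz")
dst = os.path.join(tmp_dir, "sample.jsonl")
with gzip.open(src, "wt", encoding="utf-8") as f:
    f.write(json.dumps({"text": "hello"}) + "\n")
    f.write(json.dumps({"text": "world"}) + "\n")

n_written = gz_to_jsonl(src, dst)
print(n_written)
```

Streaming line by line keeps memory usage flat regardless of file size, which matters when the same function is mapped over many large shards.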


To get started, we can read the jsonl files into a `DocumentDataset`, which is the standard format for text datasets used in Curator.

In [11]:
from nemo_curator.datasets import DocumentDataset
# don't read in validation data
input_dataset = DocumentDataset.read_json(output_data_dir, add_filename=True)

Reading 256 files


`DocumentDataset` is essentially a wrapper around dask dataframe and we can get the dataframe by calling `input_dataset.df`:

In [12]:
input_dataset.df.head()

Unnamed: 0,filename,text,timestamp,url
0,c4-train.00004-of-01024.jsonl,New pictures from Ironman Hawaii and XTerra Ma...,2019-04-20 02:21:16+00:00,http://michiweiss.at/stories-pid386
1,c4-train.00004-of-01024.jsonl,2) Given the cavitation on tight turns and pos...,2019-04-19 10:14:41+00:00,http://www.rib.net/forum/f36/engine-height-pro...
2,c4-train.00004-of-01024.jsonl,In the recording and in the PDF I explain my p...,2019-04-23 06:56:40+00:00,https://qualitytime-esl.com/spip.php?article467
3,c4-train.00004-of-01024.jsonl,"I picked up the bass a few months ago, and I'v...",2019-04-24 08:24:16+00:00,https://music.stackexchange.com/questions/5275...
4,c4-train.00004-of-01024.jsonl,Get lyrics of Cc dust new ways song you love. ...,2019-04-24 04:50:06+00:00,https://www.lyrics.cat/lyrics+cc+dust+new+ways


There are a total of 1,088,468,779 documents in a single rpv2 snapshot.

The dataset used in this run is smaller, at about 91 million documents (91,217,230):

In [15]:
len(input_dataset.df)

91217230

# 4. Data Preprocessing
<a id="preprocess"></a>

## 4.1 Data resharding

The input text files have varying sizes, which leads to imbalanced partitions that could result in out-of-memory issues. Ideally, we want balanced text files of similar sizes. Curator offers a utility to reshard the text files to similar sizes.

In [16]:
from nemo_curator.utils.file_utils import reshard_jsonl
from nemo_curator.utils.file_utils import expand_outdir_and_mkdir

output_resharded_dir = expand_outdir_and_mkdir(os.path.join(base_dir,"../clean_c4_resharded"))

t0 = time.time()
reshard_jsonl(
    output_data_dir,
    output_resharded_dir,
    output_file_size="100M",
    start_index=0,
    file_prefix="c4-train",
)
print(f"Data sharding took:{time.time()-t0}")

Data sharding took:53.97713375091553
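
To make the idea of resharding concrete, here is a minimal in-memory sketch that packs lines into shards capped at a byte budget. It is not how `reshard_jsonl` is implemented; the function `reshard_lines` and the toy sizes are assumptions made for illustration.

```python
def reshard_lines(lines, max_bytes):
    """Group lines into shards whose UTF-8 size stays at or below max_bytes.

    A single line larger than max_bytes still gets a shard of its own.
    """
    shards, current, current_size = [], [], 0
    for line in lines:
        size = len(line.encode("utf-8"))
        # Start a new shard when adding this line would exceed the budget.
        if current and current_size + size > max_bytes:
            shards.append(current)
            current, current_size = [], 0
        current.append(line)
        current_size += size
    if current:
        shards.append(current)
    return shards


# Five toy "documents" of uneven size, packed into shards of at most 100 bytes.
docs = ["a" * 40, "b" * 40, "c" * 40, "d" * 10, "e" * 90]
shards = reshard_lines(docs, max_bytes=100)
shard_sizes = [sum(len(d) for d in shard) for shard in shards]
print(shard_sizes)
```

Capping shard size bounds how much data any one partition holds, which is what reduces the out-of-memory risk described above.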


[Optional] Removing the raw dataset to save disk space:

In [15]:
!rm -rf {base_dir}/rpv2-2023-06

## 4.2 Add ID

We will assign a unique ID to each document in the dataset so that we can reference it later.

In [6]:
from nemo_curator import AddId
from nemo_curator.datasets import DocumentDataset

We will create an instance of Curator's `AddId` class and use it to add an ID to every document in the dataset.

In [18]:
input_data_dir = os.path.join(base_dir,"../clean_c4_resharded")
input_dataset = DocumentDataset.read_json(input_data_dir, add_filename=True)
id_data_dir = expand_outdir_and_mkdir(os.path.join(base_dir,"../clean_c4_resharded_with_id"))

t0 = time.time()
# specify add_id function
add_id = AddId(
    id_field="id",
    id_prefix="c4-train",
    start_index=0,
)
id_dataset = add_id(input_dataset)
id_dataset.to_json(id_data_dir, write_to_filename=True)
print(f"Adding ID took :{time.time()-t0}")

Reading 1792 files


2024-11-03 11:24:52,002 - distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:46611
Traceback (most recent call last):
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/comm/tcp.py", line 227, in read
    frames_nosplit = await read_bytes_rw(stream, frames_nosplit_nbytes)
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/comm/tcp.py", line 366, in read_bytes_rw
    actual = await stream.read_into(chunk)  # type: ignore[arg-type]
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/worker.py", line 2056, in gather_dep
    response = await get_data_from_worker(
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/worker.py", line 2874, in get_data_from_worker
    response = 

Task exception was never retrieved
future: <Task finished name='Task-2861484' coro=<Client._gather.<locals>.wait() done, defined at /home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/client.py:2385> exception=AllExit()>
Traceback (most recent call last):
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/client.py", line 2394, in wait
    raise AllExit()
distributed.client.AllExit


Writing to disk complete for 1792 partitions
Adding ID took :296.1998860836029


We can validate the added IDs below:

In [20]:
id_dataset.df.head()



Unnamed: 0,filename,text,timestamp,url,id
0,c4-train00000.jsonl,New pictures from Ironman Hawaii and XTerra Ma...,2019-04-20 02:21:16+00:00,http://michiweiss.at/stories-pid386,c4-train-0000000000
1,c4-train00000.jsonl,2) Given the cavitation on tight turns and pos...,2019-04-19 10:14:41+00:00,http://www.rib.net/forum/f36/engine-height-pro...,c4-train-0000000001
2,c4-train00000.jsonl,In the recording and in the PDF I explain my p...,2019-04-23 06:56:40+00:00,https://qualitytime-esl.com/spip.php?article467,c4-train-0000000002
3,c4-train00000.jsonl,"I picked up the bass a few months ago, and I'v...",2019-04-24 08:24:16+00:00,https://music.stackexchange.com/questions/5275...,c4-train-0000000003
4,c4-train00000.jsonl,Get lyrics of Cc dust new ways song you love. ...,2019-04-24 04:50:06+00:00,https://www.lyrics.cat/lyrics+cc+dust+new+ways,c4-train-0000000004
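
The IDs in the output above follow a `<prefix>-<zero-padded counter>` pattern. The sketch below reproduces that pattern on plain dictionaries. The 10-digit width is inferred from the output shown here, and `add_ids` is a hypothetical helper, not the `AddId` implementation.

```python
def add_ids(docs, id_prefix, start_index=0, width=10):
    """Return copies of docs with a sequential, zero-padded 'id' field.

    The width of 10 digits is an assumption inferred from the notebook output.
    """
    return [
        {**doc, "id": f"{id_prefix}-{start_index + i:0{width}d}"}
        for i, doc in enumerate(docs)
    ]


docs = [{"text": "first"}, {"text": "second"}]
with_ids = add_ids(docs, id_prefix="c4-train")
for doc in with_ids:
    print(doc["id"])
```

Zero padding keeps the IDs sortable as strings, which is convenient when looking up specific documents later.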


[Optional] Remove the sharded dataset to save disk space:

In [21]:
!rm -rf {base_dir}/../clean_c4_resharded

ValueError: filedescriptor out of range in select()

## 4.3 Language ID and Separation

Data curation usually includes steps that are language specific (e.g. using language-tuned heuristics for quality filtering). NeMo Curator provides utilities to identify languages. The language identification is performed using fastText.

It is worth mentioning that even though a preliminary language identification has already been performed on rpv2 and we started with an English-only dataset, fastText is more accurate, so it can be used for a second pass.

In [4]:
from nemo_curator import ScoreFilter, Modify
from nemo_curator.filters import FastTextLangId
from nemo_curator.modifiers import UnicodeReformatter
from nemo_curator.utils.file_utils import get_all_files_paths_under, separate_by_metadata

# Language ID path
language_output_path = expand_outdir_and_mkdir(os.path.join(base_dir,"../clean_c4_resharded_with_id_lang"))
language_data_output_path = expand_outdir_and_mkdir(os.path.join(language_output_path,"data"))

# Fasttext model path
model_path = language_output_path

# Define key in output .jsonl files to store the language information
language_field = "language"

Download the fastText model for language detection.

In [8]:
!wget https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin -P {model_path}

--2024-11-03 11:40:24--  https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin
Resolving dl.fbaipublicfiles.com (dl.fbaipublicfiles.com)... 3.163.189.14, 3.163.189.51, 3.163.189.96, ...
Connecting to dl.fbaipublicfiles.com (dl.fbaipublicfiles.com)|3.163.189.14|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 131266198 (125M) [application/octet-stream]
Saving to: ‘/home/neelesh/clean_c4_resharded_with_id_lang/lid.176.bin’


2024-11-03 11:40:28 (38.6 MB/s) - ‘/home/neelesh/clean_c4_resharded_with_id_lang/lid.176.bin’ saved [131266198/131266198]



We will create an instance of Curator's `ScoreFilter` and use a helper function `separate_by_metadata` to separate the dataset into subfolders based on language.

In [5]:
t0 = time.time()

# Load dataset
id_data_dir = os.path.join(base_dir,"../clean_c4_resharded_with_id")
input_dataset = DocumentDataset.read_json(id_data_dir, add_filename=True)

# Define Language separation pipeline
lang_filter = FastTextLangId(os.path.join(model_path,'lid.176.bin'))
language_id_pipeline = ScoreFilter(
    lang_filter, 
    score_field=language_field,
    text_field="text",
    score_type='object'
)
filtered_dataset = language_id_pipeline(input_dataset)

# drop the detailed classifier score
filtered_dataset.df[language_field] = filtered_dataset.df[language_field].apply(
    lambda score: score[1],meta = (language_field, 'object')
    )

# Split the dataset to corresponding language sub-folders
language_stats = separate_by_metadata(
    filtered_dataset.df, 
    language_data_output_path, 
    metadata_field=language_field
).compute()

print(f"Time taken for splitting language:{time.time()-t0}")

Reading 1792 files











Time taken for splitting language:1321.483018875122
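
Conceptually, the cell above does two things: it reduces each score to a language code (`score[1]`) and then writes documents into one subfolder per language. The sketch below mimics both steps on a few in-memory records. It assumes the score is a `[confidence, language_code]` pair; `separate_by_field` and the `part.jsonl` file name are hypothetical stand-ins, not Curator's `separate_by_metadata`.

```python
import json
import os
import tempfile
from collections import Counter


def separate_by_field(docs, out_dir, field):
    """Append each doc to <out_dir>/<field value>/part.jsonl; count docs per value."""
    counts = Counter()
    for doc in docs:
        value = str(doc[field])
        sub_dir = os.path.join(out_dir, value)
        os.makedirs(sub_dir, exist_ok=True)
        with open(os.path.join(sub_dir, "part.jsonl"), "a", encoding="utf-8") as f:
            f.write(json.dumps(doc) + "\n")
        counts[value] += 1
    return dict(counts)


# Assumed score layout: [confidence, language_code]; keep only the code.
scored = [
    {"text": "Hello there", "language": [0.98, "EN"]},
    {"text": "Bonjour", "language": [0.95, "FR"]},
    {"text": "Good morning", "language": [0.91, "EN"]},
]
docs = [{**d, "language": d["language"][1]} for d in scored]

out_dir = tempfile.mkdtemp()
stats = separate_by_field(docs, out_dir, "language")
print(stats)
```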


For rpv2, the English dataset has 1,088,311,520 documents compared to 1,088,468,779 documents in the raw dataset. The difference is small because the raw dataset has already been language-detected and filtered to English.

In this run, we went from 91,217,230 documents down to 90,858,953.

In [6]:
en_dataset_path = os.path.join(base_dir,"../clean_c4_resharded_with_id_lang/data/EN")
en_dataset = DocumentDataset.read_json(en_dataset_path, add_filename=True)

len(en_dataset)

Reading 1792 files


90858953

[Optional] Removing the ID'ed data to save disk space:

In [None]:
!rm -rf {base_dir}/rpv2-2023-06-id

In [None]:
# ja_dataset_path = os.path.join(base_dir,"rpv2-2023-06-language/data/JA")
# ja_dataset = DocumentDataset.read_json(ja_dataset_path, add_filename=True)

# ja_dataset.df.head(1)

## 4.4 Text cleaning

Datasets may have improperly decoded unicode characters. Curator provides utilities to fix improperly decoded unicode characters based on the heuristics defined within the `ftfy` package.
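
As a small illustration of what "improperly decoded" means, the sketch below creates one common kind of mojibake using only the standard library and then reverses it. This handles a single known mis-decoding (UTF-8 bytes read as Latin-1); `ftfy` detects and repairs many such cases heuristically, and this snippet is not a description of how it works internally.

```python
original = "café"

# Simulate the bug: UTF-8 bytes wrongly decoded as Latin-1.
garbled = original.encode("utf-8").decode("latin-1")

# Undo the wrong decoding step to recover the intended text.
repaired = garbled.encode("latin-1").decode("utf-8")

print(garbled)
print(repaired)
```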

In [8]:
import nemo_curator
from nemo_curator.modifiers import UnicodeReformatter

en_dataset_path = os.path.join(base_dir,"../clean_c4_resharded_with_id_lang/data/EN")
en_dataset = DocumentDataset.read_json(en_dataset_path, add_filename=True)

Reading 1792 files


Curator uses the `Modify` module with `UnicodeReformatter` for text cleaning. It requires the following arguments:

In [9]:
# make directory for cleaned dataset
output_clean_dir = expand_outdir_and_mkdir(os.path.join(base_dir,"../clean_c4_en_cleaned"))
# specify text field name and file type
input_text_field = "text"
input_file_type = "jsonl"

In [10]:
t0 = time.time()
# specify cleaner
cleaner = nemo_curator.Modify(
    UnicodeReformatter(), 
    text_field=input_text_field
)

# clean dataset and write to disk
cleaned_dataset = cleaner(en_dataset)
cleaned_dataset.to_json(output_clean_dir, write_to_filename=True)
print(f"Text cleaning took {time.time()-t0} s")

Writing to disk complete for 1792 partitions
Text cleaning took 3688.7079470157623 s


[Optional] Removing intermediate data to save disk space:

In [9]:
!rm -rf {base_dir}/rpv2-2023-06-language/data/EN

# 5. Deduplication
<a id="dedup"></a>



## 5.1 Exact Deduplication

Exact dedup computes a hash for the raw text of each document. Documents with the same hash value are treated as exact duplicates, and all but one copy are removed. Curator provides GPU-accelerated exact deduplication using RAPIDS.
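
The hashing idea can be sketched in a few lines of plain Python. The example uses MD5 from `hashlib` and keeps the first occurrence of each hash; `find_exact_duplicates` and the sample documents are hypothetical and only illustrate the principle, not Curator's GPU implementation.

```python
import hashlib


def find_exact_duplicates(docs):
    """Return the ids of documents whose text hash was already seen.

    The first document with a given hash is kept; later ones are flagged.
    """
    seen = set()
    to_remove = []
    for doc in docs:
        digest = hashlib.md5(doc["text"].encode("utf-8")).hexdigest()
        if digest in seen:
            to_remove.append(doc["id"])
        else:
            seen.add(digest)
    return to_remove


docs = [
    {"id": "doc-0", "text": "All rights reserved."},
    {"id": "doc-1", "text": "A different document."},
    {"id": "doc-2", "text": "All rights reserved."},
]
to_remove = find_exact_duplicates(docs)
remaining = [d for d in docs if d["id"] not in set(to_remove)]
print(to_remove)
```

Comparing fixed-length digests instead of full texts keeps the comparison cheap, at the cost of only catching byte-for-byte identical documents.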

In [2]:
from nemo_curator.log import create_logger
from nemo_curator.modules import ExactDuplicates

# def pre_imports():
#     import cudf  # noqa: F401

In [3]:
scheduler_address = os.getenv('SCHEDULER_ADDRESS')
# gpu_client = get_client(scheduler_address=scheduler_address)
# gpu_client = get_client(
#     cluster_type="gpu",
#     n_workers=4,                             # One worker per GPU
#     threads_per_worker=1,                    # One thread per worker
#     rmm_pool_size="48GB",                    # Memory pool size per GPU
#     enable_spilling=True,                    # Allow spilling to host memory
#     rmm_async=True,                          # Enable asynchronous RMM
#     rmm_managed_memory=False,                # Prefer pool-based memory management
#     protocol="ucx",                          # Communication protocol
#     set_torch_to_use_rmm=True,               # Use RMM if working with PyTorch
# )
cpu_client = get_client(
    cluster_type="cpu",
    n_workers=128,                 # Number of workers, adjust to suit load
    threads_per_worker=1,          # Threads per worker for I/O-bound tasks or to maintain high CPU utilization
    enable_spilling=True,          # Allow Dask to spill to disk when memory is tight
    set_torch_to_use_rmm=False,    # Not applicable for CPU, but good to be explicit
    memory_limit="16GB",           # Memory per worker, allowing flexibility within 1TB
)

print(f"Num Workers = {get_num_workers(cpu_client)}", flush=True)

# cpu_client.run(pre_imports)
print("Pre imports complete")

Num Workers = 128
Pre imports complete


In [11]:
cpu_client.cluster.close()
cpu_client.shutdown()

2024-11-03 14:21:48,437 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/worker.py", line 1250, in heartbeat
    response = await retry_operation(
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/utils_comm.py", line 461, in retry_operation
    return await retry(
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/utils_comm.py", line 440, in retry
    return await coro()
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-pa


In [4]:
cleaned_dataset_path = os.path.join(base_dir,"../clean_c4_en_cleaned")
log_dir = expand_outdir_and_mkdir(os.path.join(base_dir, "logs"))
input_id_field = 'id'
input_text_field = 'text'
hash_method = 'md5'
output_dir = expand_outdir_and_mkdir(os.path.join(base_dir,"../clean_c4_exact_dedup"))

In [None]:
t0 = time.time()
# Read the input dataset from the cleaned dataset dir
input_dataset = DocumentDataset.read_json(cleaned_dataset_path)

# Perform exact dedup
exact_dups = ExactDuplicates(
    logger=log_dir,
    id_field=input_id_field,
    text_field=input_text_field,
    hash_method=hash_method,
    cache_dir=output_dir,
)
duplicates = exact_dups(dataset=input_dataset)
print(f"Exact dedup took:{time.time()-t0}")


Reading 1792 files


Exact dedup took:187.99768805503845


Exact deduplication found 97,327,867 duplicated documents.

Let's see the results of exact dedup:

In [5]:
duplicates_df = duplicates.df
duplicates_df.head()

NameError: name 'duplicates' is not defined

We can sort the duplicate cluster by size and see that the largest cluster has 8 exact duplicates.

In [17]:
duplicates_df.groupby('_hashes') \
             .agg({'id': 'count'}) \
             .rename(columns={'id': 'count'}) \
             .sort_values('count', ascending=False) \
             .head()

Unnamed: 0_level_0,count
_hashes,Unnamed: 1_level_1
61a3ade28229b37a315c102f982c6592,8
24ca359fdcad574baf1bde55efbc2012,6
4524e08fdbd1cb1f50e7a22664326d6d,3
dac7e27b45e1306c82ea3e2ce12bfd95,3
87f8ae68fa14ce3f154f0a661e20477e,3


In [18]:
dup_group = duplicates_df[duplicates_df['_hashes'] == '61a3ade28229b37a315c102f982c6592'].compute()
dup_group.head()

Unnamed: 0,id,_hashes
2,c4-train-0007801382,61a3ade28229b37a315c102f982c6592
5,c4-train-0017258282,61a3ade28229b37a315c102f982c6592
6,c4-train-0018256270,61a3ade28229b37a315c102f982c6592
8,c4-train-0028846946,61a3ade28229b37a315c102f982c6592
10,c4-train-0044802347,61a3ade28229b37a315c102f982c6592


[Optional] Verify if the documents with the same hash are exactly the same. We can use the ids from the cell output above (ids may change so revise the `dup_ids` as needed):

In [19]:
t0 = time.time()
dup_ids = ['c4-train-0007801382', 'c4-train-0017258282', 'c4-train-0018256270'] 
dup_examples = input_dataset.df[input_dataset.df['id'].isin(dup_ids)].compute()
print(f"Searching for example duplicates with specific IDs took {time.time()-t0} seconds")

Searching for example duplicates with specific IDs took 64.67666721343994 seconds


In [20]:
dup_examples

Unnamed: 0,id,language,text,timestamp,url
13133,c4-train-0007801382,EN,网站ICP备案序号:粤ICP备09046014号-1 Copyright ? 2015 ds...,1555944862000,http://www.5coney.com/2018/strength_0529/12.html
2547,c4-train-0017258282,EN,网站ICP备案序号:粤ICP备09046014号-1 Copyright ? 2015 ds...,1555944975000,http://www.5coney.com/2018/advantage_0528/1.html
32792,c4-train-0018256270,EN,网站ICP备案序号:粤ICP备09046014号-1 Copyright ? 2015 ds...,1555944384000,http://www.5coney.com/2018/advantage_0528/2.html


In [21]:
print('Example duplicate 1\n' + dup_examples.text.iloc[0])
print('\n\nExample duplicate 2\n' + dup_examples.text.iloc[1])
print('\n\nExample duplicate 3\n' + dup_examples.text.iloc[2])

Example duplicate 1
网站ICP备案序号:粤ICP备09046014号-1 Copyright ? 2015 ds1234567.com. All rights reserved.


Example duplicate 2
网站ICP备案序号:粤ICP备09046014号-1 Copyright ? 2015 ds1234567.com. All rights reserved.


Example duplicate 3
网站ICP备案序号:粤ICP备09046014号-1 Copyright ? 2015 ds1234567.com. All rights reserved.


Now, we will remove the exact duplicates and write the remaining dataset to disk.

In [6]:
input_dataset = DocumentDataset.read_json(cleaned_dataset_path, add_filename=True)
duplicates = DocumentDataset.read_parquet(os.path.join(output_dir,"_exact_duplicates.parquet"))
duplicates_df = duplicates.df

Reading 1792 files


Reading 1 files


In [8]:
output_dir = expand_outdir_and_mkdir(os.path.join(base_dir,"../clean_c4_exact_dedup_removed_final"))

t0 = time.time()
docs_to_remove = duplicates_df.map_partitions(
    lambda x: x[x._hashes.duplicated(keep="first")]
)

# When there are few duplicates we can compute the results to a list and use `isin`.
result = input_dataset.df[
    ~input_dataset.df[input_id_field].isin(
        docs_to_remove[input_id_field].compute()
    )
]

write_to_disk(
    result,
    output_dir,
    write_to_filename=True,
    output_type='jsonl',
)

print(f"Removing exact duplicates took:{time.time()-t0}")

Writing to disk complete for 1792 partitions
Removing exact duplicates took:171.1081485748291


We can see that exact dedup removed 4,265 documents and we now have 90,854,688 documents left in the dataset.

In [9]:
len(docs_to_remove)

4265

In [10]:
len(result)

90854688

In [11]:
cpu_client.cluster.close()
cpu_client.shutdown()

2024-11-03 14:41:06,643 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/worker.py", line 1250, in heartbeat
    response = await retry_operation(
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/utils_comm.py", line 461, in retry_operation
    return await retry(
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/utils_comm.py", line 440, in retry
    return await coro()
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-pa

## 5.2 Fuzzy Deduplication

Fuzzy deduplication aims to find near-duplicated documents in our dataset. Near-duplicated documents are common in web crawl data due to plagiarism and mirror sites. Removing them can help improve the quality of trained models. In many cases, we can skip exact dedup and just perform fuzzy dedup as it will also find the exact duplicates. Thus, we will start with the cleaned dataset for fuzzy dedup.

Curator implements GPU-accelerated fuzzy deduplication based on the MinHash + LSH algorithm for finding similar documents across the dataset. Specifically, fuzzy deduplication includes six steps:

- Compute minhashes
- Locality-Sensitive Hashing (LSH)
- Map buckets
- Jaccard shuffle
- Jaccard compute
- Connected components
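
The last step in the list above groups documents that are linked by pairwise similarity. A generic way to do this is a union-find over the similar pairs, sketched below. This is a textbook illustration under the assumption that earlier steps produce pairs of similar document IDs; it is not taken from Curator's code, and the pair data is made up.

```python
def connected_components(pairs):
    """Group ids into clusters, given pairs of ids judged to be near-duplicates."""
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for a, b in pairs:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_b] = root_a  # merge the two clusters

    groups = {}
    for node in list(parent):
        groups.setdefault(find(node), set()).add(node)
    return sorted(sorted(group) for group in groups.values())


# "a"~"b" and "b"~"c" put all three in one cluster even though "a"~"c" was never compared.
pairs = [("a", "b"), ("b", "c"), ("d", "e")]
groups = connected_components(pairs)
print(groups)
```

Within each resulting cluster, one document can be kept and the rest dropped.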


In [3]:
def pre_imports():
    import cudf  # noqa: F401

# scheduler_address = os.getenv('SCHEDULER_ADDRESS')
# gpu_client = get_client(scheduler_address=scheduler_address)
# gpu_client = get_client(
#     cluster_type="gpu",
#     n_workers=4,                             # One worker per GPU
#     threads_per_worker=32,                    # One thread per worker
#     rmm_pool_size="48GB",                    # Memory pool size per GPU
#     enable_spilling=True,                    # Allow spilling to host memory
#     rmm_async=True,                          # Enable asynchronous RMM
#     rmm_managed_memory=False,                # Prefer pool-based memory management
#     protocol="ucx",                          # Communication protocol
#     set_torch_to_use_rmm=True,               # Use RMM if working with PyTorch
# )
print(f"Num Workers = {get_num_workers(gpu_client)}", flush=True)

gpu_client.run(pre_imports)
print("Pre imports complete")

Num Workers = 4
Pre imports complete


### 5.2.1 Compute minhashes

First, we will compute the MinHash signature for each document. For this purpose, each document is represented by a set of n-grams. We apply a number of random hash functions to each element of the set. The minimum hash value produced by each hash function is recorded and becomes one component of the MinHash signature. Thus, the length of the MinHash signature equals the number of hash functions.
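
The description above can be turned into a small CPU-only sketch. It uses character 5-grams and builds a family of hash functions by keying `hashlib.blake2b` with a seed and an index. The choice of hash, the 32-bit width, and the helper names are assumptions for illustration; Curator's `MinHash` runs on GPU and may differ in all of these details.

```python
import hashlib


def char_ngrams(text, n=5):
    """Return the set of character n-grams (the whole text if shorter than n)."""
    return {text[i:i + n] for i in range(max(len(text) - n + 1, 1))}


def minhash_signature(text, num_hashes=16, n=5, seed=42):
    """Take one minimum per keyed hash function, so len(signature) == num_hashes."""
    grams = char_ngrams(text, n)
    signature = []
    for k in range(num_hashes):
        key = f"{seed}-{k}".encode("utf-8")  # a distinct key per hash function
        signature.append(min(
            int.from_bytes(
                hashlib.blake2b(g.encode("utf-8"), digest_size=4, key=key).digest(),
                "big",
            )
            for g in grams
        ))
    return signature


def estimated_jaccard(sig_a, sig_b):
    """Fraction of signature positions on which two documents agree."""
    return sum(a == b for a, b in zip(sig_a, sig_b)) / len(sig_a)


sig_1 = minhash_signature("the quick brown fox jumps over the lazy dog")
sig_2 = minhash_signature("the quick brown fox jumps over the lazy dog")
sig_3 = minhash_signature("the quick brown fox jumps over the lazy cat")
print(len(sig_1), estimated_jaccard(sig_1, sig_2), estimated_jaccard(sig_1, sig_3))
```

The fraction of matching positions approximates the Jaccard similarity of the two n-gram sets, and the estimate tightens as the number of hash functions grows.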

In [15]:
from nemo_curator import MinHash

input_data_dir = os.path.join(base_dir,"../clean_c4_en_cleaned")
seed = 42
minhash_length = 260
char_ngram = 5
log_dir = expand_outdir_and_mkdir(os.path.join(base_dir, "logs"))
id_field = 'id'
text_field = 'text'
minshah_output_dir = expand_outdir_and_mkdir(os.path.join(base_dir,"../clean_c4_minhash"))

In [16]:
files = get_all_files_paths_under(root=input_data_dir, recurse_subdirectories=False)
files = [f for f in files if f.endswith(".jsonl")]
df = read_data(
    files,
    file_type="jsonl",
    backend="cudf",
    files_per_partition=1,
    add_filename=False,
)[[id_field, text_field]]

Reading 1792 files


In [17]:
t0 = time.time()

# Run MinHash() on input data
minhasher = MinHash(
    seed=seed,
    num_hashes=minhash_length,
    char_ngrams=char_ngram,
    use_64bit_hash=False,
    logger=log_dir,
    id_field=id_field,
    text_field=text_field,
    cache_dir=minshah_output_dir
)

result = minhasher(DocumentDataset(df)).df

print(f"Computing minhashes took:{time.time()-t0}")

Task exception was never retrieved
future: <Task finished name='Task-178816' coro=<Client._gather.<locals>.wait() done, defined at /home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/client.py:2385> exception=AllExit()>
Traceback (most recent call last):
  File "/home/neelesh/anaconda3/envs/nemo/lib/python3.10/site-packages/distributed/client.py", line 2394, in wait
    raise AllExit()
distributed.client.AllExit


Computing minhashes took:314.1223084926605


We can see some example outputs from the minhash computation.

In [18]:
result.head()

Unnamed: 0,id,_minhash_signature
0,c4-train-0000000000,"[30032382, 157261, 5008033, 4311555, 19755091,..."
1,c4-train-0000000001,"[511522, 1015487, 2320335, 651428, 1906819, 14..."
2,c4-train-0000000002,"[15994705, 6370213, 15559465, 9740304, 6210120..."
3,c4-train-0000000003,"[1449615, 1872293, 3654170, 452331, 780352, 39..."
4,c4-train-0000000004,"[6685476, 2415781, 1112347, 742646, 6898911, 4..."


### 5.2.2 Minhash LSH

`LSH()` implements the LSH algorithm, which includes the following steps:

- Divide the MinHash signature array into X different portions (bands).

- For each portion, hash the MinHash values into buckets. Each document is therefore assigned to X buckets.

- Documents within the same bucket are deemed similar. Because every document is assigned to X buckets and two documents are deemed similar as soon as they share one or more buckets, LSH produces more false positives than false negatives. The false positives are filtered out by later modules, namely Jaccard compute.

In [19]:
from nemo_curator import LSH
from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import convert_str_id_to_int

lsh_input_dir = os.path.join(base_dir,"../clean_c4_minhash")
id_field = 'id'
output_bucket_dir = expand_outdir_and_mkdir(os.path.join(base_dir,"../clean_c4_fuzzy_dedup"))
num_bands = 20
buckets_per_shuffle = 1
minhash_field = '_minhash_signature'
minhash_length = 260
log_dir = os.path.join(base_dir, "logs")

In [20]:
t0 = time.time()

#Load MinHash output
df = dask_cudf.read_parquet(lsh_input_dir, blocksize="2GB", aggregate_files=True)
df = df.map_partitions(
    convert_str_id_to_int,
    id_column=id_field,
    meta=cudf.DataFrame(
        {minhash_field: [[1, 2, 3]], "doc_id": [1], "dataset_id": np.uint32(1)}
    ),
)

lsh = LSH(
    cache_dir=output_bucket_dir,
    num_hashes=minhash_length,
    num_buckets=num_bands,
    buckets_per_shuffle=buckets_per_shuffle,
    id_fields=["dataset_id", "doc_id"],
    minhash_field=minhash_field,
    logger=log_dir,
)

lsh_result = lsh(DocumentDataset(df))
print(f"LSH took {time.time()-t0} s")

LSH took 776.1493239402771 s


In [21]:
lsh_result.df.head()

Unnamed: 0,dataset_id,doc_id,_bucket_id
0,2191958705,39975608,164
1,2191958705,36113741,11309
2,2191958705,42465749,2525
3,2191958705,66092294,6835
4,2191958705,49010978,13088
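
The settings used above (`num_bands = 20`, `minhash_length = 260`) imply 13 signature values per band, assuming the signature is split evenly. Under the standard MinHash LSH analysis, two documents with Jaccard similarity `s` share at least one bucket with probability `1 - (1 - s**r)**b`, where `r` is the number of values per band and `b` the number of bands. The sketch below evaluates that expression for a few similarity levels; the function name is hypothetical.

```python
def candidate_probability(s, num_bands, rows_per_band):
    """Probability that two docs with Jaccard similarity s share at least one band."""
    return 1.0 - (1.0 - s ** rows_per_band) ** num_bands


num_bands = 20
minhash_length = 260
rows_per_band = minhash_length // num_bands  # 13, assuming an even split

probabilities = {
    s: candidate_probability(s, num_bands, rows_per_band)
    for s in (0.5, 0.7, 0.8, 0.9)
}
for s, p in probabilities.items():
    print(f"similarity {s:.1f} -> candidate probability {p:.4f}")
```

More bands with fewer values each make the curve more permissive, while fewer bands with more values each make it stricter.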


### 5.2.3 Map Buckets

After performing LSH, we process each bucket and calculate an approximation of the all-pairs Jaccard similarity in order to remove the false positive duplicates introduced by LSH. For this purpose, we randomly sample n "anchor" documents within each bucket and calculate the Jaccard similarity between them and every other document in the bucket.
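
The anchor idea can be illustrated with a small sketch. It assumes n = 2 anchors per bucket (consistent with the `anchor_0_*` and `anchor_1_*` columns in the output further down) and uses a hypothetical helper, `pairs_with_anchors`; the actual sampling logic inside `_MapBuckets` may differ.

```python
import random


def pairs_with_anchors(bucket_docs, num_anchors=2, seed=0):
    """Pick anchor documents for one bucket and pair every document with them."""
    rng = random.Random(seed)
    anchors = rng.sample(bucket_docs, min(num_anchors, len(bucket_docs)))
    # Each document is compared against the anchors only, never against itself.
    pairs = [
        (doc, anchor)
        for doc in bucket_docs
        for anchor in anchors
        if doc != anchor
    ]
    return anchors, pairs


bucket = ["d1", "d2", "d3", "d4", "d5"]
anchors, pairs = pairs_with_anchors(bucket)
print(anchors)
print(len(pairs), "comparisons")
```

The number of comparisons grows linearly with the bucket size instead of quadratically, which is what makes the approximation affordable for very large buckets.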

In [22]:
from nemo_curator.modules.fuzzy_dedup import _MapBuckets
from nemo_curator.utils.fuzzy_dedup_utils.io_utils import (
    get_bucket_ddf_from_parquet_path,
    get_text_ddf_from_json_path_with_blocksize,
)

input_data_paths = [os.path.join(base_dir,"../clean_c4_en_cleaned")]
num_files = None
text_ddf_blocksize = 256 #The block size for chunking jsonl files for text ddf in mb
id_field = 'id'
text_field = 'text'
input_bucket_path = os.path.join(base_dir,"../clean_c4_fuzzy_dedup/_buckets.parquet")
input_bucket_field = '_bucket_id'
shuffle_type ='tasks'
log_dir = os.path.join(base_dir, "logs")
output_anchor_docs_with_bk_path = expand_outdir_and_mkdir(os.path.join(base_dir,"../clean_c4_fuzzy_dedup/anchor_docs_with_bk.parquet"))

In [23]:
# Read .jsonl input data
ddf_text = get_text_ddf_from_json_path_with_blocksize(
    input_data_paths=input_data_paths,
    num_files=num_files,
    blocksize=text_ddf_blocksize,
    id_column=id_field,
    text_column=text_field,
)

print(f"ddf_text.npartitions  = {ddf_text.npartitions}", flush=True)

Number of files being read for jaccard calculation = 1792


ddf_text.npartitions  = 896


In [24]:
t0 = time.time()
num_workers = get_num_workers(gpu_client)

# Read "_buckets.parquet"
ddf_bk = get_bucket_ddf_from_parquet_path(
    input_bucket_path=input_bucket_path, 
    num_workers=num_workers
)

#Run _MapBuckets()
map_buckets = _MapBuckets(
    id_fields=["dataset_id", "doc_id"], 
    bucket_field=input_bucket_field, 
    logger=log_dir,
    text_field=text_field,
)

ddf_anchor_docs_with_bk = map_buckets.map_buckets_with_anchors(
    documents_df=ddf_text, 
    buckets_df=ddf_bk, 
    shuffle_type=shuffle_type
)

#Write to disk
ddf_anchor_docs_with_bk.to_parquet(
    output_anchor_docs_with_bk_path, 
    write_index=False
)

print(f"Mapping Bucket took {time.time()-t0} s")

Number of ddf_bk partitions = 4
Mapping Bucket took 71.95588207244873 s


In [25]:
ddf_anchor_docs_with_bk.head()

Unnamed: 0,dataset_id,doc_id,anchor_1_dataset_id,anchor_1_doc_id,anchor_0_dataset_id,anchor_0_doc_id,_output_partition_id
0,2191958705,80650180,2191958705,82126513,2191958705,5644139,4
1,2191958705,37749971,2191958705,37749971,2191958705,40615424,11
2,2191958705,43538852,2191958705,78393372,2191958705,43538852,10
3,2191958705,4999544,2191958705,50915758,2191958705,90667015,6
4,2191958705,49792759,2191958705,78927539,2191958705,86045229,5


### 5.2.4 Jaccard Shuffle

We shuffle the documents in the dataset based on their bucket assignments so that each document is written to the same output partition as its anchor documents. This lets the pairwise Jaccard computation in the next step run independently and in parallel on each partition.

In [26]:
from nemo_curator.modules.fuzzy_dedup import _Shuffle

log_dir = os.path.join(base_dir, "logs")
input_anchor_docs_with_bk_path = os.path.join(base_dir,"../clean_c4_fuzzy_dedup/anchor_docs_with_bk.parquet")
output_shuffled_docs_path = expand_outdir_and_mkdir(
    os.path.join(base_dir, "../clean_c4_fuzzy_dedup/shuffled_docs.parquet")
)
bucket_mapping_ddf_blocksize = 256
parts_per_worker = 16
bucket_parts_per_worker = 256
id_field = 'id'
text_field = 'text'

In [27]:
t0 = time.time()

shuffle = _Shuffle(
    id_fields=["dataset_id", "doc_id"],
    text_field=text_field,
    int_to_str_id=id_field,
    logger=log_dir,
)

shuffle.shuffle_docs_on_buckets(
    documents_df=ddf_text,
    bucket_w_anchors_path=input_anchor_docs_with_bk_path,
    output_shuffled_docs_path=output_shuffled_docs_path,
    bucket_mapping_df_blocksize=bucket_mapping_ddf_blocksize,
    parts_per_worker=parts_per_worker,
    bucket_parts_per_worker=bucket_parts_per_worker,
    partition_on="_output_partition_id",
)

print(f"Jaccard Shuffle took {time.time()-t0} s")

  0%|          | 0/1 [00:00<?, ?it/s]


Started processing bucket-map partitions 0 through 4 of 4
Using 64 text partitions.
Starting text bytes aware shuffle
Will write 2995018 rows to disk
Text-df partition  64/896 completed in 10.766233205795288
Using 64 text partitions.
Starting text bytes aware shuffle
Will write 3000449 rows to disk
Text-df partition  128/896 completed in 10.238612651824951
Using 64 text partitions.
Starting text bytes aware shuffle
Will write 2988926 rows to disk
Text-df partition  192/896 completed in 9.267656803131104
Using 64 text partitions.
Starting text bytes aware shuffle
Will write 2991151 rows to disk
Text-df partition  256/896 completed in 9.227095365524292
Using 64 text partitions.
Starting text bytes aware shuffle
Will write 2986838 rows to disk
Text-df partition  320/896 completed in 8.993366956710815
Using 64 text partitions.
Starting text bytes aware shuffle
Will write 2993451 rows to disk
Text-df partition  384/896 completed in 8.884929895401001
Using 64 text partitions.
Starting text 

100%|██████████| 1/1 [02:09<00:00, 129.71s/it]

Jaccard Shuffle took 129.80706119537354 s





We can visualize the jaccard shuffle results for a single partition:

In [28]:
jaccard_shuffle_res = dd.read_parquet(os.path.join(output_shuffled_docs_path,"_output_partition_id=0"))
jaccard_shuffle_res.head()

Unnamed: 0,text,_text_bytes,id,anchor_0_id,anchor_1_id,_output_partition_id
0,You are able to rely on Bathroom Toilet Guys t...,2347,2191958705-2610117,2191958705-26983035,2191958705-40014867,0
1,You are able to trust in Building Contractor T...,2655,2191958705-5361227,2191958705-42968742,2191958705-36434720,0
2,A super premium .Com domain name from DomainMa...,2937,2191958705-3201346,2191958705-70134761,2191958705-55782143,0
3,Prefab Building Guys is here for your goals co...,2339,2191958705-6065219,2191958705-54311928,2191958705-6602398,0
4,You're able to depend on 2 Person Hot Tub Guys...,2462,2191958705-2088686,2191958705-65642188,2191958705-86688878,0


### 5.2.5 Jaccard Compute

Now that the candidate pairs have been sampled, we can compute the Jaccard similarity score for each pair.
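
For reference, the Jaccard similarity of two documents is the size of the intersection of their n-gram sets divided by the size of the union. The sketch below assumes character n-grams of width 5 (mirroring `ngram_size = 5` in the next cell); `char_ngrams` and `jaccard` are illustrative helpers, not part of NeMo Curator.

```python
def char_ngrams(text, width):
    """Return the set of character n-grams of the given width."""
    return {text[i:i + width] for i in range(len(text) - width + 1)}


def jaccard(text_a, text_b, width=5):
    """Jaccard similarity between the character n-gram sets of two texts."""
    ngrams_a = char_ngrams(text_a, width)
    ngrams_b = char_ngrams(text_b, width)
    union = ngrams_a | ngrams_b
    if not union:
        return 0.0  # both texts are shorter than one n-gram
    return len(ngrams_a & ngrams_b) / len(union)


# "abcdefg" and "abcdefh" share 2 of their 4 distinct 5-grams.
print(jaccard("abcdefg", "abcdefh"))
```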

In [10]:
from nemo_curator.modules.fuzzy_dedup import JaccardSimilarity

id_field = 'id'
text_field = 'text'
ngram_size = 5
shuffled_docs_path = os.path.join(base_dir, "../clean_c4_fuzzy_dedup/shuffled_docs.parquet")
jaccard_results_path = expand_outdir_and_mkdir(
    os.path.join(base_dir, "../clean_c4_fuzzy_dedup/jaccard_similarity_results.parquet")
)

In [None]:
t0 = time.time()
jaccard = JaccardSimilarity(
    id_field=id_field,
    text_field=text_field,
    anchor_id_fields=[f"anchor_{i}_{id_field}" for i in range(2)],
    ngram_width=ngram_size,
)

# Run actual computation
result_df = jaccard.jaccard_compute(shuffled_docs_path)

result_df.to_parquet(
    jaccard_results_path,
    write_index=False,
    write_metadata_file=False,
)

print(f"Jaccard Computing+Writing took {time.time() - t0} seconds")

Jaccard Computing+Writing took 18.565584421157837 seconds


In [32]:
jaccard_compute_res = dd.read_parquet(jaccard_results_path)
jaccard_compute_res.head()

Unnamed: 0,id_x,id_y,jaccard
0,2191958705-17148786,2191958705-9519602,0.809187
1,2191958705-9327883,2191958705-7073840,0.908163
2,2191958705-8357724,2191958705-9443292,0.733333
3,2191958705-15262553,2191958705-9443292,0.849854
4,2191958705-19226246,2191958705-12862039,0.923301


### 5.2.6 Connected Component

After all buckets were processed and duplicates (at the threshold) were approximately discovered,
we constructed a sparse document graph and found the connected components therein (using scipy). Each
connected component represents a set of documents that we consider similar enough to be duplicates, and
from which we select a single representative.
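
A minimal sketch of this step, using a union-find structure in plain Python rather than the distributed implementation behind `ConnectedComponents`. The function name, the toy pairs, and the choice to keep pairs with a score greater than or equal to the threshold are assumptions for illustration.

```python
def connected_components(pairs, threshold):
    """Group documents linked by pairs whose Jaccard score meets the threshold."""
    parent = {}

    def find(doc):
        parent.setdefault(doc, doc)
        while parent[doc] != doc:
            parent[doc] = parent[parent[doc]]  # path compression
            doc = parent[doc]
        return doc

    for doc_a, doc_b, score in pairs:
        if score < threshold:
            continue  # pair is not similar enough to form an edge
        root_a, root_b = find(doc_a), find(doc_b)
        if root_a != root_b:
            parent[root_b] = root_a

    groups = {}
    for doc in parent:
        groups.setdefault(find(doc), []).append(doc)
    return list(groups.values())


pairs = [
    ("a", "b", 0.90),
    ("b", "c", 0.85),
    ("c", "d", 0.50),  # below the threshold, so "d" is not linked
    ("e", "f", 0.95),
]
groups = connected_components(pairs, threshold=0.8)
print(groups)
```

Note that "a" and "c" end up in the same group even though they were never compared directly, because both are linked to "b".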

In [6]:
from nemo_curator.modules.fuzzy_dedup import ConnectedComponents

cache_dir = expand_outdir_and_mkdir(
    os.path.join(base_dir, "../clean_c4_fuzzy_dedup/new-cc-cache")
)
jaccard_pairs_path = os.path.join(base_dir, "../clean_c4_fuzzy_dedup/jaccard_similarity_results.parquet")
id_field = 'id'
jaccard_threshold = 0.8
output_path = expand_outdir_and_mkdir(
    os.path.join(base_dir, "../clean_c4_fuzzy_dedup/connected_components.parquet")
)

In [4]:
t0 = time.time()
components_stage = ConnectedComponents(
    cache_dir=cache_dir,
    jaccard_pairs_path=jaccard_pairs_path,
    id_column=id_field,
    # convert_str_ids=True,

    jaccard_threshold=jaccard_threshold,
)
components_stage.cc_workflow(output_path=output_path)
print(f"Connected Component took {time.time()-t0} seconds")

Connected Component took 23.1136314868927 seconds


In [None]:
# from nemo_curator.modules.fuzzy_dedup import ConnectedComponents

# # start a local GPU dask cluster with 4 GPUs 
# gpu_client = get_client(
#     cluster_type="gpu",
#     n_workers=4,                             # One worker per GPU
#     threads_per_worker=1,                    # One thread per worker
#     rmm_pool_size="48GB",                    # Memory pool size per GPU
#     enable_spilling=True,                    # Allow spilling to host memory
#     rmm_async=True,                          # Enable asynchronous RMM
#     rmm_managed_memory=False,                # Prefer pool-based memory management
#     protocol="ucx",                          # Communication protocol
#     set_torch_to_use_rmm=True,               # Use RMM if working with PyTorch
# )


# # cache_dir = expand_outdir_and_mkdir(
# #     os.path.join(base_dir, "../clean_c4_fuzzy_dedup/cc-cache")
# # )
# # jaccard_pairs_path = os.path.join(base_dir, "../clean_c4_fuzzy_dedup/jaccard_similarity_results.parquet")
# # id_field = 'id'
# # jaccard_threshold = 0.8
# # output_path = expand_outdir_and_mkdir(
# #     os.path.join(base_dir, "../clean_c4_fuzzy_dedup/connected_components.parquet")
# # )

In [5]:
# visualize connected components output 
cc_output = dd.read_parquet(output_path)
cc_output.head()
len(cc_output)
output_path = os.path.join(base_dir, "../clean_c4_fuzzy_dedup/connected_components.parquet")
cc_result = dask_cudf.read_parquet(output_path, split_row_groups=False).repartition(npartitions=1)

Let's check the results of the connected components step. We can see that 4,252,657 documents are identified as duplicates to be removed.
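
The removal rule used in the cells that follow keeps the first document of every group and marks the rest for removal (the `cumcount() >= 1` filter). A plain-Python sketch of the same rule, with a hypothetical helper name and toy data:

```python
from collections import defaultdict


def select_docs_to_remove(doc_groups):
    """Keep the first document seen in each group; return all later ones."""
    seen = defaultdict(int)
    to_remove = []
    for doc_id, group in doc_groups:
        if seen[group] >= 1:  # same condition as cumcount >= 1
            to_remove.append(doc_id)
        seen[group] += 1
    return to_remove


doc_groups = [("a", 0), ("b", 0), ("c", 1), ("d", 0), ("e", 2)]
to_remove = select_docs_to_remove(doc_groups)
print(to_remove)
```

Which document survives depends only on ordering within the group, so the representative is arbitrary rather than the "best" copy.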

In [4]:
output_path = os.path.join(base_dir, "../clean_c4_fuzzy_dedup/connected_components.parquet")
cc_result = dask_cudf.read_parquet(output_path, split_row_groups=False).repartition(npartitions=1)

# Set 'group' as the index and shuffle to ensure all same 'group' values are in the same partition
cc_result = cc_result.set_index('group', shuffle='tasks')

# Define a function to assign cumulative counts and filter duplicates
def assign_cumcount(df):
    df['cumcount'] = df.groupby(level=0).cumcount()
    df = df[df['cumcount'] >= 1]
    df = df.drop(columns=['cumcount'])
    return df

# Find duplicates by applying the function to each partition
docs_to_remove = cc_result.map_partitions(assign_cumcount, meta=cc_result)

# Reset the index
docs_to_remove = docs_to_remove.reset_index()

# Keep only the string ID of each duplicate
docs_to_remove = docs_to_remove[["id"]]
docs_to_remove = docs_to_remove.rename(columns={"id":"to_remove_id"})
docs_to_remove = docs_to_remove.reset_index(drop=True).persist()
_ = wait(docs_to_remove)
del _ 

print("num of docs to remove =", len(docs_to_remove))

num of docs to remove = 4252657


In [10]:
docs_to_remove.head()

Unnamed: 0,to_remove_id
0,2191958705-75949870
1,2191958705-49217365
2,2191958705-59970680
3,2191958705-39328485
4,2191958705-52628412


We can examine the size of the duplicate clusters. The largest cluster has 775,379 near duplicates.
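
Counting cluster sizes amounts to counting how many documents carry each group ID and sorting by that count. A small sketch with toy group IDs (the helper name is hypothetical):

```python
from collections import Counter


def cluster_sizes(group_ids):
    """Return (group, size) tuples sorted from the largest cluster to the smallest."""
    return Counter(group_ids).most_common()


group_ids = [7, 3, 7, 7, 3, 9]
sizes = cluster_sizes(group_ids)
largest_group, largest_size = sizes[0]
print(largest_group, largest_size)
```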

In [13]:
# Count the number of documents in each duplicate group
cc_grouped = cc_result.groupby('group').agg({'id': 'count'}).rename(columns={'id': 'count'}).sort_values('count', ascending=False).compute()
cc_grouped.head()

[Optional] Verify that the fuzzy duplicates are similar. For example, we can look into the largest group, "350652173".

In [8]:
dup_group = cc_result.loc[350652173].compute()
dup_group.head()

group,dataset_id,doc_id
350652173,256213913,1285625132
350652173,256213913,2033200488
350652173,256213913,428016172
350652173,256213913,1268721963
350652173,256213913,1285428574


We will examine the first five documents in this cluster:

In [4]:
# read input dataset
input_data_dir = os.path.join(base_dir, "rpv2-2023-06-en-cleaned")
input_dataset = DocumentDataset.read_json(input_data_dir, add_filename=True)

Reading 37848 files


Let's visualize the content of these documents and see if they are similar (ids may change so revise the `dup_ids` as needed).

In [10]:
t0 = time.time()
dup_ids = [
    'rpv2-2023-06-1285625132',
    'rpv2-2023-06-2033200488',
    'rpv2-2023-06-0428016172',
    'rpv2-2023-06-1268721963',
    'rpv2-2023-06-1285428574'
] 
dup_examples = input_dataset.df[input_dataset.df['id'].isin(dup_ids)].compute()
print(f"Searching for near duplicate examples with specific IDs took {time.time()-t0} seconds")

Searching for near duplicate examples with specific IDs took 610.5046670436859 seconds


In [None]:
dup_examples

In [None]:
print('Example duplicate 1\n' + dup_examples.raw_content.iloc[0])
print('\n\nExample duplicate 2\n' + dup_examples.raw_content.iloc[1])
print('\n\nExample duplicate 3\n' + dup_examples.raw_content.iloc[2])
print('\n\nExample duplicate 4\n' + dup_examples.raw_content.iloc[3])
print('\n\nExample duplicate 5\n' + dup_examples.raw_content.iloc[4])

### 5.2.7 Duplicates Removal

Next, we will proceed to remove the duplicates identified from the dataset. We will first change the string ID to `doc_id` and `dataset_id` in the input dataset.

In [16]:
from helper import convert_str_id_to_int

input_dataset = DocumentDataset.read_json(os.path.join(base_dir, "../clean_c4_en_cleaned"))
input_df = input_dataset.df[['text','id']]
meta = input_df._meta
meta['doc_id']=np.int64([0])
meta['dataset_id']=np.uint32([0])
input_df = input_df.map_partitions(
    convert_str_id_to_int,
    id_column="id",
    meta=meta,
)

Reading 1792 files


Then, we will perform a merge between the `input_df` and the `docs_to_remove` on the IDs and drop the fuzzy duplicates.
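
A left merge followed by keeping only the rows without a match is a left anti-join. Its effect can be sketched on plain Python records; this is an illustration with hypothetical names and toy data, not the DataFrame code used in this notebook.

```python
def drop_duplicates_by_id(records, ids_to_remove, key="id"):
    """Return the records whose ID does not appear in ids_to_remove."""
    remove = set(ids_to_remove)  # set lookup keeps the filter linear in len(records)
    return [record for record in records if record[key] not in remove]


records = [
    {"id": "doc-1", "text": "first"},
    {"id": "doc-2", "text": "second"},
    {"id": "doc-3", "text": "third"},
]
kept = drop_duplicates_by_id(records, ["doc-2"])
print([record["id"] for record in kept])
```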

In [17]:
def convert_str_id_to_int(df, id_column='id'):
    # Make a deep copy of the dataframe
    df = df.copy(deep=True)
    
    # Split the id column
    dx = df[id_column].str.split('-', expand=True)
    
    # Use loc for safe assignment
    df.loc[:, 'doc_id'] = dx[1].astype('int64')
    
    # Hash computation
    import hashlib
    df.loc[:, 'dataset_id'] = dx[0].apply(lambda x: int(hashlib.md5(str(x).encode()).hexdigest()[:8], 16))
    
    return df

# Create output directory and process
dedup_output_dir = expand_outdir_and_mkdir(os.path.join(base_dir, "../clean_c4_deduped"))

# Process the data
input_df = input_df.compute()
input_df = convert_str_id_to_int(input_df)

# Remove duplicates
deduped_df = input_df[~input_df['id'].isin(docs_to_remove['to_remove_id'])]

# Write to parquet
t0 = time.time()
deduped_df.to_parquet(dedup_output_dir)
print(f"Removing duplicates and writing deduped dataset took {time.time()-t0} seconds")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["doc_id"] = dx[1].astype("int64").values
2024-11-06 12:09:04,876 - distributed.worker - ERROR - Compute Failed
Key:       ('read_single_partition-getitem-convert_str_id_to_int-fa420657f4c775fc4951a135b3a23837', 998)
State:     executing
Function:  execute_task
args:      ((<function Fused._execute_task at 0x7ff381b4bd90>, {'read_single_partition-getitem-convert_str_id_to_int-fa420657f4c775fc4951a135b3a23837': ('convert_str_id_to_int-d81d02f032afe14e77a472319de3a460', 998), ('convert_str_id_to_int-d81d02f032afe14e77a472319de3a460', 998): (<function apply at 0x7ff3fb540160>, <function apply_and_enforce at 0x7ff38237fbe0>, [('getitem-c07894a7fd914c16d3773a255c06073f', 998)], {'id_column': 'id', '_func': <function convert_str_

AttributeError: 'Series' object has no attribute 'hash_values'


In [None]:
dedup_output_dir = expand_outdir_and_mkdir(os.path.join(base_dir, "../clean_c4_deduped"))
input_df = input_df.compute()
deduped_df = input_df.merge(docs_to_remove,
                             left_on=['id'],
                             right_on=["to_remove_id"],
                             how='left')

deduped_df = deduped_df[deduped_df['to_remove_id'].isna()].drop(columns=['to_remove_id']).reset_index(drop=True)

t0 = time.time()
deduped_df.to_parquet(dedup_output_dir)
print(f"Removing duplicates and writing deduped dataset took {time.time()-t0} seconds")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["doc_id"] = dx[1].astype("int64").values
2024-11-06 12:04:47,343 - distributed.worker - ERROR - Compute Failed
Key:       ('read_single_partition-getitem-convert_str_id_to_int-fa420657f4c775fc4951a135b3a23837', 996)
State:     executing
Function:  execute_task
args:      ((<function Fused._execute_task at 0x7ff08fb4bd90>, {'read_single_partition-getitem-convert_str_id_to_int-fa420657f4c775fc4951a135b3a23837': ('convert_str_id_to_int-d81d02f032afe14e77a472319de3a460', 996), ('convert_str_id_to_int-d81d02f032afe14e77a472319de3a460', 996): (<function apply at 0x7ff1096e0160>, <function apply_and_enforce at 0x7ff09037fbe0>, [('getitem-c07894a7fd914c16d3773a255c06073f', 996)], {'id_column': 'id', '_func': <function convert_str_

AttributeError: 'Series' object has no attribute 'hash_values'


To verify the results, we can confirm that we have 849,273,787 documents left compared to 1,088,311,520 in the input dataset, essentially removing 239,037,733 duplicates.

In [6]:
len(deduped_df)

849273787

In [7]:
len(input_df)

1088311520

## 5.3 Inter-snapshot Deduplication

So far we have deduplicated a single snapshot from rpv2. Pretraining datasets include multiple snapshots, so we will often need to perform inter-snapshot deduplication. For this tutorial, we will demonstrate deduplication across two snapshots as an example.

We first performed all the above steps for another snapshot `2023-14` and then combined the two deduped datasets into one and stored them in `rpv2-2023-06-and-14-deduped`.

Next, we will perform the fuzzy deduplication on the combined dataset.

### 5.3.1 Compute Minhash

In [4]:
from nemo_curator import MinHash
from nemo_curator import LSH
from nemo_curator.modules.fuzzy_dedup import _MapBuckets
from nemo_curator.modules.fuzzy_dedup import _Shuffle
from nemo_curator.modules.fuzzy_dedup import ConnectedComponents
from nemo_curator.modules.fuzzy_dedup import JaccardSimilarity

from nemo_curator.utils.file_utils import reshard_jsonl
from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import convert_str_id_to_int
from nemo_curator.utils.fuzzy_dedup_utils.io_utils import (
    get_bucket_ddf_from_parquet_path,
    get_text_ddf_from_json_path_with_blocksize,
)

In [23]:
seed = 42
minhash_length = 260
char_ngram = 5
log_dir = expand_outdir_and_mkdir(os.path.join(base_dir, "logs"))
id_field = 'id'
text_field = 'raw_content'
minshah_output_dir = expand_outdir_and_mkdir(os.path.join(base_dir,"rpv2-2023-06-and-14-minhash"))

In [9]:
input_data_dir = os.path.join(base_dir,"rpv2-2023-06-and-14-deduped")

files = []
for file in os.listdir(input_data_dir):
    file_path = os.path.join(input_data_dir, file)
    if file.endswith('.part'):
        # Rename ".part" files so they are picked up as JSONL
        new_file_path = os.path.join(input_data_dir, file.replace('.part', '.jsonl'))
        os.rename(file_path, new_file_path)
        file_path = new_file_path
    # Append every file, whether or not it was renamed
    files.append(file_path)


In [19]:
files = [f for f in files if f.endswith(".jsonl")]
df = read_data(
    files,
    file_type="jsonl",
    backend="cudf",
    files_per_partition=2,
    add_filename=False,
)[[id_field, text_field]]

Reading 72797 files


In [24]:
t0 = time.time()

# Run MinHash() on input data
minhasher = MinHash(
    seed=seed,
    num_hashes=minhash_length,
    char_ngrams=char_ngram,
    use_64bit_hash=False,
    logger=log_dir,
    id_field=id_field,
    text_field=text_field,
    cache_dir=minshah_output_dir
)

result = minhasher(DocumentDataset(df)).df

print(f"Computing minhashes took:{time.time()-t0}")

Computing minhashes took:6115.702769517899


In [25]:
result.head()

Unnamed: 0,id,_minhash_signature
0,rpv2-2023-06-0678400000,"[36422228, 15993596, 3538361, 16103012, 194100..."
1,rpv2-2023-06-0678500000,"[34662, 17635, 1112347, 293654, 313382, 160184..."
2,rpv2-2023-06-0678600000,"[15076006, 1801689, 3181854, 2949398, 5699436,..."
3,rpv2-2023-06-0678700000,"[13528976, 2438382, 26260517, 26187347, 249748..."
4,rpv2-2023-06-0678800000,"[2550974, 157261, 1536526, 1169030, 576861, 10..."


### 5.3.2 Minhash LSH

In [7]:
lsh_input_dir = os.path.join(base_dir,"rpv2-2023-06-and-14-minhash")
id_field = 'id'
output_bucket_dir = expand_outdir_and_mkdir(os.path.join(base_dir,"fuzzy-dedup-output-2023-06-and-14"))
num_bands = 20
buckets_per_shuffle = 1
minhash_field = '_minhash_signature'
minhash_length = 260
log_dir = os.path.join(base_dir, "logs")

In [8]:
t0 = time.time()

#Load MinHash output
df = dask_cudf.read_parquet(lsh_input_dir, blocksize="2GB", aggregate_files=True)
df = df.map_partitions(
    convert_str_id_to_int,
    id_column=id_field,
    meta=cudf.DataFrame(
        {minhash_field: [[1, 2, 3]], "doc_id": [1], "dataset_id": np.uint32(1)}
    ),
)

lsh = LSH(
    cache_dir=output_bucket_dir,
    num_hashes=minhash_length,
    num_buckets=num_bands,
    buckets_per_shuffle=buckets_per_shuffle,
    id_fields=["dataset_id", "doc_id"],
    minhash_field=minhash_field,
    logger=log_dir,
)

lsh_result = lsh(DocumentDataset(df))
print(f"LSH took {time.time()-t0} s")

LSH took 10536.635195493698 s


In [10]:
lsh_result.df.head()

Unnamed: 0,dataset_id,doc_id,_bucket_id
0,256213913,2480637085,74400
1,256213913,2079208983,88082
2,256213913,1142812586,7198
3,4217914658,3589401712,54808
4,256213913,1827931650,58134


### 5.3.3 Map Buckets

In [6]:
input_data_paths = [os.path.join(base_dir,"rpv2-2023-06-and-14-deduped")]
num_files = None
text_ddf_blocksize = 256 #The block size for chunking jsonl files for text ddf in mb
id_field = 'id'
text_field = 'raw_content'
input_bucket_path = os.path.join(base_dir,"fuzzy-dedup-output-2023-06-and-14/_buckets.parquet")
input_bucket_field = '_bucket_id'
shuffle_type ='tasks'
log_dir = os.path.join(base_dir, "logs")
output_anchor_docs_with_bk_path = expand_outdir_and_mkdir(os.path.join(base_dir,"fuzzy-dedup-output-2023-06-and-14/anchor_docs_with_bk.parquet"))

In [7]:
# Read .jsonl input data
ddf_text = get_text_ddf_from_json_path_with_blocksize(
    input_data_paths=input_data_paths,
    num_files=num_files,
    blocksize=text_ddf_blocksize,
    id_column=id_field,
    text_column=text_field,
)

print(f"ddf_text.npartitions  = {ddf_text.npartitions}", flush=True)

Number of files being read for jaccard calculation = 72797
ddf_text.npartitions  = 23876


In [14]:
t0 = time.time()
num_workers = get_num_workers(gpu_client)

# Read "_buckets.parquet"
ddf_bk = get_bucket_ddf_from_parquet_path(
    input_bucket_path=input_bucket_path, 
    num_workers=num_workers
)

#Run _MapBuckets()
map_buckets = _MapBuckets(
    id_fields=["dataset_id", "doc_id"], 
    bucket_field=input_bucket_field, 
    logger=log_dir,
    text_field=text_field,
)

ddf_anchor_docs_with_bk = map_buckets.map_buckets_with_anchors(
    documents_df=ddf_text, 
    buckets_df=ddf_bk, 
    shuffle_type=shuffle_type
)

#Write to disk
ddf_anchor_docs_with_bk.to_parquet(
    output_anchor_docs_with_bk_path, 
    write_index=False
)

print(f"Mapping Bucket took {time.time()-t0} s")

Number of ddf_bk partitions = 54
Mapping Bucket took 1034.9348919391632 s


In [15]:
ddf_anchor_docs_with_bk.head()

Unnamed: 0,dataset_id,doc_id,anchor_1_dataset_id,anchor_1_doc_id,anchor_0_dataset_id,anchor_0_doc_id,_output_partition_id
0,4217914658,518211850,4217914658,518211850,256213913,491920892,2004
1,4217914658,6364303356,256213913,2308804621,4217914658,6364303356,4246
2,256213913,2103535708,4217914658,1208111155,256213913,2103535708,4003
3,256213913,1359208912,4217914658,6342510538,256213913,1359208912,3738
4,256213913,162316349,256213913,162316349,4217914658,1033014280,4258


### 5.3.4 Jaccard Shuffle

In [4]:
log_dir = os.path.join(base_dir, "logs")
input_anchor_docs_with_bk_path = os.path.join(base_dir,"fuzzy-dedup-output-2023-06-and-14/anchor_docs_with_bk.parquet")
output_shuffled_docs_path = expand_outdir_and_mkdir(
    os.path.join(base_dir, "fuzzy-dedup-output-2023-06-and-14/shuffled_docs.parquet")
)
bucket_mapping_ddf_blocksize = 256
parts_per_worker = 16
bucket_parts_per_worker = 256
id_field = 'id'
text_field = 'raw_content'

In [8]:
t0 = time.time()

shuffle = _Shuffle(
    id_fields=["dataset_id", "doc_id"],
    text_field=text_field,
    int_to_str_id=id_field,
    logger=log_dir,
)

shuffle.shuffle_docs_on_buckets(
    documents_df=ddf_text,
    bucket_w_anchors_path=input_anchor_docs_with_bk_path,
    output_shuffled_docs_path=output_shuffled_docs_path,
    bucket_mapping_df_blocksize=bucket_mapping_ddf_blocksize,
    parts_per_worker=parts_per_worker,
    bucket_parts_per_worker=bucket_parts_per_worker,
    partition_on="_output_partition_id",
)

print(f"Jaccard Shuffle took {time.time()-t0} s")

  0%|          | 0/1 [00:00<?, ?it/s]


Started processing bucket-map partitions 0 through 54 of 54
Using 256 text partitions.
Starting text bytes aware shuffle
Will write 4620819 rows to disk
Text-df partition  256/23876 completed in 105.13463497161865
Using 256 text partitions.
Starting text bytes aware shuffle
Will write 4520986 rows to disk
Text-df partition  512/23876 completed in 100.3475558757782
Using 256 text partitions.
Starting text bytes aware shuffle
Will write 5232824 rows to disk
Text-df partition  768/23876 completed in 56.71416783332825
Using 256 text partitions.
Starting text bytes aware shuffle
Will write 4700161 rows to disk
Text-df partition  1024/23876 completed in 27.45123529434204
Using 256 text partitions.
Starting text bytes aware shuffle
Will write 4638892 rows to disk
Text-df partition  1280/23876 completed in 26.144277334213257
Using 256 text partitions.
Starting text bytes aware shuffle
Will write 4973176 rows to disk
Text-df partition  1536/23876 completed in 28.32722544670105
Using 256 text p

100%|██████████| 1/1 [49:22<00:00, 2962.52s/it]

Jaccard Shuffle took 2963.7552287578583 s





### 5.3.5 Jaccard Compute

In [9]:
id_field = 'id'
text_field = 'raw_content'
ngram_size = 5
shuffled_docs_path = os.path.join(base_dir, "fuzzy-dedup-output-2023-06-and-14/shuffled_docs.parquet")
jaccard_results_path = expand_outdir_and_mkdir(
    os.path.join(base_dir, "fuzzy-dedup-output-2023-06-and-14/jaccard_similarity_results.parquet")
)

In [10]:
t0 = time.time()
jaccard = JaccardSimilarity(
    id_field=id_field,
    text_field=text_field,
    anchor_id_fields=[f"anchor_{i}_{id_field}" for i in range(2)],
    ngram_width=ngram_size,
)

# Run actual computation
result_df = jaccard.jaccard_compute(shuffled_docs_path)

result_df.to_parquet(
    jaccard_results_path,
    write_index=False,
    write_metadata_file=False,
)

print(f"Jaccard Computing+Writing took {time.time() - t0} seconds")

Jaccard Computing+Writing took 1300.0965530872345 seconds
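
For intuition, the quantity computed in this step can be sketched in plain Python. This is a minimal illustration, not the GPU implementation behind `JaccardSimilarity`: it assumes the n-grams are character n-grams of width `ngram_size` (5 above), and the helper names `char_ngrams` and `jaccard` are hypothetical.

```python
def char_ngrams(text, width=5):
    """Return the set of character n-grams of `text` (assumed tokenization)."""
    if len(text) < width:
        # Texts shorter than the window are treated as a single n-gram.
        return {text}
    return {text[i:i + width] for i in range(len(text) - width + 1)}


def jaccard(text_a, text_b, width=5):
    """Jaccard similarity = |intersection| / |union| of the two n-gram sets."""
    grams_a = char_ngrams(text_a, width)
    grams_b = char_ngrams(text_b, width)
    union = grams_a | grams_b
    return len(grams_a & grams_b) / len(union) if union else 0.0


# Two near-identical strings share most of their n-grams.
sim_same = jaccard("abcdefgh", "abcdefgh")
sim_close = jaccard("abcdef", "abcdeg")
sim_far = jaccard("abcdef", "uvwxyz")
print(sim_same, sim_close, sim_far)
```

In the pipeline, each document is only compared against the anchor documents of its bucket, which keeps the number of pairwise comparisons manageable.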


### 5.3.6 Connected Components

In [3]:
cache_dir = expand_outdir_and_mkdir(
    os.path.join(base_dir, "fuzzy-dedup-output-2023-06-and-14/cc-cache")
)
jaccard_pairs_path = os.path.join(base_dir, "fuzzy-dedup-output-2023-06-and-14/jaccard_similarity_results.parquet")
id_field = 'id'
jaccard_threshold = 0.8
output_path = expand_outdir_and_mkdir(
    os.path.join(base_dir, "fuzzy-dedup-output-2023-06-and-14/connected_components.parquet")
)

In [5]:
t0 = time.time()
components_stage = ConnectedComponents(
    cache_dir=cache_dir,
    jaccard_pairs_path=jaccard_pairs_path,
    id_column=id_field,
    # convert_str_ids=True,
    jaccard_threshold=jaccard_threshold,
)
components_stage.cc_workflow(output_path=output_path)
print(f"Connected Component took {time.time()-t0} seconds")

FileNotFoundError: An error occurred while calling the read_parquet method registered to the cudf backend.
Original Message: An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: /home/neelesh/4_new_c4/fuzzy-dedup-output-2023-06-and-14/jaccard_similarity_results.parquet
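
Conceptually, this step treats every document pair whose Jaccard similarity passes `jaccard_threshold` as an edge and groups documents into connected components. The sketch below illustrates the idea with a small union-find in plain Python. It is not how `ConnectedComponents` is implemented, and two details are assumptions made for illustration: the comparison is taken as `>=`, and documents that only appear in below-threshold pairs are kept as singleton groups.

```python
def connected_components(pairs, threshold=0.8):
    """Group document ids linked by pairs with similarity >= threshold."""
    parent = {}

    def find(node):
        # Register unseen nodes as their own root, then walk up with path halving.
        parent.setdefault(node, node)
        while parent[node] != node:
            parent[node] = parent[parent[node]]
            node = parent[node]
        return node

    for doc_a, doc_b, similarity in pairs:
        root_a, root_b = find(doc_a), find(doc_b)
        if similarity >= threshold and root_a != root_b:
            parent[root_b] = root_a  # merge the two components

    groups = {}
    for node in parent:
        groups.setdefault(find(node), set()).add(node)
    return list(groups.values())


toy_pairs = [
    ("a", "b", 0.90),
    ("b", "c", 0.85),  # a, b, c end up in one group via b
    ("c", "d", 0.50),  # below threshold: d stays on its own
    ("e", "f", 0.95),
]
toy_groups = sorted(sorted(group) for group in connected_components(toy_pairs))
print(toy_groups)
```

Note that `a` and `c` land in the same group even though they were never compared directly. This transitivity is why the step is a graph problem rather than a simple filter on pairs.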

### 5.3.7 Duplicates Removal

Using the output of the Connected Components step, we keep one document per group and mark the rest for removal. As the cell below shows, inter-snapshot dedup found 81,764,804 duplicates.

In [4]:
output_path = os.path.join(base_dir, "fuzzy-dedup-output-2023-06-and-14/connected_components.parquet")
cc_result = dask_cudf.read_parquet(output_path, split_row_groups=False).repartition(npartitions=1)

# Set 'group' as the index and shuffle to ensure all same 'group' values are in the same partition
cc_result = cc_result.set_index('group', shuffle='tasks')

# Define a function to assign cumulative counts and filter duplicates
def assign_cumcount(df):
    df['cumcount'] = df.groupby(level=0).cumcount()
    df = df[df['cumcount'] >= 1]
    df = df.drop(columns=['cumcount'])
    return df

# Find duplicates by applying the function to each partition
docs_to_remove = cc_result.map_partitions(assign_cumcount, meta=cc_result)

# Reset the index
docs_to_remove = docs_to_remove.reset_index()

docs_to_remove = docs_to_remove[["dataset_id", "doc_id"]]
docs_to_remove = docs_to_remove.rename(columns={"dataset_id":"to_remove_dataset_id", "doc_id":"to_remove_doc_id"})
docs_to_remove = docs_to_remove.reset_index(drop=True).persist()
_ = wait(docs_to_remove)
del _ 

print("docs_to_remove", len(docs_to_remove))

docs_to_remove 81764804
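
The `cumcount` trick above keeps the first document of every group and flags all the others. A minimal sketch on a toy pandas DataFrame (made-up ids, pandas used here instead of `dask_cudf` for portability) shows the effect:

```python
import pandas as pd

# Toy connected-components output: one row per document, indexed by its group id.
toy_cc = pd.DataFrame(
    {
        "dataset_id": [1, 1, 2, 2, 1],
        "doc_id": [10, 11, 20, 21, 12],
        "group": [0, 0, 0, 1, 2],
    }
).set_index("group")


def assign_cumcount(df):
    df = df.copy()
    # Position of each row within its group: 0 for the first document, 1, 2, ... after.
    df["cumcount"] = df.groupby(level=0).cumcount()
    # Everything except the first document of a group is a duplicate to remove.
    return df[df["cumcount"] >= 1].drop(columns=["cumcount"])


toy_docs_to_remove = assign_cumcount(toy_cc).reset_index()
print(toy_docs_to_remove)
```

Group 0 contains three documents, so two of them are flagged. Groups 1 and 2 contain a single document each and contribute nothing. This only works per partition because `set_index('group', ...)` has already moved all rows of a group into the same partition.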


Before removing the duplicates, we suggest resharding the data. The earlier duplicate removal within each single snapshot may have left some partitions empty, and resharding rebalances the files.

In [10]:
output_resharded_dir = expand_outdir_and_mkdir(os.path.join(base_dir, "rpv2-2023-06-and-14-deduped-resharded"))

t0 = time.time()
reshard_jsonl(
    os.path.join(base_dir, "rpv2-2023-06-and-14-deduped"),
    output_resharded_dir,
    output_file_size="100M",
    start_index=0,
    file_prefix="rpv2-2023-06-and-14-deduped",
)
print(f"Data sharding took:{time.time()-t0}")

Data sharding took:904.7163739204407


In [5]:
from helper import convert_str_id_to_int

input_dataset = DocumentDataset.read_json(os.path.join(base_dir, "rpv2-2023-06-and-14-deduped-resharded"), backend="cudf")
input_df = input_dataset.df[['raw_content','id']]
meta = input_df._meta
meta['doc_id']=np.int64([0])
meta['dataset_id']=np.uint32([0])
input_df = input_df.map_partitions(
    convert_str_id_to_int,
    id_column="id",
    meta=meta,
)

Reading 72780 files


In [7]:
# No leading slash here: os.path.join discards base_dir when a later component is an absolute path
dedup_output_dir = expand_outdir_and_mkdir(os.path.join(base_dir, "rpv2-2023-06-and-14-inter-deduped"))
deduped_df = input_df.merge(docs_to_remove,
                             left_on=['doc_id','dataset_id'],
                             right_on=["to_remove_doc_id", "to_remove_dataset_id"],
                             how='left')

deduped_df = deduped_df[deduped_df['to_remove_doc_id'].isna()].drop(columns=['to_remove_doc_id', "to_remove_dataset_id"]).reset_index(drop=True)

t0 = time.time()
deduped_df.to_parquet(dedup_output_dir)
print(f"Removing duplicates and writing deduped dataset took {time.time()-t0} seconds")

Removing duplicates and writing deduped dataset took 2084.46063041687 seconds
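
The removal above is an anti-join: a left merge against `docs_to_remove`, keeping only the rows that found no match. A small pandas sketch with made-up ids (pandas stands in for `dask_cudf` here) illustrates the pattern:

```python
import pandas as pd

toy_docs = pd.DataFrame(
    {
        "dataset_id": [1, 1, 2, 2],
        "doc_id": [10, 11, 20, 21],
        "raw_content": ["doc a", "doc b", "doc c", "doc d"],
    }
)
toy_to_remove = pd.DataFrame(
    {"to_remove_dataset_id": [1, 2], "to_remove_doc_id": [11, 20]}
)

# Left merge: rows without a match get NaN in the to_remove_* columns.
merged = toy_docs.merge(
    toy_to_remove,
    left_on=["doc_id", "dataset_id"],
    right_on=["to_remove_doc_id", "to_remove_dataset_id"],
    how="left",
)

# Keep only unmatched rows, then drop the helper columns.
toy_deduped = (
    merged[merged["to_remove_doc_id"].isna()]
    .drop(columns=["to_remove_doc_id", "to_remove_dataset_id"])
    .reset_index(drop=True)
)
print(toy_deduped)
```

Merging on both `doc_id` and `dataset_id` matters because a `doc_id` is only unique within its dataset.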


We can verify that the deduped dataset has 1,585,546,179 documents, compared to 1,667,310,983 documents before dedup.

In [8]:
len(deduped_df)

1585546179

In [9]:
len(input_df)

1667310983

# 6. Quality Filtering
<a id="filter"></a>

Web-crawled datasets often contain low-quality documents that we do not want the model to learn from. We can perform quality filtering to remove them. NeMo Curator offers modules for both classifier-based and heuristic-based filtering. In this tutorial, we will perform heuristic filtering using a list of heuristic filters to improve data quality.

Curator provides a generic list of heuristic filters, but for this tutorial we select only 10 of them for demonstration purposes. The selected filters are listed in `config/heuristic_filter_en.yaml`.
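
To make the idea concrete, the sketch below shows what a heuristic filter does in principle: compute a cheap statistic per document and keep the document only if it falls in an acceptable range. The two filters, their names, and their thresholds are hypothetical and written in plain Python. They are neither NeMo Curator's API nor the contents of `config/heuristic_filter_en.yaml`.

```python
def word_count_ok(text, min_words=5, max_words=100_000):
    """Hypothetical filter: reject documents that are too short or too long."""
    return min_words <= len(text.split()) <= max_words


def symbol_ratio_ok(text, max_ratio=0.1):
    """Hypothetical filter: reject documents dominated by '#' or '...' symbols."""
    words = text.split()
    if not words:
        return False
    symbols = sum(text.count(symbol) for symbol in ("#", "..."))
    return symbols / len(words) <= max_ratio


def keep_document(text, filters):
    """A document survives only if every filter accepts it."""
    return all(check(text) for check in filters)


toy_docs = [
    "The quick brown fox jumps over the lazy dog.",
    "Buy now",
    "# # # # tags tags tags ... ...",
]
toy_kept = [doc for doc in toy_docs if keep_document(doc, [word_count_ok, symbol_ratio_ok])]
print(toy_kept)
```

Each filter is independent and cheap, which is why this stage parallelizes well across many CPU workers.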

Heuristic filtering in Curator is a CPU module, so we need to use the CPU cluster.

In [2]:
scheduler_address = os.getenv('SCHEDULER_ADDRESS')
cpu_client = get_client(scheduler_address=scheduler_address)
print(f"Num Workers = {get_num_workers(cpu_client)}", flush=True)

Num Workers = 256


In [6]:
import nemo_curator
from nemo_curator.utils.config_utils import build_filter_pipeline

filter_config_file = os.path.join(base_dir, "config/heuristic_filter_en.yaml")
hf_input_data_dir = os.path.join(base_dir, "rpv2-2023-06-and-14-inter-deduped")
kept_document_dir = expand_outdir_and_mkdir(os.path.join(base_dir,'rpv2-2023-06-and-14-heuristic-filtering','hf.parquet'))

In [4]:
t0 = time.time()

# Load dataset
dataset = DocumentDataset.read_parquet(hf_input_data_dir)

# construct pipeline from config
filter_pipeline = build_filter_pipeline(filter_config_file)

# filter data and write to disk
filtered_dataset = filter_pipeline(dataset)
filtered_dataset.to_parquet(kept_document_dir)

print(f"Time taken for Heuristic filtering: {time.time()-t0} s")

Reading 72780 files
Writing to disk complete for 72780 partitions
Time taken for Heuristic filtering: 5647.508106470108 s


After filtering, we have 1,229,679,047 documents left, having removed 355,867,132 documents from the deduped dataset.

In [5]:
len(filtered_dataset)

1229679047
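
As a quick sanity check, the document counts reported in this tutorial can be tied together with a few lines of arithmetic. The numbers are copied from the cell outputs above, and the variable names are only illustrative.

```python
# Document counts taken from the outputs above.
before_dedup = 1_667_310_983   # len(input_df)
after_dedup = 1_585_546_179    # len(deduped_df)
after_filter = 1_229_679_047   # len(filtered_dataset)

removed_by_dedup = before_dedup - after_dedup
removed_by_filter = after_dedup - after_filter

print(f"Removed by inter-snapshot dedup: {removed_by_dedup:,} "
      f"({removed_by_dedup / before_dedup:.2%} of the input)")
print(f"Removed by heuristic filtering:  {removed_by_filter:,} "
      f"({removed_by_filter / after_dedup:.2%} of the deduped data)")
```

The two differences match the 81,764,804 duplicates found by fuzzy deduplication and the 355,867,132 documents dropped by the heuristic filters.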

[Optional] Examine examples of the removed low-quality documents:

In [5]:
from helper import get_dataframe_complement

original_df = dd.read_parquet(hf_input_data_dir)
filtered_df = dd.read_parquet(kept_document_dir)
removed_df = get_dataframe_complement(original_df, filtered_df)
removed_df_example = removed_df.head()

In [None]:
print(removed_df_example.raw_content.iloc[0])

In [None]:
print(removed_df_example.raw_content.iloc[1])