# Data Curation
This notebook showcases the building blocks of a simple data curation pipeline built with [NeMo Curator](https://github.com/NVIDIA/NeMo-Curator).

## Reading Materials
Before proceeding, we highly recommend looking through the following deep dive blog posts that walk you through building data curation pipelines using NeMo Curator:
- [Curating Custom Datasets for LLM Training with NVIDIA NeMo Curator](https://developer.nvidia.com/blog/curating-custom-datasets-for-llm-training-with-nvidia-nemo-curator/)
- [Curating Custom Datasets for LLM Parameter-Efficient Fine-Tuning with NVIDIA NeMo Curator](https://developer.nvidia.com/blog/curating-custom-datasets-for-llm-parameter-efficient-fine-tuning-with-nvidia-nemo-curator/)

In this notebook, we build the pipeline around the [Law-StackExchange dataset](https://huggingface.co/datasets/ymoslem/Law-StackExchange), a collection of legal questions and answers scraped from the Stack Exchange website.

## Setup and Requirements
Before proceeding, you need to install one dependency to follow along. Execute the following cell before getting started.

In [1]:
! pip install ipywidgets

Defaulting to user installation because normal site-packages is not writeable
Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com

---
## Getting Started

To get started, let's set up some environment variables, along with the paths used to store the curated data and the intermediate temporary files that this notebook needs.

In [2]:
import os
os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"  # Needed for running Curator on the GPU

NOTEBOOK_DIR = os.path.abspath("")
DATA_DIR = os.path.join(NOTEBOOK_DIR, "data")
TEMP_DIR = os.path.join(NOTEBOOK_DIR, ".temp")
os.makedirs(DATA_DIR, exist_ok=True)

Let's now import everything we need for building our data curation pipeline. For your convenience, we've provided document builder implementations that download the dataset from Hugging Face and convert it into a data frame.

We have additionally implemented a score-based filterer that allows you to filter the dataset rows using the score values assigned to each question. You can use this implementation as the basis for creating your own filtering/scoring mechanisms using NeMo Curator.

In [3]:
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters import WordCountFilter
from nemo_curator.modifiers import UnicodeReformatter
from nemo_curator.modules.config import SemDedupConfig
from nemo_curator.modules.semantic_dedup import SemDedup
from nemo_curator.utils.file_utils import expand_outdir_and_mkdir
from nemo_curator import ScoreFilter, Sequential
from nemo_curator.modules.modify import Modify

# Importing helper functions
from helpers.filters import FilterLowScores
from helpers.docbuilder import download_and_convert_dataset
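
To give a feel for what a score-based filter looks like, here is a minimal sketch in plain Python. To our understanding, NeMo Curator filters follow a two-step pattern: one method turns a field value into a score, and another decides from that score whether the record is kept. `MinScoreFilter` is a hypothetical stand-in written for illustration; it is not the `FilterLowScores` helper shipped with this notebook, and treating values equal to the threshold as "keep" is an assumption.

```python
class MinScoreFilter:
    """Hypothetical stand-in for a score-based filter (not the notebook's FilterLowScores)."""

    def __init__(self, score_threshold: int):
        self._score_threshold = score_threshold

    def score_document(self, value: int) -> bool:
        # Step 1: turn the field value into a score; here, a pass/fail flag.
        # Assumption: values equal to the threshold are kept.
        return value >= self._score_threshold

    def keep_document(self, score: bool) -> bool:
        # Step 2: decide from the score alone whether the record stays.
        return score


# Made-up records for illustration.
records = [
    {"id": 1, "question_score": 5},
    {"id": 2, "question_score": -2},
    {"id": 3, "question_score": 0},
]
flt = MinScoreFilter(score_threshold=0)
kept = [r for r in records if flt.keep_document(flt.score_document(r["question_score"]))]
print([r["id"] for r in kept])  # [1, 3]
```

Separating scoring from the keep decision lets the same score be stored as a column and inspected later, instead of being discarded after filtering.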

Before proceeding, let's decide the compute resources we'd like to use for running our data curation pipeline. NeMo Curator uses Dask to orchestrate scalable data processing. As such, it needs to know what resources to use. 

For the purposes of this exercise, we run on the CPU and instruct NeMo Curator to use 8 CPU workers. While most NeMo Curator functionality can be executed on the CPU, some modules (such as semantic deduplication) can only be executed on the GPU, so set `device` to `"gpu"` if you want that step to run.

Note that you can increase or decrease the number of CPU workers depending on the runtime environment. Keep in mind that each CPU worker is allocated a fixed share of the total available system memory (RAM). Thus, if the environment does not have enough memory available, Dask operations might fail.

Once we have decided on the resources to use, we can initialize our Dask cluster and start using NeMo Curator.

In [4]:
device = "cpu"  # It can be either "cpu" or "gpu"
n_workers = 8  # Number of workers to use for Dask. If running out of memory, try reducing this.
client = get_client(device, n_workers=n_workers, set_torch_to_use_rmm=False)
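
As a rough guide for choosing `n_workers`, the sketch below divides a machine's total RAM evenly among the workers and picks the largest worker count that still leaves each worker a minimum budget. The even split reflects how we understand a local Dask cluster to size its workers by default. The 64 GiB total, 16 cores, and 6 GiB floor are made-up numbers, and both helper functions are hypothetical; neither is part of NeMo Curator or Dask.

```python
def memory_per_worker_gib(total_gib: float, n_workers: int) -> float:
    """Evenly divide the total system memory among the workers."""
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    return total_gib / n_workers


def max_workers_for_budget(total_gib: float, min_gib_per_worker: float, cpu_count: int) -> int:
    """Largest worker count, capped by the CPU count, that keeps each worker at or above the budget."""
    by_memory = int(total_gib // min_gib_per_worker)
    return max(1, min(cpu_count, by_memory))


# Hypothetical machine: 64 GiB of RAM and 16 CPU cores.
print(memory_per_worker_gib(64, 8))       # 8.0
print(max_workers_for_budget(64, 6, 16))  # 10
```

If the pipeline fails with memory errors, lowering `n_workers` raises the share each worker receives.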

---
## The Main Data Curation Pipeline

We start by downloading and converting the dataset into a suitable format. This is done via the document builders that we have provided for you.

In [5]:
dataset_df = download_and_convert_dataset(DATA_DIR)
raw_dataset = DocumentDataset.from_pandas(dataset_df)

Download directory:  /home/mmaghoumi/git/dl-tme/2024/odsc-hackathon-october-2024/all-in-one/data/raw
File '/home/mmaghoumi/git/dl-tme/2024/odsc-hackathon-october-2024/all-in-one/data/raw/law-stackexchange-questions-answers.json' already exists, skipping download.


Next, we need to define our data curation pipeline. The pipeline we define here is very simple, as it contains basic filtering operations, as well as GPU-based semantic deduplication.
Note that in order to use the modules that need a GPU, the dataset has to be converted to the `cudf` backend.

We point the semantic deduplication module to a config file that defines the exact model and parameters to use for performing semantic deduplication.
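
To build some intuition for what semantic deduplication does, here is a toy sketch in plain Python: each record is represented by an embedding vector, and a record is dropped when its cosine similarity to an already-kept record reaches a threshold. This is a simplification under stated assumptions. As we understand it, the real module computes embeddings with a model and clusters them before comparing records within each cluster, with all parameters taken from the YAML config. The vectors, the 0.95 threshold, and the function names below are made up for illustration.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def greedy_semantic_dedup(embeddings, threshold):
    """Keep a record only if it is not too similar to any record kept so far."""
    kept_ids = []
    for doc_id, vec in embeddings.items():
        if all(cosine_similarity(vec, embeddings[k]) < threshold for k in kept_ids):
            kept_ids.append(doc_id)
    return kept_ids


# Hand-made 3-dimensional "embeddings"; a real pipeline would get these from a model.
toy_embeddings = {
    "q1": [1.0, 0.0, 0.0],
    "q2": [0.99, 0.05, 0.0],  # points in nearly the same direction as q1
    "q3": [0.0, 1.0, 0.0],
}
print(greedy_semantic_dedup(toy_embeddings, threshold=0.95))  # ['q1', 'q3']
```

Comparing every pair of records scales quadratically with dataset size, which is one reason a production implementation narrows the comparisons first.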

In [6]:
def semantic_dedupe(dataset: DocumentDataset):
    """
    Perform semantic deduplication on the given dataset.

    Args:
        dataset: The input DocumentDataset.

    Returns:
        The deduplicated DocumentDataset.
    """
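    # Remove the temporary directory so that stale caches from earlier runs are not reused.
    # shutil.rmtree handles paths with spaces and does not depend on a shell.
    import shutil  # local import keeps this cell self-contained

    shutil.rmtree(TEMP_DIR, ignore_errors=True)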

    semdedup_config = SemDedupConfig.from_yaml("helpers/sem_dedup_config.yaml")
    expand_outdir_and_mkdir(semdedup_config.cache_dir)
    semdup = SemDedup(semdedup_config)
    dedup_ids = semdup(dataset)
    # When there are few duplicates we can compute the results to a list and use `isin`.
    result = dataset.df[dataset.df["id"].isin(dedup_ids.df["id"].compute())]
    return DocumentDataset(result)

def run_curation_pipeline(dataset: DocumentDataset, device: str) -> DocumentDataset:
    print(f"Running curation pipeline on '{device}'...")
    orig_dataset = dataset

    cpu_curation_steps = Sequential(
        [
            #
            # Modifications
            #
            # Unify the text encoding to Unicode.
            Modify(UnicodeReformatter(), text_field="title"),
            Modify(UnicodeReformatter(), text_field="question"),
            #
            # Filtering
            #
            # Filter out records based on the question word counts.
            ScoreFilter(
                WordCountFilter(min_words=50, max_words=500),
                text_field="question",
                score_type=int,
            ),
            # Filter out records where the question has a negative score.
            ScoreFilter(
                FilterLowScores(score_threshold=0),
                text_field="question_score",
                score_type=bool,
            ),
        ]
    )

    # Run the CPU curation steps.
    dataset = cpu_curation_steps(dataset)

    # Perform semantic deduplication on the dataset (if the device is GPU).
    if device == "gpu":
        # Create a "text" field comprised of the title and the question.
        # Note that the "text" field here must be the same as the setting specified for `input_column` in sem_dedup_config.yaml file.
        # The algorithm looks at this field to find semantically similar records.
        dataset.df["text"] = (
            dataset.df["title"]
            + "\n"
            + dataset.df["question"]
        )
        # Convert the dataset to a GPU backend.
        dataset.df = dataset.df.to_backend("cudf")
        dataset = semantic_dedupe(dataset)
        # Delete the text field as it is no longer needed.
        del dataset.df["text"]
        # Convert the dataset back to a CPU backend.
        dataset.df = dataset.df.to_backend("pandas")

    dataset = dataset.persist()
    # Drop the columns that are no longer needed.
    dataset.df = dataset.df.drop(columns=["answer", "answer_score", "question_score"])
    orig_len = len(orig_dataset.df)
    new_len = len(dataset.df)

    print(f"Original dataset length: {orig_len}")
    print(f"New dataset length: {new_len}")

    return dataset
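
The two CPU filtering steps above can be sketched without NeMo Curator as two passes over a list of records. This version makes a few assumptions for the sake of illustration: words are counted by splitting on whitespace, both word-count bounds are inclusive, and a question score equal to the threshold is kept. `run_filters` is a hypothetical helper and the records are made up.

```python
def word_count(text: str) -> int:
    """Score a text by its number of whitespace-separated tokens."""
    return len(text.split())


def run_filters(records, min_words, max_words, score_threshold):
    """Apply the word-count filter first, then the question-score filter."""
    in_range = [r for r in records if min_words <= word_count(r["question"]) <= max_words]
    return [r for r in in_range if r["question_score"] >= score_threshold]


records = [
    {"id": "a", "question": "word " * 60, "question_score": 3},   # passes both filters
    {"id": "b", "question": "too short", "question_score": 10},   # fails the word count
    {"id": "c", "question": "word " * 60, "question_score": -1},  # fails the score check
]
kept = run_filters(records, min_words=50, max_words=500, score_threshold=0)
print([r["id"] for r in kept])  # ['a']
```

Because each filter only removes records, running the cheaper checks first reduces the work left for later, more expensive steps such as semantic deduplication.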

Finally, we are ready to run the pipeline and get our final dataset. This will take some time to execute, especially if semantic deduplication is enabled.

In [7]:
curated_dataset = run_curation_pipeline(raw_dataset, device)

Running curation pipeline on 'cpu'...


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Original dataset length: 24343
New dataset length: 18759
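
As a quick worked check on the numbers printed above, the snippet below computes how many records the pipeline removed and what fraction it retained. Since this run used the CPU, semantic deduplication was skipped, so the reduction comes from the word-count and question-score filters alone.

```python
orig_len = 24343  # "Original dataset length" reported above
new_len = 18759   # "New dataset length" reported above

removed = orig_len - new_len
retention = new_len / orig_len
print(f"Removed {removed} records; retained {retention:.1%}")
```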


Next, let's specify the final columns that we would like our dataset to have. Depending on how you plan on consuming this dataset for training, you may decide to introduce other arbitrary columns to help the model learn better.

This is also a good place to prepend a system or instruction prompt, if you intend to use the same prompt for every record.

In [8]:
SYSTEM_PROMPT = "Read the following title and question about a legal issue and assign the most appropriate tag to it. All tags must be in lowercase, ordered lexicographically and separated by commas.\n\n"

df = curated_dataset.df
df["input"] = SYSTEM_PROMPT + "TITLE:\n" + df["title"] + "\n\n" + "QUESTION:\n" + df["question"]
df["output"] = df["tags"]
df["filename"] = "law-stackexchange-curated.jsonl"

df = df.drop(columns=["title", "question", "tags"])
curated_dataset.df = df
print(f"Dataset columns: {df.columns}")

Dataset columns: Index(['filename', 'id', 'input', 'output'], dtype='object')
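
To see what a single record looks like after this step, the sketch below applies the same string concatenation to one row in plain Python. The title, question, and tags are made up for illustration and are not taken from the dataset; we also assume the tags are already a comma-separated string. `build_record` is a hypothetical helper.

```python
SYSTEM_PROMPT = (
    "Read the following title and question about a legal issue and assign the most "
    "appropriate tag to it. All tags must be in lowercase, ordered lexicographically "
    "and separated by commas.\n\n"
)


def build_record(title: str, question: str, tags: str) -> dict:
    """Assemble the prompt (`input`) and the target (`output`) for one row."""
    prompt = SYSTEM_PROMPT + "TITLE:\n" + title + "\n\n" + "QUESTION:\n" + question
    return {"input": prompt, "output": tags}


# Made-up row for illustration; it is not taken from the dataset.
record = build_record(
    title="Can a landlord keep my deposit?",
    question="My lease ended last month and the deposit has not been returned.",
    tags="landlord,rental-property",
)
print(record["input"])
print(record["output"])
```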


Once the final dataset is ready, we can write it into a JSONL file and start using it for model training.

> NOTE: The curated dataset will be written to `data/curated_dataset/law-stackexchange-curated.jsonl`, relative to the notebook directory.

In [9]:
print(f"Curated dataset columns: {curated_dataset.df.columns}")
result_fp = os.path.join(DATA_DIR, "curated_dataset")
print()
print(f"Saving curated dataset to '{result_fp}'...")
curated_dataset.to_json(result_fp, write_to_filename=True)

Curated dataset columns: Index(['filename', 'id', 'input', 'output'], dtype='object')

Saving curated dataset to '/home/mmaghoumi/git/dl-tme/2024/odsc-hackathon-october-2024/all-in-one/data/curated_dataset'...
Writing to disk complete for 1 partitions
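
Before moving on, it can be worth reading the JSONL file back to confirm that every line parses and carries the expected fields. The sketch below shows one way to do that with the standard library. So that it runs on its own, it writes a tiny stand-in file to a temporary directory instead of reading the curated output; the rows and the `read_jsonl` helper are hypothetical.

```python
import json
import os
import tempfile


def read_jsonl(path):
    """Parse a JSON Lines file: one JSON object per non-empty line."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


# Write a tiny stand-in file so the sketch is self-contained.
rows = [
    {"id": "1", "input": "example prompt 1", "output": "contract-law"},
    {"id": "2", "input": "example prompt 2", "output": "copyright"},
]
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "sample.jsonl")
    with open(path, "w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")
    loaded = read_jsonl(path)

print(len(loaded), sorted(loaded[0]))
```

To check the real output, point `read_jsonl` at the curated file inside the `curated_dataset` directory.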


---
# Splitting the Dataset

Before starting the model training procedure, let's split the dataset we've just curated into `training`, `validation` and `test` splits with 80/10/10 ratios.

In [21]:
from sklearn.model_selection import train_test_split

VAL_RATIO = 0.1
TEST_RATIO = 0.1

df = curated_dataset.df.compute()

# Some sanity checks
assert len(df) > 0, "The dataset is empty."
assert 0 <= VAL_RATIO <= 1, "VAL_RATIO must be between 0 and 1."
assert 0 <= TEST_RATIO <= 1, "TEST_RATIO must be between 0 and 1."
assert VAL_RATIO + TEST_RATIO < 1, "VAL_RATIO + TEST_RATIO must be less than 1."
val_size = int(len(df) * VAL_RATIO)
test_size = int(len(df) * TEST_RATIO)
output_dir = f"{DATA_DIR}/curated_dataset/split"
os.makedirs(output_dir, exist_ok=True)

# Split the data into training and temporary sets
train_df, temp_df = train_test_split(df, test_size=val_size + test_size, random_state=42)
# Split the temporary set into validation and test sets
val_df, test_df = train_test_split(temp_df, test_size=test_size, random_state=42)

print(f"Original size: {len(df)}")
print("After splitting:")
print(f"    Train size: {len(train_df)}")
print(f"    Validation size: {len(val_df)}")
print(f"    Test size: {len(test_df)}")

train_df["filename"] = "train.jsonl"
val_df["filename"] = "val.jsonl"
test_df["filename"] = "test.jsonl"

DocumentDataset.from_pandas(train_df).to_json(output_dir, write_to_filename=True)
DocumentDataset.from_pandas(val_df).to_json(output_dir, write_to_filename=True)
DocumentDataset.from_pandas(test_df).to_json(output_dir, write_to_filename=True)


Original size: 18759
After splitting:
    Train size: 15009
    Validation size: 1875
    Test size: 1875


Writing to disk complete for 1 partitions
Writing to disk complete for 1 partitions
Writing to disk complete for 1 partitions
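
The split sizes printed above follow from simple arithmetic: the validation and test sizes are each truncated to an integer, and the training set receives whatever remains, which is why it has 15009 rows and not exactly 80% of 18759. The sketch below reproduces those sizes in plain Python. It shuffles with the standard library instead of scikit-learn, so the rows assigned to each split will differ from those chosen by `train_test_split`, even with the same seed. Both helper functions are hypothetical.

```python
import random


def split_sizes(n: int, val_ratio: float, test_ratio: float):
    """Return (train, val, test) sizes; rounding remainders go to the training set."""
    val_size = int(n * val_ratio)
    test_size = int(n * test_ratio)
    return n - val_size - test_size, val_size, test_size


def split_indices(n: int, val_ratio: float, test_ratio: float, seed: int = 42):
    """Shuffle the row indices once, then slice them into three disjoint splits."""
    train_size, val_size, _ = split_sizes(n, val_ratio, test_ratio)
    indices = list(range(n))
    random.Random(seed).shuffle(indices)
    train = indices[:train_size]
    val = indices[train_size:train_size + val_size]
    test = indices[train_size + val_size:]
    return train, val, test


print(split_sizes(18759, 0.1, 0.1))  # (15009, 1875, 1875)
train, val, test = split_indices(18759, 0.1, 0.1)
print(len(train), len(val), len(test))
```

Fixing the seed keeps the split reproducible across runs, which matters when comparing models trained at different times.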
