# Distributed Data Classification with Multiple Classifiers

Cross-validation is a machine learning technique in which multiple models are trained on different subsets of your data, and each model is validated on the portion it did not see during training. It reduces the risk of overfitting and gives a more reliable estimate of how the model will perform on unseen data. This is particularly valuable when data is limited, because every sample is used for both training and validation.
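
As a rough illustration of the splitting scheme described above, the following pure-Python sketch partitions sample indices into k folds, with each fold serving once as the validation set. The helper name `k_fold_indices` and the choice of five folds over ten samples are assumptions made for this example only.

```python
def k_fold_indices(n_samples: int, k: int):
    """Yield (train_indices, val_indices) pairs for k-fold cross-validation."""
    if not 2 <= k <= n_samples:
        raise ValueError("k must be between 2 and n_samples")
    indices = list(range(n_samples))
    # Spread the remainder over the first folds so sizes differ by at most one.
    fold_sizes = [n_samples // k + (1 if i < n_samples % k else 0) for i in range(k)]
    start = 0
    for size in fold_sizes:
        val = indices[start:start + size]
        train = indices[:start] + indices[start + size:]
        yield train, val
        start += size


folds = list(k_fold_indices(n_samples=10, k=5))
for fold_id, (train, val) in enumerate(folds):
    print(f"fold {fold_id}: train on {len(train)} samples, validate on {val}")
```

Training one model per fold produces k models, which is one way to end up with several pretrained classifiers for the same task.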

In this tutorial, we demonstrate how to use NeMo Curator's `DistributedDataClassifier` to build our own `PyTorchClassifier` class for loading and performing batched inference with multiple pretrained models. We assume the user has pretrained PTH model files, with [DeBERTaV3](https://huggingface.co/microsoft/deberta-v3-base) as the base model used for training. The classifiers are accelerated using [CrossFit](https://github.com/rapidsai/crossfit), a library that leverages intelligent batching and RAPIDS to accelerate offline inference on large datasets.

First, let's run some preliminary imports and set up our Dask client.

In [1]:
import os
os.environ["RAPIDS_NO_INITIALIZE"] = "1"

# Silence Warnings (HuggingFace internal warnings)
%env PYTHONWARNINGS=ignore
import warnings
warnings.filterwarnings("ignore")



In [2]:
from dataclasses import dataclass
from typing import List, Optional

In [3]:
import cudf
import dask_cudf
import torch
import torch.nn as nn
from crossfit.backend.torch.hf.model import HFModel
from transformers import AutoConfig, AutoModel
from transformers.models.deberta_v2 import DebertaV2TokenizerFast

In [4]:
# NeMo Curator modules
from nemo_curator import get_client
from nemo_curator.classifiers.base import (
    DistributedDataClassifier,
    _run_classifier_helper,
)
from nemo_curator.datasets import DocumentDataset

In [5]:
client = get_client(cluster_type="gpu")

cuDF Spilling is enabled


# Create `PyTorchClassifier` Class

To create our `PyTorchClassifier` class, we will extend NeMo Curator's `DistributedDataClassifier` class.

The goal of the base `DistributedDataClassifier` class is to enable multi-node multi-GPU data classification of your data. NeMo Curator provides several subclasses that focus on domain, quality, content safety, and educational content classification. However, the `DistributedDataClassifier` can be extended to fit any model; the only requirement is that the model can fit on a single GPU. See NeMo Curator's [Distributed Data Classification](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/distributeddataclassification.html) documentation for more information.

First, let's create a `PyTorchModelConfig` class. Its purpose is to store some of the attributes that will be used by our model, including the base model of the classifier.

In [6]:
@dataclass
class PyTorchModelConfig:
    base_model: str = "microsoft/deberta-v3-base"
    fc_dropout: float = 0.2
    max_len: int = 512
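
To show how such a configuration object behaves, here is a small self-contained sketch that redefines the same dataclass and creates one instance with the defaults and one with overrides. The override values (`fc_dropout=0.1`, `max_len=256`) are arbitrary and chosen only for illustration.

```python
from dataclasses import asdict, dataclass


@dataclass
class PyTorchModelConfig:
    base_model: str = "microsoft/deberta-v3-base"
    fc_dropout: float = 0.2
    max_len: int = 512


# Use the defaults as-is.
default_config = PyTorchModelConfig()

# Override selected fields by keyword; the values here are illustrative only.
custom_config = PyTorchModelConfig(fc_dropout=0.1, max_len=256)

print(asdict(default_config))
print(custom_config)
```

The overrides should match the settings used when the PTH files were trained, since the loaded weights depend on them.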

Next, we create an `NCCustomModel` (for "NeMo Curator Custom Model") class. It inherits from `nn.Module`, the base class for all neural network modules in PyTorch.

Inside `__init__`, the class loads the model configuration and the base model, then adds a dropout layer and a linear classification head. The `autocast` boolean determines whether mixed precision (`torch.autocast`) is used during inference to speed up computations on CUDA devices. The `forward` method is required by `nn.Module` and runs the model's forward pass (the computation performed at every call).

In [7]:
class NCCustomModel(nn.Module):
    def __init__(
        self,
        config: PyTorchModelConfig,
        out_dim: int,
        config_path: Optional[str] = None,
        pretrained: bool = False,
        autocast: bool = False,
    ):
        super().__init__()
        self.config = config
        if config_path is None:
            self.config = AutoConfig.from_pretrained(
                config.base_model, output_hidden_states=True
            )
        else:
            self.config = torch.load(config_path)

        if pretrained:
            self.model = AutoModel.from_pretrained(
                config.base_model, config=self.config
            )
        else:
            # AutoModel cannot be instantiated directly; build it from the config
            self.model = AutoModel.from_config(self.config)

        self.fc_dropout = nn.Dropout(config.fc_dropout)
        self.fc = nn.Linear(self.config.hidden_size, out_dim)
        self.autocast = autocast

    def feature(self, input_ids, attention_mask):
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_states = outputs[0]
        return last_hidden_states

    def _forward(self, batch):
        feature = self.feature(batch["input_ids"], batch["attention_mask"])
        output = self.fc(self.fc_dropout(feature))
        output = output.to(torch.float32)
        return torch.softmax(output[:, 0, :], dim=1)

    def forward(self, batch):
        if self.autocast:
            with torch.autocast(device_type="cuda"):
                return self._forward(batch)
        else:
            return self._forward(batch)
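
To make the last step of `_forward` concrete, the sketch below reproduces it in plain Python: the classification head yields logits for every token, `output[:, 0, :]` keeps only the first token position (the `[CLS]` token for DeBERTa-style tokenizers), and a softmax turns those logits into class probabilities. The logit values and the helper name `softmax` are made up for illustration; the real model operates on tensors.

```python
import math


def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


# Hypothetical per-token logits: 2 documents, 3 tokens each, 3 classes.
batch_logits = [
    [[2.0, 1.0, 0.1], [0.3, 0.2, 0.1], [0.0, 0.0, 0.0]],
    [[0.5, 2.5, 0.5], [1.0, 1.0, 1.0], [0.2, 0.1, 0.4]],
]

# Mirror `output[:, 0, :]`: keep only the first token's logits per document.
first_token_logits = [doc[0] for doc in batch_logits]

# Mirror `torch.softmax(..., dim=1)`: normalize across the class dimension.
probs = [softmax(row) for row in first_token_logits]
for row in probs:
    print([round(p, 3) for p in row])
```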

Now, let's create the `PyTorchModel` class, a model management class. It inherits from `HFModel`, a class provided by NVIDIA's [CrossFit](https://github.com/rapidsai/crossfit) library, which enables multi-node and multi-GPU offline inference. In it, we define methods that specify how to load our model, its configuration, and its tokenizer.

In [8]:
class PyTorchModel(HFModel):
    def __init__(
        self,
        config: PyTorchModelConfig,
        out_dim: int,
        model_path: str,
        autocast: bool = False,
    ):
        self.config = config
        self.out_dim = out_dim
        self.model_path = model_path
        self.autocast = autocast
        super().__init__(self.config.base_model)

    def load_model(self, device: str = "cuda"):
        model = NCCustomModel(
            self.config,
            out_dim=self.out_dim,
            config_path=None,
            pretrained=True,
            autocast=self.autocast,
        )
        model = model.to(device)

        if os.path.exists(self.model_path):
            sd = torch.load(self.model_path, map_location="cpu")
            if "model_state_dict" in sd:
                sd = sd["model_state_dict"]
            sd = {k[7:] if k.startswith("module.") else k: sd[k] for k in sd.keys()}
            model.load_state_dict(sd, strict=True)
        else:
            raise ValueError(f"Model path {self.model_path} does not exist")

        return model.eval()

    def load_tokenizer(self):
        return DebertaV2TokenizerFast.from_pretrained(self.config.base_model)

    def load_config(self):
        return AutoConfig.from_pretrained(self.path_or_name)
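
The key-renaming step in `load_model` is easy to miss, so here is a standalone sketch of it. Checkpoints saved from a model wrapped in `torch.nn.DataParallel` or `DistributedDataParallel` typically carry a `module.` prefix on every key, which has to be removed before the weights can be loaded into an unwrapped model. The checkpoint contents below are placeholders (strings standing in for tensors), and `strip_module_prefix` is a hypothetical helper name.

```python
def strip_module_prefix(state_dict: dict) -> dict:
    """Return a copy of the state dict with any leading 'module.' removed from keys."""
    prefix = "module."
    return {
        (key[len(prefix):] if key.startswith(prefix) else key): value
        for key, value in state_dict.items()
    }


# Hypothetical checkpoint layout; the string values stand in for tensors.
checkpoint = {
    "model_state_dict": {
        "module.fc.weight": "W",
        "module.fc.bias": "b",
        "model.embeddings.word_embeddings.weight": "E",
    }
}

# Unwrap the nested state dict if present, as load_model does.
sd = checkpoint["model_state_dict"] if "model_state_dict" in checkpoint else checkpoint
cleaned = strip_module_prefix(sd)
print(sorted(cleaned))
```

Keys without the prefix pass through unchanged, so the same code handles checkpoints saved with or without a wrapper.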

Finally, we create the `PyTorchClassifier` class. We start with the `__init__` method, which uses the `DistributedDataClassifier`, `PyTorchModelConfig`, and `PyTorchModel` classes described above. Next is the `_run_classifier` method, which is called by `DistributedDataClassifier`'s `__call__` method; it is required for all classes that inherit from `DistributedDataClassifier`.

Here is a quick rundown of all the attributes of the `PyTorchClassifier` class:
- `pretrained_model_name_or_path` (`str`): The path to your PyTorch model file.
- `labels` (`list[str]`): The classes output by the model classifier.
- `out_dim` (`int, optional`): Set to 1 for a binary classification task. Otherwise, defaults to `len(labels)`.
- `filter_by` (`list[str], optional`): The classes to filter the dataset by. If None, all classes will be included. Defaults to None.
- `batch_size` (`int`): The number of samples per batch for inference. Defaults to 256.
- `text_field` (`str`): The field in the dataset that should be classified.
- `pred_column` (`str`): The column name where predictions will be stored. Defaults to "pred".
- `prob_column` (`str`): The column name where prediction probabilities will be stored. Defaults to "prob".
- `max_chars` (`int`): The maximum number of characters in each document to consider for classification. Defaults to 6000.
- `device_type` (`str`): The type of device to use for inference, either "cuda" or "cpu". Defaults to "cuda".
- `autocast` (`bool`): Whether to use mixed precision for faster inference. Defaults to True.
- `base_model` (`str`): The base model on which your PyTorch model was trained. Defaults to "microsoft/deberta-v3-base".
- `fc_dropout` (`float`): Dropout rate used during training. Defaults to 0.2.
- `max_len` (`int`): Maximum sequence length used during training. Defaults to 512.

In [9]:
class PyTorchClassifier(DistributedDataClassifier):
    """
    PyTorchClassifier is a general classifier designed for running generic PTH model files.
    This class is optimized for running on multi-node, multi-GPU setups to enable fast and efficient inference on large datasets.

    """

    def __init__(
        self,
        pretrained_model_name_or_path: str,
        labels: List[str],
        out_dim: Optional[int] = None,
        filter_by: Optional[List[str]] = None,
        batch_size: int = 256,
        text_field: str = "text",
        pred_column: str = "pred",
        prob_column: str = "prob",
        max_chars: int = 6000,
        device_type: str = "cuda",
        autocast: bool = True,
        base_model: str = "microsoft/deberta-v3-base",
        fc_dropout: float = 0.2,
        max_len: int = 512,
    ):
        config = PyTorchModelConfig(
            base_model=base_model,
            fc_dropout=fc_dropout,
            max_len=max_len,
        )

        self.labels = labels
        if out_dim:
            self.out_dim = out_dim
        else:
            self.out_dim = len(labels)

        self.text_field = text_field
        self.prob_column = prob_column

        model = PyTorchModel(
            config=config,
            out_dim=self.out_dim,
            model_path=pretrained_model_name_or_path,
            autocast=autocast,
        )

        super().__init__(
            model=model,
            labels=self.labels,
            filter_by=filter_by,
            batch_size=batch_size,
            out_dim=self.out_dim,
            pred_column=pred_column,
            max_chars=max_chars,
            device_type=device_type,
            autocast=autocast,
        )

    def _run_classifier(self, dataset: DocumentDataset):
        print("Starting PyTorch classifier inference", flush=True)
        df = dataset.df
        df = _run_classifier_helper(
            df=df,
            model=self.model,
            labels=self.labels,
            max_chars=self.max_chars,
            batch_size=self.batch_size,
            label_col=self.pred_column,
            text_field=self.text_field,
            prob_col=self.prob_column,
        )
        return DocumentDataset(df)
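
The relationship between `__call__` and `_run_classifier` follows the template-method pattern: the base class owns the public entry point and delegates the model-specific work to the subclass. The sketch below illustrates that pattern with hypothetical stand-in classes (`SketchBaseClassifier`, `KeywordClassifier`) operating on plain lists of dictionaries. It is not NeMo Curator's implementation, and applying `filter_by` inside `__call__` is an assumption made for the example.

```python
from abc import ABC, abstractmethod


class SketchBaseClassifier(ABC):
    """Hypothetical stand-in for a base class that owns `__call__`."""

    def __init__(self, labels, filter_by=None, pred_column="pred"):
        self.labels = labels
        self.filter_by = filter_by
        self.pred_column = pred_column

    def __call__(self, rows):
        # Delegate the actual classification to the subclass.
        rows = self._run_classifier(rows)
        # Assumed behavior: optionally keep only rows with the requested labels.
        if self.filter_by is not None:
            rows = [r for r in rows if r[self.pred_column] in self.filter_by]
        return rows

    @abstractmethod
    def _run_classifier(self, rows):
        ...


class KeywordClassifier(SketchBaseClassifier):
    """Toy subclass: labels a row by a keyword instead of running a model."""

    def _run_classifier(self, rows):
        return [
            {**r, self.pred_column: "finance" if "funds" in r["text"] else "other"}
            for r in rows
        ]


rows = [
    {"text": "Investing in index funds is a popular strategy."},
    {"text": "Vegan recipes have gained popularity."},
]
classifier = KeywordClassifier(labels=["finance", "other"], filter_by=["finance"])
result = classifier(rows)
print(result)
```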

We have successfully built our PyTorch classifier pipeline! Now, let's demonstrate how to use it with a simple example.

# Prepare Dataset and Set File Paths

For our demonstration, we need to create or read the dataset on which we want to run inference. In this notebook, we provide a sample dataset of 10 sentences to classify. Alternatively, the user may read in their own existing data (e.g., JSON or Parquet files), as shown in the commented-out code.

In [10]:
# Create sample DataFrame
text = [
    "Quantum computing is set to revolutionize the field of cryptography.",
    "Investing in index funds is a popular strategy for long-term financial growth.",
    "Recent advancements in gene therapy offer new hope for treating genetic disorders.",
    "Online learning platforms have transformed the way students access educational resources.",
    "Traveling to Europe during the off-season can be a more budget-friendly option.",
    "Training regimens for athletes have become more sophisticated with the use of data analytics.",
    "Streaming services are changing the way people consume television and film content.",
    "Vegan recipes have gained popularity as more people adopt plant-based diets.",
    "Climate change research is critical for developing sustainable environmental policies.",
    "Telemedicine has become increasingly popular due to its convenience and accessibility.",
]
df = cudf.DataFrame({"text": text})
dataset = DocumentDataset(dask_cudf.from_cudf(df, npartitions=1))
write_to_filename = False

# Alternatively, read existing directory of JSONL files
# input_file_path="/input_data_dir/"
# dataset = DocumentDataset.read_json(
#     input_file_path, backend="cudf", add_filename=True
# )
# write_to_filename = True

The user should also specify where to write the results, as well as the local file paths to the pretrained PyTorch classifiers. Finally, the user should include the labels the classifier is expected to produce.

In [11]:
output_file_path = "output_data_dir/"
model_paths = [
    "model0.pth",
    "model1.pth",
    "model2.pth",
    "model3.pth",
    "model4.pth",
]
labels = ["label_a", "label_b", "label_c"]

# Run Classification with Multiple Models

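Now we can use the `PyTorchClassifier` class to load each of our PyTorch models and run inference. Each model writes its predictions and probabilities to its own pair of columns (`pred_0`/`prob_0`, `pred_1`/`prob_1`, and so on). We then write the results to disk in JSON format under `output_file_path`.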

In [12]:
fold = 0
pred_columns = []
for model_path in model_paths:
    pred_column = "pred_" + str(fold)
    prob_column = "prob_" + str(fold)
    pred_columns.append(pred_column)

    classifier = PyTorchClassifier(
        pretrained_model_name_or_path=model_path,
        labels=labels,
        batch_size=1024,
        text_field="text",
        pred_column=pred_column,
        prob_column=prob_column,
    )
    dataset = classifier(dataset=dataset)
    fold += 1

Starting PyTorch classifier inference
Starting PyTorch classifier inference
Starting PyTorch classifier inference
Starting PyTorch classifier inference
Starting PyTorch classifier inference


In [13]:
%%time

dataset.to_json(output_file_dir=output_file_path, write_to_filename=write_to_filename)

GPU: tcp://127.0.0.1:32893, Part: 0: 100%|██████████| 10/10 [00:21<00:00,  2.13s/it]
GPU: tcp://127.0.0.1:32893, Part: 0: 100%|██████████| 10/10 [00:05<00:00,  1.81it/s]
GPU: tcp://127.0.0.1:32893, Part: 0: 100%|██████████| 10/10 [00:05<00:00,  1.81it/s]
GPU: tcp://127.0.0.1:32893, Part: 0: 100%|██████████| 10/10 [00:05<00:00,  1.84it/s]
GPU: tcp://127.0.0.1:32893, Part: 0: 100%|██████████| 10/10 [00:04<00:00,  2.01it/s]

Writing to disk complete for 1 partitions
CPU times: user 7.01 s, sys: 4.56 s, total: 11.6 s
Wall time: 1min 4s



# Inspect the Output

Finally, let's verify that everything worked as expected.

In [14]:
output_dataset = DocumentDataset.read_json(output_file_path, backend="cudf", add_filename=write_to_filename)
output_dataset.df.head()

Reading 1 files


| | pred_0 | pred_1 | pred_2 | pred_3 | pred_4 | prob_0 | prob_1 | prob_2 | prob_3 | prob_4 | text |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | label_b | label_b | label_b | label_b | label_b | [0.37283509970000006, 0.49910834430000006, 0.1... | [0.3027972281, 0.5215288401, 0.1756739765] | [0.41288739440000005, 0.5265461801999999, 0.06... | [0.32485893370000013, 0.46514019370000004, 0.2... | [0.3685780168000001, 0.5256645678999999, 0.105... | Quantum computing is set to revolutionize the ... |
| 1 | label_b | label_b | label_b | label_b | label_b | [0.34135937690000007, 0.5343321562, 0.1243084297] | [0.34347015620000004, 0.5304207801999999, 0.12... | [0.4346009791000001, 0.5130862594, 0.052312787... | [0.3181181848000001, 0.4944583774000001, 0.187... | [0.39643365140000003, 0.5143401027, 0.08922628... | Investing in index funds is a popular strategy... |
| 2 | label_b | label_b | label_b | label_b | label_b | [0.38975748420000006, 0.48216831680000005, 0.1... | [0.33265304570000004, 0.5090963244, 0.1582506448] | [0.44722059370000006, 0.4945448935000001, 0.05... | [0.3444236219000001, 0.45550799370000006, 0.20... | [0.3919632137000001, 0.5084934831, 0.099543325... | Recent advancements in gene therapy offer new ... |
| 3 | label_b | label_b | label_b | label_b | label_b | [0.38686266540000014, 0.48784771560000006, 0.1... | [0.3482291102, 0.5138959289, 0.13787493110000001] | [0.4499093592, 0.49849084020000006, 0.05159985... | [0.3489176929000001, 0.45996120570000004, 0.19... | [0.38338246940000015, 0.5131927133, 0.10342480... | Online learning platforms have transformed the... |
| 4 | label_b | label_b | label_b | label_b | label_b | [0.3207181096000001, 0.5833522080999999, 0.095... | [0.3277938664, 0.5600519180000001, 0.112154245... | [0.39969193940000003, 0.5546463728000001, 0.04... | [0.3249147236000001, 0.5021025537999999, 0.172... | [0.35228130220000003, 0.5585800408999999, 0.08... | Traveling to Europe during the off-season can ... |
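
The output keeps each model's prediction in a separate column, and this tutorial does not combine them. If a single label per document is needed, two common options are averaging the probability vectors (soft voting) or taking the majority label (hard voting). The sketch below shows both in plain Python for one document. The probability values are hypothetical, and the helper names `average_probs` and `argmax_label` are assumptions for this example.

```python
from collections import Counter

labels = ["label_a", "label_b", "label_c"]

# Hypothetical per-model probabilities for one document (one list per model).
fold_probs = [
    [0.35, 0.50, 0.15],
    [0.30, 0.52, 0.18],
    [0.45, 0.40, 0.15],
]


def average_probs(prob_lists):
    """Element-wise mean of several probability vectors."""
    n_models = len(prob_lists)
    return [sum(col) / n_models for col in zip(*prob_lists)]


def argmax_label(probs, labels):
    """Return the label whose probability is highest."""
    return labels[max(range(len(probs)), key=probs.__getitem__)]


# Soft voting: average the probabilities, then pick the top class.
mean_probs = average_probs(fold_probs)
soft_vote = argmax_label(mean_probs, labels)

# Hard voting: pick each model's top class, then take the most common one.
fold_preds = [argmax_label(p, labels) for p in fold_probs]
hard_vote = Counter(fold_preds).most_common(1)[0][0]

print(soft_vote, hard_vote, fold_preds)
```

Soft voting takes each model's confidence into account, while hard voting only counts the winning labels; which one fits better depends on how well calibrated the models are.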


Thank you for reading! In this tutorial, we demonstrated how to create and use the `PyTorchClassifier` class to load locally stored PyTorch models and run inference on our dataset.

For more information about NeMo Curator's `DistributedDataClassifier`, please refer to the [documentation page](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/distributeddataclassification.html). For an example of how to run NeMo Curator's `DomainClassifier` and `QualityClassifier`, please see [this sample notebook](https://github.com/NVIDIA/NeMo-Curator/blob/main/tutorials/distributed_data_classification/distributed_data_classification.ipynb).