# Distributed Data Classification with NeMo Curator's `MultilingualDomainClassifier`

This notebook demonstrates NeMo Curator's `MultilingualDomainClassifier`. The [multilingual domain classifier](https://huggingface.co/nvidia/multilingual-domain-classifier) classifies the domain of a text written in any of 52 languages, including English. Its predictions help with data annotation, which is useful for data blending in foundation model training. Refer to the model's Hugging Face page (linked above) for more information, including the full list of output labels.

This tutorial requires at least 1 NVIDIA GPU with:
  - Volta™ or higher (compute capability 7.0+)
  - CUDA 12.x

For more information about the classifiers, refer to our [Distributed Data Classification](https://docs.nvidia.com/nemo/curator/latest/curate-text/process-data/quality-assessment/distributed-classifier.html) documentation page.

Before running this notebook, see the [Installation Guide](https://docs.nvidia.com/nemo/curator/latest/admin/installation.html#admin-installation) for instructions on how to install NeMo Curator. Be sure to use an installation method that includes GPU dependencies (`text_cuda12` or `all`). Verify the installation with:

In [None]:
# First, silence Curator logs via Loguru
import os

os.environ["LOGURU_LEVEL"] = "ERROR"

In [None]:
import nemo_curator

nemo_curator.__version__  # should be >= 1.0.0
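To make the version check fail loudly instead of relying on a visual inspection of the cell output, the comparison can be done in code. The following is a minimal sketch using only the standard library. `meets_minimum` is a hypothetical helper (not part of NeMo Curator), and it compares only the leading numeric components, so a suffix such as `rc1` is ignored.

```python
import re


def meets_minimum(version: str, minimum: str = "1.0.0") -> bool:
    """Return True if `version` is at least `minimum` (numeric parts only)."""

    def numeric_parts(v: str) -> tuple:
        # Keep the leading dotted integers, e.g. "1.2.0rc1" -> (1, 2, 0)
        match = re.match(r"\d+(?:\.\d+)*", v)
        if match is None:
            raise ValueError(f"Unrecognized version string: {v!r}")
        return tuple(int(part) for part in match.group(0).split("."))

    current, required = numeric_parts(version), numeric_parts(minimum)
    # Pad the shorter tuple with zeros so that "1.0" compares equal to "1.0.0"
    width = max(len(current), len(required))
    current += (0,) * (width - len(current))
    required += (0,) * (width - len(required))
    return current >= required
```

In this notebook it could be used as `assert meets_minimum(nemo_curator.__version__)`.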

We can check that GPUs are available, then check that the `gpustat` dependency was installed:

In [None]:
!nvidia-smi

In [None]:
import gpustat

gpustat.__version__  # check gpu dependency is installed
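The compute capability requirement (7.0 or higher) can also be checked programmatically. The sketch below assumes an NVIDIA driver recent enough for `nvidia-smi` to support the `compute_cap` query field. The helper names `parse_compute_caps`, `query_gpus`, and `supported` are hypothetical and not part of NeMo Curator.

```python
import subprocess


def parse_compute_caps(csv_output: str) -> list:
    """Parse lines such as 'Tesla V100-SXM2-16GB, 7.0' into (name, (major, minor))."""
    gpus = []
    for line in csv_output.strip().splitlines():
        # Split on the last comma so that GPU names containing commas survive
        name, cap = (field.strip() for field in line.rsplit(",", 1))
        major, minor = cap.split(".")
        gpus.append((name, (int(major), int(minor))))
    return gpus


def query_gpus() -> list:
    """Ask nvidia-smi for each GPU's name and compute capability."""
    completed = subprocess.run(
        ["nvidia-smi", "--query-gpu=name,compute_cap", "--format=csv,noheader"],
        capture_output=True,
        text=True,
        check=True,
    )
    return parse_compute_caps(completed.stdout)


def supported(gpus: list, minimum: tuple = (7, 0)) -> list:
    """Return the names of GPUs at or above the minimum compute capability."""
    return [name for name, cap in gpus if cap >= minimum]
```

On a machine with GPUs, `supported(query_gpus())` should return a non-empty list before continuing with the tutorial.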

The following imports are required for this tutorial:

In [2]:
import pandas as pd

from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.classifiers import MultilingualDomainClassifier
from nemo_curator.stages.text.io.reader.jsonl import JsonlReader
from nemo_curator.stages.text.io.reader.parquet import ParquetReader
from nemo_curator.stages.text.io.writer.jsonl import JsonlWriter
from nemo_curator.stages.text.io.writer.parquet import ParquetWriter
from nemo_curator.utils.file_utils import get_all_file_paths_under

To run a pipeline in NeMo Curator, we must start a Ray cluster. This can be done manually (see the [Ray documentation](https://docs.ray.io/en/latest/ray-core/starting-ray.html)) or with Curator's `RayClient`:

In [3]:
try:
    ray_client = RayClient()
    ray_client.start()
except Exception as e:
    msg = f"Error initializing Ray client: {e}"
    raise RuntimeError(msg) from e

# Initialize Read, Classification, and Write Stages

In NeMo Curator, each processing step is called a stage. For this tutorial, we will initialize 3 stages: a file reader, a multilingual domain classification stage, and a file writer.

An existing directory containing JSONL or Parquet files may be provided via `input_file_path`. If `input_file_path` does not exist or is empty, we download a small dataset from Hugging Face.

In [4]:
input_file_path = "./multilingual_data_dir"
input_file_type = "parquet"  # can be "jsonl" or "parquet"
text_field = "analysis"

if len(get_all_file_paths_under(input_file_path)) == 0:
    from huggingface_hub import snapshot_download

    # Download Parquet file from Hugging Face
    # This dataset contains texts in multiple languages
    snapshot_download(
        repo_id="HuggingFaceH4/Multilingual-Thinking",
        repo_type="dataset",
        local_dir=input_file_path,
        allow_patterns=["data/*"],
    )
    print(f"Downloaded Hugging Face dataset to {input_file_path}")

Fetching 1 files:   0%|          | 0/1 [00:00<?, ?it/s]

Downloaded Hugging Face dataset to ./multilingual_data_dir


We can define the reader stage with:

In [5]:
# Read directory of files and only read the text field
if input_file_type == "jsonl":
    read_stage = JsonlReader(input_file_path, files_per_partition=1, fields=[text_field])
elif input_file_type == "parquet":
    read_stage = ParquetReader(input_file_path, files_per_partition=1, fields=[text_field])
else:
    msg = f"Invalid input file type: {input_file_type}"
    raise ValueError(msg)
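Instead of setting `input_file_type` by hand, the type could be inferred from the file extensions in the input directory. This is a minimal sketch under the assumption that input files end in `.jsonl` or `.parquet`; `infer_file_type` is a hypothetical helper, not a NeMo Curator function.

```python
from pathlib import Path

# Map of recognized extensions to the values used for `input_file_type`
SUPPORTED_TYPES = {".jsonl": "jsonl", ".parquet": "parquet"}


def infer_file_type(paths: list) -> str:
    """Return "jsonl" or "parquet" if all recognized files share one type."""
    found = set()
    for path in paths:
        suffix = Path(path).suffix.lower()
        # Unrecognized files (README.md, .gitattributes, ...) are ignored
        if suffix in SUPPORTED_TYPES:
            found.add(SUPPORTED_TYPES[suffix])
    if len(found) != 1:
        msg = f"Expected exactly one supported file type, found: {sorted(found)}"
        raise ValueError(msg)
    return found.pop()
```

For example, the result of `get_all_file_paths_under(input_file_path)` could be passed to `infer_file_type` before choosing between `JsonlReader` and `ParquetReader`.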

Under the hood, the `MultilingualDomainClassifier` stage is composed of 2 stages: a tokenizer stage, which runs on the CPU, and a model inference stage, which runs on the GPU. A simplified view of the two stages follows (some parameters and details are omitted for brevity; please refer to the documentation for more details):

```python
# Simplified view of the two internal stages (not the full implementation)
class TokenizerStage:
    def __init__(self):
        self.resources = Resources(cpus=1)  # tokenization runs on the CPU
        self.model_identifier = "nvidia/multilingual-domain-classifier"
        self.text_field = "text"
        self.padding_side = "right"
        ...


class ModelStage:
    def __init__(self):
        self.resources = Resources(cpus=1, gpus=1)  # inference runs on the GPU
        self.model_identifier = "nvidia/multilingual-domain-classifier"
        self.model_inference_batch_size = 256
        ...
```
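To give a feel for what a `model_inference_batch_size` of 256 means in practice, the toy sketch below splits a list of rows into consecutive batches. It illustrates the batch size only and is not how Curator's model stage is implemented; `iter_batches` is a hypothetical helper.

```python
from collections.abc import Iterator, Sequence


def iter_batches(rows: Sequence, batch_size: int = 256) -> Iterator:
    """Yield consecutive slices of at most `batch_size` rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield rows[start : start + batch_size]


# 1000 placeholder rows are split into three full batches and one partial batch
texts = [f"document {i}" for i in range(1000)]
batch_sizes = [len(batch) for batch in iter_batches(texts)]
print(batch_sizes)  # [256, 256, 256, 232]
```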

Optionally, the classifier output may be filtered to keep only the texts whose predicted label is listed in `filter_by`. If the `filter_by` parameter is set, then a third stage is added:

```python
def filter_by_category(self, value: str) -> bool:
    return value in self.filter_by

...

if self.filter_by is not None and len(self.filter_by) > 0:
    self.stages.append(Filter(filter_fn=self.filter_by_category, filter_field=...))
```
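The effect of `filter_by` on the data can be illustrated with plain Python records. This is a sketch of the filtering logic only, not Curator's `Filter` stage; the helper `keep_categories` and the sample rows are made up for illustration, and the field name `multilingual_domain_pred` is taken from the output shown later in this notebook.

```python
from typing import Optional


def keep_categories(
    records: list,
    filter_by: Optional[list],
    pred_field: str = "multilingual_domain_pred",
) -> list:
    """Keep only records whose predicted label is listed in `filter_by`."""
    # With no filter configured, every record passes through unchanged
    if not filter_by:
        return list(records)
    allowed = set(filter_by)
    return [record for record in records if record[pred_field] in allowed]


# Illustrative rows (not real classifier output)
rows = [
    {"analysis": "text a", "multilingual_domain_pred": "Health"},
    {"analysis": "text b", "multilingual_domain_pred": "Sports"},
    {"analysis": "text c", "multilingual_domain_pred": "Science"},
]
kept = keep_categories(rows, ["Science", "Health"])
print([row["analysis"] for row in kept])  # ['text a', 'text c']
```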

See the [API Reference](https://docs.nvidia.com/nemo/curator/latest/apidocs/stages/stages.text.classifiers.domain.html#stages.text.classifiers.domain.MultilingualDomainClassifier) for more information about the `MultilingualDomainClassifier` class.

In [6]:
# Initialize the multilingual domain classifier
classifier_stage = MultilingualDomainClassifier(text_field=text_field)

# If desired, you may filter your dataset with:
# classifier_stage = MultilingualDomainClassifier(text_field=text_field, filter_by=["Science", "Health"])  # noqa: ERA001
# See full list of domains here: https://huggingface.co/nvidia/multilingual-domain-classifier

Finally, we can define a stage for writing the results:

In [7]:
output_file_path = "./multilingual_domain_classifier_results"
output_file_type = "parquet"  # can be "jsonl" or "parquet"

# Use mode="overwrite" to overwrite the output directory if it already exists
# This helps to ensure that the correct output is written
if output_file_type == "jsonl":
    write_stage = JsonlWriter(output_file_path, mode="overwrite")
elif output_file_type == "parquet":
    write_stage = ParquetWriter(output_file_path, mode="overwrite")
else:
    msg = f"Invalid output file type: {output_file_type}"
    raise ValueError(msg)

# Initialize Pipeline

In NeMo Curator, pipelines run distributed data workflows on Ray. A pipeline handles resource allocation and autoscaling across its stages, with the aim of improving throughput and keeping GPUs busy.

For the distributed data classifiers, we are able to achieve speedups by ensuring that model inference is run in parallel across all available GPUs, while other stages such as I/O, tokenization, and filtering are run across all available CPUs. This is possible because Curator pipelines are composable, which allows each stage in a pipeline to run independently and with its own specified hardware resources.
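As a back-of-the-envelope illustration of per-stage resources, the sketch below computes an upper bound on how many workers of one stage could run at once if that stage had the whole cluster to itself. This is not Curator's scheduler: `StageResources` and `max_workers` are hypothetical names, and the cluster size (32 CPUs, 4 GPUs) is an assumed example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StageResources:
    """Hypothetical stand-in for a stage's per-worker resource request."""

    cpus: float = 1.0
    gpus: float = 0.0


def max_workers(stage: StageResources, cluster_cpus: float, cluster_gpus: float) -> int:
    """Upper bound on concurrent workers if the stage had the cluster to itself."""
    limits = []
    if stage.cpus > 0:
        limits.append(int(cluster_cpus // stage.cpus))
    if stage.gpus > 0:
        limits.append(int(cluster_gpus // stage.gpus))
    # The scarcest resource decides how many workers fit
    return min(limits) if limits else 0


tokenizer = StageResources(cpus=1)
model = StageResources(cpus=1, gpus=1)
print(max_workers(tokenizer, cluster_cpus=32, cluster_gpus=4))  # 32
print(max_workers(model, cluster_cpus=32, cluster_gpus=4))  # 4
```

In this toy setting, the CPU-only tokenizer can scale well beyond the GPU count, while the model stage is bounded by the number of GPUs.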

In [8]:
classifier_pipeline = Pipeline(name="classifier_pipeline", description="Run a classifier pipeline")

# Add stages to the pipeline
classifier_pipeline.add_stage(read_stage)
classifier_pipeline.add_stage(classifier_stage)
classifier_pipeline.add_stage(write_stage)

Pipeline(name='classifier_pipeline', stages=[parquet_reader(ParquetReader), multilingual_domain_classifier_classifier(MultilingualDomainClassifier), parquet_writer(ParquetWriter)])

Composability is also what allows a classifier to sit between pre-processing and post-processing stages. Typical text pre-processing add-ons include text normalization (lowercasing, URL/email removal, Unicode cleanup) and language identification and filtering (to keep only target languages). A full pipeline may look something like:

```python
pipeline = Pipeline(name="full_pipeline")
pipeline.add_stage(read_stage)                # reader (JSONL/Parquet)
pipeline.add_stage(lang_id_stage)             # optional: language filter
pipeline.add_stage(classifier_stage)          # classifier
pipeline.add_stage(write_stage)               # writer (JSONL/Parquet)
```
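The text normalization steps mentioned above (lowercasing, URL/email removal, Unicode cleanup) can be sketched with the standard library. This is a plain function rather than a Curator stage, the regular expressions are deliberately simple, and whether lowercasing is appropriate depends on the downstream model. `normalize_text` is a hypothetical helper.

```python
import re
import unicodedata

URL_RE = re.compile(r"https?://\S+|www\.\S+")
EMAIL_RE = re.compile(r"\b[\w.+-]+@[\w-]+(?:\.[\w-]+)+\b")
WHITESPACE_RE = re.compile(r"\s+")


def normalize_text(text: str) -> str:
    """Apply Unicode cleanup, URL/email removal, lowercasing, and whitespace collapsing."""
    # NFKC folds compatibility characters (full-width letters, ligatures, ...)
    text = unicodedata.normalize("NFKC", text)
    text = URL_RE.sub(" ", text)
    text = EMAIL_RE.sub(" ", text)
    text = text.lower()
    # Collapse the gaps left behind by the removals
    return WHITESPACE_RE.sub(" ", text).strip()


print(normalize_text("Visit https://example.com or mail Me@Example.org  NOW!"))
# visit or mail now!
```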

# Run the Classifier

Let's run the full classifier pipeline:

In [9]:
# Run the pipeline
result = classifier_pipeline.run()

2025-12-03 15:14:31,428	INFO worker.py:1692 -- Using address 127.0.1.1:6380 set in the environment variable RAY_ADDRESS
2025-12-03 15:14:31,432	INFO worker.py:1833 -- Connecting to existing Ray cluster at address: 127.0.1.1:6380...


2025-12-03 15:14:31,297	INFO usage_lib.py:447 -- Usage stats collection is disabled.
2025-12-03 15:14:31,297	INFO scripts.py:914 -- Local node IP: 127.0.1.1
2025-12-03 15:14:33,281	SUCC scripts.py:950 -- --------------------
2025-12-03 15:14:33,281	SUCC scripts.py:951 -- Ray runtime started.
2025-12-03 15:14:33,281	SUCC scripts.py:952 -- --------------------
2025-12-03 15:14:33,281	INFO scripts.py:954 -- Next steps
2025-12-03 15:14:33,281	INFO scripts.py:957 -- To add another node to this Ray cluster, run
2025-12-03 15:14:33,281	INFO scripts.py:960 --   ray start --address='127.0.1.1:6380'
2025-12-03 15:14:33,281	INFO scripts.py:969 -- To connect to this Ray cluster:
2025-12-03 15:14:33,282	INFO scripts.py:971 -- import ray
2025-12-03 15:14:33,282	INFO scripts.py:972 -- ray.init(_node_ip_address='127.0.1.1')
2025-12-03 15:14:33,282	INFO scripts.py:984 -- To su

2025-12-03 15:14:33,696	INFO worker.py:2004 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8266
2025-12-03 15:14:34,695	INFO worker.py:1692 -- Using address 127.0.1.1:6380 set in the environment variable RAY_ADDRESS
2025-12-03 15:14:34,697	INFO worker.py:1833 -- Connecting to existing Ray cluster at address: 127.0.1.1:6380...
2025-12-03 15:14:34,698	INFO worker.py:1851 -- Calling ray.init() again after it has already been called.
Fetching 10 files: 100%|██████████| 10/10 [00:00<00:00, 22758.02it/s]
Fetching 10 files: 100%|██████████| 10/10 [00:00<00:00, 69672.82it/s]


Since the pipeline ran to completion and the result was written to disk, we can shut down the Ray cluster with:

In [10]:
try:
    ray_client.stop()
except Exception as e:  # noqa: BLE001
    print(f"Error stopping Ray client: {e}")
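When the same workflow runs as a script rather than as a notebook, it can be convenient to guarantee that the cluster is stopped even if a stage raises. The sketch below wraps any object exposing `start()` and `stop()` methods, as the notebook's `ray_client` does, in a context manager. `running` is a hypothetical helper, and `FakeClient` is a stand-in used only so that the example runs without Ray.

```python
from contextlib import contextmanager


@contextmanager
def running(client):
    """Start `client`, yield it, and always call `stop()` afterwards."""
    client.start()
    try:
        yield client
    finally:
        client.stop()


class FakeClient:
    """Stand-in with the same start()/stop() surface as the notebook's client."""

    def __init__(self):
        self.events = []

    def start(self):
        self.events.append("start")

    def stop(self):
        self.events.append("stop")


client = FakeClient()
try:
    with running(client):
        raise RuntimeError("simulated pipeline failure")
except RuntimeError:
    pass
print(client.events)  # ['start', 'stop']
```

With the real client, the body of the `with` block would contain the call to `classifier_pipeline.run()`.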

# Inspect the Output

The write stage returns a list of written files. We can read the output file as a Pandas DataFrame for inspection.

In [11]:
# For simplicity, we take the first written file from the writer stage
# In real pipelines, adjust as needed
result_file = result[0].data[0]

result_df = pd.read_json(result_file, lines=True) if output_file_type == "jsonl" else pd.read_parquet(result_file)
result_df.head()

Unnamed: 0,analysis,multilingual_domain_pred
0,"D'accord, l'utilisateur demande les tendances ...",Online_Communities
1,"Okay, the user is asking for the top-rated se...",Arts_and_Entertainment
2,"In Ordnung, der Benutzer fragt, ob ich seine T...",Online_Communities
3,"Perfecto, veamos. El usuario quiere un viaje d...",Travel_and_Transportation
4,"Okay, the user is feeling down and wants acti...",Beauty_and_Fitness


Let's examine the distribution of predictions for this file:

In [12]:
result_df["multilingual_domain_pred"].value_counts()

multilingual_domain_pred
Computers_and_Electronics    97
Books_and_Literature         85
Food_and_Drink               77
Arts_and_Entertainment       71
People_and_Society           71
Internet_and_Telecom         66
Beauty_and_Fitness           52
Online_Communities           48
Games                        44
Health                       38
Business_and_Industrial      37
Pets_and_Animals             37
Finance                      34
News                         33
Hobbies_and_Leisure          31
Jobs_and_Education           28
Science                      25
Shopping                     24
Travel_and_Transportation    24
Home_and_Garden              24
Autos_and_Vehicles           21
Law_and_Government           17
Sports                       15
Real_Estate                   1
Name: count, dtype: int64

We can see that the predictions were generated as expected, covering 24 distinct domains.
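For data blending, relative shares are often more useful than raw counts. The sketch below converts a list of predicted labels into proportions using only the standard library; `label_shares` is a hypothetical helper and the sample labels are made up. With pandas, `value_counts(normalize=True)` on the prediction column gives the equivalent result.

```python
from collections import Counter


def label_shares(labels: list) -> dict:
    """Map each label to its fraction of all labels, most frequent first."""
    counts = Counter(labels)
    total = sum(counts.values())
    return {label: count / total for label, count in counts.most_common()}


# Illustrative labels (not real classifier output)
sample = ["Health"] * 3 + ["Science"] * 1
print(label_shares(sample))  # {'Health': 0.75, 'Science': 0.25}
```

For the counts shown above, which sum to 1,000, `Computers_and_Electronics` accounts for 9.7% of the texts and `Real_Estate` for 0.1%.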