# Distributed Data Classification with NeMo Curator's `ContentTypeClassifier`

This notebook demonstrates how to use NeMo Curator's `ContentTypeClassifier`. The [content type classifier](https://huggingface.co/nvidia/content-type-classifier-deberta) assigns each document to one of 11 distinct speech types based on its content. These annotations are useful for data blending when training foundation models. For more information about the classifier, including the full list of output labels, see its [Hugging Face page](https://huggingface.co/nvidia/content-type-classifier-deberta).

This tutorial requires at least 1 NVIDIA GPU with:
  - Volta™ or higher (compute capability 7.0+)
  - CUDA 12.x

Before running this notebook, please see this [Installation Guide](https://docs.nvidia.com/nemo/curator/latest/admin/installation.html#admin-installation) page for instructions on how to install NeMo Curator. Be sure to use an installation method which includes GPU dependencies.

In [1]:
# Silence Curator logs via Loguru
import os

os.environ["LOGURU_LEVEL"] = "ERROR"

The following imports are required for this tutorial:

In [None]:
import pandas as pd

from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.classifiers import ContentTypeClassifier
from nemo_curator.stages.text.io.reader.jsonl import JsonlReader
from nemo_curator.stages.text.io.writer.jsonl import JsonlWriter

To run a pipeline in NeMo Curator, we must start a Ray cluster. This can be done manually (see the [Ray documentation](https://docs.ray.io/en/latest/ray-core/starting-ray.html)) or with Curator's `RayClient`:

In [3]:
try:
    ray_client = RayClient()
    ray_client.start()
except Exception as e:
    msg = f"Error initializing Ray client: {e}"
    raise RuntimeError(msg) from e

# Initialize Read, Classification, and Write Stages

In NeMo Curator, each processing step is called a stage. For this tutorial, we will initialize 3 stages: a JSONL file reader, a content type classification stage, and a JSONL file writer.

First, we create a small sample JSONL file to classify:

In [4]:
news_example = """
Brent awarded for leading collaborative efforts and leading SIA International Relations Committee.

Mar 20, 2018

The Security Industry Association (SIA) will recognize Richard Brent, CEO, Louroe Electronics with the prestigious 2017 SIA Chairman's Award for his work to support leading the SIA International Relations Committee and supporting key government relations initiatives.

With his service on the SIA Board of Directors and as Chair of the SIA International Relations Committee, Brent has forged relationships between SIA and agencies like the U.S. Commercial Service. A longtime advocate for government engagement generally and exports specifically, Brent's efforts resulted in the publication of the SIA Export Assistance Guide last year as a tool to assist SIA member companies exploring export opportunities or expanding their participation in trade.

SIA Chairman Denis Hébert will present the SIA Chairman's Award to Brent at The Advance, SIA's annual membership meeting, scheduled to occur on Tuesday, April 10, 2018, at ISC West.

"As the leader of an American manufacturing company, I have seen great business opportunities in foreign sales," said Brent. "Through SIA, I have been pleased to extend my knowledge and experience to other companies that can benefit from exporting. And that is the power of SIA: To bring together distinct companies to share expertise across vertical markets in a collaborative fashion. I'm pleased to contribute, and I thank the Chairman for his recognition."

"As a member of the SIA Board of Directors, Richard Brent is consistently engaged on a variety of issues of importance to the security industry, particularly related to export assistance programs that will help SIA members to grow their businesses," said Hébert. "His contributions in all areas of SIA programming have been formidable, but we owe him a particular debt in sharing his experiences in exporting. Thank you for your leadership, Richard."

Hébert will present SIA award recipients, including the SIA Chairman's Award, SIA Committee Chair of the Year Award and Sandy Jones Volunteer of the Year Award, at The Advance, held during ISC West in Rooms 505/506 of the Sands Expo in Las Vegas, Nevada, on Tuesday, April 10, 10:30-11:30 a.m. Find more info and register at https://www.securityindustry.org/advance.

The Advance is co-located with ISC West, produced by ISC Security Events. Security professionals can register to attend the ISC West trade show and conference, which runs April 10-13, at http://www.iscwest.com.
"""

In [5]:
online_comments_example = "Hi, great video! I am now a subscriber."
explanatory_articles_example = "Photosynthesis is the process by which plants convert sunlight into chemical energy. During this process, plants absorb carbon dioxide from the air and water from the soil, producing glucose for energy and releasing oxygen as a byproduct."
blogs_example = "This week I finally tried making sourdough bread from scratch, and let me tell you—it was both harder and more rewarding than I expected. Here's what I learned (and what I'll do differently next time)."
boilerplate_content_example = "All rights reserved. This website and its contents may not be reproduced, distributed, or transmitted in any form without prior written permission from the publisher."

In [6]:
input_file_path = "./input_data_dir"

# Create sample dataset for the tutorial
text = [news_example, online_comments_example, explanatory_articles_example, blogs_example, boilerplate_content_example]
df = pd.DataFrame({"text": text})

try:
    os.makedirs(input_file_path, exist_ok=True)
    df.to_json(input_file_path + "/data.jsonl", orient="records", lines=True)
except Exception as e:
    msg = f"Error creating input file: {e}"
    raise RuntimeError(msg) from e

We can define the reader stage with:

In [7]:
# Read existing directory of JSONL files
read_stage = JsonlReader(input_file_path, files_per_partition=1)
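With `files_per_partition=1`, each input file presumably becomes its own partition, i.e. its own unit of work for the downstream stages. The hypothetical helper below sketches that grouping in plain Python to make the parameter concrete; it illustrates what the parameter name implies and is not Curator's reader implementation:

```python
def partition_files(paths: list[str], files_per_partition: int) -> list[list[str]]:
    """Split file paths (sorted for determinism) into groups of at most `files_per_partition`."""
    if files_per_partition < 1:
        raise ValueError("files_per_partition must be >= 1")
    ordered = sorted(paths)
    return [
        ordered[i : i + files_per_partition]
        for i in range(0, len(ordered), files_per_partition)
    ]


# Three hypothetical files, two per partition -> two units of work
print(partition_files(["c.jsonl", "a.jsonl", "b.jsonl"], 2))
# [['a.jsonl', 'b.jsonl'], ['c.jsonl']]
```

Larger values give fewer, bigger units of work; with the single sample file used here, the setting makes no practical difference.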

Under the hood, the `ContentTypeClassifier` stage is composed of two stages: a tokenizer stage that runs on the CPU and a model inference stage that runs on the GPU. Simplified, they look like this (some parameters and details are omitted; please refer to the documentation for the full definitions):

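```python
# Simplified view of the two internal stages (not the actual source).
# `Resources` declares the CPUs/GPUs each stage requests; its import is omitted.
class TokenizerStage:
    def __init__(self) -> None:
        self._resources = Resources(cpus=1)  # tokenization runs on the CPU
        self.model_identifier = "nvidia/content-type-classifier-deberta"
        self.text_field = "text"  # input column to classify
        self.padding_side = "right"
        ...


class ModelStage:
    def __init__(self) -> None:
        self._resources = Resources(cpus=1, gpus=1)  # inference runs on the GPU
        self.model_identifier = "nvidia/content-type-classifier-deberta"
        self.model_inference_batch_size = 256
        ...
```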
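The `model_inference_batch_size = 256` setting controls how many tokenized documents are grouped into each batch sent to the model. A plain-Python sketch of that batching arithmetic (illustrative only, not Curator code): with a hypothetical 1,000 documents, the GPU would see three full batches and one partial batch.

```python
from collections.abc import Iterator, Sequence
from typing import TypeVar

T = TypeVar("T")


def batches(items: Sequence[T], batch_size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `batch_size` items."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(items), batch_size):
        yield items[start : start + batch_size]


# Hypothetical workload: 1,000 tokenized documents, batch size 256
sizes = [len(batch) for batch in batches(range(1000), 256)]
print(sizes)  # [256, 256, 256, 232]
```

Larger batches generally use the GPU more efficiently but need more GPU memory, so a smaller batch size is the usual first adjustment if inference runs out of memory.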

Optionally, the predictions can be used to filter the dataset so that only documents whose predicted label appears in `filter_by` are kept. If the `filter_by` parameter is set, a third stage is added:

```python
def filter_by_category(self, value: str) -> bool:
    return value in self.filter_by

...

if self.filter_by is not None and len(self.filter_by) > 0:
    self.stages.append(Filter(filter_fn=self.filter_by_category, filter_field=...))
```

In [8]:
# Initialize the content type classifier
classifier_stage = ContentTypeClassifier()

# If desired, you may filter your dataset with:
# classifier_stage = ContentTypeClassifier(filter_by=["News"])  # noqa: ERA001
# See full list of labels here: https://huggingface.co/nvidia/content-type-classifier-deberta

Finally, we can define a stage for writing the results:

In [None]:
# Write results to a directory
output_file_path = "./content_type_classifier_results"

# Use mode="overwrite" to replace the output directory if it already exists,
# so files left over from a previous run are not mixed with the new results
write_stage = JsonlWriter(output_file_path, mode="overwrite")
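To make the effect of `mode="overwrite"` concrete, here is a hypothetical stdlib helper that gives an output directory the same clean-slate behavior: anything already in it is deleted before new results are written. This is an assumption about the intended semantics, not `JsonlWriter`'s implementation.

```python
import shutil
from pathlib import Path


def prepare_output_dir(path: str, overwrite: bool = True) -> Path:
    """Ensure `path` exists; if `overwrite`, delete any existing contents first."""
    out = Path(path)
    if overwrite and out.exists():
        shutil.rmtree(out)  # remove stale results from an earlier run
    out.mkdir(parents=True, exist_ok=True)
    return out
```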

# Initialize Pipeline

In NeMo Curator, pipelines run distributed data workflows on Ray. Pipelines handle resource allocation and autoscaling to improve throughput and minimize GPU idle time.

For the distributed data classifiers, we achieve speedups by running model inference in parallel across all available GPUs, while the other stages, such as I/O, tokenization, and filtering, run across all available CPUs. This is possible because Curator pipelines are composable: each stage in a pipeline runs independently with its own specified hardware resources.
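A back-of-the-envelope view of why per-stage resource requests matter: the number of copies of a stage that can run at once is capped by whichever resource runs out first. The function below is a simplified sketch that assumes one stage has the whole cluster to itself; in practice stages share the cluster and autoscaling divides resources among them. The per-worker requests mirror the `Resources(cpus=1)` and `Resources(cpus=1, gpus=1)` values in the simplified stage definitions earlier, and the 32-CPU, 4-GPU node is hypothetical.

```python
import math


def max_concurrent_workers(
    total_cpus: int, total_gpus: int, cpus_per_worker: float, gpus_per_worker: float = 0.0
) -> int:
    """Upper bound on simultaneous workers for one stage if it had the whole cluster."""
    if cpus_per_worker <= 0:
        raise ValueError("cpus_per_worker must be > 0")
    limit = math.floor(total_cpus / cpus_per_worker)
    if gpus_per_worker > 0:
        limit = min(limit, math.floor(total_gpus / gpus_per_worker))
    return limit


# Hypothetical node with 32 CPUs and 4 GPUs
print(max_concurrent_workers(32, 4, cpus_per_worker=1))                     # tokenizer: 32
print(max_concurrent_workers(32, 4, cpus_per_worker=1, gpus_per_worker=1))  # model: 4
```

Under these assumptions, tokenization can fan out across many CPU workers while a handful of GPU workers consume the batches.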

In [10]:
classifier_pipeline = Pipeline(name="classifier_pipeline", description="Run a classifier pipeline")

# Add stages to the pipeline
classifier_pipeline.add_stage(read_stage)
classifier_pipeline.add_stage(classifier_stage)
classifier_pipeline.add_stage(write_stage)

Pipeline(name='classifier_pipeline', stages=[jsonl_reader(JsonlReader), content_type_classifier_deberta_classifier(ContentTypeClassifier), jsonl_writer(JsonlWriter)])

Composability is also what allows a classifier to sit between pre-processing and post-processing stages. Typical text pre-processing add-ons include text normalization (lowercasing, URL/email removal, Unicode cleanup) and language identification and filtering (to keep only target languages). A full pipeline may look something like:

```python
pipeline = Pipeline(name="full_pipeline")
pipeline.add_stage(read_stage)                # reader (JSONL/S3/etc.)
pipeline.add_stage(lang_id_stage)             # optional: language filter
pipeline.add_stage(classifier_stage)          # classifier
pipeline.add_stage(write_stage)               # writer (JSONL/Parquet)
```
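The normalization steps listed above (Unicode cleanup, URL/email removal, lowercasing) can be sketched as a plain Python function. `normalize_text` is a hypothetical helper, not a NeMo Curator stage; using it in a pipeline would require wrapping it in a stage, which is not shown here. Because the classifier sees whatever text reaches it, aggressive normalization may shift predictions, so it is worth checking on a sample first.

```python
import re
import unicodedata

URL_RE = re.compile(r"https?://\S+|www\.\S+")
EMAIL_RE = re.compile(r"\b[\w.+-]+@[\w-]+(?:\.[\w-]+)+\b")
WS_RE = re.compile(r"\s+")


def normalize_text(text: str) -> str:
    """Unicode-normalize, drop URLs/emails, collapse whitespace, and lowercase."""
    text = unicodedata.normalize("NFKC", text)  # e.g. fullwidth letters -> ASCII
    text = text.replace("\u200b", "")  # zero-width spaces survive NFKC, so strip them
    text = URL_RE.sub(" ", text)
    text = EMAIL_RE.sub(" ", text)
    return WS_RE.sub(" ", text).strip().lower()


print(normalize_text("Visit https://example.com or mail me@example.com  NOW"))
# visit or mail now
```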

# Run the Classifier

Let's run the full classifier pipeline:

In [None]:
# Run the pipeline
result = classifier_pipeline.run()

Since the pipeline ran to completion and the result was written to a JSONL file, we can shut down the Ray cluster with:

In [12]:
try:
    ray_client.stop()
except Exception as e:  # noqa: BLE001
    print(f"Error stopping Ray client: {e}")

# Inspect the Output

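`classifier_pipeline.run()` returns the output tasks of the final stage, which here is the writer; each task's `data` holds the paths of the files it wrote. We can read an output file into a pandas DataFrame for inspection.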

In [None]:
# For simplicity, we take the first written file from the writer stage
# In real pipelines, the writer may return multiple files (shards) or objects
result_file = result[0].data[0]

result_df = pd.read_json(result_file, lines=True)
result_df.head()

|   | text | content_pred |
|---|------|--------------|
| 0 | \nBrent awarded for leading collaborative effo... | News |
| 1 | Hi, great video! I am now a subscriber. | Online Comments |
| 2 | Photosynthesis is the process by which plants ... | Explanatory Articles |
| 3 | This week I finally tried making sourdough bre... | Blogs |
| 4 | All rights reserved. This website and its cont... | Boilerplate Content |


We can see that each sample text was assigned the content type it was written to represent.
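Reading `result[0].data[0]` only inspects the first file. When the writer produces several shards, a label summary across all of them is more useful. A stdlib sketch; `count_labels` is a hypothetical helper, the `content_pred` field name is taken from the output above, and the usage comment assumes each returned task's `data` is a list of written file paths, as the `result[0].data[0]` indexing suggests.

```python
import json
from collections import Counter
from collections.abc import Iterable


def count_labels(paths: Iterable[str], label_field: str = "content_pred") -> Counter:
    """Tally predicted labels across one or more JSONL output files."""
    counts: Counter = Counter()
    for path in paths:
        with open(path, encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    counts[json.loads(line)[label_field]] += 1
    return counts


# Usage (not executed here):
# all_files = [path for task in result for path in task.data]
# print(count_labels(all_files).most_common())
```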