# [Hate Speech Identification Shared Task](https://multihate.github.io/): Subtask 1A

---

at [BLP Workshop](https://blp-workshop.github.io/) @IJCNLP-AACL 2025

This shared task is designed to identify the type of hate, its severity, and the targeted group from social media content. The goal is to develop robust systems that advance research in this area.

In this subtask, given a Bangla text collected from YouTube comments, classify it into one of the following categories: Abusive, Sexism, Religious Hate, Political Hate, Profane, or None.

### Downloading the dataset from GitHub

In [None]:
# !wget https://raw.githubusercontent.com/mehedihasan88/Hate-Speech-Detection-Bangla/refs/heads/main/data/subtask_1A/augment_1a_train.tsv
# !wget https://raw.githubusercontent.com/AridHasan/blp25_task1/refs/heads/main/data/subtask_1A/blp25_hatespeech_subtask_1A_dev.tsv
# !wget https://raw.githubusercontent.com/AridHasan/blp25_task1/refs/heads/main/data/subtask_1A/blp25_hatespeech_subtask_1A_dev_test.tsv

In [None]:
# from google.colab import files
# uploaded = files.upload()

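### Installing required libraries
 - transformers
 - datasets
 - evaluate
 - accelerate (optional upgrade; its install line is commented out)
 - normalizer (from csebuetnlp, used to normalize the Bangla text)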

In [None]:
# Kaggle input locations of the train, validation and test TSV splits.
train_file, validation_file, test_file = (
    '/kaggle/input/subtast1a/blp25_hatespeech_subtask_1A_train.tsv',
    '/kaggle/input/subtast1a/blp25_hatespeech_subtask_1A_dev.tsv',
    '/kaggle/input/subtast1a/blp25_hatespeech_subtask_1A_dev_test.tsv',
)

In [None]:
# Notebook shell commands: install the Hugging Face libraries used below and
# the csebuetnlp normalizer package (imported later as `normalizer`).
!pip install transformers
!pip install datasets
!pip install evaluate
!pip install git+https://github.com/csebuetnlp/normalizer

# !pip install --upgrade accelerate

#### importing required libraries and setting up logger

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

import logging
import os
import random
import sys
from dataclasses import dataclass, field
from typing import Optional
import pandas as pd
import datasets
import evaluate
import numpy as np
from datasets import load_dataset, Dataset, DatasetDict
import torch

import transformers
from transformers import (
    AutoConfig,
    AutoModelForSequenceClassification,
    AutoTokenizer,
    DataCollatorWithPadding,
    EvalPrediction,
    HfArgumentParser,
    PretrainedConfig,
    Trainer,
    TrainingArguments,
    default_data_collator,
    set_seed,
)
from transformers.trainer_utils import get_last_checkpoint
from transformers.utils import check_min_version, send_example_telemetry
from transformers.utils.versions import require_version


# Module-level logger used by every cell of this notebook.
logger = logging.getLogger(__name__)

# Root logging configuration: timestamped records written to stdout.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    datefmt="%m/%d/%Y %H:%M:%S",
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
)

### Defining the training, validation, and test data

In [None]:
# train_file = '/content/drive/MyDrive/hate-speech-detection/subtask_1a/augment_1a_train.tsv'
# validation_file = '/content/drive/MyDrive/hate-speech-detection/subtask_1a/blp25_hatespeech_subtask_1A_dev.tsv'
# test_file = '/content/drive/MyDrive/hate-speech-detection/subtask_1a/blp25_hatespeech_subtask_1A_dev_test.tsv'

### Disable wandb

In [None]:
import os

# Switch off Weights & Biases reporting through its environment variable.
os.environ.update({"WANDB_DISABLED": "true"})

### Setting up the training parameters

In [None]:
# training_args = TrainingArguments(
#     output_dir="/content/drive/MyDrive/banbert_model",  # Persistent storage
#     overwrite_output_dir=False,
#     save_strategy="epoch",
#     save_total_limit=2,
#     eval_strategy="epoch",
#     logging_dir="/content/drive/MyDrive/banbert_logs",
#     logging_strategy="steps",
#     logging_steps=50,
#     load_best_model_at_end=True,
#     metric_for_best_model="eval_loss",
#     fp16=True,
#     learning_rate=3e-5,
#     num_train_epochs=3,
#     weight_decay=0.01,
#     label_smoothing_factor=0.1,
#     gradient_accumulation_steps=2,
#     per_device_train_batch_size=16,
#     per_device_eval_batch_size=16
# )

# fp16 mixed precision is a GPU feature; enabling it unconditionally can make
# the configuration fail on a CPU-only runtime, so only turn it on when CUDA
# is actually available.
use_fp16 = torch.cuda.is_available()

training_args = TrainingArguments(
    # Checkpoints: saved every 200 steps, keeping at most two on disk.
    output_dir="/kaggle/working/banbert_model",
    overwrite_output_dir=False,
    save_strategy="steps",
    save_steps=200,
    save_total_limit=2,

    # Evaluation runs on the same 200-step cadence as checkpointing, which
    # lets `load_best_model_at_end` pick a saved checkpoint.
    eval_strategy="steps",
    eval_steps=200,
    logging_dir="/kaggle/working/banbert_logs",
    logging_strategy="steps",
    logging_steps=50,

    # Best checkpoint is chosen by the "micro_f1" value returned from
    # compute_metrics (the Trainer reports it as "eval_micro_f1").
    load_best_model_at_end=True,
    metric_for_best_model="eval_micro_f1",

    # Optimisation settings.
    fp16=use_fp16,
    learning_rate=3e-5,
    warmup_ratio=0.1,
    weight_decay=0.01,
    label_smoothing_factor=0.1,
    lr_scheduler_type="linear",

    # Effective train batch per device = 8 * 2 accumulation steps.
    gradient_accumulation_steps=2,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,

    num_train_epochs=2,
)

# Optional caps on the number of examples per split (None = use everything).
max_train_samples = None
max_eval_samples = None
max_predict_samples = None
# NOTE(review): max_seq_length is reassigned in the preprocessing cell
# (min(128, tokenizer.model_max_length)), so this value is not the one used
# for tokenization; batch_size is not referenced elsewhere in the visible
# notebook.
max_seq_length = 512
batch_size = 8

In [None]:
hf_logging = transformers.utils.logging

# Start at INFO; the per-process level from the training arguments is applied
# right after and takes precedence.
hf_logging.set_verbosity_info()

# Use one log level for this notebook's logger, `datasets` and `transformers`.
log_level = training_args.get_process_log_level()
logger.setLevel(log_level)
datasets.utils.logging.set_verbosity(log_level)
hf_logging.set_verbosity(log_level)
hf_logging.enable_default_handler()
hf_logging.enable_explicit_format()

# One-line summary of the runtime (rank, device, GPU count, precision).
runtime_summary = (
    f"Process rank: {training_args.local_rank}, device: {training_args.device}, n_gpu: {training_args.n_gpu}"
    + f" distributed training: {bool(training_args.local_rank != -1)}, 16-bits training: {training_args.fp16}"
)
logger.warning(runtime_summary)
logger.info(f"Training/evaluation parameters {training_args}")

#### Defining the Model

In [None]:
# Hub identifier of the pretrained checkpoint that is fine-tuned below.
model_name = "csebuetnlp/banglabert"

#### setting the random seed

In [None]:
# Apply the seed stored in the training arguments so runs are repeatable.
set_seed(training_args.seed)

In [None]:
# import torch
# import torch.nn as nn
# from transformers import AutoModel, AutoConfig

# class CustomBanBERTClassifier(nn.Module):
#     def __init__(self, model_name, num_labels, extra_hidden=True, hidden_size=256, dropout_prob=0.1):
#         super().__init__()
#         self.config = AutoConfig.from_pretrained(model_name, num_labels=num_labels)
#         self.encoder = AutoModel.from_pretrained(model_name, config=self.config)

#         # Optional extra layer
#         self.extra_hidden = extra_hidden
#         if extra_hidden:
#             self.fc = nn.Linear(self.config.hidden_size, hidden_size)
#             self.activation = nn.GELU()
#             self.norm = nn.LayerNorm(hidden_size)
#             self.dropout = nn.Dropout(dropout_prob)
#             classifier_input_size = hidden_size
#         else:
#             classifier_input_size = self.config.hidden_size

#         self.classifier = nn.Linear(classifier_input_size, num_labels)
#         self.dropout_final = nn.Dropout(dropout_prob)

#         # Match HF initialization
#         self.apply(self._init_weights)

#     def _init_weights(self, module):
#         if isinstance(module, nn.Linear):
#             module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
#             if module.bias is not None:
#                 module.bias.data.zero_()
#         elif isinstance(module, nn.LayerNorm):
#             module.bias.data.zero_()
#             module.weight.data.fill_(1.0)

#     def forward(self, input_ids=None, attention_mask=None, token_type_ids=None, labels=None):
#         outputs = self.encoder(
#             input_ids=input_ids,
#             attention_mask=attention_mask,
#             token_type_ids=token_type_ids
#         )

#         # Always use pooler_output if available
#         pooled_output = outputs.pooler_output
#         if pooled_output is None:
#             pooled_output = outputs.last_hidden_state[:, 0]  # CLS fallback

#         if self.extra_hidden:
#             pooled_output = self.fc(pooled_output)
#             pooled_output = self.activation(pooled_output)
#             pooled_output = self.norm(pooled_output)
#             pooled_output = self.dropout(pooled_output)

#         pooled_output = self.dropout_final(pooled_output)
#         logits = self.classifier(pooled_output)

#         loss = None
#         if labels is not None:
#             labels = labels.long()
#             loss_fct = nn.CrossEntropyLoss()
#             loss = loss_fct(logits.view(-1, self.config.num_labels), labels.view(-1))

#         return (loss, logits) if loss is not None else logits


#### Loading data files

In [None]:
# Class names as written in the TSV files, mapped to integer class ids.
l2id = {'None': 0, 'Religious Hate': 1, 'Sexism': 2, 'Political Hate': 3, 'Profane': 4, 'Abusive': 5}


def _load_split(path, has_labels=True):
    """Read one tab-separated split and return it as a `datasets.Dataset`.

    Args:
        path: Location of the TSV file.
        has_labels: When True, the `label` column is converted from class
            names to the integer ids in `l2id`.

    Returns:
        The split as a `Dataset` built from the pandas frame.
    """
    frame = pd.read_csv(path, sep='\t')
    if has_labels:
        raw_labels = frame['label']
        mapped = raw_labels.map(l2id)
        # Missing raw values fall back to class 0 ("None"), as before.
        # NOTE(review): pandas' default NA parsing appears to read the literal
        # label "None" as a missing value, which is presumably why the
        # fallback exists -- confirm against the data files.
        # A value that is present but not in l2id is a different situation:
        # it is still mapped to 0 for backward compatibility, but no longer
        # silently.
        unexpected = raw_labels[mapped.isna() & raw_labels.notna()].unique()
        if len(unexpected) > 0:
            logger.warning(
                "Unrecognised labels in %s mapped to class 0: %s", path, list(unexpected)
            )
        frame['label'] = mapped.fillna(0).astype(int)
    return Dataset.from_pandas(frame)


# The *_df names are kept for the cells below; each one holds a Dataset.
train_df = _load_split(train_file)
validation_df = _load_split(validation_file)
# The test split is loaded without touching any label column.
test_df = _load_split(test_file, has_labels=False)

data_files = {"train": train_df, "validation": validation_df, "test": test_df}
for key in data_files:
    logger.info(f"loading a local file for {key}")
raw_datasets = DatasetDict(data_files)

In [None]:
# Number of rows in the test split (shown as the notebook cell output).
len(test_df['id'])

##### Extracting number of unique labels

In [None]:
# Labels: collect the distinct class ids present in the training split.
observed_labels = raw_datasets["train"].unique("label")
print(observed_labels)
# Sorted so the label order is deterministic.
label_list = sorted(observed_labels)
num_labels = len(label_list)

### Loading Pretrained Configuration, Tokenizer and Model

In [None]:
# Model configuration, sized for the number of classes found in the data.
# The deprecated `use_auth_token=None` argument was dropped from all three
# calls: None was a no-op and newer transformers releases replace it with
# `token`.
config = AutoConfig.from_pretrained(
    model_name,
    num_labels=num_labels,
    finetuning_task=None,
    cache_dir=None,
    revision="main",
)

# Fast tokenizer matching the pretrained checkpoint.
tokenizer = AutoTokenizer.from_pretrained(
    model_name,
    cache_dir=None,
    use_fast=True,
    revision="main",
)

# Sequence-classification model built from the checkpoint and the config above.
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    # Treated as a TensorFlow checkpoint only when the name contains ".ckpt".
    from_tf=".ckpt" in model_name,
    config=config,
    cache_dir=None,
    revision="main",
    ignore_mismatched_sizes=False,
)

#### Preprocessing the raw_datasets

In [None]:
from normalizer import normalize

# Every column except "label"; the text to classify is read from the second
# one (presumably `id` comes first and the text second -- verify against the
# TSV header).
non_label_column_names = [name for name in raw_datasets["train"].column_names if name != "label"]
sentence1_key = non_label_column_names[1]

# Padding strategy: every example is padded to the same fixed length.
padding = "max_length"

# Some models have set the order of the labels to use, so let's make sure we do use it.
label_to_id = None
if model.config.label2id != PretrainedConfig(num_labels=num_labels).label2id:
    # Some have all caps in their config, some don't.
    label_name_to_id = {k.lower(): v for k, v in model.config.label2id.items()}
    if sorted(label_name_to_id.keys()) == sorted(label_list):
        label_to_id = {i: int(label_name_to_id[label_list[i]]) for i in range(num_labels)}
    else:
        # The pieces are concatenated into ONE message. Passing them as
        # separate positional arguments makes logging treat the second string
        # as a %-format argument for a message without placeholders, which
        # triggers a logging error instead of emitting the warning.
        logger.warning(
            "Your model seems to have been trained with labels, but they don't match the dataset: "
            f"model labels: {sorted(label_name_to_id.keys())}, dataset labels: {sorted(label_list)}."
            "\nIgnoring the model labels as a result."
        )

if label_to_id is not None:
    model.config.label2id = label_to_id
    model.config.id2label = {idx: label for label, idx in config.label2id.items()}

# Sequence length requested for tokenization, capped by what the tokenizer
# supports.
# NOTE(review): this replaces any max_seq_length assigned earlier in the
# notebook; confirm that 128 is the intended value.
requested_max_seq_length = 128
if requested_max_seq_length > tokenizer.model_max_length:
    logger.warning(
        f"The max_seq_length passed ({requested_max_seq_length}) is larger than the maximum length for the "
        f"model ({tokenizer.model_max_length}). Using max_seq_length={tokenizer.model_max_length}.")
max_seq_length = min(requested_max_seq_length, tokenizer.model_max_length)

def preprocess_function(examples):
    """Normalize and tokenize one batch of examples.

    The text column (`sentence1_key`) is normalized in place with `normalize`
    and then tokenized with the module-level `padding` and `max_seq_length`
    settings, truncating longer inputs.

    Args:
        examples: Batch mapping column names to lists of values.

    Returns:
        The tokenizer output. When `label_to_id` is set and the batch has a
        "label" column, a remapped "label" list is added; -1 is kept as -1.
    """
    # Normalization has to happen before tokenization.
    examples[sentence1_key] = list(map(normalize, examples[sentence1_key]))

    result = tokenizer(
        examples[sentence1_key],
        padding=padding,
        max_length=max_seq_length,
        truncation=True,
    )

    # Nothing to remap unless a label mapping exists and labels are present.
    if label_to_id is None or "label" not in examples:
        return result

    result["label"] = [-1 if l == -1 else label_to_id[l] for l in examples["label"]]
    return result
# Run preprocess_function over every split in batched mode and keep the
# processed splits under the same `raw_datasets` name.
raw_datasets = raw_datasets.map(
    preprocess_function,
    batched=True,
    load_from_cache_file=True,
    desc="Running tokenizer on dataset",
)

#### Finalize the training data for training the model

In [None]:
# The training split is mandatory.
if "train" not in raw_datasets:
    raise ValueError("requires a train dataset")
train_dataset = raw_datasets["train"]

# Optionally keep only the first `max_train_samples` rows.
if max_train_samples is not None:
    max_train_samples_n = min(max_train_samples, len(train_dataset))
    train_dataset = train_dataset.select(range(max_train_samples_n))

In [None]:
# Display a summary of the training split (notebook cell output).
train_dataset

#### Finalize the development/evaluation data for evaluating the model

In [None]:
# The validation split is mandatory.
if "validation" not in raw_datasets:
    raise ValueError("requires a validation dataset")
eval_dataset = raw_datasets["validation"]

# Optionally keep only the first `max_eval_samples` rows.
if max_eval_samples is not None:
    max_eval_samples_n = min(max_eval_samples, len(eval_dataset))
    eval_dataset = eval_dataset.select(range(max_eval_samples_n))

#### Finalize the test data for predicting the unseen test data using the model

In [None]:
# Only the "test" split is ever read below, so that is the only key checked.
# The former extra "test_matched" alternative let a dataset without "test"
# pass the guard and then fail with a KeyError on the next line.
if "test" not in raw_datasets:
    raise ValueError("requires a test dataset")
predict_dataset = raw_datasets["test"]

# Optionally keep only the first `max_predict_samples` rows.
if max_predict_samples is not None:
    max_predict_samples_n = min(len(predict_dataset), max_predict_samples)
    predict_dataset = predict_dataset.select(range(max_predict_samples_n))

#### Log a few random samples from the training set

In [None]:
# Log up to three randomly chosen training examples. The count is capped by
# the dataset size because random.sample raises when asked for more items
# than the population holds.
sample_count = min(3, len(train_dataset))
for index in random.sample(range(len(train_dataset)), sample_count):
    logger.info(f"Sample {index} of the training set: {train_dataset[index]}.")

#### Get the metric function `accuracy`

In [None]:
# Accuracy metric loaded through `evaluate`.
# NOTE(review): `metric` is not referenced again in the visible notebook;
# compute_metrics loads and uses its own `acc_metric` -- confirm whether this
# cell is still needed.
metric = evaluate.load("accuracy")

#### Define `compute_metrics`: it receives an `EvalPrediction` (with `predictions` and `label_ids` fields) and has to return a dictionary mapping metric names (strings) to floats.

In [None]:
# def compute_metrics(p: EvalPrediction):
#     preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
#     preds = np.argmax(preds, axis=1)
#     return {"accuracy": (preds == p.label_ids).astype(np.float32).mean().item()}


# from sklearn.metrics import accuracy_score, f1_score
# def compute_metrics(p: EvalPrediction):
#     preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
#     preds = np.argmax(preds, axis=1)
#     acc = accuracy_score(p.label_ids, preds)
#     f1 = f1_score(p.label_ids, preds, average="weighted")  # or "macro"
#     return {"accuracy": acc, "f1": f1}


# from sklearn.metrics import f1_score

# def compute_metrics(p):
#     preds = p.predictions.argmax(axis=1)
#     f1_micro = f1_score(p.label_ids, preds, average='micro')
#     return {"f1_micro": f1_micro}


# from sklearn.metrics import f1_score, classification_report

# def compute_metrics(p: EvalPrediction):
#     # Extract logits
#     preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
#     # Turn logits into predicted class indices
#     preds = np.argmax(preds, axis=1)
#     labels = p.label_ids

#     # 1. Overall accuracy
#     acc = (preds == labels).astype(np.float32).mean().item()

#     # 2. Macro F1
#     macro_f1 = f1_score(labels, preds, average="macro")

#     3. (Optional) Full classification report
#     report = classification_report(labels, preds, digits=4)

#     return {
#         "accuracy": acc,
#         "macro_f1": macro_f1,
#         # "report": report  # you can log this if you want detailed class-wise stats
#     }



import evaluate

# Load the metric objects used by compute_metrics: accuracy, F1 (averaged
# both "micro" and "macro" there) and the confusion matrix.
acc_metric   = evaluate.load("accuracy")
f1_metric    = evaluate.load("f1")
cm_metric    = evaluate.load("confusion_matrix")

def compute_metrics(p: EvalPrediction):
    """Score one evaluation pass.

    Args:
        p: Evaluation output; `p.predictions` holds the logits (or a tuple
            whose first element does) and `p.label_ids` the reference labels.

    Returns:
        Dict with "accuracy", "micro_f1" and "macro_f1". The raw-count
        confusion matrix is printed as a side effect and not returned.
    """
    raw_output = p.predictions
    if isinstance(raw_output, tuple):
        raw_output = raw_output[0]
    predicted = np.argmax(raw_output, axis=1)
    gold = p.label_ids

    def _f1(average):
        # F1 for the requested averaging mode.
        return f1_metric.compute(predictions=predicted, references=gold, average=average)["f1"]

    scores = {
        "accuracy": acc_metric.compute(predictions=predicted, references=gold)["accuracy"],
        "micro_f1": _f1("micro"),
        "macro_f1": _f1("macro"),
    }

    # Confusion matrix as raw counts (list of lists), shown for inspection.
    confusion = cm_metric.compute(
        predictions=predicted.tolist(),
        references=gold.tolist(),
        normalize=None,
    )["confusion_matrix"]
    print("\nConfusion Matrix:\n", np.array(confusion))

    return scores


#### Data Collator

In [None]:
# Use the default collator; the preprocessing cell sets padding = "max_length",
# so tokenized examples already share one length.
data_collator = default_data_collator

#### Initialize our Trainer

In [None]:
# from collections import Counter

# counts = Counter(train_df["label"])
# total = sum(counts.values())
# num_classes = len(l2id)
# class_weights = torch.tensor([total / (num_classes * counts[i]) for i in range(num_classes)], dtype=torch.float)


# class WeightedTrainer(Trainer):
#     def __init__(self, *args, class_weights=None, **kwargs):
#         super().__init__(*args, **kwargs)
#         self.class_weights = class_weights.to(self.args.device).to(torch.float32)

#     def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
#         labels = inputs["labels"]
#         outputs = model(**inputs)
#         logits = outputs.logits
#         loss_fct = nn.CrossEntropyLoss(weight=self.class_weights)
#         loss = loss_fct(logits.view(-1, self.model.config.num_labels),
#                         labels.view(-1))
#         return (loss, outputs) if return_outputs else loss



# model.class_weights = class_weights

# trainer = WeightedTrainer(
#     model=model,
#     args=training_args,
#     train_dataset=train_dataset,
#     eval_dataset=eval_dataset,
#     tokenizer=tokenizer,
#     data_collator=default_data_collator,
#     compute_metrics=compute_metrics,
#     class_weights=class_weights,
# )

In [None]:
# Drop the `id` column from the two splits that are handed to the Trainer.
train_dataset = train_dataset.remove_columns(["id"])
eval_dataset = eval_dataset.remove_columns(["id"])

In [None]:
#custom_model = CustomBanBERTClassifier(model_name=model_name, num_labels=len(l2id))
# Build the Trainer from the pretrained model, the training arguments, the
# prepared train/validation splits and the metric function defined above.
# NOTE(review): recent transformers releases deprecate `tokenizer=` in favour
# of `processing_class=` -- confirm against the installed version.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    compute_metrics=compute_metrics,
    tokenizer=tokenizer,
    data_collator=data_collator,
)

#### Training our model

In [None]:
# import torch
# import torch.nn as nn
# import torch.nn.functional as F

# from transformers import Trainer

# class FocalLossTrainer(Trainer):
#     def __init__(self, *args, focal_loss=None, **kwargs):
#         super().__init__(*args, **kwargs)
#         self.focal_loss = focal_loss

#     # Add **kwargs so we swallow any extra Trainer args (like num_items_in_batch)
#     def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
#         # 1. Pop labels out of inputs so we can pass everything else to the model
#         labels = inputs.pop("labels")

#         # 2. Forward pass through model
#         outputs = model(**inputs, labels=labels)
#         logits = outputs.logits

#         # 3. Compute focal loss with your pre‐computed class weights
#         loss = self.focal_loss(
#             logits.view(-1, model.config.num_labels),
#             labels.view(-1)
#         )

#         # 4. Return (loss, outputs) if Trainer wants the raw outputs too
#         return (loss, outputs) if return_outputs else loss



# import numpy as np
# from sklearn.utils.class_weight import compute_class_weight

# train_labels = np.array(train_dataset["label"])

# class_weights = compute_class_weight(
#     class_weight='balanced',
#     classes=np.unique(train_labels),
#     y=train_labels
# )

# alpha_tensor = torch.tensor(class_weights, dtype=torch.float).to(training_args.device)


# focal_loss_fn = FocalLoss(gamma=2.0, alpha=alpha_tensor)

# # Initialize our custom Trainer
# trainer = FocalLossTrainer(
#     model=model,
#     args=training_args,
#     train_dataset=train_dataset,
#     eval_dataset=eval_dataset,
#     tokenizer=tokenizer,
#     compute_metrics=compute_metrics,  # your existing F1-micro function
#     focal_loss=focal_loss_fn
# )

# # Kick off training!
# trainer.train()




In [None]:
from transformers.trainer_utils import get_last_checkpoint

# Look for an earlier checkpoint only when the output directory exists, so a
# fresh run does not depend on the directory already being there.
last_checkpoint = None
if os.path.isdir(training_args.output_dir):
    last_checkpoint = get_last_checkpoint(training_args.output_dir)
if last_checkpoint:
    print(f"Resuming from {last_checkpoint}")

# Train exactly once: resume from the checkpoint found above, or start from
# scratch when there is none. A second `trainer.train(resume_from_checkpoint=True)`
# call used to follow here; it re-entered training after it had finished and
# replaced `train_result` (and therefore the saved train metrics).
train_result = trainer.train(resume_from_checkpoint=last_checkpoint)

# Record how many training examples the run was based on.
metrics = train_result.metrics
max_train_samples = (
    max_train_samples if max_train_samples is not None else len(train_dataset)
)
metrics["train_samples"] = min(max_train_samples, len(train_dataset))

#### Saving the tokenizer too for easy upload

In [None]:
# Persist the trained model, then log and save the training metrics and the
# Trainer state.
trainer.save_model()
trainer.log_metrics("train", metrics)
trainer.save_metrics("train", metrics)
trainer.save_state()

#### Evaluating our model on validation/development data

In [None]:
logger.info("*** Evaluate ***")

# Score the model on the validation split.
metrics = trainer.evaluate(eval_dataset=eval_dataset)

# Record how many validation examples the scores are based on.
if max_eval_samples is None:
    max_eval_samples = len(eval_dataset)
metrics["eval_samples"] = min(max_eval_samples, len(eval_dataset))

trainer.log_metrics("eval", metrics)
trainer.save_metrics("eval", metrics)

### Predicting on the test data

In [None]:
# Reverse lookup: integer class id -> class name.
id2l = {class_id: name for name, class_id in l2id.items()}
logger.info("*** Predict ***")
#predict_dataset = predict_dataset.remove_columns("label")

# Keep the ids for the output file, then drop the column before predicting.
ids = predict_dataset['id']
predict_dataset = predict_dataset.remove_columns("id")

# Highest-scoring class index per test example.
prediction_output = trainer.predict(predict_dataset, metric_key_prefix="predict")
predictions = np.argmax(prediction_output.predictions, axis=1)

output_predict_file = os.path.join(training_args.output_dir, "subtask_1A.tsv")
if trainer.is_world_process_zero():
    with open(output_predict_file, "w") as writer:
        logger.info("***** Predict results *****")
        writer.write("id\tlabel\tmodel\n")
        for index, class_index in enumerate(predictions):
            # class index -> label id seen in training -> class name
            label_name = id2l[label_list[class_index]]
            writer.write(f"{ids[index]}\t{label_name}\t{model_name}\n")

In [None]:
# Peek at the first test id (notebook cell output).
ids[0]

#### Creating the model card

In [None]:
# Model card metadata: the base checkpoint and the task type.
kwargs = dict(finetuned_from=model_name, tasks="text-classification")
trainer.create_model_card(**kwargs)