In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the input directory tree and print the full path of every file found.
for root, _subdirs, files in os.walk('/kaggle/input'):
    for name in files:
        print(os.path.join(root, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/Val_Task_A.csv
/kaggle/input/Train_Task_B.csv
/kaggle/input/Train_Task_A.csv
/kaggle/input/Test_Task_A.csv
/kaggle/input/Test_Task_B.csv
/kaggle/input/Val_Task_B.csv


In [2]:
# !pip install datasets transformers[sentencepiece]
# !pip install evaluate
# !pip install accelerate==0.26.0
# !pip install peft

In [3]:
import pandas as pd
import json
import math
import re

def clean(text):
    """Normalise raw tweet text before it is written to the training JSON.

    The text is lowercased and stripped of leading/trailing whitespace first.
    Afterwards every character that is not an ASCII letter, an ASCII digit or
    whitespace is deleted outright -- it is NOT replaced by a space, so
    "don't" becomes "dont". Because stripping happens before the deletion,
    a result may still begin or end with whitespace.
    """
    disallowed = r"[^a-zA-Z0-9\s]"
    lowered = text.lower().strip()
    return re.sub(disallowed, '', lowered)

def convert(file_path, output_file_path):
    """Convert a Task-B CSV file into a JSON list of training records.

    Parameters
    ----------
    file_path : str
        Path of a CSV file containing the columns ``Tweet``, ``Target`` and
        ``Severity``.
    output_file_path : str
        Path of the JSON file to write.

    Each CSV row becomes one ``{"text", "label1", "label2"}`` object:

    * ``text``   -- ``clean(str(Tweet))``
    * ``label1`` -- Target encoded as I/O/R -> 1/2/3
    * ``label2`` -- Severity encoded as L/M/H -> 1/2/3

    No row is dropped: a missing or unrecognised Target/Severity value is
    encoded as class 0.
    """
    df = pd.read_csv(file_path)

    # Class 0 is reserved for "missing / unknown", hence the codes start at 1.
    target_mapping = {'I': 1, 'O': 2, 'R': 3}
    severity_mapping = {'L': 1, 'M': 2, 'H': 3}

    # Iterate the three columns directly instead of DataFrame.iterrows():
    # this avoids building a Series per row and the dtype coercion that
    # iterrows applies across a row's values.
    data = [
        {
            "text": clean(str(tweet)),
            "label1": target_mapping.get(target, 0),
            "label2": severity_mapping.get(severity, 0),
        }
        for tweet, target, severity in zip(df["Tweet"], df["Target"], df["Severity"])
    ]

    # Save the list of dictionaries as a JSON file
    with open(output_file_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4)

    print(f"Data has been saved to {output_file_path}")

# Build the JSON files for the training and validation splits of Task B.
for split_csv, split_json in (
    ('/kaggle/input/Train_Task_B.csv', '/kaggle/working/train.json'),
    ('/kaggle/input/Val_Task_B.csv', '/kaggle/working/val.json'),
):
    convert(split_csv, split_json)

Data has been saved to /kaggle/working/train.json
Data has been saved to /kaggle/working/val.json


In [4]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
import torch
import torch.nn as nn
from datasets import load_dataset
from transformers import DataCollatorWithPadding
from sklearn.metrics import accuracy_score
import numpy as np
from transformers.modeling_outputs import SequenceClassifierOutput

class RobertaWithTwoHeads(nn.Module):
    """RoBERTa encoder shared by two independent classification heads.

    The representation of the first token is passed through two MLP heads
    (plus an optional linear shortcut from the pooled vector straight to the
    logits). In this notebook head 1 is trained on ``label1`` (Target) and
    head 2 on ``label2`` (Severity); class 0 of each head is the
    "missing / unknown" class produced by ``convert``.

    Args:
        base_model: object exposing ``.roberta`` (the encoder) and ``.config``
            (``hidden_size`` and ``hidden_dropout_prob`` are read). Any
            classification head on ``base_model`` is ignored.
        num_labels_head1: number of classes predicted by head 1.
        num_labels_head2: number of classes predicted by head 2.
        hidden_size: width of the first hidden layer of each head (the second
            hidden layer is half as wide).
    """

    def __init__(self, base_model, num_labels_head1=4, num_labels_head2=4, hidden_size=768):
        super().__init__()
        self.roberta = base_model.roberta
        self.dropout = nn.Dropout(base_model.config.hidden_dropout_prob)
        self.config = base_model.config

        # Both heads share one architecture. The layer order inside a head
        # must stay fixed: checkpoints store the parameters under positional
        # keys such as "head1.0.weight" / "head1.8.bias".
        self.head1 = self._build_head(self.config.hidden_size, hidden_size, num_labels_head1)
        self.head2 = self._build_head(self.config.hidden_size, hidden_size, num_labels_head2)

        # Linear shortcut from the pooled vector directly to each head's logits.
        self.use_residual = True
        if self.use_residual:
            self.residual_projection1 = nn.Linear(self.config.hidden_size, num_labels_head1)
            self.residual_projection2 = nn.Linear(self.config.hidden_size, num_labels_head2)

    @staticmethod
    def _build_head(input_size, hidden_size, num_labels):
        """Return one classification head: two Linear/LayerNorm/GELU/Dropout stages and an output layer."""
        return nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.LayerNorm(hidden_size),
            nn.GELU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_size, hidden_size // 2),
            nn.LayerNorm(hidden_size // 2),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_size // 2, num_labels),
        )

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        labels=None,
        return_dict=None,
        **kwargs
    ):
        """Run the encoder and both heads.

        Args:
            input_ids, attention_mask: forwarded to the encoder.
            labels: optional 2-D tensor; column 0 holds the head-1 class and
                column 1 the head-2 class.
            return_dict, **kwargs: accepted for interface compatibility and
                ignored.

        Returns:
            SequenceClassifierOutput whose ``logits`` is the tuple
            ``(logits_head1, logits_head2)`` and whose ``loss`` is the mean of
            the two cross-entropy losses (``None`` when no labels are given).

        Raises:
            ValueError: if ``labels`` is given but is not 2-dimensional.
        """
        outputs = self.roberta(
            input_ids=input_ids,
            attention_mask=attention_mask,
            return_dict=True
        )

        sequence_output = outputs.last_hidden_state
        pooled_output = sequence_output[:, 0, :]  # first-token representation
        pooled_output = self.dropout(pooled_output)

        logits_head1 = self.head1(pooled_output)
        logits_head2 = self.head2(pooled_output)

        if self.use_residual:
            logits_head1 = logits_head1 + self.residual_projection1(pooled_output)
            logits_head2 = logits_head2 + self.residual_projection2(pooled_output)

        loss = None
        if labels is not None:
            if labels.dim() != 2:
                # Fail loudly: silently returning loss=None would only surface
                # later as an unrelated error inside the training loop.
                raise ValueError(f"Unexpected label shape: {labels.shape}")
            loss_fct = nn.CrossEntropyLoss()
            loss_head1 = loss_fct(logits_head1, labels[:, 0])
            loss_head2 = loss_fct(logits_head2, labels[:, 1])
            loss = (loss_head1 + loss_head2) / 2

        return SequenceClassifierOutput(
            loss=loss,
            logits=(logits_head1, logits_head2),
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

def tokenize_function(examples):
    """Tokenize a batch of examples and attach the paired labels.

    Args:
        examples: batch mapping with the columns ``text``, ``label1`` and
            ``label2`` (as supplied by ``dataset.map(..., batched=True)``).

    Returns:
        The tokenizer output for ``examples["text"]`` with an extra ``labels``
        entry: one ``[label1, label2]`` pair per example.
    """
    # No padding here: sequences are only truncated to 512 tokens. The
    # DataCollatorWithPadding given to the Trainer pads each batch to its own
    # longest sequence, so padding every example to 512 tokens up front only
    # adds wasted computation.
    tokenized = tokenizer(
        examples["text"],
        truncation=True,
        max_length=512,
    )
    # Plain nested lists are sufficient; the dataset stores them as integers.
    tokenized["labels"] = [
        [label1, label2]
        for label1, label2 in zip(examples["label1"], examples["label2"])
    ]
    return tokenized

def compute_metrics(eval_pred):
    """Compute per-head and averaged accuracy for the dual-head model.

    Args:
        eval_pred: pair ``(logits, labels)`` where ``logits`` is itself a pair
            ``(logits_head1, logits_head2)`` and ``labels`` is a 2-D array
            whose column 0 / column 1 hold the head-1 / head-2 classes.

    Returns:
        dict with ``accuracy_head1``, ``accuracy_head2`` and their mean,
        ``overall_accuracy``.

    Raises:
        ValueError: if ``labels`` is not 2-dimensional.
    """
    raw_logits, labels = eval_pred
    first_head_logits, second_head_logits = raw_logits

    first_head_pred = np.argmax(first_head_logits, axis=1)
    second_head_pred = np.argmax(second_head_logits, axis=1)

    # Guard clause: anything other than an (n, 2) label array is rejected.
    if labels.ndim != 2:
        raise ValueError(f"Unexpected label shape: {labels.shape}")
    first_head_true = labels[:, 0]
    second_head_true = labels[:, 1]

    metrics = {
        "accuracy_head1": accuracy_score(first_head_true, first_head_pred),
        "accuracy_head2": accuracy_score(second_head_true, second_head_pred),
    }
    metrics["overall_accuracy"] = (metrics["accuracy_head1"] + metrics["accuracy_head2"]) / 2
    return metrics

# Model and tokenizer initialization.
# `model_checkpoint` is used for both the tokenizer and the encoder weights.
model_checkpoint = "roberta-base"
# Per-device batch size; combined with gradient_accumulation_steps=4 below,
# each optimizer step sees 8 * 4 = 32 examples per device.
batch_size = 8
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

# Load the JSON files written by convert() as the train / validation splits.
data_files = {
    "train": "/kaggle/working/train.json",
    "validation": "/kaggle/working/val.json"
}
dataset = load_dataset("json", data_files=data_files)

# Tokenize both splits in batches; tokenize_function also adds the `labels` column.
tokenized_dataset = dataset.map(
    tokenize_function,
    batched=True,
)

# Load base model and create the dual-head model.
# RobertaWithTwoHeads only reads `base_model.roberta` and `base_model.config`,
# so the stock classification head created here (the source of the
# "newly initialized" warning) and the `num_labels=2` setting are not used.
base_model = AutoModelForSequenceClassification.from_pretrained(model_checkpoint, num_labels=2)
model = RobertaWithTwoHeads(base_model)

# Training arguments for full fine-tuning: evaluate and checkpoint once per
# epoch, and reload the checkpoint with the highest `overall_accuracy`
# (a key returned by compute_metrics) when training finishes.
# NOTE(review): newer transformers releases rename `evaluation_strategy`
# (-> `eval_strategy`) -- confirm against the installed version.
training_args = TrainingArguments(
    output_dir="roberta-dual-head",
    learning_rate=2e-5,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=6,
    weight_decay=0.01,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="overall_accuracy",
    greater_is_better=True,
    report_to=["none"],
    logging_dir="./logs",
    logging_steps=100,
    fp16=True,  # Mixed precision training
    gradient_accumulation_steps=4,
    warmup_ratio=0.1,  # Added warmup
)

# Initialize trainer.
# NOTE(review): newer transformers releases deprecate the `tokenizer=`
# argument in favour of `processing_class=` -- confirm against the installed version.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["validation"],
    tokenizer=tokenizer,
    data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
    compute_metrics=compute_metrics,
)

# Train the model
trainer.train()

# Save the model into "roberta-dual-head-finetuned"; the inference cell later
# loads "model.safetensors" from this directory.
trainer.save_model("roberta-dual-head-finetuned")

tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/481 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]



Generating train split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

Map:   0%|          | 0/6396 [00:00<?, ? examples/s]

Map:   0%|          | 0/800 [00:00<?, ? examples/s]

model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  self.scaler = torch.cuda.amp.GradScaler(**kwargs)


Epoch,Training Loss,Validation Loss,Accuracy Head1,Accuracy Head2,Overall Accuracy
1,1.0996,1.032722,0.63875,0.54125,0.59
2,0.9616,0.965756,0.65375,0.56125,0.6075
3,0.8987,1.00979,0.6625,0.5675,0.615
4,0.8321,0.996495,0.6625,0.57875,0.620625
5,0.7772,1.022386,0.66,0.57625,0.618125
6,0.7322,1.040034,0.67625,0.5775,0.626875


In [5]:
# from transformers import AutoTokenizer, AutoModelForSequenceClassification
# import torch
# import torch.nn as nn
# from transformers.modeling_outputs import SequenceClassifierOutput
# from safetensors.torch import load_file

# class RobertaWithTwoHeads(nn.Module):
#     def __init__(self, base_model, num_labels_head1=4, num_labels_head2=4, hidden_size=768):
#         super(RobertaWithTwoHeads, self).__init__()
#         self.roberta = base_model.roberta
#         self.dropout = nn.Dropout(base_model.config.hidden_dropout_prob)
#         self.config = base_model.config
        
#         # Enhanced classification head 1 for hate detection
#         self.head1 = nn.Sequential(
#             nn.Linear(self.config.hidden_size, hidden_size),
#             nn.LayerNorm(hidden_size),
#             nn.GELU(),
#             nn.Dropout(0.2),
#             nn.Linear(hidden_size, hidden_size // 2),
#             nn.LayerNorm(hidden_size // 2),
#             nn.GELU(),
#             nn.Dropout(0.1),
#             nn.Linear(hidden_size // 2, num_labels_head1)
#         )
        
#         # Enhanced classification head 2 for fake news detection
#         self.head2 = nn.Sequential(
#             nn.Linear(self.config.hidden_size, hidden_size),
#             nn.LayerNorm(hidden_size),
#             nn.GELU(),
#             nn.Dropout(0.2),
#             nn.Linear(hidden_size, hidden_size // 2),
#             nn.LayerNorm(hidden_size // 2),
#             nn.GELU(),
#             nn.Dropout(0.1),
#             nn.Linear(hidden_size // 2, num_labels_head2)
#         )
        
#         # Optional: Add residual connections
#         self.use_residual = True
#         if self.use_residual:
#             self.residual_projection1 = nn.Linear(self.config.hidden_size, num_labels_head1)
#             self.residual_projection2 = nn.Linear(self.config.hidden_size, num_labels_head2)

#     def forward(
#         self,
#         input_ids=None,
#         attention_mask=None,
#         labels=None,
#         return_dict=None,
#         **kwargs
#     ):
#         outputs = self.roberta(
#             input_ids=input_ids,
#             attention_mask=attention_mask,
#             return_dict=True
#         )

#         sequence_output = outputs.last_hidden_state
#         pooled_output = sequence_output[:, 0, :]  # [CLS] token
#         pooled_output = self.dropout(pooled_output)

#         # Forward pass through enhanced heads
#         head1_output = self.head1(pooled_output)
#         head2_output = self.head2(pooled_output)

#         # Add residual connections if enabled
#         if self.use_residual:
#             residual1 = self.residual_projection1(pooled_output)
#             residual2 = self.residual_projection2(pooled_output)
#             logits_head1 = head1_output + residual1
#             logits_head2 = head2_output + residual2
#         else:
#             logits_head1 = head1_output
#             logits_head2 = head2_output

#         loss = None
#         if labels is not None:
#             loss_fct = nn.CrossEntropyLoss()
#             if labels.dim() == 2:
#                 loss_head1 = loss_fct(logits_head1, labels[:, 0])
#                 loss_head2 = loss_fct(logits_head2, labels[:, 1])
#                 loss = (loss_head1 + loss_head2) / 2
#             else:
#                 print(f"Unexpected label shape: {labels.shape}")

#         return SequenceClassifierOutput(
#             loss=loss,
#             logits=(logits_head1, logits_head2),
#             hidden_states=outputs.hidden_states,
#             attentions=outputs.attentions,
#         )


# # Load tokenizer
# tokenizer = AutoTokenizer.from_pretrained("roberta-base")

# def clean(text):
#     # Add your text cleaning function here
#     # This is a placeholder - use your actual cleaning logic
#     return str(text).strip()

# # Load base model and create the dual-head model
# base_model = AutoModelForSequenceClassification.from_pretrained("roberta-base", num_labels=2)
# model = RobertaWithTwoHeads(base_model)

# # Load the fine-tuned weights from safetensors format
# state_dict = load_file("roberta-dual-head-finetuned/model.safetensors")
# model.load_state_dict(state_dict)
# model.eval()  # Set to evaluation mode

# # Inference loop
# df = pd.read_csv("/kaggle/input/Val_Task_B.csv")
# correct_hate = 0
# correct_fake = 0
# total = 0

# # If GPU is available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = model.to(device)

# target_mapping = {'I': 0, 'O': 1, 'R': 2}
# severity_mapping = {'L': 0, 'M': 1, 'H': 2}
# target_reverse_mapping = {0: 'I', 1: 'O', 2: 'R'}
# severity_reverse_mapping = {0: 'L', 1: 'M', 2: 'H'}

# for _, row in df.iterrows():
#     if(row["Target"] not in ["I","O","R"] or row["Severity"] not in ["L", "M", "H"]):
#         continue
#     text = clean(str(row["Tweet"]))
#     inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
    
#     # Move inputs to the same device as model
#     inputs = {k: v.to(device) for k, v in inputs.items()}
    
#     # Perform inference
#     with torch.no_grad():
#         outputs = model(**inputs)
        
#     # Extract predictions
#     logits_head1, logits_head2 = outputs.logits
    
#     # Get predicted classes
#     pred_head1 = torch.argmax(logits_head1, dim=1)
#     pred_head2 = torch.argmax(logits_head2, dim=1)
    
#     # Move predictions to CPU for comparison with pandas data
#     pred_head1 = pred_head1.cpu()
#     pred_head2 = pred_head2.cpu()
    
#     # Get ground truth labels
#     true_hate = int(target_mapping[row["Target"]])
#     true_fake = int(severity_mapping[row["Severity"]])
    
#     # Update counters
#     if pred_head1.item() == true_hate:
#         correct_hate += 1
#     if pred_head2.item() == true_fake:
#         correct_fake += 1
    
#     total += 1

# # Calculate accuracies
# hate_accuracy = correct_hate / total
# fake_accuracy = correct_fake / total
# overall_accuracy = (hate_accuracy + fake_accuracy) / 2

# print(f"Hate Detection Accuracy: {hate_accuracy:.4f}")
# print(f"Fake News Detection Accuracy: {fake_accuracy:.4f}")
# print(f"Overall Accuracy: {overall_accuracy:.4f}")

# # Optional: Save predictions to CSV
# predictions = []
# model.eval()
# with torch.no_grad():
#     for _, row in df.iterrows():
#         if(row["Target"] not in ["I","O","R"] or row["Severity"] not in ["L", "M", "H"]):
#             continue
#         text = clean(str(row["Tweet"]))
#         inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
#         inputs = {k: v.to(device) for k, v in inputs.items()}
        
#         outputs = model(**inputs)
#         logits_head1, logits_head2 = outputs.logits
        
#         pred_hate = torch.argmax(logits_head1, dim=1).cpu().item()
#         pred_fake = torch.argmax(logits_head2, dim=1).cpu().item()
        
#         predictions.append({
#             'Tweet': row["Tweet"],
#             'Predicted_Target': target_reverse_mapping[pred_hate],
#             'Predicted_Severity': severity_reverse_mapping[pred_fake],
#             'True_Target': row["Target"],
#             'True_Severity': row["Severity"]
#         })

# predictions_df = pd.DataFrame(predictions)
# predictions_df.to_csv("predictions.csv", index=False)

In [6]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import torch.nn as nn
from transformers.modeling_outputs import SequenceClassifierOutput
from safetensors.torch import load_file

class RobertaWithTwoHeads(nn.Module):
    """RoBERTa encoder shared by two independent classification heads.

    The representation of the first token is passed through two MLP heads
    (plus an optional linear shortcut from the pooled vector straight to the
    logits). In this notebook head 1 predicts the Target class and head 2 the
    Severity class; class 0 of each head stands for "N/A".

    The attribute names and the layer order inside each head define the
    state_dict keys, so they must match the definition used at training time
    for the saved weights to load.

    Args:
        base_model: object exposing ``.roberta`` (the encoder) and ``.config``
            (``hidden_size`` and ``hidden_dropout_prob`` are read). Any
            classification head on ``base_model`` is ignored.
        num_labels_head1: number of classes predicted by head 1.
        num_labels_head2: number of classes predicted by head 2.
        hidden_size: width of the first hidden layer of each head (the second
            hidden layer is half as wide).
    """

    def __init__(self, base_model, num_labels_head1=4, num_labels_head2=4, hidden_size=768):
        super().__init__()
        self.roberta = base_model.roberta
        self.dropout = nn.Dropout(base_model.config.hidden_dropout_prob)
        self.config = base_model.config

        # Both heads share one architecture. The layer order inside a head
        # must stay fixed: checkpoints store the parameters under positional
        # keys such as "head1.0.weight" / "head1.8.bias".
        self.head1 = self._build_head(self.config.hidden_size, hidden_size, num_labels_head1)
        self.head2 = self._build_head(self.config.hidden_size, hidden_size, num_labels_head2)

        # Linear shortcut from the pooled vector directly to each head's logits.
        self.use_residual = True
        if self.use_residual:
            self.residual_projection1 = nn.Linear(self.config.hidden_size, num_labels_head1)
            self.residual_projection2 = nn.Linear(self.config.hidden_size, num_labels_head2)

    @staticmethod
    def _build_head(input_size, hidden_size, num_labels):
        """Return one classification head: two Linear/LayerNorm/GELU/Dropout stages and an output layer."""
        return nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.LayerNorm(hidden_size),
            nn.GELU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_size, hidden_size // 2),
            nn.LayerNorm(hidden_size // 2),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_size // 2, num_labels),
        )

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        labels=None,
        return_dict=None,
        **kwargs
    ):
        """Run the encoder and both heads.

        Args:
            input_ids, attention_mask: forwarded to the encoder.
            labels: optional 2-D tensor; column 0 holds the head-1 class and
                column 1 the head-2 class.
            return_dict, **kwargs: accepted for interface compatibility and
                ignored.

        Returns:
            SequenceClassifierOutput whose ``logits`` is the tuple
            ``(logits_head1, logits_head2)`` and whose ``loss`` is the mean of
            the two cross-entropy losses (``None`` when no labels are given).

        Raises:
            ValueError: if ``labels`` is given but is not 2-dimensional.
        """
        outputs = self.roberta(
            input_ids=input_ids,
            attention_mask=attention_mask,
            return_dict=True
        )

        sequence_output = outputs.last_hidden_state
        pooled_output = sequence_output[:, 0, :]  # first-token representation
        pooled_output = self.dropout(pooled_output)

        logits_head1 = self.head1(pooled_output)
        logits_head2 = self.head2(pooled_output)

        if self.use_residual:
            logits_head1 = logits_head1 + self.residual_projection1(pooled_output)
            logits_head2 = logits_head2 + self.residual_projection2(pooled_output)

        loss = None
        if labels is not None:
            if labels.dim() != 2:
                # Fail loudly instead of printing and returning loss=None.
                raise ValueError(f"Unexpected label shape: {labels.shape}")
            loss_fct = nn.CrossEntropyLoss()
            loss_head1 = loss_fct(logits_head1, labels[:, 0])
            loss_head2 = loss_fct(logits_head2, labels[:, 1])
            loss = (loss_head1 + loss_head2) / 2

        return SequenceClassifierOutput(
            loss=loss,
            logits=(logits_head1, logits_head2),
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


# Load the tokenizer for "roberta-base", the same checkpoint id the training
# cell uses for its tokenizer.
tokenizer = AutoTokenizer.from_pretrained("roberta-base")

def clean(text):
    """Normalise tweet text the same way the training data was normalised.

    The fine-tuned weights were learned from text that had been lowercased,
    stripped of surrounding whitespace and had every character other than
    ASCII letters, ASCII digits and whitespace removed. Inference has to apply
    the identical transformation; feeding raw text would present the model
    with casing and punctuation it never saw during training.

    Args:
        text: the raw tweet; non-string values are converted with ``str``.

    Returns:
        The normalised string.
    """
    import re  # local import: this cell does not import `re` itself

    text = str(text).lower().strip()
    # Characters are deleted, not replaced by a space ("don't" -> "dont").
    return re.sub(r"[^a-zA-Z0-9\s]", '', text)

# Load base model and create the dual-head model.
# RobertaWithTwoHeads only reads `base_model.roberta` and `base_model.config`,
# so `num_labels=3` merely sizes a stock classification head that is not used.
base_model = AutoModelForSequenceClassification.from_pretrained("roberta-base", num_labels=3)
model = RobertaWithTwoHeads(base_model)

# Load the fine-tuned weights (safetensors format) from the directory written
# by `trainer.save_model` in the training cell. `load_state_dict` is strict by
# default, so the parameter names must match this cell's class definition.
state_dict = load_file("roberta-dual-head-finetuned/model.safetensors")
model.load_state_dict(state_dict)
model.eval()  # Set to evaluation mode

# Class index -> submission label. These invert the 1/2/3 encoding used by
# convert(); index 0 is the class convert() assigns to missing/unknown labels.
target_reverse_mapping = {0: "N/A", 1: 'I', 2: 'O', 3: 'R'}
severity_reverse_mapping = {0: "N/A", 1: 'L', 2: 'M', 3: 'H'}

# Test set to predict.
# NOTE(review): `pd` is not imported in this cell; it relies on the pandas
# import executed by an earlier cell.
df = pd.read_csv("/kaggle/input/Test_Task_B.csv")

# Use the GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

predictions = []
model.eval()  # redundant with the eval() call above; kept as-is
with torch.no_grad():
    # One tweet per forward pass (batch of one), so `padding=True` adds no padding.
    for _, row in df.iterrows():
        text = clean(str(row["Tweet"]))
        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        
        outputs = model(**inputs)
        logits_head1, logits_head2 = outputs.logits
        
        # Despite their names, `pred_hate` is the head-1 (Target) class index
        # and `pred_fake` the head-2 (Severity) class index -- see the lookups below.
        pred_hate = torch.argmax(logits_head1, dim=1).cpu().item()
        pred_fake = torch.argmax(logits_head2, dim=1).cpu().item()
        
        predictions.append({
            'Id': row["Id"],
            'Tweet': row["Tweet"],
            'Target': target_reverse_mapping[pred_hate],
            'Severity': severity_reverse_mapping[pred_fake]
        })

# Write the submission file with the columns Id, Tweet, Target, Severity.
predictions_df = pd.DataFrame(predictions)
predictions_df.to_csv("KeyboardWarriors_TaskB_run1.csv", index=False)

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [7]:
# Completion marker: prints a fixed string and uses nothing from earlier cells.
print("Done")

Done
