In [None]:
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from torch import nn
from torch.optim import AdamW
from torch.utils.data import DataLoader, Dataset
from transformers import RobertaModel, RobertaTokenizer, get_scheduler

class CustomRobertaClassifier(nn.Module):
    """Classifier built on two separately loaded RoBERTa encoders.

    One encoder reads the tweet tokens and the other reads the emoticon
    tokens. Their ``pooler_output`` vectors are concatenated and passed
    through a single linear layer that produces ``num_labels`` logits.
    """

    def __init__(self, model_name, num_labels):
        """Load both encoders from ``model_name`` and build the linear head.

        Args:
            model_name: Identifier handed to ``RobertaModel.from_pretrained``.
            num_labels: Output size of the classification layer.
        """
        super().__init__()
        self.roberta_tweet = RobertaModel.from_pretrained(model_name)
        self.roberta_emoticons = RobertaModel.from_pretrained(model_name)
        # The head consumes both pooled vectors side by side, hence 2x.
        hidden_size = self.roberta_tweet.config.hidden_size
        self.classifier = nn.Linear(2 * hidden_size, num_labels)

    def forward(self, input_ids_tweet, attention_mask_tweet, input_ids_emoticons, attention_mask_emoticons):
        """Return the classification logits for a batch of tweet/emoticon pairs."""
        tweet_pooled = self.roberta_tweet(
            input_ids=input_ids_tweet,
            attention_mask=attention_mask_tweet,
        ).pooler_output
        emoticon_pooled = self.roberta_emoticons(
            input_ids=input_ids_emoticons,
            attention_mask=attention_mask_emoticons,
        ).pooler_output
        joined = torch.cat((tweet_pooled, emoticon_pooled), dim=1)
        return self.classifier(joined)

class TweetEmoticonDataset(Dataset):
    """Dataset yielding tokenized tweet/emoticon pairs together with a label.

    Each item is a dict with the keys ``input_ids_tweet``,
    ``attention_mask_tweet``, ``input_ids_emoticons``,
    ``attention_mask_emoticons`` and ``labels``.
    """

    def __init__(self, tweets, emoticons, labels, tokenizer, max_len=128):
        """Store the parallel sequences, the tokenizer and the padding length."""
        self.tweets, self.emoticons, self.labels = tweets, emoticons, labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of tweets."""
        return len(self.tweets)

    def _encode(self, text):
        """Tokenize ``text``, padded/truncated to ``self.max_len``, as tensors."""
        return self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

    def __getitem__(self, idx):
        """Return the encoded tweet, encoded emoticon and label at ``idx``."""
        tweet_text = str(self.tweets[idx])
        emoticon_text = str(self.emoticons[idx])
        target = int(self.labels[idx])

        tweet_enc = self._encode(tweet_text)
        emoticon_enc = self._encode(emoticon_text)

        # The encodings are squeezed on dim 0 so each item carries no
        # leading axis of its own.
        item = {}
        item['input_ids_tweet'] = tweet_enc['input_ids'].squeeze(0)
        item['attention_mask_tweet'] = tweet_enc['attention_mask'].squeeze(0)
        item['input_ids_emoticons'] = emoticon_enc['input_ids'].squeeze(0)
        item['attention_mask_emoticons'] = emoticon_enc['attention_mask'].squeeze(0)
        item['labels'] = torch.tensor(target)
        return item

def read_data(data_path):
    """Load labels, tweets and emoticons from a latin1-encoded CSV file.

    The first column is used as the label, the last column as the tweet text
    and the second-to-last column as the emoticon text. Label value 4 is
    remapped to 1. The distinct label values are printed after remapping.

    Args:
        data_path: Path of the CSV file passed to ``pandas.read_csv``.

    Returns:
        A ``(labels, tweets, emoticons)`` tuple of arrays taken from the
        columns' ``.values``.
    """
    sampled_data = pd.read_csv(data_path, encoding='latin1')
    # Take the remapped labels from the returned Series. Calling
    # ``replace(..., inplace=True)`` on ``iloc[:, 0]`` is a chained in-place
    # update: under pandas Copy-on-Write it only changes a temporary object,
    # so the 4 -> 1 remap would never reach the labels read below.
    label_column = sampled_data.iloc[:, 0].replace(4, 1)
    labels = label_column.values
    unique_labels = label_column.unique()
    print("Unique labels:", unique_labels)
    tweets = sampled_data.iloc[:, -1].values
    emoticons = sampled_data.iloc[:, -2].values
    return labels, tweets, emoticons

def split_into_train_dev_test(labels, tweets, emoticons, test_size=0.1, dev_size=0.1, random_state=42):
    """Shuffle and split the three parallel sequences into train/dev/test.

    The test portion is split off first. The dev portion is then split off
    from what remains, so ``dev_size`` applies to the non-test data rather
    than to the full input. Both splits use the same ``random_state``.

    Returns:
        A 9-tuple ordered as tweets (train, dev, test), labels (train, dev,
        test), emoticons (train, dev, test).
    """
    shuffle_options = {'shuffle': True, 'random_state': random_state}

    # First cut: hold out the test portion.
    (rest_tweets, test_tweets,
     rest_labels, test_labels,
     rest_emoticons, test_emoticons) = train_test_split(
        tweets, labels, emoticons, test_size=test_size, **shuffle_options)

    # Second cut: carve the dev portion out of the remainder.
    (train_tweets, dev_tweets,
     train_labels, dev_labels,
     train_emoticons, dev_emoticons) = train_test_split(
        rest_tweets, rest_labels, rest_emoticons, test_size=dev_size, **shuffle_options)

    return (
        train_tweets, dev_tweets, test_tweets,
        train_labels, dev_labels, test_labels,
        train_emoticons, dev_emoticons, test_emoticons,
    )

# Checkpoint shared by the tokenizer and both encoders of the classifier.
MODEL_NAME = 'cardiffnlp/twitter-roberta-base'

data_path = '/content/drive/MyDrive/datasets/orig.csv'  # Update this path
labels, tweets, emoticons = read_data(data_path)
num_labels = len(set(labels))  # Size of the classifier output: one per distinct label

# Split data
(
    train_tweets, dev_tweets, test_tweets,
    train_labels, dev_labels, test_labels,
    train_emoticons, dev_emoticons, test_emoticons,
) = split_into_train_dev_test(labels, tweets, emoticons)

tokenizer = RobertaTokenizer.from_pretrained(MODEL_NAME)

# Instantiate the custom datasets, one per split, all sharing the tokenizer
train_dataset, dev_dataset, test_dataset = (
    TweetEmoticonDataset(split_tweets, split_emoticons, split_labels, tokenizer)
    for split_tweets, split_emoticons, split_labels in (
        (train_tweets, train_emoticons, train_labels),
        (dev_tweets, dev_emoticons, dev_labels),
        (test_tweets, test_emoticons, test_labels),
    )
)

# Model setup with the custom classifier
model = CustomRobertaClassifier(MODEL_NAME, num_labels=num_labels)

In [None]:
def _run_batch(model, batch, device, loss_fn):
    """Move one batch to ``device``, run the model, return ``(logits, labels, loss)``."""
    labels = batch['labels'].to(device)
    logits = model(
        batch['input_ids_tweet'].to(device),
        batch['attention_mask_tweet'].to(device),
        batch['input_ids_emoticons'].to(device),
        batch['attention_mask_emoticons'].to(device),
    )
    return logits, labels, loss_fn(logits, labels)


def _evaluate(model, loader, device, loss_fn):
    """Return ``(summed per-batch loss, correct prediction count)`` over ``loader``."""
    model.eval()
    total_loss = 0.0
    total_correct = 0
    with torch.no_grad():
        for batch in loader:
            logits, labels, loss = _run_batch(model, batch, device, loss_fn)
            total_loss += loss.item()
            total_correct += (logits.argmax(dim=-1) == labels).sum().item()
    return total_loss, total_correct


def train_model(model, train_dataset, dev_dataset, test_dataset, tokenizer, device):
    """Fine-tune ``model`` and print dev loss per epoch plus final test metrics.

    Trains for 3 epochs with AdamW (lr 5e-5), a linear learning-rate schedule
    without warmup and cross-entropy loss, using batches of 16. After every
    epoch the mean per-batch loss on the dev set is printed. After training,
    the mean per-batch test loss and the test accuracy are printed.

    Args:
        model: Module called with the four batch tensors (tweet ids/mask,
            emoticon ids/mask) and returning logits.
        train_dataset: Dataset of training items (dicts of tensors).
        dev_dataset: Dataset evaluated after each epoch.
        test_dataset: Dataset evaluated once after training.
        tokenizer: Unused here; kept so existing callers keep working.
        device: Device the model and every batch are moved to.

    Returns:
        None. Results are reported through ``print``; the model is left in
        eval mode.
    """
    # Move model to the device (GPU or CPU)
    model.to(device)

    # Prepare data loaders; only the training data is shuffled.
    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    dev_loader = DataLoader(dev_dataset, batch_size=16, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

    # Optimizer
    optimizer = AdamW(model.parameters(), lr=5e-5)

    # Number of training epochs
    num_epochs = 3

    # Scheduler for learning rate decay, stepped once per training batch
    num_training_steps = num_epochs * len(train_loader)
    lr_scheduler = get_scheduler(
        name="linear",
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=num_training_steps
    )

    # Created once up front: the evaluation passes need it even when the
    # training loader yields no batches, and it holds no per-step state.
    loss_fn = nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        # _evaluate switches to eval mode, so re-enter train mode each epoch.
        model.train()
        for batch in train_loader:
            # Reset gradients
            optimizer.zero_grad()

            # Forward pass and loss
            _, _, loss = _run_batch(model, batch, device, loss_fn)

            # Backpropagation
            loss.backward()
            optimizer.step()
            lr_scheduler.step()

        # Report the mean per-batch loss on the development set
        total_dev_loss, _ = _evaluate(model, dev_loader, device, loss_fn)
        print(f"Epoch {epoch + 1}, Dev Loss: {total_dev_loss / len(dev_loader)}")

    # Evaluate on test set
    total_test_loss, total_correct = _evaluate(model, test_loader, device, loss_fn)
    print(f"Test Loss: {total_test_loss / len(test_loader)}")
    print(f"Accuracy: {total_correct / len(test_dataset):.2f}")

# Pick CUDA when PyTorch reports it as available, otherwise fall back to CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Train and evaluate the model
train_model(
    model=model,
    train_dataset=train_dataset,
    dev_dataset=dev_dataset,
    test_dataset=test_dataset,
    tokenizer=tokenizer,
    device=device,
)