In [None]:
"""Irony classification using transformers"""

In [None]:
from google.colab import drive
# Mount Google Drive under /content/gdrive (forcing a remount) so the project files are reachable.
# NOTE(review): later cells use the relative prefix 'gdrive/My Drive/...', so they presumably
# rely on the working directory being /content -- confirm.
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [None]:
import pandas as pd
import numpy as np

In [None]:
# Read the training and gold-test splits from the project folder on Drive.
# Both files are parsed as tab-separated text.
train_df = pd.read_csv(
    'gdrive/My Drive/anlp_project/SemEval2018-T3-train-taskA_emoji.txt',
    sep='\t',
)
test_df = pd.read_csv(
    'gdrive/My Drive/anlp_project/SemEval2018-T3_gold_test_taskA_emoji.txt',
    sep='\t',
)

In [None]:
import re

#very basic preprocessing

def preprocess_tweet(tweet):
    """Apply light normalisation to a single tweet.

    Steps, in order:
      1. Spaced ellipses (". .." and ". ...") are replaced by " possibility ".
      2. URLs (tokens starting with "http" or "www") are replaced by "URL".
      3. Stand-alone runs of digits are replaced by "num".

    Args:
        tweet: Raw tweet text (str).

    Returns:
        The normalised tweet text (str).
    """
    # re.sub returns a new string (str is immutable), so the result must be
    # assigned back or the replacement is silently lost. The optional trailing
    # dot lets one pattern cover both variants and consume the longer one whole.
    tweet = re.sub(r'\. \.\.\.?', ' possibility ', tweet)

    # URLs are handled before numbers so digits inside a link are not rewritten.
    tweet = re.sub(r"http\S+|www\S+|https\S+", 'URL', tweet, flags=re.MULTILINE)

    # Replace whole-number tokens.
    tweet = re.sub(r'\b\d+\b', 'num', tweet)

    return tweet


In [None]:
# Store the preprocessed text of every raw tweet in a new 'text_prep' column.
train_df['text_prep'] = train_df['Tweet text'].apply(preprocess_tweet)
test_df['text_prep'] = test_df['Tweet text'].apply(preprocess_tweet)

In [None]:
# Plain Python list of the preprocessed training tweets.
train_tweets = train_df['text_prep'].tolist()

In [None]:
from tokenizers import Tokenizer
# Load the tokenizer from its JSON file in the project folder.
tokenizer_save_path = 'gdrive/My Drive/anlp_project/transformers/tokenizer.json'
tokenizer = Tokenizer.from_file(tokenizer_save_path)

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class TweetDataset(Dataset):
    """Map-style dataset that turns (tweet, label) pairs into fixed-length tensors.

    Each item is a dict with:
      * 'input_ids'      -- token ids, truncated or padded to ``max_len``
      * 'attention_mask' -- 1 for real tokens, 0 for padding
      * 'label'          -- the label as a float tensor

    Args:
        tweets: Sequence of tweet strings.
        labels: Sequence of labels aligned with ``tweets``.
        tokenizer: Object whose ``encode(text)`` result exposes an ``ids`` list.
        max_len: Length of every returned sequence.
        pad_id: Token id used to fill padded positions (default 0).
    """

    def __init__(self, tweets, labels, tokenizer, max_len, pad_id=0):
        self.tweets = tweets
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.pad_id = pad_id

    def __len__(self):
        return len(self.tweets)

    def __getitem__(self, idx):
        encoding = self.tokenizer.encode(self.tweets[idx])

        # Copy and truncate so the tokenizer's own list is never mutated.
        tokens = list(encoding.ids)[:self.max_len]
        n_real = len(tokens)
        n_pad = self.max_len - n_real

        # Build the mask from the true length rather than from the id values:
        # a genuine token may share the padding id (e.g. id 0) and must still
        # be marked as real.
        attention_mask = [1] * n_real + [0] * n_pad
        tokens = tokens + [self.pad_id] * n_pad

        return {
            'input_ids': torch.tensor(tokens, dtype=torch.long),
            'attention_mask': torch.tensor(attention_mask, dtype=torch.long),
            'label': torch.tensor(self.labels[idx], dtype=torch.float)
        }

# Training data: wrap the tweets and labels in a dataset and iterate it in shuffled mini-batches.
tweets = train_tweets
labels = train_df['Label'].tolist()  # 0 for non-ironic and 1 for ironic
dataset = TweetDataset(tweets=tweets, labels=labels, tokenizer=tokenizer, max_len=230)
loader = DataLoader(dataset=dataset, batch_size=32, shuffle=True)


In [None]:
# Evaluation data, built with the same max_len and batch size as the training data.
test_tweets = test_df['text_prep'].tolist()
test_labels = test_df['Label'].tolist()
test_dataset = TweetDataset(test_tweets, test_labels, tokenizer, max_len=230)
# Shuffling brings nothing at evaluation time; a fixed order keeps predictions
# aligned with the rows of test_df and makes repeated runs comparable.
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
import math
import torch
from torch import nn, Tensor
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset


class TransformerModel(nn.Module):
    """Transformer encoder with mean pooling and a sigmoid head for binary classification.

    Args:
        ntoken: Vocabulary size (number of rows in the embedding table).
        d_model: Embedding / model width.
        nhead: Number of attention heads per encoder layer.
        d_hid: Width of the feed-forward sub-network in each encoder layer.
        nlayers: Number of stacked encoder layers.
        dropout: Dropout probability used by the positional encoder and encoder layers.
    """

    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int, nlayers: int, dropout: float = 0.5):
        super().__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.embedding = nn.Embedding(ntoken, d_model)
        self.d_model = d_model

        # Classification head: one value per sequence, squashed to a probability.
        self.linear = nn.Linear(d_model, 1)
        self.sigmoid = nn.Sigmoid()

        self.init_weights()

    def init_weights(self) -> None:
        """Initialise embedding and head weights uniformly in [-0.1, 0.1]; zero the head bias."""
        initrange = 0.1
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-initrange, initrange)

    def forward(self, src: Tensor, src_mask: Tensor = None, src_key_padding_mask: Tensor = None) -> Tensor:
        """Return one probability per sequence.

        Args:
            src: Token ids of shape [batch_size, seq_len].
            src_mask: Optional attention mask forwarded to the encoder.
            src_key_padding_mask: Optional [batch_size, seq_len] mask that is
                True (or non-zero) at PADDING positions, following the PyTorch
                convention. With a 1-for-real-token attention mask, pass
                ``attention_mask == 0``. When omitted, every position is
                attended to and averaged.

        Returns:
            Tensor of shape [batch_size] with values in (0, 1).
        """
        src = src.permute(1, 0)  # Make it [seq_len, batch_size]
        src = self.embedding(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)

        if src_key_padding_mask is None:
            output = self.transformer_encoder(src, src_mask)
            # Mean over the sequence dimension gives a fixed-size representation.
            pooled_output = output.mean(dim=0)
        else:
            pad = src_key_padding_mask.to(torch.bool)
            output = self.transformer_encoder(src, src_mask, src_key_padding_mask=pad)
            # Average over real tokens only. masked_fill also clears any NaNs
            # the encoder may emit at fully masked positions.
            pad_t = pad.transpose(0, 1).unsqueeze(-1)  # [seq_len, batch_size, 1]
            summed = output.masked_fill(pad_t, 0.0).sum(dim=0)
            counts = (~pad_t).sum(dim=0).clamp(min=1)  # avoid division by zero
            pooled_output = summed / counts

        logits = self.linear(pooled_output).squeeze(-1)
        output = self.sigmoid(logits)
        return output

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a sequence, then apply dropout.

    Args:
        d_model: Width of the vectors being encoded (even or odd).
        dropout: Dropout probability applied after adding the encoding.
        max_len: Longest sequence length the precomputed table supports.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_len, ceil(d_model / 2)]
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd-indexed column than
        # even-indexed ones, so trim the cosine terms to fit.
        pe[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
        # A buffer moves with the module across devices but is not trained.
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """Add the encoding to ``x`` of shape [seq_len, batch_size, d_model]."""
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)


In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Model hyper-parameters.
ntokens = tokenizer.get_vocab_size()  # vocabulary size reported by the tokenizer
emsize = 200   # embedding width (d_model)
d_hid = 200    # hidden width of the feed-forward network inside each encoder layer
nlayers = 2    # number of stacked encoder layers
nhead = 2      # attention heads per encoder layer
dropout = 0.1  # dropout probability

# Build the classifier and place it on the selected device.
model = TransformerModel(
    ntoken=ntokens,
    d_model=emsize,
    nhead=nhead,
    d_hid=d_hid,
    nlayers=nlayers,
    dropout=dropout,
)
model = model.to(device)

In [None]:
# Make sure the model is on the selected device; as the cell's last expression it also displays the module summary.
model.to(device)

TransformerModel(
  (pos_encoder): PositionalEncoding(
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=200, out_features=200, bias=True)
        )
        (linear1): Linear(in_features=200, out_features=200, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=200, out_features=200, bias=True)
        (norm1): LayerNorm((200,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((200,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (embedding): Embedding(25280, 200)
  (linear): Linear(in_features=200, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)

In [None]:
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from torch.nn import BCELoss

# Optimiser and loss. BCELoss expects probabilities, which the model's sigmoid head returns.
optimizer = torch.optim.AdamW(model.parameters(), lr=0.0001)
criterion = BCELoss().to(device)


epochs = 6
for epoch in range(epochs):
    model.train()
    # Per-epoch accumulators for the training metrics.
    all_predictions = []
    all_labels = []
    train_probabilities = []
    total_loss = 0.0
    for batch in loader:
        input_ids = batch['input_ids'].to(device)
        labels = batch['label'].to(device)

        # The model already returns shape [batch_size]. An extra .squeeze()
        # would collapse a final batch of size 1 to a 0-d tensor, which no
        # longer matches `labels` and cannot be iterated by extend() below.
        outputs = model(input_ids)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Threshold the probabilities at 0.5 to get hard predictions.
        predictions = (outputs > 0.5).int()
        all_predictions.extend(predictions.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
        total_loss += loss.item()
        train_probabilities.extend(outputs.detach().cpu().numpy())

    # Epoch-level metrics over every batch seen in this epoch.
    avg_loss = total_loss / len(loader)
    accuracy = accuracy_score(all_labels, all_predictions)
    f1 = f1_score(all_labels, all_predictions)
    # ROC AUC uses the raw probabilities rather than the thresholded predictions.
    train_auc = roc_auc_score(all_labels, train_probabilities)

    print(f"Epoch {epoch + 1}/{epochs} Loss: {avg_loss:.4f} Accuracy: {accuracy:.4f} F1: {f1:.4f}, AUC: {train_auc:.4f}")


Epoch 1/6 Loss: 0.6880 Accuracy: 0.5426 F1: 0.5584, AUC: 0.5570
Epoch 2/6 Loss: 0.6416 Accuracy: 0.6369 F1: 0.6419, AUC: 0.6808
Epoch 3/6 Loss: 0.5629 Accuracy: 0.7066 F1: 0.7005, AUC: 0.7836
Epoch 4/6 Loss: 0.4186 Accuracy: 0.8019 F1: 0.8007, AUC: 0.8909
Epoch 5/6 Loss: 0.2465 Accuracy: 0.9039 F1: 0.9038, AUC: 0.9632
Epoch 6/6 Loss: 0.1371 Accuracy: 0.9492 F1: 0.9489, AUC: 0.9877


In [None]:
# Location of the saved weights. Defined in this cell so it runs on a fresh
# kernel without depending on a later cell having been executed first.
model_save_path = 'gdrive/My Drive/anlp_project/transformers/transformer_model_weights.pth'

# Rebuild the architecture and restore the saved weights.
# NOTE(review): this replaces the model trained above with whatever is stored on disk.
# map_location lets weights saved on a GPU load on a CPU-only runtime.
model = TransformerModel(ntokens, emsize, nhead, d_hid, nlayers, dropout)
model.load_state_dict(torch.load(model_save_path, map_location=device))
model.to(device)

TransformerModel(
  (pos_encoder): PositionalEncoding(
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=200, out_features=200, bias=True)
        )
        (linear1): Linear(in_features=200, out_features=200, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=200, out_features=200, bias=True)
        (norm1): LayerNorm((200,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((200,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (embedding): Embedding(25280, 200)
  (linear): Linear(in_features=200, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)

In [None]:
# Evaluation on the held-out test set (dropout off, no gradient tracking).
model.eval()
test_predictions = []
test_labels = []
test_probabilities = []
with torch.no_grad():
    for batch in test_loader:
        input_ids = batch['input_ids'].to(device)
        labels = batch['label'].to(device)

        # The model already returns shape [batch_size]. An extra .squeeze()
        # would turn a final batch of size 1 into a 0-d tensor that extend()
        # cannot iterate.
        outputs = model(input_ids)

        predictions = (outputs > 0.5).int()
        test_predictions.extend(predictions.cpu().numpy())
        test_labels.extend(labels.cpu().numpy())
        test_probabilities.extend(outputs.cpu().numpy())

test_accuracy = accuracy_score(test_labels, test_predictions)
test_f1 = f1_score(test_labels, test_predictions)
# ROC AUC is computed from the raw probabilities of the positive class.
test_auc = roc_auc_score(test_labels, test_probabilities)
print(f"Test Accuracy: {test_accuracy:.4f} Test F1: {test_f1:.4f} Test AUC: {test_auc:.4f}")

Test Accuracy: 0.6454 Test F1: 0.6128 Test AUC: 0.7028


In [None]:
# Destination file for the model weights in the project folder.
model_save_path = 'gdrive/My Drive/anlp_project/transformers/transformer_model_weights.pth'
# Save the model's state_dict (its weights) to that path.
torch.save(model.state_dict(), model_save_path)