# Spam generation

## Installation & setup

In [None]:
import torch
import torch.nn as nn
from transformers import RobertaTokenizer, RobertaModel, GPT2LMHeadModel

## Tokenizer

In [None]:
# Pretrained `roberta-base` tokenizer, the same checkpoint name the
# discriminators load their encoder from.
# NOTE(review): the generator wraps the `gpt2` checkpoint, which ships its own
# tokenizer -- confirm that feeding it ids produced here is intended.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

In [None]:
# Keyword arguments shared by every tokenizer call in this notebook: pad or
# truncate each sequence to exactly 128 tokens and return PyTorch tensors.
tokenizer_config = dict(
    padding='max_length',
    truncation=True,
    max_length=128,
    return_tensors='pt',
)

## GAN

The GAN model is inspired by TextGAIL:
- code: https://github.com/qywu/TextGAIL/tree/master
- article: https://arxiv.org/pdf/2004.13796

### Generator

In [None]:
class TextGenerator(nn.Module):
  """Thin wrapper that generates token ids with a pretrained GPT-2 LM head.

  Args:
    max_length: value forwarded to `generate(max_length=...)`; defaults to the
      tokenizer's `max_length` setting.

  NOTE(review): per the Hugging Face docs `max_length` counts the prompt
  tokens too. Prompts tokenized with `tokenizer_config` are padded to that same
  length, so confirm there is any budget left for new tokens.
  """

  def __init__(self, max_length=tokenizer_config['max_length']):
    super().__init__()

    self.model = GPT2LMHeadModel.from_pretrained('gpt2')
    self.max_length = max_length

  def forward(self, inputs):
    """Generate a continuation for `inputs`.

    Args:
      inputs: mapping unpacked as keyword arguments of `generate`
        (e.g. `input_ids`, `attention_mask`).

    Returns:
      Whatever `GPT2LMHeadModel.generate` returns for these arguments.
    """
    length_limit = self.max_length
    return self.model.generate(max_length=length_limit, **inputs)

#### On hold: RNN

Simple text generation model from https://www.kaggle.com/code/ab971631/beginners-guide-to-text-generation-pytorch

Useful links:
- https://github.com/purvasingh96/Deep-learning-with-neural-networks/blob/master/Chapter-wise%20code/Code%20-%20PyTorch/6.%20Natural-Language-Processing/8.%20Natural%20Language%20Generation/text-generation-via-rnn-and-lstms-pytorch.ipynb

In [None]:
import torch
import torch.nn as nn
from torch.autograd import Variable

class RNN(nn.Module):
    """Single-step GRU text model: embedding -> GRU -> linear projection.

    Args:
        input_size: number of rows in the embedding table (vocabulary size).
        hidden_size: embedding width and GRU hidden width.
        output_size: width of the projected output.
        n_layers: number of stacked GRU layers.

    The GRU is built with an input width of ``hidden_size * 2`` and
    ``forward`` flattens all embedded tokens into one time step, so each call
    must receive exactly two token ids.
    """

    def __init__(self, input_size, hidden_size, output_size, n_layers=1):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers

        self.encoder = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size * 2, hidden_size, n_layers,
                          batch_first=True, bidirectional=False)
        self.decoder = nn.Linear(hidden_size, output_size)

    def forward(self, input, hidden=None):
        """Run one step of the model.

        Args:
            input: integer tensor holding the two token ids for this step.
            hidden: previous hidden state of shape
                ``(n_layers, 1, hidden_size)``; a zero state is created when
                omitted.

        Returns:
            Tuple ``(output, hidden)`` where ``output`` has shape
            ``(1, output_size)`` and ``hidden`` is the updated GRU state.
        """
        if hidden is None:
            hidden = self.init_hidden()
        embedded = self.encoder(input.view(1, -1))
        # Concatenate the token embeddings into a single time step.
        output, hidden = self.gru(embedded.view(1, 1, -1), hidden)
        output = self.decoder(output.view(1, -1))
        return output, hidden

    def init_hidden(self):
        """Return a zero hidden state of shape ``(n_layers, 1, hidden_size)``.

        The state is created on the same device and with the same dtype as the
        model's weights, so it stays valid after ``.to(device)`` or
        ``.double()``. (``torch.autograd.Variable`` is deprecated; a plain
        tensor is equivalent.)
        """
        reference = self.encoder.weight
        return torch.zeros(self.n_layers, 1, self.hidden_size,
                           device=reference.device, dtype=reference.dtype)

### Discriminator 1 - Spam classifier

In [None]:
class SpamDiscriminator(nn.Module):
  """RoBERTa encoder followed by a one-unit linear head and a sigmoid.

  Args:
    num_labels: accepted for interface compatibility but not used; the head
      always has a single output unit.
  """

  def __init__(self, num_labels=2):
    super().__init__()

    self.model = RobertaModel.from_pretrained('roberta-base')
    encoder_width = self.model.config.hidden_size
    self.classifier = nn.Linear(encoder_width, 1)

  def forward(self, input):
    """Score a batch of tokenized texts.

    Args:
      input: mapping unpacked as keyword arguments of the RoBERTa encoder
        (e.g. `input_ids`, `attention_mask`).

    Returns:
      Tensor of sigmoid probabilities with one value per sequence
      (last dimension of size 1).
    """
    encoded = self.model(**input)

    # Hidden state of the first token of every sequence (RoBERTa's `<s>`,
    # which plays the role of BERT's [CLS]).
    first_token_state = encoded.last_hidden_state[:, 0, :]

    return torch.sigmoid(self.classifier(first_token_state))

In [None]:
# Smoke test: run one sentence through a freshly built SpamDiscriminator.
input_text = "Congratulations! You've won a $1,000 Walmart gift card."

inputs = tokenizer(input_text, return_tensors='pt', padding=True, truncation=True)

# NOTE: the linear head is newly initialised here, so the probability printed
# below is not a meaningful prediction until the model has been trained.
model = SpamDiscriminator()

model.eval()
with torch.no_grad():
    prob = model(inputs).item()

threshold = 0.5
predicted_class = 1 if prob >= threshold else 0

# SPAM_ID_2_LABEL_MAP is only defined further down in the Dataset section, so
# referencing it here raises NameError on a top-to-bottom run. Use a local
# lookup with the same ids and label strings instead.
id_to_label = {0: 'not spam', 1: 'spam'}

print(f"Predicted class: {id_to_label[predicted_class]} with probability {prob:.4f}")

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Predicted class: spam with probability 0.5621


### Discriminator 2 - Detector of generated text

In [None]:
class AiGeneratedDiscriminator(nn.Module):
  """RoBERTa encoder plus a one-unit sigmoid head for the generated-text detector.

  Args:
    num_labels: currently unused; the classifier always emits one value.
  """

  def __init__(self, num_labels=2):
    super().__init__()

    backbone = RobertaModel.from_pretrained('roberta-base')
    self.model = backbone
    self.classifier = nn.Linear(backbone.config.hidden_size, 1)

  def forward(self, input):
    """Return one sigmoid probability per input sequence.

    Args:
      input: mapping of encoder keyword arguments such as `input_ids` and
        `attention_mask`.
    """
    hidden_states = self.model(**input).last_hidden_state

    # Position 0 holds the sequence-level token (`<s>` for RoBERTa).
    sequence_summary = hidden_states[:, 0, :]

    score = self.classifier(sequence_summary)
    return torch.sigmoid(score)

## Dataset

https://huggingface.co/datasets/TrainingDataPro/email-spam-classification

In [None]:
!pip install datasets

Collecting datasets
  Downloading datasets-3.2.0-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.2.0-py3-none-any.whl (480 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.9.0-py3-none-any.whl (1

In [None]:
from datasets import load_dataset

# Download the spam dataset and carve a held-out test set out of its only split.
dataset = load_dataset("TrainingDataPro/email-spam-classification")

test_size = 0.2

if 'train' not in dataset:
    raise ValueError("The dataset does not have a train split.")

dataset_split = dataset['train'].train_test_split(test_size=test_size)
train_dataset, test_dataset = dataset_split['train'], dataset_split['test']

print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of testing samples: {len(test_dataset)}")

Number of training samples: 67
Number of testing samples: 17


In [None]:
# Class names and their integer ids (0 = not spam, 1 = spam).
SPAM_LABEL = 'spam'
NO_SPAM_LABEL = 'not spam'

# Class name -> integer id.
SPAM_LABEL_2_ID_MAP = dict(zip((NO_SPAM_LABEL, SPAM_LABEL), (0, 1)))

# Integer id -> class name, derived so the two maps cannot drift apart.
SPAM_ID_2_LABEL_MAP = {
    label_id: label_name
    for label_name, label_id in SPAM_LABEL_2_ID_MAP.items()
}

In [None]:
def tokenize_function(item):
    """Tokenize one dataset row and attach its integer class id.

    Args:
        item: row with the string fields `text`, `title` and `type`.

    Returns:
        Dict with the tokenized `text` and `title` (`text_vector`,
        `title_vector`), the original three fields, and `label`, the id that
        `SPAM_LABEL_2_ID_MAP` assigns to `type`.

    Because `tokenizer_config` requests `return_tensors='pt'`, each encoding
    carries a leading batch dimension of size 1 for the single row.
    """
    text_vector = tokenizer(item['text'], **tokenizer_config)
    title_vector = tokenizer(item['title'], **tokenizer_config)

    row = {'text_vector': text_vector, 'title_vector': title_vector}
    for field in ('type', 'text', 'title'):
        row[field] = item[field]
    row['label'] = SPAM_LABEL_2_ID_MAP[item['type']]
    return row

# Tokenize both splits row by row (training split first, then test).
train_tokenized_dataset, test_tokenized_dataset = (
    split.map(tokenize_function) for split in (train_dataset, test_dataset)
)

Map:   0%|          | 0/67 [00:00<?, ? examples/s]

Map:   0%|          | 0/17 [00:00<?, ? examples/s]

In [None]:
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

batch_size = 16

# Both loaders drop the final partial batch, so every batch they yield has
# exactly `batch_size` rows. Only the training loader shuffles.
train_loader = DataLoader(
    dataset=train_tokenized_dataset,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True,
)

test_loader = DataLoader(
    dataset=test_tokenized_dataset,
    batch_size=batch_size,
    shuffle=False,
    drop_last=True,
)

## Training setup

In [None]:
import torch.optim as optim

# Prefer the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Models: the generator and the two discriminators it is played against.
generator = TextGenerator().to(device)
discriminator_A = SpamDiscriminator().to(device)
discriminator_B = AiGeneratedDiscriminator().to(device)

# One Adam optimizer per model, all with the same learning rate.
optimizer_G, optimizer_D_A, optimizer_D_B = (
    optim.Adam(network.parameters(), lr=0.001)
    for network in (generator, discriminator_A, discriminator_B)
)

# Binary cross-entropy on probabilities; both discriminators already end in
# a sigmoid.
criterion = nn.BCELoss()

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


## TODO: Training loop  

In [None]:
from tqdm import tqdm

num_epochs = 5


def _as_batch_tensor(field):
    """Return a collated token field as a (batch, seq_len) tensor on `device`.

    The encodings were stored with a leading dimension of 1 per row, so the
    default collate function hands them back as a one-element list holding
    `seq_len` tensors of shape (batch,). Calling `torch.stack` on the outer
    list is what raised "expected Tensor as element 0 ... but got list".
    Tensors and flat lists of per-position tensors are accepted as well.
    """
    if torch.is_tensor(field):
        stacked = field
    else:
        columns = field[0] if isinstance(field[0], (list, tuple)) else field
        # Each column is one token position across the batch; stacking on
        # dim 1 yields (batch, seq_len).
        stacked = torch.stack(list(columns), dim=1)
    return stacked.reshape(stacked.shape[0], -1).to(device)


def _as_model_inputs(encoding):
    """Build the `input_ids` / `attention_mask` dict the models are called with."""
    return {
        'input_ids': _as_batch_tensor(encoding['input_ids']),
        'attention_mask': _as_batch_tensor(encoding['attention_mask']),
    }


for epoch in range(num_epochs):
    for batch in tqdm(train_loader):
        real_text = _as_model_inputs(batch['text_vector'])
        title_inputs = _as_model_inputs(batch['title_vector'])

        # Generate a text body for each title. The discriminators unpack
        # their argument with `**`, so the generated ids are wrapped in a dict.
        # NOTE(review): `generate` counts the prompt toward `max_length`, and
        # the titles are padded to that same length -- confirm the generator
        # has room to produce new tokens.
        fake_text = {'input_ids': generator(title_inputs)}

        # BCELoss needs float targets on the same device as the predictions.
        spam_labels = torch.as_tensor(
            batch['label'], dtype=torch.float, device=device)

        # Train Discriminator A - Spam detector
        optimizer_D_A.zero_grad()
        output_A_real = discriminator_A(real_text).view(-1)
        output_A_fake = discriminator_A(fake_text).view(-1)
        loss_D_A = criterion(output_A_real, spam_labels) + \
            criterion(output_A_fake, torch.ones_like(output_A_fake))
        loss_D_A.backward()
        optimizer_D_A.step()

        # Train Discriminator B - AI-Generated text detector
        # (target 0 = dataset text, target 1 = generated text)
        optimizer_D_B.zero_grad()
        output_B_real = discriminator_B(real_text).view(-1)
        output_B_fake = discriminator_B(fake_text).view(-1)
        loss_D_B = criterion(output_B_real, torch.zeros_like(output_B_real)) + \
            criterion(output_B_fake, torch.ones_like(output_B_fake))
        loss_D_B.backward()
        optimizer_D_B.step()

        # Train Generator: it is rewarded when both discriminators output 0.
        # NOTE(review): `generate` returns discrete token ids and runs without
        # gradient tracking, so this loss cannot reach the generator's weights
        # and `optimizer_G.step()` does not update them. TextGAIL handles this
        # with a policy-gradient (PPO) update -- still TODO here.
        optimizer_G.zero_grad()
        output_A_fake = discriminator_A(fake_text).view(-1)
        output_B_fake = discriminator_B(fake_text).view(-1)
        loss_G = criterion(output_A_fake, torch.zeros_like(output_A_fake)) + \
            criterion(output_B_fake, torch.zeros_like(output_B_fake))
        loss_G.backward()
        optimizer_G.step()

    # Losses of the last batch of the epoch; epochs are reported 1-based.
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss D_A: {loss_D_A.item()}, Loss D_B: {loss_D_B.item()}, Loss G: {loss_G.item()}')

  0%|          | 0/4 [00:00<?, ?it/s]


TypeError: expected Tensor as element 0 in argument 0, but got list

## Test