# Prompt Moderation Classifier

This script trains a binary classifier using a LSTM neural network with embeddings, validation split, and early stopping.



In [29]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import re
from collections import Counter
from tqdm import tqdm

In [30]:
# Set random seed for reproducibility

torch.manual_seed(42)
np.random.seed(42)

In [31]:
# Load and *preprocess

df = pd.read_csv('data.csv')

# Preprocess data
def clean_text(text):
    # Basic cleaning
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)  # Remove punctuation
    return text

df['cleaned_prompt'] = df['Goal'].apply(clean_text)

In [32]:
# Create vocabulary

def build_vocab(texts, max_size=5000):
    counter = Counter()
    for text in texts:
        counter.update(text.split())

    # Keep most common words
    common_words = [word for word, _ in counter.most_common(max_size)]

    # Create word to index mapping
    word_to_idx = {word: idx+2 for idx, word in enumerate(common_words)}
    word_to_idx['<pad>'] = 0
    word_to_idx['<unk>'] = 1

    return word_to_idx

word_to_idx = build_vocab(df['cleaned_prompt'])
vocab_size = len(word_to_idx)
print(f"Vocabulary size: {vocab_size}")

Vocabulary size: 850


In [33]:
# Convert labels to indices
label_to_idx = {'Safe': 0, 'Harmful': 1}
df['label_idx'] = df['Label'].map(label_to_idx)

# Convert text to sequences of indices
def text_to_sequence(text, word_to_idx, max_len=100):
    words = text.split()[:max_len]  # Truncate to max_len
    sequence = [word_to_idx.get(word, word_to_idx['<unk>']) for word in words]

    # Pad sequence
    if len(sequence) < max_len:
        sequence = sequence + [word_to_idx['<pad>']] * (max_len - len(sequence))

    return sequence

df['sequence'] = df['cleaned_prompt'].apply(lambda x: text_to_sequence(x, word_to_idx))

In [34]:
# Split data
X_train, X_test, y_train, y_test = train_test_split(
    df['sequence'].values,
    df['label_idx'].values,
    test_size=0.2,
    random_state=42,
    stratify=df['label_idx']
)

In [36]:
# Create PyTorch dataset
class PromptDataset(Dataset):
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        sequence = torch.tensor(self.sequences[idx], dtype=torch.long)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return sequence, label

# Create dataloaders
train_dataset = PromptDataset(X_train, y_train)
test_dataset = PromptDataset(X_test, y_test)

batch_size = 16
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

In [37]:
# Define LSTM model
class LSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim,
                           hidden_dim,
                           num_layers=n_layers,
                           bidirectional=True,
                           dropout=dropout,
                           batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        # text shape: [batch size, seq length]
        embedded = self.embedding(text)
        # embedded shape: [batch size, seq length, embedding dim]

        lstm_output, (hidden, cell) = self.lstm(embedded)
        # lstm_output shape: [batch size, seq length, hidden dim * 2]
        # hidden shape: [n layers * 2, batch size, hidden dim]

        # Concatenate the final forward and backward hidden states
        hidden = self.dropout(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1))
        # hidden shape: [batch size, hidden dim * 2]

        output = self.fc(hidden)
        # output shape: [batch size, output dim]

        return output

In [39]:
# Initialize model
embedding_dim = 100
hidden_dim = 256
output_dim = 2  # binary classification
n_layers = 2
dropout = 0.5

model = LSTMClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout)
# model = model.to(device)

# Define optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [40]:
# Training function
def train(model, iterator, optimizer, criterion):
    model.train()
    epoch_loss = 0
    epoch_acc = 0

    for batch in tqdm(iterator, desc="Training"):
        sequences, labels = batch
        sequences, labels = sequences.to(device), labels.to(device)

        optimizer.zero_grad()

        predictions = model(sequences)

        loss = criterion(predictions, labels)
        acc = ((predictions.argmax(1) == labels).float().sum() / len(labels)).item()

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += acc

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [42]:
# Evaluation function
def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    epoch_acc = 0

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(iterator, desc="Evaluating"):
            sequences, labels = batch
            sequences, labels = sequences.to(device), labels.to(device)

            predictions = model(sequences)

            loss = criterion(predictions, labels)
            acc = ((predictions.argmax(1) == labels).float().sum() / len(labels)).item()

            epoch_loss += loss.item()
            epoch_acc += acc

            all_preds.extend(predictions.argmax(1).cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    return epoch_loss / len(iterator), epoch_acc / len(iterator), all_preds, all_labels

In [43]:
# Training loop
n_epochs = 10
best_valid_loss = float('inf')

train_losses = []
train_accs = []
valid_losses = []
valid_accs = []

for epoch in range(n_epochs):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    valid_loss, valid_acc, predictions, labels = evaluate(model, test_loader, criterion)

    train_losses.append(train_loss)
    train_accs.append(train_acc)
    valid_losses.append(valid_loss)
    valid_accs.append(valid_acc)

    print(f'Epoch: {epoch+1:02}')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\tValid Loss: {valid_loss:.3f} | Valid Acc: {valid_acc*100:.2f}%')

    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'lstm_prompt_classifier.pt')
        print(f'\tModel saved with validation loss: {valid_loss:.3f}')

Training:   0%|          | 0/10 [00:00<?, ?it/s]


NameError: name 'device' is not defined

In [None]:
# Plot training history
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Training Loss')
plt.plot(valid_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(train_accs, label='Training Accuracy')
plt.plot(valid_accs, label='Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Training and Validation Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# Final evaluation
model.load_state_dict(torch.load('lstm_prompt_classifier.pt'))
test_loss, test_acc, predictions, labels = evaluate(model, test_loader, criterion)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

# Classification report
idx_to_label = {v: k for k, v in label_to_idx.items()}
pred_labels = [idx_to_label[pred] for pred in predictions]
true_labels = [idx_to_label[label] for label in labels]

print("\nClassification Report:")
print(classification_report(true_labels, pred_labels))

In [None]:
# Confusion Matrix
conf_matrix = confusion_matrix(true_labels, pred_labels, labels=list(label_to_idx.keys()))
plt.figure(figsize=(8, 6))
plt.imshow(conf_matrix, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
tick_marks = np.arange(len(label_to_idx))
plt.xticks(tick_marks, label_to_idx.keys(), rotation=45)
plt.yticks(tick_marks, label_to_idx.keys())
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()

In [None]:
# Function to predict on new text
def predict_text(text, model, word_to_idx, idx_to_label):
    model.eval()

    # Preprocess text
    text = clean_text(text)
    sequence = text_to_sequence(text, word_to_idx)
    tensor = torch.tensor(sequence).unsqueeze(0).to(device)  # Add batch dimension

    with torch.no_grad():
        prediction = model(tensor)
        prob = torch.softmax(prediction, dim=1)
        label_idx = prediction.argmax(1).item()
        confidence = prob[0][label_idx].item()

    return {
        'label': idx_to_label[label_idx],
        'confidence': confidence,
        'probabilities': {
            'Safe': prob[0][0].item(),
            'Harmful': prob[0][1].item()
        }
    }

In [None]:
# Example usage
test_prompt = "Write a poem about flowers and sunshine"
idx_to_label = {v: k for k, v in label_to_idx.items()}
result = predict_text(test_prompt, model, word_to_idx, idx_to_label)
print(f"\nPrediction for: '{test_prompt}'")
print(f"Label: {result['label']}")
print(f"Confidence: {result['confidence']:.4f}")
print(f"Probabilities: Safe: {result['probabilities']['Safe']:.4f}, Harmful: {result['probabilities']['Harmful']:.4f}")