In [None]:
!pip install datasets

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Subset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AdamW
from datasets import load_dataset
from tqdm import tqdm
import numpy as np
# Add gradient clipping
from torch.nn.utils import clip_grad_norm_
max_grad_norm = 1.0

class PromptTuningBERT(nn.Module):
    """Frozen sequence classifier steered by a learnable soft prompt.

    A pretrained two-label sequence-classification model is loaded and every
    one of its weights is frozen. The only trainable tensor is
    ``prompt_embeddings`` (shape ``(1, num_virtual_tokens, hidden)``), which
    is prepended to the word embeddings of every input sequence.

    Args:
        model_name: Hugging Face model identifier to load.
        num_virtual_tokens: Number of learnable prompt vectors to prepend.
        max_length: Total sequence budget (prompt + text). Inputs are cut to
            ``max_length - num_virtual_tokens`` tokens in ``forward``.

    Raises:
        ValueError: If ``num_virtual_tokens`` is negative or leaves no room
            for any text token within ``max_length``.
    """

    def __init__(self, model_name="bert-base-uncased", num_virtual_tokens=50, max_length=512):
        super().__init__()
        # A non-positive text budget would turn the slices in forward() into
        # empty / negative-index slices that silently mangle the input.
        if not 0 <= num_virtual_tokens < max_length:
            raise ValueError(
                f"num_virtual_tokens must be in [0, max_length); got "
                f"{num_virtual_tokens} with max_length={max_length}"
            )

        self.bert = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)
        self.bert.requires_grad_(False)

        self.n_tokens = num_virtual_tokens
        self.max_length = max_length - num_virtual_tokens

        # Initialise the prompt from the embeddings of randomly drawn
        # vocabulary tokens.
        vocab_size = self.bert.config.vocab_size
        token_ids = torch.randint(0, vocab_size, (num_virtual_tokens,))
        # get_input_embeddings() is the architecture-independent accessor for
        # the word-embedding table, so model_name is not tied to BERT's
        # attribute layout.
        with torch.no_grad():
            initial_prompt = self.bert.get_input_embeddings()(token_ids)
        # clone() so the parameter owns its storage independently of the
        # lookup result.
        self.prompt_embeddings = nn.Parameter(initial_prompt.detach().clone().unsqueeze(0))

    def forward(self, input_ids, attention_mask, labels=None):
        """Run the frozen classifier on ``[prompt ; input]``.

        Args:
            input_ids: Token ids, indexed as ``(batch, seq)``.
            attention_mask: Mask aligned with ``input_ids``.
            labels: Optional class labels, forwarded to the wrapped model.

        Returns:
            Whatever the wrapped model returns when called with
            ``return_dict=True``.
        """
        # Make room for the prompt inside the overall length budget.
        input_ids = input_ids[:, :self.max_length]
        attention_mask = attention_mask[:, :self.max_length]
        batch_size = input_ids.shape[0]

        token_embeddings = self.bert.get_input_embeddings()(input_ids)
        prompt_embeddings = self.prompt_embeddings.expand(batch_size, -1, -1)
        inputs_embeds = torch.cat([prompt_embeddings, token_embeddings], dim=1)

        # new_ones() inherits dtype and device from the incoming mask, so the
        # concatenation does not silently promote an integer mask to float.
        prompt_attention_mask = attention_mask.new_ones((batch_size, self.n_tokens))
        attention_mask = torch.cat([prompt_attention_mask, attention_mask], dim=1)

        return self.bert(
            inputs_embeds=inputs_embeds,
            attention_mask=attention_mask,
            labels=labels,
            return_dict=True
        )

# Data preparation
# Sequence budget shared by the tokenizer and the model: max_length is the
# total length (prompt + text); num_virtual_tokens of it go to the prompt.
max_length = 512
num_virtual_tokens = 20

# Tokenizer and raw IMDB splits.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
dataset = load_dataset("imdb")

def tokenize_function(examples):
    """Tokenize the ``"text"`` field of a batch for prompt-tuned training.

    Every sequence is truncated and padded to exactly
    ``max_length - num_virtual_tokens`` tokens, leaving room for the soft
    prompt within the overall ``max_length`` budget.

    Args:
        examples: Mapping with a ``"text"`` entry holding the reviews.

    Returns:
        The tokenizer's output for those reviews.
    """
    text_budget = max_length - num_virtual_tokens
    reviews = examples["text"]
    return tokenizer(
        reviews,
        max_length=text_budget,
        truncation=True,
        padding="max_length",
    )

# Use only 5000 examples for training (and the same number for evaluation)
train_size = 5000
np.random.seed(42)
train_indices = np.random.choice(len(dataset["train"]), train_size, replace=False)
test_indices = np.random.choice(len(dataset["test"]), train_size, replace=False)

# Subsample BEFORE tokenizing: the selected rows (and their order) are the
# same as tokenizing first and selecting afterwards, but only the reviews
# that are actually kept get tokenized instead of both full splits.
tokenized_train = dataset["train"].select(train_indices).map(tokenize_function, batched=True)
tokenized_test = dataset["test"].select(test_indices).map(tokenize_function, batched=True)

# Expose only the tensors the training loop consumes.
tokenized_train.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
tokenized_test.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])

train_dataloader = DataLoader(tokenized_train, batch_size=64, shuffle=True)
eval_dataloader = DataLoader(tokenized_test, batch_size=128)

# Define the model
model = PromptTuningBERT(num_virtual_tokens=num_virtual_tokens, max_length=max_length)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Only parameters that still require gradients (the soft prompt) are handed
# to the optimizer; the frozen backbone never receives gradients.
trainable_params = [p for p in model.parameters() if p.requires_grad]

# transformers.AdamW is deprecated in favour of torch.optim.AdamW. eps and
# weight_decay are set explicitly to the values the transformers
# implementation used by default so the optimizer settings stay the same.
optimizer = torch.optim.AdamW(trainable_params, lr=1e-2, eps=1e-6, weight_decay=0.0)
num_epochs = 30

# Perform the training
for epoch in range(num_epochs):
    model.train()
    # Losses are accumulated per example (batch mean * batch size) so the
    # shorter final batch does not get the same weight as a full batch.
    total_train_loss = 0.0
    train_examples = 0

    for batch in tqdm(train_dataloader, desc=f'Training Epoch {epoch + 1}'):
        batch = {k: v.to(device) for k, v in batch.items()}
        labels = batch.pop('label')
        outputs = model(**batch, labels=labels)

        loss = outputs.loss
        batch_examples = labels.size(0)
        total_train_loss += loss.item() * batch_examples
        train_examples += batch_examples

        loss.backward()
        clip_grad_norm_(model.parameters(), max_grad_norm)
        optimizer.step()
        optimizer.zero_grad()

    model.eval()
    total_val_loss = 0.0
    val_examples = 0
    val_correct = 0

    with torch.no_grad():
        for batch in tqdm(eval_dataloader, desc='Validating'):
            batch = {k: v.to(device) for k, v in batch.items()}
            labels = batch.pop('label')

            outputs = model(**batch, labels=labels)
            batch_examples = labels.size(0)
            total_val_loss += outputs.loss.item() * batch_examples
            val_examples += batch_examples

            predictions = torch.argmax(outputs.logits, dim=-1)
            val_correct += (predictions == labels).sum().item()

    # max(..., 1) only guards against an empty dataloader.
    avg_train_loss = total_train_loss / max(train_examples, 1)
    avg_val_loss = total_val_loss / max(val_examples, 1)
    val_accuracy = val_correct / max(val_examples, 1)

    print(f"\nEpoch {epoch + 1}:")
    print(f"Average training loss: {avg_train_loss:.4f}")
    print(f"Average validation loss: {avg_val_loss:.4f}")
    print(f"Validation accuracy: {val_accuracy:.4f}")

# The classification head is randomly initialised when the model is built
# and stays frozen, so the prompt is only meaningful together with that exact
# head. Persist both; tensors are detached and moved to CPU so the file loads
# on any device.
torch.save({
    'prompt_embeddings': model.prompt_embeddings.detach().cpu(),
    'classifier_state_dict': {
        name: tensor.detach().cpu()
        for name, tensor in model.bert.classifier.state_dict().items()
    },
    'config': {
        'num_virtual_tokens': model.n_tokens,
        'model_name': 'bert-base-uncased'
    }
}, "imdb_prompt_embeddings.pt")

In [None]:
# Persist everything inference needs to reproduce the trained behaviour.
# The classification head is randomly initialised at model construction and
# frozen, so the learned prompt only makes sense together with that exact
# head; without it a freshly loaded model gets a different random head.
# Tensors are detached and moved to CPU so the file loads on any device.
torch.save({
    'prompt_embeddings': model.prompt_embeddings.detach().cpu(),
    'classifier_state_dict': {
        name: tensor.detach().cpu()
        for name, tensor in model.bert.classifier.state_dict().items()
    },
    'config': {
        'num_virtual_tokens': model.n_tokens,
        'model_name': 'bert-base-uncased'
    }
}, "imdb_prompt_embeddings.pt")

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import random
import numpy as np

class PromptTunedBERTInference:
    """Run sentiment inference with a saved soft prompt on a frozen classifier.

    The checkpoint at ``prompt_path`` may be either a dict with a
    ``'prompt_embeddings'`` entry (optionally ``'config'`` and
    ``'classifier_state_dict'``) or, in the old format, the bare prompt
    tensor.

    Note: constructing an instance reseeds the global ``torch``, ``random``
    and ``numpy`` RNGs with 42 and switches cuDNN to deterministic mode.

    Args:
        model_name: Hugging Face model identifier of the backbone.
        prompt_path: Path of the checkpoint written by the training code.
    """

    def __init__(self, model_name="bert-base-uncased", prompt_path="imdb_prompt_embeddings.pt"):
        # Set seeds for reproducibility
        torch.manual_seed(42)
        random.seed(42)
        np.random.seed(42)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(42)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Using device: {self.device}")

        # Load saved state
        classifier_state = None
        try:
            saved_state = torch.load(prompt_path, map_location=self.device)
            if isinstance(saved_state, dict) and 'prompt_embeddings' in saved_state:
                self.prompt_embeddings = saved_state['prompt_embeddings']
                classifier_state = saved_state.get('classifier_state_dict')
                print(f"Loaded config: {saved_state.get('config')}")
            else:
                self.prompt_embeddings = saved_state  # Old format
            print(f"Loaded prompt embeddings shape: {self.prompt_embeddings.shape}")
            print(f"Prompt embeddings sum: {self.prompt_embeddings.sum().item()}")
        except Exception as e:
            print(f"Error loading prompt embeddings: {e}")
            raise

        # Initialize model and tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

        # The prompt was optimised against one specific (randomly initialised,
        # frozen) classification head. Restore it when the checkpoint has it;
        # otherwise the freshly initialised head here is unrelated to the one
        # used in training.
        if classifier_state is not None:
            self.model.classifier.load_state_dict(classifier_state)
            print("Loaded classifier head from checkpoint")
        else:
            print("Warning: checkpoint has no 'classifier_state_dict'; the "
                  "classifier head is randomly initialised, so predictions "
                  "are unreliable. Re-save the checkpoint with the head.")

        # Move to device and eval mode
        self.model.to(self.device)
        self.model.eval()
        # detach(): inference never needs gradients for the prompt.
        self.prompt_embeddings = self.prompt_embeddings.detach().to(self.device)

        # Freeze everything
        self.model.requires_grad_(False)

    def predict(self, text):
        """Classify a single review.

        Args:
            text: One review string (the result is read with ``.item()``, so
                a batch of several texts is not supported).

        Returns:
            dict with ``prediction`` (1 = positive, 0 = negative),
            ``positive_prob``, ``negative_prob``, ``confidence`` (probability
            of the predicted class) and the raw ``logits``.
        """
        with torch.no_grad():
            # Tokenize, leaving room for the prompt in the 512-token budget
            n_prompt = self.prompt_embeddings.shape[1]
            inputs = self.tokenizer(text, padding=True, truncation=True,
                                    max_length=512 - n_prompt,
                                    return_tensors="pt")
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            # Prepend the prompt to the word embeddings
            embeddings = self.model.get_input_embeddings()(inputs['input_ids'])
            batch_size = embeddings.shape[0]
            prompt_embeds = self.prompt_embeddings.expand(batch_size, -1, -1)
            inputs_embeds = torch.cat([prompt_embeds, embeddings], dim=1)

            # Extend the attention mask; new_ones() keeps the tokenizer's
            # mask dtype and device instead of promoting the mask to float.
            attention_mask = inputs['attention_mask']
            prompt_attention = attention_mask.new_ones((batch_size, n_prompt))
            attention_mask = torch.cat([prompt_attention, attention_mask], dim=1)

            # Forward pass
            outputs = self.model(inputs_embeds=inputs_embeds,
                                 attention_mask=attention_mask)

            # Training fed the dataset labels to the model unchanged, and the
            # IMDB labels are 0 = negative, 1 = positive, so logit index 1 is
            # the positive class and the argmax is the prediction as-is.
            probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
            prediction = outputs.logits.argmax(-1).item()

            return {
                "prediction": prediction,
                "positive_prob": probs[0][1].item(),
                "negative_prob": probs[0][0].item(),
                "confidence": probs[0][prediction].item(),
                "logits": outputs.logits[0].tolist()
            }

def test_consistency(model, num_runs=5):
    """Predict the same review repeatedly and report whether outputs agree.

    Args:
        model: Object exposing ``predict(text)`` that returns a dict with
            ``prediction``, ``positive_prob``, ``negative_prob`` and
            ``logits`` entries.
        num_runs: How many times the review is classified.

    Prints each run's result, then whether all predicted labels and all
    logits were identical across runs. Returns nothing.
    """
    test_review = "This movie was absolutely brilliant! The acting was superb and the story kept me on the edge of my seat. A must-watch!"
    print("\nTesting same review multiple times for consistency:")

    results = []
    for run_number in range(1, num_runs + 1):
        result = model.predict(test_review)
        if result['prediction'] == 1:
            sentiment = "Positive"
        else:
            sentiment = "Negative"

        record = {'prediction': sentiment}
        for key in ('positive_prob', 'negative_prob', 'logits'):
            record[key] = result[key]
        results.append(record)

        print(f"\nRun {run_number}")
        print(f"Prediction: {sentiment}")
        print(f"Logits: {result['logits']}")
        print(f"Positive prob: {result['positive_prob']:.3f}")
        print(f"Negative prob: {result['negative_prob']:.3f}")

    # Check consistency: a single distinct label, and every logits list
    # exactly equal to the first run's.
    distinct_labels = {record['prediction'] for record in results}
    logits = [record['logits'] for record in results]
    predictions_same = len(distinct_labels) == 1
    logits_same = all(logits[0] == l for l in logits)

    print("\nConsistency Check:")
    print(f"Predictions same: {predictions_same}")
    print(f"Logits consistent: {logits_same}")

if __name__ == "__main__":
    # Build the inference wrapper from the default checkpoint and check that
    # repeated predictions on one review agree.
    # NOTE(review): this rebinds the name `model`; if an earlier cell bound
    # `model` to the training module, that binding is replaced here.
    model = PromptTunedBERTInference()
    test_consistency(model)

In [None]:
def test_multiple_reviews(model):
    """Classify a fixed set of sample reviews and print a report for each.

    Args:
        model: Object exposing ``predict(text)`` that returns a dict with
            ``prediction``, ``positive_prob``, ``negative_prob`` and
            ``confidence`` entries.

    For every review the text, the hand-assigned expected sentiment, the
    model's label and its probabilities are printed. Returns nothing.
    """
    # (review text, expected sentiment) pairs.
    samples = (
        ("This movie was absolutely brilliant! The acting was superb and the story kept me on the edge of my seat. A must-watch!",
         "Positive"),
        ("What a complete waste of time. Bad acting, terrible plot, and the ending made no sense at all. Don't bother watching.",
         "Negative"),
        ("It was okay. Some good moments but nothing special. Probably wouldn't watch it again.",
         "Mixed/Neutral"),
        ("The best film I've seen this year! The cinematography was breathtaking and the script was perfect. Going to watch it again!",
         "Positive"),
        ("Horrible experience. The theater was empty for a reason - this movie is just awful. Save your money.",
         "Negative"),
        ("A solid 7/10. Not groundbreaking but entertaining throughout. Good performances from the whole cast.",
         "Positive"),
        ("I went in with high expectations but left disappointed. The story had potential but failed to deliver.",
         "Negative"),
        ("A masterpiece of modern cinema. Every scene was perfectly crafted. The director outdid themselves.",
         "Positive"),
    )
    wide_rule = "=" * 100
    thin_rule = "-" * 100

    print("\nTesting multiple reviews:")
    print(wide_rule)

    for text, expected in samples:
        result = model.predict(text)
        if result['prediction'] == 1:
            sentiment = "Positive"
        else:
            sentiment = "Negative"

        print(f"\nReview: {text}")
        print(f"Expected sentiment: {expected}")
        print(f"Model prediction: {sentiment}")
        print("Confidence scores:")
        print(f"  Positive probability: {result['positive_prob']:.3f}")
        print(f"  Negative probability: {result['negative_prob']:.3f}")
        print(f"  Overall confidence: {result['confidence']:.3f}")
        print(thin_rule)

if __name__ == "__main__":
    # Build the inference wrapper from the default checkpoint.
    # NOTE(review): this rebinds the name `model`; if an earlier cell bound
    # `model` to the training module, that binding is replaced here.
    model = PromptTunedBERTInference()

    # First test consistency
    test_consistency(model, num_runs=3)

    # Then test multiple reviews
    test_multiple_reviews(model)