### Part 1

In [17]:
from datasets import load_dataset, load_from_disk
import os, torch

# Prefer the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Local cache directory for the sampled reviews; used for both the existence
# check and the save so the two can never point at different locations.
dataset_path = 'imdb_subset'

if os.path.exists(dataset_path):
    # A previous run already cached the sample: reuse it instead of downloading.
    subset = load_from_disk(dataset_path)
else:
    # First run: download IMDb, take a seeded shuffle of the train split,
    # keep the first 500 rows and cache them on disk.
    imdb = load_dataset('imdb')
    subset = imdb['train'].shuffle(seed=42).select(range(500))
    subset.save_to_disk(dataset_path)

# Name used by the cells that follow.
dataset = subset

### Part 2 - BERT

In [None]:
from transformers import BertTokenizer, BertForSequenceClassification

# Checkpoint name shared by the tokenizer and the classifier below.
model_name = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(model_name)
# Sequence classifier configured with two output labels.
model = BertForSequenceClassification.from_pretrained(model_name, num_labels=2)

In [None]:
def tokenize_function(example):
    """Tokenize the ``"text"`` field of ``example`` with the module-level ``tokenizer``.

    Sequences are truncated and padded with the ``"max_length"`` strategy; no
    explicit ``max_length`` is passed, so the tokenizer's own limit applies.
    Returns whatever the tokenizer call returns.
    """
    encoded = tokenizer(example["text"], truncation=True, padding="max_length")
    return encoded

# The classification target is exposed under the name "labels" for the Trainer.
# The rename is guarded so that re-running this cell (when the column has
# already been renamed) does not raise.
if "label" in dataset.column_names:
    dataset = dataset.rename_column("label", "labels")

# Train test split: hold out 20% of the reviews as the final test set
split_dataset = dataset.train_test_split(test_size=0.2, seed=42)
train_dataset = split_dataset["train"]
test_dataset = split_dataset["test"]

# Train eval split: 40% of the remaining reviews are used for per-epoch evaluation
split_dataset = train_dataset.train_test_split(test_size=0.4, seed=42)
train_dataset = split_dataset["train"]
eval_dataset = split_dataset["test"]

# Apply the tokenization function to the datasets
tokenized_train = train_dataset.map(tokenize_function, batched=True)
tokenized_eval = eval_dataset.map(tokenize_function, batched=True)
tokenized_test = test_dataset.map(tokenize_function, batched=True)

# Set the format for PyTorch, exposing only the columns listed below
tokenized_train.set_format("torch", columns=["input_ids", "attention_mask", "labels"])
tokenized_eval.set_format("torch", columns=["input_ids", "attention_mask", "labels"])
tokenized_test.set_format("torch", columns=["input_ids", "attention_mask", "labels"])

# Confirm dataset sizes
print(f"Train size: {len(tokenized_train)}")
print(f"Eval size: {len(tokenized_eval)}")
print(f"Test size: {len(tokenized_test)}")

In [None]:
from transformers import TrainingArguments

# `eval_strategy` is the current name of the deprecated `evaluation_strategy`
# argument; it is also the spelling used by the GPT-2 training code further
# down in this notebook, so the two configurations now agree.
training_args = TrainingArguments(
    output_dir="./results",               # Directory to save model checkpoints
    eval_strategy="epoch",                # Evaluate at the end of each epoch
    learning_rate=2e-5,                   # Learning rate
    per_device_train_batch_size=8,        # Batch size for training
    per_device_eval_batch_size=8,         # Batch size for evaluation
    num_train_epochs=10,                  # Number of epochs
    weight_decay=0.01,                    # Weight decay
    logging_dir="./logs",                 # Directory to save logs
    logging_steps=10,                     # Log every 10 steps
    save_total_limit=2                    # Limit the number of saved checkpoints
)

In [None]:
from transformers import Trainer
import numpy as np
from sklearn.metrics import accuracy_score

def compute_metrics(eval_pred):
    """Return ``{"accuracy": ...}`` for one evaluation pass.

    ``eval_pred`` unpacks into ``(predictions, labels)``. The predicted class
    of each row is the index of its largest score along axis 1, and accuracy
    is computed with ``accuracy_score`` against ``labels``.
    """
    scores, gold = eval_pred
    predicted = np.argmax(scores, axis=1)
    return {"accuracy": accuracy_score(gold, predicted)}

# Wire the classifier, its training configuration, the train/eval splits and
# the accuracy metric into a single Trainer.
trainer = Trainer(
    model=model,                          # The pre-trained model to fine-tune
    args=training_args,                   # Training arguments
    train_dataset=tokenized_train,        # Training dataset
    eval_dataset=tokenized_eval,          # Evaluation dataset
    tokenizer=tokenizer,                  # Tokenizer for data preprocessing
    compute_metrics=compute_metrics       # Custom metrics function
)

# Explicitly place the model on the device selected in Part 1.
model = model.to(device)

In [None]:
trainer.train()

In [None]:
# Make predictions on the held-out test dataset (not seen during training or
# per-epoch evaluation).
predictions = trainer.predict(tokenized_test)

# Extract predicted logits and labels
logits = predictions.predictions
predicted_labels = np.argmax(logits, axis=1)  # Get the indices of the max logits
true_labels = predictions.label_ids

In [None]:
# Calculate accuracy on the test set and report it as a percentage with two
# decimals.
test_accuracy = accuracy_score(true_labels, predicted_labels)
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")

### Part 3 - GPT-2

In [25]:
from transformers import GPT2LMHeadModel, GPT2Tokenizer, Trainer, TrainingArguments, DataCollatorForLanguageModeling

def tokenize_reviews(dataset, tokenizer, max_length=150):
    """Tokenize the ``"text"`` column of ``dataset`` to fixed-length sequences.

    Args:
        dataset: Object providing ``map``, ``column_names`` and ``rename_column``.
        tokenizer: Callable tokenizer. If it has no pad token, its EOS token is
            assigned as the pad token (the tokenizer is modified in place).
        max_length: Length every sequence is padded / truncated to.

    Returns:
        The mapped dataset, with a ``"label"`` column renamed to ``"labels"``
        when such a column is present.
    """
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Options forwarded unchanged to every tokenizer call.
    encode_options = {
        "padding": "max_length",
        "truncation": True,
        "max_length": max_length,
        "return_tensors": None,
    }

    def encode_batch(batch):
        return tokenizer(batch['text'], **encode_options)

    encoded = dataset.map(encode_batch, batched=True)

    # Nothing to rename when the dataset carries no "label" column.
    if "label" not in encoded.column_names:
        return encoded
    return encoded.rename_column("label", "labels")

# The module-level `tokenizer` defined in Part 2 is the BERT tokenizer; GPT-2
# has its own vocabulary, so the reviews used to fine-tune GPT-2 must be
# encoded with a GPT-2 tokenizer instead.
gpt2_tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
tokenized_dataset = tokenize_reviews(dataset, gpt2_tokenizer, max_length=150)

# One subset per sentiment (label 1 / label 0), each used to fine-tune its own model.
positive_tokenized = tokenized_dataset.filter(lambda example: example["labels"] == 1)
negative_tokenized = tokenized_dataset.filter(lambda example: example["labels"] == 0)

In [None]:
def train_save_GPT2(model, tokenizer, tokenized_dataset, prompt='The movie was'):
    """Fine-tune a language model on single-sentiment reviews, evaluate and save it.

    Args:
        model: Model handed to ``Trainer`` for fine-tuning.
        tokenizer: Tokenizer used by the collator and saved next to the model.
        tokenized_dataset: Tokenized reviews with a ``"labels"`` column whose
            values are either all 0 (negative) or all 1 (positive).
        prompt: Unused; kept so existing callers keep working.

    Raises:
        ValueError: If the ``"labels"`` column is empty or mixes sentiments.

    Side effects:
        Trains the model, prints the evaluation perplexity and writes the
        model and tokenizer to ``./gpt2-<sentiment>-results``.
    """
    import math  # local import: only needed for the perplexity computation

    # The sentiment is a property of the whole dataset, so look at the set of
    # label values rather than comparing the column itself to a scalar.
    label_values = {int(value) for value in tokenized_dataset["labels"]}
    if label_values == {0}:
        sentiment = "negative"
    elif label_values == {1}:
        sentiment = "positive"
    else:
        raise ValueError(
            "tokenized_dataset must contain only label 0 or only label 1, "
            f"got label values {sorted(label_values)}"
        )

    # Checkpoints and the final model share one directory.
    save_directory = f"./gpt2-{sentiment}-results"

    training_args = TrainingArguments(
        output_dir=save_directory,
        num_train_epochs=5,
        learning_rate=2e-5,
        per_device_train_batch_size=8,  # batch size per device during training
        per_device_eval_batch_size=8,   # batch size for evaluation
        weight_decay=0.01,              # strength of weight decay
        logging_dir='./logs',      # directory for storing logs
        logging_steps=10,
    )

    # mlm=False selects the causal (next-token) objective instead of masked LM.
    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer,
        mlm=False
    )

    # Seeded so that repeated runs train and evaluate on the same rows.
    train_test_split = tokenized_dataset.train_test_split(test_size=0.2, seed=42)
    train_dataset = train_test_split["train"]
    eval_dataset = train_test_split["test"]

    # Create a Trainer instance
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        tokenizer=tokenizer,
        data_collator=data_collator
    )

    # Train the model
    trainer.train()
    results = trainer.evaluate()
    # Perplexity is the exponential of the evaluation loss, not the loss itself.
    print(f'Perplexity: {math.exp(results["eval_loss"])}')

    # Save the model and tokenizer
    trainer.save_model(save_directory)
    tokenizer.save_pretrained(save_directory)

### Positive

In [26]:
# Start the positive-sentiment generator from the stock "gpt2" checkpoint.
positive_model = GPT2LMHeadModel.from_pretrained("gpt2")
positive_tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

# Make sure a pad token exists: reuse EOS when there is one, otherwise register
# a new "[PAD]" token and grow the embedding matrix to the new vocabulary size.
if positive_tokenizer.pad_token is None and positive_tokenizer.eos_token is not None:
    positive_tokenizer.pad_token = positive_tokenizer.eos_token
elif positive_tokenizer.pad_token is None:
    positive_tokenizer.add_special_tokens({'pad_token': '[PAD]'})
    positive_model.resize_token_embeddings(len(positive_tokenizer))

In [None]:
# Training configuration for the positive-review generator.
# `eval_strategy` replaces the deprecated `evaluation_strategy` spelling and
# matches the argument name used by `train_gpt2_model` later in this notebook.
training_args_positive = TrainingArguments(
    output_dir="./positive_gpt2",      # checkpoints for the positive model
    eval_strategy="epoch",             # evaluate at the end of each epoch
    learning_rate=5e-5,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=10,                  # log every 10 steps
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    num_train_epochs=5,
    save_total_limit=2                 # keep at most two checkpoints on disk
)

In [None]:

# `positive_tokenized` (built in the tokenization cell) is the only positive
# dataset that exists at this point, so derive the train / eval splits here.
# The split is seeded to keep runs reproducible.
positive_split = positive_tokenized.train_test_split(test_size=0.2, seed=42)
positive_tokenized_train = positive_split["train"]
positive_tokenized_eval = positive_split["test"]

positive_trainer = Trainer(
    model=positive_model,
    args=training_args_positive,
    train_dataset=positive_tokenized_train,
    eval_dataset=positive_tokenized_eval,
    tokenizer=positive_tokenizer,
    # mlm=False selects the causal (next-token) language-modelling objective.
    data_collator=DataCollatorForLanguageModeling(tokenizer=positive_tokenizer, mlm=False)
)

positive_model = positive_model.to(device)

positive_trainer.train()

In [None]:
# Persist the fine-tuned positive model and its tokenizer side by side so the
# generation cell can reload both from the same directory.
save_directory_pos = "gpt2-positive"
positive_trainer.model.save_pretrained(save_directory_pos)
positive_tokenizer.save_pretrained(save_directory_pos)

### Negative

In [11]:
# Start the negative-sentiment generator from the stock "gpt2" checkpoint.
negative_model = GPT2LMHeadModel.from_pretrained("gpt2")
negative_tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

# Make sure a pad token exists: reuse EOS when there is one, otherwise register
# a new "[PAD]" token and grow the embedding matrix to the new vocabulary size.
if negative_tokenizer.pad_token is None and negative_tokenizer.eos_token is not None:
    negative_tokenizer.pad_token = negative_tokenizer.eos_token
elif negative_tokenizer.pad_token is None:
    negative_tokenizer.add_special_tokens({'pad_token': '[PAD]'})
    negative_model.resize_token_embeddings(len(negative_tokenizer))

In [None]:
# Training configuration for the negative-review generator.
# `eval_strategy` replaces the deprecated `evaluation_strategy` spelling and
# matches the argument name used by `train_gpt2_model` later in this notebook.
training_args_negative = TrainingArguments(
    output_dir="./negative_gpt2",      # checkpoints for the negative model
    eval_strategy="epoch",             # evaluate at the end of each epoch
    learning_rate=5e-5,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=10,                  # log every 10 steps
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    num_train_epochs=5,
    save_total_limit=2                 # keep at most two checkpoints on disk
)

In [None]:
# `negative_tokenized` (built in the tokenization cell) is the only negative
# dataset that exists at this point, so derive the train / eval splits here.
# The split is seeded to keep runs reproducible.
negative_split = negative_tokenized.train_test_split(test_size=0.2, seed=42)
negative_tokenized_train = negative_split["train"]
negative_tokenized_eval = negative_split["test"]

negative_trainer = Trainer(
    model=negative_model,
    args=training_args_negative,
    train_dataset=negative_tokenized_train,
    eval_dataset=negative_tokenized_eval,
    tokenizer=negative_tokenizer,
    # mlm=False selects the causal (next-token) language-modelling objective.
    data_collator=DataCollatorForLanguageModeling(tokenizer=negative_tokenizer, mlm=False)
)

negative_model = negative_model.to(device)

negative_trainer.train()

In [None]:
# Persist the fine-tuned negative model and its tokenizer side by side so the
# generation cell can reload both from the same directory.
save_directory_neg = "gpt2-negative"
negative_trainer.model.save_pretrained(save_directory_neg)
negative_tokenizer.save_pretrained(save_directory_neg)

### After fine-tuning: reload the saved models and generate reviews

In [15]:
# Reload both fine-tuned generators (model + tokenizer) from the directories
# written by the save cells above; requires `save_directory_pos` and
# `save_directory_neg` to be defined in this kernel session.
positive_model = GPT2LMHeadModel.from_pretrained(save_directory_pos)
positive_tokenizer = GPT2Tokenizer.from_pretrained(save_directory_pos)

negative_model = GPT2LMHeadModel.from_pretrained(save_directory_neg)
negative_tokenizer = GPT2Tokenizer.from_pretrained(save_directory_neg)

In [None]:
# Parameters for text generation (read as module-level names by
# `generate_reviews` below)
max_length = 150
temperature = 0.7
top_k = 50
top_p = 0.9
repetition_penalty = 1.2
prompt = 'The movie was'

# NOTE(review): `generate_reviews` below encodes the prompt and builds its own
# attention mask on every call, so the four values computed here are not used
# by it; the second `attention_mask` assignment also overwrites the first.
input_ids_pos = positive_tokenizer.encode(prompt, return_tensors="pt")
attention_mask = input_ids_pos.ne(positive_tokenizer.pad_token_id) 

input_ids_neg = negative_tokenizer.encode(prompt, return_tensors="pt")
attention_mask = input_ids_neg.ne(negative_tokenizer.pad_token_id)

def generate_reviews(model, tokenizer, num_reviews=5):
    """Sample ``num_reviews`` continuations of the module-level ``prompt``.

    Reads the module-level generation settings ``prompt``, ``max_length``,
    ``temperature``, ``top_k``, ``top_p`` and ``repetition_penalty``.

    Args:
        model: Model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``; used to encode the prompt and
            decode the generated ids.
        num_reviews: Number of independent samples to draw.

    Returns:
        A list of ``num_reviews`` decoded strings (special tokens skipped).
    """
    # The prompt is the same for every sample, so encode it once. The tensors
    # are moved to the model's device so generation also works when the model
    # lives on the GPU.
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(model.device)
    attention_mask = input_ids.ne(tokenizer.pad_token_id)

    reviews = []
    for _ in range(num_reviews):
        # Inference only: no gradients are needed.
        with torch.no_grad():
            output = model.generate(
                input_ids,
                attention_mask=attention_mask,
                max_length=max_length,
                temperature=temperature,
                top_k=top_k,
                top_p=top_p,
                repetition_penalty=repetition_penalty,
                do_sample=True,
                num_return_sequences=1
            )
        reviews.append(tokenizer.decode(output[0], skip_special_tokens=True))
    return reviews

# Each generator must be paired with the tokenizer it was saved with; the
# module-level `tokenizer` is the BERT tokenizer from Part 2 and does not
# match the GPT-2 vocabulary.
positive_reviews = generate_reviews(positive_model, positive_tokenizer)
negative_reviews = generate_reviews(negative_model, negative_tokenizer)

# Explicit UTF-8 so that non-ASCII characters in generated text are written
# the same way on every platform.
with open('generated_reviews.txt', 'w', encoding='utf-8') as file:
    file.write("Reviews generated by positive model:\n")
    for i, review in enumerate(positive_reviews, start=1):
        file.write(f"{i}. {review}\n")
    file.write("\nReviews generated by negative model:\n")
    for i, review in enumerate(negative_reviews, start=1):
        file.write(f"{i}. {review}\n")

In [None]:
# Load IMDb Dataset
def load_imdb_dataset():
    """Return the 500-review IMDb sample, building the on-disk cache if needed.

    When ``save_dir/imdb_subset`` exists it is loaded from disk. Otherwise the
    IMDb dataset is loaded, its train split is shuffled with seed 42, the
    first 500 rows are kept, and that sample is saved to the cache path
    before being returned.
    """
    cache_dir = "save_dir/imdb_subset"

    if not os.path.exists(cache_dir):
        imdb = load_dataset("imdb")
        sample = imdb["train"].shuffle(seed=42).select(range(500))
        sample.save_to_disk(cache_dir)
        return sample

    return load_from_disk(cache_dir)


# Split Dataset into Positive and Negative Subsets with Sentiment Labels (100 samples each)
def split_and_label_dataset(dataset, samples_per_class=100):
    """Split ``dataset`` by sentiment and prefix each review with its sentiment name.

    Args:
        dataset: Dataset with ``"text"`` and ``"label"`` columns, where label 1
            marks positive and label 0 marks negative reviews.
        samples_per_class: Maximum number of reviews kept per sentiment. When
            a class has fewer matching rows, all of them are kept instead of
            failing on an out-of-range index.

    Returns:
        ``(positive_reviews, negative_reviews)`` whose ``"text"`` values start
        with ``"Positive: "`` / ``"Negative: "`` respectively.
    """
    # Add sentiment labels to text
    def add_label(example, label):
        example["text"] = f"{label}: {example['text']}"
        return example

    def take_class(label_id, label_name):
        matching = dataset.filter(lambda example: example["label"] == label_id)
        # Never ask for more rows than the class actually has.
        keep = min(samples_per_class, len(matching))
        return matching.select(range(keep)).map(lambda x: add_label(x, label_name))

    positive_reviews = take_class(1, "Positive")
    negative_reviews = take_class(0, "Negative")

    return positive_reviews, negative_reviews


# Tokenize Reviews
def tokenize_reviews(dataset, tokenizer, max_length=150):
    """Tokenize ``dataset["text"]`` and expose the result as torch-formatted columns.

    If ``tokenizer`` has no pad token, its EOS token is assigned as the pad
    token (the tokenizer is modified in place). Every review is padded and
    truncated to ``max_length`` tokens, the ``"label"`` column is renamed to
    ``"labels"``, and the output format is set to ``"torch"`` for the
    ``input_ids``, ``attention_mask`` and ``labels`` columns.

    Returns:
        The tokenized, renamed and formatted dataset.
    """
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    def encode_batch(batch):
        return tokenizer(
            batch['text'],
            max_length=max_length,
            truncation=True,
            padding="max_length",
        )

    encoded = dataset.map(encode_batch, batched=True).rename_column("label", "labels")
    exposed_columns = ["input_ids", "attention_mask", "labels"]
    encoded.set_format(type="torch", columns=exposed_columns)
    return encoded


# Fine-Tune GPT-2 Model
def train_gpt2_model(gpt2_model, gpt2_tokenizer, tokenized_reviews, sentiment):
    """
    Fine-tunes GPT-2 on the tokenized reviews for the specified sentiment.

    Args:
        gpt2_model: Model passed to ``Trainer``; it is trained in place.
        gpt2_tokenizer: Tokenizer used by the language-modelling collator and
            saved next to the model.
        tokenized_reviews: Tokenized dataset; 20% of it (seeded split) is held
            out for evaluation.
        sentiment: Name used to build the output, log and save paths.

    Side effects:
        Prints the final evaluation loss and writes the model and tokenizer to
        ``save_dir/gpt2_<sentiment>_model``.
    """
    training_args = TrainingArguments(
        output_dir=f"./gpt2_{sentiment}_results",
        eval_strategy="epoch",
        learning_rate=5e-5,
        per_device_train_batch_size=4,
        per_device_eval_batch_size=8,
        num_train_epochs=3,
        weight_decay=0.01,
        logging_dir=f"./gpt2_{sentiment}_logs",
        save_total_limit=2,
        # Half precision needs a CUDA device; requesting it unconditionally
        # makes the configuration fail on CPU-only machines. The deprecated
        # `no_cuda=False` was dropped because False is already the default.
        fp16=torch.cuda.is_available(),
    )

    # mlm=False selects the causal (next-token) objective instead of masked LM.
    data_collator = DataCollatorForLanguageModeling(
        tokenizer=gpt2_tokenizer,
        mlm=False
    )

    train_test_split = tokenized_reviews.train_test_split(test_size=0.2, seed=42)
    train_dataset = train_test_split["train"]
    eval_dataset = train_test_split["test"]

    trainer = Trainer(
        model=gpt2_model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=data_collator
    )

    trainer.train()
    results = trainer.evaluate()
    print(f"GPT-2 ({sentiment}) Evaluation Loss:", results["eval_loss"])

    # Save the model and tokenizer
    save_dir = f"save_dir/gpt2_{sentiment}_model"
    trainer.model.save_pretrained(save_dir)
    gpt2_tokenizer.save_pretrained(save_dir)


# Generate Reviews and Write to File
def generate_and_save_reviews():
    """Generate five reviews per sentiment and write them to ``generated_reviews.txt``.

    Loads the positive and negative tokenizer/model pairs from ``save_dir``,
    samples five continuations of ``"The movie was"`` from each (the prompt is
    prefixed with ``"Positive: "`` / ``"Negative: "``), and writes both
    numbered lists to a UTF-8 text file.
    """
    positive_dir = "save_dir/gpt2_positive_model"
    negative_dir = "save_dir/gpt2_negative_model"

    # Load models
    pos_tokenizer = GPT2Tokenizer.from_pretrained(positive_dir)
    pos_model = GPT2LMHeadModel.from_pretrained(positive_dir)
    neg_tokenizer = GPT2Tokenizer.from_pretrained(negative_dir)
    neg_model = GPT2LMHeadModel.from_pretrained(negative_dir)

    prompt = "The movie was"

    # Positive samples are drawn first, then negative ones.
    reviews = {
        "positive": [
            generate_reviews(pos_model, pos_tokenizer, f"Positive: {prompt}")
            for _ in range(5)
        ],
        "negative": [
            generate_reviews(neg_model, neg_tokenizer, f"Negative: {prompt}")
            for _ in range(5)
        ],
    }

    # Write to file
    with open("generated_reviews.txt", "w", encoding="utf-8") as f:
        f.write("Reviews generated by positive model:\n")
        f.writelines(
            f"{i}. {review}\n" for i, review in enumerate(reviews["positive"], 1)
        )
        f.write("\nReviews generated by negative model:\n")
        f.writelines(
            f"{i}. {review}\n" for i, review in enumerate(reviews["negative"], 1)
        )


# Generate Reviews Helper
def generate_reviews(gpt2_model, gpt2_tokenizer, prompt, max_length=150, temperature=0.7, top_k=50, top_p=0.9, repetition_penalty=1.2):
    """Sample one continuation of ``prompt`` and return it as decoded text.

    The model and the encoded prompt are moved to CUDA when available,
    otherwise to the CPU. The attention mask marks every prompt position whose
    id differs from the tokenizer's pad token id. Sampling is enabled and a
    single sequence is requested; special tokens are skipped when decoding.
    """
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    gpt2_model = gpt2_model.to(device)

    input_ids = gpt2_tokenizer.encode(prompt, return_tensors="pt").to(device)
    attention_mask = input_ids.ne(gpt2_tokenizer.pad_token_id).to(device)

    # Sampling configuration forwarded unchanged to `generate`.
    sampling_options = dict(
        max_length=max_length,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
        repetition_penalty=repetition_penalty,
        do_sample=True,
        num_return_sequences=1,
    )

    # Inference only: no gradients are needed.
    with torch.no_grad():
        sequences = gpt2_model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            **sampling_options
        )

    first_sequence = sequences[0]
    return gpt2_tokenizer.decode(first_sequence, skip_special_tokens=True)


# Main Function
def prog_manager():
    """Run the whole GPT-2 pipeline: load data, fine-tune both models, generate reviews.

    Steps:
        1. Delete previously saved positive / negative models, if any.
        2. Load the IMDb sample and split it into labelled positive and
           negative review sets.
        3. Fine-tune a separate, freshly loaded GPT-2 model on each set.
        4. Generate reviews from both saved models and write them to disk.
    """
    import shutil  # local import: `shutil` is not imported at the top of the notebook

    sentiments = ("positive", "negative")

    # Remove old models if they exist
    for sentiment in sentiments:
        shutil.rmtree(f"save_dir/gpt2_{sentiment}_model", ignore_errors=True)

    # Load and process dataset
    dataset = load_imdb_dataset()
    positive_reviews, negative_reviews = split_and_label_dataset(dataset)

    # The tokenizer is not changed by training, so one instance serves both runs.
    gpt2_tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

    # Tokenize and fine-tune models. Each sentiment starts from a fresh "gpt2"
    # checkpoint; reusing one model instance would make the negative model
    # start from weights already fine-tuned on positive reviews.
    for sentiment, reviews in zip(sentiments, (positive_reviews, negative_reviews)):
        gpt2_model = GPT2LMHeadModel.from_pretrained("gpt2")
        tokenized = tokenize_reviews(reviews, gpt2_tokenizer)
        train_gpt2_model(gpt2_model, gpt2_tokenizer, tokenized, sentiment=sentiment)

    # Generate and save reviews
    generate_and_save_reviews()


# Entry point: run the full pipeline when this code is executed as the main
# module (not when it is imported).
if __name__ == "__main__":
    prog_manager()
