In [None]:
# @title Setup and Installation { display-mode: "form" }

# Install required packages
# NOTE: no versions are pinned here, so the installed releases depend on when
# the cell is run.
!pip install transformers datasets scikit-learn matplotlib seaborn torch tqdm tensorboard
# Upgrade transformers to the latest release on top of the install above.
# NOTE(review): presumably needed because train_model passes `eval_strategy`
# to TrainingArguments — confirm the minimum version and pin it.
!pip install --upgrade transformers
# Import necessary libraries
import os
import torch
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datasets import load_dataset
from transformers import (
    DistilBertTokenizerFast,
    DistilBertForSequenceClassification,
    Trainer,
    TrainingArguments
)
from sklearn.metrics import (
    accuracy_score,
    precision_recall_fscore_support,
    confusion_matrix,
    ConfusionMatrixDisplay,
    classification_report
)
import json
import time
import argparse
from tqdm.auto import tqdm
from google.colab import drive

# Mount Google Drive so models and results can be persisted
drive.mount('/content/drive')

# Local project layout on the Colab disk
PROJECT_DIR = '/content/imdb-sentiment'
MODEL_DIR = f'{PROJECT_DIR}/model'
BEST_MODEL_DIR = f'{MODEL_DIR}/best_model'
LOGS_DIR = f'{PROJECT_DIR}/logs'
VISUALIZATIONS_DIR = f'{PROJECT_DIR}/visualizations'

# Create every project directory (parents first); existing ones are left alone
for _project_subdir in (
    PROJECT_DIR,
    MODEL_DIR,
    BEST_MODEL_DIR,
    LOGS_DIR,
    VISUALIZATIONS_DIR,
):
    os.makedirs(_project_subdir, exist_ok=True)

# Optional mirror of the results on Google Drive
SAVE_TO_DRIVE = True  # @param {type:"boolean"}
if SAVE_TO_DRIVE:
    DRIVE_DIR = '/content/drive/MyDrive/imdb-sentiment'
    os.makedirs(DRIVE_DIR, exist_ok=True)

print("Setup complete!")

In [None]:
# @title Configuration { display-mode: "form" }

# Define configuration parameters individually for form widgets
# (each `# @param` annotation below is a Colab form-field marker).
epochs = 2  # @param {type:"slider", min:1, max:10, step:1}
train_batch_size = 32  # @param {type:"slider", min:8, max:64, step:8}
eval_batch_size = 64  # @param {type:"slider", min:8, max:64, step:8}
learning_rate = 5e-5  # @param {type:"number"}
weight_decay = 0.01  # @param {type:"number"}
max_length = 256  # @param {type:"slider", min:128, max:512, step:32}
sample_size = "1000"  # @param {type:"string"} # MODIFIED: Using a small sample size for faster training

# Now build the config dictionary from the individual parameters
config = {
  # Pretrained checkpoint name and the number of output classes
  "model": {
    "name": "distilbert-base-uncased",
    "num_labels": 2
  },
  # Hyperparameters consumed by prepare_dataset (max_length) and train_model
  "training": {
    "epochs": epochs,
    "train_batch_size": train_batch_size,
    "eval_batch_size": eval_batch_size,
    "learning_rate": learning_rate,
    "weight_decay": weight_decay,
    "max_length": max_length,
    # In this notebook train_model passes save_steps to `logging_steps`;
    # saving and evaluation themselves run once per epoch.
    "save_steps": 100,  # MODIFIED: Reduced from 500 for smaller dataset
    # eval_steps is stored in the config but not read by any cell in this notebook.
    "eval_steps": 100   # MODIFIED: Reduced from 500 for smaller dataset
  },
  "data": {
    "dataset": "imdb",
    "train_size": 25000,
    "test_size": 25000,
    # sample_size is a string form field: an all-digit string becomes an int;
    # "None" or any other non-numeric text becomes None (no subsampling).
    "sample_size": None if sample_size == "None" else int(sample_size) if isinstance(sample_size, str) and sample_size.isdigit() else None
  },
  # Output locations defined in the setup cell
  "paths": {
    "model_dir": MODEL_DIR,
    "best_model_dir": BEST_MODEL_DIR,
    "logs_dir": LOGS_DIR,
    "visualizations_dir": VISUALIZATIONS_DIR
  }
}

# Save config to file
with open(f'{PROJECT_DIR}/config.json', 'w') as f:
    json.dump(config, f, indent=2)

print("Configuration saved!")
print(f"Using sample size: {config['data']['sample_size']} examples")

In [None]:
# @title Data Preparation { display-mode: "form" }

def prepare_dataset(sample_size=None):
    """
    Load and tokenize the IMDB dataset for sentiment analysis.

    Only the "train" and "test" splits are tokenized. The IMDB dataset also
    ships a large unlabeled "unsupervised" split; mapping the tokenizer over
    the whole DatasetDict would tokenize and pad that split too, even though
    it is never used.

    Args:
        sample_size: Optional number of training examples to use (for faster
            testing). The test split is reduced to ``sample_size // 5``
            examples. ``None`` keeps both splits at full size.

    Returns:
        tuple: (train_dataset, test_dataset) tokenized and formatted for PyTorch
    """
    print("Loading IMDB dataset...")
    dataset = load_dataset("imdb")
    train_split = dataset["train"]
    test_split = dataset["test"]
    print(f"Dataset loaded with {len(train_split)} training and {len(test_split)} test examples")

    # Optionally use a smaller sample for faster testing (fixed seed => same subset every run)
    if sample_size is not None:
        train_count = min(sample_size, len(train_split))
        test_count = min(sample_size // 5, len(test_split))
        train_split = train_split.shuffle(seed=42).select(range(train_count))
        test_split = test_split.shuffle(seed=42).select(range(test_count))
        print(f"Using {len(train_split)} training and {len(test_split)} test examples")

    tokenizer = DistilBertTokenizerFast.from_pretrained(config["model"]["name"])
    print("Tokenizing dataset...")

    def tokenize(example):
        # Called with a batch of rows (batched=True); pads/truncates every
        # review to the configured max_length.
        return tokenizer(
            example["text"],
            truncation=True,
            padding="max_length",
            max_length=config["training"]["max_length"]
        )

    model_columns = ["input_ids", "attention_mask", "label"]

    # Tokenize each needed split on its own instead of mapping the whole DatasetDict.
    train_tokenized = train_split.map(tokenize, batched=True)
    test_tokenized = test_split.map(tokenize, batched=True)

    # Expose only the tensors the model consumes; the raw "text" column stays
    # stored in the dataset but is hidden by this output format.
    train_tokenized.set_format("torch", columns=model_columns)
    test_tokenized.set_format("torch", columns=model_columns)

    print("Dataset preparation complete")
    return train_tokenized, test_tokenized

# Run data preparation
train_dataset, test_dataset = prepare_dataset(config["data"]["sample_size"])

In [None]:
# @title Utility Functions { display-mode: "form" }

def compute_metrics(eval_pred):
    """
    Turn raw model outputs into the metrics reported during evaluation.

    Args:
        eval_pred: Pair of (logits, labels) as handed over by the Trainer

    Returns:
        dict: accuracy, f1, precision and recall (the last three are
        support-weighted averages over the classes)
    """
    logits, labels = eval_pred

    # The predicted class is the index of the largest logit in each row
    predicted_classes = np.argmax(logits, axis=-1)

    scores = {"accuracy": accuracy_score(labels, predicted_classes)}

    weighted = precision_recall_fscore_support(
        labels, predicted_classes, average="weighted"
    )
    # sklearn returns (precision, recall, fscore, support)
    scores["f1"] = weighted[2]
    scores["precision"] = weighted[0]
    scores["recall"] = weighted[1]
    return scores

def save_training_plot(history, output_path):
    """
    Generate and save a learning curve plot from training history.

    Training-loss records are the entries that carry a "loss" key, and
    evaluation records are the ones that carry "eval_loss". The Trainer adds
    an "epoch" key to every logged record, so entries must not be filtered on
    the absence of "epoch" (doing so discards all training-loss points).
    Evaluation points are drawn at their own recorded "step".

    Args:
        history: Training history from trainer.state.log_history
            (a list of dicts)
        output_path: Path to save the plot
    """
    # Training loss: (step, loss) pairs
    train_points = [
        (entry["step"], entry["loss"])
        for entry in history
        if "loss" in entry and "step" in entry
    ]

    # Evaluation records and their x positions
    eval_entries = [entry for entry in history if "eval_loss" in entry]
    if all("step" in entry for entry in eval_entries):
        eval_steps = [entry["step"] for entry in eval_entries]
    else:
        # No recorded step: fall back to the evaluation's ordinal position
        eval_steps = list(range(1, len(eval_entries) + 1))

    # Accuracy is only present when compute_metrics reported it
    accuracy_points = [
        (step, entry["eval_accuracy"])
        for step, entry in zip(eval_steps, eval_entries)
        if "eval_accuracy" in entry
    ]

    fig, ax1 = plt.subplots(figsize=(12, 6))

    # Left axis: losses
    loss_color = 'tab:blue'
    ax1.set_xlabel('Training Steps')
    ax1.set_ylabel('Loss', color=loss_color)
    if train_points:
        train_steps, train_loss = zip(*train_points)
        ax1.plot(train_steps, train_loss, color=loss_color, label='Training Loss')
    if eval_entries:
        ax1.plot(
            eval_steps,
            [entry["eval_loss"] for entry in eval_entries],
            'o-',
            color='tab:green',
            label='Evaluation Loss',
        )
    ax1.tick_params(axis='y', labelcolor=loss_color)
    ax1.grid(True, linestyle='--', alpha=0.7)
    ax1.set_title('Training and Evaluation Metrics')

    handles, labels = ax1.get_legend_handles_labels()

    # Right axis: accuracy on a fixed 0..1 scale
    if accuracy_points:
        accuracy_color = 'tab:red'
        ax2 = ax1.twinx()
        ax2.set_ylabel('Accuracy', color=accuracy_color)
        accuracy_steps, accuracy_values = zip(*accuracy_points)
        ax2.plot(accuracy_steps, accuracy_values, 'o-', color=accuracy_color, label='Evaluation Accuracy')
        ax2.tick_params(axis='y', labelcolor=accuracy_color)
        ax2.set_ylim([0, 1])
        extra_handles, extra_labels = ax2.get_legend_handles_labels()
        handles = handles + extra_handles
        labels = labels + extra_labels

    # Only draw a legend when something was actually plotted
    if handles:
        ax1.legend(handles, labels, loc='center right' if accuracy_points else 'best')
    else:
        print("Warning: training history contains no loss or evaluation records to plot")

    fig.tight_layout()

    # Save the figure
    fig.savefig(output_path)

    # Also display the plot in the notebook, then release the figure
    plt.show()
    plt.close(fig)

In [None]:
# @title Model Training { display-mode: "form" }

def train_model():
    """
    Train a DistilBERT model on the IMDB dataset for sentiment classification.

    Returns:
        Trainer: The trained model trainer
    """
    print("Initializing DistilBERT model...")
    model = DistilBertForSequenceClassification.from_pretrained(
        config["model"]["name"],
        num_labels=config["model"]["num_labels"]
    )

    print("Setting up training arguments...")
    training_args = TrainingArguments(
        output_dir=config["paths"]["model_dir"],
        eval_strategy="epoch",  # FIXED: Changed from evaluation_strategy to eval_strategy
        save_strategy="epoch",
        logging_dir=config["paths"]["logs_dir"],
        logging_steps=config["training"]["save_steps"],
        num_train_epochs=config["training"]["epochs"],
        per_device_train_batch_size=config["training"]["train_batch_size"],
        per_device_eval_batch_size=config["training"]["eval_batch_size"],
        learning_rate=config["training"]["learning_rate"],
        weight_decay=config["training"]["weight_decay"],
        save_total_limit=1,
        load_best_model_at_end=True,
        metric_for_best_model="accuracy",
        report_to="tensorboard"
    )

    print("Initializing trainer...")
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        compute_metrics=compute_metrics
    )

    print("Starting training...")
    train_result = trainer.train()

    # Save training metrics
    metrics = train_result.metrics
    trainer.log_metrics("train", metrics)
    trainer.save_metrics("train", metrics)

    # Save the model
    print("Saving best model...")
    trainer.save_model(config["paths"]["best_model_dir"])

    # Generate and save learning curve
    print("Generating learning curve...")
    history = trainer.state.log_history
    save_training_plot(history, os.path.join(config["paths"]["visualizations_dir"], "learning_curve.png"))

    # Copy to Google Drive if requested
    if SAVE_TO_DRIVE:
        print("Copying model to Google Drive...")
        !cp -r {config["paths"]["best_model_dir"]} {DRIVE_DIR}/
        !cp {os.path.join(config["paths"]["visualizations_dir"], "learning_curve.png")} {DRIVE_DIR}/

    print("Training complete!")
    return trainer

# Run training
trainer = train_model()

In [None]:
# @title Model Evaluation { display-mode: "form" }

def evaluate_model(trainer=None):
    """
    Evaluate the trained DistilBERT model on the IMDB test dataset.

    Args:
        trainer: Optional pre-initialized trainer

    Returns:
        dict: Evaluation metrics
    """
    if trainer is None:
        print("Loading trained model...")
        model_path = config["paths"]["best_model_dir"]
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model not found at {model_path}. Please train the model first.")

        model = DistilBertForSequenceClassification.from_pretrained(model_path)

        print("Setting up trainer for evaluation...")
        trainer = Trainer(
            model=model,
            compute_metrics=compute_metrics
        )

    print("Evaluating model...")
    metrics = trainer.evaluate(test_dataset)

    print("\n===== Evaluation Results =====")
    for key, value in metrics.items():
        print(f"{key}: {value:.4f}")

    # Save metrics to file
    with open(os.path.join(config["paths"]["model_dir"], "evaluation_results.json"), "w") as f:
        json.dump(metrics, f, indent=2)

    print(f"Evaluation results saved to {os.path.join(config['paths']['model_dir'], 'evaluation_results.json')}")

    # Copy to Google Drive if requested
    if SAVE_TO_DRIVE:
        !cp {os.path.join(config["paths"]["model_dir"], "evaluation_results.json")} {DRIVE_DIR}/

    return metrics

# Run evaluation
metrics = evaluate_model(trainer)

In [None]:
# @title Error Analysis { display-mode: "form" }

def error_analysis(trainer=None):
    """
    Perform error analysis on the trained model.

    Args:
        trainer: Optional pre-initialized trainer

    Returns:
        dict: Error analysis results
    """
    # Initialize trainer if not provided
    if trainer is None:
        model_path = config["paths"]["best_model_dir"]
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model not found at {model_path}. Please train the model first.")

        model = DistilBertForSequenceClassification.from_pretrained(model_path)
        trainer = Trainer(model=model)

    print("Running predictions on test dataset...")
    preds_output = trainer.predict(test_dataset)

    # Get predictions and labels
    preds = torch.argmax(torch.tensor(preds_output.predictions), dim=1)
    labels = torch.tensor(preds_output.label_ids)

    # Calculate confusion matrix
    print("Generating confusion matrix...")
    cm = confusion_matrix(labels, preds)

    # Plot and save confusion matrix
    plt.figure(figsize=(10, 8))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["Negative", "Positive"])
    disp.plot(cmap=plt.cm.Blues, values_format="d")
    plt.title("Confusion Matrix")
    plt.savefig(os.path.join(config["paths"]["visualizations_dir"], "confusion_matrix.png"))
    print(f"Confusion matrix saved to {os.path.join(config['paths']['visualizations_dir'], 'confusion_matrix.png')}")

    # Display the confusion matrix
    plt.show()

    # Generate classification report
    report = classification_report(labels, preds, target_names=["Negative", "Positive"], output_dict=True)
    print("\n===== Classification Report =====")
    print(classification_report(labels, preds, target_names=["Negative", "Positive"]))

    # Find misclassified examples
    misclassified_indices = (preds != labels).nonzero(as_tuple=True)[0]

    # Get a sample of misclassified examples
    num_examples = min(10, len(misclassified_indices))
    sample_indices = np.random.choice(misclassified_indices, num_examples, replace=False)

    # Get original text for misclassified examples
    original_dataset = test_dataset.dataset

    print(f"\n===== Sample of {num_examples} Misclassified Examples =====")
    misclassified_examples = []
    for idx in sample_indices:
        true_label = "Positive" if labels[idx] == 1 else "Negative"
        pred_label = "Positive" if preds[idx] == 1 else "Negative"

        # Get the original text (this assumes the dataset has the original text)
        original_idx = test_dataset.indices[idx] if hasattr(test_dataset, 'indices') else idx
        text = original_dataset[original_idx]['text']

        # Truncate text if too long
        if len(text) > 100:
            text = text[:100] + "..."

        example = {
            "text": text,
            "true_label": true_label,
            "predicted_label": pred_label
        }
        misclassified_examples.append(example)

        print(f"Example {idx}:")
        print(f"Text: {text}")
        print(f"True label: {true_label}")
        print(f"Predicted label: {pred_label}")
        print("-" * 50)

    # Copy to Google Drive if requested
    if SAVE_TO_DRIVE:
        !cp {os.path.join(config["paths"]["visualizations_dir"], "confusion_matrix.png")} {DRIVE_DIR}/

    # Return analysis results
    results = {
        "confusion_matrix": cm.tolist(),
        "classification_report": report,
        "misclassified_examples": misclassified_examples
    }

    return results

# Run error analysis
analysis_results = error_analysis(trainer)

In [None]:
# @title Inference { display-mode: "form" }

def load_sentiment_pipeline():
    """
    Load the sentiment analysis pipeline with the trained model.

    The model weights come from ``config["paths"]["best_model_dir"]``. The
    tokenizer is loaded by the base checkpoint name from the config, since
    train_model does not hand a tokenizer to the Trainer that saves the model.

    Returns:
        pipeline: Hugging Face pipeline for sentiment analysis

    Raises:
        FileNotFoundError: If no saved model is found in the best-model directory.
    """
    from transformers import pipeline

    model_path = config["paths"]["best_model_dir"]
    # The setup cell creates this directory before training, so test for the
    # saved model's config.json rather than for the directory.
    if not os.path.isfile(os.path.join(model_path, "config.json")):
        raise FileNotFoundError(f"Model not found at {model_path}. Please train the model first.")

    print(f"Loading sentiment analysis pipeline from {model_path}...")
    sentiment_pipeline = pipeline(
        "sentiment-analysis",
        model=model_path,
        # Use the same checkpoint name the model was trained from
        tokenizer=config["model"]["name"],
        device=0 if torch.cuda.is_available() else -1  # Use GPU if available
    )
    return sentiment_pipeline

def predict(text, pipeline=None):
    """
    Predict sentiment for a given text.

    Args:
        text (str): Text to analyze
        pipeline: Optional pre-loaded pipeline (any callable that returns a
            list whose first item is a dict with "label" and "score").
            Loaded via load_sentiment_pipeline() when omitted.

    Returns:
        dict: Prediction result with keys "text", "sentiment", "confidence"
        (score rounded to 4 decimals) and "inference_time" (string in ms)
    """
    if pipeline is None:
        pipeline = load_sentiment_pipeline()

    # perf_counter is a monotonic clock intended for measuring short durations
    start_time = time.perf_counter()
    result = pipeline(text)
    inference_time = time.perf_counter() - start_time

    # Map the model's label to a sentiment name. The fine-tuned model reports
    # string labels "LABEL_0"/"LABEL_1" (no id2label is configured during
    # training); the integer keys are kept for callers passing raw class ids.
    label_map = {
        0: "Negative",
        1: "Positive",
        "LABEL_0": "Negative",
        "LABEL_1": "Positive",
    }
    top_prediction = result[0]
    raw_label = top_prediction["label"]

    # Format the result; unknown labels are passed through unchanged
    formatted_result = {
        "text": text,
        "sentiment": label_map.get(raw_label, raw_label),
        "confidence": round(top_prediction["score"], 4),
        "inference_time": f"{inference_time*1000:.2f}ms"
    }

    return formatted_result

# Interactive demo
def interactive_demo():
    """Show predictions for a few sample reviews, then render a Colab text box
    that sends user input to the model and prints the result."""
    sentiment_pipe = load_sentiment_pipeline()

    def show_prediction(outcome, include_text):
        # Print one formatted prediction; the input text is echoed on request
        if include_text:
            print(f"\nText: {outcome['text']}")
            print(f"Sentiment: {outcome['sentiment']}")
        else:
            print(f"\nSentiment: {outcome['sentiment']}")
        print(f"Confidence: {outcome['confidence']}")
        print(f"Inference time: {outcome['inference_time']}")

    # Canned reviews: clearly positive, clearly negative, and mixed
    sample_reviews = (
        "This movie was fantastic! The acting was superb and the plot kept me engaged throughout.",
        "What a waste of time. The characters were poorly developed and the story made no sense.",
        "I had mixed feelings about this film. Some parts were good, but others dragged on too long.",
    )

    print("\n===== Example Predictions =====")
    for review in sample_reviews:
        show_prediction(predict(review, sentiment_pipe), include_text=True)

    # Colab-only pieces for the free-text widget
    from IPython.display import HTML, display
    from google.colab import output

    def analyze_text(text_input):
        # Kernel-side callback invoked from the browser; blank input is ignored
        if not text_input.strip():
            return
        show_prediction(predict(text_input, sentiment_pipe), include_text=False)

    output.register_callback('analyze', analyze_text)

    print("\n===== Enter your own text to analyze =====")
    # Input box + button; a click or the Enter key calls the 'analyze' callback
    widget_markup = '''
    <input id="text_input" style="width: 100%; padding: 10px; margin: 10px 0;" placeholder="Enter text to analyze...">
    <button id="analyze_button" style="padding: 10px 20px; background-color: #4CAF50; color: white; border: none; cursor: pointer;">
      Analyze Sentiment
    </button>

    <script>
      document.getElementById('analyze_button').addEventListener('click', function() {
        var text = document.getElementById('text_input').value;
        google.colab.kernel.invokeFunction('analyze', [text], {});
      });

      document.getElementById('text_input').addEventListener('keypress', function(e) {
        if (e.key === 'Enter') {
          var text = document.getElementById('text_input').value;
          google.colab.kernel.invokeFunction('analyze', [text], {});
        }
      });
    </script>
    '''
    display(HTML(widget_markup))

# Run interactive demo
interactive_demo()

In [None]:
# @title Complete Pipeline { display-mode: "form" }

def run_complete_pipeline():
    """Execute every stage in order: data preparation, training, evaluation,
    error analysis and the interactive demo.

    Rebinds the global ``train_dataset`` / ``test_dataset`` that the other
    stages read.

    Returns:
        dict: The trainer, the evaluation metrics and the error-analysis results
    """
    global train_dataset, test_dataset

    def announce(step_number, stage_title):
        # Banner printed before each stage
        print(f"\n===== Step {step_number}: {stage_title} =====")

    announce(1, "Data Preparation")
    train_dataset, test_dataset = prepare_dataset(config["data"]["sample_size"])

    announce(2, "Model Training")
    fitted_trainer = train_model()

    announce(3, "Model Evaluation")
    eval_metrics = evaluate_model(fitted_trainer)

    announce(4, "Error Analysis")
    error_report = error_analysis(fitted_trainer)

    announce(5, "Interactive Demo")
    interactive_demo()

    return dict(
        trainer=fitted_trainer,
        metrics=eval_metrics,
        analysis=error_report,
    )

# Uncomment to run the complete pipeline
# results = run_complete_pipeline()

In [None]:
# @title Save to Google Drive { display-mode: "form" }

def save_project_to_drive():
    """Save the entire project to Google Drive"""
    if not os.path.exists('/content/drive'):
        drive.mount('/content/drive')

    DRIVE_DIR = '/content/drive/MyDrive/imdb-sentiment'
    os.makedirs(DRIVE_DIR, exist_ok=True)

    print(f"Saving project to Google Drive at {DRIVE_DIR}...")

    # Copy model
    if os.path.exists(config["paths"]["best_model_dir"]):
        !cp -r {config["paths"]["best_model_dir"]} {DRIVE_DIR}/

    # Copy visualizations
    if os.path.exists(config["paths"]["visualizations_dir"]):
        !mkdir -p {DRIVE_DIR}/visualizations
        !cp {config["paths"]["visualizations_dir"]}/* {DRIVE_DIR}/visualizations/

    # Copy config and results
    !cp {PROJECT_DIR}/config.json {DRIVE_DIR}/
    if os.path.exists(os.path.join(config["paths"]["model_dir"], "evaluation_results.json")):
        !cp {os.path.join(config["paths"]["model_dir"], "evaluation_results.json")} {DRIVE_DIR}/

    print("Project saved to Google Drive!")

# Save to Google Drive
save_project_to_drive()