# Pet Mood Detector - Training Notebook

This notebook demonstrates the training process for the Pet Mood Detector project. It allows you to:
1. Load and explore the dataset
2. Define and train the model
3. Visualize training metrics
4. Test the trained model

## 1. Setup and Imports

In [2]:
import os
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms, datasets
import time
import copy
from tqdm.notebook import tqdm

# Add parent directory to path so we can import our modules
sys.path.append('..')
from src.datamodule import get_transforms, get_loaders
from src.model import make_model, PetMoodClassifier

ModuleNotFoundError: No module named 'numpy'

In [None]:
# Pick the compute device: CUDA when a GPU is visible, CPU otherwise
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print(f"Using device: {device}")

## 2. Data Exploration

In [None]:
# Location of the dataset (relative path)
data_dir = '../data/master_folder'

# Build the data loaders for that folder
dataloaders = get_loaders(data_dir=data_dir, batch_size=32, num_workers=4)

In [None]:
# Class labels are read from the training dataset; sizes are collected per split
class_names = dataloaders['train'].dataset.classes
dataset_sizes = {}
for split in ('train', 'val', 'test'):
    dataset_sizes[split] = len(dataloaders[split].dataset)

print(f"Classes: {class_names}")
print(f"Dataset sizes: {dataset_sizes}")

In [None]:
# Visualize some sample images
def imshow(inp, title=None):
    """Display a normalized image tensor with matplotlib.

    The tensor is converted to a channel-last NumPy array, the normalization
    defined by the mean/std constants below is undone, and the result is
    clipped to [0, 1] before plotting.

    Args:
        inp: Image tensor indexed as (channel, height, width); the 3-element
            mean/std arrays imply three channels.
        title: Optional title passed to ``plt.title``.
    """
    # detach()/cpu() keep this working for tensors that require grad or live
    # on a GPU; for a plain CPU tensor they change nothing.
    img = inp.detach().cpu().numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    # Undo the per-channel normalization, then clamp to the displayable range
    img = np.clip(std * img + mean, 0, 1)
    plt.imshow(img)
    if title is not None:
        plt.title(title)

# `from torchvision import transforms, datasets` does not bind the name
# `torchvision`, so import the grid helper explicitly for this cell
from torchvision.utils import make_grid

# Get a batch of training data
inputs, classes = next(iter(dataloaders['train']))

# Make a grid from (at most) the first 16 images of the batch
out = make_grid(inputs[:16])

plt.figure(figsize=(12, 8))
imshow(out, title=[class_names[x] for x in classes[:16]])

## 3. Model Definition

In [None]:
# Instantiate the classifier for the detected classes and move it to the device
num_classes = len(class_names)
backbone = 'resnet18'  # 'resnet18' or 'mobilenet_v2'

model = PetMoodClassifier(num_classes=num_classes, backbone=backbone, pretrained=True)
model = model.to(device)

print(model)

## 4. Training Setup

In [None]:
# Define loss function, optimizer and scheduler
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=3e-4)
# T_max matches the 15 epochs the model is trained for, so the cosine schedule
# decays over the whole run instead of reaching its minimum at epoch 10 and
# then increasing the learning rate again for the remaining epochs.
scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=15)

# TensorBoard setup: one timestamped run directory per execution of this cell
from datetime import datetime
log_dir = os.path.join("../runs", datetime.now().strftime("%Y%m%d-%H%M%S"))
writer = SummaryWriter(log_dir=log_dir)
print(f"TensorBoard log directory: {log_dir}")

## 5. Training Loop

In [None]:
def _run_phase(model, dataloader, criterion, optimizer, device, phase):
    """Run one pass over ``dataloader`` and return ``(mean_loss, accuracy)``.

    When ``phase == 'train'`` the model is put in training mode and the
    optimizer is stepped after every batch; for any other phase the model is
    put in eval mode and gradients are disabled.

    Raises:
        ValueError: If the dataloader yields no samples.
    """
    is_train = phase == 'train'
    model.train(is_train)

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    pbar = tqdm(dataloader, desc=phase)
    for inputs, labels in pbar:
        inputs = inputs.to(device)
        labels = labels.to(device)
        batch_size = inputs.size(0)

        if is_train:
            optimizer.zero_grad()

        # Track gradients only while training
        with torch.set_grad_enabled(is_train):
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            if is_train:
                loss.backward()
                optimizer.step()

        # Weight the batch loss by batch size so the epoch value is a
        # per-sample mean even when the last batch is smaller.
        batch_loss = loss.item()
        loss_sum += batch_loss * batch_size
        n_correct += (preds == labels).sum().item()
        n_seen += batch_size

        pbar.set_postfix({"loss": batch_loss})

    if n_seen == 0:
        raise ValueError(f"dataloader for phase '{phase}' yielded no samples")

    # Divide by the samples actually processed rather than len(dataset), which
    # differs when the loader drops batches or uses a sampler.
    return loss_sum / n_seen, n_correct / n_seen


def train_model(model, dataloaders, criterion, optimizer, scheduler, writer, num_epochs=15, device=None):
    """Train ``model`` and keep the weights with the best validation accuracy.

    Each epoch runs a 'train' pass followed by a 'val' pass. Per-epoch loss and
    accuracy are logged to ``writer`` and printed. Whenever validation accuracy
    improves, the weights are saved to ``../models/best_model_notebook.pth``.

    Args:
        model: Model to train; updated in place.
        dataloaders: Mapping with 'train' and 'val' entries yielding
            ``(inputs, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: LR scheduler stepped once per epoch, or ``None``.
        writer: Object exposing ``add_scalar(tag, value, step)``.
        num_epochs: Number of epochs to run.
        device: Device that batches are moved to. Defaults to the device of
            the model's first parameter (CPU if the model has no parameters).

    Returns:
        ``(model, history)`` where ``model`` has the best weights loaded and
        ``history`` maps 'train_loss', 'val_loss', 'train_acc' and 'val_acc'
        to lists of per-epoch floats.

    Raises:
        ValueError: If a dataloader yields no samples.
    """
    since = time.time()

    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    # Create models directory if it doesn't exist
    os.makedirs('../models', exist_ok=True)

    # Start from the current weights so there is always something to restore
    best_model_weights = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    history = {
        'train_loss': [], 'val_loss': [],
        'train_acc': [], 'val_acc': []
    }

    for epoch in range(num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        for phase in ('train', 'val'):
            epoch_loss, epoch_acc = _run_phase(
                model, dataloaders[phase], criterion, optimizer, device, phase
            )

            history[f'{phase}_loss'].append(epoch_loss)
            history[f'{phase}_acc'].append(epoch_acc)

            writer.add_scalar(f'Loss/{phase}', epoch_loss, epoch)
            writer.add_scalar(f'Accuracy/{phase}', epoch_acc, epoch)

            print(f'{phase.capitalize()} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Snapshot and persist the weights on every validation improvement
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_weights = copy.deepcopy(model.state_dict())
                torch.save(best_model_weights, "../models/best_model_notebook.pth")
                print(f"Saved new best model with accuracy: {best_acc:.4f}")

        # Step the scheduler once per epoch
        if scheduler is not None:
            scheduler.step()

        print()

    # Whole seconds via divmod so the seconds field can never display as 60
    minutes, seconds = divmod(int(time.time() - since), 60)
    print(f'Training complete in {minutes}m {seconds}s')
    print(f'Best val Acc: {best_acc:.4f}')

    # Load best model weights
    model.load_state_dict(best_model_weights)

    return model, history

In [None]:
# Run training; the returned model carries the best validation weights
model, history = train_model(
    model, dataloaders, criterion, optimizer, scheduler, writer, num_epochs=15
)

## 6. Visualize Training Results

In [None]:
# Draw loss and accuracy curves side by side
plt.figure(figsize=(12, 4))

# (history key suffix, axis label) for each panel, left to right
panels = (('loss', 'Loss'), ('acc', 'Accuracy'))
for position, (key, label) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(history[f'train_{key}'], label=f'Training {label}')
    plt.plot(history[f'val_{key}'], label=f'Validation {label}')
    plt.title(f'{label} vs. Epochs')
    plt.xlabel('Epochs')
    plt.ylabel(label)
    plt.legend()

plt.tight_layout()
plt.savefig("../models/training_curves_notebook.png")
plt.show()

## 7. Evaluate on Test Set

In [None]:
# Evaluate the model on the test set
def evaluate_model(model, dataloader, criterion, class_names, device=None):
    """Evaluate ``model`` on ``dataloader`` and report test metrics.

    Prints the mean loss and accuracy, plots and saves a confusion matrix to
    ``../models/confusion_matrix.png``, and prints a classification report.

    Args:
        model: Model to evaluate; put in eval mode.
        dataloader: Loader yielding ``(inputs, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        class_names: Class names; position ``i`` is taken to be the name of
            label index ``i``.
        device: Device that batches are moved to. Defaults to the device of
            the model's first parameter (CPU if the model has no parameters).

    Returns:
        ``(test_acc, cm)``: the accuracy as a 0-dim double tensor and the
        confusion matrix with one row/column per entry of ``class_names``.

    Raises:
        ValueError: If the dataloader yields no samples.
    """
    # Imported up front so a missing scikit-learn fails before the forward pass
    from sklearn.metrics import confusion_matrix, classification_report

    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    running_loss = 0.0
    running_corrects = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in tqdm(dataloader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels)

            all_preds.extend(preds.cpu().tolist())
            all_labels.extend(labels.cpu().tolist())

    # One label was recorded per processed sample, so this is the true count
    # even if the loader skips part of the dataset.
    n_samples = len(all_labels)
    if n_samples == 0:
        raise ValueError("dataloader yielded no samples to evaluate")

    test_loss = running_loss / n_samples
    test_acc = running_corrects.double() / n_samples

    print(f'Test Loss: {test_loss:.4f} Acc: {test_acc:.4f}')

    # Fix the label set explicitly: without it, a class missing from the test
    # data shrinks the matrix (misaligning the tick labels) and makes
    # classification_report reject target_names.
    label_ids = list(range(len(class_names)))
    cm = confusion_matrix(all_labels, all_preds, labels=label_ids)

    # Plot confusion matrix
    plt.figure(figsize=(10, 8))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    plt.colorbar()
    tick_marks = np.arange(len(class_names))
    plt.xticks(tick_marks, class_names, rotation=45)
    plt.yticks(tick_marks, class_names)

    # Annotate each cell; switch to white text on dark (high-count) cells
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, format(cm[i, j], 'd'),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()
    plt.savefig("../models/confusion_matrix.png")
    plt.show()

    # Print classification report
    print("\nClassification Report:")
    print(classification_report(all_labels, all_preds, labels=label_ids, target_names=class_names))

    return test_acc, cm

In [None]:
# Score the trained model on the held-out test split
test_acc, confusion = evaluate_model(
    model=model,
    dataloader=dataloaders['test'],
    criterion=criterion,
    class_names=class_names,
)

## 8. Save the Model

In [None]:
# Persist the final model
# NOTE(review): relies on PetMoodClassifier providing a save() method
# (torch.nn.Module itself has none) - confirm in src/model.py
final_model_path = "../models/final_model_notebook.pth"
model.save(final_model_path)
print(f"Model saved to {final_model_path}")

## 9. Test on Sample Images

In [None]:
# Import prediction functions
sys.path.append('..')
from src.predict import predict_image, visualize_prediction

In [None]:
# Test on a few sample images
import glob

# Collect up to two .jpg test images per class. glob returns files in
# arbitrary order, so sort first to make the selection reproducible.
test_images = []
for cls in class_names:
    cls_path = os.path.join(data_dir, 'test', cls.lower())
    if not os.path.exists(cls_path):
        print(f"Warning: Path not found: {cls_path}")
        continue
    images = sorted(glob.glob(os.path.join(cls_path, '*.jpg')))[:2]
    test_images.extend(images)

# Run inference on each selected image and show the result
for img_path in test_images:
    result = predict_image(img_path, model, device, class_names=class_names)
    visualize_prediction(result)

## 10. Conclusion

The notebook demonstrates the complete workflow for training and evaluating the Pet Mood Detector model. The model can now be used for inference on new images or through the webcam interface.

To run inference on new images or on a live webcam feed, use the command-line interface:

```bash
# For a single image
python src/predict.py --image path/to/image.jpg --model models/final_model_notebook.pth

# For webcam
python src/predict.py --webcam --model models/final_model_notebook.pth
```

Or you can use the Gradio web interface:

```bash
python ui/app.py
```
