# PyTorch Model Notebook #
This notebook gives you practice building and working with PyTorch models.  Code exercises are denoted by a problem number (e.g., Problem #1) and include a task and a code block that asks for your solution.  The places where your code belongs are marked by comments of the form '# YOUR CODE HERE #'.  The code immediately following each block includes assertions that check the completeness of your response.  They raise an exception if the solution is incomplete or incorrect.

## Datasets and DataLoaders

Reference:  The Linux Foundation, "Datasets & DataLoaders - PyTorch Tutorials 2.6.0+cu124 documentation," pytorch.org https://pytorch.org/tutorials/beginner/basics/data_tutorial.html (accessed Mar. 20, 2025).

In [None]:
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

**Problem #1:**  Finish implementing the "RandDataset" Dataset by 1) setting "self.mapping" to a random tensor of shape (output_dims, input_dims), 2) implementing the '\_\_len\_\_' method so that it returns the length of the dataset, and 3) setting the **output_tensor** = **Mx** in the '\_\_getitem\_\_' method, where **M** is the "self.mapping" tensor and **x** is the "input_tensor".  Also remember to apply "self.target_transform" (if not None) to the "output_tensor", analogous to the "self.transform" already implemented.
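
For comparison, here is a minimal sketch of the map-style Dataset pattern on an unrelated toy problem. The class name `ScaledDataset` and its `scale` argument are hypothetical and not part of this notebook. The sketch shows the three required methods, where an optional `target_transform` is applied, and how a matrix-vector product changes the size of a vector.

```python
import torch
from torch.utils.data import Dataset


class ScaledDataset(Dataset):
    """Hypothetical toy dataset: each target is the input scaled by a constant."""

    def __init__(self, dims, length, scale=2.0, target_transform=None):
        self.dims = dims
        self.length = length
        self.scale = scale
        self.target_transform = target_transform

    def __len__(self):
        # The DataLoader uses this to know how many samples exist.
        return self.length

    def __getitem__(self, idx):
        x = torch.rand(self.dims)          # fresh random input for every index
        y = self.scale * x                 # target computed from the input
        if self.target_transform:
            y = self.target_transform(y)   # optional post-processing of the target
        return x, y


toy = ScaledDataset(dims=4, length=10, target_transform=lambda t: t + 1)
x, y = toy[0]

# A matrix-vector product maps a vector of size 4 to a vector of size 3.
M = torch.rand(3, 4)
mapped = M @ x
```

Because the input is drawn inside `__getitem__`, indexing the same position twice returns different samples, which mirrors how "RandDataset" generates its inputs.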

In [None]:
class RandDataset(Dataset):
    def __init__(self, input_dims, output_dims, length, transform=None, target_transform=None):
        self.input_dims = input_dims
        self.output_dims = output_dims
        self.transform = transform
        self.target_transform = target_transform
        # YOUR CODE HERE #
        self.length = length
        
    # YOUR CODE HERE #

    def __getitem__(self, idx):
        input_tensor = torch.rand(self.input_dims)
        if self.transform:
            input_tensor = self.transform(input_tensor)
        # YOUR CODE HERE #
        return input_tensor, output_tensor

assert len(RandDataset(5,10,1000)) == 1000
assert (RandDataset(5, 10, 1000)).mapping.shape == (10,5)
assert (RandDataset(5, 10, 1000))[1][1].shape[0] == 10
assert ((RandDataset(5, 10, 1000, target_transform=lambda x: x + 20))[1][1] > 20).all()

**Problem #2:**  Set variables named "input_dims" and "output_dims" to appropriate values (positive integers) and use them to instantiate the RandDataset class with length=32000.  Then instantiate a DataLoader from this dataset object using the "batch_size" of 32 already defined in the cell below.  Name this dataloader object "rand_dataloader".
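
The relationship between dataset length, batch size, and the number of batches can be sketched on stand-in data. The tensors and the name `toy_loader` below are hypothetical; `TensorDataset` is used only to keep the example self-contained.

```python
import math

import torch
from torch.utils.data import DataLoader, TensorDataset

# Hypothetical data: 100 samples with 3 features and 2 targets each.
features = torch.rand(100, 3)
targets = torch.rand(100, 2)
toy_loader = DataLoader(TensorDataset(features, targets), batch_size=32)

# len(loader) is the number of batches; the last batch may be smaller.
num_batches = len(toy_loader)
batch_shapes = [tuple(xb.shape) for xb, _ in toy_loader]
```

Here 100 samples with a batch size of 32 give four batches, the last holding only 4 samples. With a length of 32000 and a batch size of 32, the division is exact, which is why the assertion below expects 1000 batches.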

In [None]:
batch_size = 32
# YOUR CODE HERE #

assert input_dims > 0 and output_dims > 0
assert len(rand_dataloader.dataset) == 32000
assert rand_dataloader.batch_size == 32
assert len(rand_dataloader) == 1000

**Setting the train and test dataloaders from above**

In [None]:
import copy

train_dataloader = rand_dataloader

test_dataset = copy.deepcopy(train_dataloader.dataset)
test_dataset.length = 1000
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

assert len(test_dataloader.dataset) == 1000
assert (train_dataloader.dataset.mapping == test_dataloader.dataset.mapping).all()

## Building the PyTorch Model ##

Reference:  The Linux Foundation, "Build the Neural Network - PyTorch Tutorials 2.6.0+cu124 documentation," pytorch.org https://pytorch.org/tutorials/beginner/basics/buildmodel_tutorial.html (accessed Mar. 13, 2025).

**Problem #3:**  Implement a PyTorch model class named "NNModel". Fill in the instantiation of the model's layers, which should be, in order, nn.Linear, nn.ReLU, nn.Linear, nn.ReLU, and nn.Linear.  There should be **n** input neurons, **h** hidden neurons, and **m** output neurons; note that the constructor takes its arguments in the order (n, m, h).  Hint: The first two linear layers (the hidden layers) should each have **h** output neurons. Implement the forward computation of the model using the **input_tensor** as input and return the result.
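
As a rough illustration of the nn.Module pattern, the sketch below builds a smaller network than the one requested here. The class name `TinyNet` and its argument names are hypothetical; it has only one hidden layer, so it is not a solution to the problem.

```python
import torch
from torch import nn


class TinyNet(nn.Module):
    """Hypothetical two-layer network, smaller than the one asked for in Problem #3."""

    def __init__(self, n_in, n_hidden, n_out):
        super().__init__()
        # Modules stored as attributes are registered as sub-modules automatically.
        self.stack = nn.Sequential(
            nn.Linear(n_in, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_out),
        )

    def forward(self, input_tensor):
        return self.stack(input_tensor)


tiny = TinyNet(n_in=4, n_hidden=8, n_out=2)
out = tiny(torch.rand(5, 4))   # a batch of 5 samples
n_params = sum(p.numel() for p in tiny.parameters())
```

The first dimension of the input is the batch dimension, so a (5, 4) input yields a (5, 2) output. Printing the model (`print(tiny)`) lists the registered layers, which is what the assertions below inspect.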

In [None]:
from torch import nn

class NNModel(nn.Module):
    def __init__(self,n,m,h):
        super().__init__()

        # YOUR CODE HERE #
    
    def forward(self, input_tensor):
        # YOUR CODE HERE #

import re
assert ((NNModel(input_dims, output_dims, 50))(torch.rand(5,input_dims))).shape == (5,output_dims)
assert len(re.findall('Linear', str(NNModel(input_dims, output_dims, 50)))) == 3
assert len(re.findall(r'ReLU\(', str(NNModel(input_dims, output_dims, 50)))) == 2

## Optimizing the PyTorch Model ##

Reference:  The Linux Foundation, "Optimizing Model Parameters - PyTorch Tutorials 2.6.0+cu124 documentation," pytorch.org https://pytorch.org/tutorials/beginner/basics/optimization_tutorial.html (accessed Mar. 24, 2025).

**Problem #4:**  Instantiate the nn.MSELoss function with reduction='sum' and name the object "my_loss_fn".  Instantiate the NNModel using "input_dims", "output_dims", and any number of hidden neurons, and name the object "my_model".  Instantiate the optim.SGD optimizer with the model parameters and the "learning_rate" defined in the cell below, and name the object "my_optimizer".
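
To make the effect of the `reduction` argument concrete, the following sketch compares 'sum' and 'mean' on hand-picked values. The tensors `toy_pred` and `toy_target` are hypothetical.

```python
import torch
from torch import nn

toy_pred = torch.tensor([[1.0, 2.0], [3.0, 4.0]])
toy_target = torch.tensor([[0.0, 2.0], [3.0, 2.0]])

# Squared errors are 1, 0, 0, and 4.
loss_sum = nn.MSELoss(reduction='sum')(toy_pred, toy_target)
# 'mean' divides the sum by the total number of elements (4 here).
loss_mean = nn.MSELoss(reduction='mean')(toy_pred, toy_target)
```

Note that 'mean' divides by the number of elements, not by the number of rows, so using 'sum' leaves the choice of normalization to the training and test loops.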

In [None]:
from torch import optim

learning_rate = 1e-2
epochs = 50
tolerance = 1e-2

# YOUR CODE HERE #

assert isinstance(my_loss_fn, nn.MSELoss)
assert my_loss_fn.reduction == 'sum'
assert isinstance(my_model, NNModel)
assert isinstance(my_optimizer, optim.SGD)

**Problem #5:**  Implement the training loop by 1) computing the model predictions, "pred", from the batch inputs, **X**, and 2) calculating the loss via the loss_fn using the predictions and the batch outputs, **Y**.  Divide the loss by the number of predictions to get the **avg_loss**.  The names "pred" and "avg_loss" are used by the code that follows the placeholder.
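
The sequence of forward pass, loss, backward pass, and parameter update can be sketched on a toy regression problem. Everything here (`toy_model`, the data, and the target rule) is a hypothetical stand-in, and the loop repeats a single batch rather than iterating over a DataLoader.

```python
import torch
from torch import nn, optim

torch.manual_seed(0)
toy_model = nn.Linear(3, 1)
toy_loss_fn = nn.MSELoss(reduction='sum')
toy_optimizer = optim.SGD(toy_model.parameters(), lr=1e-2)

X = torch.rand(16, 3)
Y = X.sum(dim=1, keepdim=True)  # hypothetical target: sum of the features

losses = []
for _ in range(200):
    pred = toy_model(X)
    avg_loss = toy_loss_fn(pred, Y) / len(pred)   # summed loss divided by batch size
    avg_loss.backward()        # compute gradients
    toy_optimizer.step()       # update the parameters
    toy_optimizer.zero_grad()  # clear gradients before the next step
    losses.append(avg_loss.item())
```

The order of the last three calls matches the loop in the cell below: gradients accumulate across calls to `backward()`, so they are cleared once the step has been taken.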

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, device=None):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X,Y) in enumerate(dataloader):
        if device:
            X = X.to(device)
            Y = Y.to(device)
        # YOUR CODE HERE #

        avg_loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if (batch+1) %100 == 0:
            avg_loss, current = avg_loss.item(), batch * dataloader.batch_size + len(pred)
            print(f"Avg. loss: {avg_loss:>7f}, [current:{current:>5d}/{size:>5d}]")

    return True

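# Build the optimizer from the same model instance that is passed to train_loop
check_model = NNModel(3,5,20)
assert train_loop(DataLoader(RandDataset(3,5,batch_size*100),batch_size=batch_size), check_model, nn.MSELoss(reduction='sum'), optim.SGD(check_model.parameters(), lr=learning_rate))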

**Problem #6:**  Implement the test loop by 1) computing the model predictions, "pred", from the batch inputs, **X**, and 2) calculating the loss via the loss_fn using the predictions and the batch outputs, **Y**, and accumulating it in "test_loss".  Divide each batch's loss by the number of predictions before adding it to test_loss, and remember to use the .item() method to extract the scalar value.
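
The accuracy measure used in the test loop counts a prediction as correct only when every output component is within the tolerance. A small sketch with hand-picked, hypothetical values (`toy_pred`, `toy_Y`, `tol`) shows how that count is formed:

```python
import torch

toy_pred = torch.tensor([[1.00, 2.00], [1.00, 2.50], [0.995, 2.005]])
toy_Y = torch.tensor([[1.00, 2.00], [1.00, 2.00], [1.00, 2.00]])
tol = 1e-2

with torch.no_grad():  # no gradients are needed for evaluation
    within = (toy_pred - toy_Y).abs() < tol   # element-wise check, shape (3, 2)
    row_ok = within.all(dim=1)                # a row counts only if every element passes
    n_correct = row_ok.type(torch.float).sum().item()
```

The second row misses the target by 0.5 in one component, so only two of the three rows are counted.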

In [None]:
def test_loop(dataloader, model, loss_fn, tolerance, device=None):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()

    test_loss = 0
    correct = 0

    with torch.no_grad():
        for (X,Y) in dataloader:
            if device:
                X = X.to(device)
                Y = Y.to(device)
            # YOUR CODE HERE #
            correct += ((pred - Y).abs() < tolerance).all(dim=1).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size

    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg. loss: {test_loss:>8f}\n")
    return True

assert test_loop(DataLoader(RandDataset(3,5,batch_size),batch_size=batch_size), NNModel(3,5,20), nn.MSELoss(reduction='sum'), tolerance=tolerance)

**Implementing the epoch loop and running the training loop**

**Problem #7:**  Implement the epoch loop by using the train_loop and test_loop functions defined above.

In [None]:
def epoch_loop(epochs, train_dataloader, test_dataloader, model, loss_fn, optimizer, tolerance, device=None):
    for t in range(epochs):
        print(f"Epoch {t+1}\n------------------------------")
        # YOUR CODE HERE #
    print("Done")
    return True

epochs_cpu = 2
assert epoch_loop(epochs_cpu, train_dataloader, test_dataloader, my_model, my_loss_fn, my_optimizer, tolerance)

## Utilizing the GPU for Training ##

In [None]:
device = torch.device('cpu')

if torch.cuda.is_available():
    device = torch.device('cuda', torch.cuda.current_device())

print(f"Using device - {device}")

**Problem #8:**  If a GPU device is available, move "my_model" to the device and save the result as "my_model_gpu".  Reinitialize the SGD optimizer using "my_model_gpu.parameters()" and name the object "my_optimizer_gpu".
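
A minimal sketch of the device-placement pattern follows, assuming a fallback to the CPU so that it runs on any machine. The names `toy_device`, `toy_net`, and `toy_opt` are hypothetical.

```python
import torch
from torch import nn, optim

# Falls back to the CPU so the sketch runs with or without a GPU.
toy_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

toy_net = nn.Linear(4, 2).to(toy_device)             # parameters now live on toy_device
toy_opt = optim.SGD(toy_net.parameters(), lr=1e-2)   # built from the moved parameters

batch = torch.rand(8, 4).to(toy_device)              # inputs must be on the same device
out = toy_net(batch)
```

For modules, `.to()` moves the parameters in place and returns the same module object, whereas for tensors it returns a new tensor, which is why the loops above reassign **X** and **Y**.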

In [None]:
if torch.cuda.is_available():
    # YOUR CODE HERE #

    assert epoch_loop(epochs, train_dataloader, test_dataloader, my_model_gpu, my_loss_fn, my_optimizer_gpu, tolerance, device)

## Saving and Loading PyTorch Models ##

Reference:  The Linux Foundation, "Save and Load the Model - PyTorch Tutorials 2.6.0+cu124 documentation," pytorch.org https://pytorch.org/tutorials/beginner/basics/saveloadrun_tutorial.html (accessed Mar. 24, 2025).

Reference:  The Linux Foundation, "Saving and Loading Models - PyTorch Tutorials 2.6.0+cu124 documentation," pytorch.org https://pytorch.org/tutorials/beginner/saving_loading_models.html (accessed Mar. 24, 2025).

**Problem #9:**  Finish the implementation of the "save_model_checkpoint" function. First, add the elements to the save_dict dictionary corresponding to the keys "model_state_dict", "optimizer_state_dict", and "epoch".  The values for these should be the .state_dict() of the model, the .state_dict() of the optimizer, and the epoch, respectively. Second, add the call that saves the save_dict to the file given by file_path. Hint:  Use torch.save.
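
The save-and-load round trip for a dictionary can be sketched in isolation. The dictionary keys and the file name below are hypothetical, and a temporary directory is used so that nothing is left on disk.

```python
import tempfile
from pathlib import Path

import torch

# Hypothetical checkpoint contents; the keys here are illustrative only.
toy_checkpoint = dict(step=7, weights=torch.arange(3.0))

with tempfile.TemporaryDirectory() as tmp_dir:
    toy_path = Path(tmp_dir) / "toy_checkpoint.pth"
    torch.save(toy_checkpoint, toy_path)                 # serialize the dictionary to disk
    restored = torch.load(toy_path, weights_only=True)   # tensors and plain Python types only
```

Passing `weights_only=True`, as the assertions in this notebook do, restricts loading to tensors and basic Python containers, which is sufficient for state dictionaries and an integer epoch.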

In [None]:
def save_model_checkpoint(model, optimizer, dataloader, epoch, file_path):
    dataloader_mapping = dataloader.dataset.mapping
    save_dict = dict(
        # YOUR CODE HERE #
        dataloader_mapping = dataloader_mapping,
    )
    # YOUR CODE HERE #
    return True

from pathlib import Path
# Build the optimizer from the same model instance that is being saved
ckpt_model = NNModel(3,5,20)
assert save_model_checkpoint(ckpt_model, optim.SGD(ckpt_model.parameters(), lr=learning_rate), DataLoader(RandDataset(3,5,batch_size),batch_size=batch_size), 20, Path() / "test_checkpoint.pth")
assert not set(['model_state_dict','optimizer_state_dict','epoch','dataloader_mapping']) - set((torch.load(Path() / "test_checkpoint.pth", weights_only=True)).keys())
for key, val in (torch.load(Path() / "test_checkpoint.pth", weights_only=True)).items():
    if 'dict' in key:
        assert isinstance(val, dict)
    elif key == 'epoch':
        assert val == 20

**Problem #9 (continued):**  Finish the implementation of the "restore_model_checkpoint" function. Load the checkpoint file defined at "file_path" using torch.load(...) and store the result in a variable named "checkpoint".  Update the "model" and "optimizer" state_dicts from the checkpoint.  Update the "epoch" variable from the checkpoint.
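
Restoring state with `load_state_dict` can be sketched without touching the file system by using an in-memory dictionary in place of a loaded checkpoint. All names and keys below are hypothetical.

```python
import torch
from torch import nn, optim

source_net = nn.Linear(3, 2)
source_opt = optim.SGD(source_net.parameters(), lr=0.1)

# In-memory stand-in for a loaded checkpoint file (hypothetical keys).
snapshot = dict(net=source_net.state_dict(), opt=source_opt.state_dict(), epoch=4)

target_net = nn.Linear(3, 2)                       # same architecture, different random weights
target_opt = optim.SGD(target_net.parameters(), lr=0.5)

target_net.load_state_dict(snapshot['net'])        # copies the parameter values
target_opt.load_state_dict(snapshot['opt'])        # restores hyperparameters such as lr
resume_epoch = snapshot['epoch']
```

The target model must have the same architecture as the one that produced the state dictionary; the optimizer's learning rate is overwritten by the saved value.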

In [None]:
def restore_model_checkpoint(model, optimizer, train_dataloader, test_dataloader, file_path):
    epoch = -1
    if file_path.exists():
        print(f"Restarting from checkpoint: {str(file_path)}")
        # YOUR CODE HERE #
        train_dataloader.dataset.mapping = checkpoint['dataloader_mapping']
        test_dataloader.dataset.mapping = checkpoint['dataloader_mapping']
    return epoch

test_model = NNModel(3,5,20)
test_optim = optim.SGD(test_model.parameters(), lr=learning_rate)
assert restore_model_checkpoint(test_model,test_optim,DataLoader(RandDataset(3,5,batch_size),batch_size=batch_size),DataLoader(RandDataset(3,5,batch_size),batch_size=batch_size),Path() / "test_checkpoint.pth") == 20
(Path() / "test_checkpoint.pth").unlink()

**Problem #10:**  Reimplement the "epoch_loop" with the restore_model_checkpoint and save_model_checkpoint functions. The epoch returned by restore_model_checkpoint should be saved to the "epoch_last" variable.
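
The resume logic depends only on the range of epoch indices, which can be sketched in plain Python. The helper `run_epochs` is hypothetical and records indices instead of training.

```python
def run_epochs(total_epochs, epoch_last=-1):
    """Hypothetical helper: returns the epoch indices that would still be run."""
    executed = []
    for t in range(epoch_last + 1, total_epochs):
        executed.append(t)   # a real loop would train, test, and checkpoint here
    return executed


fresh = run_epochs(3)                    # no checkpoint: epochs 0, 1, 2
resumed = run_epochs(6, epoch_last=2)    # checkpoint written after epoch 2: epochs 3, 4, 5
finished = run_epochs(3, epoch_last=2)   # nothing left to do
```

A starting value of -1 means that no epoch has completed yet, so the loop begins at index 0, which matches the default returned by "restore_model_checkpoint" when no checkpoint file exists.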

In [None]:
def epoch_loop(epochs, train_dataloader, test_dataloader, model, loss_fn, optimizer, tolerance, device=None, file_path=None):
    epoch_last = -1  # default when no checkpoint file is used: start from the first epoch
    if file_path:
        # YOUR CODE HERE #
    for t in range(epoch_last+1,epochs):
        print(f"Epoch {t+1}\n------------------------------")
        train_loop(train_dataloader, model, loss_fn, optimizer, device)
        test_loop(test_dataloader, model, loss_fn, tolerance, device)
        if file_path:
            # YOUR CODE HERE #
    print("Done")
    return True

use_model = NNModel(input_dims, output_dims, 20).to(device)
use_optimizer = optim.SGD(use_model.parameters(), lr=learning_rate)
my_epochs = 3
assert epoch_loop(my_epochs, train_dataloader, test_dataloader, use_model, my_loss_fn, use_optimizer, tolerance, device, Path()/"my_checkpoint_file.pth")
if torch.cuda.is_available():
    my_epochs = epochs
else:
    my_epochs = 6
assert epoch_loop(my_epochs, train_dataloader, test_dataloader, use_model, my_loss_fn, use_optimizer, tolerance, device, Path()/"my_checkpoint_file.pth")
(Path()/"my_checkpoint_file.pth").unlink()