# Homework 2: PyTorch basics

If you are having trouble installing PyTorch on your local machine, this notebook contains everything you need to complete the homework. However, for us to autograde your work, you must copy your code from this notebook back into the corresponding files in the GitHub repository.

In [3]:
# Install packages
%pip install numpy
%pip install torch

import datetime
import os
import re
import time

import numpy as np
import torch

# The test and demo cells below read and write checkpoints under "models/",
# so make sure the directory exists (no error if it is already there).
os.makedirs("models", exist_ok=True)

In [4]:
# src/utils.py

import os
import torch


def save(filename, **kwargs):
    """
    Save a pytorch object to file
    See: https://pytorch.org/tutorials/beginner/saving_loading_models.html
    You shouldn't need to edit this function.

    All keyword arguments are collected into one dictionary, which is
    written to `filename` with `torch.save`. An existing file is never
    overwritten.

    Arguments:
        filename: the file in which to save the object

    Possible keyword arguments (kwargs):
        epoch: the epoch so far if training
        model_state_dict: a model's state
        opt_state_dict: an optimizer's state, if training

    Raises:
        AssertionError: if `filename` already exists
    """

    # Raise explicitly instead of using `assert`, so the overwrite guard
    # still runs under `python -O`. The exception type is kept as
    # AssertionError so existing callers see the same error.
    if os.path.exists(filename):
        raise AssertionError(
            f"{filename} exists: delete it first to replace it.")
    torch.save(kwargs, filename)


def load(filename):
    """
    Read back an object that was written to `filename`
    See: https://pytorch.org/tutorials/beginner/saving_loading_models.html
    You shouldn't need to edit this function.

    Arguments:
        filename: the file from which to load the object

    Returns:
        whatever `torch.load` reads from `filename`
    """

    checkpoint = torch.load(filename)
    return checkpoint

In [5]:
# src/data.py

import torch
import numpy as np


class AddDataset(torch.utils.data.Dataset):
    def __init__(self, num_examples):
        """
        Build `num_examples` addition problems of the form x_1 + x_2 = y

        Each row of inputs holds two integers drawn with
        `np.random.randint(-1000, 1000)`; the label is the row's sum.

        Save the dataset to class variables.
        You should use torch tensors of dtype float32.
        """
        self.num_examples = num_examples
        inputs = np.random.randint(-1000, 1000, size=[num_examples, 2])
        targets = inputs.sum(axis=1, keepdims=True)

        # TODO Convert `inputs` and `targets` to torch tensors and save them
        #      as class variables so we can load them with self.__getitem__
        raise NotImplementedError

    def __len__(self):
        # Number of examples requested at construction time
        return self.num_examples

    def __getitem__(self, item_index):
        """
        Allow us to select items with `dataset[0]`
        Use the class variables you created in __init__.

        Returns (x, y)
            x: the data tensor
            y: the label tensor
        """
        # TODO Return the (x, y) pair stored at `item_index`
        raise NotImplementedError


class MultiplyDataset(torch.utils.data.Dataset):
    def __init__(self, num_examples):
        """
        Build `num_examples` multiplication problems of the form
        x_1 * x_2 = y

        Each row of inputs holds two integers drawn with
        `np.random.randint(1, 1000)`; the label is the row's product.

        Save the dataset to class variables.
        You should use torch tensors of dtype float32.
        """
        self.num_examples = num_examples
        inputs = np.random.randint(1, 1000, size=[num_examples, 2])
        targets = inputs.prod(axis=1, keepdims=True)

        # TODO Convert `inputs` and `targets` to torch tensors and save them
        #      as class variables so we can load them with self.__getitem__
        raise NotImplementedError

    def __len__(self):
        # Number of examples requested at construction time
        return self.num_examples

    def __getitem__(self, item_index):
        """
        Allow us to select items with `dataset[0]`
        Returns (x, y)
            x: the data tensor
            y: the label tensor
        """
        # TODO Return the (x, y) pair stored at `item_index`
        raise NotImplementedError

In [6]:
# src/mlp.py

import torch


class MLP(torch.nn.Module):
    def __init__(self, number_of_hidden_layers: int, input_size: int,
                 hidden_size: int, activation: torch.nn.Module):
        """
        Construct a simple MLP

        The network stored in `self.net` is `number_of_hidden_layers`
        repetitions of (Linear -> activation), followed by one last
        Linear layer with a single output and no activation.
        """
        # NOTE: don't edit this constructor

        super().__init__()
        assert number_of_hidden_layers >= 0, "number_of_hidden_layers must be at least 0"

        # Layer widths from input to output: [in, hidden, ..., hidden, 1]
        widths = [input_size] + [hidden_size] * number_of_hidden_layers + [1]

        modules = []
        for index, (fan_in, fan_out) in enumerate(zip(widths, widths[1:])):
            modules.append(torch.nn.Linear(fan_in, fan_out))

            # Every Linear except the last is followed by the activation
            if index != number_of_hidden_layers:
                modules.append(activation)

        self.net = torch.nn.Sequential(*modules)

    def forward(self, x: torch.Tensor):
        # Run the input through the whole Sequential stack
        return self.net(x)

    def initialize(self):
        """
        Initialize all the model's weights.
        See https://pytorch.org/docs/stable/nn.init.html
        """
        # TODO Choose and apply an initialization for every layer
        raise NotImplementedError

    def save_model(self, filename):
        """
        Use `src.utils.save` to save this model to file.

        Note: You may want to save a dictionary containing the model's state.

        Args
            filename: the file to which to save the model
        """
        # TODO Write this model's state to `filename`
        raise NotImplementedError

    def load_model(self, filename):
        """
        Use `src.utils.load` to load this model from file.

        Note: in addition to simply loading the saved model, you must use the
              information from that checkpoint to update the model's state.

        Args
            filename: the file from which to load the model
        """
        # TODO Read the checkpoint at `filename` and restore this model
        raise NotImplementedError

In [7]:
# src/trainer.py

import numpy as np
import time
import torch


class Trainer:

    def __init__(self, optimizer, model, loss_func, **kwargs):
        """
        Initialize the optimizer for the model, using any necessary kwargs
        Save the model and loss function for later calculation
        You shouldn't need to edit this function.

        Args
            optimizer: a callable, e.g. an optimizer class such as
                `torch.optim.SGD`; it is invoked as
                `optimizer(model.parameters(), **kwargs)`
            model: the model to run; must provide `parameters()`
            loss_func: the loss function to use in `run_one_batch`
            kwargs: extra keyword arguments forwarded to `optimizer`
        """

        self.optimizer = optimizer(model.parameters(), **kwargs)
        self.model = model
        self.loss_func = loss_func

        # Number of completed training epochs (see `train`)
        self.epoch = 0
        # Wall-clock reference used for the "in ... min" part of the log
        self.start_time = None

    def run_one_batch(self, x, y, train=True):
        """
        Run self.model on one batch of data, using `self.loss_func` to
            compute the model's loss.

        If train=True (the default), you should use `self.optimizer`
            to update the parameters of `self.model`.

        You should also call `self.optimizer.zero_grad()`; see
            https://pytorch.org/tutorials/recipes/recipes/zeroing_out_gradients.html
            for a guide as to when to do that.

        Args
            x: the batch's input
            y: the batch's target
            train: whether to update the model's parameters

        Returns
            loss: the model's loss on this batch
        """
        raise NotImplementedError

    def run_one_epoch(self, data_loader: torch.utils.data.DataLoader,
                        train=True, verbose=False):
        """
        Train one epoch, a batch at a time, using self.run_one_batch
        You shouldn't need to edit this function.

        The numpy and torch seeds are reset to 0 at the start of every call.

        Args:
            data_loader: a torch.utils.data.DataLoader with our dataset
            train: passed through to `run_one_batch`
            verbose: if True, print a one-line summary of this epoch

        Returns:
            avg_loss: the sum of the per-batch losses divided by the
                number of examples seen

        Raises:
            ValueError: if `data_loader` yields no examples
        """
        np.random.seed(0)
        torch.manual_seed(0)
        torch.use_deterministic_algorithms(True)
        if self.start_time is None:
            self.start_time = time.time()

        epoch_size = 0
        total_loss = 0
        for x, y in data_loader:
            epoch_size += x.size(0)
            total_loss += self.run_one_batch(x, y, train=train)

        # Fail with a clear message instead of a bare ZeroDivisionError
        if epoch_size == 0:
            raise ValueError(
                "data_loader yielded no examples; cannot average the loss")

        avg_loss = total_loss / epoch_size

        if verbose:
            epoch = self.epoch + 1
            duration = (time.time() - self.start_time) / 60

            if train:
                log = [f"Epoch: {epoch:6d}"]
            else:
                log = ["Eval:" + " " * 8]

            log.extend([
                f"Loss: {avg_loss:6.3f}",
                f"in {duration:5.1f} min",
            ])
            print("  ".join(log))

        return avg_loss

    def train(self, data_loader, n_epochs, train=True, report_every=None):
        """
        Run the model for `n_epochs` epochs on the data in `data_loader`
        You shouldn't need to edit this function.

        Args
            data_loader: data loader for our data
            n_epochs: how many epochs to run
            train: if True, train the model; otherwise, just evaluate it
            report_every: how often to print out stats; defaults to
                roughly ten reports over the whole run

        Returns
            losses: average loss per epoch
        """
        self.start_time = time.time()

        if report_every is None:
            report_every = max(1, n_epochs // 10)

        losses = []
        for i in range(n_epochs):
            verbose = ((i + 1) % report_every) == 0
            loss = self.run_one_epoch(data_loader, train=train, verbose=verbose)
            losses.append(loss)
            # Evaluation passes do not count as completed epochs
            if train:
                self.epoch += 1

        return losses

    def eval(self, data_loader):
        """
        Helper function to run through the data loader once and just
            compute the loss
        You shouldn't need to edit this function.

        Returns
            a one-element list holding the loss of that single pass
        """
        return self.train(data_loader, 1, train=False, report_every=1)

    def save_trainer(self, filename):
        """
        Use `src.utils.save` to save this Trainer to file.
        See https://pytorch.org/tutorials/beginner/saving_loading_models.html

        Args
            filename: the file to which to save the trainer
        """
        raise NotImplementedError

    def load_trainer(self, filename):
        """
        Use `src.utils.load` to load this trainer from file.
        See https://pytorch.org/tutorials/beginner/saving_loading_models.html

        Note: in addition to simply loading the saved trainer, you must
            use the information from that checkpoint to update the
            trainer's state.

        Args
            filename: the file from which to load the trainer
        """
        raise NotImplementedError

In [8]:
# src/experiments.py

def params_add_dataset():
    """
    Choose the parameters you used to train your AddDataset model
    This will be used to load the model you saved.

    As provided, this is an unfinished template: it raises
    NotImplementedError until both dictionaries are filled in and the
    `raise` statements are removed.

    Returns
        model_args: a dictionary of arguments to be passed to MLP()
        trainer_args: a dictionary of arguments to be passed to Trainer()
    """

    # TODO Fill in the keyword arguments for MLP(), then delete the raise
    model_args = {}
    raise NotImplementedError

    # Don't include 'model' or 'loss_func' here
    # Just "optimizer" and any necessary kwargs
    # TODO Fill these in, then delete the raise
    trainer_args = {}
    raise NotImplementedError

    return model_args, trainer_args


def params_multiply_dataset():
    """
    Choose the parameters you used to train your MultiplyDataset model
    This will be used to load the model you saved.

    As provided, this is an unfinished template: it raises
    NotImplementedError until both dictionaries are filled in and the
    `raise` statements are removed.

    Returns
        model_args: a dictionary of arguments to be passed to MLP()
        trainer_args: a dictionary of arguments to be passed to Trainer()
    """

    # TODO Fill in the keyword arguments for MLP(), then delete the raise
    model_args = {}
    raise NotImplementedError

    # Don't include 'model' or 'loss_func' here
    # Just "optimizer" and any necessary kwargs
    # TODO Fill these in, then delete the raise
    trainer_args = {}
    raise NotImplementedError

    return model_args, trainer_args

## Test code

You will need to copy `models/test_load_model.pt` to your Colab environment to pass `test_load_model`.

In [9]:
# tests/test_data.py

def test_datasets():
    """
    Check that both datasets yield torch tensors (float32 inputs), both
    when indexed directly and when batched through a DataLoader.
    """
    num_examples = 1000
    msg = "Your dataset should provide tensors of type float32"

    def check_pair(x, y):
        # Same three checks are applied to single items and to batches
        assert isinstance(x, torch.Tensor), msg
        assert x.dtype == torch.float32, msg
        assert isinstance(y, torch.Tensor), msg

    for dataset_cls in (AddDataset, MultiplyDataset):
        dataset = dataset_cls(num_examples=num_examples)

        # Index every example by hand
        n_items = 0
        for index in range(len(dataset)):
            x, y = dataset[index]
            check_pair(x, y)
            n_items += 1
        assert n_items == num_examples

        # Walk the same dataset in batches of ten
        loader = torch.utils.data.DataLoader(
            dataset, batch_size=10, shuffle=False)
        n_batches = 0
        for x, y in loader:
            check_pair(x, y)
            n_batches += 1
        assert n_batches == num_examples // 10

In [10]:
# tests/test_trainer.py

class DummyModel(torch.nn.Module):
    """
    One-weight model for the trainer tests: its output is always the
    bias-free linear layer applied to a constant 1x1 tensor of ones.
    """

    def __init__(self, n):
        # `n` is accepted but not used
        super().__init__()
        self.weights = torch.nn.Linear(in_features=1, out_features=1, bias=False)

    def forward(self, X):
        # The input X is ignored; the layer always sees the same constant
        constant_input = torch.ones([1, 1], dtype=torch.float32)
        return self.weights(constant_input)


def test_trainer_basics():
    """
    Exercise Trainer on DummyModel with an all-zero target: one training
    batch must lower the loss, train=False must leave it unchanged, and
    further training must drive the loss and the single weight to zero.
    """
    n = 100
    target = (torch.zeros(1, dtype=torch.float32),
              torch.zeros(1, dtype=torch.float32))

    data = [target for _ in range(n)]
    model = DummyModel(n)
    data_loader = torch.utils.data.DataLoader(data, batch_size=1)

    trainer = Trainer(
        optimizer=torch.optim.SGD,
        model=model,
        loss_func=torch.nn.MSELoss(),
        lr=0.1,
    )

    # Test `run_one_batch` with a dummy example
    # (x can be None because DummyModel ignores its input)
    loss_before = trainer.eval(data_loader)[0]
    trainer.run_one_batch(
        None, torch.zeros([1, 1], dtype=torch.float32))
    loss_after = trainer.eval(data_loader)[0]
    assert loss_after < loss_before, "Loss should decrease"

    # train=False
    loss_again = trainer.train(data_loader, 10, train=False, report_every=100)
    msg = "train=False should mean no training"
    assert np.all(np.isclose(loss_after, loss_again)), msg

    # Test trainer.train with train=True
    _ = trainer.train(data_loader, 3, report_every=100)
    losses = trainer.eval(data_loader)
    msg = "DummyModel should learn zero weights"
    # Attach the message so a failure explains what was expected
    assert np.isclose(losses[0], 0), msg

    model_weight = model.weights.weight[0][0].detach()
    assert np.isclose(model_weight, 0), msg

In [11]:
# tests/test_save_load.py

# Keyword arguments for MLP() shared by the save/load tests
model_args = dict(
    number_of_hidden_layers=1,
    input_size=2,
    hidden_size=1,
    activation=torch.nn.Sigmoid(),
)

# Keyword arguments for Trainer() (the model is supplied by each test)
trainer_args = dict(
    optimizer=torch.optim.SGD,
    loss_func=torch.nn.MSELoss(),
    lr=0.1,
)


def test_load_model():
    """
    Load the checkpoint shipped with the repository into a fresh MLP and
    compare the first row of its first layer's weights to known values.
    """
    np.random.seed(0)
    torch.manual_seed(0)
    torch.use_deterministic_algorithms(True)

    # This model is provided with the repository;
    #    you don't need to modify it
    checkpoint_path = "models/test_load_model.pt"

    net = MLP(**model_args)
    net.load_model(checkpoint_path)

    # It should load correctly
    expected_row = np.array([0.449, 0.19120623])
    loaded_weights = net.net[0].weight.detach().numpy()
    assert np.allclose(loaded_weights[0], expected_row)


def test_save_load_model():
    """
    Train a model briefly, save it, load it into a fresh MLP, and check
    that the reloaded model gives the same loss on the same data.
    """
    np.random.seed(0)
    torch.manual_seed(0)
    torch.use_deterministic_algorithms(True)

    dataset = AddDataset(num_examples=100)
    data_loader = torch.utils.data.DataLoader(
        dataset, batch_size=100, shuffle=False)

    model = MLP(**model_args)
    model.initialize()

    # Train model for 2 epochs
    trainer = Trainer(model=model, **trainer_args)
    trainer.train(data_loader, 2)
    before = trainer.eval(data_loader)[0]

    rand = np.random.randint(10000, 99999)
    fn = f"models/model_{rand}.pt"

    try:
        # Save model
        model.save_model(fn)

        # Load model
        reloaded = MLP(**model_args)
        reloaded.load_model(fn)

        # Evaluate through a trainer built around the reloaded model;
        # `trainer` above still holds the model that was never saved/loaded.
        reloaded_trainer = Trainer(model=reloaded, **trainer_args)

        # Loss should be same before/after loading
        after = reloaded_trainer.eval(data_loader)[0]
        assert np.isclose(before, after)
    finally:
        # The file may not exist if saving itself failed
        if os.path.exists(fn):
            os.remove(fn)


def test_continue_training():
    """
    Training for ten epochs in one go should give the same per-epoch
    losses as training for five, saving and reloading both the model
    and the trainer, and then training for five more.
    """
    # Train for ten epochs, save per-epoch losses
    np.random.seed(0)
    torch.manual_seed(0)
    torch.use_deterministic_algorithms(True)

    dataset = AddDataset(num_examples=100)
    data_loader = torch.utils.data.DataLoader(
        dataset, batch_size=100, shuffle=False)

    model = MLP(**model_args)
    model.initialize()

    trainer = Trainer(model=model, **trainer_args)
    ten_losses = trainer.train(data_loader, 10)

    # Start over and train for just five epochs
    np.random.seed(0)
    torch.manual_seed(0)

    dataset = AddDataset(num_examples=100)
    data_loader = torch.utils.data.DataLoader(
        dataset, batch_size=100, shuffle=False)

    model = MLP(**model_args)
    model.initialize()

    trainer = Trainer(model=model, **trainer_args)
    five_losses = trainer.train(data_loader, 5)

    # Choose the file names before entering `try`, so the cleanup in
    # `finally` can always refer to them even if saving fails early.
    now = re.sub("[ :.-]", "_", str(datetime.datetime.now()))
    model_fn = f"models/test_model_{now}.pt"
    trainer_fn = f"models/test_trainer_{now}.pt"

    try:
        # Save the model and trainer
        model.save_model(model_fn)
        trainer.save_trainer(trainer_fn)

        # Reload the model and the trainer
        model = MLP(**model_args)
        model.load_model(model_fn)
        trainer = Trainer(model=model, **trainer_args)
        trainer.load_trainer(trainer_fn)

        # Train for five more epochs
        np.random.seed(0)
        torch.manual_seed(0)
        five_more = trainer.train(data_loader, 5)

        # Loss should be the same whether you train once for ten
        #   epochs or twice for five epochs
        assert np.all(np.isclose(ten_losses, five_losses + five_more))
    finally:
        if os.path.exists(model_fn):
            os.remove(model_fn)
        if os.path.exists(trainer_fn):
            os.remove(trainer_fn)

In [12]:
# tests/test_model.py

def test_model_init():
    """
    Overwrite the first layer's weights with a large constant, call
    `initialize()`, and check that the weights were replaced by values
    that vary and have a small mean.
    """
    np.random.seed(0)
    torch.manual_seed(0)
    model_args = {
        "number_of_hidden_layers": 1,
        "input_size": 2,
        "hidden_size": 10,
        "activation": torch.nn.Sigmoid(),
    }

    model = MLP(**model_args)
    # Use the layer's real weight shape (out_features, in_features) so the
    # model stays usable after the overwrite.
    constant = 100 * torch.ones(
        model_args["hidden_size"], model_args["input_size"])
    model.net[0].weight = torch.nn.Parameter(constant)
    before = model.net[0].weight.detach().numpy().copy()

    model.initialize()
    after = model.net[0].weight.detach().numpy()

    # The constant weights set above must have been overwritten
    msg = "initialize() should replace the existing weights"
    assert not np.array_equal(before, after), msg

    # However you initialize, it should be with small
    #   numbers but not all zeros
    assert np.std(after) > 0
    assert np.abs(np.mean(after)) < 2


def test_add_dataset():
    """
    Train a freshly initialized MLP on AddDataset for 200 epochs using
    the parameters from `params_add_dataset()` and require a small loss.
    """
    np.random.seed(0)
    torch.manual_seed(0)
    torch.use_deterministic_algorithms(True)

    loader = torch.utils.data.DataLoader(
        AddDataset(num_examples=1000), batch_size=100, shuffle=False)

    # Choose your model's parameters in
    # src.experiment.py:params_add_dataset()
    #   to train a model that can do addition
    mlp_kwargs, trainer_kwargs = params_add_dataset()
    # The loss function is fixed by this test, so it must not be supplied
    assert "loss_func" not in trainer_kwargs, "Must use torch.nn.MSELoss()"

    net = MLP(**mlp_kwargs)
    net.initialize()

    trainer = Trainer(
        model=net,
        loss_func=torch.nn.MSELoss(),
        **trainer_kwargs)

    trainer.train(loader, 200)
    final_loss = trainer.eval(loader)[0]
    assert final_loss < 0.1, "Should learn AddDataset in 200 epochs"


def test_saved_add_dataset():
    """
    Load the model saved at models/test_saved_add_dataset.pt into an MLP
    built from `params_add_dataset()` and require a small loss on a
    freshly generated AddDataset.
    """

    # Save your model here
    MODEL_FN = "models/test_saved_add_dataset.pt"
    msg = f"Save your model to {MODEL_FN}"
    assert os.path.exists(MODEL_FN), msg
    msg = f"Delete {MODEL_FN} and then save your model there."
    assert os.path.getsize(MODEL_FN) > 0, msg

    np.random.seed(0)
    torch.manual_seed(0)
    torch.use_deterministic_algorithms(True)

    dataset = AddDataset(num_examples=1000)
    data_loader = torch.utils.data.DataLoader(
        dataset, batch_size=100, shuffle=False)

    # Choose your model's parameters in
    # src.experiment.py:params_add_dataset()
    #   to train a model that can do addition
    # We need to use the same model architecture
    #   to load your saved model
    model_args, trainer_args = params_add_dataset()
    msg = "Must use torch.nn.MSELoss()"
    assert "loss_func" not in trainer_args, msg

    model = MLP(**model_args)
    model.load_model(MODEL_FN)

    # Only evaluation happens here, so stand in for the optimizer with a
    # factory that returns None. Unlike `print`, it writes nothing to stdout.
    trainer = Trainer(
        model=model,
        optimizer=lambda params: None,
        loss_func=torch.nn.MSELoss(),
    )

    losses = trainer.eval(data_loader)
    msg = "Saved model should solve AddDataset"
    assert losses[0] < 0.1, msg


def test_saved_multiply_dataset():
    """
    Load the model saved at models/test_saved_multiply_dataset.pt into an
    MLP built from `params_multiply_dataset()` and require its loss on a
    freshly generated MultiplyDataset to be below a loose threshold.
    """

    # Save your model here
    MODEL_FN = "models/test_saved_multiply_dataset.pt"
    msg = f"Save your model to {MODEL_FN}"
    assert os.path.exists(MODEL_FN), msg
    msg = f"Delete {MODEL_FN} and then save your model there."
    assert os.path.getsize(MODEL_FN) > 0, msg

    np.random.seed(0)
    torch.manual_seed(0)
    torch.use_deterministic_algorithms(True)

    dataset = MultiplyDataset(num_examples=100)
    data_loader = torch.utils.data.DataLoader(
        dataset, batch_size=100, shuffle=False)

    # Choose your model's parameters in
    # src.experiment.py:params_multiply_dataset()
    #   to train a model that can do multiplication
    # We need to use the same model architecture
    #   to load your saved model
    model_args, trainer_args = params_multiply_dataset()
    msg = "Must use torch.nn.MSELoss()"
    assert "loss_func" not in trainer_args, msg

    model = MLP(**model_args)
    model.load_model(MODEL_FN)

    # Only evaluation happens here, so stand in for the optimizer with a
    # factory that returns None. Unlike `print`, it writes nothing to stdout.
    trainer = Trainer(
        model=model,
        optimizer=lambda params: None,
        loss_func=torch.nn.MSELoss(),
    )

    losses = trainer.eval(data_loader)
    msg = "Saved model is expected to do poorly but not as bad as initially"
    assert losses[0] < 4.1e8, msg

In [None]:
# models/save_model_without_deleting.py

def main():
    """
    This is a demo function provided just to highlight how you might
    train and save your models to pass the `test_saved_add_dataset`
    and `test_saved_multiply_dataset` cases.

    If the output file already exists you are asked whether to delete
    it; answering anything other than "y" stops before any training.

    You may want to modify the number of examples and number of training
    epochs used in `trainer.train(...)`.
    """

    np.random.seed(0)
    torch.manual_seed(0)
    torch.use_deterministic_algorithms(True)

    outfn = "models/test_saved_add_dataset.pt"
    if os.path.exists(outfn):
        choice = input(f"Delete {outfn}? y/n ")
        if choice != "y":
            # `save_model` is documented to go through `save`, which
            # refuses to overwrite an existing file. Stop here rather
            # than train a model that cannot be written out.
            print(f"Keeping {outfn}; nothing was trained or saved.")
            return
        os.remove(outfn)

    dataset = AddDataset(num_examples=100)
    data_loader = torch.utils.data.DataLoader(
        dataset, batch_size=100, shuffle=False)

    model_args, trainer_args = params_add_dataset()
    model = MLP(**model_args)
    model.initialize()

    trainer = Trainer(
        model=model,
        loss_func=torch.nn.MSELoss(),
        **trainer_args)

    trainer.train(data_loader, 10)
    # `eval` prints the loss of one evaluation pass
    trainer.eval(data_loader)

    model.save_model(outfn)

# Run the demo only when this code is executed directly (a notebook cell
# or a script), not when it is imported: main() prompts for input.
if __name__ == "__main__":
    main()

## Free-response code

In [15]:
# free_response/batch_sizes.py

def add_dataset_experiment(num_examples=1000, batch_size=100):
    """
    This is a copy of the `test_add_dataset` case in tests/test_model.py,
        set up to allow you to easily tweak the batch size and dataset size.
    Run this script from the root directory of your repository with:
        `python -m free_response.batch_sizes <num_examples> <batch_size>`

    Trains for 100 epochs and prints how long training took.

    Args
        num_examples: how many examples to generate for AddDataset
        batch_size: the batch size given to the DataLoader
    """
    np.random.seed(0)
    torch.manual_seed(0)
    torch.use_deterministic_algorithms(True)

    loader = torch.utils.data.DataLoader(
        AddDataset(num_examples=num_examples),
        batch_size=batch_size,
        shuffle=False)

    mlp_kwargs, trainer_kwargs = params_add_dataset()

    net = MLP(**mlp_kwargs)
    net.initialize()

    trainer = Trainer(
        model=net,
        loss_func=torch.nn.MSELoss(),
        **trainer_kwargs)

    # Time only the training call itself
    began = time.time()
    trainer.train(loader, 100, report_every=100)
    elapsed = time.time() - began
    print(f"Training took {elapsed:.1f} seconds")

In [None]:
# Experiments you can run:

# 1000 examples in batches of 100
add_dataset_experiment(num_examples=1000, batch_size=100)