In [17]:
import numpy as np
import torch
from torch import nn
import matplotlib.pyplot as plt


Create device agnostic code

In [18]:
# Pick the compute device once; tensors and models below are moved onto it.
# Set FORCE_CPU = True to stay on the CPU even when an accelerator is present.
FORCE_CPU = False
device = "cpu"
if not FORCE_CPU:
    if torch.cuda.is_available():
        device = "cuda"
    # PyTorch builds without MPS support have no `torch.backends.mps` attribute,
    # so check for it before asking whether the backend is available.
    elif getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
        device = "mps"
# `device` stays a plain string: `.to(device)` accepts it directly, and a bare
# `torch.device(device)` call whose result is discarded has no effect.
print(f"Using device={device}")

Using device=mps



### Data

Create some linearly shaped data

In [19]:
# Ground-truth parameters of the line y = weight * x + bias
weight = 0.7  # slope
bias = 0.3    # intercept/offset

# Sample the half-open interval [start, end) at a fixed spacing
start = 0
end = 1
step = 0.02

# Index with None to add a trailing axis, giving one single-feature sample per row
X = torch.arange(start, end, step)[:, None]
y = bias + weight * X


In [20]:
# Split data: the first 80% of the samples train the model, the rest are held out
train_split = int(0.8 * len(X))
X_train, X_test = X[:train_split], X[train_split:]
y_train, y_test = y[:train_split], y[train_split:]

# The data must be on the same device as the model, so move every split there
X_train, y_train = X_train.to(device), y_train.to(device)
X_test, y_test = X_test.to(device), y_test.to(device)


In [29]:
# Plot data using a helper function
def plot_predictions(train_data, train_labels, test_data, test_labels, predictions=None):
    """Scatter-plot the training data, the test data and, optionally, predictions.

    Every tensor is detached from the autograd graph and copied to the CPU
    before plotting, so inputs may live on any device and may still require grad.

    Args:
        train_data: Training inputs (x-axis), drawn in blue.
        train_labels: Training targets (y-axis), drawn in blue.
        test_data: Test inputs (x-axis), drawn in green.
        test_labels: Test targets (y-axis), drawn in green.
        predictions: Optional predictions for `test_data`, drawn in red.
    """
    def _plottable(tensor):
        # Matplotlib converts its inputs to NumPy arrays; that conversion needs a
        # CPU tensor and raises for a tensor that still requires grad.
        return tensor.detach().cpu()

    plt.figure(figsize=(10, 7))
    plt.scatter(_plottable(train_data), _plottable(train_labels), c="b", s=4, label="Training Data")
    plt.scatter(_plottable(test_data), _plottable(test_labels), c="g", s=4, label="Test data")
    if predictions is not None:
        plt.scatter(_plottable(test_data), _plottable(predictions), c="r", s=4, label="Predictions")
    plt.legend(prop={"size": 14})

# plot_predictions(X_train, y_train, X_test, y_test)


### Build linear model

In [22]:
# Linear regression model
class LinearModel(nn.Module):
    """Single-feature linear regression, y = w * x + b, built on `nn.Linear`."""

    def __init__(self):
        super().__init__()
        # nn.Linear (a fully connected layer) maps 1 input feature (x) to
        # 1 output feature (y). It creates its own weight and bias, which can
        # be inspected through model.parameters().
        self.linear_layer = nn.Linear(1, 1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the linear layer applied to `x`."""
        prediction = self.linear_layer(x)
        return prediction


# Seed the global RNG before building the model so the initial weight and bias
# are the same on every run
torch.manual_seed(42)

# Build the model
model = LinearModel()

# Show the freshly initialised parameters
model.state_dict()



OrderedDict([('linear_layer.weight', tensor([[0.7645]])),
             ('linear_layer.bias', tensor([0.8300]))])

Without using nn.Linear, you'd have to initialise the parameters manually.
The gist of it is that given a random weight and random bias the model will eventually optimize:

```python
self.weights = nn.Parameter(torch.randn(
    1,
    requires_grad=True,
    dtype=torch.float
))
self.bias = nn.Parameter(torch.randn(
    1,
    requires_grad=True,
    dtype=torch.float
))

...
def forward(self, x):
    return x*self.weights + self.bias
```

Everything in Python is an object. `linear_layer` is an instance of `nn.Linear`, which inherits a `__call__()` method from `nn.Module`. Calling the instance, as in `self.linear_layer(x)`, goes through `__call__()`, which in turn runs the layer's `forward()` — the method that actually applies the linear transformation.

In [23]:
# Move the model's parameters onto the target device, then list them to confirm
model = model.to(device)
list(model.parameters())

[Parameter containing:
 tensor([[0.7645]], device='mps:0', requires_grad=True),
 Parameter containing:
 tensor([0.8300], device='mps:0', requires_grad=True)]

### Training Loop/Code

Consists of:
- Loss function
- Optimizer (gradient descent/Adam/etc)
- Step (Loop)

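The training algorithm is as follows:
In model training mode:
    make predictions by feeding X_train into the model (the forward pass)
    calculate the difference between predicted and actual values (quantified by the loss) using the loss function
    zero the gradients, because PyTorch accumulates them across backward passes
    backpropagate using loss.backward(), which computes the gradient of the loss with respect to every model parameter
    update the model parameters with optimizer.step(), which uses those gradients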

In [30]:
# Loss function: L1 loss, i.e. mean absolute error (MAE)
loss_fn = nn.L1Loss()

# Optimizer: stochastic gradient descent over the model's parameters
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# Training loop: each epoch does one pass over the whole training set
epochs = 200
for i in range(epochs):
    # --- training step ---
    model.train()
    optimizer.zero_grad()            # clear gradients left from the previous epoch
    y_pred = model(X_train)          # forward pass
    loss = loss_fn(y_pred, y_train)
    loss.backward()                  # compute the gradient of the loss w.r.t. each parameter
    optimizer.step()                 # update the parameters using those gradients

    # --- evaluation step ---
    model.eval()
    with torch.inference_mode():
        test_pred = model(X_test)
        test_loss = loss_fn(test_pred, y_test)

    # Report progress on every tenth epoch only
    if i % 10:
        continue
    print(f"Epoch: {i}, Loss: {loss}, test loss: {test_loss}")


Epoch: 0, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epoch: 10, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epoch: 20, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epoch: 30, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epoch: 40, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epoch: 50, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epoch: 60, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epoch: 70, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epoch: 80, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epoch: 90, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epoch: 100, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epoch: 110, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epoch: 120, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epoch: 130, Loss: 0.000776502478402108, test loss: 0.012743920087814331
Epo

### Saving and loading a model

In [45]:
from pathlib import Path

# Where the checkpoint goes: <model_subdir>/<model_name>
model_subdir = Path('model')
model_name = Path('linear_model_v0.pth')  # model files are given a .pt or .pth extension

# Create the directory if needed, then save the state dict (not the whole model object)
model_subdir.mkdir(parents=True, exist_ok=True)
torch.save(obj=model.state_dict(), f=model_subdir / model_name)

> Note: If you pass `model.state_dict` instead of `model.state_dict()`, you save the bound METHOD object rather than the dictionary of parameters that calling it returns.

> Loading a model

In [46]:
# Instantiate a model with untuned weights to receive the saved parameters
loaded_model = LinearModel()

# The checkpoint stores each tensor together with the device it was saved from
# (the model above was saved after being moved to `device`). map_location remaps
# the tensors onto the device selected in this session, so the file still loads
# on a machine that lacks the original accelerator.
# NOTE(review): torch.load unpickles the file; on PyTorch versions that support
# it, also pass weights_only=True when the checkpoint's origin is not trusted.
loaded_dict = torch.load(f=model_subdir/model_name, map_location=device)
loaded_model.load_state_dict(loaded_dict)

# Load the model onto the device
loaded_model.to(device=device)

# Show the restored parameters
loaded_model.state_dict()


OrderedDict([('linear_layer.weight', tensor([[0.6999]], device='mps:0')),
             ('linear_layer.bias', tensor([0.3008], device='mps:0'))])