**TODO : LOADING OUR DATA**

In [1]:
import numpy as np
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split

# Load diabetes dataset
diabetes = load_diabetes()
X, Y = diabetes.data, diabetes.target

# TODO: Split the data into a 80%-20% training-testing split
X0, X1, Y0, Y1 = train_test_split(X, Y, test_size=0.2, random_state=42)

# TODO: Reshape the Y subsets to have shape (num_samples, 1)
Y0 = Y0.reshape(-1, 1)
Y1 = Y1.reshape(-1, 1)

# Print out the shapes
print(f"""
> Input shape: {X0.shape} for training, {X1.shape} for testing
> Label shape: {Y0.shape} for training, {Y1.shape} for testing
""".strip())

> Input shape: (353, 10) for training, (89, 10) for testing
> Label shape: (353, 1) for training, (89, 1) for testing


In [2]:
import torch
import torch.nn as nn

class Regression(nn.Module):

    """
    Initialize all the inherent "things" inside of a model!
    This includes things like the layers, activation/loss functions, and optimzer.
    """
    def __init__(self, input_dims, output_dims):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        super().__init__()
        self.dense = nn.Linear(input_dims, output_dims).to(self.device)
        self.activation = None  ## To be specified in subclasses
        self.loss = None        ## To be specified in subclasses
        self.set_learning_rate()

    """
    Sets up the optmizer
    """
    def set_learning_rate(self, learning_rate=0.001):
        self.optimizer = torch.optim.SGD(self.parameters(), lr=learning_rate) ## Simple stochastic gradient descent (SGD) optimizer

    """
    Forward pass of the model
    Given an input x, how does the model process the input to get its output?
    """
    def forward(self, x):
        x = self.dense(x)
        x = self.activation(x)
        return x

In [3]:
class TrainTest:

    no_grad = torch.no_grad

    def fit(self, data):
        ## Training loop
        self.train()        ## Set model into training mode
        ## Iterate over the data batches
        for batch, (inputs, target) in enumerate(data):
            ## In real pytorch, you'd need to set the device
            inputs = inputs.to(self.device)
            target = target.to(self.device)
            ## Erase the gradient history
            self.optimizer.zero_grad()
            ## Do a forward pass on the model
            output = self(inputs)
            ## Compute the loss
            loss = self.loss(output, target)
            ## Run backwards pass from the loss through the previous layers
            ## This will accumulate gradients for the parameters that need to be optimized
            loss.backward()
            ## Perform a single optimization step
            self.optimizer.step()
        return {'loss' : loss}

    def evaluate(self, data):
        ## Set model into "evaluate" mode so that the parameters don't get updated
        self.eval()
        total_loss = 0
        ## Cut off the tensor training scope to make sure weights aren't updated
        ## For now, it's torch.no_grad; later, you'll use Tensor.no_grad
        with TrainTest.no_grad():
            for inputs, target in data:
                ## In real pytorch, you'd need to set the device
                inputs = inputs.to(self.device)
                target = target.to(self.device)
                output = self(inputs)
                total_loss += self.loss(output, target).item()  # sum up batch loss

        total_loss /= len(data)
        return {'test_loss' : total_loss}

    def train_test(self, train_data, test_data, epochs=1):
        ## Does both training and validation on a per-epoch basis
        all_stats = []
        for epoch in range(epochs):
            train_stats = self.fit(train_data)
            test_stats = self.evaluate(test_data)
            all_stats += [{**train_stats, **test_stats}]
            print(f'[Epoch {epoch+1}/{epochs}]', all_stats[-1])
        return all_stats

In [4]:
class LinearRegression(Regression, TrainTest):
    def __init__(self, input_dims, output_dims):
        super().__init__(input_dims, output_dims)
        self.activation = nn.Identity()
        self.loss = nn.MSELoss()

In [5]:
torch_model = LinearRegression(X0.shape[-1], 1)
torch_model.set_learning_rate(0.3)
torch_model_stats = torch_model.train_test(
    [[torch.Tensor(X0), torch.Tensor(Y0)]],
    [[torch.Tensor(X1), torch.Tensor(Y1)]],
    epochs=200
);

[Epoch 1/200] {'loss': tensor(29722.4570, grad_fn=<MseLossBackward0>), 'test_loss': 8145.8701171875}
[Epoch 2/200] {'loss': tensor(9832.9111, grad_fn=<MseLossBackward0>), 'test_loss': 5531.677734375}
[Epoch 3/200] {'loss': tensor(6630.8345, grad_fn=<MseLossBackward0>), 'test_loss': 5234.345703125}
[Epoch 4/200] {'loss': tensor(6098.8306, grad_fn=<MseLossBackward0>), 'test_loss': 5223.90185546875}
[Epoch 5/200] {'loss': tensor(5994.2090, grad_fn=<MseLossBackward0>), 'test_loss': 5226.119140625}
[Epoch 6/200] {'loss': tensor(5958.1499, grad_fn=<MseLossBackward0>), 'test_loss': 5217.24169921875}
[Epoch 7/200] {'loss': tensor(5933.2441, grad_fn=<MseLossBackward0>), 'test_loss': 5201.47705078125}
[Epoch 8/200] {'loss': tensor(5910.3037, grad_fn=<MseLossBackward0>), 'test_loss': 5182.68310546875}
[Epoch 9/200] {'loss': tensor(5887.8564, grad_fn=<MseLossBackward0>), 'test_loss': 5162.744140625}
[Epoch 10/200] {'loss': tensor(5865.6650, grad_fn=<MseLossBackward0>), 'test_loss': 5142.4643554687

In [6]:
y_true = torch.Tensor(Y0)
y_pred = torch_model(torch.Tensor(X0))
loss = torch_model.loss(y_true, y_pred)

print(f"""
> Prediction Shape: {y_pred.shape}
> Weights    Shape: {list(torch_model.parameters())[0].shape}
> Bias       Shape: {list(torch_model.parameters())[1].shape}
> Loss       Shape: {loss.shape}
""".strip())

> Prediction Shape: torch.Size([353, 1])
> Weights    Shape: torch.Size([1, 10])
> Bias       Shape: torch.Size([1])
> Loss       Shape: torch.Size([])


In [7]:
class Tensor(np.ndarray):

    '''
    Subclassing numpy arrays is a bit weird:
    https://numpy.org/doc/stable/user/basics.subclassing.html

    Just assume that the attributes referred to in __new__/__array_finalize__
    will be accessible in a Tensor when a new Tensor object is created.
    '''

    requires_grad = True  ## Class variable; accessible by Tensor.requires_grad

    def __new__(cls, input_array):
        obj = np.asarray(input_array).view(cls)
        obj.backward = lambda x: None   ## Backward starts as None, gets assigned later
        obj.grad = None                 ## Gradient starts as None, gets computed later
        obj.requires_grad = True        ## By default, we'll want to compute gradient for new tensors
        obj.to = lambda x: obj          ## We don't handle special device support (i.e. cpu vs gpu/cuda)
        return obj

    def __array_finalize__(self, obj):
        if obj is None: return
        self.backward       = getattr(obj, 'backward',      lambda x: None)
        self.to             = getattr(obj, 'to',            lambda x: obj)
        self.grad           = getattr(obj, 'grad',          None)
        self.requires_grad  = getattr(obj, 'requires_grad', None)

    class no_grad():

        '''
        Synergizes with Tensor: By entering the tensor with no_grad scope,
        the Tensor.requires_grad singleton will swap to False.
        '''

        def __enter__(self):
            # When tape scope is entered, stop asking tensors to record gradients
            Tensor.requires_grad = False
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            # When tape scope is exited, let Diffable start recording to self.operation
            Tensor.requires_grad = True

In [8]:
from abc import ABC, abstractmethod  # # For abstract method support

class Diffable(ABC):
    """
        We use these to represent differentiable layers which we can compute gradients for.
    """

    def to(self, device):
        return self         # Just there to ignore device setting calls

    def __call__(self, *args, **kwargs):

        ## The call method keeps track of method inputs and outputs
        self.argnames   = self.forward.__code__.co_varnames[1:]
        named_args      = {self.argnames[i] : args[i] for i in range(len(args))}
        self.input_dict = {**named_args, **kwargs}
        self.inputs     = [self.input_dict[arg] for arg in self.argnames if arg in self.input_dict.keys()]
        self.outputs    = self.forward(*args, **kwargs)

        ## Make sure outputs are tensors and tie back to this layer
        list_outs = isinstance(self.outputs, list) or isinstance(self.outputs, tuple)
        if not list_outs:
            self.outputs = [self.outputs]
        self.outputs = [Tensor(out) for out in self.outputs]
        for out in self.outputs:
            out.backward = self.backward

        # print(self.__class__.__name__.ljust(24), [v.shape for v in self.inputs], '->', [v.shape for v in self.outputs])

        ## And then finally, it returns the output, thereby wrapping the forward
        return self.outputs if list_outs else self.outputs[0]

    def parameters(self):
        """Returns a list of parameters"""
        return ()

    @abstractmethod
    def forward(self, x):
        """Pass inputs through function. Can store inputs and outputs as instance variables"""
        pass

    @abstractmethod
    def input_gradients(self):
        """Returns local gradient of layer output w.r.t. input"""
        pass

    def weight_gradients(self):
        """Returns local gradient of layer output w.r.t. weights"""
        return []

    @abstractmethod
    def backward(self, grad=np.array([[1]])):
        """
        Propagate upstream gradient backwards by composing with local gradient

        SCAFFOLD:

        Differentiate with respect to layer parameters:
            For every param-gradient pair
            - If all Tensors or this tensor do not require gradients, then skip
            - Otherwise, compose upstream and local gradient

        Differentiate with respect to layer input:
            For every input-gradient pair
            - If all Tensors or this tensor do not require gradients, then skip
            - Otherwise, compose upstream and local gradient

        Usefulseful print boilerplate...:
            # print(f'Diffing w.r.t. "{k}": local = {g.shape} and upstream = {grad.shape}')
        """
        pass

**TODO : Loss**

In [9]:
class MSELoss(Diffable):
    """
    Calculates mean squared error loss and gradient w.r.t. inputs.
    Subclasses Diffable.
    """

    def forward(self, y_pred, y_true):
        """Mean squared error forward pass!"""
        # Compute the MSE given predicted and actual labels
        self.y_pred = y_pred
        self.y_true = y_true
        # MSE formula: (1/n) * sum((y_pred - y_true)^2)
        mse = np.mean((y_pred - y_true) ** 2)
        return mse

    def input_gradients(self):
        """Mean squared error backpropagation!"""
        # Compute the gradient of MSE w.r.t y_pred and y_true
        n = len(self.y_pred)
        grad_y_pred = (2 / n) * (self.y_pred - self.y_true)
        grad_y_true = np.zeros_like(self.y_true)  # Gradients w.r.t. y_true are zero
        return grad_y_pred, grad_y_true

    def backward(self, grad=np.array([[1]])):
        """Mean squared error backpropagation!"""
        # Differentiate with respect to layer inputs
        grad_y_pred, grad_y_true = self.input_gradients()

        # Compose the upstream gradient with this input's gradient
        grad_composed_pred = grad * grad_y_pred

        # Set the gradient of y_pred tensor if it requires gradient
        if self.y_pred.requires_grad:
            self.y_pred.grad = grad_composed_pred

        # y_true doesn't require gradients, but we handle it for completeness
        if self.y_true.requires_grad:
            self.y_true.grad = grad_y_true

        # Pass the composed gradient backward (no further backward in this case)
        return grad_composed_pred


In [10]:
class con:
    ## Control set using default PyTorch
    ytrue = torch.Tensor(Y0)
    ypred = torch_model(torch.Tensor(X0))
    loss_fn = nn.MSELoss()

class exp:
    ## Experimental set using your own implementation
    ytrue = Tensor(Y0)
    ypred = Tensor(con.ypred.detach().numpy())
    loss_fn = MSELoss()

def ypred_to_loss(ns):
    ## Compute loss using the control and experimental namespaces
    ns.loss = ns.loss_fn(ns.ypred, ns.ytrue)
    return ns.loss

## Sanity Check 1: Make sure that the forward pass is the same (i.e. your implementation matches the control)
print(ypred_to_loss(con))
print(ypred_to_loss(exp))

tensor(3839.5854, grad_fn=<MseLossBackward0>)
3839.5852480675444


In [11]:
## Sanity Check 2: Make sure that the backwards pass is the same

con.ypred = con.ypred.detach()
con.ypred.requires_grad = True
#print("Before running backwards:\n", con.ypred.grad)
ypred_to_loss(con)
con.loss.backward()
#print("After running backwards:\n", con.ypred.grad)

exp.ypred.grad = None
#print("Before running backwards:\n", np.round(exp.ypred.grad, 4))
ypred_to_loss(exp)
exp.loss.backward()
#print("After running backwards:\n", np.round(exp.ypred.grad, 4))

max_diff = np.max(exp.ypred.grad - con.ypred.grad.detach().numpy())
print(f"Maximum difference {max_diff} should be less than 0.00001")

Maximum difference 3.6640815603838917e-08 should be less than 0.00001


**LINEAR LAYER**

In [12]:
import numpy as np

class Linear(Diffable):
    """
    Standard linear/dense layer.
    Subclasses Diffable.
    """

    def __init__(self, in_features, out_features, device=None, dtype=None):
        self.w, self.b = self.__class__._initialize_weight(in_features, out_features)
        self.inputs = None
        self.grad_w = None  # Gradient of weights
        self.grad_b = None  # Gradient of biases

    def parameters(self):
        return [self.w, self.b]

    def forward(self, inputs):
        """
        Forward pass for a dense layer: Y = XW + B
        Inputs: (batch_size, in_features)
        Weights: (in_features, out_features)
        Bias: (1, out_features)
        """
        self.inputs = inputs  # Store input for backprop

        # Perform matrix multiplication: inputs @ weights + bias
        output = np.dot(inputs, self.w) + self.b
        return output

    def weight_gradients(self):
        dw = np.dot(self.inputs.T, self.grad_w)  # Gradient w.r.t weights
        db = np.sum(self.grad_w, axis=0, keepdims=True)  # Gradient w.r.t biases
        return dw, db

    def input_gradients(self):
        return np.dot(self.grad_w, self.w.T)

    def backward(self, grad=np.array([[1]])):
        print("Entering backward function with grad:", grad)
        self.grad_w = grad  # Store upstream gradient for weight gradient calculation
        dw, db = self.weight_gradients() # Compute gradients for weights and biases
        print("Computed weight gradient (dw):", dw)
        print("Computed bias gradient (db):", db)
        # Store the gradients (without updating the parameters directly)
        self.w.grad = dw  # Store gradient for weights
        self.b.grad = db  # Store gradient for biases
        print("Assigned weight gradient to self.w.grad:", self.w.grad)
        print("Assigned bias gradient to self.b.grad:", self.b.grad)
        # Return the gradient to propagate backwards to earlier layers
        return self.input_gradients()

    @staticmethod
    def _initialize_weight(input_size, output_size):
        # Correct weight shape to (input_size, output_size)
        w = np.random.normal(0, 1, (input_size, output_size))  # (in_features, out_features)
        b = np.zeros((1, output_size))  # Bias is (1, out_features)
        return w, b



In [13]:
import torch
import torch.nn as nn
import numpy as np

# Example usage
class con:
    X0 = torch.tensor(X0, dtype=torch.float32, requires_grad=True)  # Set requires_grad correctly
    Y0 = torch.tensor(Y0, dtype=torch.float32, requires_grad=True)  # Set requires_grad correctly
    dense = nn.Linear(10, 1)  # Control model
    loss_fn = nn.MSELoss()

class exp:
    X0 = torch.tensor(X0, dtype=torch.float32, requires_grad=True)  # Set requires_grad correctly
    Y0 = torch.tensor(Y0, dtype=torch.float32, requires_grad=True)  # Set requires_grad correctly
    dense = nn.Linear(10, 1)  # Experimental model
    dense.weight = nn.Parameter(con.dense.weight.detach().clone())  # Use Parameter for weights
    dense.bias = nn.Parameter(con.dense.bias.detach().clone())      # Use Parameter for bias
    loss_fn = nn.MSELoss()

def x_to_loss(ns):
    ns.ypred = ns.dense(ns.X0)
    ns.loss = ns.loss_fn(ns.ypred, ns.Y0)
    #print("Loss inside x_to_loss:", ns.loss)
    #print("Loss requires_grad:", ns.loss.requires_grad)
    return ns.loss

# Call x_to_loss and print outputs
loss_con = x_to_loss(con)
loss_exp = x_to_loss(exp)

# Sanity checks
print(f"Maximum difference: {np.max(con.ypred.detach().numpy() - exp.ypred.detach().numpy())} should be less than 0.00001\n")
print(f"Losses: Control {loss_con.item()} vs Experimental {loss_exp.item()}")

# Print parameter values for debugging
print('\nControl Params:', *list(con.dense.parameters()), sep='\n')
print('\nExperimental Params:', *list(exp.dense.parameters()), sep='\n')


Maximum difference: 0.0 should be less than 0.00001

Losses: Control 29699.373046875 vs Experimental 29699.373046875

Control Params:
Parameter containing:
tensor([[ 0.0421, -0.1437, -0.2759,  0.2651, -0.2627,  0.0977, -0.2938,  0.1480,
          0.0639,  0.2266]], requires_grad=True)
Parameter containing:
tensor([0.0327], requires_grad=True)

Experimental Params:
Parameter containing:
tensor([[ 0.0421, -0.1437, -0.2759,  0.2651, -0.2627,  0.0977, -0.2938,  0.1480,
          0.0639,  0.2266]], requires_grad=True)
Parameter containing:
tensor([0.0327], requires_grad=True)


In [14]:
## Sanity Check 2: Make sure that the backwards pass is the same

con.X0 = con.X0.detach()
con.Y0 = con.Y0.detach()
for p in con.dense.parameters():
    if p.grad is None: continue
    p.grad.detach_()
    p.grad = None

x_to_loss(con).backward()
print("After running backwards on weights:")
print([p.grad for p in con.dense.parameters()])

for p in exp.dense.parameters(): p.grad = None
x_to_loss(exp).backward()

print("\n" + "*" * 100 + "\n")
print("After running backwards on weights:")
print([p.grad for p in exp.dense.parameters()])

After running backwards on weights:
[tensor([[-1.8602, -0.1088, -4.9841, -3.7174, -1.3137, -0.9007,  3.1145, -3.2770,
         -4.4843, -3.5200]]), tensor([-307.4061])]

****************************************************************************************************

After running backwards on weights:
[tensor([[-1.8602, -0.1088, -4.9841, -3.7174, -1.3137, -0.9007,  3.1145, -3.2770,
         -4.4843, -3.5200]]), tensor([-307.4061])]


**STOCHASTIC GRADIENT DESCENT**

In [15]:
import torch
import numpy as np

# SGD Optimizer Implementation
class SGD:
    """
    Performs stochastic gradient descent with the specified learning rate.
    """
    def __init__(self, params, lr, *args, **kwargs):
        self.params = params
        self.lr = lr

    def zero_grad(self):
        """
        Reset the gradients for all parameters.
        """
        for param in self.params:
            if param.grad is not None:
                param.grad.zero_()

    def step(self):
        """
        Update parameters by subtracting the gradient multiplied by the learning rate.
        """
        for param in self.params:
            if param.grad is not None:
                param.data -= self.lr * param.grad

# FakeTorchModule provided earlier
class FakeTorchModule:
    """
        Needed so that we can do manual linear regression.
    """

    def __init__(self):
        self.device = ""

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)

    def to(self, device):
        return self

    def parameters(self):
        params = []
        for k, v in self.__dict__.items():
            params += getattr(v, 'parameters', lambda: [])()
        return params

    def train(self):
        for p in self.parameters():
            p.requires_grad = getattr(p, 'required_grad', p.requires_grad)

    def eval(self):
        for p in self.parameters():
            p.required_grad = p.requires_grad
            p.requires_grad = False

class ManualRegression(FakeTorchModule):
    """
    Allows us to use our custom Linear layer and SGD optimizer.
    Subclasses FakeTorchModule
    """

    def __init__(self, input_dims, output_dims):
        super().__init__()
        # Initialize the weights and bias for the linear layer manually
        self.w = torch.randn(input_dims, output_dims, requires_grad=True)
        self.b = torch.randn(output_dims, requires_grad=True)
        self.set_learning_rate()

    def set_learning_rate(self, learning_rate=0.001):
        # Use custom SGD
        self.optimizer = SGD([self.w, self.b], lr=learning_rate)

    def forward(self, x):
        # Linear layer computation: y = x @ w + b
        return x @ self.w + self.b

    def step(self, X, Y, loss_fn):
        # Forward pass
        y_pred = self.forward(X)

        # Calculate loss
        loss = loss_fn(y_pred, Y)

        # Zero gradients
        self.optimizer.zero_grad()

        # Backward pass
        loss.backward()

        # Step optimizer
        self.optimizer.step()

        return loss.item()

class TrainTest2:
    def __init__(self):
        pass

    def train_test(self, train_data, test_data, epochs=100):
        train_losses = []
        test_losses = []
        for epoch in range(epochs):
            X_train, Y_train = train_data[0]
            loss = self.step(X_train, Y_train, torch.nn.MSELoss())
            train_losses.append(loss)

            if epoch % 50 == 0:
                print(f"Epoch {epoch} / {epochs} - Train Loss: {loss}")

        return train_losses

class ManualLinearRegression(ManualRegression, TrainTest2):
    def __init__(self, input_dims, output_dims):
        super().__init__(input_dims, output_dims)
        ## Now the model is initialized with custom SGD and linear layer

# Initialize Manual Linear Regression Model
model = ManualLinearRegression(10, 1)
model.set_learning_rate(0.2)

X0 = torch.tensor(X0, dtype=torch.float32)
Y0 = torch.tensor(Y0, dtype=torch.float32)
X1 = torch.tensor(X1, dtype=torch.float32)
Y1 = torch.tensor(Y1, dtype=torch.float32)

# Train the model
manual_model_stats = model.train_test([[X0, Y0]], [[X1, Y1]], epochs=200)


Epoch 0 / 200 - Train Loss: 29209.75
Epoch 50 / 200 - Train Loss: 5394.9453125
Epoch 100 / 200 - Train Loss: 4887.755859375
Epoch 150 / 200 - Train Loss: 4510.90087890625


**Compare this model's performance to the first linear regression model**

In [16]:
# Extract final training and testing losses
torch_final_train_loss = torch_model_stats[-1]['loss']

manual_final_train_loss = manual_model_stats[-1]

# Print comparison
print(f"""
Comparison of PyTorch and Manual FakeTorch Model (SGD):
-----------------------------------------------------
PyTorch Model Final Training Loss: {torch_final_train_loss}
Manual FakeTorch Model Final Training Loss: {manual_final_train_loss}

""")


Comparison of PyTorch and Manual FakeTorch Model (SGD):
-----------------------------------------------------
PyTorch Model Final Training Loss: 3844.04296875
Manual FakeTorch Model Final Training Loss: 4232.11962890625


