Part 1: Implementing a neural network from scratch

In [18]:
import numpy as np
import matplotlib.pyplot as plt

class Conv2D:
    def __init__(self, in_channels, out_channels, kernel_size):
        self.kernel_size = kernel_size
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.weights = np.random.randn(out_channels, in_channels, kernel_size, kernel_size).astype(np.float32) / (kernel_size * kernel_size)
        self.bias = np.zeros((out_channels, 1), dtype=np.float32)

    def forward(self, x):
        self.x = x
        batch_size, in_channels, in_height, in_width = x.shape
        out_height = in_height - self.kernel_size + 1
        out_width = in_width - self.kernel_size + 1
        out = np.zeros((batch_size, self.out_channels, out_height, out_width), dtype=np.float32)

        for i in range(out_height):
            for j in range(out_width):
                #receptive_field = x[:, :, i:i+self.kernel_size, j:j+self.kernel_size]
                receptive_field = np.array(x[:, :, i:i+self.kernel_size, j:j+self.kernel_size])

                #w = self.weights.reshape(receptive_field.shape)
                #z = receptive_field * self.weights
                c = np.einsum('ijkl,mjkl->mijkl', receptive_field, self.weights)
                c = np.sum(c, axis=(0,2,3,4))
                #print(z.shape)
                #y = np.sum(z, axis=(1,2,3)) 
                #print("y.shape:",y.shape)
                #out[:, :, i, j] = np.sum(receptive_field * self.weights, axis=(1,2,3)) + self.bias
                c = c+self.bias
                out[:, :, i, j] = c.transpose(1,0)

                #break
                
        return out

    def backward(self, grad, learning_rate):
        batch_size, in_channels, in_height, in_width = self.x.shape
        out_height = in_height - self.kernel_size + 1
        out_width = in_width - self.kernel_size + 1

        d_weights = np.zeros_like(self.weights, dtype=np.float32)
        d_bias = np.zeros_like(self.bias, dtype=np.float32)
        dx = np.zeros_like(self.x, dtype=np.float32)

        for i in range(out_height):
            for j in range(out_width):
                receptive_field = self.x[:, :, i:i+self.kernel_size, j:j+self.kernel_size]
                d_weights += np.sum(receptive_field * (grad[:, :, i, j])[:, :, np.newaxis, np.newaxis], axis=0)
                grad = np.array(grad)
                # print("grad_shape:", grad.shape)
                # print("d_bias shape:", d_bias.shape)
                # print("shape: ", np.sum(grad[:, :, i, j], axis=0, keepdims=True).shape)
                d_bias += (np.sum(grad[:, :, i, j], axis=0, keepdims=True)).transpose(1,0)
                # print("d_bias shape:", d_bias.shape)
                
                # print((grad[:, :, i, j])[:, :, np.newaxis, np.newaxis].shape)
                # print(self.weights.shape)
                # print("dx:", dx.shape)
                dx[:, :, i:i+self.kernel_size, j:j+self.kernel_size] += np.sum((grad[:, :, i, j])[:, :, np.newaxis, np.newaxis] * self.weights, axis=1)

        self.weights -= learning_rate * d_weights / batch_size
        self.bias -= learning_rate * d_bias / batch_size

        return dx
    
class MaxPool2D:
    def __init__(self, kernel_size):
        self.kernel_size = kernel_size

    def forward(self, x):
        batch_size, in_channels, in_height, in_width = x.shape
        out_height = in_height // self.kernel_size
        out_width = in_width // self.kernel_size
        out = np.zeros((batch_size, in_channels, out_height, out_width), dtype=np.float32)

        for i in range(out_height):
            for j in range(out_width):
                receptive_field = x[:, :, i*self.kernel_size:(i+1)*self.kernel_size, j*self.kernel_size:(j+1)*self.kernel_size]

                out[:, :, i, j] = np.amax(receptive_field, axis=(2,3))

        return out

    def backward(self, grad):
        batch_size, in_channels, in_height, in_width = grad.shape
        out_height = in_height * self.kernel_size
        out_width = in_width * self.kernel_size
        dx = np.zeros((batch_size, in_channels, out_height, out_width), dtype=np.float32)

        for i in range(in_height):
            for j in range(in_width):
                start_i = i * self.kernel_size
                start_j = j * self.kernel_size
                end_i = start_i + self.kernel_size
                end_j = start_j + self.kernel_size
                receptive_field = dx[:, :, start_i:end_i, start_j:end_j]
                mask = (receptive_field == np.max(receptive_field, axis=(2,3))[:, :, None, None])
                dx[:, :, start_i:end_i, start_j:end_j] += mask * (grad[:, :, i, j])[:, :, None, None]

        return dx


class Linear:
    def __init__(self, in_features, out_features):
        self.weights = np.random.randn(out_features, in_features).astype(np.float32) * np.sqrt(2 / in_features)
        self.bias = np.zeros((out_features, 1), dtype=np.float32)

    def forward(self, x):
        self.x = x
        batch_size = x.shape[0]
        
        out = np.dot(np.array(x).reshape(batch_size, -1), self.weights.T) + self.bias.T
        return out

    def backward(self, grad, learning_rate):
        batch_size = self.x.shape[0]
        dx = np.dot(grad, self.weights)
        dx = dx.reshape(batch_size, *self.x.shape[1:])
        d_weights = np.dot(grad.T, self.x.reshape(batch_size, -1))
        #c = np.sum(grad, axis=(1))
        print(grad.shape)
        grad = np.array(grad)
        d_bias = np.sum(grad, axis=0, keepdims=True).T
        self.weights -= learning_rate * d_weights / batch_size
        self.bias -= learning_rate * d_bias / batch_size
        return dx
        # d_weights = np.dot(grad.T, self.x) / self.x.shape[0]
        # d_bias = np.sum(grad, axis=0) / self.x.shape[0]
        # dx = np.dot(grad, self.weights)
        # self.weights -= learning_rate * d_weights
        # self.bias -= learning_rate * d_bias.reshape(-1, 1)
        # return dx
    
import torch.nn as nn

class Net:
    def __init__(self, learning_rate=0.001):
        self.conv1 = Conv2D(3, 32, kernel_size=3)
        self.pool1 = MaxPool2D(kernel_size=2)
        self.conv2 = Conv2D(32, 64, kernel_size=5)
        self.pool2 = MaxPool2D(kernel_size=2)
        self.conv3 = Conv2D(64, 64, kernel_size=3)
        self.fc1 = Linear(64 * 3 * 3, 64)
        self.fc2 = Linear(64, 10)
        self.loss_fn = nn.CrossEntropyLoss()
        self.softmax = nn.Softmax(dim=1)
        #self.softmax = nn.Softmax(dim=1)
        self.learning_rate = learning_rate
        self.layers = [self.conv1, self.pool1, self.conv2, self.pool2, self.conv3, self.fc1, self.fc2]

        # Adam optimizer parameters
        self.beta1 = 0.9
        self.beta2 = 0.999
        self.eps = 1e-8
        self.t = 0
        self.m = {}
        self.v = {}
        for layer in self.layers:
            if hasattr(layer, 'weights'):
                self.m[layer] = np.zeros_like(layer.weights)
                self.v[layer] = np.zeros_like(layer.weights)
    
    def forward(self, x):
        x = self.conv1.forward(x)

        x = np.maximum(x, 0)  # ReLU activation function
        print("conv1 done")
        x = self.pool1.forward(x)
        print("pool1 done")
        x = self.conv2.forward(x)
        
        x = np.maximum(x, 0)  # ReLU activation function
        print("conv2 done")
        print(x.shape)
        x = self.pool2.forward(x)
        print("pool2 done") 
        x = self.conv3.forward(x)
        x = np.maximum(x, 0)
        print("conv3 done")
        x = x.reshape(x.shape[0], -1)
        #print(x.shape)
        #x = x.view(-1, 64 * 8 * 8)
        x = self.fc1.forward(x)
        x = np.maximum(x, 0)  # ReLU activation function
        print("fc1 done")
        x = self.fc2.forward(x)
        print("fc2 done")
        x = torch.from_numpy(x)
        x = self.softmax(x)
        print("soft done")
        return x  
    
    """def backward(self, grad):
        grad = self.fc2.backward(grad, self.learning_rate)
        grad = np.maximum(grad, 0)  # ReLU derivative
        grad = self.fc1.backward(grad, self.learning_rate)
        grad = np.maximum(grad, 0)  # ReLU derivative
        grad = grad.reshape(grad.shape[0], self.conv2.out_channels, 5, 5)
        grad = self.pool2.backward(grad)
        grad = np.maximum(grad, 0)  # ReLU derivative 
        grad = self.conv2.backward(grad, self.learning_rate)
        grad = np.maximum(grad, 0)  # ReLU derivative
        grad = self.pool1.backward(grad)
        grad = np.maximum(grad, 0)  # ReLU derivative
        grad = self.conv1.backward(grad, self.learning_rate)
        return grad"""

    def backward(self, grad):
        grad = self.softmax(grad)
        grad = self.fc2.backward(grad, self.learning_rate)
        grad = self.fc1.backward(grad, self.learning_rate)
        grad = np.maximum(grad, 0)  # ReLU derivative
        grad = grad.reshape(grad.shape[0], self.conv3.out_channels, 3, 3)
        grad = self.conv3.backward(grad, self.learning_rate)
        grad = np.maximum(grad, 0)  # ReLU derivative 
        grad = self.pool2.backward(grad)
        grad = np.maximum(grad, 0)  # ReLU derivative
        grad = self.conv2.backward(grad, self.learning_rate)
        grad = np.maximum(grad, 0)  # ReLU derivative
        grad = self.pool1.backward(grad)
        grad = np.maximum(grad, 0)  # ReLU derivative
        grad = self.conv1.backward(grad, self.learning_rate)
        return grad

    
    def update_weights(self):
        self.t += 1
        for layer in self.layers:
            if hasattr(layer, 'weights'):
                self.m[layer] = self.beta1 * self.m[layer] + (1 - self.beta1) * layer.d_weights
                self.v[layer] = self.beta2 * self.v[layer] + (1 - self.beta2) * layer.d_weights**2
                m_hat = self.m[layer] / (1 - self.beta1**self.t)
                v_hat = self.v[layer] / (1 - self.beta2**self.t)
                layer.weights -= self.learning_rate * m_hat / (np.sqrt(v_hat) + self.eps)
                layer.bias -= self.learning_rate * layer.d_bias
    
    def train(self, x_train, y_train, x_val=None, y_val=None, epochs=10, batch_size=32):
        train_losses = []
        val_losses = []
        val_accs = []
        
        for epoch in range(epochs):
            print(epoch)
            # shuffle the training data
            # permutation = np.random.permutation(x_train.shape[0])
            # x_train = x_train[permutation]
            # y_train = y_train[permutation]
            
            loss = 0.0
            # split the data into batches
            num_batches = x_train.shape[0] // batch_size
            for i in range(num_batches):
                # get the current batch
                start = i * batch_size
                end = (i+1) * batch_size
                x_batch = x_train[start:end]
                y_batch = y_train[start:end]
                
                # forward pass
                
                y_pred = self.forward(x_batch)
                print(y_pred.shape)
                print(y_batch)
                # compute loss and gradients
                print(type(y_pred))
                print(type(y_batch))
                #loss += -np.mean(np.log(y_pred[np.arange(len(y_batch)), y_batch]))
                #loss += -np.mean(np.log(y_pred[np.arange(len(y_batch)), y_batch]), dtype=np.float64)

                loss = self.loss_fn(y_pred, y_batch)  

                grad = self.softmax(y_pred)
                grad[np.arange(len(y_batch)), y_batch] -= 1
                grad /= len(y_batch)
                print(type(grad))
                print("grad.shape:", grad.shape)
                
                self.backward(grad)
                
                # update weights
                self.update_weights()
                
            # evaluate on training set
            # train_loss = self.evaluate(x_train, y_train)
            # train_losses.append(train_loss)
            train_losses.append(loss / len(x_train))
            
            # evaluate on validation set, if provided
            """if x_val is not None and y_val is not None:
                val_loss, val_acc = self.evaluate(x_val, y_val, accuracy=True)
                val_losses.append(val_loss)
                val_accs.append(val_acc)
                print(f"Epoch {epoch+1}/{epochs}: Train Loss={train_loss:.4f}, Val Loss={val_loss:.4f}, Val Acc={val_acc:.4f}")
            else:
                print(f"Epoch {epoch+1}/{epochs}: Train Loss={train_loss:.4f}")
        
        if x_val is not None and y_val is not None:
            return train_losses, val_losses, val_accs
        else:
            return train_losses"""
        
        
    """def train(self, train_data_loader, val_data_loader, epochs=10):
        
        for epoch in range(epochs):
            epoch_loss = 0.0
            for images, labels in train_data_loader:
                y_pred = self.forward(images)"""
                
    
    def predict(self, x):
        y_pred = self.forward(x)
        return np.argmax(y_pred, axis=1)
    
    def evaluate(self, x, y, accuracy=False):
        y_pred = self.forward(x)
        loss, grad = self.loss(y_pred, y)
        if accuracy:
            acc = np.mean(np.argmax(y_pred, axis=1) == np.argmax(y, axis=1))
            return loss, acc
        else:
            return loss



import torch
import torchvision
import torchvision.transforms as transforms



if __name__ == '__main__':
    # For cifar-10 you get pre-built loaders
    transform = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                            download=True, transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=32,
                                              shuffle=True, num_workers=4)
    testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                           download=True, transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=32,
                                             shuffle=False, num_workers=4)
    
    # Get x_train and y_train
    x_train = []
    y_train = []
    for batch in trainloader:
        images, labels = batch
        x_train.append(images)
        y_train.append(labels)
    x_train = torch.cat(x_train, dim=0)
    y_train = torch.cat(y_train, dim=0)
    
    # Get x_test and y_test
    x_test = []
    y_test = []
    for batch in testloader:
        images, labels = batch
        x_test.append(images)
        y_test.append(labels)
    x_test = torch.cat(x_test, dim=0)
    y_test = torch.cat(y_test, dim=0)
    
    print("done loading data")

    x = Net()
    x.train(x_train, y_train)


 


Files already downloaded and verified
Files already downloaded and verified
done loading data
0
conv1 done
pool1 done
conv2 done
(32, 64, 11, 11)
pool2 done
conv3 done
fc1 done
fc2 done
soft done
torch.Size([32, 10])
tensor([8, 9, 1, 8, 1, 1, 5, 6, 1, 6, 7, 9, 7, 0, 0, 3, 9, 8, 1, 0, 4, 8, 5, 9,
        9, 1, 7, 0, 1, 5, 7, 3])
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
grad.shape: torch.Size([32, 10])
torch.Size([32, 10])
(32, 64)
grad_shape: (32, 64, 3, 3)
d_bias shape: (64, 1)
shape:  (1, 64)
d_bias shape: (64, 1)
(32, 64, 1, 1)
(64, 64, 3, 3)
dx: (32, 64, 5, 5)


ValueError: ignored