<a href="https://colab.research.google.com/github/ratulb/pytorch/blob/main/mnist_in_pythor_from_scratch_cleaned_wip4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
import random
import numpy as np
import torch
import math
import time
import torch.nn as nn



# Seed the Python, NumPy and PyTorch random number generators.
# NOTE(review): NumPy is seeded with 42 while the other two use 24 - confirm this is intended.
random.seed(24)
np.random.seed(42)
torch.manual_seed(24)

# Transform that converts each dataset image to a tensor.
to_tensor = transforms.ToTensor()

# Load the MNIST dataset

In [None]:
# Download the training and test datasets
# Load the MNIST training split from the current directory (download=True is passed),
# applying to_tensor to every image.
train_dataset = torchvision.datasets.MNIST(root='./', train=True, download=True, transform=to_tensor)
#test_dataset = torchvision.datasets.MNIST(root='./', train=False, download=True, transform=to_tensor)
# Randomly partition the training data into 59000 training and 1000 validation samples.
# NOTE(review): the two lengths assume the training split holds exactly 60000 samples - confirm.
train_ds, val_ds = random_split(train_dataset, [59000, 1000])
# Sizes of the two subsets.
len(train_ds), len(val_ds)
#len(train_dataset)

In [None]:
# Number of samples per mini-batch for both loaders below.
batch_size = 4

# Training batches are requested with shuffle=True; the validation loader
# is created without a shuffle argument.
train_loader = DataLoader(train_ds, batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size)

In [None]:
def one_hot_encode(values, num_classes):
    """Turn wrapped class labels into one-hot rows.

    Args:
        values: Iterable of indexable items; the first element of each item
            is the class index to mark.
        num_classes: Length of every produced row.

    Returns:
        A list with one list per item in ``values``; each row is all zeros
        except for a 1 at the item's class index.
    """
    def encode_label(label):
        row = [0] * num_classes
        row[label] = 1
        return row

    return [encode_label(entry[0]) for entry in values]

def softmax(x):
    """Return the softmax of an iterable of numbers as a list.

    The largest input is subtracted from every element before
    exponentiating. This leaves the result mathematically unchanged but keeps
    ``math.exp`` from raising ``OverflowError`` on large inputs.

    Args:
        x: Iterable of real numbers (logits).

    Returns:
        A list of the same length whose entries are non-negative and sum
        to 1. An empty input yields an empty list.
    """
    values = list(x)
    if not values:
        return []
    # Shifting by the maximum makes every exponent <= 0, so exp() stays in (0, 1].
    peak = max(values)
    exp_x = [math.exp(value - peak) for value in values]
    sum_exp_x = sum(exp_x)
    return [e / sum_exp_x for e in exp_x]

def relu(x):
    """Rectified linear unit for a scalar: ``x`` when positive, otherwise 0."""
    return x if x > 0 else 0


In [None]:
def forward_pass(input_batch, all_neurons_weights, biases):
    """Run every sample of a batch through one dense layer with ReLU.

    Args:
        input_batch: Iterable of samples, each an iterable of feature values.
        all_neurons_weights: One weight sequence per neuron.
        biases: One bias per neuron, paired with ``all_neurons_weights``.

    Returns:
        A list with one entry per sample; each entry is the list of neuron
        activations ``relu(dot(sample, weights) + bias)``.
    """
    def activate(sample):
        # One activation per (weights, bias) pair, in neuron order.
        return [
            relu(
                sum(feature * weight for feature, weight in zip(sample, neuron_weights))
                + neuron_bias
            )
            for neuron_weights, neuron_bias in zip(all_neurons_weights, biases)
        ]

    return [activate(sample) for sample in input_batch]

def calculate_errors(batch_output_of_forward_pass, target_batch):
    """Compute per-sample squared and residual errors for a batch.

    Args:
        batch_output_of_forward_pass: One list of outputs per sample.
        target_batch: One list of target values per sample, aligned with
            the outputs.

    Returns:
        A ``(squared_errors, residual_errors)`` tuple. Both are lists with
        one list per sample: ``output - target`` for the residuals and its
        square for the squared errors.
    """
    squared_errors = []
    residual_errors = []
    for output, target in zip(batch_output_of_forward_pass, target_batch):
        sample_residual_errors = [o - t for o, t in zip(output, target)]
        try:
            sample_squared_errors = [r ** 2 for r in sample_residual_errors]
        except OverflowError as e:
            print("Overflow error.", e)
            # Keep the per-sample result a list so callers can still sum it:
            # float multiplication saturates to inf instead of raising like
            # ``**`` does.
            sample_squared_errors = [r * r for r in sample_residual_errors]
        squared_errors.append(sample_squared_errors)
        residual_errors.append(sample_residual_errors)
    return squared_errors, residual_errors

def calculate_weight_and_bias_deltas(output_of_calculate_errors, input_batch, learning_rate):
    """Accumulate learning-rate-scaled weight and bias deltas over a batch.

    Args:
        output_of_calculate_errors: Sequence whose element at index 1 holds
            the per-sample residual errors (one value per neuron).
        input_batch: One feature sequence per sample.
        learning_rate: Factor applied to every residual.

    Returns:
        ``(weight_deltas, bias_deltas)``: a neurons-by-features nested list
        and a per-neuron list, each summed over all samples of the batch.
    """
    residual_errors = output_of_calculate_errors[1]
    neuron_count = len(residual_errors[0])
    weight_deltas = [[0] * len(input_batch[0]) for _ in range(neuron_count)]
    bias_deltas = [0] * neuron_count

    for sample_residuals, sample_inputs in zip(residual_errors, input_batch):
        for neuron_index, residual in enumerate(sample_residuals):
            # The scaled residual is shared by the bias and by every weight of this neuron.
            step = learning_rate * residual
            neuron_row = weight_deltas[neuron_index]
            for feature_index, feature in enumerate(sample_inputs):
                neuron_row[feature_index] += step * feature
            bias_deltas[neuron_index] += step
    return weight_deltas, bias_deltas

def update_weights_biases(output_of_calculate_weight_and_bias_deltas, weights, biases):
    """Subtract the computed deltas from the weights and biases in place.

    Args:
        output_of_calculate_weight_and_bias_deltas: ``(weight_deltas,
            bias_deltas)`` pair shaped like ``weights`` and ``biases``.
        weights: Nested list of per-neuron weights; modified in place.
        biases: List of per-neuron biases; modified in place.

    Returns:
        The same ``weights`` and ``biases`` objects that were passed in.
    """
    weight_deltas, bias_deltas = output_of_calculate_weight_and_bias_deltas
    for neuron_index, neuron_weights in enumerate(weights):
        for feature_index in range(len(neuron_weights)):
            neuron_weights[feature_index] -= weight_deltas[neuron_index][feature_index]
        biases[neuron_index] -= bias_deltas[neuron_index]
    return weights, biases

def batch_validation(weights, biases, batch):
    """Score one ``(images, labels)`` batch with the given layer parameters.

    Each image is squeezed, converted to nested lists and flattened row by
    row before the forward pass. The predicted class of a sample is the
    argmax of the softmax of its outputs.

    Returns:
        Whatever ``accuracy`` returns for the predictions and labels.
    """
    images, labels = batch
    flat_images = []
    for image in images:
        rows = image.squeeze().tolist()
        flat_images.append([pixel for row in rows for pixel in row])
    activations = forward_pass(flat_images, weights, biases)
    predictions = torch.tensor([np.argmax(softmax(scores)) for scores in activations])
    return accuracy(predictions, labels)

def evaluate(weights, biases, val_loader, epoch=None):
    """Validate every batch of ``val_loader`` and report the aggregate.

    Args:
        weights: Layer weights passed through to ``batch_validation``.
        biases: Layer biases passed through to ``batch_validation``.
        val_loader: Iterable of batches.
        epoch: Label forwarded to ``epoch_end_validation`` for its report.

    Returns:
        The result of ``epoch_end_validation``.
    """
    per_batch_stats = []
    for batch in val_loader:
        per_batch_stats.append(batch_validation(weights, biases, batch))
    return epoch_end_validation(per_batch_stats, epoch=epoch)

def epoch_end_validation(accum_acc_and_counts, epoch):
    """Print the overall accuracy across all validated batches.

    Args:
        accum_acc_and_counts: Non-empty list of 1-D tensors laid out as
            ``[batch_accuracy, correct_count, batch_size]``.
        epoch: Value shown in the ``Epoch`` field of the printed line.

    The accuracy is computed as total correct / total samples. Averaging the
    per-batch accuracies instead would over-weight a smaller final batch.
    """
    stacked = torch.stack(accum_acc_and_counts)
    count = stacked[:, 1].sum()
    total = stacked[:, 2].sum()
    acc = count / total
    print(f"Epoch {epoch}, Accuracy: {acc}, Count: {count}, total: {total}")


def accuracy(outputs, labels):
    """Compare predictions with labels.

    Returns:
        A tensor ``[fraction_correct, number_correct, number_of_labels]``.
    """
    num_correct = torch.sum(outputs == labels).item()
    num_labels = labels.numel()
    stats = [num_correct / num_labels, num_correct, num_labels]
    return torch.tensor(stats)

def xavier_init(num_features, num_neurons):
    """Build a ``num_neurons`` x ``num_features`` weight matrix.

    Every weight is drawn with ``random.uniform`` from ``[-limit, limit]``
    where ``limit = sqrt(6 / (num_features + num_neurons))``.
    """
    bound = math.sqrt(6 / (num_features + num_neurons))
    weight_matrix = []
    for _ in range(num_neurons):
        neuron_weights = []
        for _ in range(num_features):
            neuron_weights.append(random.uniform(-bound, bound))
        weight_matrix.append(neuron_weights)
    return weight_matrix

def reformat_batch(input_batch, target_batch):
    """Convert a tensor batch into the nested lists used by the training code.

    Args:
        input_batch: Tensor whose first dimension indexes the samples.
        target_batch: Tensor of integer class labels, one per sample.

    Returns:
        ``(inputs, targets)`` where ``inputs`` holds one flat list of values
        per sample and ``targets`` holds one 10-way one-hot list per sample.
    """
    # Flatten everything except the batch dimension. A bare squeeze() would
    # also drop the batch dimension when the batch holds a single sample,
    # which breaks the per-sample flattening.
    input_batch = input_batch.flatten(start_dim=1).tolist()
    # one_hot_encode expects each label wrapped in its own list.
    wrapped_labels = [[label] for label in target_batch.tolist()]
    target_batch = one_hot_encode(wrapped_labels, 10)
    return input_batch, target_batch

def train(num_epochs, learning_rate):
    """Train a single 784-input, 10-neuron layer on ``train_loader``.

    Each batch goes through a forward pass, error calculation, delta
    calculation and weight update. After every epoch the elapsed time, the
    summed squared error of the last processed batch and the current
    parameters are printed, and the model is evaluated on ``val_loader``.

    Args:
        num_epochs: Number of passes over ``train_loader``.
        learning_rate: Step size used when computing the deltas.

    Returns:
        The trained ``(weights, biases)``.
    """
    num_features = 784
    num_neurons = 10

    weights = xavier_init(num_features, num_neurons)
    # Biases use the same uniform limit formula that xavier_init applies to weights.
    limit = math.sqrt(6 / (num_features + num_neurons))
    biases = [random.uniform(-limit, limit) for _ in range(num_neurons)]

    # Stays None only if the loader never yields a batch.
    errors = None
    for epoch in range(num_epochs):
        start = time.time()
        for input_batch, target_batch in train_loader:
            input_batch, target_batch = reformat_batch(input_batch, target_batch)
            batch_output = forward_pass(input_batch, weights, biases)
            errors = calculate_errors(batch_output, target_batch)
            deltas = calculate_weight_and_bias_deltas(errors, input_batch, learning_rate)
            weights, biases = update_weights_biases(deltas, weights, biases)
        end = time.time()

        # The reported error covers the last processed batch only; report 0
        # if no batch has been processed yet instead of failing on an
        # unbound name.
        total_error = sum(sum(e) for e in errors[0]) if errors is not None else 0
        print(f"Epoch {epoch+1} completed in {end - start} seconds")
        print(f"Epoch {epoch+1}, Error: {total_error}, weights: {weights}, Biases: {biases}")
        evaluate(weights, biases, val_loader, epoch+1)
    return weights, biases

# Hyperparameters for the training run below.
num_epochs = 1
learning_rate = 0.00125
# NOTE(review): activator is not referenced by any function visible in this file - confirm it is still needed.
activator = nn.Tanh()


# Train the layer, then print the learned parameters.
weights, biases = train(num_epochs, learning_rate)
print("Trained weights:", weights)
print("Trained biases:", biases)



In [None]:
# Load the MNIST test split (train=False) with the same to_tensor transform.
test_dataset = torchvision.datasets.MNIST(root='./', train=False, download=True, transform=to_tensor)

In [None]:
# Serve the test set in batches of 10; no shuffle argument is passed.
test_loader = DataLoader(test_dataset, batch_size=10)

In [None]:
# Report accuracy of the current weights and biases on the test set; epoch=None is
# forwarded to the report and shows up in its "Epoch" field.
evaluate(weights, biases, test_loader, epoch=None)

In [None]:
def relu2deriv(output):
    """Tell whether ``output`` is strictly positive (where ReLU has slope 1)."""
    return 0 < output

In [None]:
# Quick check: a negative input is not greater than 0, so this evaluates to False.
relu2deriv(-1)

In [None]:
import numpy as np

np.random.seed(1)


# NOTE: relu and relu2deriv below replace the definitions from earlier cells
# with versions that use element-wise array arithmetic.
def relu(x):
    """Return x where x is positive and 0 elsewhere."""
    return (x > 0) * x


def relu2deriv(output):
    """Return True where output is positive, False elsewhere."""
    return output > 0


# Each row is one streetlight pattern; walk_vs_stop holds the matching target
# as a column vector.
streetlights = np.array([[1, 0, 1],
                         [0, 1, 1],
                         [0, 0, 1],
                         [1, 1, 1]])
walk_vs_stop = np.array([[1, 1, 0, 0]]).T

alpha = 0.2
hidden_size = 4
# Both weight matrices start uniformly distributed in [-1, 1).
weights_0_1 = 2 * np.random.random((3, hidden_size)) - 1
weights_1_2 = 2 * np.random.random((hidden_size, 1)) - 1

for iteration in range(60):
    layer_2_error = 0
    for i in range(len(streetlights)):
        # Forward pass for a single pattern (kept 2-D by slicing i:i+1).
        layer_0 = streetlights[i:i+1]
        layer_1 = relu(np.dot(layer_0, weights_0_1))
        layer_2 = np.dot(layer_1, weights_1_2)

        layer_2_error += np.sum((layer_2 - walk_vs_stop[i:i+1]) ** 2)

        # Propagate the output error back through the hidden layer; the
        # relu2deriv mask zeroes the error of hidden units that were inactive.
        layer_2_delta = layer_2 - walk_vs_stop[i:i+1]
        layer_1_delta = layer_2_delta.dot(weights_1_2.T) * relu2deriv(layer_1)

        weights_1_2 -= alpha * layer_1.T.dot(layer_2_delta)
        weights_0_1 -= alpha * layer_0.T.dot(layer_1_delta)

    # Report once per tenth pass, after all patterns have been accumulated.
    # Inside the inner loop this would print partial sums once per pattern.
    if iteration % 10 == 9:
        print("Error:" + str(layer_2_error))

In [None]:
# bool is a subclass of int, so True counts as 1 and this evaluates to 100.
100 * True


In [None]:
import numpy as np

weights = np.array([0.5, 0.48, -0.7])
alpha = 0.001
streetlights = np.array([[1, 0, 1],
                         [0, 1, 1],
                         [0, 0, 1],
                         [1, 1, 1],
                         [0, 1, 1],
                         [1, 0, 1]])
walk_vs_stop = np.array([0, 1, 0, 1, 1, 0])

# Train on the first streetlight pattern only.
# Named light_input rather than input so the builtin input() is not shadowed.
light_input = streetlights[0]
goal_prediction = walk_vs_stop[0]
for iteration in range(5000):
    prediction = light_input.dot(weights)
    error = (goal_prediction - prediction) ** 2
    delta = prediction - goal_prediction
    # Move each weight against its contribution to the prediction error.
    weights = weights - (alpha * (light_input * delta))
    print("Error:" + str(error) + " Prediction:" + str(prediction))

In [None]:
weights = np.array([0.5, 0.48, -0.7])
alpha = 0.1
streetlights = np.array([[1, 0, 1],
                         [0, 1, 1],
                         [0, 0, 1],
                         [1, 1, 1],
                         [0, 1, 1],
                         [1, 0, 1]])
walk_vs_stop = np.array([0, 1, 0, 1, 1, 0])

for iteration in range(40):
    error_for_all_lights = 0
    for row_index in range(len(walk_vs_stop)):
        # Named light_input rather than input so the builtin input() is not shadowed.
        light_input = streetlights[row_index]
        goal_prediction = walk_vs_stop[row_index]

        prediction = light_input.dot(weights)

        error = (goal_prediction - prediction) ** 2
        error_for_all_lights += error

        delta = prediction - goal_prediction
        weights = weights - (alpha * (light_input * delta))
        print("Prediction:" + str(prediction))
    # Printed once per pass, after every pattern has been added; inside the
    # inner loop this would report partial sums.
    print("Error:" + str(error_for_all_lights) + "\n")

In [None]:
# Prediction for the pattern with all three lights on.
# NOTE(review): this is not the last expression of the cell, so its value is presumably
# discarded rather than displayed - confirm whether it was meant to be printed.
np.array([1,1,1]).dot(weights)
# Current value of the weights.
weights