# Rewriting in pytorch

In [None]:
import math
import numpy as np
import matplotlib.pyplot as plt
import random
import torch
from graphviz import Digraph
%matplotlib inline

In [None]:
def trace(root):
  # builds a set of all nodes and edges in a graph
  nodes, edges = set(), set()
  def build(v):
    if v not in nodes:
      nodes.add(v)
      for child in v._prev:
        edges.add((child, v))
        build(child)
  build(root)
  return nodes, edges

def draw_dot(root):
  dot = Digraph(format='svg', graph_attr={'rankdir': 'LR'}) # LR = left to right
  
  nodes, edges = trace(root)
  for n in nodes:
    uid = str(id(n))
    # for any value in the graph, create a rectangular ('record') node for it
    dot.node(name = uid, label = "{ %s | data %.4f | grad %.4f }" % (n.label, n.data, n.grad), shape='record')
    if n._op:
      # if this value is a result of some operation, create an op node for it
      dot.node(name = uid + n._op, label = n._op)
      # and connect this node to it
      dot.edge(uid + n._op, uid)

  for n1, n2 in edges:
    # connect n1 to the op node of n2
    dot.edge(str(id(n1)), str(id(n2)) + n2._op)

  return dot

In [None]:
# Defining a Value class to do math functions and store representations.
class Value:
    def __init__(self, data, _children=(), _op="", label=""):
        """
        param data: Stores the value
        param _children: Stores the values that gave rise to self.data
        param _op: Math operation between children that produced self.data
        """
        self.data = data
        self._prev = set(_children)
        self.grad = 0.0
        # Every operation will have its own backprop.
        self._backward = lambda: None
        self._op = _op
        self.label = label
        
    def __repr__(self):
        return f"Value=({self.data})"
        
    def __add__(self, other):
        other = other if isinstance(other, Value) else Value(other)
        out = Value(self.data + other.data, (self, other), _op="+")
        # chaining the current `.grad` based on the out.grad value.
        # This is the reason why we initiate o.grad = 1.
        # That chains `out.grad` `_backward`s through all the `_ops` and `Value`s.
        def _backward():
            self.grad += 1 * out.grad
            other.grad += 1 * out.grad            
        out._backward = _backward
        
        return out

    def __radd__(self, other): # other + self
        return self + other
        
    def __mul__(self, other):
        other = other if isinstance(other, Value) else Value(other)
        out = Value(self.data * other.data, (self, other), _op="*")
        
        def _backward():
            self.grad += other.data * out.grad
            other.grad += self.data * out.grad
        out._backward = _backward
        
        return out

    def __rmul__(self, other):
        return self * other

    def __pow__(self, other):
        assert isinstance(other, (float, int))
        out = Value(self.data ** other, (self,), _op=f"**{other}")

        def _backward():
            self.grad += (other * (self.data ** (other - 1)) ) * out.grad
        out._backward = _backward
        
        return out

    def __truediv__(self, other):
        # We don't need a _backward() because these are operations in Value.
        return self * (other ** -1.0)

    def __sub__(self, other):
        return self + (-other)

    def exp(self):
        # Raise `e` to the power `self`
        x = self.data
        out = Value(math.exp(x), (self,), "exp")

        def _backward():
            self.grad += out.data * out.grad
        out._backward = _backward

        return out
    
    def tanh(self):
        x = self.data
        t = (math.exp(2.0 * x) - 1)/(math.exp(2.0 * x) + 1)
        out = Value(t, (self,), "tanh")

        def _backward():
            self.grad += (1 - t**2) * out.grad
        out._backward = _backward
        
        return out
    
    def backward(self):
        # Automating the manual _backward() calls using topological sort.
        # Sort the DAG such that o is last.
        # Call _backward on the queue.
        topo = []
        visited = set()
        def build_topo(v):
          if v not in visited:
            visited.add(v)
            for child in v._prev:
              build_topo(child)
            topo.append(v)
        build_topo(self)
        self.grad = 1.0        
        for node in reversed(topo):
            node._backward()

In [None]:
# Defining a 2 input neuron

# Forward pass
x1 = Value(2.0, label="x1")
w1 = Value(-3.0, label="w1")
x2 = Value(0.0, label="x2")
w2 = Value(1.0, label="w2")
b = Value(6.8813735870195432, label="b")
x1w1 = x1 * w1; x1w1.label="x1w1"
x2w2 = x2 * w2; x2w2.label="x2w2"
x1w1x2w2 = x1w1 + x2w2; x1w1x2w2.label="x1w1x2w2"
n = x1w1x2w2 + b; n.label="n"
o = n.tanh(); o.label="o"

# Backward pass
o.backward()

# Compute graph plot
draw_dot(o)

The execution above is just 

In [None]:
# PyTorch implementation

x1 = torch.Tensor([2.0]).double(); x1.requires_grad = True
w1 = torch.Tensor([-3.0]).double(); w1.requires_grad = True
x2 = torch.Tensor([0.0]).double(); x2.requires_grad = True
w2 = torch.Tensor([1.0]).double(); w2.requires_grad = True
b = torch.Tensor([6.8813735870195432]).double(); b.requires_grad = True
x1w1 = x1 * w1
x2w2 = x2 * w2
x1w1x2w2 = x1w1 + x2w2
n = x1w1x2w2 + b
o = torch.tanh(n)

print(o.item())
o.backward()

print("----")
print(f"{x1.grad.item()=}")
print(f"{w1.grad.item()=}")
print(f"{x2.grad.item()=}")
print(f"{w2.grad.item()=}")
print(f"{b.grad.item()=}")

# Implementing a Neural Network

In [None]:
class Module:
    def zero_grad(self):
        for p in self.parameters:
            p = 0.0
            
    def parameters(self):
        return []

class Neuron(Module):
    # A neuron does the `w * x + b`
    # Gets a bunch of inputs and returns one activation output.
    def __init__(self, nin):
        # We need the same number of weights for each input.
        # Trust that nin and x contain the same dimension on the joining dimension.
        self.w = [Value(random.uniform(-1,1)) for _ in range(nin)]
        self.b = Value(random.uniform(-1,1))

    def __call__(self, x):
        # Summation of w * x + b
        act = sum([wi * xi for wi, xi in zip(self.w, x)], start=self.b)
        # tanh activation
        out = act.tanh()
        return out

    def parameters(self):
        return self.w + [self.b]

class Layer:
    # A layer is a list of Neurons, with a number of inputs and a number of outputs.
    def __init__(self, nin, nout):
        self.neurons = [Neuron(nin) for _ in range(nout)]

    def __call__(self, x):
        outs = [n(x) for n in self.neurons]
        return outs[0] if len(outs) == 1 else outs

    def parameters(self):
        return [p for neuron in self.neurons for p in neuron.parameters()]

class MLP:
    # stack of layers with a defined number of inputs and a list of neurons for each subsequent layer in nout.
    def __init__(self, nin, nouts):
        stacks = [nin] + nouts
        self.layers = [Layer(stacks[i], stacks[i+1]) for i in range(len(stacks) - 1)]

    def __call__(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

    def parameters(self):
        return [p for layer in self.layers for p in layer.parameters()]
        

In [None]:
xs = [
  [2.0, 3.0, -1.0],
  [3.0, -1.0, 0.5],
  [0.5, 1.0, 1.0],
  [1.0, 1.0, -1.0],
]
ys = [1.0, -1.0, -1.0, 1.0] # desired targets
# We have initialized the MLP here.
n = MLP(3, [4, 4, 1])
n(xs[0])

In [None]:
# Fitting the neural network to the data 
# ---------
for k in range(100):
    # Forward pass
    ypred = [n(x) for x in xs]
    # Compute the MSE loss
    loss = sum((yout - ygt)**2 for ygt, yout in zip(ys, ypred))

    # backward pass
    for p in n.parameters():
        p.grad = 0.0
    loss.backward()

    # Update weights
    step = 0.1

    for p in n.parameters():
        p.data += (-step) * p.grad

    print(k, loss.data)

In [None]:
ypred

In [None]:
draw_dot(loss)