<a href="https://colab.research.google.com/github/liuhaozhe6788/intro-nlp-f24/blob/master/assignment_1/problem_backprop_F24.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment 1 – Backprop

In [24]:
#@title Library Imports [do not change]

import importlib
!git clone https://www.github.com/rycolab/intro-nlp-f24.git
utils = importlib.import_module("intro-nlp-f24.assignment_1.utils")

import re
import random
from collections import defaultdict
import itertools
from abc import ABC, abstractmethod
import math

fatal: destination path 'intro-nlp-f24' already exists and is not an empty directory.


In [25]:
#@title Select and Parse Math Problems [do not change]

#@markdown select math problem

# Index into utils.MATH_PROBLEMS; kept as a string because the Colab form
# dropdown supplies it as text, and converted with int() on the next line.
math_problem_i = "3" #@param [0,1,2,3]
math_problem = utils.MATH_PROBLEMS[int(math_problem_i)]
print(math_problem)

# Parse the problem text into a nested infix list. The parser also returns the
# input-variable dict, which the later cells use instead of the original one.
parser = utils.Parser()
infix, in_vars = parser.parse(math_problem["problem"], in_vars = math_problem["in_vars"])
print(infix, in_vars)

{'problem': 'z + sin(x^(2) + (y * exp(z)))', 'in_vars': {'x': 2.0, 'y': -1.0, 'z': 0.0}, 'output': 0.14, 'derivative': {'x': -3.96, 'y': -0.99, 'z': 1.99}}
['z', '+', ['sin', [['x', '^', 2], '+', ['y', '*', ['exp', 'z']]]]] {'x': 2.0, 'z': 0.0, 'y': -1.0}


In [26]:
#@title ToDo1: Building

class Builder():
    """Turns a nested infix parse into a computation graph.

    The graph (``self.graph``) is a dict that maps every node -- an input
    variable name, a numeric constant, or a generated intermediate variable
    name -- to ``{"forward": {consumer: op}, "backward": {operand: op}}``.
    ``op`` is ``[name]`` for a unary operator and ``[name, 'l']`` /
    ``[name, 'r']`` for the left / right operand of a binary operator.

    Known limitations of this representation:
      * edges are keyed by node, so an operator whose two operands are the
        same leaf (e.g. ``x + x``) keeps only the right-hand edge;
      * a unary operator applied directly to a numeric constant adds no node
        (only string-named nodes are accepted as unary operands).
    """

    def __init__(self, infix: list, in_vars: dict = None):
        """
        infix: list of infix notation parse, e.g. [['exp', 'x'], '-', 3]
        in_vars: dict of input variables to ensure they are not used as intermediate or output variables
        The resulting computation graph is stored in ``self.graph``.
        """
        # None instead of a shared mutable ``{}`` default.
        if in_vars is None:
            in_vars = {}

        ## some alphabetical vars to use as intermediate and output variables minus the input vars to avoid duplicates
        candidates = [chr(code) for code in range(97, 123)] + [chr(code) for code in range(945, 969)]
        # Reverse order: the Greek letters are handed out first.
        self.avail_vars = sorted(set(candidates) - set(in_vars), reverse=True)

        self.infix = infix
        self.var_id = 0  # index for available variables used
        self.graph = self.build_graph(infix)

    @staticmethod
    def _as_infix(item):
        """Wrap a bare operand in a list so it can be passed to build_graph."""
        return item if isinstance(item, list) else [item]

    def _current_var(self):
        """Return the intermediate variable name selected by ``self.var_id``."""
        if self.var_id >= len(self.avail_vars):
            raise IndexError(
                "ran out of intermediate variable names "
                f"({len(self.avail_vars)} available)"
            )
        return self.avail_vars[self.var_id]

    def _attach_sink(self, graph: dict, op: list, strings_only: bool = False):
        """Link the first consumer-less node of ``graph`` to the current variable.

        Adds the forward edge on that node and creates the new variable's node
        with the matching backward edge. Does nothing when no node qualifies.
        """
        for node, edges in graph.items():
            if len(edges["forward"]) != 0:
                continue
            if strings_only and not isinstance(node, str):
                continue
            target = self._current_var()
            edges["forward"][target] = list(op)
            graph[target] = {"forward": {}, "backward": {node: list(op)}}
            return  # graph was resized; stop iterating immediately

    def build_graph(self, infix: list):
        """Recursively build the graph for ``infix``.

        Accepts ``[]``, ``[operand]``, ``[unary_op, operand]`` and
        ``[left, binary_op, right]``; operands may themselves be nested lists.
        Raises ValueError for any other length.
        """
        size = len(infix)
        if size == 0:
            return {}

        if size == 1:
            only = infix[0]
            # Redundant nesting such as [[x, '+', 1]]: a list cannot be a dict key.
            if isinstance(only, list):
                return self.build_graph(only)
            return {only: {"forward": {}, "backward": {}}}

        if size == 2:
            op, val = infix
            g = self.build_graph(self._as_infix(val))
            self._attach_sink(g, [op], strings_only=True)
            self.var_id += 1
            return g

        if size == 3:
            lval, op, rval = infix
            gl = self.build_graph(self._as_infix(lval))
            gr = self.build_graph(self._as_infix(rval))
            # Both sides are linked to the same new variable, so it appears in
            # both sub-graphs and its backward edges are joined by the merge.
            self._attach_sink(gl, [op, 'l'])
            self._attach_sink(gr, [op, 'r'])
            self.var_id += 1

            merged = {}
            for node, edges in gl.items():
                if node in gr:
                    merged[node] = {
                        "forward": {**edges["forward"], **gr[node]["forward"]},
                        "backward": {**edges["backward"], **gr[node]["backward"]},
                    }
                else:
                    merged[node] = edges
            for node, edges in gr.items():
                if node not in gl:
                    merged[node] = edges
            return merged

        raise ValueError(
            f"unsupported infix expression of length {size}: {infix!r}"
        )

if __name__ == '__main__':
    import pprint

    # Build the graph for the problem parsed in the previous cell and
    # pretty-print its node -> {forward, backward} structure.
    g = Builder(infix, in_vars)
    pp = pprint.PrettyPrinter(indent=4)
    pp.pprint(g.graph)

{   2: {'backward': {}, 'forward': {'ψ': ['^', 'r']}},
    'x': {'backward': {}, 'forward': {'ψ': ['^', 'l']}},
    'y': {'backward': {}, 'forward': {'φ': ['*', 'l']}},
    'z': {'backward': {}, 'forward': {'σ': ['+', 'l'], 'χ': ['exp']}},
    'σ': {'backward': {'z': ['+', 'l'], 'τ': ['+', 'r']}, 'forward': {}},
    'τ': {'backward': {'υ': ['sin']}, 'forward': {'σ': ['+', 'r']}},
    'υ': {   'backward': {'φ': ['+', 'r'], 'ψ': ['+', 'l']},
             'forward': {'τ': ['sin']}},
    'φ': {   'backward': {'y': ['*', 'l'], 'χ': ['*', 'r']},
             'forward': {'υ': ['+', 'r']}},
    'χ': {'backward': {'z': ['exp']}, 'forward': {'φ': ['*', 'r']}},
    'ψ': {   'backward': {2: ['^', 'r'], 'x': ['^', 'l']},
             'forward': {'υ': ['+', 'l']}}}


In [27]:
#@title ToDo2: Operations

class Operator(ABC):
    """Abstract interface for a differentiable unary or binary operator.

    Subclasses implement ``f`` (the value) and ``df`` (the list of partial
    derivatives, one entry per operand, in operand order).
    """

    @abstractmethod
    def f(self, a, b = None) -> float:
        """Return the operator's value at ``a`` (and ``b`` for binary operators)."""
        raise NotImplementedError()

    @abstractmethod
    def df(self, a, b = None) -> list:
        """Return the partial derivatives at ``a`` (and ``b``) as a list."""
        raise NotImplementedError()

class Exp(Operator):
    """Unary exponential, exp(a). The ``b`` argument is ignored."""

    def f(self, a, b = None):
        """Return e raised to the power ``a``."""
        value = math.exp(a)
        return value

    def df(self, a, b = None):
        """Return ``[exp(a)]``, the derivative of exp with respect to ``a``."""
        slope = math.exp(a)
        return [slope]

class Log(Operator):
    """Unary natural logarithm, ln(a). The ``b`` argument is ignored."""

    def f(self, a, b = None):
        """Return the natural logarithm of ``a``."""
        value = math.log(a)
        return value

    def df(self, a, b = None):
        """Return ``[1/a]``, the derivative of ln with respect to ``a``."""
        slope = 1/a
        return [slope]

class Mult(Operator):
    """Binary multiplication, a * b."""

    def f(self, a, b):
        """Return the product of ``a`` and ``b``."""
        product = a * b
        return product

    def df(self, a, b):
        """Return ``[b, a]``: the partials with respect to ``a`` and ``b``."""
        wrt_a, wrt_b = b, a
        return [wrt_a, wrt_b]

class Div(Operator):
    """Binary division, a / b."""

    def f(self, a, b):
        """Return the quotient ``a / b``."""
        quotient = a / b
        return quotient

    def df(self, a, b):
        """Return ``[1/b, -a/b^2]``: the partials with respect to ``a`` and ``b``."""
        wrt_a = 1/b
        wrt_b = -a/(b ** 2)
        return [wrt_a, wrt_b]

class Add(Operator):
    """Binary addition, a + b."""

    def f(self, a, b):
        """Return the sum of ``a`` and ``b``."""
        total = a + b
        return total

    def df(self, a, b = None):
        """Return ``[1, 1]``; both partials are constant, so the inputs are unused."""
        wrt_a, wrt_b = 1, 1
        return [wrt_a, wrt_b]

class Sub(Operator):
    """Binary subtraction, a - b."""

    def f(self, a, b = None):
        """Return the difference ``a - b``."""
        difference = a - b
        return difference

    def df(self, a, b = None):
        """Return ``[1, -1]``; both partials are constant, so the inputs are unused."""
        wrt_a, wrt_b = 1, -1
        return [wrt_a, wrt_b]

class Pow(Operator):
    """Binary power, a ** b."""

    def f(self, a, b):
        """Return ``a`` raised to the power ``b``."""
        result = a**b
        return result

    def df(self, a, b):
        """Return the partials of ``a ** b``.

        For ``a > 0`` the result is ``[b * a^(b-1), a^b * ln(a)]``. For
        ``a <= 0`` only the partial with respect to the base is returned (a
        one-element list), because ``ln(a)`` is not defined there.
        """
        wrt_base = b * (a ** (b - 1))
        if a <= 0: ## work-around: treat as unary operation if -a^b
            return [wrt_base]
        wrt_exponent = (a ** b) * math.log(a)
        return [wrt_base, wrt_exponent]

class Sin(Operator):
    """Unary sine, sin(a). The ``b`` argument is ignored."""

    def f(self, a, b=None):
        """Return the sine of ``a``."""
        value = math.sin(a)
        return value

    def df(self, a, b=None):
        """Return ``[cos(a)]``, the derivative of sin with respect to ``a``."""
        slope = math.cos(a)
        return [slope]

class Cos(Operator):
    """Unary cosine, cos(a). The ``b`` argument is ignored."""

    def f(self, a, b=None):
        """Return the cosine of ``a``."""
        value = math.cos(a)
        return value

    def df(self, a, b=None):
        """Return ``[-sin(a)]``, the derivative of cos with respect to ``a``."""
        slope = -math.sin(a)
        return [slope]

In [28]:
#@title ToDo 3: Executing

class Executor():
    """Evaluates a computation graph and differentiates it in reverse mode.

    The graph maps each node (variable name or numeric constant) to
    ``{"forward": {consumer: op}, "backward": {operand: op}}`` where ``op`` is
    ``[name]`` for a unary operator and ``[name, 'l']`` / ``[name, 'r']`` for
    the left / right operand of a binary operator. String nodes are variables;
    any non-string node is treated as a numeric constant equal to its key.
    """

    def __init__(self, graph: dict, in_vars: dict = None):
        """
        graph: computation graph in a data structure of your choosing
        in_vars: dict of input variables, e.g. {"x": 2.0, "y": -1.0}
        """
        # None instead of a shared mutable ``{}`` default.
        if in_vars is None:
            in_vars = {}
        self.graph = graph
        self.in_vars = in_vars
        self.forward_internal_vals = in_vars.copy()
        # The output is the node nothing consumes (the last such node wins).
        for node, edges in self.graph.items():
            if edges["forward"] == {}:
                self.out_var = node
        self.fn_map = {"log": Log(), "exp": Exp(), "+": Add(), "-": Sub(), "^": Pow(), "sin": Sin(), "cos": Cos(), "*": Mult(), "/": Div()}
        self.output = -1
        self.derivative = {}

    ## shared helpers____________________________

    def _operands(self, node):
        """Return ``(operator name, [operand nodes, left to right])`` for ``node``."""
        edges = self.graph[node]["backward"]
        if len(edges) == 1:
            (operand, op), = edges.items()
            if len(op) == 1:
                return op[0], [operand]
            # A lone edge tagged 'l'/'r' means both operands are the same node
            # (e.g. x * x): a dict keyed by operand can hold only one of them.
            return op[0], [operand, operand]
        if len(edges) == 2:
            by_side = {}
            name = None
            for operand, op in edges.items():
                name = op[0]
                by_side[op[1]] = operand
            return name, [by_side["l"], by_side["r"]]
        raise ValueError(f"node {node!r} has {len(edges)} operands; expected 1 or 2")

    def _value(self, node):
        """Return the value of ``node``, computing and caching it if needed."""
        if not isinstance(node, str):
            return node  # numeric constant
        if node in self.forward_internal_vals:
            return self.forward_internal_vals[node]
        if not self.graph[node]["backward"]:
            raise KeyError(f"no value supplied for input variable {node!r}")
        name, operands = self._operands(node)
        args = [self._value(operand) for operand in operands]
        result = self.fn_map[name].f(*args)
        self.forward_internal_vals[node] = result
        return result

    def _topological_order(self):
        """Return the output node's ancestors (and itself), operands before consumers."""
        order = []
        seen = set()

        def visit(node):
            if node in seen:
                return
            seen.add(node)
            for operand in self.graph[node]["backward"]:
                visit(operand)
            order.append(node)

        visit(self.out_var)
        return order

    ## forward execution____________________________

    def forward(self, ):
        """Evaluate the graph; stores node values and ``self.output`` (rounded to 2 digits).

        Values are pulled from the output node down through its operands, so
        the result does not depend on the order of ``in_vars`` and the call
        always terminates.
        """
        self.forward_internal_vals = self.in_vars.copy()
        self.output = round(self._value(self.out_var), 2)

    ## backward execution____________________________

    def backward(self, ):
        """Accumulate d(output)/d(node) for every variable; fills ``self.derivative``.

        Nodes are visited consumers-first, so a node's gradient is complete
        before it is pushed on to its operands. Input variables the output
        does not depend on get a derivative of 0.0.
        """
        self.internal_derivatives = {self.out_var: 1}
        grads = self.internal_derivatives
        for node in reversed(self._topological_order()):
            if not self.graph[node]["backward"]:
                continue  # input variable or constant: nothing to propagate
            name, operands = self._operands(node)
            values = [self._value(operand) for operand in operands]
            partials = self.fn_map[name].df(*values)
            upstream = grads[node]
            for position, operand in enumerate(operands):
                if not isinstance(operand, str):
                    continue  # constants carry no gradient
                # Indexing (not zip) so a missing partial raises instead of
                # silently dropping a variable's gradient.
                contribution = upstream * partials[position]
                if operand in grads:
                    grads[operand] += contribution
                else:
                    grads[operand] = contribution
        for in_var in self.in_vars.keys():
            self.derivative[in_var] = grads.get(in_var, 0.0)

if __name__ == '__main__':
  # Run both passes on the graph built in the Builder cell, then show the
  # rounded output followed by the per-variable derivatives.
  e = Executor(g.graph, in_vars=in_vars)
  e.forward()
  e.backward()
  print(e.output, e.derivative, sep="\n")

0.14
{'x': -3.9599699864017817, 'z': 1.9899924966004454, 'y': -0.9899924966004454}


In [29]:
#@title Test Function for Debugging [do not change]

# Debug run: checks Builder and Executor on the single `math_problem` selected
# earlier in the notebook. The recorded output reports SUCCESS separately for
# the output value and the derivatives (presumably compared against the
# problem's 'output' and 'derivative' entries -- confirm in utils).
utils.test_backprop(Builder, Executor, math_problem)


0: problem: z + sin(x^(2) + (y * exp(z))), in_vars: {'x': 2.0, 'y': -1.0, 'z': 0.0}
SUCCESS output: 0.14
SUCCESS derivative: {'x': -3.96, 'z': 1.99, 'y': -0.99}


In [30]:
#@title Test Function for Grading [do not change]

# Grading run: no problem is passed, and the recorded output shows all four
# problems (0-3) being checked.
utils.test_backprop(Builder, Executor)


0: problem: x/y, in_vars: {'x': 1.0, 'y': 1.0}
SUCCESS output: 1.0
SUCCESS derivative: {'x': 1.0, 'y': -1.0}

1: problem: exp(x) - (y * 2), in_vars: {'x': 2.0, 'y': -2.0}
SUCCESS output: 11.39
SUCCESS derivative: {'x': 7.39, 'y': -2}

2: problem: (x^2 - 1) * (y+2), in_vars: {'x': 3.0, 'y': 2.0}
SUCCESS output: 32.0
SUCCESS derivative: {'x': 24.0, 'y': 8.0}

3: problem: z + sin(x^(2) + (y * exp(z))), in_vars: {'x': 2.0, 'y': -1.0, 'z': 0.0}
SUCCESS output: 0.14
SUCCESS derivative: {'x': -3.96, 'z': 1.99, 'y': -0.99}
