In [1]:
import numpy as np
import time
from collections import defaultdict
from collections.abc import Iterable
import itertools
from tqdm import tqdm

import os, sys
# Make the `practice` directory, located next to the current working directory,
# importable; the path is appended only once even if the cell is re-run.
practice_dir = os.path.join(os.path.dirname(os.path.abspath('')), 'practice')
if practice_dir not in sys.path:
    sys.path.append(practice_dir)
from utils import isnumber, type_name, raise_operand_exception, FwdAAD

In [None]:
class BwdAAD:
    def __init__(self, value: int | float = None, local_gradients: defaultdict = None, symbol: str = None):
        self.value: float | int = value
        self.symbol: str = symbol
        self.local_gradients: defaultdict = local_gradients if local_gradients else defaultdict(float, {self: 1})
    
    def get_variable(symbol: str, value: int | float = None):
        """
        Create variable symbol.

        :param symbol: unique name to indicate the variable
        :param value: value of the variable
        :returns: BwdAAD  variable
        `ignore_existent` is False
        """
        return BwdAAD(value=value, symbol=symbol)
    
    def get_vector(symbol: str, values: Iterable[int | float] = None, length: int = None):
        """
        Create a list of variable symbols.

        :param symbol: unique name to indicate the variables. Each variable will have a name in format 'symbol'_i
        :param values: values of the variables
        :param length: length of the vector (is required if `values = None`, otherwise ignored)
        :returns: BwdAAD  variables list
        :raises ValueError: if neither `values` nor `length` arguments have been specified
        """
        if values:
            return [BwdAAD.get_variable(symbol + '_' + str(i + 1), value=val) for i, val in enumerate(values)]
        elif length:
            return [BwdAAD.get_variable(symbol=symbol + '_' + str(i + 1)) for i in range(length)]
        else:
            raise ValueError("Please provide values or specify length of the vector.")

    def set_value(self, value: float | int):
        """
        Change value of the variable to `value`. Note that this deletes the
        computed gradient for the variable.

        :param value: new value of the variable
        :raises ValueError: if `value` is not a number
        """
        if not isnumber(value):
            raise ValueError(f"Expected a number, got '{type_name(value)}'")
        self.value = value
        self.local_gradients = defaultdict(float, {self: 1})

    def set_name(self, name: str):
        """
        Change the symbol of the variable to `name`. Note that this deletes the
        computed gradient for the variable.

        :param name: new name of the variable
        :raises ValueError: if `name` is not a str
        """
        if not isinstance(name, str):
            raise ValueError(f"Expected a 'str', got '{type_name(value)}'")
        self.symbol = name
        self.local_gradients = defaultdict(float, {self: 1})

    def set_vector_values(vector: Iterable, values: Iterable[int | float]) -> None:
        """
        Change values of an entire vector of variable to `values`. Note that this deletes the
        computed gradient for the variables.

        :param vector: an iterable of variables to change values of
        :param values: new values of the variables
        :raises ValueError: if not a number is found in the `values`
        :raises ValueError: lengths of `vector` and `values` mismatch
        """
        if len(values) != len(vector):
            raise ValueError("`len(values)` must be the same as `len(vector)`")
        for var, val in zip(vector, values):
            var.set_value(val)

    def get_gradient(self) -> dict:
        """
        Compute the first derivatives of the variable with respect to all its child variables.

        :returns: dict which maps child variable to the derivative of the function w.r.t. this variable
        """
        gradients = defaultdict(float)
        def compute_gradients(variable: BwdAAD, path_value):
            for child_variable, local_gradient in variable.local_gradients.items():
                # multiply the edges of a path
                path_to_child_value = path_value * local_gradient
                # add together different paths
                gradients[child_variable] += path_to_child_value
                # recurse through graph if it is not the user-initialised variable
                if not child_variable.symbol:
                    compute_gradients(child_variable, path_to_child_value)
        compute_gradients(self, path_value=1) # path_value=1 is from `variable` differentiated w.r.t. itself
        return dict(gradients)
    
    def print_gradient(self, precision: int = 3):
        print(', '.join(['d/d{} = {:.{}f}'.format(key.symbol, d, precision) for key, d in sorted(list(self.get_gradient().items()), key=lambda x: x[0].symbol if x[0].symbol else '') if key.symbol]))

    def __add__(self, other):
        if isinstance(other, BwdAAD):
            value = self.value + other.value
            local_gradients = defaultdict(float)
            local_gradients[self] += 1
            local_gradients[other] += 1
            return BwdAAD(value, local_gradients)
        if isnumber(other):
            return BwdAAD(self.value + other, defaultdict(float, {self: 1}))
        raise_operand_exception(self, other, '+')
    
    def __mul__(self, other):
        if isinstance(other, BwdAAD):
            value = self.value * other.value
            local_gradients = defaultdict(float)
            local_gradients[self] += other.value
            local_gradients[other] += self.value
            res = BwdAAD(value, local_gradients)
            return res
        if isnumber(other):
            res = BwdAAD(self.value * other, defaultdict(float, {self: other}))
            return res
        raise_operand_exception(self, other, '*')
    
    def __neg__(self):
        value = -1 * self.value
        local_gradients = defaultdict(float, {self: -1})
        return BwdAAD(value, local_gradients)

    def __sub__(self, other):
        return self + (-other)

    def _inverse(self):
        value = 1 / self.value
        local_gradients = defaultdict(float, {self: -1 / self.value ** 2})
        return BwdAAD(value, local_gradients)   

    def __truediv__(self, other):
        return self * other._inverse()
    
    def __rtruediv__(self, other):
        return self._inverse() * other

    __rmul__ = __mul__
    __radd__ = __add__

    def __pow__(self, other):
        if isinstance(other, BwdAAD):
            value = pow(self.value, other.value)
            local_gradients = defaultdict(float)
            local_gradients[self] += other.value * pow(self.value, other.value - 1)
            local_gradients[other] += pow(self.value, other.value) * np.log(self.value)
            return BwdAAD(value, local_gradients)
        if isnumber(other):
            value = pow(self.value, other)
            local_gradients = defaultdict(float, {self: other * pow(self.value, other - 1)})
            return BwdAAD(value, local_gradients)
        raise_operand_exception(self, other, '** or pow()')

    def __rpow__(self, other):
        if isnumber(other):
            value = pow(other, self.value)
            local_gradients = defaultdict(float, {self: pow(other, self.value) * np.log(other)})
            return BwdAAD(value, local_gradients)
        raise_operand_exception(self, other, '** or pow()')

Пользоваться им нужно так (я постарался сохранить максимальную совместимость с классом `FwdAAD` с практики, поэтому код в точности такой же, за исключением отсутствия флага `ignore_existent`):

In [3]:
# Two named input variables with initial values 2 and pi.
x: BwdAAD = BwdAAD.get_variable('x', 2)
y: BwdAAD = BwdAAD.get_variable('y', np.pi)

def func(x, y):
    """Evaluate x^3 - 2 * x^2 * y^2 + y^3 for any operands supporting `**`, `*`, `-` and `+`."""
    x_cubed = x ** 3
    mixed_term = 2 * x ** 2 * y ** 2
    y_cubed = y ** 3
    return x_cubed - mixed_term + y_cubed

# Build the expression graph for the current values of x and y and print
# the partial derivatives with 10 digits after the decimal point.
f: BwdAAD = func(x, y)
f.print_gradient(precision=10)

# if we want to change values and names:
# (set_value / set_name reset the local gradients of the variable they are called on)
x.set_value(1)
y.set_value(1)
x.set_name('y_0')
y.set_name('y_1')

# Rebuild the expression so that the new values and names are picked up.
f: BwdAAD = func(x, y)
f.print_gradient(precision=0)

d/dx = -66.9568352087, d/dy = -20.6566692542
d/dy_0 = -1, d/dy_1 = -1


Теперь сравним производительность этого варианта вычисления градиента с forward-mode методом.

In [4]:
def f_1(x):
    """Return the sum of the squared elements of `x`."""
    squares = np.power(x, 2)
    return np.sum(squares)
def f_2(x):
    """Return the sum of every element of `x` raised to the power of itself."""
    self_powers = np.power(x, x)
    return np.sum(self_powers)

In [5]:
# Length of the variable vector used by the benchmarks below.
NUM_VARIABLES = 11
# Number of variables passed to the benchmarked function per call; the benchmarks
# iterate over all NUM_VARIABLES ** NUM_ARGUMENTS ordered combinations.
NUM_ARGUMENTS = 6

In [6]:
# Forward-mode benchmark: evaluate the gradient of f_1 and f_2 for every ordered
# combination of NUM_ARGUMENTS variables taken from a vector of NUM_VARIABLES variables.
x_fwd = FwdAAD.get_vector('X', length=NUM_VARIABLES)

# Values are assigned before each timer starts, so both measurements cover
# the gradient evaluations only. `time.perf_counter` is a monotonic clock
# intended for measuring elapsed time.
FwdAAD.set_vector_values(x_fwd, np.linspace(0, 1, NUM_VARIABLES))
start_time_fwd_1 = time.perf_counter()
for arguments in tqdm(itertools.product(x_fwd, repeat=NUM_ARGUMENTS), total=NUM_VARIABLES ** NUM_ARGUMENTS):
    f_1(arguments).get_gradient()
time_fwd_1 = int(time.perf_counter() - start_time_fwd_1)  # whole seconds

FwdAAD.set_vector_values(x_fwd, np.linspace(1, 2, NUM_VARIABLES))
start_time_fwd_2 = time.perf_counter()
for arguments in tqdm(itertools.product(x_fwd, repeat=NUM_ARGUMENTS), total=NUM_VARIABLES ** NUM_ARGUMENTS):
    f_2(arguments).get_gradient()
time_fwd_2 = int(time.perf_counter() - start_time_fwd_2)  # whole seconds

100%|██████████| 1771561/1771561 [02:52<00:00, 10283.69it/s]
100%|██████████| 1771561/1771561 [04:48<00:00, 6132.65it/s]


In [7]:
# Backward-mode benchmark: evaluate the gradient of f_1 and f_2 for every ordered
# combination of NUM_ARGUMENTS variables taken from a vector of NUM_VARIABLES variables.
x_bwd = BwdAAD.get_vector('X', length=NUM_VARIABLES)

# Values are assigned before each timer starts, so both measurements cover
# the gradient evaluations only. `time.perf_counter` is a monotonic clock
# intended for measuring elapsed time.
BwdAAD.set_vector_values(x_bwd, np.linspace(0, 1, NUM_VARIABLES))
start_time_bwd_1 = time.perf_counter()
for arguments in tqdm(itertools.product(x_bwd, repeat=NUM_ARGUMENTS), total=NUM_VARIABLES ** NUM_ARGUMENTS):
    f_1(arguments).get_gradient()
time_bwd_1 = int(time.perf_counter() - start_time_bwd_1)  # whole seconds

BwdAAD.set_vector_values(x_bwd, np.linspace(1, 2, NUM_VARIABLES))
start_time_bwd_2 = time.perf_counter()
for arguments in tqdm(itertools.product(x_bwd, repeat=NUM_ARGUMENTS), total=NUM_VARIABLES ** NUM_ARGUMENTS):
    f_2(arguments).get_gradient()
time_bwd_2 = int(time.perf_counter() - start_time_bwd_2)  # whole seconds

100%|██████████| 1771561/1771561 [01:06<00:00, 26782.71it/s]
100%|██████████| 1771561/1771561 [01:25<00:00, 20632.29it/s]


In [8]:
def _as_min_sec(total_seconds):
    """Format a whole number of seconds as '<minutes>m <seconds>s'."""
    minutes, seconds = divmod(total_seconds, 60)
    return f"{minutes}m {seconds}s"

# Report the measured wall-clock time of both benchmarks for each mode.
print(f"Forward-mode autodiff:  {_as_min_sec(time_fwd_1)} and {_as_min_sec(time_fwd_2)}")
print(f"Backward-mode autodiff: {_as_min_sec(time_bwd_1)} and {_as_min_sec(time_bwd_2)}")

Forward-mode autodiff:  2m 52s and 4m 48s
Backward-mode autodiff: 1m 6s and 1m 25s


Backward-mode оказался значительно эффективнее forward-mode!