<a href="https://colab.research.google.com/github/markhalka/Paper_Imlimentations/blob/main/IDBD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import math

"""
Implements the paper: Adapting Bias by Gradient Descent: An Incremental Version of Delta-Bar-Delta, by Richard S. Sutton (1992)

Incremental Delta-Bar-Delta (IDBD) is a type of meta-learning algorithm, which learns the best parameters for
a stochastic gradient descent process (such as a neural network). Over time it learns how best to adjust
its biases (in this case the per-input learning rates), and it is especially well suited to non-stationary domains.
"""

# define the default parameters used in the paper
# Default initial value given to every weight of a Learner.
DEFAULT_WEIGHTS = 0.04
# Default initial log learning rate (beta); the step size is exp(beta), and exp(-3) ~= 0.05.
DEFAULT_BETA = -3
# Nominal learning-rate placeholder; the effective per-input rates are always exp(beta).
DEFAULT_LR = 0
# Default meta step size (theta) that scales every update of the betas.
DEFAULT_THETA = 0.01 

class Learner():
    """Linear learner trained with Incremental Delta-Bar-Delta (IDBD).

    Every input component ``i`` owns a weight ``w_i``, a log learning rate
    ``beta_i`` (the step size actually used is ``exp(beta_i)``) and a memory
    trace ``h_i``.  All per-input state is stored in float NumPy arrays of
    length ``size`` so that each update is one vectorised expression.

    Args:
        input_size: number of input components.
        init_weight: initial value of every weight; ``None`` selects
            ``DEFAULT_WEIGHTS``.
        init_beta: initial log learning rate of every input; ``None`` selects
            ``DEFAULT_BETA``.
        theta: meta step size used when adapting the betas; ``None`` selects
            ``DEFAULT_THETA``.
    """

    def __init__(self, input_size, init_weight=None, init_beta=None, theta=None):
        # Module defaults are resolved here rather than in the signature so
        # that they are looked up at call time.
        if init_weight is None:
            init_weight = DEFAULT_WEIGHTS
        if init_beta is None:
            init_beta = DEFAULT_BETA
        if theta is None:
            theta = DEFAULT_THETA

        self.size = input_size
        # dtype=float matters: an integer beta (e.g. -3) would otherwise
        # create an integer array that cannot absorb float updates in place.
        self.weights = np.full(self.size, init_weight, dtype=float)
        self.betas = np.full(self.size, init_beta, dtype=float)
        self.h = np.zeros(self.size)
        self.lrs = np.exp(self.betas)
        self.theta = theta

    def _as_vector(self, input):
        """Return ``input`` as a float array, checking it has ``size`` entries.

        Raises:
            RuntimeError: if ``input`` is not one-dimensional of length ``size``.
        """
        vector = np.asarray(input, dtype=float)
        if vector.shape != (self.size,):
            raise RuntimeError("incorrect input")
        return vector

    def get_delta(self, input, label):
        """Return the prediction error ``label - sum_i(input[i] * weights[i])``.

        Raises:
            RuntimeError: if ``input`` does not have ``size`` entries.
        """
        x = self._as_vector(input)
        return label - np.dot(x, self.weights)

    def update_betas(self, delta, input):
        """Move each beta by ``theta * delta * h_i * input_i``."""
        x = self._as_vector(input)
        self.betas += delta * self.theta * self.h * x

    def update_lrs(self, delta, input):
        """Update the betas, then refresh the learning rates as ``exp(beta)``."""
        self.update_betas(delta, input)
        self.lrs = np.exp(self.betas)

    def update_h(self, delta, input):
        """Decay the traces and add the latest weight change.

        ``h_i <- h_i * max(1 - lr_i * input_i**2, 0) + lr_i * delta * input_i``;
        the ``max(..., 0)`` clamp keeps the decay factor from going negative.
        """
        x = self._as_vector(input)
        decay = np.maximum(1.0 - self.lrs * x ** 2, 0.0)
        self.h = self.h * decay + self.lrs * delta * x

    def update_weights(self, input, label):
        """Run one full IDBD step on a single example and return its error.

        The error is computed with the weights as they were before the step;
        the learning rates are adapted first, then the traces, then the
        weights.

        Returns:
            The prediction error ``delta`` for this example.

        Raises:
            RuntimeError: if ``input`` does not have ``size`` entries.
        """
        x = self._as_vector(input)
        delta = self.get_delta(x, label)
        self.update_lrs(delta, x)
        self.update_h(delta, x)
        self.weights += self.lrs * delta * x
        return delta


"""
Implement the same test as in the paper, and compare results.
The test is a simple non-stationary task; for more information, consult the paper.
"""
class Tests():
    """Non-stationary regression benchmark that drives a learner.

    Each example is a vector of standard-normal inputs whose label is the
    signed sum of its first five components.  Every 20th example one of the
    five signs is flipped at random, so the target keeps drifting.

    Args:
        input_size: length of every generated input vector (at least 5).
        learner: object exposing ``update_weights(input, label)`` that returns
            the prediction error; ``None`` builds ``Learner(input_size)``.
    """

    def __init__(self, input_size=20, learner=None):
        self.weights = [1] * 5
        self.count = 0
        self.input_size = input_size
        self.learner = Learner(input_size) if learner is None else learner
        # Window averages recorded by the most recent run_test() call.
        self.mse_history = []

    def get_data(self, n):
        """Draw one example with ``n`` inputs and return ``(vector, label)``.

        Raises:
            RuntimeError: if ``n`` is smaller than 5, since the label needs
                the first five components.
        """
        if n < 5:
            raise RuntimeError("must be at least 5")
        vec = np.random.normal(size=n)
        label = np.sum(vec[0:5] * self.weights)
        return vec, label

    def change_weights(self):
        """Flip the sign of one randomly chosen target weight."""
        index = np.random.randint(0, 5)
        self.weights[index] *= -1

    def get_next(self):
        """Return the next example, flipping a target weight every 20 calls."""
        self.count += 1
        if self.count % 20 == 0:
            self.change_weights()
        return self.get_data(self.input_size)

    def run_test(self, steps=20000, report_every=1000):
        """Train the learner and print the mean squared error per window.

        After each complete window of ``report_every`` examples the average
        squared error over exactly that window is printed and appended to
        ``self.mse_history``.  A trailing window shorter than ``report_every``
        is not reported.

        Args:
            steps: number of examples to train on.
            report_every: window length used for each reported average.

        Raises:
            ValueError: if ``report_every`` is not positive.
        """
        if report_every <= 0:
            raise ValueError("report_every must be positive")
        self.mse_history = []
        squared_error_sum = 0.0
        # Count from 1 so a report is emitted only once a window is full;
        # reporting at step 0 would average a single sample over the window.
        for step in range(1, steps + 1):
            sample, label = self.get_next()
            delta = self.learner.update_weights(sample, label)
            squared_error_sum += delta ** 2
            if step % report_every == 0:
                average = squared_error_sum / report_every
                print("avg mse: %f" % (average))
                self.mse_history.append(average)
                squared_error_sum = 0.0

# Build the benchmark with its default settings and run it as soon as this
# cell/module is executed; the running error is printed as training proceeds.
test = Tests()
test.run_test()
