In [1]:
import gym
import numpy as np
import random
import copy

In [2]:
class Network:
    """Fully connected ReLU network with a linear output layer.

    The layer stack is ``input_dim -> hidden_dim -> (hidden_dim) * num_layers
    -> output_dim``, i.e. ``num_layers`` counts the *additional* hidden-to-hidden
    layers after the first hidden layer.

    Attributes:
        W, b: lists of weight matrices / bias vectors, one entry per layer.
        gradW, gradb: gradients written by ``backward`` and consumed by ``update``.
        activations: inputs to each layer cached by the most recent ``forward``.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        """Allocate parameters and zeroed gradient buffers.

        Weights are drawn from the global ``np.random`` state; biases start at zero.
        """
        # Input layer: Glorot-style scale sqrt(2 / (fan_in + fan_out)).
        self.W = [np.random.randn(input_dim, hidden_dim)*((2/(input_dim+hidden_dim))**0.5)]
        self.b = [np.zeros(hidden_dim)]
        self.gradW = [np.zeros((input_dim, hidden_dim))]
        self.gradb = [np.zeros(hidden_dim)]

        # Extra hidden layers: scale sqrt(1 / hidden_dim).
        for i in range(num_layers):
            self.W.append(np.random.randn(hidden_dim, hidden_dim)*((1/hidden_dim)**0.5))
            self.b.append(np.zeros(hidden_dim))
            self.gradW.append(np.zeros((hidden_dim, hidden_dim)))
            self.gradb.append(np.zeros(hidden_dim))

        # Linear output layer.
        self.W.append(np.random.randn(hidden_dim, output_dim)*((2/(hidden_dim+output_dim))**0.5))
        self.b.append(np.zeros(output_dim))
        self.gradW.append(np.zeros((hidden_dim, output_dim)))
        self.gradb.append(np.zeros(output_dim))

        self.activations = []

    def forward(self, x):
        """Run a forward pass and cache the per-layer inputs for ``backward``.

        Args:
            x: a single sample of shape ``(input_dim,)`` or a batch of shape
                ``(batch, input_dim)``.

        Returns:
            The raw (un-activated) output of the last layer.
        """
        self.activations = [x.copy()]
        for i in range(len(self.W)-1):
            # ReLU on every layer except the last.
            x = np.maximum(x @ self.W[i] + self.b[i], 0)
            self.activations.append(x.copy())
        x = x @ self.W[-1] + self.b[-1]
        return x

    def backward(self, error):
        """Back-propagate ``error`` (dLoss/dOutput) into ``gradW`` / ``gradb``.

        Must be called after ``forward`` on the same input, because it reads the
        cached activations. Accepts a 1-D error for a single sample or a
        ``(batch, output_dim)`` error for a batch; batch gradients are summed
        over the batch dimension.

        Raises:
            RuntimeError: if ``forward`` has not been called yet.
        """
        if not self.activations:
            raise RuntimeError("backward() called before forward()")

        # Promote to 2-D so the single-sample and batched cases share one code path.
        delta = np.atleast_2d(error)
        self.gradW[-1] = np.atleast_2d(self.activations[-1]).T @ delta
        self.gradb[-1] = delta.sum(axis=0)

        for i in reversed(range(len(self.W)-1)):
            # activations[i+1] is the ReLU output of layer i; the mask is its derivative.
            relu_mask = np.atleast_2d(self.activations[i+1]) > 0
            delta = (delta @ self.W[i+1].T) * relu_mask
            self.gradW[i] = np.atleast_2d(self.activations[i]).T @ delta
            self.gradb[i] = delta.sum(axis=0)

    def update(self, lr):
        """Apply one gradient-descent step with learning rate ``lr``."""
        for i in range(len(self.W)):
            self.W[i] = self.W[i] - lr * self.gradW[i]
            self.b[i] = self.b[i] - lr * self.gradb[i]

    def __repr__(self):
        """Return the layer widths, e.g. ``(4, 16, 16, 2)``."""
        return "(" + ", ".join([str(x.shape[0]) for x in self.W]) + ", " + str(self.W[-1].shape[1]) + ")"

In [3]:
# Create the CartPole-v1 environment shared by every later cell.
# reset() returns the initial observation, displayed below as this cell's output.
env = gym.make("CartPole-v1")
env.reset()

  deprecation(
  deprecation(


array([ 0.042111  , -0.03571446, -0.00270681,  0.00681081], dtype=float32)

In [4]:
def getTargetNet(net):
    """Return an independent deep copy of ``net`` for use as a frozen target network.

    Every attribute (weights, biases, gradient buffers, cached activations) is
    copied, so later updates to ``net`` do not affect the returned object.
    Copying the object directly avoids constructing a throwaway network, which
    would draw from the global numpy RNG, and keeps working if the network
    class gains new attributes.
    """
    return copy.deepcopy(net)

In [5]:
def policy(x, epsilon):
    """Epsilon-greedy action selection.

    Args:
        x: 1-D sequence of action values, one entry per action.
        epsilon: probability of picking a uniformly random action instead of
            the greedy one. Values <= 0 always act greedily; values >= 1
            always explore.

    Returns:
        The index of the chosen action.
    """
    if random.random() < epsilon:
        # Explore: sample over however many actions x describes rather than
        # assuming exactly two.
        return random.randrange(len(x))
    return np.argmax(x)


In [8]:
def estimatePerformance(trials=10, network=None, environment=None):
    """Average undiscounted return of the greedy policy over ``trials`` episodes.

    Args:
        trials: number of evaluation episodes to run (must be positive).
        network: object with a ``forward(observation)`` method returning action
            values; defaults to the notebook-level ``net``.
        environment: gym-style environment whose ``step`` returns the 4-tuple
            ``(observation, reward, done, info)``; defaults to the
            notebook-level ``env``.

    Returns:
        The mean of the per-episode reward sums.

    Raises:
        ValueError: if ``trials`` is not positive.
    """
    if trials <= 0:
        raise ValueError("trials must be a positive integer")

    # Fall back to the notebook globals so existing zero-argument calls keep working.
    network = net if network is None else network
    environment = env if environment is None else environment

    returns = []
    for _ in range(trials):
        episode_return = 0
        terminated = False
        observation = environment.reset()
        while not terminated:
            # Pure greedy action: no exploration during evaluation.
            action = np.argmax(network.forward(observation))
            observation, reward, terminated, info = environment.step(action)
            episode_return += reward
        returns.append(episode_return)

    return sum(returns) / trials

In [9]:
# Online network: 4 inputs (the observation), two hidden layers of 16 units,
# and 2 outputs (one value per action, consumed via argmax).
net = Network(4, 16, 2, 1) # (MLP structure: 4 -- 16 -- 16 -- 2)


In [10]:
# Independent copy of the online network; the training loop uses it to compute bootstrap targets.
targetNet = getTargetNet(net)
learning_rate = 0.0001  # step size passed to net.update()
gamma = 0.99 # discount factor applied to the bootstrapped next-state value
epsilon = 1.0 # exploration rate; multiplied by epsilonDecay once per outer training iteration while it is above 0.11
epsilonDecay = 0.999
counter = 0  # outer training iterations since the last performance report (reset every 1000)
episode = 0  # total outer training iterations so far (each one collects 1000 environment steps)

In [11]:
# Main training loop. Each outer iteration:
#   1. snapshots the online network into targetNet,
#   2. collects a fresh buffer of transitions with the epsilon-greedy policy,
#   3. performs single-sample gradient steps on a random subset of that buffer,
#   4. decays epsilon and, every `report_every` iterations, prints the greedy score.
# The loop has no stopping criterion; interrupt the kernel to stop training.
steps_per_iteration = 1000    # environment steps collected per outer iteration
updates_per_iteration = 320   # transitions sampled (without replacement) for learning
report_every = 1000           # outer iterations between performance reports

while True:
    observation = env.reset()
    replayBuffer = []
    targetNet = getTargetNet(net)

    # --- Collect experience -------------------------------------------------
    for i in range(steps_per_iteration):
        q_values = net.forward(observation)
        action = policy(q_values, epsilon)
        next_observation, reward, terminated, info = env.step(action)
        replayBuffer.append((observation, action, reward, next_observation, terminated))
        # Start a new episode when the current one ends; otherwise keep going.
        observation = env.reset() if terminated else next_observation

    # --- Learn from a random subset of the collected transitions --------------
    for idx in np.random.choice(range(len(replayBuffer)), updates_per_iteration, replace=False):
        state, action, reward, next_state, done = replayBuffer[idx]

        # Bootstrap from the frozen target network unless the episode ended.
        if done:
            target = reward
        else:
            target = reward + gamma * np.max(targetNet.forward(next_state))

        # net.forward must be the last forward pass on `net` before backward,
        # because backward reads the activations it caches.
        q_values = net.forward(state)

        # Gradient of 0.5 * (Q(s, a) - target)^2 w.r.t. the outputs: non-zero
        # only for the action that was actually taken.
        error = np.zeros_like(q_values)
        error[action] = q_values[action] - target

        net.backward(error)
        net.update(learning_rate)

    if epsilon > 0.11 :
        epsilon *= epsilonDecay
    counter += 1
    episode += 1
    if counter == report_every:
        print(episode, estimatePerformance())
        counter = 0

1000 9.7
2000 53.4
3000 200.6
4000 247.0
5000 500.0
6000 500.0
7000 500.0
8000 434.9
9000 500.0


KeyboardInterrupt: 

In [90]:
# Inspect the two per-action outputs the trained network produces for a fresh initial observation.
net.forward(env.reset())

array([147.85450564, 150.43269953])

In [95]:
# Re-evaluate the greedy policy of the current `net` (average reward sum per episode).
estimatePerformance()

500.0

In [144]:
# Variant of the training loop that learns after every environment step:
# a fresh network is trained on a minibatch sampled from the replay buffer
# each step, and the target network / buffer are refreshed every 1000 steps.
# The loop has no stopping criterion; interrupt the kernel to stop training.
batch_size = 32  # minibatch size; also the minimum buffer size before learning starts

replayBuffer = []
net = Network(4, 16, 2, 1)
targetNet = getTargetNet(net)
epsilon = 1.0  # restart exploration for the freshly initialised network
while True:
    state = env.reset()
    for i in range(1000):
        q_values = net.forward(state)
        action = policy(q_values, epsilon)

        next_state, reward, terminated, info = env.step(action)
        replayBuffer.append((state, action, reward, next_state, terminated))
        # Advance the environment state; start a new episode when this one ended.
        state = env.reset() if terminated else next_state

        # Sampling without replacement needs at least batch_size transitions.
        if len(replayBuffer) < batch_size:
            continue

        for idx in np.random.choice(len(replayBuffer), batch_size, replace=False):
            # Distinct names so the replayed transition never overwrites the
            # live environment state used for acting.
            s, a, r, s_next, done = replayBuffer[idx]

            target = r
            if not done:
                target += gamma * np.max(targetNet.forward(s_next))

            # Last forward pass on `net` before backward, which reads its cache.
            q = net.forward(s)
            error = np.zeros_like(q)
            error[a] = q[a] - target

            net.backward(error)
            net.update(learning_rate)

    targetNet = getTargetNet(net)
    if epsilon > 10 ** -1:
        epsilon *= epsilonDecay
    replayBuffer = []
            
        

KeyboardInterrupt: 