In [159]:
from abc import ABC, abstractmethod
import gymnasium as gym
import random
import numpy as np
import torch
import torch.nn as nn
from collections import OrderedDict
import math
import random

In [160]:
### MLP
class Multi_Layer_Perceptron(nn.Sequential):
    def __init__(self, input_dim, intern_dim, output_dim, depth = 2, isBiased = False):
        
        dict = OrderedDict([("input",nn.Linear(input_dim,intern_dim, bias=isBiased))])
        for i in range(depth):
            dict.update({str(i) : nn.Linear(intern_dim,intern_dim,bias=isBiased)})
        dict.update({"output" : nn.Linear(intern_dim,output_dim,bias=isBiased)})

        super().__init__(dict)

        self.reset_init_weights_biases(0) # so that we do not use a default initialization

    def reset_init_weights_biases(self, norm = None):
        for layer in self.children():
            if norm == None:
                stdv = 1. / math.sqrt(layer.weight.size(1))
            else :
                stdv = norm
            
            layer.weight.data.uniform_(-stdv, stdv)
            if layer.bias is not None:
                layer.bias.data.uniform_(-stdv, stdv)

In [161]:
### CustomLoss DQN
class DQN_Loss(nn.Module):
    def __init__(self):
        super(DQN_Loss, self).__init__()

    def forward(self, input, target):
        loss = (target - input)**2
        return loss

In [162]:
class Agent(ABC):

    @abstractmethod
    def observe(self, state, action, next_state, reward):
        pass

    @abstractmethod
    def select_action(self, state):
        pass
    
    @abstractmethod
    def update(self):
        pass

    @abstractmethod
    def train(self, episodes, debug_mode=False):
        pass

    def __init__(self, id, env):
        self.id = id
        self.env = env
                

In [163]:
class RandomAgent(Agent):
    def observe(self, state, action, next_state, reward):
        pass
        
    def select_action(self, state):
        return random.randint(0,1)
        
        
    def update(self):
        pass

    def train(self, episodes, debug_mode=False):
        episodesHistory = np.zeros((episodes))
        
        for i in range(episodes):
            if(debug_mode) : print("Episode: "+str(i+1)+" starts")
            newSeed = random.randint(0,100000)
            state,_ = self.env.reset(seed = newSeed)
            done = False
            episode_reward = 0
            
            while not done:
                                        
                action = self.select_action(state)
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                self.observe(state,action,next_state,reward)
                self.update()

                episode_reward += reward
                state = next_state
                done = terminated or truncated

            episodesHistory[i] = episode_reward
            if(debug_mode) : print("Episode "+str(i+1)+ " , Reward: "+str(episode_reward))
        print(episodesHistory[:])

In [173]:
class DQNAgent(Agent):

    def __init__(self, id, epsilonMax, epsilonMin = 0.05, Q = Multi_Layer_Perceptron(input_dim = 3,intern_dim = 64, output_dim = 1, depth = 2, isBiased = True), env = gym.make('MountainCar-v0'), arrayNewReward = None, gamma = 0.99, replay_buffer_SIZE = 10000, batch_size = 64, observation_SIZE = 6, optimizer = torch.optim.AdamW):
        Agent.__init__(self,id,env)
        self.Q = Q
        self.gamma = gamma
        self.epsilon = epsilonMax
        self.epsilonMax = epsilonMax
        self.epsilonMin = epsilonMin
        self.replay_buffer = np.zeros((replay_buffer_SIZE,observation_SIZE))
        self.batch_size = batch_size
        self.optimizer = optimizer(self.Q.parameters())
        self.replay_buffer_SIZE = replay_buffer_SIZE
        self.observation_SIZE = observation_SIZE
        self.arrayNewReward = arrayNewReward
        
    def observe(self, state, action, next_state, reward):
        state = np.array(state)
        action = np.array([action])
        next_state = np.array(next_state)
        reward = np.array([reward])

        concatenatation = np.concatenate((state, action, next_state, reward))
        return concatenatation
        
    def select_action(self, state):
        P = random.uniform(0,1)
        a=0
        if P < 1-self.epsilon :
            A=np.zeros(3)
            for k in range(2):
                A[k] = self.Q(torch.from_numpy(np.concatenate((np.array(state), np.array([k])))).to(torch.float32))
            a = np.argmax(A)
        else:
            a = random.randint(0,2)
            
        return a
        
        
    def update(self):

        LossFct = DQN_Loss()
        
        batch = np.random.choice(self.replay_buffer.shape[0], self.batch_size)

        for i in range(self.batch_size):

            A0 = torch.from_numpy(np.concatenate((self.replay_buffer[batch[i],3:5], np.array([0])))).to(torch.float32)
            A1 = torch.from_numpy(np.concatenate((self.replay_buffer[batch[i],3:5], np.array([1])))).to(torch.float32)
            target = self.replay_buffer[batch[i],5] + self.gamma*max(self.Q(A0),self.Q(A1))
            input = self.Q(torch.from_numpy(self.replay_buffer[batch[i],:3]).to(torch.float32))

            loss = LossFct(input, target)
            
            self.optimizer.zero_grad()
            loss.backward()
            #print(loss.item())
            self.optimizer.step()

    def customReward(self, action, currentReward, next_state, uniqueReward, excludePassivity):
        reward = 0

        if excludePassivity and action == 1:
            return -1
        
        if self.arrayNewReward.any() == None:
            return reward
            
        for k in range(self.arrayNewReward.shape[0]):
            #print(i)
            i = self.arrayNewReward[k,0]
            if (i <= next_state[0] and i +0.4 >=0) or (i >= next_state[0] and i +0.6 <=0):
                if self.arrayNewReward[k,2] == 0:
                    reward = self.arrayNewReward[k,1]
                if uniqueReward: self.arrayNewReward[k,2] = 1
        return reward

    def play(self):
        self.epsilon = 0
        newSeed = random.randint(0,100000)
        state,_ = self.env.reset(seed = newSeed)
        done = False
                    
        while done == False:
                                        
            action = self.select_action(state)
            next_state, reward, terminated, truncated, _ = self.env.step(action)
            done = terminated or truncated
        self.env.close()
    
    def train(self, episodes, debug_mode=False, recap_mode=False, reset_init = False, epsilon_decrease = True, uniqueReward = False, excludePassivity = True):
        episodesHistory = np.zeros((episodes))
        if reset_init: self.Q.reset_init_weights_biases(0)
        
        for e in range(episodes):
            if debug_mode: print("Episode: "+str(e+1)+" starts")
                
            if epsilon_decrease: 
                if self.epsilon > self.epsilonMin:
                    self.epsilon = self.epsilonMax*math.exp(-e/10)
            else:
                self.epsilon = self.epsilonMax
                
            newSeed = random.randint(0,100000)
            state,_ = self.env.reset(seed = newSeed)
            done = False
            episode_reward = 0
            self.replay_buffer = np.zeros((self.replay_buffer_SIZE, self.observation_SIZE))
            if self.arrayNewReward.any() != None:
                for i in range(self.arrayNewReward.shape[0]):
                    self.arrayNewReward[i,2] == 0
            k=0
            
            while done == False:
                                        
                action = self.select_action(state)
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                reward = self.customReward(action,reward, next_state, uniqueReward, excludePassivity)
                if debug_mode: print("Action "+str(k)+" selected: "+str(action)+" Reward: "+ str(reward))
                observe = self.observe(state,action,next_state,reward)
                self.replay_buffer[k] = observe
                k+=1
                episode_reward += reward
                state = next_state
                done = terminated or truncated

            if debug_mode or recap_mode: print("Episode "+str(e+1)+ " , Reward: "+str(episode_reward)+" Epsilon: "+str(self.epsilon))
            self.update()
            episodesHistory[e] = episode_reward
        return episodesHistory[:]

In [178]:
NewReward=np.array([(-0.85,25,0),(-0.7,15,0),(-0.2,10,0),(0,20,0),(0.40,3,0), (0.5,200,0)])
DQN = DQNAgent(0,1, epsilonMin = 0.2, arrayNewReward = NewReward)
DQN.train(5000, debug_mode = False, recap_mode=True, reset_init = True, uniqueReward = True, excludePassivity = True )

Episode 1 , Reward: -866.0 Epsilon: 1.0
Episode 2 , Reward: -830.0 Epsilon: 0.9048374180359595
Episode 3 , Reward: -722.0 Epsilon: 0.8187307530779818
Episode 4 , Reward: -614.0 Epsilon: 0.7408182206817179
Episode 5 , Reward: -551.0 Epsilon: 0.6703200460356393
Episode 6 , Reward: -542.0 Epsilon: 0.6065306597126334
Episode 7 , Reward: -596.0 Epsilon: 0.5488116360940265
Episode 8 , Reward: -533.0 Epsilon: 0.4965853037914095
Episode 9 , Reward: -477.0 Epsilon: 0.44932896411722156
Episode 10 , Reward: -506.0 Epsilon: 0.4065696597405991
Episode 11 , Reward: -461.0 Epsilon: 0.36787944117144233
Episode 12 , Reward: -461.0 Epsilon: 0.33287108369807955
Episode 13 , Reward: -443.0 Epsilon: 0.30119421191220214
Episode 14 , Reward: -380.0 Epsilon: 0.2725317930340126
Episode 15 , Reward: -353.0 Epsilon: 0.2465969639416065
Episode 16 , Reward: -272.0 Epsilon: 0.22313016014842982
Episode 17 , Reward: -353.0 Epsilon: 0.20189651799465538
Episode 18 , Reward: -317.0 Epsilon: 0.18268352405273466
Episode 1

KeyboardInterrupt: 

In [179]:
DQN.env = gym.make('MountainCar-v0', render_mode='human')
DQN.play()