In [1]:
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt
from time import time
from tqdm import tqdm

%load_ext autoreload
%autoreload 2

In [2]:
env = gym.make('MountainCar-v0', render_mode="rgb_array")

In [3]:
print(f"Action space: {env.action_space}")
print(f"Observation space: {env.observation_space}")

Action space: Discrete(3)
Observation space: Box([-1.2  -0.07], [0.6  0.07], (2,), float32)


In [4]:
starting_state, _ = env.reset() 

print(f"Starting state: {starting_state}")

Starting state: [-0.53928685  0.        ]


In [5]:
def discretize_position(position,size_bin_x,size_bin_y,xmin=-1.2,xmax=0.6,ymin=-0.07,ymax=0.07):
    n_bins_x = int((xmax-xmin)/size_bin_x)
    n_bins_y = int((ymax-ymin)/size_bin_y)
    x = np.linspace(xmin,xmax,n_bins_x)
    y = np.linspace(ymin,ymax,n_bins_y)
    # print(f"Number of bins in x: {n_bins_x}")
    # print(f"Number of bins in y: {n_bins_y}")
    # print(y)
    return np.digitize(position[0],x),np.digitize(position[1],y)

In [6]:
starting_state+np.array([0.1,0.05])

array([-0.43928685,  0.05      ])

In [7]:
discretize_position(starting_state+np.array([0.1,0.00]),0.025,0.005)

(30, 14)

In [67]:
class DynaAgent():
    def __init__(self,env,size_bin_x=0.025,size_bin_y=0.005,epsilon_start=0.1,epsilon_end=0.05,alpha=0.1,gamma=0.99,k=5):
        self.env = env
        
        self.n_bins_x = int((env.observation_space.high[0]-env.observation_space.low[0])/size_bin_x)
        # print(f"Number of bins in x: {self.n_bins_x}")
        # print((env.observation_space.high[0]-env.observation_space.low[0])/size_bin_x)
        assert (env.observation_space.high[0]-env.observation_space.low[0])/size_bin_x-self.n_bins_x<1e-5 , "size_bin_x is not a divisor of the range of the x axis"
        self.n_bins_y= int((env.observation_space.high[1]-env.observation_space.low[1])/size_bin_y)
        assert (env.observation_space.high[1]-env.observation_space.low[1])/size_bin_y-self.n_bins_y<1e-5 , "size_bin_y is not a divisor of the range of the y axis"
        self.disc_step=np.array([size_bin_x,size_bin_y])
        self.x_bins = np.linspace(env.observation_space.low[0],env.observation_space.high[0],self.n_bins_x)
        self.y_bins= np.linspace(env.observation_space.low[1],env.observation_space.high[1],self.n_bins_y)
        
    
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.alpha = alpha
        self.gamma = gamma
        self.k=k
        self.n_states= self.n_bins_x*self.n_bins_y
        self.counts_total=np.zeros((self.n_states,3))
        self.counts_transition=np.zeros((self.n_states,3,self.n_states))
        self.P=np.ones((self.n_states,3,self.n_states))*1/self.n_states
        self.R=np.zeros((self.n_states,3))
        self.Q=np.zeros((self.n_states,3))
        self.model = {}
        
        self.state_action_pairs_encontered = []
        
        self.state=self.discretize_position(env.reset()[0])
    def discretize_position(self,position):
        if np.digitize(position[0],self.x_bins)*np.digitize(position[1],self.y_bins)+np.digitize(position[1],self.y_bins)>=2016:
            raise ValueError(f"position: {position} is out of bounds")
        return np.digitize(position[0],self.x_bins)*np.digitize(position[1],self.y_bins)+np.digitize(position[1],self.y_bins)
    
    def update_P(self,state,action,next_state):
        self.counts_total[state,action]+=1
        self.counts_transition[state,action,next_state]+=1
        old_p=self.P[state,action,next_state]
        self.P[state,action,next_state]=self.P[state,action,next_state]+self.counts_transition[state,action,next_state]/(1+self.counts_total[state,action])
        self.P[state,action]/=np.sum(self.P[state,action])
        pass
    
    def update_R(self,state,action,reward):
        self.R[state,action] = (self.R[state,action]*(self.counts_total[state,action]-1)+reward)/self.counts_total[state,action]
        pass
    
    def update_Q(self,state,action):
        self.Q[state][action]=self.R[state,action]+self.gamma*np.sum(self.P[state,action]*np.max(self.Q,axis=1))
        pass
        
    def update_step(self,epsilon):
        state=self.state
        if np.random.rand()<epsilon:
            action = np.random.choice([0,1,2])
        else:
            action = np.argmax(self.Q[state])
        self.state_action_pairs_encontered.append((state,action))
            
        next_state, reward, done, _ ,_= self.env.step(action)
        #print(next_state)
        next_state=self.discretize_position(next_state)
        #self.Q[state][action] += self.alpha*(reward+self.gamma*np.max(self.Q[next_state])-self.Q[state][action])# TO DOOOOOOOOOOOOOOto be changeddddddd
        self.update_P(state,action,next_state)
        self.update_R(state,action,reward)
        #self.Q[state][action] = self.Q[state][action]+self.alpha*(reward+self.gamma*np.max(self.Q[next_state])-self.Q[state][action])
        self.update_Q(state,action)
        
        self.state=next_state
        
        for planning_step in range(self.k):
            state,action = self.state_action_pairs_encontered[np.random.choice(len(self.state_action_pairs_encontered))]
            reward = self.R[state,action]
            self.Q[state][action] =reward+self.gamma*np.sum(self.P[state,action]*np.max(self.Q,axis=1))#TO DOOOOOOOOOOOOOOOOOOOOOOOto be chnageddddddddddddddd
        
        pass
    
    def training(self,n_steps,reset=True):
        print("Training...")
        if reset:
            self.state=self.discretize_position(env.reset()[0])
        else:
            pass
        epsilon = self.epsilon_start
        
        for i in tqdm(range(n_steps)):
            self.update_step(epsilon)
            #epsilon exponential decay:
            epsilon = self.epsilon_end+(self.epsilon_start-self.epsilon_end)*np.exp(-i/1000)
            # if i%1000==0:
            #     print(np.max(self.Q))
            
        pass
    
    
    def inference(self,max_steps):
        frames=[]
        state=self.discretize_position(self.env.reset()[0])
        tot_reward=0
        frames.append(self.env.render(mode="rgb_array"))
        self.env.render()
        for i in tqdm(range(max_steps)):
            action = np.argmax(self.Q[state])
            next_state, reward, done, _ ,_= self.env.step(action)
            next_state=self.discretize_position(next_state)
            frames.append(self.env.render(mode="rgb_array"))
            tot_reward+=reward
            state=next_state
            if done:
                print(f"Episode finished after {i} timesteps")
                break
        print(f'total reward: {tot_reward}')
        return frames,tot_reward

In [68]:
env = gym.make('MountainCar-v0', render_mode="human")
dyna_agent=DynaAgent(env)

In [70]:
dyna_agent.training(10000,reset=False)

Training...


 89%|████████▉ | 8891/10000 [05:00<00:37, 29.58it/s]


KeyboardInterrupt: 