In [27]:
import numpy as np
import gym
import random
import math

from sklearn.preprocessing import KBinsDiscretizer
import time
from typing import Tuple
import tensorflow as tf
import datetime

In [28]:
import warnings
warnings.filterwarnings('ignore')

In [29]:
# Fix the seeds of Python's `random` module and NumPy's legacy global RNG.
random.seed(a=0)
np.random.seed(seed=0)

In [30]:
# Each notebook run writes its summaries to a separate directory named after
# the wall-clock time at which this cell was executed.
_run_started = datetime.datetime.now()
logdir = "./logs/" + _run_started.strftime("%Y%m%d-%H%M%S")
writer = tf.summary.create_file_writer(logdir=logdir, name="q_learning")

In [31]:
# Q-table container used by the tabular Q-learning agent

class QLearning:
    """Holds the Q-table for a discretized state space.

    The table has one axis per discretized observation dimension followed by
    a final axis with one entry per action, and is initialised to zeros.

    Args:
        n_state: Number of observation dimensions. Stored on the instance;
            it does not influence the table shape.
        n_action: Number of discrete actions; sizes the last axis of the
            table.
        n_bins: Number of bins per observation dimension. The default
            ``(3, 6, 6, 12)`` matches the bin counts hard-coded in
            ``QAgent.discretizer``.
    """

    def __init__(self, n_state, n_action, n_bins=(3, 6, 6, 12)) -> None:
        self.num_states = n_state
        self.num_actions = n_action
        # Size the action axis from n_action instead of a literal 2 so the
        # table is valid for environments with a different action count.
        self.q_table = np.zeros(tuple(n_bins) + (n_action,))


In [32]:
# Tabular Q-learning agent

class QAgent:
    """Epsilon-greedy tabular Q-learning agent.

    The agent discretizes a 4-component continuous observation
    (position, velocity, angle, pole velocity) into bin indices and learns a
    Q-table over those indices. Both the learning rate and the exploration
    rate follow a logarithmic decay schedule driven by the episode number.

    Args:
        env: Environment exposing ``observation_space`` (with ``shape``,
            ``low`` and ``high``), ``action_space`` (with ``n`` and
            ``sample()``), ``reset()`` returning ``(observation, info)``,
            ``step(action)`` returning
            ``(observation, reward, terminated, truncated, info)`` and
            ``close()``.
    """

    def __init__(self, env) -> None:
        self.env = env
        self.num_states = env.observation_space.shape[0]  # number of observation dimensions
        self.num_actions = env.action_space.n  # number of discrete actions
        # NOTE: learning_rate and epsilon are overwritten from the decay
        # schedules on every call to learn_environment(); epsilon_min and
        # epsilon_decay are not read anywhere in this class.
        self.learning_rate = 0.0001
        self.gamma = 0.99  # discount factor
        self.epsilon_max = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.005
        self.epsilon = self.epsilon_max  # initial epsilon

        # Q Learning algorithm
        self.q_learning = QLearning(self.num_states, self.num_actions)

        # Fitted KBinsDiscretizer, created on the first discretizer() call.
        self._binner = None

    def _fit_binner(self):
        """Build the uniform-width discretizer from the fixed state bounds."""
        n_bins = (3, 6, 6, 12)
        low = self.env.observation_space.low
        high = self.env.observation_space.high
        # Components 0 and 2 take their bounds from the environment;
        # components 1 and 3 use hand-picked finite limits.
        lower_bounds = [low[0], -12.0, low[2], -math.radians(50)]
        upper_bounds = [high[0], 12.0, high[2], math.radians(50)]
        est = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy='uniform')
        est.fit([lower_bounds, upper_bounds])
        return est

    def discretizer(self, position, velocity, angle, pole_velocity) -> Tuple[int, ...]:
        """
        Convert a continuous state into a discrete state.

        Returns a tuple of four bin indices, one per observation component.
        The discretizer depends only on fixed bounds, so it is fitted once
        and reused instead of being rebuilt on every call.

        Link: https://www.youtube.com/watch?v=KMjQmG5Uzis
        Link 2: https://github.com/RJBrooker/Q-learning-demo-Cartpole-V1/blob/master/cartpole.ipynb
        """
        if self._binner is None:
            self._binner = self._fit_binner()
        bin_indices = self._binner.transform([[position, velocity, angle, pole_velocity]])[0]
        return tuple(map(int, bin_indices))

    def exploration_exploitation(self, state):
        """Pick an action epsilon-greedily for the discretized ``state``.

        With probability ``self.epsilon`` a random action is sampled from the
        environment's action space; otherwise the action with the highest
        Q-value for ``state`` is returned.
        """
        rand_epsilon = random.uniform(0, 1)  # draw deciding exploration vs. exploitation

        if rand_epsilon <= self.epsilon:
            # explore the environment
            return self.env.action_space.sample()
        # exploit the environment: return the action with the max Q-value
        return np.argmax(self.q_learning.q_table[state])

    @staticmethod
    def _log_decay(epoch, min_rate):
        """Shared schedule: 1.0 for the first 25 epochs, then ``1 - log10((epoch+1)/25)``, floored at ``min_rate``."""
        return max(min_rate, min(1.0, 1.0 - math.log10((epoch + 1) / 25)))

    def explorationRate(self, epoch, min_rate=0.1):
        """Return the exploration rate (epsilon) for ``epoch``, never below ``min_rate``."""
        return self._log_decay(epoch, min_rate)

    def learning_rate_update(self, epoch, min_rate=0.01):
        """Return the learning rate for ``epoch``, never below ``min_rate``."""
        return self._log_decay(epoch, min_rate)

    # Learn Q values
    def learn_environment(self, state, action, reward, next_state, episode, terminated=False):
        """
        Apply one Q-learning update and refresh the decayed rates.

        Formula: Q(state, action) = old q value + learning rate * (current reward + discount factor * optimal q value from next state - old q value)

        Args:
            state: Discretized state (tuple of bin indices) the action was taken in.
            action: Index of the action taken.
            reward: Reward received for the transition.
            next_state: Discretized state reached after the action.
            episode: Current episode number; drives both decay schedules.
            terminated: True when ``next_state`` is terminal. The future-value
                term is then dropped, because no further reward can follow a
                terminal state. Defaults to False, which reproduces the plain
                update above.
        """
        self.learning_rate = self.learning_rate_update(episode)
        q_table = self.q_learning.q_table
        future_value = 0.0 if terminated else np.max(q_table[next_state])
        td_target = reward + self.gamma * future_value
        q_table[state][action] += self.learning_rate * (td_target - q_table[state][action])
        self.epsilon = self.explorationRate(episode)

    def train(self, num_episodes=2000):
        """Run ``num_episodes`` training episodes and log each episode's total reward.

        Each episode ends as soon as the environment reports either
        ``terminated`` or ``truncated``. The per-episode reward is written as
        the ``"reward"`` scalar through the module-level ``writer``. The
        environment is closed when training finishes or is interrupted.

        Rendering is not done here: with the ``reset``/``step`` API used in
        this method, the render mode is chosen when the environment is
        created rather than passed to ``render()``.
        """
        try:
            for episode in range(num_episodes):
                # reset environment
                observation, _ = self.env.reset()
                state = self.discretizer(*observation)
                ep_reward = 0
                done = False

                while not done:
                    action = self.exploration_exploitation(state)
                    next_observation, reward, terminated, truncated, _info = self.env.step(action)
                    # The episode is over on failure/success (terminated)
                    # OR when the time limit is hit (truncated).
                    done = terminated or truncated
                    next_state = self.discretizer(*next_observation)
                    self.learn_environment(state, action, reward, next_state, episode, terminated)
                    ep_reward += reward
                    state = next_state

                with writer.as_default():
                    tf.summary.scalar("reward", ep_reward, step=(episode + 1))
                    writer.flush()
        finally:
            self.env.close()
    


    

Reference (introductory Q-learning tutorial): https://www.datacamp.com/tutorial/introduction-q-learning-beginner-tutorial

In [33]:
# Create the environment and the agent that will be trained on it

env = gym.make("CartPole-v1")
# np.random.seed / random.seed do not reach the environment's own generators,
# so seed the reset RNG and the action-space sampler explicitly. Later
# reset() calls without a seed keep using the generator seeded here.
env.reset(seed=0)
env.action_space.seed(0)
agent = QAgent(env=env)



In [34]:
# Run the training loop; QAgent.train writes each episode's total reward as
# the "reward" scalar through the tf.summary writer created earlier.
agent.train()