In [1]:
import gym 
import random
import numpy as np
import time
import tensorflow as tf
from gym.envs.registration import register  # to use register function
from IPython.display import clear_output  # feature of jupyter notebook

In [2]:
# Register a non-slippery variant of FrozenLake under a custom id.
# Re-running this cell registers the same id a second time, which gym rejects
# with gym.error.Error; only that error is ignored, so genuine failures
# (bad entry point, typos, interrupts) still surface instead of being hidden.
try:
    register(
        id='FrozenLakenoslip-v0',     # the part before "-v0" can be any name
        entry_point='gym.envs.toy_text:FrozenLakeEnv',
        kwargs={'map_name': '4x4', 'is_slippery': False},  # non-slippery to make learning easier
        max_episode_steps=100,
        reward_threshold=0.78,  # optimum = .8196
    )
except gym.error.Error:
    pass  # id was already registered by an earlier run of this cell
env_name = "FrozenLakenoslip-v0"
env = gym.make(env_name)

In [3]:
# Show the observation space (4x4 grid) and the action space (4 movements),
# one per line, then display the class of the action space as the cell result.
print(env.observation_space, env.action_space, sep="\n")
type(env.action_space)

Discrete(16)
Discrete(4)


gym.spaces.discrete.Discrete

In [4]:
# using object orientation
class Agent():
    """Baseline player that ignores the state and acts at random.

    Works with both discrete and continuous (box-like) action spaces; the
    kind of space is detected once in the constructor.
    """

    def __init__(self, env):
        """Record how to sample actions for ``env``.

        Args:
            env: an environment exposing ``action_space``. For a discrete
                space ``action_space.n`` is read; otherwise
                ``action_space.low``, ``.high`` and ``.shape`` are read.
        """
        # isinstance (instead of an exact type comparison) also accepts
        # subclasses of Discrete, which would otherwise fall through to the
        # continuous branch and fail on the missing .low/.high attributes.
        self.is_discrete = isinstance(env.action_space, gym.spaces.discrete.Discrete)

        if self.is_discrete:
            self.action_size = env.action_space.n        # number of possible actions
        else:
            # continuous space: remember the bounds and shape to sample from
            self.action_low = env.action_space.low
            self.action_high = env.action_space.high
            self.action_shape = env.action_space.shape

    def get_action(self, state):
        """Return a random action; ``state`` is accepted but not used.

        Subclasses override this with an actual policy. An alternative to the
        sampling below is ``env.action_space.sample()``.

        Returns:
            An int in ``range(self.action_size)`` for discrete spaces, else an
            array of shape ``self.action_shape`` drawn uniformly between
            ``self.action_low`` and ``self.action_high``.
        """
        if self.is_discrete:
            return random.choice(range(self.action_size))
        return np.random.uniform(self.action_low,
                                 self.action_high,
                                 self.action_shape)

In [9]:
class QNAgent(Agent):
    """Q-learning agent whose Q-values come from one dense layer over one-hot states.

    Built with the TensorFlow 1.x graph API: ``build_model`` defines the
    graph, and a ``tf.Session`` owned by the agent runs it. Actions are chosen
    epsilon-greedily; ``exploration_rate`` starts at 1.0 and is multiplied by
    0.99 each time a training step sees a finished episode.
    """

    def __init__(self,env,discount_rate=0.97,learning_rate=0.01):
        """Build the model and start a session for it.

        Args:
            env: environment with discrete observation and action spaces
                (``observation_space.n`` is read here, ``action_size`` is
                used by the model).
            discount_rate: weight of the next state's best Q-value in the target.
            learning_rate: step size handed to the Adam optimizer.
        """
        super().__init__(env)
        self.state_size = env.observation_space.n  # discrete observation spaces only
        self.exploration_rate = 1.0                # start fully random
        self.discount_rate = discount_rate
        self.learning_rate = learning_rate
        self.build_model()

        # Start the session that executes the graph and initialise its variables.
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())

    def build_model(self):
        """Define placeholders, the Q-value layer, the loss and the train op."""
        tf.reset_default_graph()  # start from a clean graph so layer names can be reused

        # Placeholders for a single transition (batch of one).
        self.state_in = tf.placeholder(tf.int32, shape=[1])     # state index
        self.action_in = tf.placeholder(tf.int32, shape=[1])    # action index
        self.target_in = tf.placeholder(tf.float32, shape=[1])  # Q-learning target

        # One-hot encodings: depth is the number of states / number of actions.
        self.state = tf.one_hot(self.state_in,depth=self.state_size)
        self.action = tf.one_hot(self.action_in,depth=self.action_size)

        # Q-values of every action for the given state.
        # NOTE(review): tf.layers.dense adds a bias term by default, so this is
        # kernel + bias rather than a pure lookup table - confirm that is intended.
        self.q_state = tf.layers.dense(self.state,units=self.action_size,name="q_table")
        # Mask with the one-hot action to keep only the Q-value of the action taken.
        self.q_action = tf.reduce_sum(tf.multiply(self.q_state,self.action),axis=1)

        # Squared error between the target and the predicted Q-value.
        self.loss=tf.reduce_sum(tf.square(self.target_in - self.q_action))
        self.optimizer=tf.train.AdamOptimizer(self.learning_rate).minimize(self.loss)

    def get_action(self,state):
        """Pick an action epsilon-greedily for ``state``.

        With probability ``exploration_rate`` a random action from the parent
        class is returned; otherwise the action with the highest predicted
        Q-value.
        """
        # Forward pass only (no training op is run).
        q_state = self.sess.run(self.q_state,feed_dict={self.state_in: [state]})
        action_greedy = np.argmax(q_state)
        action_random = super().get_action(state)
        return action_random if random.random()<self.exploration_rate else action_greedy

    def train(self,experience):
        """Run one optimisation step on a single transition.

        Args:
            experience: tuple ``(state, action, next_state, reward, done)``.
        """
        state,action,next_state,reward,done = experience

        # Predict the next state's Q-values (forward pass only).
        q_next = self.sess.run(self.q_state, feed_dict={self.state_in:[next_state]})
        # A finished episode has no future value, so the bootstrap term is zero.
        best_next = 0.0 if done else np.max(q_next)
        q_target = reward+self.discount_rate*best_next

        # The placeholders have shape [1], so every value is wrapped in a list.
        feed={self.state_in:[state],self.action_in:[action],self.target_in:[q_target]}

        # Actual training step: backpropagation happens here.
        self.sess.run(self.optimizer,feed_dict=feed)

        if done:
            self.exploration_rate*=0.99  # explore a little less after each episode

    def __del__(self):
        """Close the TensorFlow session when the agent is garbage-collected."""
        # getattr guards against __init__ having failed before the session existed.
        sess = getattr(self, "sess", None)
        if sess is not None:
            sess.close()
        


In [10]:
# Create the Q-network agent for the environment; the constructor builds the
# TensorFlow graph and starts the session used by the training loop.
agent=QNAgent(env)



In [12]:
total_reward=0.0

# Look up the dense layer's kernel variable once, before the loop.
# reuse=True makes get_variable return the variable that already exists under
# the "q_table" scope rather than trying to create a new one; it is only read
# here for display and has no effect on training.
with tf.variable_scope("q_table",reuse=True):
    q_kernel=tf.get_variable("kernel")

for ep in range(100):
    state=env.reset()
    done=False
    while not done:
        # act, observe the transition, and learn from it
        action=agent.get_action(state)
        next_state,reward,done,info = env.step(action)
        agent.train((state,action,next_state,reward,done))
        state=next_state
        total_reward+=reward
        print(f"s{state} a:{action}")
        print(f"epsiode:{ep} reward:{total_reward} exploration:{agent.exploration_rate}")

        env.render()
        # show the current kernel weights (one row per state, one column per action)
        weights=agent.sess.run(q_kernel)
        print(weights)
        time.sleep(0.05)          # slow down so each frame is visible
        clear_output(wait=True)   # replace the previous frame in the notebook output

s15 a:2
epsiode:99 reward:69.0 exploration:0.13397967485796175
  (Right)
SFFF
FHFH
FFFH
HFF[41mG[0m
[[ 0.15231162  0.05854299  0.19316824  0.0736348 ]
 [ 0.19942246 -0.7356729   0.21187428  0.12320637]
 [ 0.19301312  0.1141359   0.22164726  0.14607859]
 [ 0.20927079 -0.60846865  0.02380594  0.03808787]
 [ 0.14336185  0.0681117  -0.6003821   0.07825181]
 [-0.2017557  -0.42708766  0.1829803   0.18884534]
 [-0.6758391   0.1637468  -0.25909626  0.05128361]
 [ 0.24932957 -0.48201343 -0.30184117  0.17893845]
 [ 0.20239714 -0.7607962   0.25232655  0.10861719]
 [ 0.21751364  0.10430083  0.28718358 -0.8155135 ]
 [ 0.21618131  0.16236055 -0.55861443  0.16362707]
 [-0.3943591   0.2185747  -0.52882516  0.34483308]
 [ 0.1621362  -0.37694463 -0.14190015 -0.47672582]
 [-0.29023674 -0.015889    0.24226455  0.17535181]
 [ 0.2164892   0.14758724  0.35221773  0.19689049]
 [-0.12469646  0.22696102 -0.43533072  0.27271748]]


In [None]:
# experience replay timing