# In [None]:
import numpy as np
import random as rand

class QLearner:
    '''Tabular Q-Learning agent with optional Dyna-Q experience replay.

    The learner keeps a ``(num_states, num_actions)`` table of action values.
    Actions are chosen epsilon-greedily: with probability
    ``random_action_prob`` a uniformly random action is taken, otherwise the
    action with the highest Q-value for the current state. The exploration
    probability is multiplied by ``random_action_decay_rate`` after every
    call to ``experience``.
    '''

    def __init__(self, *, num_states, num_actions, learning_rate=0.1,
                 discount_rate=1.0, random_action_prob=0.5,
                 random_action_decay_rate=0.99,
                 dyna_iterations=0):
        '''Create a learner with a randomly initialised Q-table.

        All arguments are keyword-only.

        num_states: number of rows of the Q-table (states are row indices).
        num_actions: number of columns of the Q-table (actions are column
            indices in ``0 .. num_actions - 1``).
        learning_rate: weight given to the new estimate in each Q update.
        discount_rate: multiplier applied to the best Q-value of the
            successor state in each Q update.
        random_action_prob: initial probability of taking a random action.
        random_action_decay_rate: factor applied to ``random_action_prob``
            after every experienced step.
        dyna_iterations: number of stored experiences replayed after every
            real step; 0 disables replay (and experience recording).
        '''
        self._num_states = num_states
        self._num_actions = num_actions
        self._learning_rate = learning_rate
        self._discount_rate = discount_rate
        self._random_action_prob = random_action_prob
        self._random_action_decay_rate = random_action_decay_rate
        self._dyna_iterations = dyna_iterations

        # Replay buffer of (state, action, next_state, reward) tuples; it is
        # only filled when dyna_iterations > 0.
        self._experiences = []

        # Initialize Q to small random values (the builtin float is used
        # because the np.float alias no longer exists in current NumPy).
        self._Q = np.zeros((num_states, num_actions), dtype=float)
        self._Q += np.random.normal(0, 0.3, self._Q.shape)

    def initialize(self, state):
        '''Set the initial state and return learner's first action'''
        self._decide_next_action(state)
        self._stored_state = state
        return self._stored_action

    def _decide_next_action(self, state):
        '''Pick the next action epsilon-greedily and store it on the learner.'''
        if rand.random() <= self._random_action_prob:
            # randint is inclusive on both ends, hence the "- 1".
            self._stored_action = rand.randint(0, self._num_actions - 1)
        else:
            self._stored_action = self._find_best_action(state)

    def _find_best_action(self, state):
        '''Return the index of the highest-valued action for ``state``.'''
        return int(np.argmax(self._Q[state, :]))

    def learn(self, initial_state, experience_func, iterations=100):
        '''Iteratively experience new states and rewards.

        Runs ``iterations`` episodes. Every episode restarts from
        ``initial_state`` and lasts until ``experience_func`` reports it is
        done, or for at most ``iterations`` steps.

        experience_func: callable ``(state, action) -> (state, reward, done)``.

        Returns ``(all_policies, all_utilities)``, two arrays of shape
        ``(num_states, iterations)`` whose column ``i`` holds the greedy
        policy and its utility after episode ``i``.
        '''
        all_policies = np.zeros((self._num_states, iterations))
        all_utilities = np.zeros_like(all_policies)
        for episode in range(iterations):
            self.initialize(initial_state)
            for _ in range(iterations):
                state, reward, done = experience_func(self._stored_state,
                                                      self._stored_action)
                self.experience(state, reward)
                if done:
                    break

            policy, utility = self.get_policy_and_utility()
            all_policies[:, episode] = policy
            all_utilities[:, episode] = utility
        return all_policies, all_utilities

    def get_policy_and_utility(self):
        '''Return the greedy action and its Q-value for every state.'''
        policy = np.argmax(self._Q, axis=1)
        utility = np.max(self._Q, axis=1)
        return policy, utility

    def experience(self, state, reward):
        '''The learner experiences state and receives a reward.

        Updates Q for the stored (state, action) pair, optionally replays
        stored experiences (Dyna-Q), chooses the next action, decays the
        exploration probability and returns the chosen action.
        '''
        self._update_Q(self._stored_state, self._stored_action, state, reward)

        if self._dyna_iterations > 0:
            # Store the transition as a single tuple so it can be unpacked
            # straight into _update_Q during replay.
            self._experiences.append(
                (self._stored_state, self._stored_action, state, reward))
            # Sampled with replacement, so the same transition may be
            # replayed more than once.
            replay_indices = np.random.choice(len(self._experiences),
                                              self._dyna_iterations)
            for idx in replay_indices:
                self._update_Q(*self._experiences[idx])

        # determine an action and update the current state
        self._decide_next_action(state)
        self._stored_state = state

        self._random_action_prob *= self._random_action_decay_rate

        return self._stored_action

    def _update_Q(self, s, a, s_prime, r):
        '''Blend the old Q[s, a] with the one-step lookahead estimate.'''
        # Read the successor's best value first: when s == s_prime the
        # update below could otherwise change it.
        best_reward = self._Q[s_prime, self._find_best_action(s_prime)]
        target = r + self._discount_rate * best_reward
        self._Q[s, a] = ((1 - self._learning_rate) * self._Q[s, a]
                         + self._learning_rate * target)

# In [None]:
import numpy as np
import matplotlib.pyplot as plt
import cv2

class GridWorldMDP:
    '''Grid-world Markov decision process backed by a dense transition tensor.

    The transition tensor ``T`` has shape ``(M, N, num_actions, M, N)`` for an
    ``M x N`` reward grid; ``T[r, c, a, r2, c2]`` accumulates the probability
    of ending in cell ``(r2, c2)`` after taking action ``a`` in cell
    ``(r, c)``.

    NOTE(review): this class calls ``self.grid_indices_to_coordinates()``,
    which is not defined in this part of the class. It is assumed to return a
    ``(rows, cols)`` pair of index arrays covering every cell in flattened
    (row-major) order - TODO confirm against its definition.
    '''

    # up, right, down, left
    _direction_deltas = [(-1, 0), (0, 1), (1, 0), (0, -1),]
    _num_actions = len(_direction_deltas)

    def __init__(self, reward_grid, terminal_mask, obstacle_mask,
                 action_probabilities, no_action_probability):
        '''Store the grids and build the transition tensor.

        reward_grid: 2-D array; its ``shape`` and ``size`` define the grid.
        terminal_mask: array with the grid's shape, truthy at terminal cells.
        obstacle_mask: array with the grid's shape, truthy at cells that
            cannot be entered.
        action_probabilities: iterable of ``(offset, probability)`` pairs;
            ``offset`` is added to the chosen action's direction index
            (modulo the number of directions) and that direction is taken
            with the given probability.
        no_action_probability: probability mass added to staying in place,
            for every cell and action.
        '''
        self._reward_grid = reward_grid
        self._terminal_mask = terminal_mask
        self._obstacle_mask = obstacle_mask
        self._T = self._create_transition_matrix(action_probabilities,
                                                 no_action_probability,
                                                 obstacle_mask)

    @property
    def shape(self):
        '''Shape ``(M, N)`` of the reward grid.'''
        return self._reward_grid.shape

    @property
    def size(self):
        '''Total number of grid cells.'''
        return self._reward_grid.size

    @property
    def reward_grid(self):
        '''The reward grid passed to the constructor.'''
        return self._reward_grid

    def _create_transition_matrix(self, action_probabilities,
                                  no_action_probability, obstacle_mask):
        '''Build and return the ``(M, N, num_actions, M, N)`` transition tensor.

        Moves that would leave the grid are clipped to the border, and moves
        into an obstacle cell leave the agent where it was. All transitions
        out of terminal cells are zeroed.
        '''
        M, N = self.shape
        # Force a boolean array so the lookups below are used as masks, not
        # as integer fancy indices, even if a 0/1 integer mask was supplied.
        obstacle_mask = np.asarray(obstacle_mask, dtype=bool)

        T = np.zeros((M, N, self._num_actions, M, N))
        r0, c0 = self.grid_indices_to_coordinates()
        # Probability of not moving at all, for every cell and every action.
        T[r0, c0, :, r0, c0] += no_action_probability

        for action in range(self._num_actions):
            for offset, P in action_probabilities:
                # The direction actually taken may be rotated away from the
                # intended action by `offset` steps in the direction list.
                direction = (action + offset) % self._num_actions

                dr, dc = self._direction_deltas[direction]
                r1 = np.clip(r0 + dr, 0, M - 1)
                c1 = np.clip(c0 + dc, 0, N - 1)

                # Moves that land on an obstacle bounce back to the origin.
                blocked = obstacle_mask[r1, c1]
                r1[blocked] = r0[blocked]
                c1[blocked] = c0[blocked]

                T[r0, c0, action, r1, c1] += P

        # Terminal cells have no outgoing transitions.
        terminal_locs = np.flatnonzero(self._terminal_mask)
        T[r0[terminal_locs], c0[terminal_locs], :, :, :] = 0
        return T
        