In [None]:
from GridWorld import GridWorld
from ValueIteration import ValueIteration
from PolicyIteration import PolicyIteration
import numpy as np

# Build the grid world described by 'world00.csv'.
# The reward dict is keyed by integer codes 0-3 (presumably the cell-type codes
# used in the CSV: ordinary / goal / penalty / wall -- confirm against GridWorld).
# np.nan is used instead of np.NaN because the np.NaN alias was removed in NumPy 2.0.
problem = GridWorld('world00.csv',reward={0: -0.04, 1: 1.0, 2: -1.0, 3: np.nan}, random_rate=0.2)

# Solve the problem with value iteration, discounting future rewards by gamma=0.9.
solver = ValueIteration(problem.reward_function, problem.transition_model, gamma=0.9)
solver.train()

# Show the values and policy produced by the solver.
problem.visualize_value_policy(policy=solver.policy, values=solver.values)
# problem.random_start_policy(policy=solver.policy, start_pos=[1,0])

In [None]:
# Rebuild the grid world so this cell does not depend on the solver state of the previous one.
# np.nan is used instead of np.NaN because the np.NaN alias was removed in NumPy 2.0.
problem = GridWorld('world00.csv',reward={0: -0.04, 1: 1.0, 2: -1.0, 3: np.nan}, random_rate=0.2)
# Initial policy handed to the solver: one action code per entry, 12 entries in total
# (presumably one per cell of the 3x4 grid -- confirm the state ordering against GridWorld).
policy = [1, 1, 3, 1, 0, 0, 2, 0, 1, 2, 1, 0]

# Solve the problem with policy iteration, starting from the policy above.
solver = PolicyIteration(problem.reward_function, problem.transition_model, gamma=0.9, init_policy=policy)
solver.train()

# Show the values and policy produced by the solver.
problem.visualize_value_policy(policy=solver.policy, values=solver.values)
# problem.random_start_policy(policy=solver.policy)

In [None]:
import gym
import sys
from mdp import MDP
import matplotlib.pyplot as plt

# Add the current directory to the module search path (presumably so that the
# 'gambler:GamblerEnv' entry point registered below can be imported -- confirm).
# The path is '.', not ' .': a leading space names a directory that does not exist.
sys.path.append('.')
gym.envs.register('Gambler-v1', entry_point='gambler:GamblerEnv', max_episode_steps=1000)

# Wrap the registered environment in the project's MDP helper.
gambler = MDP(environment='Gambler-v1', convergence_threshold=0.00001, grid=False)
print('Gambler Problem:')


    


In [None]:
# Run value iteration on the Gambler MDP with visualization switched off.
gambler.value_iteration(iterations_to_save=[8], visualize=False)
# Keep a copy of the resulting actions; the same attribute is read again after
# policy iteration, so it is presumably overwritten by later solver runs.
optimal_action_vi = gambler.optimal_policy_actions.copy()

In [None]:
# Plot the action stored for every state after value iteration.
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(6, 4), dpi=300)
ax.plot(optimal_action_vi)
ax.set(
    xlabel='State',
    ylabel='Amount of bet',
    title='Gambler - Best action in each state',
)
plt.show()

# Also print the raw action values for inspection.
print(optimal_action_vi)

In [None]:
# Run policy iteration on the Gambler MDP with visualization switched off.
gambler.policy_iteration(iterations_to_save=[1, 8, 16], visualize=False)
# Keep a copy of the resulting actions for plotting in the next cell.
optimal_action_pi = gambler.optimal_policy_actions.copy()


In [None]:
# Plot the action stored for every state after policy iteration.
fig, ax = plt.subplots(1, 1, figsize=(8,6), dpi=300)
ax.set_xlabel('State')
ax.set_ylabel('Amount of bet')
ax.plot(optimal_action_pi)
# Title added so the figure stands alone, mirroring the value-iteration plot.
ax.set_title('Gambler - Best action in each state (policy iteration)')
plt.show()

In [None]:
##Q-learning
#import libraries
import numpy as np
import time
import matplotlib.patches as patches

#define the shape of the environment (i.e., its states)
environment_rows = 3
environment_columns = 4
#Create a 3D numpy array to hold the current Q-values for each state and action pair: Q(s, a)
#The array has 3 rows and 4 columns (to match the shape of the environment), plus a third "action" dimension.
#The "action" dimension has 4 entries, one Q-value per possible action in each state
#(the actions are listed just below).
#The value of each (state, action) pair is initialized to 0.
q_values = np.zeros((environment_rows, environment_columns, 4))

#define actions
#numeric action codes: 0 = up, 1 = right, 2 = down, 3 = left
actions = ['up', 'right', 'down', 'left']

#Create a 2D numpy array to hold the reward received for entering each state.
#The array has 3 rows and 4 columns (to match the shape of the environment), and each value is initialized to -0.04.
rewards = np.full((environment_rows, environment_columns), -0.04)
#the three cells below get a reward other than -0.04, which is what is_terminal_state
#uses to recognise a terminal cell
rewards[0,3] = 1
rewards[1,3] = -1
rewards[1,1] = -100

#print the reward grid, one row per line
for row in rewards:
    print(row)



In [None]:
def is_terminal_state(current_row_index, current_column_index):
    """Tell whether the given grid cell ends an episode.

    A cell is non-terminal only when its entry in the global ``rewards`` grid
    equals the step reward -0.04; every other reward value marks it as terminal.

    Returns:
        bool: True for a terminal cell, False otherwise.
    """
    cell_reward = rewards[current_row_index, current_column_index]
    return not (cell_reward == -0.04)
    
def get_starting_location():
    """Return the fixed starting cell of every episode as ``(row, column)``.

    The start is always row 2, column 0. A random non-terminal start was
    used at some point but is disabled; this function takes no randomness.
    """
    start_row, start_column = 2, 0
    return start_row, start_column

def get_next_action(current_row_index, current_column_index, epsilon):
    """Pick the index of the next action for the given cell.

    Note the convention: ``epsilon`` here is the probability of acting
    greedily. A uniform draw from [0, 1) is compared with ``epsilon``; when the
    draw is smaller, the action with the largest value in the global
    ``q_values`` table for this cell is returned, otherwise a uniformly random
    action index in 0..3 is returned.
    """
    act_greedily = np.random.random() < epsilon
    if not act_greedily:
        # explore: any of the four actions with equal probability
        return np.random.randint(4)
    # exploit: best-known action for this cell
    return np.argmax(q_values[current_row_index, current_column_index])

def get_next_location(current_row_index, current_column_index, action_index):
    """Return the ``(row, column)`` reached by taking an action from a cell.

    The action index is translated through the global ``actions`` list. A move
    that would leave the grid on the side it is heading towards keeps the agent
    in place. A move that would end on cell (2, 0) is also cancelled.
    """
    move = actions[action_index]
    row, column = current_row_index, current_column_index

    if move == 'up':
        if current_row_index > 0:
            row -= 1
    elif move == 'right':
        if current_column_index < environment_columns - 1:
            column += 1
    elif move == 'down':
        if current_row_index < environment_rows - 1:
            row += 1
    elif move == 'left':
        if current_column_index > 0:
            column -= 1

    # NOTE(review): (2, 0) is the start cell returned by get_starting_location,
    # so this prevents the agent from ever stepping back onto the start.
    # Confirm whether a different cell (e.g. the wall) was meant to be blocked.
    if row == 2 and column == 0:
        return current_row_index, current_column_index
    return row, column

def get_shortest_path(start_row_index, start_column_index):
    """Follow the greedy policy from a start cell until a terminal cell is reached.

    Actions are chosen with ``get_next_action(..., 1.)``, i.e. always the
    action with the highest value in the current Q-table, so the path is only
    as short as the learned Q-values allow.

    Args:
        start_row_index: row of the starting cell.
        start_column_index: column of the starting cell.

    Returns:
        list: ``[row, column]`` pairs from the start to the last cell visited.
        An empty list if the start itself is a terminal cell. If the greedy
        policy leads back to a cell already on the path, the walk stops there
        and the list ends with that repeated cell instead of a terminal cell.
    """
    #return immediately if this is an invalid (terminal) starting location
    if is_terminal_state(start_row_index, start_column_index):
        return []

    current_row_index, current_column_index = start_row_index, start_column_index
    shortest_path = [[current_row_index, current_column_index]]
    #the greedy walk is deterministic, so coming back to a visited cell would
    #repeat the same moves forever (e.g. with an untrained Q-table)
    visited = {(current_row_index, current_column_index)}
    #continue moving along the path until we reach a terminal cell
    while not is_terminal_state(current_row_index, current_column_index):
        #get the best action to take
        action_index = get_next_action(current_row_index, current_column_index, 1.)
        #move to the next location on the path, and add the new location to the list
        current_row_index, current_column_index = get_next_location(current_row_index, current_column_index, action_index)
        shortest_path.append([current_row_index, current_column_index])
        if (current_row_index, current_column_index) in visited:
            #cycle detected: stop instead of looping forever
            break
        visited.add((current_row_index, current_column_index))
    return shortest_path

In [None]:
#training hyperparameters and the Q-learning loop
#NOTE: q_values is updated in place, so re-running this cell continues from the
#values left by the previous run rather than from zeros.

start = time.time()
epsilon = 0.01 #probability of taking the best-known action; otherwise a random action is taken (see get_next_action)
discount_factor = 0.9 #discount factor for future rewards
learning_rate = 0.9 #step size applied to each temporal-difference update
#run through 1000 training episodes
for episode in range(1000):
    #every episode starts from the location returned by get_starting_location()
    row_index, column_index = get_starting_location()
    #continue taking actions (i.e., moving) until we reach a terminal state
    #(i.e., any cell whose reward differs from -0.04, per is_terminal_state)
    while not is_terminal_state(row_index, column_index):
        #choose which action to take (i.e., where to move next)
        action_index = get_next_action(row_index, column_index, epsilon)
        #perform the chosen action, and transition to the next state (i.e., move to the next location)
        old_row_index, old_column_index = row_index, column_index #store the old row and column indexes
        row_index, column_index = get_next_location(row_index, column_index, action_index)
        #receive the reward for moving to the new state, and calculate the temporal difference
        reward = rewards[row_index, column_index]
        old_q_value = q_values[old_row_index, old_column_index, action_index]
        temporal_difference = reward + (discount_factor * np.max(q_values[row_index, column_index])) - old_q_value
        #update the Q-value for the previous state and action pair
        new_q_value = old_q_value + (learning_rate * temporal_difference)
        q_values[old_row_index, old_column_index, action_index] = new_q_value
end = time.time()
total_time = end - start

#report the wall-clock training time in seconds and dump the learned Q-table
print('Training complete!',total_time)
print(q_values)

In [None]:
def visualize_value_policy(q_values,fig_size=(8, 6)):
    """Plot the 3x4 grid with the best Q-value and greedy action of every cell.

    Each ordinary cell shows ``max(q_values[row][col])`` as text and a marker
    pointing in the direction of ``argmax(q_values[row][col])``, using the
    action order up, right, down, left. Cell (1, 1) is shaded black, (1, 3)
    red and (0, 3) green; no value or marker is drawn on those three cells.

    Args:
        q_values: indexed as ``q_values[row][col]`` -> 4 action values; must
            cover 3 rows and 4 columns.
        fig_size: ``(width, height)`` passed to ``plt.subplots`` as ``figsize``.
    """
    num_rows = 3
    num_cols = 4
    # side length of one grid cell in data units, at least 1
    unit = min(fig_size[1] // num_rows, fig_size[0] // num_cols)
    unit = max(1, unit)
    fig, ax = plt.subplots(1, 1, figsize=fig_size)
    ax.axis('off')

    # vertical grid lines: solid black on the border, dashed grey inside
    for i in range(num_cols + 1):
        if i == 0 or i == num_cols:
            ax.plot([i * unit, i * unit], [0, num_rows * unit],
                    color='black')
        else:
            ax.plot([i * unit, i * unit], [0, num_rows * unit],
                    alpha=0.7, color='grey', linestyle='dashed')
    # horizontal grid lines, same styling
    for i in range(num_rows + 1):
        if i == 0 or i == num_rows:
            ax.plot([0, num_cols * unit], [i * unit, i * unit],
                    color='black')
        else:
            ax.plot([0, num_cols * unit], [i * unit, i * unit],
                    alpha=0.7, color='grey', linestyle='dashed')

    # Cell contents are drawn once, after the grid lines. These loops used to
    # sit inside the horizontal-line loop, which drew every patch, text and
    # marker four times and stacked their transparency.
    shaded_cells = {(1, 1): 'black', (1, 3): 'red', (0, 3): 'green'}
    symbol = ['^', '>', 'v', '<']
    for i in range(num_rows):
        for j in range(num_cols):
            # row 0 is drawn at the top, so flip the row index for the y coordinate
            y = (num_rows - 1 - i) * unit
            x = j * unit
            if (i, j) in shaded_cells:
                rect = patches.Rectangle((x, y), unit, unit, edgecolor='none',
                                         facecolor=shaded_cells[(i, j)], alpha=0.6)
                ax.add_patch(rect)
            else:
                ax.text(x + 0.5 * unit, y + 0.5 * unit, f'{max(q_values[i][j]):.4f}',
                        horizontalalignment='center', verticalalignment='center',
                        fontsize=max(fig_size)*unit*0.6)
                a = np.argmax(q_values[i][j])
                ax.plot([x + 0.5 * unit], [y + 0.5 * unit], marker=symbol[a], alpha=0.4,
                        linestyle='none', markersize=max(fig_size)*unit, color='#1f77b4')

    plt.tight_layout()
    plt.show()
    
# Draw the grid using the Q-table held in the global q_values.
visualize_value_policy(q_values=q_values)

In [None]:
# Run Q-learning on the Gambler MDP for 10000 episodes with visualization enabled;
# both decay arguments are set to 0.995 (how they are applied is defined in MDP.Q_learning).
gambler.Q_learning(num_episodes=10000, learning_rate_decay=0.995, epsilon_decay=0.995, visualize=True)
