In [1]:
pip install torch




In [2]:
import numpy as np
import time
import random
import math
from datetime import datetime
import torch
import torch.nn as nn
import torch.optim as optim

total_episodes = 50000       # Total episodes
max_steps = 99               # Max steps per episode
gamma = 0.9                 # Discounting rate
alpha = 1.0					 # update parameter

# Exploration parameters
original_epsilon = 0.4           # Exploration rate
decay_rate = 0.000016            # Exponential decay rate for exploration prob
random.seed(datetime.now().timestamp())	# give a new seed in random number generation.

# state space is defined as size_row X size_col array.
# The boundary cells are holes(H).
# S: start, G: goal, H:hole, F:frozen

max_row = 9
max_col = 9
max_num_actions = 4

env_state_space = [
    ['H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H'],
    ['H', 'S', 'F', 'F', 'F', 'F', 'F', 'F', 'H'],
    ['H', 'F', 'F', 'H', 'H', 'F', 'H', 'F', 'H'],
    ['H', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'H'],
    ['H', 'F', 'H', 'F', 'H', 'F', 'F', 'H', 'H'],
    ['H', 'F', 'F', 'F', 'F', 'G', 'F', 'F', 'H'],
    ['H', 'F', 'H', 'H', 'F', 'H', 'F', 'F', 'H'],
    ['H', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'H'],
    ['H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H']
]

# Create our Q table and initialize its value.
#   dim0:row, dim1:column, dim2: action.
# Q-table is initialized as 0.0.
# for terminal states(H or G), q-a value should be always 0.

Q = np.zeros((max_row,	max_col,  max_num_actions))

# offset of each move action:  up, right, down, left, respectively.
# a new state(location) = current state + offset of an action.
#        action = up    right  down    left.
move_offset = [[-1,0], [0,1],   [1,0],  [0,-1]]
move_str =	   ['up   ', 'right', 'down ', 'left ']

def display_Q_table (Q):
	print("\ncol=0       1        2        3       4         5")
	for r in range(max_row):
		print("row:", r)
		for a in range(max_num_actions):
			for c in range(max_col):
				text = "{:5.2f}".format(Q[r,c,a])
				if c == 0:
					line = text
				else:
					line = line + ",   " + text
			print(line)

def my_argmax(q_a_list):
    mv = max(q_a_list)
    m_positions = [i for i, q in enumerate(q_a_list) if q == mv]
    nmax = len(m_positions)
    thresh = np.linspace(1 / nmax, 1, nmax)
    r_n = random.random()

    for i, t in enumerate(thresh):
        if r_n <= t:
            return m_positions[i]

    return m_positions[-1]

# choose an action with epsilon-greedy approach according to Q.
# return value: an action index(0 ~ 3)

In [3]:
class QNetwork(nn.Module):
    def __init__(self):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(2, 50)  # Input size is 2 (row and column), output size is the number of actions
        self.fc2 = nn.Linear(50, max_num_actions)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Initialize Q-network, optimizer, and loss function
q_network = QNetwork()
optimizer = optim.Adam(q_network.parameters(), lr=0.001)
criterion = nn.MSELoss()

In [4]:
# Choose an action with epsilon-greedy approach according to Q.
# Update to use the neural network for Q-value estimation.
def choose_action_with_epsilon_greedy(s, epsilon):
    r = s[0]
    c = s[1]
    state_tensor = torch.FloatTensor([r, c]).unsqueeze(0)
    q_values = q_network(state_tensor)
    q_values = q_values.detach().numpy().flatten()

    max_action = my_argmax(q_values)
    rn = random.random()

    if rn >= epsilon:
        action = max_action
    else:
        rn1 = random.random()
        if rn1 >= 0.75:
            action = 0
        elif rn1 >= 0.5:
            action = 1
        elif rn1 >= 0.25:
            action = 2
        else:
            action = 3
    return action

In [5]:
# Choose an action with epsilon-greedy approach according to Q.
# Update to use the neural network for Q-value estimation.
def choose_action_with_greedy(s):
    r = s[0]
    c = s[1]
    state_tensor = torch.FloatTensor([r, c]).unsqueeze(0)
    q_values = q_network(state_tensor)
    q_values = q_values.detach().numpy().flatten()

    max_action = my_argmax(q_values)
    return max_action

In [6]:
# get new state and reward for taking action a at state s.
# deterministic movement is taken.
# reward is given as: F/S:0;  H:-5;   G:5.
def get_new_state_and_reward(s, a):
	new_state = []
	off_set = move_offset[a]

	#  s + off_set gives the new_state.
	new_state.append(s[0] + off_set[0])
	new_state.append(s[1] + off_set[1])

	# compute reward for moving to the new state
	cell = env_state_space[new_state[0]][new_state[1]]
	if cell == 'F':
		rew = 0
	elif cell == 'H':
		rew = -9
	elif cell == 'G':
		rew = 9
	elif cell == 'S':
		rew = 0
	else:
		print("Logic error in get_new_state_and_reward. This cannot happen!")
		time.sleep(1200)
		return [0,0], -20000
	return new_state, rew

# Environment 출력: agent 가 있는 곳에는 * 로 표시.
# agent 의 현재 위치(즉 current state): s
def env_rendering(s):
	for i in range(0, max_row, 1):
		line = ''
		for j in range(0, max_col, 1):
			line = line + env_state_space[i][j]
		if s[0] == i:
			col = s[1]
			line1 = line[:col] + '*' +line[col+1:]
		else:
			line1 = line
		print(line1)

In [8]:
# Learning stage: it iterates for a huge number of episodes
print("Initial Q network is:")
print(q_network)
start_state = [1,1]
print("\nLearning starts.\n")
for episode in range(total_episodes):
    # set the start state of an episode.
    S = start_state

    epsilon = original_epsilon * math.exp(-decay_rate * episode)

    if episode % 5000 == 0:
        print('episode=', episode, '  epsilon=', epsilon)
        time.sleep(1)

    for step in range(max_steps):
        A = choose_action_with_epsilon_greedy(S, epsilon)

        S_, R = get_new_state_and_reward(S, A)

        r = S[0]
        c = S[1]

        state_tensor = torch.FloatTensor([r, c]).unsqueeze(0)
        next_state_tensor = torch.FloatTensor(S_).unsqueeze(0)

        q_values = q_network(state_tensor)
        next_q_values = q_network(next_state_tensor)

        # Update Q value using the Q-learning update rule
        target = R + gamma * np.max(next_q_values.detach().numpy())
        loss = criterion(q_values[0][A], torch.FloatTensor([target]))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        S = S_

        if env_state_space[S[0]][S[1]] == 'G' or env_state_space[S[0]][S[1]] == 'H':
            break

print('\nLearning is finished. The Q network is:')
print(q_network)

Initial Q network is:
QNetwork(
  (fc1): Linear(in_features=2, out_features=50, bias=True)
  (fc2): Linear(in_features=50, out_features=4, bias=True)
)

Learning starts.

episode= 0   epsilon= 0.4


  return F.mse_loss(input, target, reduction=self.reduction)


episode= 5000   epsilon= 0.36924653855465434
episode= 10000   epsilon= 0.3408575155864846
episode= 15000   epsilon= 0.3146511444266214
episode= 20000   epsilon= 0.2904596148294764
episode= 25000   epsilon= 0.26812801841425576
episode= 30000   epsilon= 0.24751335672245633
episode= 35000   epsilon= 0.22848362553952595
episode= 40000   epsilon= 0.21091696961721942
episode= 45000   epsilon= 0.19470090238398868

Learning is finished. The Q network is:
QNetwork(
  (fc1): Linear(in_features=2, out_features=50, bias=True)
  (fc2): Linear(in_features=50, out_features=4, bias=True)
)


In [9]:
# Test stage: agent 가 길을 찾아 가는 실험.
print("\nTest starts.\n")
for e in range(5):
    S = start_state
    total_rewards = 0
    print("\nEpisode:", e, " start state: (", S[0], ", ", S[1], ")")

    for step in range(max_steps):
        A = choose_action_with_greedy(S)
        S_, R = get_new_state_and_reward(S, A)

        print("The move is:", move_str[A], " that leads to state (", S_[0], ", ", S_[1], ")" )

        total_rewards += R
        S = S_
        if env_state_space[S[0]][S[1]] == 'G' or env_state_space[S[0]][S[1]] == 'H':
            break

    print("Episode has ended. Total reward received in episode = ", total_rewards)
    time.sleep(1)

print("Program ends!!!")


Test starts.


Episode: 0  start state: ( 1 ,  1 )
The move is: up     that leads to state ( 0 ,  1 )
Episode has ended. Total reward received in episode =  -9

Episode: 1  start state: ( 1 ,  1 )
The move is: up     that leads to state ( 0 ,  1 )
Episode has ended. Total reward received in episode =  -9

Episode: 2  start state: ( 1 ,  1 )
The move is: up     that leads to state ( 0 ,  1 )
Episode has ended. Total reward received in episode =  -9

Episode: 3  start state: ( 1 ,  1 )
The move is: up     that leads to state ( 0 ,  1 )
Episode has ended. Total reward received in episode =  -9

Episode: 4  start state: ( 1 ,  1 )
The move is: up     that leads to state ( 0 ,  1 )
Episode has ended. Total reward received in episode =  -9
Program ends!!!


100 개의 episode 중 정답을 맞춘 에
피소드들이 전체 100개 중 몇 % 인지를 측정위한 코드

In [11]:
def get_new_state_and_reward(s, a):
    new_state = []
    off_set = move_offset[a]

    # s + off_set gives the new_state.
    new_state_row = s[0] + off_set[0]
    new_state_col = s[1] + off_set[1]

    # Check if the new state is within the valid range
    if 0 <= new_state_row < max_row and 0 <= new_state_col < max_col:
        new_state.append(new_state_row)
        new_state.append(new_state_col)

        # compute reward for moving to the new state
        cell = env_state_space[new_state[0]][new_state[1]]
        if cell == 'F':
            rew = 0
        elif cell == 'H':
            rew = -9
        elif cell == 'G':
            rew = 9
        elif cell == 'S':
            rew = 0
        else:
            print("Logic error in get_new_state_and_reward. This cannot happen!")
            time.sleep(1200)
            return [0, 0], -20000
        return new_state, rew
    else:
        # If the new state is out of bounds, return the current state with a penalty
        return s, -20000


In [12]:
# Test stage: agent 가 길을 찾아 가는 실험.
print("\nTest starts.\n")

correct_episodes = 0  # Variable to count the number of correct episodes

for e in range(100):  # Run 100 episodes for testing
    S = start_state
    total_rewards = 0
    print("\nEpisode:", e, " start state: (", S[0], ", ", S[1], ")")

    for step in range(max_steps):
        A = choose_action_with_greedy(S)
        S_, R = get_new_state_and_reward(S, A)

        print("The move is:", move_str[A], " that leads to state (", S_[0], ", ", S_[1], ")" )

        total_rewards += R
        S = S_
        if env_state_space[S[0]][S[1]] == 'G':
            break

    print("Episode has ended. Total reward received in episode = ", total_rewards)

    # Check if the episode reached the goal state with the shortest path
    if total_rewards == 9 and step == 8:
        correct_episodes += 1

# Calculate and print the accuracy
accuracy = (correct_episodes / 100) * 100
print("\nAccuracy: {:.2f}% ({} out of 100 episodes)".format(accuracy, correct_episodes))
print("Program ends!!!")

[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
Episode: 51  start state: ( 1 ,  1 )
The move is: up     that leads to state ( 0 ,  1 )
The move is: right  that leads to state ( 0 ,  2 )
The move is: right  that leads to state ( 0 ,  3 )
The move is: right  that leads to state ( 0 ,  4 )
The move is: right  that leads to state ( 0 ,  5 )
The move is: right  that leads to state ( 0 ,  6 )
The move is: right  that leads to state ( 0 ,  7 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 , 

테스트단계에서 100 개의 에피소드 중 몇개가 정답인지를 percent 로 성능

In [13]:
max_memory = 2000  # capacity of the replay memory.
# Learning stage: it iterates for a huge number of episodes
print("Initial Q network is:")
print(q_network)

print("\nLearning starts.\n")
for episode in range(total_episodes):
    # set the start state of an episode.
    S = start_state

    epsilon = original_epsilon * math.exp(-decay_rate * episode)

    if episode % 5000 == 0:
        print('episode=', episode, '  epsilon=', epsilon)
        time.sleep(1)

    for step in range(max_steps):
        A = choose_action_with_epsilon_greedy(S, epsilon)

        S_, R = get_new_state_and_reward(S, A)

        r = S[0]
        c = S[1]

        state_tensor = torch.FloatTensor([r, c]).unsqueeze(0)
        next_state_tensor = torch.FloatTensor(S_).unsqueeze(0)

        q_values = q_network(state_tensor)
        next_q_values = q_network(next_state_tensor)

        # Update Q value using the Q-learning update rule
        target = R + gamma * np.max(next_q_values.detach().numpy())
        loss = criterion(q_values[0][A], torch.FloatTensor([target]))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Capture in replay memory (removed for this experiment)

        S = S_

        # if the new state S is a terminal state, terminate the episode.
        if env_state_space[S[0]][S[1]] == 'G' or env_state_space[S[0]][S[1]] == 'H':
            break

# Test stage: agent 가 길을 찾아 가는 실험.
print("\nTest starts.\n")

correct_episodes = 0  # Variable to count the number of correct episodes

for e in range(100):  # Run 100 episodes for testing
    S = start_state
    total_rewards = 0
    print("\nEpisode:", e, " start state: (", S[0], ", ", S[1], ")")

    for step in range(max_steps):
        A = choose_action_with_greedy(S)
        S_, R = get_new_state_and_reward(S, A)

        print("The move is:", move_str[A], " that leads to state (", S_[0], ", ", S_[1], ")" )

        total_rewards += R
        S = S_
        if env_state_space[S[0]][S[1]] == 'G':
            break

    print("Episode has ended. Total reward received in episode = ", total_rewards)

    # Check if the episode reached the goal state with the shortest path
    if total_rewards == 9 and step == 8:
        correct_episodes += 1

# Print the performance
accuracy = (correct_episodes / 100) * 100
print("\nAccuracy: {:.2f}% ({} out of 100 episodes)".format(accuracy, correct_episodes))
print("Program ends!!!")

[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
Episode: 51  start state: ( 1 ,  1 )
The move is: up     that leads to state ( 0 ,  1 )
The move is: right  that leads to state ( 0 ,  2 )
The move is: right  that leads to state ( 0 ,  3 )
The move is: right  that leads to state ( 0 ,  4 )
The move is: right  that leads to state ( 0 ,  5 )
The move is: right  that leads to state ( 0 ,  6 )
The move is: right  that leads to state ( 0 ,  7 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 ,  8 )
The move is: right  that leads to state ( 0 , 