In [None]:
import numpy as np
import pandas as pd
import random
from collections import defaultdict
import gym
import gym_minigrid
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
def plot_policy_arrows(q_table, grid_size=6):
    """Print the greedy action of every grid cell as a text table.

    Args:
        q_table: Mapping from an integer state key to a sequence of three
            action values, ordered (turn left, turn right, go straight).
        grid_size: Side length of the square grid that is printed.

    Each state key is integer-divided by 3 to obtain a cell index
    (presumably because the flattened observation stores three values per
    cell -- confirm against the environment wrapper).  The cell index is then
    split into a row (``index // grid_size``) and a column
    (``index % grid_size``).  Cells without an entry in ``q_table`` print as
    empty strings, and the bottom-right cell is always shown as 'Terminal'.
    """
    labels = ('turn left  ', 'turn right ', 'go straight')
    table = [[""] * grid_size for _ in range(grid_size)]

    for key, action_values in q_table.items():
        row_idx, col_idx = divmod(key // 3, grid_size)
        table[row_idx][col_idx] = labels[np.argmax(action_values)]

    # The last cell is reported as terminal regardless of what q_table holds.
    table[-1][-1] = 'Terminal'

    print("\n".join(" ".join(cells) for cells in table))

def _moving_average(rewards, window):
    """Return ``(smoothed, span)`` for a moving average over ``rewards``.

    ``span`` is ``window`` clamped to the series length.  Without the clamp,
    ``np.convolve(..., mode='valid')`` swaps its operands when the series is
    shorter than the kernel and returns a curve that is not a moving average.
    An empty series yields an empty result instead of raising.
    """
    series = np.asarray(rewards, dtype=float)
    if series.size == 0:
        return series, window
    span = min(window, series.size)
    return np.convolve(series, np.ones(span) / span, mode='valid'), span


def plot_rewards_for_all_envs(rewards_g01, rewards_g05, rewards_g09, rewards_g099, window=100):
    """Plot the moving-average episode reward of four runs in a 2x2 grid.

    Args:
        rewards_g01, rewards_g05, rewards_g09, rewards_g099: Per-episode
            reward sequences; each one is drawn in its own panel titled
            ``env_g01`` ... ``env_g099``.
        window: Number of episodes averaged per plotted point.  For a series
            shorter than ``window`` the series length is used instead, and
            the y-axis label shows the window that was actually applied.

    Raises:
        ValueError: If ``window`` is smaller than 1.

    Note:
        With ``mode='valid'`` the first plotted point already averages the
        first ``window`` episodes, so x position ``k`` corresponds to
        episodes ``k`` .. ``k + window - 1``.
    """
    if window < 1:
        raise ValueError("window must be a positive integer")

    panels = (
        ("env_g01", rewards_g01),
        ("env_g05", rewards_g05),
        ("env_g09", rewards_g09),
        ("env_g099", rewards_g099),
    )
    # Smooth everything first so bad input fails before a figure is opened.
    curves = [(title, *_moving_average(rewards, window)) for title, rewards in panels]

    plt.figure(figsize=(14, 10))
    for position, (title, smoothed, span) in enumerate(curves, start=1):
        plt.subplot(2, 2, position)
        plt.plot(smoothed, label="Q-learning", color='blue')
        plt.title(title)
        plt.xlabel("Episode")
        plt.ylabel(f"Moving Avg Reward ({span})")
        plt.legend()
        plt.grid(True)

    plt.tight_layout()
    plt.show()



def Summary_of_experimental_settings(num_iter, alpha):
    """Print a Markdown table summarising the experiment configuration.

    Args:
        num_iter: Number of training episodes, reported for both agents.
        alpha: Learning rate, reported for both agents.

    The table has one row for 'Q-learning' and one for 'SARSA'.  The discount
    factor (0.9), exploration rate (0.2) and environment id are constants
    written in this function; they are not read from any agent object.
    ``DataFrame.to_markdown`` depends on the optional ``tabulate`` package.
    """
    agents = ['Q-learning', 'SARSA']
    n_agents = len(agents)

    settings = pd.DataFrame({
        'Agent': agents,
        'Alpha (Learning rate)': [alpha] * n_agents,
        'Gamma (Discount factor)': [0.9] * n_agents,
        'Epsilon (Exploration)': [0.2] * n_agents,
        'Environment': ['MiniGrid-Empty-6x6-v0'] * n_agents,
        'Episodes': [num_iter] * n_agents,
    })

    print("Experimental Settings")
    # Markdown rendering keeps the console output tidy.
    print(settings.to_markdown(index=False))


In [None]:
class QLearning:
    """Tabular Q-learning agent with epsilon-greedy action selection.

    A state is identified by the index of the first observation entry equal
    to ``agent_indicator``; everything else in the observation is ignored.
    Action values are kept in a ``defaultdict`` so unseen states start with
    all-zero values.

    Args:
        actions: Number of discrete actions (valid actions are
            ``0 .. actions - 1``).
        gamma: Discount factor applied to the next state's best value.
        agent_indicator: Value that marks the agent inside an observation.
        alpha: Learning rate (step size of each update).
        epsilon: Probability of choosing a uniformly random action in
            ``act``.
    """

    def __init__(self, actions, gamma, agent_indicator=10, alpha=0.1, epsilon=0.2):
        self.actions = actions
        self.agent_indicator = agent_indicator
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        # state key -> list with one value per action
        self.q_values = defaultdict(lambda: [0.0] * actions)

    def _convert_state(self, s):
        """Map an observation to its state key.

        Returns the first index along axis 0 at which ``s`` equals
        ``agent_indicator``.

        Raises:
            IndexError: If the indicator does not occur in ``s``.
        """
        hits = np.where(np.asarray(s) == self.agent_indicator)[0]
        if hits.size == 0:
            raise IndexError(
                f"agent indicator {self.agent_indicator!r} not found in observation"
            )
        # Plain int keeps the dictionary keys independent of numpy scalar types.
        return int(hits[0])

    def update(self, state, action, reward, next_state):
        """Apply one Q-learning update for the transition.

        Q(s, a) <- Q(s, a) + alpha * (reward + gamma * max_a' Q(s', a') - Q(s, a))

        ``state`` and ``next_state`` are raw observations; they are converted
        to state keys internally.
        """
        state = self._convert_state(state)
        next_state = self._convert_state(next_state)

        # Off-policy target: value of the best action available in s'.
        best_next_value = max(self.q_values[next_state])
        current_value = self.q_values[state][action]

        td_error = reward + self.gamma * best_next_value - current_value
        self.q_values[state][action] = current_value + self.alpha * td_error

    def act(self, state):
        """Choose an action for the raw observation ``state``.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest stored value (the lowest index
        wins ties).
        """
        if np.random.rand() < self.epsilon:
            return random.randint(0, self.actions - 1)

        key = self._convert_state(state)
        return int(np.argmax(self.q_values[key]))

In [None]:
import os
import sys

# Directory that contains my_env_utils.py.  The default is the original
# author's local folder; set the MY_ENV_UTILS_DIR environment variable to use
# a different location without editing the notebook.
MY_ENV_UTILS_DIR = os.environ.get(
    "MY_ENV_UTILS_DIR",
    "D:/대학/자료/4-1/강화학습/과제/과제 1/Sarsa&Q-learning",
)

# Re-running this cell should not keep stacking the same entry onto sys.path.
if MY_ENV_UTILS_DIR not in sys.path:
    sys.path.insert(0, MY_ENV_UTILS_DIR)

from my_env_utils import gen_wrapped_env, show_video

In [None]:
import os
print(os.getcwd())  # Check the current working directory

In [None]:
# Four separate instances of the same task, one per discount factor.
env_g01, env_g05, env_g09, env_g099 = (
    gen_wrapped_env('MiniGrid-Empty-6x6-v0') for _ in range(4)
)

# Initial observation of every instance.
obs_g01, obs_g05, obs_g09, obs_g099 = (
    env.reset() for env in (env_g01, env_g05, env_g09, env_g099)
)

# The first entry of each initial observation is handed to the agent as the
# value it searches for to locate itself (presumably the agent starts in the
# first cell of the flattened grid -- confirm against gen_wrapped_env).
agent_g01_position, agent_g05_position, agent_g09_position, agent_g099_position = (
    obs[0] for obs in (obs_g01, obs_g05, obs_g09, obs_g099)
)

# One 3-action Q-learning agent per discount factor: 0.1, 0.5, 0.9, 0.99.
agent_g01, agent_g05, agent_g09, agent_g099 = (
    QLearning(3, gamma=gamma, agent_indicator=marker)
    for gamma, marker in zip(
        (0.1, 0.5, 0.9, 0.99),
        (agent_g01_position, agent_g05_position, agent_g09_position, agent_g099_position),
    )
)


In [None]:
# Per-episode total reward of each run; later cells read these lists.
rewards_g01 = []
rewards_g05 = []
rewards_g09 = []
rewards_g099 = []

num_iter = 3000

# (label used in the progress line, environment, agent, reward log)
runs = [
    ("env_g01", env_g01, agent_g01, rewards_g01),
    ("env_g05", env_g05, agent_g05, rewards_g05),
    ("env_g09", env_g09, agent_g09, rewards_g09),
    ("env_g099", env_g099, agent_g099, rewards_g099),
]

for ep in range(num_iter):
    # Reset every environment first, then pick every first action, so the
    # agents consume the shared random number generators in a fixed order.
    observations = [env.reset() for _label, env, _agent, _log in runs]
    actions = [
        agent.act(obs)
        for (_label, _env, agent, _log), obs in zip(runs, observations)
    ]
    finished = [False] * len(runs)
    episode_returns = [0] * len(runs)

    # The runs advance in lockstep, one step each per pass, until all are done.
    while not all(finished):
        for i, (_label, env, agent, _log) in enumerate(runs):
            if finished[i]:
                continue
            next_obs, reward, finished[i], info = env.step(actions[i])
            agent.update(observations[i], actions[i], reward, next_obs)
            # The next action is chosen even after the final step; it is
            # simply never executed.
            actions[i] = agent.act(next_obs)
            episode_returns[i] += reward
            observations[i] = next_obs

    for (_label, _env, _agent, log), total in zip(runs, episode_returns):
        log.append(total)

    # Progress report: mean reward over the most recent 100 episodes.
    if (ep + 1) % 100 == 0:
        recent = ", ".join(
            f"({label}): {np.mean(log[-100:]):.2f}"
            for label, _env, _agent, log in runs
        )
        print(f"[Episode {ep+1}] Q avg {recent}")

for _label, env, _agent, _log in runs:
    env.close()


In [None]:
# Display the recorded run.  show_video is imported from my_env_utils; what
# exactly it renders is not visible in this notebook -- TODO confirm.
show_video()

In [None]:
import os

# Save the per-episode rewards of every run to its own CSV file.
# The target directory is created first; to_csv fails if it does not exist.
os.makedirs('./logs', exist_ok=True)

for tag, rewards in (
    ('g01', rewards_g01),
    ('g05', rewards_g05),
    ('g09', rewards_g09),
    ('g099', rewards_g099),
):
    # index=False: each file holds a single value column plus its header row.
    pd.Series(rewards).to_csv(f'./logs/rewards_qlearning_env_{tag}.csv', index=False)

In [None]:
# Reload the saved reward logs as Series.
# The files are written with index=False, so they contain only one column and
# .iloc[:, 1] would raise IndexError.  Taking the last column reads the reward
# values from these files and also from older files that were saved with an
# index column in front.
g_logs01 = pd.read_csv('./logs/rewards_qlearning_env_g01.csv', index_col=False).iloc[:, -1]
g_logs05 = pd.read_csv('./logs/rewards_qlearning_env_g05.csv', index_col=False).iloc[:, -1]
g_logs09 = pd.read_csv('./logs/rewards_qlearning_env_g09.csv', index_col=False).iloc[:, -1]
g_logs099 = pd.read_csv('./logs/rewards_qlearning_env_g099.csv', index_col=False).iloc[:, -1]

In [None]:
# Draw the figure.
plt.figure(figsize=(16, 8))

# For every run, plot the running mean of the episode reward
# (cumulative sum divided by the number of episodes so far).
# The legend shows the discount factor each agent was created with; the
# labels previously read "gamma = 0.1" for all four curves.
for name, gamma, log in (
    ("env_g01", 0.1, g_logs01),
    ("env_g05", 0.5, g_logs05),
    ("env_g09", 0.9, g_logs09),
    ("env_g099", 0.99, g_logs099),
):
    running_mean = log.cumsum() / (pd.Series(np.arange(log.shape[0])) + 1)
    plt.plot(running_mean, label=f"{name} (gamma = {gamma})")

# Axis labels, title and legend.
plt.xlabel('Episodes')
plt.ylabel('Average Cumulative Reward')
plt.title('Cumulative Reward Over Episodes (Different Environments)')
plt.legend()

# Show the figure.
plt.show()

In [None]:
# Print the greedy policy learned by each Q-learning agent, one table per run.
# grid_size=4 is the side length handed to plot_policy_arrows for every table.
for heading, trained_agent in (
    ('Q-learning for env_g01', agent_g01),
    ('Q-learning for env_g05', agent_g05),
    ('Q-learning for env_g09', agent_g09),
    ('Q-learning for env_g099', agent_g099),
):
    print(heading)
    plot_policy_arrows(trained_agent.q_values, grid_size=4)

In [None]:
# Draw the reward curves of the four runs, using the in-memory reward lists
# and a moving-average window of 500 episodes.
plot_rewards_for_all_envs(rewards_g01, rewards_g05, rewards_g09, rewards_g099, window=500)