In [41]:
import os
import numpy as np
import pandas as pd
import random
from collections import defaultdict
import gym
import gym_minigrid
import matplotlib.pyplot as plt
%matplotlib inline
from utils import gen_wrapped_env
import json

In [42]:
class QLearning:
    """Tabular Q-learning agent with an epsilon-greedy behaviour policy.

    A state is identified by the index at which ``agent_indicator`` appears in
    the observation, and Q-values are kept per state as a list with one entry
    per action.

    Args:
        actions: Number of discrete actions; valid actions are ``0..actions-1``.
        agent_indicator: Value that marks the agent inside an observation.
        alpha: Learning rate applied to the TD error.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Probability of picking a uniformly random action in ``act``.
    """

    def __init__(self, actions, agent_indicator=10, alpha=0.01, gamma=0.9, epsilon=0.2):
        self.actions = actions
        self.agent_indicator = agent_indicator
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        # States that have never been seen start with an all-zero row.
        self.q_values = defaultdict(lambda: [0.0] * actions)

    def _convert_state(self, s):
        """Map an observation to its table key.

        Returns the first index (along the first axis) at which ``s`` equals
        ``agent_indicator``.
        NOTE(review): presumably ``s`` is a flat 1-D observation - confirm
        against the environment wrapper.

        Raises:
            IndexError: If the indicator does not occur in ``s``.
        """
        matches = np.where(s == self.agent_indicator)[0]
        if len(matches) == 0:
            # Same exception type as plain indexing would raise, but with a
            # message that says what actually went wrong.
            raise IndexError(
                f"agent indicator {self.agent_indicator!r} not found in state"
            )
        return matches[0]

    def update(self, state, action, reward, next_state, next_action):
        """Apply one Q-learning update for the given transition.

        ``next_action`` is accepted for interface compatibility but is not
        used: the target bootstraps from the maximum Q-value of ``next_state``.
        """
        state = self._convert_state(state)
        next_state = self._convert_state(next_state)

        q_value = self.q_values[state][action]

        # Off-policy target: reward + gamma * max_a' Q(s', a').
        next_q_value = max(self.q_values[next_state])
        td_error = reward + self.gamma * next_q_value - q_value
        self.q_values[state][action] = q_value + self.alpha * td_error

    def act(self, state):
        """Choose an action epsilon-greedily and return it as a plain ``int``."""
        if np.random.rand() < self.epsilon:
            # Explore: uniform over all actions. Cast so both branches return
            # the same type.
            return int(np.random.choice(self.actions))

        # Exploit: argmax picks the lowest action index on ties.
        state = self._convert_state(state)
        return int(np.argmax(self.q_values[state]))

In [43]:
def run_q_learning_with_gamma(gamma, episodes):
    """Train a Q-learning agent on MiniGrid-Empty-6x6 with the given discount.

    Args:
        gamma: Discount factor assigned to the agent before training.
        episodes: Number of episodes to run.

    Returns:
        A tuple ``(ep_rewards, q_values, q_values_over_time)``:
        the total reward of every episode, the agent's Q-table, and the mean
        of all stored Q-values sampled after every 100th episode.
    """
    env = gen_wrapped_env('MiniGrid-Empty-6x6-v0')
    # Make sure the environment is released even if training raises part-way.
    try:
        obs = env.reset()
        # NOTE(review): the first observation entry is used as the agent marker;
        # presumably the agent starts in the first cell - confirm against
        # utils.gen_wrapped_env.
        agent_position = obs[0]

        q_agent = QLearning(actions=4, agent_indicator=agent_position)
        q_agent.gamma = gamma

        ep_rewards = []
        q_values_over_time = []

        for ep in range(episodes):
            done = False
            obs = env.reset()
            q_action = q_agent.act(obs)
            total_reward = 0

            while not done:
                # env.step is unpacked as a 4-tuple (obs, reward, done, info).
                next_obs, reward, done, info = env.step(q_action)
                # The next action is chosen before the table is updated.
                next_action = q_agent.act(next_obs)
                q_agent.update(obs, q_action, reward, next_obs, next_action)

                total_reward += reward
                obs = next_obs
                q_action = next_action

            ep_rewards.append(total_reward)

            if (ep + 1) % 100 == 0:
                # Mean over every (state, action) entry currently in the table.
                all_qs = [q for qs in q_agent.q_values.values() for q in qs]
                avg_q = np.mean(all_qs)
                q_values_over_time.append(avg_q)
    finally:
        env.close()

    return ep_rewards, q_agent.q_values, q_values_over_time

In [44]:
# Discount factors to compare.
gammas = [0.1, 0.5, 0.9, 0.99]
# Per-gamma results, filled in by the training loop and keyed by gamma.
reward_logs, q_tables = {}, {}

In [45]:
# Output folders for the reward CSVs, the Q-table JSON dumps and the figures.
# exist_ok=True makes this cell safe to re-run.
for log_dir in ("./logs/gamma_rewards", "./logs/q_tables", "./logs/plots"):
    os.makedirs(log_dir, exist_ok=True)

In [None]:
# Train one agent per discount factor and persist its reward log, Q-table
# and Q-value convergence curve under ./logs.
for g in gammas:
    print(f"Training Q-learning with gamma = {g}")
    rewards, q_vals, q_values_over_time = run_q_learning_with_gamma(gamma=g, episodes=100000)

    reward_logs[g], q_tables[g] = rewards, q_vals

    # 1. Save the per-episode reward log.
    reward_csv_path = f"./logs/gamma_rewards/reward_gamma_{g}.csv"
    df = pd.DataFrame({"Episode": range(len(rewards)), "Reward": rewards})
    df.to_csv(reward_csv_path, index=False)

    # 2. Save the Q-table (state keys as strings, values rounded to 5 decimals).
    q_table_path = f"./logs/q_tables/q_table_gamma_{g}.json"
    q_dict = dict(
        (str(int(s)), np.round(q, 5).tolist()) for s, q in q_vals.items()
    )
    with open(q_table_path, 'w') as f:
        json.dump(q_dict, f, indent=2)

    # 3. Plot and save the Q-value convergence curve.
    # One sample was recorded per 100 episodes, so the x-axis is 100, 200, ...
    episodes_recorded = [100 * (k + 1) for k in range(len(q_values_over_time))]
    plt.figure()
    plt.plot(episodes_recorded, q_values_over_time)
    plt.gca().set(
        title=f"Q-value Convergence (Gamma = {g})",
        xlabel="Episode",
        ylabel="Avg Q-value",
    )
    plt.grid()
    plt.savefig(f"./logs/plots/q_value_convergence_gamma_{g}.png")
    plt.close()

Training Q-learning with gamma = 0.1


In [None]:
# 1. Episode reward graph: one 10000-episode moving-average curve per gamma.
plt.figure(figsize=(12, 6))
for g in gammas:
    smoothed_rewards = pd.Series(reward_logs[g]).rolling(window=10000).mean()
    plt.plot(smoothed_rewards, label=f"γ={g}")
plt.gca().set(
    xlabel="Episode",
    ylabel="Episode Reward (Moving Avg)",
    title="Q-learning Episode Reward vs Gamma",
)
plt.legend()
plt.grid(True)
plt.savefig("./logs/plots/episode_reward_vs_gamma.png")
plt.close()

In [None]:
def plot_avg_qvalue_per_action(gammas, q_table_dir="./logs/q_tables", save_path="./logs/plots/avg_qvalue_per_action_by_gamma.png"):
    """Draw a grouped bar chart of the mean Q-value per action for each gamma.

    For every gamma the Q-table is read from
    ``{q_table_dir}/q_table_gamma_{gamma}.json`` and averaged over states.
    Empty tables are skipped; if no table has any data, nothing is plotted.

    Args:
        gammas: Iterable of gamma values whose Q-table files should be read.
        q_table_dir: Directory containing the Q-table JSON files.
        save_path: Where the resulting figure is written.

    Raises:
        FileNotFoundError: If a Q-table file for one of the gammas is missing.
    """
    # NOTE(review): in MiniGrid's standard action enum index 3 is "pickup",
    # not "toggle" - confirm against the wrapper before trusting this label.
    known_action_labels = ["Left", "Right", "Forward", "Toggle"]
    gamma_labels = []
    avg_qvalues_by_action = []

    for gamma in gammas:
        with open(f"{q_table_dir}/q_table_gamma_{gamma}.json", 'r') as f:
            q_dict = json.load(f)

        q_array = np.array(list(q_dict.values()))  # shape: (num_states, num_actions)

        if q_array.shape[0] == 0:
            continue  # skip tables that happen to be empty

        avg_qvalues_by_action.append(np.mean(q_array, axis=0))  # shape: (num_actions,)
        gamma_labels.append(str(gamma))

    if not avg_qvalues_by_action:
        # Every table was empty (or no gammas were given): indexing the empty
        # array below would raise IndexError, so bail out explicitly.
        print("No non-empty Q-tables found; skipping the per-action Q-value plot.")
        return

    avg_qvalues_by_action = np.array(avg_qvalues_by_action).T  # shape: (num_actions, num_gammas)

    # Derive the layout from the data instead of assuming four actions.
    action_count = avg_qvalues_by_action.shape[0]
    bar_width = 0.8 / action_count  # 0.2 for the usual four actions
    action_labels = [
        known_action_labels[i] if i < len(known_action_labels) else f"Action {i}"
        for i in range(action_count)
    ]

    x = np.arange(len(gamma_labels))

    fig = plt.figure(figsize=(10, 6))
    try:
        for i in range(action_count):
            plt.bar(x + i * bar_width, avg_qvalues_by_action[i], width=bar_width, label=action_labels[i])

        # Centre each tick under its group of bars.
        plt.xticks(x + bar_width * (action_count - 1) / 2, gamma_labels)
        plt.xlabel("Gamma (γ)")
        plt.ylabel("Average Q-value")
        plt.title("Average Q-value per Action by Gamma")
        plt.legend()
        plt.grid(True, linestyle="--", alpha=0.5)

        plt.tight_layout()
        plt.savefig(save_path)
    finally:
        # Release the figure even if drawing or saving fails.
        plt.close(fig)

# Render the per-action comparison from the saved Q-tables, using the default paths.
plot_avg_qvalue_per_action(gammas=gammas)