<a href="https://colab.research.google.com/github/jiwoong2/deeplearning/blob/main/Q_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

# FrozenLake-v1(기본적인 격자 환경)

In [None]:
# Create the environment
env = gym.make('FrozenLake-v1')

# Inspect the observation space
print("Observation Space:")
print("Type:", env.observation_space) # kind of observation space (discrete)
print("Number of States:", env.observation_space.n)  # number of discrete states (16 tiles, per the original note)

# Inspect the action space
print("\nAction Space:")
print("Type:", env.action_space)
print("Number of Actions:", env.action_space.n)

# Inspect the reward range
print("\nReward Range:")
print("Min Reward:", env.reward_range[0])
print("Max Reward:", env.reward_range[1])

# Show the map; as the last bare expression it is presumably rendered as the notebook cell output
env.unwrapped.desc

In [None]:
class QLearningAgent:
    """Tabular Q-learning agent with an epsilon-greedy behaviour policy.

    One Q-value is stored per (state, action) pair, so ``env`` must expose
    integer-sized ``observation_space.n`` and ``action_space.n`` attributes.

    Args:
        env: Environment providing ``reset()``, ``step(action)``,
            ``observation_space.n``, ``action_space.n`` and
            ``action_space.sample()``.
        learning_rate: Weight given to the new estimate in each Q-update.
        discount_rate: Discount applied to the best next-state Q-value.
        exploration_rate: Initial probability of picking a random action.
        exploration_decay: Factor the exploration rate is multiplied by after
            every training episode.
    """

    def __init__(self, env, learning_rate=0.1, discount_rate=0.95, exploration_rate=1.0, exploration_decay=0.99):
        self.env = env  # environment object
        self.learning_rate = learning_rate
        self.discount_rate = discount_rate
        self.exploration_rate = exploration_rate
        self.exploration_decay = exploration_decay
        # Q-table: one row per state, one column per action, all zeros at start.
        self.q_table = np.zeros((env.observation_space.n, env.action_space.n))

    def _reset_env(self):
        """Reset the environment and return only the initial state.

        Accepts both reset conventions: a bare observation, or an
        ``(observation, info)`` pair where ``info`` is a dict.
        """
        result = self.env.reset()
        if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
            return result[0]
        return result

    def _step_env(self, action):
        """Take one step and return ``(next_state, reward, done)``.

        Accepts both step conventions: the 4-tuple
        ``(obs, reward, done, info)`` and the 5-tuple
        ``(obs, reward, terminated, truncated, info)``.
        """
        result = self.env.step(action)
        if len(result) == 5:
            next_state, reward, terminated, truncated, _ = result
            return next_state, reward, bool(terminated or truncated)
        next_state, reward, done, _ = result
        return next_state, reward, done

    def train(self, episodes=1000):
        """Run ``episodes`` training episodes, updating the Q-table in place.

        The exploration rate is decayed once at the end of every episode.
        """
        for _ in range(episodes):
            state = self._reset_env()
            done = False

            while not done:
                # Epsilon-greedy action selection.
                if np.random.rand() < self.exploration_rate:
                    action = self.env.action_space.sample()  # explore: random action
                else:
                    action = np.argmax(self.q_table[state])  # exploit: best known action

                next_state, reward, done = self._step_env(action)
                old_value = self.q_table[state, action]
                next_max = np.max(self.q_table[next_state])

                # Q-learning update: blend the old estimate with the bootstrapped target.
                # No terminal mask is applied; a state is only written to after an
                # action is taken from it, so rows of states that only ever end an
                # episode keep their initial zeros.
                target = reward + self.discount_rate * next_max
                self.q_table[state, action] = (
                    (1 - self.learning_rate) * old_value + self.learning_rate * target
                )

                state = next_state

            # Decay the exploration rate after each episode.
            self.exploration_rate *= self.exploration_decay

    def test(self, episodes=100):
        """Run the greedy policy for ``episodes`` episodes.

        Prints the average total reward per episode and also returns it.

        Returns:
            The sum of all rewards divided by ``episodes``.
        """
        total_rewards = 0

        for _ in range(episodes):
            state = self._reset_env()
            done = False
            while not done:
                action = np.argmax(self.q_table[state])  # always exploit learned values
                state, reward, done = self._step_env(action)
                total_rewards += reward

        avg_reward = total_rewards / episodes

        print(f"Average Reward: {avg_reward}")
        return avg_reward

In [None]:
# Example usage: train the tabular agent on FrozenLake, then evaluate its greedy policy.
env = gym.make('FrozenLake-v1')
# exploration_decay=0.999 overrides the class default of 0.99, so exploration fades more slowly.
agent = QLearningAgent(env = env, learning_rate=0.1, discount_rate=0.95, exploration_rate=1.0, exploration_decay=0.999)
agent.train(episodes=1000)
agent.test(episodes=100)  # prints the average reward over 100 greedy episodes

# CartPole-v1(막대기 세우기)

In [None]:
# A reward of 1 is obtained at every time step (per the original note).

# Create the environment
# NOTE(review): new_step_api=True is passed only here; the agents further down create
# their own CartPole environments without it - confirm which gym version is intended.
env = gym.make('CartPole-v1', new_step_api=True)

# Inspect the observation space
print("Observation Space:")
print("Shape:", env.observation_space.shape) # dimensionality of the observation space

# Index 0: cart position, 1: cart velocity, 2: pole angle, 3: pole angular velocity
print("High:", env.observation_space.high) # upper bound of each observation dimension
print("Low:", env.observation_space.low) # lower bound of each observation dimension

# Inspect the action space
print("\nAction Space:")
print("Number of Actions:", env.action_space.n) # number of actions the agent can take

# Inspect the reward range
print("\nReward Range:")
print("Min Reward:", env.reward_range[0]) # minimum reward value
print("Max Reward:", env.reward_range[1]) # maximum reward value

In [None]:
# Reset the environment; as a bare expression the returned value is presumably shown as the cell output.
env.reset()

In [None]:
class DiscretizedQLearningAgent:
    """Tabular Q-learning on CartPole with the continuous observation binned.

    Each of the four observation components is mapped to one of ``n_bins``
    integer indices, and the Q-table holds one value per
    (bin, bin, bin, bin, action) combination.

    Args:
        n_bins: Number of bin edges (and Q-table slots) per observation component.
        n_episodes: Number of training episodes run by ``train``.
        min_lr: Lower bound of the decaying learning rate.
        min_epsilon: Lower bound of the decaying exploration rate.
        discount: Discount factor applied to the best next-state Q-value.
    """

    def __init__(self, n_bins=10, n_episodes=1000, min_lr=0.1, min_epsilon=0.1, discount=0.99):
        self.env = gym.make('CartPole-v1')
        self.n_bins = n_bins  # number of discretization intervals
        self.n_episodes = n_episodes  # total number of episodes
        self.min_lr = min_lr  # minimum learning rate
        self.min_epsilon = min_epsilon  # minimum exploration rate
        self.discount = discount  # discount factor

        # Bin edges used to discretize each observation component.
        self.bins = [
            np.linspace(-4.8, 4.8, self.n_bins),  # cart position
            np.linspace(-4, 4, self.n_bins),  # cart velocity
            np.linspace(-0.418, 0.418, self.n_bins),  # pole angle
            np.linspace(-4, 4, self.n_bins)  # pole angular velocity
        ]

        self.q_table = np.zeros([n_bins] * len(self.env.observation_space.high) + [self.env.action_space.n])

    def _reset_env(self):
        """Reset the environment and return only the initial observation.

        Accepts both reset conventions: a bare observation, or an
        ``(observation, info)`` pair where ``info`` is a dict.
        """
        result = self.env.reset()
        if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
            return result[0]
        return result

    def _step_env(self, action):
        """Take one step and return ``(obs, reward, terminated, done)``.

        ``done`` ends the episode loop. ``terminated`` is True only when the
        episode ended for a reason other than a time limit, which is what
        decides whether the Q-update may bootstrap from the next state.
        Accepts both the 4-tuple and the 5-tuple step conventions; for the
        4-tuple form a time-limit cut-off is read from
        ``info["TimeLimit.truncated"]`` when present.
        """
        result = self.env.step(action)
        if len(result) == 5:
            obs, reward, terminated, truncated, _ = result
            return obs, reward, bool(terminated), bool(terminated or truncated)
        obs, reward, done, info = result
        truncated = isinstance(info, dict) and bool(info.get("TimeLimit.truncated", False))
        return obs, reward, bool(done) and not truncated, done

    def discretize(self, observation):
        """Convert a continuous observation into a tuple of bin indices.

        ``np.digitize`` is 1-based, hence the ``- 1``. The result is clipped
        to ``[0, n_bins - 1]`` so a value below the lowest edge lands in bin 0
        instead of producing index -1, which NumPy would silently wrap around
        to the last bin.
        """
        last_bin = self.n_bins - 1
        return tuple(
            int(np.clip(np.digitize(value, edges) - 1, 0, last_bin))
            for value, edges in zip(observation, self.bins)
        )

    def train(self):
        """Run ``n_episodes`` episodes of epsilon-greedy Q-learning."""
        for e in range(self.n_episodes):
            current_state = self.discretize(self._reset_env())

            # Logarithmic decay: the value changes quickly in early episodes and
            # flattens out later; both rates are clamped to [minimum, 1].
            decay = 1.0 - np.log10((e + 1) / 25)
            lr = max(self.min_lr, min(1.0, decay))  # learning-rate decay
            epsilon = max(self.min_epsilon, min(1, decay))  # exploration-rate decay

            done = False

            while not done:
                if np.random.random() < epsilon:
                    action = self.env.action_space.sample()  # explore
                else:
                    action = np.argmax(self.q_table[current_state])  # exploit

                obs, reward, terminated, done = self._step_env(action)
                new_state = self.discretize(obs)

                # A truly terminal transition has no future return, so only
                # bootstrap from the next state when the episode did not terminate.
                target = reward
                if not terminated:
                    target += self.discount * np.max(self.q_table[new_state])

                # Q-table update.
                entry = current_state + (action,)
                self.q_table[entry] += lr * (target - self.q_table[entry])
                current_state = new_state

    def test(self, episodes=100):
        """Run the greedy policy and report the average episode reward.

        Args:
            episodes: Number of evaluation episodes (defaults to 100).

        Returns:
            The sum of all rewards divided by ``episodes``; the same value is printed.
        """
        total_reward = 0
        for _ in range(episodes):
            state = self.discretize(self._reset_env())
            done = False
            while not done:
                action = np.argmax(self.q_table[state])
                obs, reward, _, done = self._step_env(action)
                state = self.discretize(obs)
                total_reward += reward
        average = total_reward / episodes
        print(f"Average reward: {average}")
        return average

In [None]:
# Build the discretized CartPole agent with its default hyperparameters,
# train it for its configured number of episodes, then print the average test reward.
agent = DiscretizedQLearningAgent()
agent.train()
agent.test()

# DQL

In [None]:
# Neural-network model definition
class QNetwork(nn.Module):
    """Small fully connected network: Linear -> ReLU -> Linear.

    Maps an input whose last dimension is ``state_dim`` to an output whose
    last dimension is ``action_dim``, through one hidden layer of
    ``hidden_dim`` units.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super().__init__()
        # Layers are created in forward order and wrapped in a Sequential stored as ``fc``.
        input_layer = nn.Linear(state_dim, hidden_dim)
        activation = nn.ReLU()
        output_layer = nn.Linear(hidden_dim, action_dim)
        self.fc = nn.Sequential(input_layer, activation, output_layer)

    def forward(self, x):
        """Return the network output for input ``x``."""
        out = self.fc(x)
        return out

In [None]:
# Deep Q-learning agent
class DQNAgent:
    """Epsilon-greedy deep Q-learning agent with a bounded replay memory.

    Args:
        state_dim: Size of the state vector fed to the network.
        action_dim: Number of discrete actions.
        lr: Learning rate of the Adam optimizer.
        gamma: Discount factor used in the learning target.
        epsilon_start: Initial exploration probability.
        epsilon_end: Lower bound of the exploration probability.
        epsilon_decay: Factor applied to epsilon after each completed replay.
    """

    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99, epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995):
        self.state_dim, self.action_dim = state_dim, action_dim
        self.gamma = gamma
        self.epsilon = epsilon_start
        self.epsilon_end, self.epsilon_decay = epsilon_end, epsilon_decay
        self.model = QNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.memory = deque(maxlen=10000)  # oldest transitions are dropped once full

    def act(self, state):
        """Pick a random action with probability epsilon, otherwise the greedy one."""
        explore = random.random() < self.epsilon
        if explore:
            return random.randrange(self.action_dim)
        batch = torch.FloatTensor(state).unsqueeze(0)  # add a batch dimension
        scores = self.model(batch).detach().numpy()
        return np.argmax(scores)

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def replay(self, batch_size):
        """Sample ``batch_size`` transitions and take one optimizer step per sample.

        Does nothing while the memory holds fewer than ``batch_size``
        transitions. Epsilon is decayed once per completed call.
        """
        if len(self.memory) < batch_size:
            return

        # Random sampling from the replay memory.
        for s, a, r, s_next, terminal in random.sample(self.memory, batch_size):
            s_t = torch.FloatTensor(s).unsqueeze(0)
            s_next_t = torch.FloatTensor(s_next).unsqueeze(0)
            r_t = torch.FloatTensor([r])
            terminal_t = torch.FloatTensor([terminal])

            predicted = self.model(s_t)[0][a]
            best_next = self.model(s_next_t).max(1)[0]
            # The (1 - terminal) factor removes the bootstrap term when the episode ended.
            target = r_t + self.gamma * best_next * (1 - terminal_t)

            # Squared error against the detached target.
            loss = (predicted - target.detach()).pow(2).mean()
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        # Epsilon decay, bounded below by epsilon_end.
        self.epsilon = max(self.epsilon_end, self.epsilon_decay*self.epsilon)

In [None]:
# Initialize the environment and create the agent
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = DQNAgent(state_dim, action_dim)

# Training
episodes = 1000
batch_size = 32

# NOTE(review): this loop expects reset() to return the observation alone and step()
# to return a 4-tuple - confirm against the installed gym version.
for e in range(episodes):
    state = env.reset()
    done = False
    total_reward = 0

    # Play one full episode, storing every transition in the replay memory.
    while not done:
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward

    # Learning happens once per episode, after the episode has finished.
    agent.replay(batch_size)
    print(f"Episode: {e+1}, Total reward: {total_reward}, Epsilon: {agent.epsilon}")

In [None]:
# Neural-network model definition
class QNetwork(nn.Module):
    """Fully connected network with one ReLU hidden layer.

    Takes inputs whose last dimension is ``state_dim`` and produces outputs
    whose last dimension is ``action_dim``; the hidden layer has
    ``hidden_dim`` units.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        # Built in forward order, then unpacked into the Sequential stored as ``fc``.
        layers = [
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
        ]
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Return the network output for input ``x``."""
        result = self.fc(x)
        return result

# Deep Q-learning agent
class DQNAgent:
    """Epsilon-greedy deep Q-learning agent trained on batched replay samples.

    Args:
        state_dim: Size of the state vector fed to the network.
        action_dim: Number of discrete actions.
        lr: Learning rate of the Adam optimizer.
        gamma: Discount factor used in the learning target.
        epsilon_start: Initial exploration probability.
        epsilon_end: Lower bound of the exploration probability.
        epsilon_decay: Factor applied to epsilon after each completed replay.
    """

    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99, epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.model = QNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.memory = deque(maxlen=10000)  # oldest transitions are dropped once full

    def act(self, state):
        """Return a random action with probability epsilon, else the greedy action as an int."""
        if random.random() < self.epsilon:
            return random.randrange(self.action_dim)
        state = torch.FloatTensor(state).unsqueeze(0)  # add a batch dimension
        with torch.no_grad():
            q_values = self.model(state)
        return q_values.max(1)[1].item()

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def replay(self, batch_size):
        """Sample a minibatch and take a single optimizer step on its mean squared error.

        Does nothing while the memory holds fewer than ``batch_size``
        transitions. Epsilon is decayed once per completed call.
        """
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # Stack each field into one contiguous NumPy array before handing it to
        # torch; building a tensor directly from a sequence of ndarrays goes
        # element by element and is much slower.
        states = torch.from_numpy(np.asarray(states, dtype=np.float32))
        actions = torch.from_numpy(np.asarray(actions, dtype=np.int64))
        rewards = torch.from_numpy(np.asarray(rewards, dtype=np.float32))
        next_states = torch.from_numpy(np.asarray(next_states, dtype=np.float32))
        dones = torch.from_numpy(np.asarray(dones, dtype=np.float32))

        # Q-value of the action that was actually taken in each sampled state.
        q_value = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # The target is treated as a constant, so skip autograd bookkeeping for it.
        with torch.no_grad():
            next_q_value = self.model(next_states).max(1)[0]
            # The (1 - dones) factor removes the bootstrap term for finished episodes.
            expected_q_value = rewards + self.gamma * next_q_value * (1 - dones)

        loss = (q_value - expected_q_value).pow(2).mean()

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Epsilon decay, bounded below by epsilon_end.
        self.epsilon = max(self.epsilon_end, self.epsilon_decay*self.epsilon)

# Initialize the environment and the agent
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = DQNAgent(state_dim, action_dim)

# Training
episodes = 1000
batch_size = 64

# NOTE(review): this loop expects reset() to return the observation alone and step()
# to return a 4-tuple - confirm against the installed gym version.
for e in range(episodes):
    state = env.reset()
    done = False
    total_reward = 0

    # Play one full episode, storing every transition in the replay memory.
    while not done:
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward

    # Learning happens once per episode, after the episode has finished.
    agent.replay(batch_size)
    # Log every 10th episode (e = 0, 10, 20, ...), shown 1-based.
    if e % 10 == 0:
        print(f"Episode: {e+1}, Total reward: {total_reward}, Epsilon: {agent.epsilon}")
