In [140]:
# 가치 반복법 구현
import numpy as np

ACTIONS = ('U', 'D', 'L', 'R')
DELTA_THRESHOLD = 1e-3  # 학습 종료 기준치
GAMMA = 0.95            # 할인율
ACCURACY = 1            # 이동 확률

# 격자 공간 정의
class Grid:
    def __init__(self, rows, cols, start) -> None:
        self.rows = rows
        self.cols = cols
        self.i = start[0]
        self.j = start[1]

    def set(self, rewards, actions):
        self.rewards = rewards
        self.actions = actions

    def set_state(self, s):
        self.i = s[0]
        self.j = s[1]

    def get_actions(self):
        return self.actions[(self.i, self.j)]

    def current_state(self):
        return (self.i, self.j)
    
    def is_terminal(self, s):
        return s not in self.actions
    
    def move(self, action):
        if action in self.actions[(self.i, self.j)]:
            if action == 'U':
                self.i -= 1
            elif action == 'D':
                self.i += 1
            elif action == 'R':
                self.j += 1
            elif action == 'L':
                self.j -= 1

        return self.rewards.get((self.i, self.j), 0)
    
    def all_states(self):
        return set(self.actions.keys()) | set(self.rewards.keys())
    
    def get_transition_probs(self, action):
        nextState = None
        if action in self.actions[(self.i, self.j)]:
            if action == 'U':
                nextState = (self.i - 1, self.j)
            elif action == 'D':
                nextState = (self.i + 1, self.j)
            elif action == 'R':
                nextState = (self.i, self.j + 1)
            elif action == 'L':
                nextState = (self.i, self.j - 1)
        return (ACCURACY, self.rewards.get(nextState, 0), nextState)

    
# 격자 공관과 각 상태에서 선택 가능한 행동 정의
def standard_grid():
    grid = Grid(3, 4, (2, 0))
    rewards = {(0, 3): 1, (1, 3): -1}
    actions = {
		(0, 0): ('D', 'R'),
		(0, 1): ('L', 'R'),
		(0, 2): ('L', 'D', 'R'),
		(1, 0): ('U', 'D'),
		(1, 2): ('U', 'D', 'R'),
		(2, 0): ('U', 'R'),
		(2, 1): ('L', 'R'),
		(2, 2): ('L', 'R', 'U'),
		(2, 3): ('L', 'U'),
	}
    grid.set(rewards, actions)
    return grid

def print_values(V, grid):
    for i in range(grid.rows):
        print('----------------------------')
        for j in range(grid.cols):
            value = V.get((i, j), 0)
            if value >= 0:
                print(f'{value:.2f} | ', end='')
            else:
                print(f'{value:.2f} | ',end='')
        print('')

def print_policy(P, grid):
    for i in range(grid.rows):
        print('----------------------------')
        for j in range(grid.cols):
            action = P.get((i, j), ' ')
            print(f'  {action}  |', end='')
        print('')

# 주어진 상태에서 다음 상태의 가치 함수를 극대화하는 행동 선택
def best_action_value(grid, V, s):
    bestAction = None
    bestValue = float('-inf')
    grid.set_state(s)

    # 가능한 모든 행동 반복
    
    for action in grid.get_actions():
        transition = grid.get_transition_probs(action)
        rewardExpectation = 0
        valueExpectation = 0
            
        (prob, r, (nextI, nextJ)) = transition
        rewardExpectation += prob * r
        valueExpectation += prob * V[(nextI, nextJ)]

        value = rewardExpectation + GAMMA*valueExpectation
        if value > bestValue:
            bestValue = value
            bestAction = action
        
    return bestAction, bestValue

# 가치 함수 계산
def calculate_values(grid: Grid):
    V = {}
    states = grid.all_states()
    for s in states:
        V[s] = 0

    # 수렴 시까지 반복
    i = 0
    while True:
        maxChange = 0
        for s in grid.actions.keys():
            oldValue = V[s]
            _, newValue = best_action_value(grid, V, s)
            V[s] = newValue
            maxChange = max(maxChange, np.abs(oldValue - V[s]))

        print(f'\n{i} 번째 반복', end='\n')
        print_values(V, grid)
        i += 1

        if maxChange < DELTA_THRESHOLD:
            break

    return V

# 정책 무작위 초기화
def initialize_random_policy(grid: Grid):
    policy = {}
    for s in grid.actions.keys():
        policy[s] = np.random.choice(grid.actions[s])
    return policy

# 최적의 가치를 찾는 정책 도출
def calculate_greedy_policy(grid, V):
    policy = initialize_random_policy(grid)
    for s in policy.keys():
        grid.set_state(s)
        bestAction, _ = best_action_value(grid, V, s)
        policy[s] = bestAction
    return policy

# 격자 공간 초기화
grid = standard_grid()

print('\n보상: ')
print_values(grid.rewards, grid)

policy = initialize_random_policy(grid)
print('\n초기 정책: ')
print_policy(policy, grid)

V = calculate_values(grid)
print('\n가치 함수:')
print_values(V, grid)

policy = calculate_greedy_policy(grid, V)
print('\n정책: ')
print_policy(policy, grid)


보상: 
----------------------------
0.00 | 0.00 | 0.00 | 1.00 | 
----------------------------
0.00 | 0.00 | 0.00 | -1.00 | 
----------------------------
0.00 | 0.00 | 0.00 | 0.00 | 

초기 정책: 
----------------------------
  R  |  L  |  L  |     |
----------------------------
  U  |     |  R  |     |
----------------------------
  R  |  L  |  R  |  L  |

0 번째 반복
----------------------------
0.00 | 0.00 | 1.00 | 0.00 | 
----------------------------
0.00 | 0.00 | 0.95 | 0.00 | 
----------------------------
0.00 | 0.00 | 0.90 | 0.86 | 

1 번째 반복
----------------------------
0.00 | 0.95 | 1.00 | 0.00 | 
----------------------------
0.00 | 0.00 | 0.95 | 0.00 | 
----------------------------
0.00 | 0.86 | 0.90 | 0.86 | 

2 번째 반복
----------------------------
0.90 | 0.95 | 1.00 | 0.00 | 
----------------------------
0.86 | 0.00 | 0.95 | 0.00 | 
----------------------------
0.81 | 0.86 | 0.90 | 0.86 | 

3 번째 반복
----------------------------
0.90 | 0.95 | 1.00 | 0.00 | 
----------------------------
0.8

## 1.3 모델 프리 강화 학습
위와 같이 모든 상태를 안다는 가정하(모델 존재)에 문제를 풀기 위해선 매우 방대한 양의 자원이 필요하다. 체스의 경우 $10^{50}$개의 상태가 존재한다. 따라서 에이전트가 모든 상태를 몰라도 환경 탐색 과정을 거치는 상태에 대해서만 가치 함수를 업데이트할 수 있는 방법을 찾는다.

- MC(Monte Carlo, 몬테카를로) 학습
- TD(Temproal difference, 시간차) 학습


### MC 학습
가치 함수의 추정치를 구하는 방식으로 문제 해결에 접근. 즉, 최적 행동이 무엇인지 모르는 상태에서 일단 행동을 취해 얻은 경험으로 배우는 것
- 과정
  1. 초기 정책 무작위 초기화
  2. 초기화된 정책에 따라 가치 함수 추정 후, 정책 개선
  3. 개선한 정책을 따라 가치 함수 추정 후, 정책 개선
  4. 2~3의 과정을 수렵 시까지 반복
- 정책 반복법과의 차이점
  - MC 학습은 완전한 가치 함수가 아닌 가치 함수의 추정치만 얻을 수 있다.
  - MC 학습은 환경 내 모든 상태의 가치 함수를 추정하는 것이 아니라, 에이전트가 에피소드 안에서 거쳐간 상태의 가치 함수만 추정한다.
- 특징
  - 알려진 모델이 없다고 가정한다. 즉, 에이전트는 주어진 상태에서 어떤 행동을 취했을 때 어떤 상태로 전이할지, 어떤 보상이 주어질지 알지 못한다.
  - 에이전트는 경험의 표본으로부터 학습한다.
  - 현재까지 겪은 모든 에피소드에 대해 상태의 이익 $G$를 평균하여 상태의 가치 함수를 구한다(경험적 평균).
  - 에피소드 하나를 완전히 끝낸 다음 업데이트한다.
  - 에피소드 단위 문제에 한하여 적용할 수 있다.

- 행동-가치 함수 도입
  - $Q(s, a)$: 상태 $s$에서 행동 $a$를 취했을 때의 장기적인 보상에 대한 평균 값


#### 블랙잭을 통한 MC 학습
환경 정량화
1. 행동 공간 정의  
   플레이어는 히트, 스테이, 더불 중 1가지 행동을 선택할 수 있다. 하지만 여기에서는 행동의 종륲를 단순화하기 위해 더블은 제와하고 히트와 스테이만 선택할 수 있다고 가정
2. 플레이어가 가질 수 있는 상태 개수 정의  
   플레이어는 숫자 합이 11이하면 무조건 히트를 내야 한다. 그리고 숫자 합은 12부터 21까지로 총 10가지 상태로 정의할 수 있다.
3. 딜러가 가질 수 있는 샅애 개수 정의  
   딜러의 숫자 합도 고려해야 한다. 딜러는 처음에 카드 단 1장만 오픈하므로 1부터 10까지로 총 10가지 상태를 정의할 수 있다.
4. 플레이어의 카드 에이스 유무 상태 정의  
   마지막으로 플레이어의 카드 에이스 유무 상태를 정의 해야 한다. 에이스 카드는 1 또는 11로 판정할 수 있으므로 게임의 판세를 좌우하는 중요한 요소이다.
5. 전체 상태 정리하기  
   1부터 4까지의 경우를 모두 고려하면 플레이어의 상태 개수 10, 딜러의 상태 개수 10, 플레이어의 에잉스 카드 유무 2를 곱해 전체 상태를 총 200가지로 정리할 수 있다.
6. 보상 정량화하기  
   보상을 단순화하기 위해 숫자 합의 정도는 고려하지 않는다. 승리할 때 1, 패배할 때 -1, 비겼을 때 0으로 보상 설정

In [141]:
# MC 학습으로 블랙잭 풀어보기
import numpy as np
import matplotlib.pyplot as plt

class Player(object):
    def __init__(self, currentSum, usableAce, dealersCard) -> None:
        self.currentSum = currentSum
        self.dealersCard = dealersCard
        self.usableAce = usableAce
        self.usingAce = self.usableAce

    def ReceiveCard(self, card):
        if self.usingAce and self.currentSum + card > 21:
            self.usingAce = False
            self.currentSum += card - 10
        else:
            self.currentSum += card

    def GetState(self):
        return (self.currentSum, self.usableAce, self.dealersCard)
    
    def GetValue(self):
        return self.currentSum
    
    def ShouldHit(self, policy):
        return policy[self.GetState()]
    
    def Bust(self):
        return self.GetValue() > 21

In [142]:
class Dealer(object):
    def __init__(self, cards) -> None:
        self.cards = cards

    def ReceiveCard(self, card):
        self.cards.append(card)

    def GetValue(self):
        currentSum = 0
        aceCount = 0

        for card in self.cards:
            if card == 1:
                aceCount += 1
            else:
                currentSum += card

        while aceCount > 0:
            aceCount -= 1
            currentSum += 11

            if currentSum > 21:
                aceCount += 1
                currentSum -= 11
                currentSum += aceCount
                break

        return currentSum
    
    def ShouldHit(self):
        if self.GetValue() >= 17:
            return False
        else:
            return True
        
    def Bust(self):
        return self.GetValue() > 21

In [143]:
class StateActionInfo(object):
    def __init__(self) -> None:
        self.stateActionPairs = []
        self.stateActionMap = set()

    def AddPair(self, pair):
        if pair in self.stateActionMap:
            return
        
        self.stateActionPairs.append(pair)
        self.stateActionMap.add(pair)

In [144]:
def EvaluateAndImprovePolicy(qMap, policy, returns, stateActionPairs, reward):
    for pair in stateActionPairs:
        returns[pair] += 1
        qMap[pair] = qMap[pair] + ((reward - qMap[pair]) / returns[pair])

        state = pair[0]
        shouldHit = False
        if qMap[(state, True)] > qMap[(state, False)]:
            shouldHit = True

        policy[state] = shouldHit

def newCard():
    card = np.random.randint(1, 14)

    if card > 9:
        return 10
    else:
        return card

def GenerateStart(qMap, policy, returns):
    playerSum = np.random.randint(11, 22)
    usableAce = bool(np.random.randint(0, 2))

    player = Player(playerSum, usableAce, np.random.randint(1, 11))
    dealer = Dealer([np.random.randint(1, 11)])
    
    stateActionInfo = StateActionInfo()
    hitAction = bool(np.random.randint(0, 2))
    stateActionInfo.AddPair((player.GetState(), hitAction))

    if hitAction:
        player.ReceiveCard(newCard())

        while not player.Bust() and player.ShouldHit(policy):
            stateActionInfo.AddPair((player.GetState(), True))
            player.ReceiveCard(newCard())

    if player.Bust():
        EvaluateAndImprovePolicy(qMap, policy, returns, stateActionInfo.stateActionPairs, -1)
        return
    
    stateActionInfo.AddPair((player.GetState(), False))
    dealer.ReceiveCard(newCard())

    while not dealer.Bust() and dealer.ShouldHit():
        dealer.cards.append(newCard())

    if dealer.Bust() or dealer.GetValue() < player.GetValue():
        EvaluateAndImprovePolicy(qMap, policy, returns, stateActionInfo.stateActionPairs, 1)
    elif dealer.GetValue() > player.GetValue():
        EvaluateAndImprovePolicy(qMap, policy, returns, stateActionInfo.stateActionPairs, -1)
    else:
        EvaluateAndImprovePolicy(qMap, policy, returns, stateActionInfo.stateActionPairs, 0)

In [145]:
from tqdm import tqdm
qMap = {}
policy = {}
returns = {}

for playerSum in range(11, 22):
    for usableAce in range(2):
        for dealersCard in range(1, 11):
            playerState = (playerSum, bool(usableAce), dealersCard)
            qMap[(playerState, False)] = 0
            qMap[(playerState, True)] = 0
            returns[(playerState, False)] = 0
            returns[(playerState, True)] = 0

            if playerSum == 20 or playerSum == 21:
                policy[playerState] = False
            else:
                policy[playerState] = True

print(qMap)
print(policy)
print(returns)
for i in tqdm(range(10_000_000)):
    GenerateStart(qMap, policy, returns)

x11 = []
y11 = []

x12 = []
y12 = []

x21 = []
y21 = []

x22 = []
y22 = []

for playerState in policy:
    if playerState[1]:
        if policy[playerState]:
            x11.append(playerState[2] - 1)
            y11.append(playerState[0] - 11)
        else:
            x12.append(playerState[2] - 1)
            y12.append(playerState[0] - 11)
    else:
        if policy[playerState]:
            x21.append(playerState[2] - 1)
            y21.append(playerState[0] - 11)
        else:
            x22.append(playerState[2] - 1)
            y22.append(playerState[0] - 11)

plt.figure(0)
plt.title('With Usable Ace')
plt.scatter(x11, y11, color='blue')
plt.scatter(x12, y12, color='yellow')
plt.xticks(range(10), ['A', '2', '3', '4', '5', '6', '7', '8', '9', '10'])
plt.yticks(range(11), ['11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21'])

plt.figure(1)
plt.title('Without Usable Ace')
plt.scatter(x21, y21, color='blue')
plt.scatter(x22, y22, color='yellow')
plt.xticks(range(10), ['A', '2', '3', '4', '5', '6', '7', '8', '9', '10'])
plt.yticks(range(11), ['11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21'])
plt.show()


{((11, False, 1), False): 0, ((11, False, 1), True): 0, ((11, False, 2), False): 0, ((11, False, 2), True): 0, ((11, False, 3), False): 0, ((11, False, 3), True): 0, ((11, False, 4), False): 0, ((11, False, 4), True): 0, ((11, False, 5), False): 0, ((11, False, 5), True): 0, ((11, False, 6), False): 0, ((11, False, 6), True): 0, ((11, False, 7), False): 0, ((11, False, 7), True): 0, ((11, False, 8), False): 0, ((11, False, 8), True): 0, ((11, False, 9), False): 0, ((11, False, 9), True): 0, ((11, False, 10), False): 0, ((11, False, 10), True): 0, ((11, True, 1), False): 0, ((11, True, 1), True): 0, ((11, True, 2), False): 0, ((11, True, 2), True): 0, ((11, True, 3), False): 0, ((11, True, 3), True): 0, ((11, True, 4), False): 0, ((11, True, 4), True): 0, ((11, True, 5), False): 0, ((11, True, 5), True): 0, ((11, True, 6), False): 0, ((11, True, 6), True): 0, ((11, True, 7), False): 0, ((11, True, 7), True): 0, ((11, True, 8), False): 0, ((11, True, 8), True): 0, ((11, True, 9), False):

  0%|          | 17810/10000000 [00:00<04:21, 38159.55it/s]


KeyboardInterrupt: 

### TD 학습
테트리스와 같이 끝없이 할 수 있는 게임이나 한 에피소드 자체가 길어서 학습을 진행해야 하는 경우에 적용할 수 있는 시간차 학습  
- MC 학습은 한 에피소드가 끝날 때마다 Q함수 업데이트
- TD학습은 상태 변화가 있을 때마다 Q함수 업데이트

하지만 이 학습은 움직이기 위한 정책(행동 정책)과 학습하기 위한 정책(학습 정책)이 같다. 이는 Q함수를 업데이트하는 정책에도 탐험요소가 들어가 학습 수렴 속도가 느려진다는 단점이 있다. 이를 보완하기 위해 행동 정책과 학습 정책을 달리하는 것을 Q-러닝이라고 한다.

### Q-learning
TD학습에서 행동 정책과 학습 정책을 다르게 설정한 것이다.  

In [1]:
# Q-learning 구현
import numpy as np
from tqdm import tqdm

BOARD_ROWS = 3
BOARD_COLS = 4
WIN_STATE = (0, 3)
LOSE_STATE = (1, 3)
BLOCKED_STATE = (1, 1) 
START = (2, 0)
DETERMINISTIC = False   # 상태 전이 함수의 확률 적용 플래그 (False일 경우 적용)

In [2]:
class State:
    def __init__(self, state = START) -> None:
        self.state = state
        self.isEnd = False
        self.determine = DETERMINISTIC

    def giveReward(self):
        if self.state == WIN_STATE:
            return 1
        elif self.state == LOSE_STATE:
            return -1
        else:
            return 0
        
    def isEndFunc(self):
        if (self.state == WIN_STATE) or (self.state == LOSE_STATE):
            self.isEnd = True

    def _chooseActionProb(self, action):
        if action == "U":
            return np.random.choice(["U", "L", "R"], p = [0.8, 0.1, 0.1])
        if action == "D":
            return np.random.choice(["D", "L", "R"], p = [0.8, 0.1, 0.1])
        if action == "L":
            return np.random.choice(["L", "U", "D"], p = [0.8, 0.1, 0.1])
        if action == "R":
            return np.random.choice(["R", "U", "D"], p = [0.8, 0.1, 0.1])
        
    def nxtPosition(self, action):
        if self.determine:
            if action == "U":
                nxtState = (self.state[0] - 1, self.state[1])
            elif action == "D":
                nxtState = (self.state[0] + 1, self.state[1])
            elif action == "L":
                nxtState = (self.state[0], self.state[1] - 1)
            else:
                nxtState = (self.state[0], self.state[1] + 1)
            self.determine = False
        else:
            action = self._chooseActionProb(action)
            self.determine = True
            nxtState = self.nxtPosition(action)

        if (nxtState[0] >= 0) and (nxtState[0] <= 2):
            if (nxtState[1] >= 0) and (nxtState[1] <= 3):
                if nxtState != BLOCKED_STATE:
                    return nxtState
        return self.state   

In [147]:
class Agent:
    def __init__(self) -> None:
        self.states = []    # 위치, 행동 기록
        self.actions = ['U', 'D', 'L', 'R']
        self.State = State()
        self.isEnd = self.State.isEnd
        self.lr = 0.1
        self.decay_gamma = 0.95 # 할인율

        # 전체 상태에 대해 Q 함수(행동 가치 함수)의 값 초기화
        self.Q_values = {}
        for i in range(BOARD_ROWS):
            for j in range(BOARD_COLS):
                self.Q_values[(i, j)] = {}
                for a in self.actions:
                    self.Q_values[(i, j)][a] = 0

    # Q 값을 가장 극대화하는 방향으로 다음 행동 선택
    def chooseAction(self):
        max_nxt_reward = 0
        action = ''
        for a in self.actions:
            current_position = self.State.state
            nxt_reward = self.Q_values[current_position][a]
            if nxt_reward >= max_nxt_reward:
                action = a
                max_nxt_reward = nxt_reward
                # print(f'current pos: {current_position}, greedy action: {action}')
            
        return action
    
    # 행동 후 상태 업데이트
    def takeAction(self, action):
        position = self.State.nxtPosition(action)
        return State(state=position)
    
    # 종단 상태 도달 후 에피소드 초기화
    def reset(self):
        self.states = []
        self.State = State()
        self.isEnd = self.State.isEnd

    # 에피소드 개수만큼 반복
    def play(self, episodes = 10):
        for _ in tqdm(range(episodes)):
            if self.State.isEnd:
                # 역전파
                reward = self.State.giveReward()
                for a in self.actions:
                    self.Q_values[self.State.state][a] = reward
                
                for s in reversed(self.states):
                    current_q_value = self.Q_values[s[0]][s[1]]
                    reward = current_q_value + self.lr*(self.decay_gamma*reward - current_q_value)
                    self.Q_values[s[0]][s[1]] = round(reward, 3)
                #print(*self.states, end='\n')
                self.reset()
                #self._printQValues()
            else:
                action = self.chooseAction()
                self.states.append([(self.State.state), action])

                self.State = self.takeAction(action)
                self.State.isEndFunc()

                self.isEnd = self.State.isEnd

    def _printQValues(self):
        print('-------------------------------------------------------')
        for i in range(BOARD_ROWS):
            print('|', end='')
            for j in range(BOARD_COLS):
                Q_value = self.Q_values[(i, j)]
                max_value = float('-inf')
                max_value_action = ''
                for a in Q_value:
                    if Q_value[a] >= max_value:
                        max_value = Q_value[a]
                        max_value_action = a
                print(f' {max_value_action}({max_value:.6f}) |', end='')
            print('\n-------------------------------------------------------')

In [150]:
agent = Agent()
agent.play(1000000)
print('latest Q-values ... \n')
print(agent.Q_values)
print('-----------------------------------------')
for i in range(BOARD_ROWS):
    print('|', end='')
    for j in range(BOARD_COLS):
        Q_value = agent.Q_values[(i, j)]
        max_value = float('-inf')
        max_value_action = ''
        for a in Q_value:
            if Q_value[a] >= max_value:
                max_value = Q_value[a]
                max_value_action = a
        print(f' {max_value:.2f} |', end='')
    print('\n-----------------------------------------')


100%|██████████| 1000000/1000000 [00:23<00:00, 42466.97it/s]

latest Q-values ... 

{(0, 0): {'U': 0, 'D': 0, 'L': 0, 'R': 0.72}, (0, 1): {'U': 0, 'D': 0, 'L': 0, 'R': 0.838}, (0, 2): {'U': 0, 'D': 0, 'L': 0, 'R': 0.911}, (0, 3): {'U': 1, 'D': 1, 'L': 1, 'R': 1}, (1, 0): {'U': 0, 'D': 0, 'L': 0, 'R': 0.298}, (1, 1): {'U': 0, 'D': 0, 'L': 0, 'R': 0}, (1, 2): {'U': 0, 'D': 0, 'L': 0.478, 'R': -0.095}, (1, 3): {'U': -1, 'D': -1, 'L': -1, 'R': -1}, (2, 0): {'U': 0, 'D': 0, 'L': 0, 'R': 0.132}, (2, 1): {'U': 0, 'D': 0, 'L': 0.1, 'R': -0.001}, (2, 2): {'U': 0, 'D': 0, 'L': 0.173, 'R': -0.009}, (2, 3): {'U': 0, 'D': 0, 'L': 0, 'R': -0.092}}
-----------------------------------------
| 0.72 | 0.84 | 0.91 | 1.00 |
-----------------------------------------
| 0.30 | 0.00 | 0.48 | -1.00 |
-----------------------------------------
| 0.13 | 0.10 | 0.17 | 0.00 |
-----------------------------------------



