In [1]:
import math
import sys
from collections import Counter
sys.path.insert(0, '..')
#import d2l
import torch
import torch.nn as nn
import torch.optim as optim
import time
import random as rand

def count_func(num):
    return lambda dice: num * Counter(dice)[num]

def four_of_a_kind(dice):
    for k, v in Counter(dice).items():
        if 4 <= v:
            return k * 4
    return 0

def straight(dice):
    min_dice = min(dice)
    length = 1
    while min_dice + length in dice:
        length += 1
    return length

YACHT = lambda dice: 50 if len(dice) == 5 and len(set(dice)) == 1 else 0
ONES = count_func(1)
TWOS = count_func(2)
THREES = count_func(3)
FOURS = count_func(4)
FIVES = count_func(5)
SIXES = count_func(6)
FULL_HOUSE = lambda dice: sum(dice) if \
    sorted(tuple(Counter(dice).values())) == [2, 3] else 0
FOUR_OF_A_KIND = four_of_a_kind
SMALL_STRAIGHT = lambda dice: 15 if straight(dice) >= 4 else 0
LARGE_STRAIGHT = lambda dice: 30 if 5 == straight(dice) else 0
CHOICE = lambda dice: sum(dice)
def score(dice, category):
    return category(dice)

class yacht_game:
    score_board = torch.tensor([[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
                                [-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1]], requires_grad = False)
    dice_status = torch.zeros(5, requires_grad = False)
    roll_count = torch.tensor([2],requires_grad = False)
    total_score = [0, 0]
    cur_player = 0
    multi_mode = False
    score_func = [ONES, TWOS, THREES, FOURS, FIVES,
              SIXES, CHOICE, FOUR_OF_A_KIND, FULL_HOUSE,
              SMALL_STRAIGHT, LARGE_STRAIGHT,YACHT]
            
    
    def __init__(self):
        self.reset_game()

    def roll_dice(self, roll_action_num):
        for i in range(5):
            if roll_action_num%2 == 1:
                self.dice_status[i] = rand.randint(1, 6)
            roll_action_num /= 2
        self.roll_count -= 1

    def set_multi_mode(self,mode):
        self.multi_mode = mode
    
    def get_yacht_output(self):
        cur_total_score = 0
        for i in range(len(self.score_board[self.cur_player])):
            cur_total_score += self.score_board[self.cur_player][i]
            if self.score_board[self.cur_player][i] == -1:
                cur_total_score += 1
        if self.multi_mode:
            return torch.cat((self.score_board[self.cur_player], self.score_board[1 - self.cur_player], self.roll_count, self.dice_status)), cur_total_score, self.is_game_finished()
        else:
            return torch.cat((self.score_board[self.cur_player], self.roll_count, self.dice_status))\
                    ,cur_total_score, self.is_game_finished()

    def is_game_finished(self):
        return -1 not in self.score_board[self.cur_player]

    def set_score(self, dice, category):
        self.score_board[self.cur_player][category] = score(dice, self.score_func[category])
    
    def update(self, yacht_input):
        if self.is_game_finished():
            #print('Game End')
            #print('player : ', self.cur_player, ' total score : ', self.total_score[self.cur_player])
            if self.multi_mode:
                self.cur_player = 1 - self.cur_player
        else:
            dice_input, score_input = yacht_input[:32], yacht_input[32:]
            
            max_dice_index = dice_input.index(max(dice_input))
            if self.roll_count > 0 and not max_dice_index == 0:
                #print('Roll dice')
                self.roll_dice(max_dice_index)

            else:  # Set score
                #print('player : ', self.cur_player, ' Set score')
                for pref, i in sorted(zip(score_input,range(len(score_input)) ), reverse=True):
                    if self.score_board[self.cur_player][i] == -1:
                        self.set_score(self.dice_status, i)
                        break
                    else:
                        score_input[i] = -math.inf
                self.roll_count = 3
                self.roll_dice(31)

                if self.is_game_finished():  # Game Ended
                    #print('Game End')
                    bonus_counter, score_sum = 0, 0
                    for i in self.score_board[self.cur_player]:
                        score_sum += i
                        if i < 6:
                            bonus_counter += i
                    if bonus_counter >= 63:
                        score_sum += 35
                    self.total_score[self.cur_player] = score_sum
                    #print('player : ', self.cur_player, ' total score : ', score_sum)

                if self.multi_mode:
                    self.cur_player = 1 - self.cur_player
    
    def reset_game(self):
        score_board = torch.tensor([[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
                                    [-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1]], requires_grad = False)
        dice_status = torch.zeros(5, requires_grad = False)
        roll_count = torch.tensor([2],requires_grad = False)
        total_score = [0, 0]
        cur_player = 0

game = yacht_game()

game.get_yacht_output() \
game.update(yacht_input)

In [2]:
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """transition 저장"""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [4]:
INPUT_SIZE = 18
OUTPUT_SIZE = 44

class DQN(nn.Module):

    def __init__(self, input_size, output_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_size, 100)
        self.fc2 = nn.Linear(100, 50)
        #self.fc3 = nn.Linear(100, 100)
        #self.fc4 = nn.Linear(100, 100)
        #self.fc5 = nn.Linear(100, 100)
        self.fc6 = nn.Linear(50, output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        #x = F.relu(self.fc3(x))
        #x = F.relu(self.fc4(x))
        #x = F.relu(self.fc5(x))

        x = self.fc6(x)

        return x

def init_weights(m):
    if type(m) == nn.Linear or type(m) == nn.Conv2d:
        torch.nn.init.xavier_uniform_(m.weight)

policy_net = DQN(INPUT_SIZE,OUTPUT_SIZE).to(device)
target_net = DQN(INPUT_SIZE,OUTPUT_SIZE).to(device)


policy_net.apply(init_weights)
target_net.apply(init_weights)

DQN(
  (fc1): Linear(in_features=18, out_features=100, bias=True)
  (fc2): Linear(in_features=100, out_features=50, bias=True)
  (fc6): Linear(in_features=50, out_features=44, bias=True)
)

In [5]:
BATCH_SIZE = 128
GAMMA = 0.099
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 500
TARGET_UPDATE = 10



target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(200)


steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max (1)은 각 행의 가장 큰 열 값을 반환합니다.
            # 최대 결과의 두번째 열은 최대 요소의 주소값이므로,
            # 기대 보상이 더 큰 행동을 선택할 수 있습니다.
            return policy_net(state)
    else:
        return torch.randn(OUTPUT_SIZE, device=device)


episode_durations = []


def plot_durations():
    plt.figure(2)
    plt.clf()
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # 100개의 에피소드 평균을 가져 와서 도표 그리기
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # 도표가 업데이트되도록 잠시 멈춤
    #if is_ipython:
    #    display.clear_output(wait=True)
    #    display.display(plt.gcf())

In [6]:
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch
    batch = Transition(*zip(*transitions))
    
    #print(batch.next_state)

    # 최종이 아닌 상태의 마스크를 계산하고 배치 요소를 연결합니다
    # (최종 상태는 시뮬레이션이 종료 된 이후의 상태)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat(batch.next_state)
    
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)
    
    #print(str(policy_net(state_batch).size()))

    # Q(s_t, a) 계산 - 모델이 Q(s_t)를 계산하고, 취한 행동의 열을 선택합니다.
    # 이들은 policy_net에 따라 각 배치 상태에 대해 선택된 행동입니다.
    state_action_values = policy_net(state_batch)

    # 모든 다음 상태를 위한 V(s_{t+1}) 계산
    # non_final_next_states의 행동들에 대한 기대값은 "이전" target_net을 기반으로 계산됩니다.
    # max(1)[0]으로 최고의 보상을 선택하십시오.
    # 이것은 마스크를 기반으로 병합되어 기대 상태 값을 갖거나 상태가 최종인 경우 0을 갖습니다.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values = target_net(non_final_next_states)
    # 기대 Q 값 계산
    expected_state_action_values = (next_state_values * GAMMA) * reward_batch
    
    # Huber 손실 계산
    
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values)

    # 모델 최적화
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()

In [8]:
num_episodes = 1000
for i_episode in range(num_episodes):
    # 환경과 상태 초기화
    game.reset_game()
    state, reward, _ = game.get_yacht_output()
    state = torch.tensor(state, dtype=torch.float, device=device)
    for t in count():
        # 행동 선택과 수행
        action = select_action(state)
        game.update(action.tolist())
        new_state, new_reward, done = game.get_yacht_output()
        new_state = torch.tensor(new_state, dtype=torch.float, device=device)
        step_reward = torch.tensor([new_reward - reward], device=device)


        # 메모리에 변이 저장
        #print(state.size())
        memory.push(state.reshape(1,INPUT_SIZE), action.reshape(1,OUTPUT_SIZE), new_state.reshape(1,INPUT_SIZE), torch.tensor([new_reward],device=device).view(1,1))

        # 다음 상태로 이동
        state = new_state
        reward = new_reward

        # 최적화 한단계 수행(목표 네트워크에서)
        optimize_model()
        if done:
            episode_durations.append(t + 1)
            state, score, _ = yacht.get_yacht_output()
            print(str(i_episode) + ') ' + str(state[:12]) + ' score : ' + str(score))
            #plot_durations()
            break
    #목표 네트워크 업데이트, 모든 웨이트와 바이어스 복사
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

print('Complete')

  


NameError: name 'yacht' is not defined