# game里的内容

- 主要用于定义游戏的规则，如：移动、合并、游戏结束等
- 主要类：Game
- 主要方法与属性：move（方法）、board（属性）

In [7]:
'''A numpy-based 2048 game core implementation.'''
import numpy as np

class Game:
    '''Numpy-backed board state and rules of a 2048 game on a square board.'''

    def __init__(self, size=4, score_to_win=None, rate_2=0.5, random=False, enable_rewrite_board=False):
        '''
        Create the board and populate it.

        :param size: side length of the square board
        :param score_to_win: tile value at which `end` reports a win;
            `None` means there is no winning score
        :param rate_2: the probability of a newly spawned tile being 2 (otherwise 4)
        :param random: fill every cell with a random power of two
            (a harder mode) instead of spawning two tiles on an empty board
        :param enable_rewrite_board: allow assigning to the `board` property
        '''
        self.size = size
        self.score_to_win = np.inf if score_to_win is None else score_to_win
        self.__rate_2 = rate_2
        self.enable_rewrite_board = enable_rewrite_board
        if not random:
            self.__board = np.zeros((self.size, self.size))
            # Start with two spawned tiles.
            for _ in range(2):
                self._maybe_new_entry()
        else:
            exponents = np.random.randint(1, 10, size=(self.size, self.size))
            self.__board = 2 ** exponents
            self.__end = False
        assert not self.end

    def move(self, direction):
        '''
        Slide and merge every row towards `direction`, then try to spawn a tile.

        direction:
            0: left
            1: down
            2: right
            3: up
        '''
        # Rotate a copy of the board so the requested direction becomes "left".
        rotated = np.rot90(self.board, -direction)
        for idx in range(self.size):
            merged = _merge(rotated[idx])
            filled = len(merged)
            rotated[idx, :filled] = merged
            rotated[idx, filled:] = 0

        # Undo the rotation before storing the result.
        self.__board = np.rot90(rotated, direction)
        self._maybe_new_entry()

    def __str__(self):
        row_template = '\t' + '{:8d}' * self.size + '\n'
        pieces = ["State:"]
        for row in self.board:
            pieces.append(row_template.format(*map(int, row)))
        pieces.append("Score: {0:d}".format(self.score))
        return "".join(pieces)

    @property
    def board(self):
        '''A copy of the board. `NOTE`: writing into the returned array,
        i.e. board[1,3]=2, raises no error and does not touch the game.'''
        return self.__board.copy()

    @board.setter
    def board(self, x):
        if not self.enable_rewrite_board:
            print("Disable to rewrite `board` manually.")
            return
        assert self.__board.shape == x.shape
        self.__board = x.astype(self.__board.dtype)

    @property
    def score(self):
        '''The largest tile currently on the board, as an int.'''
        return int(np.max(self.board))

    @property
    def end(self):
        '''
        0: continue
        1: lose
        2: win
        '''
        if self.score >= self.score_to_win:
            return 2
        return 1 if self.__end else 0

    def _maybe_new_entry(self):
        '''Put a 2 / 4 (according to `rate_2`) into a random empty cell.

        When no cell is empty nothing is placed and the game is flagged
        as ended.
        '''
        empty_cells = self._where_empty()
        if not empty_cells:
            self.__end = True
            return
        target = empty_cells[np.random.randint(0, len(empty_cells))]
        self.__board[target] = 2 if np.random.random() < self.__rate_2 else 4
        self.__end = False

    def _where_empty(self):
        '''Return the (row, col) index pairs of all cells equal to 0.'''
        rows, cols = np.where(self.board == 0)
        return list(zip(rows, cols))


def _merge(row):
    '''merge the row, there may be some improvement'''
    non_zero = row[row != 0]  # remove zeros
    core = [None]
    for elem in non_zero:
        if core[-1] is None:
            core[-1] = elem
        elif core[-1] == elem:
            core[-1] = 2 * elem
            core.append(None)
        else:
            core.append(elem)
    if core[-1] is None:
        core.pop()
    return core


# display里的内容

- 主要完成可视化工作
- 主要类：Display、IPythonDisplay

In [8]:
import sys
from IPython.display import HTML, display as ipy_display


class Display:
    '''Plain-text display that prints the game to standard output.'''

    def display(self, game):
        '''Route to `win`, `lose` or `show` depending on `game.end`.'''
        handlers = {2: self.win, 1: self.lose}
        handlers.get(game.end, self.show)(game)

    def _display(self, game):
        # Rendering hook; subclasses override this to draw differently.
        print(game)

    def show(self, game):
        '''Render a game that is still running.'''
        self._display(game)

    def win(self, game):
        '''Render the game followed by the winning message.'''
        self._display(game)
        print("You win! Score: %s" % game.score)

    def lose(self, game):
        '''Render the game followed by the losing message.'''
        self._display(game)
        print("You lose! Score: %s" % game.score)


class IPythonDisplay(Display):
    '''Display that draws the board as an HTML table in IPython (Jupyter) notebooks.'''

    def __init__(self, display_size=40):
        # Used as both the width and the height attribute of every table cell.
        self.display_size = display_size

    def _render(self, game):
        '''Return the HTML source: a score heading followed by the board table.'''
        cells = game.board
        side = self.display_size
        td = '''<td style="border:3px solid black; text-align:center;"
         width="%s" height="%s">{}</td>''' % (side, side)
        rows = []
        for r in range(game.size):
            rendered = []
            for c in range(game.size):
                value = int(cells[r, c])
                # Cells holding 0 are drawn blank.
                rendered.append(td.format(value if value else ""))
            rows.append('''<tr>''' + "".join(rendered) + '''</tr>''')
        heading = '''<h1>Score: {}</h1>'''.format(game.score)
        table = '''<table style="border: 5px solid black;">{}</table>'''
        return heading + table.format("".join(rows))

    def _display(self, game):
        # Outside an ipykernel session fall back to the plain-text display.
        if 'ipykernel' not in sys.modules:
            print("Warning: since it's not in ipykernel, "
                  "it will show the command line version.")
            super()._display(game)
            return
        ipy_display(HTML(self._render(game)))

# agent里面的内容

- 主要用于控制游戏，产生方向
- 主要类：RandomAgent、ExpectiMaxAgent、MyAgent

In [9]:
import numpy as np

class Agent:
    '''Base agent: repeatedly asks `step` for a direction and applies it to the game.'''

    def __init__(self, game, display=None):
        self.game = game
        self.display = display

    def play(self, max_iter=np.inf, verbose=False):
        '''Play until the game reports an end or `max_iter` moves were made.

        With `verbose` set, the iteration number and the chosen direction are
        printed after every move, and the game is shown through `display`
        when one was given.
        '''
        names = ["left", "down", "right", "up"]
        done = 0
        while done < max_iter and not self.game.end:
            chosen = self.step()
            self.game.move(chosen)
            done += 1
            if not verbose:
                continue
            print("Iter: {}".format(done))
            print("======Direction: {}======".format(names[chosen]))
            if self.display is not None:
                self.display.display(self.game)

    def step(self):
        '''Read a direction from standard input, reduced modulo 4.'''
        answer = input("0: left, 1: down, 2: right, 3: up = ")
        return int(answer) % 4


class RandomAgent(Agent):
    '''Agent that ignores the board and draws a direction from 0..3 at random.'''

    def step(self):
        return np.random.randint(0, 4)


class ExpectiMaxAgent(Agent):
    '''Agent that delegates every decision to the `expectimax` search module.'''

    def __init__(self, game, display=None):
        '''
        :param game: the game to control; its `size` must be 4
        :param display: optional display, passed on to `Agent`
        :raises ValueError: if `game.size` is not 4
        :raises ImportError: if `expectimax` cannot be imported either as a
            sibling module of a package or as a top-level module
        '''
        if game.size != 4:
            raise ValueError(
                "`%s` can only work with game of `size` 4." % self.__class__.__name__)
        super().__init__(game, display)

        # Import the expectimax agent written in C++.
        try:
            from .expectimax import board_to_move
        except ImportError:
            # A relative import needs a parent package; when this code runs
            # as a script or notebook cell there is none, so fall back to
            # the top-level module.
            from expectimax import board_to_move
        self.search_func = board_to_move

    def step(self):
        '''Return the direction the search function picks for the current board.'''
        return self.search_func(self.game.board)


# User-defined agent
class MyAgent(Agent):
    '''Agent whose direction is predicted by a user-supplied model.'''

    def __init__(self, game, myNet, display=None):
        super().__init__(game, display=display)
        # `myNet` must provide `predictDirection(board)`.
        self.model = myNet

    def step(self):
        return self.model.predictDirection(self.game.board)

# 实现自己的agent

## 导入库

In [10]:
import time
import torch
import torch.nn as nn
import torch.nn.functional as f
import torch.optim as optim
from torch.autograd import  Variable
import numpy as np
from math import log
from expectimax import board_to_move

Loaded expectmax lib for 2048: C:\Users\ASUS\Desktop\Jupyter_Notebook_Project\ML_Project_New\expectimax\bin/2048.so


## 定义函数、模型

函数及功能
- randomBoard(zero_rate = 0.2, max_2 = 5)：产生一个随机的board，相当于产生不带标签的训练样本
- oneHotEncoding(board)：将board转换为one-hot encoding，因为助教说直接用数值训练效果很不好
- generateTrainSet(batch_size = 50, zero_rate = 0.2, max_2 = 5)：产生带标签的训练集，标签由expectimax agent（board_to_move）给出

In [11]:
# Generate a random board
def randomBoard(zero_rate = 0.2, max_2 = 5):
    '''Return a 4x4 float array of randomly placed power-of-two tiles.

    Each cell is filled when a fresh `torch.rand(1)` draw is >= `zero_rate`
    and stays 0 otherwise; a filled cell holds 2**k with k drawn by
    `np.random.randint(1, max_2)`.
    '''
    cells = np.zeros((4, 4))
    for pos in np.ndindex(4, 4):
        filled = bool(torch.rand(1) >= zero_rate)
        if filled:
            cells[pos] = 2 ** np.random.randint(1, max_2)
    return cells

# One-hot encoding of the board.
# Cell numbering (row-major), i.e. row index of the encoding + 1:
#  1--2--3--4
#  5--6--7--8
#  9-10-11-12
# 13-14-15-16
# Column meaning:
# 0-10  <----->  1024-512-...-2-0
def oneHotEncoding(board):
    '''Encode a 4x4 board as a (16, 11) one-hot float tensor.

    Row `4*i + j` describes cell `board[i, j]`. Column 10 marks an empty
    cell and column `10 - log2(tile)` marks a tile, so column 0 stands for
    1024. Tiles larger than 1024 have no column of their own and are
    clamped into column 0 so they cannot wrap around onto another class.

    :param board: 4x4 array indexable as `board[i, j]`
    :return: tensor of shape (16, 11) with exactly one 1 per row; it does
        not require grad, since it is input data filled in place
    '''
    data = torch.zeros(16, 11)
    for i in range(4):
        for j in range(4):
            value = int(board[i, j])
            if value == 0:
                column = 10
            else:
                # Exact floor(log2(value)) without floating point rounding.
                exponent = value.bit_length() - 1
                column = max(10 - exponent, 0)
            data[4 * i + j, column] = 1
    return data

# Build one labelled batch of training data
def generateTrainSet(batch_size = 50, zero_rate = 0.2, max_2 = 5):
    '''Return `(data, labels)` for `batch_size` freshly generated random boards.

    `data` has shape (batch_size, 16 * 11) and holds the flattened one-hot
    encoding of each board; `labels` is a long tensor holding the value
    `board_to_move` returns for the same board. `zero_rate` and `max_2`
    are forwarded to `randomBoard`.
    '''
    features = torch.zeros(batch_size, 16 * 11)
    targets = torch.zeros(batch_size, dtype = torch.long)
    for row in range(batch_size):
        sample = randomBoard(zero_rate, max_2)
        features[row, :] = oneHotEncoding(sample).flatten()
        # board_to_move is the interface of the expectimax agent.
        targets[row] = board_to_move(sample)
    return features, targets

神经网络说明：
- 使用one-hot encoding作为训练集
- 尽量使用全连接网络，卷积神经网络效果不明显

In [12]:
# Neural network definition
class Net(nn.Module):
    '''Fully connected network mapping a one-hot encoded board to 4 direction scores.'''

    def __init__(self):
        super(Net, self).__init__()
        # Fully connected layers.
        # The first layer takes 16*11 inputs: 16 locations, 11 one-hot classes each.
        self.fc_1 = nn.Linear(16 * 11, 100)
        self.fc_2 = nn.Linear(100, 200)
        self.fc_3 = nn.Linear(200, 100)
        self.fc_4 = nn.Linear(100, 80)
        self.fc_5 = nn.Linear(80, 30)
        # The last layer has 4 outputs, one per direction.
        self.fc_6 = nn.Linear(30, 4)

    def _drop(self, x, p):
        '''Apply dropout with probability `p`, but only while in training mode.'''
        # Tying dropout to `self.training` lets `model.eval()` switch it off.
        return f.dropout(x, p=p, training=self.training)

    def forward(self, x):
        '''
        :param x: tensor whose last dimension holds the 16 * 11 input features;
            both a single sample and a batch of samples are accepted
        :return: tensor with 4 scores along the last dimension
        '''
        # Dropout guards against overfitting.
        x = f.relu(self._drop(self.fc_1(x), 0.1))
        # Softmax runs over the feature dimension so that every sample of a
        # batch is normalised on its own, exactly like a single sample.
        x = f.softmax(self._drop(self.fc_2(x), 0.1), dim=-1)
        x = f.relu(self._drop(self.fc_3(x), 0.15))
        x = f.softmax(self._drop(self.fc_4(x), 0.15), dim=-1)
        x = f.relu(self._drop(self.fc_5(x), 0.2))
        return self.fc_6(x)

    def predictDirection(self, board):
        '''Return the index (0..3) of the highest-scoring direction for `board`.

        The forward pass runs in evaluation mode and without gradient
        tracking; the previous training/evaluation mode is restored afterwards.
        '''
        d = oneHotEncoding(board).flatten()
        # Run on whatever device the parameters currently live on.
        d = d.to(self.fc_1.weight.device)
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                scores = self(d)
        finally:
            self.train(was_training)
        return int(torch.argmax(scores, dim=-1))

## 训练模型

In [17]:
import os

EPOCH = 10
BATCH_SIZE = 64
PATH = './2048.pth'

model = Net()
# Resume from an earlier checkpoint when there is one. The file only comes
# into existence once training has saved it, so the very first run has to
# start from the freshly initialised weights.
if os.path.isfile(PATH):
    # map_location keeps the load working on machines without a GPU even
    # if the checkpoint was written from a model living on the GPU.
    model.load_state_dict(torch.load(PATH, map_location='cpu'))
else:
    print("No checkpoint found at %s, training from scratch." % PATH)

if torch.cuda.is_available():
    print("gpu is available")
    model.cuda()    # move all model parameters to the GPU

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

gpu is available


In [18]:
# Every "epoch" trains on one freshly generated batch.
for epoch in range(EPOCH):
    print("----------Epoch {}/{}---------".format(epoch+1, EPOCH))

    # Draw zero_rate and max_2 at random so the boards vary between batches.
    zero_rate = np.random.rand()
    max_2 = np.random.randint(2, 10)

    # Generate the batch.
    X_train, y_train = generateTrainSet(BATCH_SIZE, zero_rate, max_2)
    # Follow the model: only move the batch to the GPU when the model's
    # parameters are there, so the loop also runs on CPU-only machines.
    if next(model.parameters()).is_cuda:
        X_train, y_train = X_train.cuda(), y_train.cuda()

    optimizer.zero_grad()
    outputs = model(X_train)

    loss = criterion(outputs, y_train)
    loss.backward()

    optimizer.step()
    print('          loss:%.3f          \n' % (loss.item()))

torch.save(model.state_dict(), PATH)

----------Epoch 1/10---------
          loss:1.429          

----------Epoch 2/10---------
          loss:1.425          

----------Epoch 3/10---------
          loss:1.507          

----------Epoch 4/10---------
          loss:1.479          

----------Epoch 5/10---------
          loss:1.336          

----------Epoch 6/10---------
          loss:1.432          

----------Epoch 7/10---------
          loss:1.348          

----------Epoch 8/10---------
          loss:1.406          

----------Epoch 9/10---------
          loss:1.390          

----------Epoch 10/10---------
          loss:1.447          



## 加载模型

此部分与 4.3（训练模型）不同时运行：若不重新训练，可在此直接加载已保存的模型参数用于测试。

PATH = './2048.pth'
model = Net()
# The checkpoint is written from a model that is moved to the GPU whenever
# one is available, so its tensors may be CUDA tensors. map_location lets
# them be read on a machine without a GPU as well.
model.load_state_dict(torch.load(PATH, map_location='cpu'))
# Switch to evaluation mode for testing.
model.eval()

## 测试模型

In [21]:
# Play `n_test` games with the trained model and average the final scores.
n_test = 20
score = 0
for i in range(n_test):
    display1 = Display()
    game = Game(size=4, random=False)
    # Show the freshly initialised board before the agent starts playing.
    display1.display(game)
    agent = MyAgent(game=game, myNet=model.cpu(), display=display1)
    agent.play(verbose=False)
    score = score + game.score
score = score / n_test

State:	       0       0       0       0
	       0       4       0       0
	       0       0       0       4
	       0       0       0       0
Score: 4
State:	       4       0       0       0
	       0       4       0       0
	       0       0       0       0
	       0       0       0       0
Score: 4
State:	       4       4       0       0
	       0       0       0       0
	       0       0       0       0
	       0       0       0       0
Score: 4
State:	       0       0       0       0
	       4       0       0       0
	       2       0       0       0
	       0       0       0       0
Score: 4
State:	       0       0       0       0
	       0       0       0       0
	       0       4       0       0
	       0       0       2       0
Score: 4
State:	       0       2       0       0
	       0       0       0       4
	       0       0       0       0
	       0       0       0       0
Score: 4
State:	       0       0       0       0
	       0       0       2       0
	       0       0   

In [22]:
score

21.6

# webapp里面的内容

## 函数定义

In [69]:
from flask import Flask, jsonify, request

def get_flask_app(game, agent):
    '''Build a Flask app that serves the board page and a JSON board endpoint.

    GET /board returns the current state. POST /board reads a direction from
    the JSON body, replaces it by `agent.step()` when it equals -1, applies
    it to `game` and returns the new state.
    '''
    app = Flask(__name__)

    @app.route("/")
    def index():
        return app.send_static_file('board.html')

    @app.route("/board", methods=['GET', 'POST'])
    def get_board():
        direction, control = -1, "USER"
        if request.method == "POST":
            direction = request.json
            # -1 asks the agent to choose the move.
            if direction == -1:
                direction, control = agent.step(), 'AGENT'
            game.move(direction)
        state = {
            "board": game.board.tolist(),
            "score": game.score,
            "end": game.end,
            "direction": direction,
            "control": control,
        }
        return jsonify(state)

    return app

## 运行webapp

In [None]:
GAME_SIZE = 4
SCORE_TO_WIN = 2048
APP_PORT = 5005
APP_HOST = "0.0.0.0"

game = Game(size=GAME_SIZE, score_to_win=SCORE_TO_WIN)

# An agent has to be supplied here.
try:
    agent = ExpectiMaxAgent(game=game)
except Exception as err:
    # `Exception` rather than a bare `except`, so Ctrl-C and SystemExit are
    # not swallowed; the reason is printed so the fallback is not silent.
    print("WARNING: Please compile the ExpectiMaxAgent first following the README.")
    print("WARNING: You are now using a RandomAgent.")
    print("WARNING: ExpectiMaxAgent is unavailable: %r" % (err,))
    agent = RandomAgent(game=game)

print("Run the webapp at http://<any address for your local host>:%s/" % APP_PORT)

app = get_flask_app(game, agent)
app.run(port=APP_PORT, threaded=False, host=APP_HOST)  # IMPORTANT: `threaded=False` to ensure correct behavior

# evaluate里面的内容

测试函数，用于评估模型，计算得分

In [23]:
def single_run(size, score_to_win, AgentClass, model, **kwargs):
    '''Play one verbose game with a new `AgentClass` agent and return the final score.

    The agent is built as `AgentClass(game, model, display=Display(), **kwargs)`.
    '''
    game = Game(size, score_to_win)
    player = AgentClass(game, model, display=Display(), **kwargs)
    player.play(verbose=True)
    final_score = game.score
    return final_score

运行测试函数

In [26]:
GAME_SIZE = 4
SCORE_TO_WIN = 2048
N_TESTS = 10

# ====================
# Use your own agent here.
TestAgent = MyAgent
# ====================

# Play N_TESTS independent games and collect the final score of each.
scores = []
for _ in range(N_TESTS):
    score = single_run(
        GAME_SIZE,
        SCORE_TO_WIN,
        AgentClass=TestAgent,
        model=model.cpu(),
    )
    scores.append(score)

print("Average scores: @%s times" % N_TESTS, sum(scores) / len(scores))

Iter: 1
State:	       0       4       0       4
	       0       0       0       0
	       0       0       0       0
	       0       0       0       2
Score: 4
Iter: 2
State:	       0       0       0       8
	       2       0       0       0
	       0       0       0       0
	       0       0       0       2
Score: 8
Iter: 3
State:	       0       0       0       8
	       0       0       2       2
	       0       0       0       0
	       0       0       0       2
Score: 8
Iter: 4
State:	       2       0       0       8
	       0       0       0       4
	       0       0       0       0
	       0       0       0       2
Score: 8
Iter: 5
State:	       0       0       2       8
	       2       0       0       4
	       0       0       0       0
	       0       0       0       2
Score: 8
Iter: 6
State:	       0       0       2       8
	       0       0       2       4
	       0       0       0       0
	       0       2       0       2
Score: 8
Iter: 7
State:	       0       0       2       

Iter: 2
State:	       0       0       4       2
	       0       0       0       0
	       0       0       0       0
	       2       0       0       2
Score: 4
Iter: 3
State:	       0       0       4       2
	       0       0       0       0
	       0       0       0       0
	       0       0       4       4
Score: 4
Iter: 4
State:	       0       0       4       2
	       0       0       0       0
	       0       0       0       0
	       0       0       2       8
Score: 8
Iter: 5
State:	       0       0       4       2
	       0       0       0       0
	       0       4       0       0
	       0       0       2       8
Score: 8
Iter: 6
State:	       0       0       4       2
	       0       0       0       0
	       0       2       0       4
	       0       0       2       8
Score: 8
Iter: 7
State:	       0       0       4       2
	       0       0       0       0
	       0       0       2       4
	       4       0       2       8
Score: 8
Iter: 8
State:	       0       0       4       

Iter: 27
State:	       4       4       4       8
	       0       4       8      16
	       2       4       8      16
	       2       4       2      16
Score: 16
Iter: 28
State:	       4       4       8       8
	       0       4       8      16
	       2       4       8      16
	       2       4       2      16
Score: 16
Iter: 29
State:	       0       2       8      16
	       0       4       8      16
	       2       4       8      16
	       2       4       2      16
Score: 16
Iter: 30
State:	       4       2       8      16
	       0       4       8      16
	       2       4       8      16
	       2       4       2      16
Score: 16
Iter: 31
State:	       4       2       8      16
	       4       4       8      16
	       2       4       8      16
	       2       4       2      16
Score: 16
Iter: 32
State:	       4       2       8      16
	       2       8       8      16
	       2       4       8      16
	       2       4       2      16
Score: 16
Iter: 33
State:	       4       2  

In [27]:
print("Average scores: @%s times" % N_TESTS, sum(scores) / len(scores))

Average scores: @10 times 18.4


# generate_fingerprint里面的内容

生成指纹：让 agent 对 board_cases.json 中的每个棋盘各决策一步，并把所选方向依次拼接成字符串（推测用于防止作弊）。

In [30]:
import json
import numpy as np

def generate_fingerprint(AgentClass, model, **kwargs):
    '''Return the agent's decisions on the boards of "board_cases.json" as one string.

    Every board from the file is written into a rewritable size-4 game, the
    agent built as `AgentClass(game=game, myNet=model, **kwargs)` is asked
    for one step, and the returned directions are concatenated in file order.
    '''
    with open("board_cases.json") as cases_file:
        cases = json.load(cases_file)

    game = Game(size=4, enable_rewrite_board=True)
    agent = AgentClass(game=game, myNet=model, **kwargs)

    decisions = []
    for case in cases:
        game.board = np.array(case)
        decisions.append(str(agent.step()))
    return "".join(decisions)

In [31]:
from collections import Counter

# ====================
# Use your own agent here.
TestAgent = MyAgent
# ====================

fingerprint = generate_fingerprint(AgentClass=TestAgent, model=model.cpu())

# The file handle is deliberately not called `f`: at module level that would
# rebind the `f` alias of torch.nn.functional which the network relies on.
with open("EE369_fingerprint.json", 'w') as fingerprint_file:
    pack = dict()
    pack['fingerprint'] = fingerprint
    # NOTE: the key spelling 'statstics' is kept unchanged because it is
    # part of the written file format.
    pack['statstics'] = dict(Counter(fingerprint))
    fingerprint_file.write(json.dumps(pack, indent=4))