In [4]:
from combat import Combat


def test_env():
    state = env.reset()
    action = env.action_space.sample()
    next_state, reward, over, _ = env.step(action)

    print('state=', len(state[0]), len(state[1]))
    print('action=', action)
    print('reward=', reward)
    print('next_state=', len(next_state[0]), len(next_state[1]))
    print('over=', over)


env = Combat(grid_shape=(15, 15), n_agents=2, n_opponents=2)

test_env()

env

state= 150 150
action= [5, 2]
reward= [0, 0]
next_state= 150 150
over= [False, False]


<combat.Combat at 0x7f3d78e87080>

In [2]:
import random
import torch


class PPO:
    def __init__(self):
        self.model_action = torch.nn.Sequential(
            torch.nn.Linear(150, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 7),
            torch.nn.Softmax(dim=1),
        )
        self.model_value = torch.nn.Sequential(
            torch.nn.Linear(150, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 1),
        )
        self.optimizer_action = torch.optim.Adam(
            self.model_action.parameters(), lr=3e-4)
        self.optimizer_value = torch.optim.Adam(self.model_value.parameters(),
                                                lr=3e-3)

        self.mse_loss = torch.nn.MSELoss()

    def get_action(self, state):
        state = torch.FloatTensor(state).reshape(1, 150)

        #[1, 150] -> [7]
        weights = self.model_action(state).squeeze(dim=0).tolist()

        #[7] -> scala
        action = random.choices(range(7), weights=weights, k=1)[0]

        return action

    def _get_advantages(self, deltas):
        advantages = []

        #反向遍历deltas
        s = 0.0
        for delta in deltas[::-1]:
            s = 0.99 * 0.97 * s + delta
            advantages.append(s)

        #逆序
        advantages.reverse()
        return advantages

    def _get_target(self, next_state, reward, over):
        #[b, 150] -> [b, 1]
        target = self.model_value(next_state)
        target *= 0.99
        target *= (1 - over)
        target += reward
        return target

    def _get_value(self, state):
        #[b, 150] -> [b, 1]
        return self.model_value(state)

    def train(self, state, action, reward, next_state, over):
        #state -> [b, 150]
        #action -> [b, 1]
        #reward -> [b, 1]
        #next_state -> [b, 150]
        #over -> [b, 1]

        #[b, 1]
        target = self._get_target(next_state, reward, over).detach()
        #[b, 150] -> [b, 1]
        value = self._get_value(state)

        #[b, 1] - [b, 1] -> [b, 1] -> [b]
        delta = (target - value).squeeze(dim=1).tolist()
        #[b] -> [b]
        advantages = self._get_advantages(delta)
        #[b] -> [b, 1]
        advantages = torch.FloatTensor(advantages).reshape(-1, 1)

        #[b, 150] -> [b, 7]
        old_prob = self.model_action(state)
        #[b, 7] -> [b, 1]
        old_prob = old_prob.gather(1, action)
        #[b, 1] -> [b, 1]
        old_prob = old_prob.log().detach()

        for _ in range(1):
            #[b, 150] -> [b, 7]
            new_prob = self.model_action(state)
            #[b, 7] -> [b, 1]
            new_prob = new_prob.gather(1, action)
            #[b, 1] -> [b, 1]
            new_prob = new_prob.log()

            #[b, 1] - [b, 1] -> [b, 1]
            ratio = (new_prob - old_prob).exp()

            #[b, 1] * [b, 1] -> [b, 1]
            surr1 = ratio * advantages
            #[b, 1] * [b, 1] -> [b, 1]
            surr2 = torch.clamp(ratio, 0.8, 1.2) * advantages

            #[b, 1]
            loss_action = torch.min(surr1, surr2)
            loss_action = -loss_action
            #[b, 1] -> scala
            loss_action = loss_action.mean()

            self.optimizer_action.zero_grad()
            loss_action.backward()
            self.optimizer_action.step()

            #[b, 4] -> [b, 1]
            value = self._get_value(state)

            self.optimizer_value.zero_grad()
            #[b, 1],[b, 1] -> scala
            loss_action = self.mse_loss(value, target)
            self.optimizer_value.step()


ppo = PPO()

ppo.train(
    torch.randn(5, 150),
    torch.ones(5, 1).long(),
    torch.randn(5, 1),
    torch.randn(5, 150),
    torch.zeros(5, 1).long(),
)

ppo.get_action(list(range(150)))

5

In [3]:
def get_data():
    data0 = {
        'state': [],
        'action': [],
        'reward': [],
        'next_state': [],
        'over': [],
    }

    data1 = {
        'state': [],
        'action': [],
        'reward': [],
        'next_state': [],
        'over': [],
    }

    state = env.reset()
    over = False
    while not over:
        action = [None, None]

        action[0] = ppo.get_action(state[0])
        action[1] = ppo.get_action(state[1])

        next_state, reward, over, info = env.step(action)
        win = info['win']
        del info

        #对reward进行偏移
        if win:
            reward[0] += 100
            reward[1] += 100
        else:
            reward[0] -= 0.1
            reward[1] -= 0.1

        data0['state'].append(state[0])
        data0['action'].append(action[0])
        data0['reward'].append(reward[0])
        data0['next_state'].append(next_state[0])
        data0['over'].append(False)  #常量

        data1['state'].append(state[1])
        data1['action'].append(action[1])
        data1['reward'].append(reward[1])
        data1['next_state'].append(next_state[1])
        data1['over'].append(False)  #常量

        state = next_state
        over = over[0] and over[1]

    data0['state'] = torch.FloatTensor(data0['state']).reshape(-1, 150)
    data0['action'] = torch.LongTensor(data0['action']).reshape(-1, 1)
    data0['reward'] = torch.FloatTensor(data0['reward']).reshape(-1, 1)
    data0['next_state'] = torch.FloatTensor(data0['next_state']).reshape(
        -1, 150)
    data0['over'] = torch.LongTensor(data0['over']).reshape(-1, 1)

    data1['state'] = torch.FloatTensor(data1['state']).reshape(-1, 150)
    data1['action'] = torch.LongTensor(data1['action']).reshape(-1, 1)
    data1['reward'] = torch.FloatTensor(data1['reward']).reshape(-1, 1)
    data1['next_state'] = torch.FloatTensor(data1['next_state']).reshape(
        -1, 150)
    data1['over'] = torch.LongTensor(data1['over']).reshape(-1, 1)

    return data0, data1, win


get_data()

({'state': tensor([[0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          ...,
          [0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ..., 0., 0., 0.]]),
  'action': tensor([[1],
          [2],
          [1],
          [3],
          [1],
          [1],
          [0],
          [5],
          [1],
          [4],
          [4],
          [4],
          [3],
          [1],
          [5],
          [3],
          [0],
          [6],
          [3],
          [5],
          [3],
          [2]]),
  'reward': tensor([[-1.1000],
          [-0.1000],
          [-1.1000],
          [-0.1000],
          [-1.1000],
          [-0.1000],
          [-0.1000],
          [-0.1000],
          [-0.1000],
          [-0.1000],
          [-0.1000],
          [-0.1000],
          [-0.1000],
          [-0.1000],
          [-0.1000],
          [-0.1000],
          [-0.1000],
          [-0

In [4]:
import torch.nn.functional as F
import numpy as np
import rl_utils

wins = []
for i in range(100000):
    data0, data1, win = get_data()
    wins.append(win)

    ppo.train(**data0)
    ppo.train(**data1)

    if i % 5000 == 0:
        wins = wins[-100:]
        print(i, sum(wins) / len(wins))
        wins = []

0 0.0
5000 0.02
10000 0.13
15000 0.23
20000 0.24
25000 0.36
30000 0.28
35000 0.23
40000 0.35
45000 0.31
50000 0.13
55000 0.29
60000 0.26
65000 0.27
70000 0.36
75000 0.33
80000 0.37
85000 0.33
90000 0.43
95000 0.45
