In [1]:
import torch
import random

random.seed(0)
torch.manual_seed(0)

#创建一个游戏环境
class Env:
    def reset(self):
        #前两个数是起点,后两个数是终点
        self.state = torch.zeros(4)
        self.state[2] = random.uniform(3.5, 4.5)
        self.state[3] = random.uniform(3.5, 4.5)

        self.count = 0

        return self.state.tolist()

    def step(self, action):
        action = torch.FloatTensor(action).reshape(2)

        #裁剪动作范围
        action = torch.clamp(action, min=-1, max=1)

        #执行动作
        self.state[:2] += action

        #规范状态空间
        self.state[:2] = torch.clamp(self.state[:2], min=0, max=5)

        self.count += 1

        #求二范数,求两向量相减之后的向量的模长
        #两向量相减的几何意义是两个向量的尾部相连,再连接两个头部形成的新向量
        #mod = ((self.state - self.goal)**2).sum()**0.5
        mod = (self.state[:2] - self.state[2:]).norm(p=2).item()

        reward = -1.0
        over = False
        if mod <= 0.15:
            reward = 0.0
            over = True

        if self.count >= 50:
            over = True

        return self.state.tolist(), reward, over


env = Env()

print(env.reset())

env.step([0.1, 0.2])

[0.0, 0.0, 4.344421863555908, 4.2579545974731445]


([0.10000000149011612,
  0.20000000298023224,
  4.344421863555908,
  4.2579545974731445],
 -1.0,
 False)

In [2]:
class DDPG:
    def __init__(self):
        self.model_action = torch.nn.Sequential(
            torch.nn.Linear(4, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 2),
            torch.nn.Tanh(),
        )
        self.model_value = torch.nn.Sequential(
            torch.nn.Linear(6, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 1),
        )

        self.model_action_next = torch.nn.Sequential(
            torch.nn.Linear(4, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 2),
            torch.nn.Tanh(),
        )
        self.model_value_next = torch.nn.Sequential(
            torch.nn.Linear(6, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 1),
        )

        self.model_action_next.load_state_dict(self.model_action.state_dict())
        self.model_value_next.load_state_dict(self.model_value.state_dict())

        self.optimizer_action = torch.optim.Adam(
            self.model_action.parameters(), lr=1e-3)
        self.optimizer_value = torch.optim.Adam(self.model_value.parameters(),
                                                lr=1e-3)

        self.mse_loss = torch.nn.MSELoss()

    def get_action(self, state):
        state = torch.FloatTensor(state).reshape(1, 4)

        #[1, 4] -> [1, 2]
        action = self.model_action(state)

        #[1, 2] -> [2]
        action = action.reshape(2)

        #添加噪声,增加探索
        action += 0.1 * torch.randn(2)
        return action.tolist()

    def _soft_update(self, model, model_next):
        for param, param_next in zip(model.parameters(),
                                     model_next.parameters()):
            #以一个小的比例更新
            value = param_next.data * 0.995 + param.data * 0.005
            param_next.data.copy_(value)

    def _get_target(self, next_state, reward, over):
        #[b, 4] -> [b, 2]
        action = self.model_action_next(next_state)

        #[b, 4+2] -> [b, 6]
        input = torch.cat([next_state, action], dim=1)

        #[b, 6] -> [b, 1]
        target = self.model_value_next(input)

        #[b, 1]
        target *= 0.98
        target *= (1 - over)
        target += reward

        return target

    def _get_value(self, state, action):
        #[b, 4+2] -> [b, 6]
        input = torch.cat([state, action], dim=1)

        #[b, 6] -> [b, 1]
        value = self.model_value(input)

        return value

    def _get_loss_action(self, state):
        #[b, 4] -> [b, 2]
        action = self.model_action(state)

        #[b, 4+2] -> [b, 6]
        input = torch.cat([state, action], dim=1)

        #[b, 6] -> [b, 1]
        loss_action = self.model_value(input)
        loss_action = -loss_action.mean()

        return loss_action

    def train(self, state, action, reward, next_state, over):
        #state -> [b, 4]
        #action -> [b, 2]
        #reward -> [b, 1]
        #next_state -> [b, 4]
        #over -> [b, 1]

        #[b, 1]
        target = self._get_target(next_state, reward, over)

        #[b, 1]
        value = self._get_value(state, action)

        loss_value = self.mse_loss(value, target)
        self.optimizer_value.zero_grad()
        loss_value.backward()
        self.optimizer_value.step()

        loss_action = self._get_loss_action(state)
        self.optimizer_action.zero_grad()
        loss_action.backward()
        self.optimizer_action.step()

        self._soft_update(self.model_action, self.model_action_next)
        self._soft_update(self.model_value, self.model_value_next)


ddpg = DDPG()

ddpg.train(
    torch.randn(200, 4),
    torch.randn(200, 2),
    torch.randn(200, 1),
    torch.randn(200, 4),
    torch.zeros(200, 1).long(),
)

ddpg.get_action([1, 2, 3, 4])

[-0.3392077088356018, 0.055921927094459534]

In [3]:
class Data():
    def __init__(self):
        self.datas = []

    def __len__(self):
        return len(self.datas)

    def update(self):
        state = env.reset()
        over = False

        data = {
            'state': [],
            'action': [],
            'reward': [],
            'next_state': [],
            'over': [],
        }

        while not over:
            action = ddpg.get_action(state)
            next_state, reward, over = env.step(action)

            data['state'].append(state)
            data['action'].append(action)
            data['reward'].append(reward)
            data['next_state'].append(next_state)
            data['over'].append(over)

            state = next_state

        self.datas.append(data)

    def get_sample(self):
        #采样结果
        sample = {
            'state': [],
            'action': [],
            'reward': [],
            'next_state': [],
            'over': [],
        }

        #采样N个数据
        for _ in range(256):

            #随机一局游戏
            data = random.sample(self.datas, 1)[0]

            #随机游戏中的一步,这里排除了最后一步
            step = random.randint(0, len(data['action']) - 2)

            #提取数据
            state = data['state'][step]
            next_state = data['next_state'][step]
            action = data['action'][step]
            reward = data['reward'][step]
            over = data['over'][step]

            #设置fake goal
            if random.random() <= 0.8:

                #随机选择step后面的某一步
                step = random.randint(step + 1, len(data['action']) - 1)

                #以后面某个步的状态为一个伪的终点,也就是希望先走到这里再说
                fake_goal = data['state'][step][:2]

                #求二范数
                mod = [
                    next_state[0] - fake_goal[0], next_state[1] - fake_goal[1]
                ]
                mod = torch.FloatTensor(mod).norm(p=2).item()

                #再自己重新计算reward和over
                reward = -1.0
                over = False
                if mod <= 0.15:
                    reward = 0.0
                    over = True

                #以伪终点构建新的state
                state[2] = fake_goal[0]
                state[3] = fake_goal[1]
                next_state[2] = fake_goal[0]
                next_state[3] = fake_goal[1]

            sample['state'].append(state)
            sample['action'].append(action)
            sample['reward'].append(reward)
            sample['next_state'].append(next_state)
            sample['over'].append(over)

        sample['state'] = torch.FloatTensor(sample['state']).reshape(-1, 4)
        sample['action'] = torch.FloatTensor(sample['action']).reshape(-1, 2)
        sample['reward'] = torch.FloatTensor(sample['reward']).reshape(-1, 1)
        sample['next_state'] = torch.FloatTensor(sample['next_state']).reshape(
            -1, 4)
        sample['over'] = torch.LongTensor(sample['over']).reshape(-1, 1)

        return sample

    #计算最后10个数据reward_sum的均值
    def get_last_reward_mean(self):
        reward_sum = []
        for data in self.datas[-10:]:
            reward_sum.append(sum(data['reward']))
        return sum(reward_sum) / len(reward_sum)


data = Data()

#初始化数据
for _ in range(200):
    data.update()

data.get_sample(), data.get_last_reward_mean()

({'state': tensor([[0.0000, 0.0000, 0.0000, 0.0000],
          [0.0000, 0.0000, 0.0000, 0.0000],
          [0.0000, 0.0000, 0.0000, 0.0325],
          ...,
          [0.0000, 0.0000, 0.0000, 0.0000],
          [0.0000, 0.0000, 0.0000, 0.0000],
          [0.0000, 0.0000, 3.9886, 4.0849]]),
  'action': tensor([[-0.2635, -0.0956],
          [-0.3706, -0.1727],
          [-0.4140, -0.1666],
          [-0.4562, -0.1237],
          [-0.4448, -0.3142],
          [-0.3906, -0.1910],
          [-0.4076, -0.1296],
          [-0.2947, -0.1786],
          [-0.3101, -0.2688],
          [-0.3959, -0.1792],
          [-0.3529,  0.0933],
          [-0.5559, -0.0750],
          [-0.2379, -0.1012],
          [-0.2405, -0.1720],
          [-0.2626, -0.1283],
          [-0.3308, -0.2549],
          [-0.5064, -0.1311],
          [-0.3473, -0.1086],
          [-0.1949, -0.1427],
          [-0.3029, -0.0756],
          [-0.3205, -0.0882],
          [-0.3693,  0.0324],
          [-0.3083, -0.2556],
          

In [4]:
for i in range(1800):
    data.update()

    for _ in range(20):
        ddpg.train(**data.get_sample())

    if i % 100 == 0:
        print(i, len(data), data.get_last_reward_mean())

0 201 -50.0
100 301 -45.3
200 401 -50.0
300 501 -50.0
400 601 -23.1
500 701 -4.6
600 801 -4.2
700 901 -5.6
800 1001 -4.3
900 1101 -3.9
1000 1201 -4.3
1100 1301 -4.5
1200 1401 -4.2
1300 1501 -4.3
1400 1601 -4.1
1500 1701 -4.4
1600 1801 -4.1
1700 1901 -4.8
