### **奖励设计**


核心文件位于`feature/reward_process.py`中，需要结合特征处理进行设计，否则出现agent无法感知具体奖励的情况。为了更加清晰框架的奖励设计以及其的工作流程，下面首先分析原本框架的具体流程以及其中关键的数据传输。

#### **整体结构**

训练主循环 -> Agent -> GameRewardManager

1. 训练循环拿到环境给的`obervation[i]`，其中包含：
    - `frame_state`：本帧的完整状态
2. Agent调用：
```python
reward = agent.reward_manager.result(observation[i]["frame_state"])
```

3. `result()`返回本步奖励字典(每个子项 + 合计)，供PPO算法记于学习

#### **数据流**

1. **输入数据结构**：输入环境的`frame_states`

2. **输出数据结构**：`result()`的返回值
```python
{
  "forward": float,
  "tower_hp_point": float,
  "hero_hp_point": float,
  "gold_point": float,
  "minion_push_depth": float,
  "kill_event": float,
  "death_event": float,
  "tower_danger": float,
  "dive_no_minion": float,
  "grass_engage": float,
   # 汇总分：按 GameConfig.REWARD_WEIGHT_DICT 做线性加权求和
  "reward_sum": float
}
```


#### **关键类**

`GameRewardManager` \
该类把每一帧的`frame_state`转成本步可以学习奖励字典

**关键成员**

- `m_main_calc_frame_map: dict[str, RewardStruct]` \
当前“己方阵营”视角的各奖励条目“帧值缓存”（含上一帧值与当前帧值）。
- `m_enemy_calc_frame_map: dict[str, RewardStruct]` \
敌方阵营视角的同构缓存
- `ABS_NAMES: set[str]` \
指明哪些条目在`get_reward`时按非零和/非差分使用
- `EVENT_NAMES: set[str]` \
指明哪些条目按当帧敌我差使用
- `TIME_SCALE_ARG: int`
时间衰减超参

`RewardStruct` \
保存每个条目在“本阵营视角”的上一帧值与当前帧值，方便在后序的奖励计算中进行帧间差分

**关键字段**

```python
class RewardStruct:
    last_frame_value: float  # t-1 帧的度量
    cur_frame_value:  float  # t 帧的度量
```

#### **关键函数**

`result(frame_state) -> dict`

入口调用函数，整合调用顺序：
1. `frame_data_process(frame_state)`
- 用于产出/更新`m_main_calc_frame_map`、`m_enemy_calc_frame_map`中每个条目的`last_frame_value/cur_frame_value`

2. `get_reward(m_main_calc_frame_map, m_enemy_calc_frame_map, frame_state)`
- 把帧信息变成每一步奖励：
  - 对`ABS_NAMES`：直接用我方`cur_frame_value`
  - 对`EVENT_NAMES`：用（我方当帧 − 敌方当帧）
  - 对其他条目：用双方差的帧间差分：
  `[(我方_cur - 敌方_cur) - (我方_last - 敌方_last)]`
  - 若启用时间衰减：乘`0.6 ** (frameNo / TIME_SCALE_ARG)`
  - 按权重表`GameConfig.REWARD_WEIGHT_DICT`做线性加权，得到`reward_sum`

3. 返回奖励字典

---

`frame_data_process(frame_state)`

把一帧输入拆成己方视角/敌方视角的帧度量缓存：
1. 识别阵营
  - 从`frame_state.hero_states`找出敌方/己方英雄
2. 刷新两套缓存
  - 调用`set_cur_calc_frame_vec(self.m_main_calc_frame_map, frame_state, main_camp)`
  使得每个条目的`cur_frame_value`和`last_frame_value`计算
  - 同样针对敌方的情况进行一致的流程
3. 完成后两套Map已经更新完成，送入`get_reward`

核心数据：`self.m_main_calc_frame_map`和`self.m_enemy_calc_frame_map`

---

`set_cur_calc_frame_vec(cul_calc_frame_map, frame_data, camp)`

一个阵营的帧值填充器
- 输入：
  - `cul_calc_frame_map: dict[str, RewardStruct]`将会刷新的一套条目表
  - `frame_data: dict`重要的`frame_state`
  - `camp: str`该视角的阵营，注意阵营形式为`PLAYERCAMP_2/PLAYERCAMP_1`

- 流程：
  - 针对每个条目，将旧的`cur_frame_value`转移到`last_frame_value`
  - 计算并写入`cur_frame_value`

- 输出：
  - 最终完成对`self.m_main_calc_frame_map`和`self.m_enemy_calc_frame_map`的更新

---

`get_reward(main_map, enemy_map, frame_state) -> dict`

策略整合器

- 按条目组(`ABS_NAMES、EVENT_NAMES`)选择不同的规则，将帧度量组合成本步最终奖励
  - ABS：直接用`main_map[name].cur_frame_value`
  - 事件：`main_cur - enmey_cur`
  - 普通密集量(零和 + 差分)：`(main_cur - enemy_cur) - (main_last - enemy_last)`
- 可选时间衰减
- 加权汇总
- 返回每个条目本步奖励 + reward_sum字典


In [1]:
# 下面是整体流程的伪代码
class GameRewardManager:
    def __init__(self, config):
        self.m_main_calc_frame_map = {name: RewardStruct() for name in ALL_NAMES}
        self.m_enemy_calc_frame_map = {name: RewardStruct() for name in ALL_NAMES}
        self.ABS_NAMES = {...}
        self.EVENT_NAMES = {...}
        self.TIME_SCALE_ARG = config.TIME_SCALE_ARG
        self.weights = GameConfig.REWARD_WEIGHT_DICT

    def result(self, frame_state) -> dict:
        # 1) 用两套视角把“本帧度量”准备好
        self.frame_data_process(frame_state)

        # 2) 把“帧度量”转换为“本步奖励”，并做加权
        reward_dict = self.get_reward(self.m_main_calc_frame_map,
                                      self.m_enemy_calc_frame_map,
                                      frame_state)
        return reward_dict

    def frame_data_process(self, frame_state):
        main_camp, enemy_camp = identify_camps(frame_state)
        self.set_cur_calc_frame_vec(self.m_main_calc_frame_map, frame_state, main_camp)
        self.set_cur_calc_frame_vec(self.m_enemy_calc_frame_map, frame_state, enemy_camp)

    def set_cur_calc_frame_vec(self, cul_calc_frame_map, frame_state, camp):
        for name, st in cul_calc_frame_map.items():
            st.last_frame_value = st.cur_frame_value
            st.cur_frame_value  = compute_frame_metric_for(name, frame_state, camp)  # 只填“帧值”

    def get_reward(self, main_map, enemy_map, frame_state) -> dict:
        per_name = {}
        for name in ALL_NAMES:
            if name in self.ABS_NAMES:
                value = main_map[name].cur_frame_value
            elif name in self.EVENT_NAMES:
                value = main_map[name].cur_frame_value - enemy_map[name].cur_frame_value
            else:
                value = ((main_map[name].cur_frame_value - enemy_map[name].cur_frame_value)
                        -(main_map[name].last_frame_value - enemy_map[name].last_frame_value))

            if self.TIME_SCALE_ARG > 0:
                t = frame_state.get("frameNo", 0)
                value *= 0.6 ** (t / self.TIME_SCALE_ARG)

            per_name[name] = value

        reward_sum = sum(self.weights.get(n, 0.0) * v for n, v in per_name.items())
        per_name["reward_sum"] = reward_sum
        return per_name


#### **关于阵营信息**

代码中存在部分阵营信息提取不太理想/相对繁琐的问题，下面有一个比较好的阵营信息的提取方式，具体如下：
```python
for hero in frame_data["hero_states"]:
    if hero["player_id"] == self.main_hero_player_id:
        main_camp = hero["actor_state"]["camp"]
        self.main_hero_camp = main_camp
    else:
        enemy_camp = hero["actor_state"]["camp"]
```

注意提取得到的`main_camp`和`enemy_camp`均为`PLAYERCAMP_1/PLAYERCAMP_2`

### **当前设计设计-版本1**


注意这里`1 step = 6 frame`，按照逐帧取值，算法侧每步(6帧)再聚合

计算时共有三种计算方式：
1. 非零和即时值：\
`ABS_NAMES = {"forward", "tower_danger", "dive_no_minion", "grass_engage"}`

2. 零和但非帧间差分：\
`EVENT_NAMES = {"kill_event", "death_event"}`

3. 零和+帧间差分：\
除开上述的奖励

计算方式如下：
```python
# 非零和即时值
if reward_name in ABS_NAMES:
    cur = self.m_main_calc_frame_map[reward_name].cur_frame_value
    reward_struct.value = cur

# 零和非帧间差分
elif reward_name in EVENT_NAMES:
    cur_diff = (
        self.m_main_calc_frame_map[reward_name].cur_frame_value
        - self.m_enemy_calc_frame_map[reward_name].cur_frame_value
    )
    reward_struct.value = cur_diff 

# 零和+帧间差分
else:
    cur_diff = (
        self.m_main_calc_frame_map[reward_name].cur_frame_value
        - self.m_enemy_calc_frame_map[reward_name].cur_frame_value
    )
    last_diff = (
        self.m_main_calc_frame_map[reward_name].last_frame_value
        - self.m_enemy_calc_frame_map[reward_name].last_frame_value
    )
    reward_struct.value = cur_diff - last_diff
```


#### **`forward`前进奖励**

`forward`前进奖励为非零和的连续值，只针对当前帧的情况计算即可

1. 取出三点坐标：`main_tower_pos`，`enemy_tower_pos`，`hero_pos`

2. 计算几个距离`dist_hero2emy = dist(hero, enemy_tower)`，`dist_main2emy = dist(main_tower, enemy_tower)`

3. 奖励为比值`base`，并且远离时不给奖励(同时这里约束了一个下届$base \ge 0$)

4. 最后将`baes`乘上归一化的血量得到最终的前进奖励

具体计算函数：
```python
def calculate_forward(self, main_hero, main_tower, enemy_tower):
    main_tower_pos = (main_tower["location"]["x"], main_tower["location"]["z"])
    enemy_tower_pos = (enemy_tower["location"]["x"], enemy_tower["location"]["z"])
    hero_pos = (
        main_hero["actor_state"]["location"]["x"],
        main_hero["actor_state"]["location"]["z"],
    )
    forward_value = 0
    dist_hero2emy = math.dist(hero_pos, enemy_tower_pos)
    dist_main2emy = max(math.dist(main_tower_pos, enemy_tower_pos), 1e-6)
    base = (dist_main2emy - dist_hero2emy) / dist_main2emy
    base = max(0.0, base)  # 远离敌塔不奖励
    hp = float(main_hero["actor_state"]["hp"])
    mx = max(float(main_hero["actor_state"]["max_hp"]), 1.0)
    hp_scale = hp / mx
    return base * hp_scale  # 或者直接 return base
```

#### **`tower_hp_point`塔血比例**

塔血比例需要同时计算己方和敌方的血量，所以为零和项


使用辅助函数得到塔血量比例，
```python
def _hp_ratio(u):
    if not isinstance(u, dict):
        return 0.0
    hp = float(u.get("hp", 0.0))
    mx = float(u.get("max_hp", 0.0))
    return hp / mx if mx > 0 else 0.0
# 可以直接计算
reward_struct.cur_frame_value = 1.0 * main_tower["hp"] / main_tower["max_hp"]
```

最终归入其他类，使用零和+差分

#### **`hero_hp_point`英雄血比例**

计算方式与`tower_hp_ration`非常相似，具体如下
```python
hero_hp_ratio = _hp_ratio((main_hero or {}).get("actor_state") or {})
```

注意这个涉及到敌我双方的血量，所以应当为零和+帧间差分，同上。


#### **`gold_point`金币**

此处个人认为存在一定的问题，具体代码如下：
```python
gold_point = 0.0
if main_hero:
    gold_point = float(main_hero.get("money") or 0.0)
```
暂时不确定是经济总量还是增长量，此处注意仍然为零和+差分，具体需要进一步探讨

#### **`minion_push_depth`推线奖励**

首先取出存活的敌我小兵，然后分别计算到防御塔的具体，最后做差并且归一化处理
```python
# 取出存活小兵
A = [n for n in npc_list if n.get("sub_type") == "ACTOR_SUB_SOLDIER" and n.get("camp") == camp and n.get("hp", 0) > 0]
E = [n for n in npc_list if n.get("sub_type") == "ACTOR_SUB_SOLDIER" and n.get("camp") == enemy_camp and n.get("hp", 0) > 0]

# 计算距离函数    
def _front_to_tower(lst, tower):
    if not lst or not tower:
        return self.RANGE_NORM
    tx, tz = _pos(tower)
    best = self.RANGE_NORM
    for u in lst:
        ux, uz = _pos(u)
        d = math.hypot(ux - tx, uz - tz)
        if d < best:
            best = d
    return min(best, self.RANGE_NORM)

# 计算推线奖励
a_front = _front_to_tower(A, enemy_tower)
e_front = _front_to_tower(E, main_tower)
push_depth = (e_front - a_front) / self.RANGE_NORM  # [-1,1]
```

#### **`kill_event/death_event`**

注意该奖励为零和，但是不是帧间差分，防止下一帧出现抵消的情况，具体计算为统计当前帧我方击杀&我方死亡，这个奖励的设计可能欠考虑，导致无法正确起作用，需要更新
```python
kill_event, death_event = 0.0, 0.0
acts = frame_data.get("frame_action", []) or []
if isinstance(acts, list):
    for a in acts:
        if not isinstance(a, dict):
            continue
        da = a.get("dead_action") or {}
        if not isinstance(da, dict):
            continue
        death = (da.get("death") or {}).get("camp", None)
        killer = (da.get("killer") or {}).get("camp", None)
        if killer == camp:
            kill_event += 10.0
        if death == camp:
            death_event += 10.0
```


#### **`tower_danger`塔范围风险**

该奖励为非零和即时值，直接取值即可，核心原理为当我方英雄进入敌方塔攻击半径时，或被塔锁定，直接给一个负值(可以细节化，因为存在越塔的策略的，还有这段是纯ai生成的，我找不到那些数据结构，可以进行二次检查修正)
```python
tower_danger = 0.0
dive_no_minion = 0.0
if main_hero and enemy_tower:
    me = (main_hero.get("actor_state") or {})
    mx, mz = _pos(me)
    ex, ez = _pos(enemy_tower)
    atk_r = float(enemy_tower.get("attack_range", 0.0))
    in_range = 1.0 if (atk_r > 0 and math.hypot(mx - ex, mz - ez) <= atk_r) else 0.0
    target_me = 1.0 if str(enemy_tower.get("attack_target", "")) == str(me.get("runtime_id", "")) else 0.0
    tower_danger = 1.0 if (in_range or target_me) else 0.0
```

#### **`dive_no_minion`无兵线越塔**

同样的，当上述的`in_range=1.0`时，证明我方英雄处于防御塔内，同时当`near_cnt=0`即防御塔内没有小兵时，给一个负奖励。上述二者均属于`ABS_NAMES`非零和即时值


#### **`grass_engage`草丛埋伏**

鼓励智能体进入草丛，同时设置我方在草丛敌方不在草丛的情况，具体代码如下(有一个问题：具体的距离如何设定，以及整个地图的max_length为多少，同时该部分可以更加细化)

```python
grass_engage = 0.0
if main_hero and enemy_hero:
    a_in = 0.1 if (main_hero.get("isInGrass") or (main_hero.get("actor_state") or {}).get("isInGrass")) else 0
    e_in = 0.1 if (enemy_hero.get("isInGrass") or (enemy_hero.get("actor_state") or {}).get("isInGrass")) else 0
    if a_in and not e_in:
        ax, az = _pos((main_hero.get("actor_state") or {}))
        ex, ez = _pos((enemy_hero.get("actor_state") or {}))
        if math.hypot(ax - ex, az - ez) <= 5000.0:
            grass_engage = 0.1
```


### **一些问题**

1. 地图的距离，最长距离(用于归一化)

2. 部分数据结构需要自己尝试

3. 1V1需要综合考虑很多策略，可以分模块设计