Implementing the Policy Iteration algorithm.

The lab report must contain:

a title page;
a description of the assignment;
the program source code;
screenshots with examples of the program running.

Based on the example covered in the lecture, implement the Policy Iteration algorithm for any reinforcement learning environment from the Gym library (or a similar library), other than the Toy Text / Frozen Lake environment covered in the lecture.
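
As a possible starting point for the assignment, the sketch below runs policy iteration on any tabular model given as `P[s][a] = [(prob, next_state, reward, done), ...]`. Gym's Toy Text environments store their dynamics in this layout (commonly reached as `env.unwrapped.P`); that access path, the function name `policy_iteration`, and the three-state toy model are assumptions made here for illustration, not part of the lecture example.

```python
def policy_iteration(P, n_states, n_actions, gamma=0.9, theta=1e-6):
    """Return (policy, V) for a tabular model P[s][a] = [(p, s2, r, done), ...]."""
    policy = [0] * n_states   # deterministic policy: one action index per state
    V = [0.0] * n_states

    def q_value(s, a):
        # Expected one-step return of action a in state s, bootstrapping from V.
        return sum(p * (r + gamma * V[s2] * (not done))
                   for p, s2, r, done in P[s][a])

    while True:
        # Policy evaluation: in-place sweeps until the largest change is below theta.
        while True:
            delta = 0.0
            for s in range(n_states):
                v_new = q_value(s, policy[s])
                delta = max(delta, abs(v_new - V[s]))
                V[s] = v_new
            if delta < theta:
                break
        # Policy improvement: switch only when another action is strictly better.
        stable = True
        for s in range(n_states):
            best = max(range(n_actions), key=lambda a: q_value(s, a))
            if q_value(s, best) > q_value(s, policy[s]) + 1e-12:
                policy[s] = best
                stable = False
        if stable:
            return policy, V


# Hypothetical toy chain: states 0 -> 1 -> 2 (terminal); action 0 = stay, 1 = advance.
toy_P = {
    0: {0: [(1.0, 0, -1, False)], 1: [(1.0, 1, -1, False)]},
    1: {0: [(1.0, 1, -1, False)], 1: [(1.0, 2, -1, True)]},
    2: {0: [(1.0, 2, 0, True)], 1: [(1.0, 2, 0, True)]},
}
policy, V = policy_iteration(toy_P, n_states=3, n_actions=2)
print(policy, V)
```

For a real Toy Text environment, the same function would presumably be called with that environment's transition table and its state and action counts in place of `toy_P`.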

Python code for the policy iteration algorithm using the Gym library for the CartPole-v1 environment

https://hrl.boyuai.com/chapter/1/%E5%8A%A8%E6%80%81%E8%A7%84%E5%88%92%E7%AE%97%E6%B3%95/#43-%E7%AD%96%E7%95%A5%E8%BF%AD%E4%BB%A3%E7%AE%97%E6%B3%95

https://blog.csdn.net/chenxy_bwave/article/details/129349086

https://github.com/ugapanyuk/courses_current/blob/main/code/rl/examples/policy_iteration.py

# Finding the optimal solution for CliffWalking-v0 with policy iteration (Python implementation)

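The CliffWalking-v0 environment is set up much like GridWorld, so a GridWorld-style state representation is used here. When the environment object is created, a two-dimensional array describes the type of each cell in the grid: "1" marks the terminal cell, "-1" marks cliff cells, and "0" marks all other cells, as shown below: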

In [None]:
from enum import Enum
import numpy as np
import random

class State():

    def __init__(self, row=-1, column=-1):
        self.row = row
        self.column = column

    def __repr__(self):
        return "<State: [{}, {}]>".format(self.row, self.column)

    def clone(self):
        return State(self.row, self.column)

    def __hash__(self):
        return hash((self.row, self.column))

    def __eq__(self, other):
        return self.row == other.row and self.column == other.column


class Action(Enum):
    # Opposite numeric values are assigned to opposite direction.
    # This is for the convenience of implementation in transit_func().
    UP    = 1
    DOWN  = -1
    LEFT  = 2
    RIGHT = -2

class Environment():

    def __init__(self, grid, move_prob=1.0):
        # grid is a 2D array; each value is treated as a cell attribute.
        # The attribute kinds are as follows.
        # grid[i][j] = 1: Terminate cell (game end)
        # grid[i][j] =-1: Cliff cells
        # grid[i][j] = 0: Other cells

        self.grid = grid
        self.agent_state = State()

        # Default reward is minus. Just like a poison swamp.
        # It means the agent has to reach the goal fast!
        self.default_reward = -1

        # The agent moves in the selected direction with probability move_prob.
        # With probability (1 - move_prob) it slips, split evenly between
        # the two perpendicular directions (never the opposite one).
        # move_prob = 1.0 means the agent always moves in the selected direction.
        self.move_prob = move_prob
        self.reset()

    @property
    def row_length(self):
        return len(self.grid)

    @property
    def column_length(self):
        return len(self.grid[0])

    @property
    def actions(self):
        return [Action.UP, Action.DOWN,
                Action.LEFT, Action.RIGHT]

    @property
    def states(self):
        '''
        valid states.
        In CliffWalking-v0 games, cliff cells are not valid cells (unreachable).
        '''
        states = []
        for row in range(self.row_length):
            for column in range(self.column_length):
                # Cliff cells are not included to the valid state list.
                if not(row == (self.row_length - 1) and 0 < column < (self.column_length - 1)):
                    states.append(State(row, column))
        return states

    def transit_func(self, state, action):
        """
        Prob(s',r|s,a) stored in one dict[(s',reward)].
        """
        transition_probs = {}
        if not self.can_action_at(state):
            # Already on the terminal cell.
            return transition_probs

        opposite_direction = Action(action.value * -1)

        for a in self.actions:
            prob = 0
            if a == action:
                prob = self.move_prob
            elif a != opposite_direction:
                prob = (1 - self.move_prob) / 2

            next_state = self._move(state, a)
            if next_state.row == (self.row_length - 1) and 0 < next_state.column < (self.column_length - 1):
                reward = -100
                next_state = State(self.row_length - 1, 0) # Return to start grid when falls into cliff grid.
            else:
                reward = -1
            
            if (next_state,reward) not in transition_probs:
                transition_probs[(next_state,reward)] = prob
            else:
                transition_probs[(next_state,reward)] += prob

        return transition_probs

    def can_action_at(self, state):
        '''
        Assuming:
            grid[i][j] = 1: Terminate grid
            grid[i][j] =-1: Cliff grids
            grid[i][j] = 0: Other grids
        '''
        if self.grid[state.row][state.column] == 0:
            return True
        else:
            return False

    def _move(self, state, action):
        """
        Predict the next state upon the combination of {state, action}
        {state, action} --> next_state
        Called in transit_func()
        """
        if not self.can_action_at(state):
            raise Exception("Can't move from here!")

        next_state = state.clone()

        # Execute an action (move).
        if action == Action.UP:
            next_state.row -= 1
        elif action == Action.DOWN:
            next_state.row += 1
        elif action == Action.LEFT:
            next_state.column -= 1
        elif action == Action.RIGHT:
            next_state.column += 1

        # Check whether a state is out of the grid.
        if not (0 <= next_state.row < self.row_length):
            next_state = state
        if not (0 <= next_state.column < self.column_length):
            next_state = state

        # Entering a cliff cell incurs the corresponding penalty and a reset
        # to the start cell; that is handled by the caller (transit_func).

        return next_state

    def reset(self):
        # Locate the agent at lower left corner.
        self.agent_state = State(self.row_length - 1, 0)
        return self.agent_state


class Planner():

    def __init__(self, env):
        self.env     = env
        self.log     = []
        self.V_grid  = []
        self.iters   = 0

    def initialize(self):
        self.env.reset()
        self.log = []

    def plan(self, gamma=0.9, threshold=0.0001):
        raise NotImplementedError("Planner subclasses have to implement the plan method.")

    def transitions_at(self, state, action):
        '''
        Maybe moved to Environment in the future.
        '''
        transition_probs = self.env.transit_func(state, action)
        for (next_state,reward) in transition_probs:
            prob = transition_probs[(next_state,reward)]
            # reward, _ = self.env.reward_func(next_state)
            yield prob, next_state, reward

    def dict_to_grid(self, state_reward_dict):
        """
        Convert dict to 2-D array specific to grid-world-like game, for the convenience of 
        print_value_grid(), etc.
        Using numpy array maybe better.
        """
        grid = []
        for i in range(self.env.row_length):
            row = [0] * self.env.column_length
            grid.append(row)
        for s in state_reward_dict:
            grid[s.row][s.column] = state_reward_dict[s]
    
        return grid
    
    def print_value_grid(self):
        for i in range(len(self.V_grid)):
            for j in range(len(self.V_grid[0])):
                print('{0:6.3f}'.format(self.V_grid[i][j]), end=' ' )
            print('')


class PolicyIterationPlanner(Planner):

    def __init__(self, env):
        super().__init__(env)
        self.policy = {}

    def initialize(self):
        super().initialize()
        self.policy = {}
        actions = self.env.actions
        states = self.env.states
        for s in states:
            self.policy[s] = {}
            for a in actions:
                # Initialize policy.
                # At first, each action is taken uniformly. 
                # Any other random initialization should be also OK, for example, gaussian distribution
                self.policy[s][a] = 1 / len(actions)                                    

    def policy_evaluation(self, gamma, threshold):
        V = {}
        for s in self.env.states:
            # Initialize each state's expected reward.
            V[s] = 0

        while True:
            delta = 0
            for s in V:
                expected_rewards = []
                for a in self.policy[s]:
                    action_prob = self.policy[s][a]
                    r = 0
                    for prob, next_state, reward in self.transitions_at(s, a):
                        r += action_prob * prob * \
                             (reward + gamma * V[next_state])
                    expected_rewards.append(r)
                value = sum(expected_rewards)
                delta = max(delta, abs(value - V[s]))
                V[s] = value
            if delta < threshold:
                break

        return V

    def plan(self, gamma=0.9, threshold=0.0001):
        """
        Implement the policy iteration algorithm
        gamma    : discount factor
        threshold: convergence tolerance (largest value change) for policy evaluation.
        """
        self.initialize()
        states  = self.env.states
        actions = self.env.actions

        def take_max_action(action_value_dict):
            return max(action_value_dict, key=action_value_dict.get)

        while True:
            update_stable = True
            # Estimate expected rewards under current policy.
            V = self.policy_evaluation(gamma, threshold)
            self.log.append(self.dict_to_grid(V))

            for s in states:
                # Get the action preferred by the current policy.
                policy_action = take_max_action(self.policy[s])

                # Compare with other actions.
                action_rewards = {}
                for a in actions:
                    r = 0
                    for prob, next_state, reward in self.transitions_at(s, a):
                        r += prob * (reward + gamma * V[next_state])
                    action_rewards[a] = r
                best_action = take_max_action(action_rewards)
                if policy_action != best_action:
                    update_stable = False

                # Update policy (set best_action prob=1, otherwise=0 (greedy))
                for a in self.policy[s]:
                    prob = 1 if a == best_action else 0
                    self.policy[s][a] = prob

            # Turn dictionary to grid
            self.V_grid = self.dict_to_grid(V)
            self.iters = self.iters + 1
            print('PolicyIteration: iters = {0}'.format(self.iters))
            self.print_value_grid()
            print('******************************')

            if update_stable:
                # If policy isn't updated, stop iteration
                break

    def print_policy(self):
        print('PolicyIteration: policy = ')
        actions = self.env.actions
        states  = self.env.states
        for s in states:
            print('\tstate = {}'.format(s))
            for a in actions:
                print('\t\taction = {0}, prob = {1}'.format(a,self.policy[s][a]))

        # Optimal actions
        action_array = []
        for i in range(self.env.row_length):
            row = [0] * self.env.column_length
            action_array.append(row)
        for s in states:
            max_prob = -1                  
            for a in actions:
                if self.policy[s][a] > max_prob:
                    max_prob = self.policy[s][a]
                    opt_action = a
            action_array[s.row][s.column] = opt_action.value
        
        print('PolicyIteration: optimal policy = ')
        for i in range(self.env.row_length):
            print("========================")
            for j in range(self.env.column_length):
                if action_array[i][j] == Action.UP.value:
                    print('  UP   ', end='')
                elif action_array[i][j] == Action.DOWN.value:
                    print(' DOWN  ', end='')
                elif action_array[i][j] == Action.LEFT.value:
                    print(' LEFT  ', end='')
                elif action_array[i][j] == Action.RIGHT.value:
                    print(' RIGHT ', end='')
                else:
                    print('   X   ', end='')
            print('')
                
if __name__ == "__main__":

    # Create grid environment
    grid = [
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    ]
    grid[3][11] = 1 # Terminate cell
    for k in range(1,11):
        grid[3][k] = -1  # Cliff cells (columns 1..10 of the bottom row)
        
    # # A smaller grid environment, only for the convenience of debug.
    # grid = [
    #     [0,  0, 0],
    #     [0,  0, 0],
    #     [0, -1, 1]
    # ]    
    
    env2 = Environment(grid)
    policyIterPlanner = PolicyIterationPlanner(env2)
    policyIterPlanner.plan(0.9,0.001)
    policyIterPlanner.print_value_grid()    
    policyIterPlanner.print_policy()    

PolicyIteration: iters = 1
-53.260 -56.451 -59.258 -60.959 -61.784 -62.002 -61.690 -60.678 -58.491 -54.352 -47.647 -40.062 
-69.298 -77.481 -82.260 -84.486 -85.408 -85.646 -85.366 -84.377 -81.997 -76.633 -65.260 -45.840 
-103.511 -131.908 -139.933 -142.422 -143.233 -143.428 -143.247 -142.525 -140.488 -134.536 -115.483 -48.126 
-150.891  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000 
******************************
PolicyIteration: iters = 2
-9.991 -9.992 -9.993 -9.994 -9.994 -9.991 -9.991 -9.991 -9.991 -9.991 -9.991 -9.991 
-9.992 -9.993 -9.994 -9.994 -9.995 -9.992 -9.992 -9.992 -9.992 -9.992 -9.992 -9.992 
-9.993 -9.994 -9.994 -9.995 -9.995 -9.993 -9.993 -9.993 -9.993 -9.993 -1.900 -1.000 
-9.994  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000 
******************************
PolicyIteration: iters = 3
-9.991 -9.992 -9.993 -9.994 -9.991 -9.991 -9.991 -9.991 -9.991 -9.991 -9.991 -9.991 
-9.992 -9.993 -9.994 -9.994 -9.992 -9.992 -
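
The values close to -10 in iteration 2 are consistent with states from which the current greedy policy never reaches the goal: with a reward of -1 per step and gamma = 0.9, the return of an endless walk is the geometric series -1 / (1 - gamma) = -10, and evaluation stops slightly short of it because of the threshold. The sketch below checks this arithmetic. The helper name `never_ending_value` is hypothetical, and it uses a single scalar update from V = 0, whereas the planner above sweeps all states in place, so the exact stopping values differ slightly.

```python
def never_ending_value(gamma=0.9, threshold=0.001, step_reward=-1.0):
    """Iterate V <- r + gamma * V from 0 until the change drops below threshold."""
    v, sweeps = 0.0, 0
    while True:
        v_new = step_reward + gamma * v
        sweeps += 1
        if abs(v_new - v) < threshold:
            return v_new, sweeps
        v = v_new


limit = -1.0 / (1.0 - 0.9)             # closed-form value of the infinite series
v_stop, sweeps = never_ending_value()  # value at which thresholded evaluation stops
print(limit, v_stop, sweeps)
```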

# Version from the book

In [4]:
import copy

class CliffWalkingEnv:
    """Cliff Walking environment."""
    def __init__(self, ncol=12, nrow=4):
        self.ncol = ncol  # number of columns in the grid world
        self.nrow = nrow  # number of rows in the grid world
        # Transition matrix P[state][action] = [(p, next_state, reward, done)], holding the next state and reward
        self.P = self.createP()

    def createP(self):
        # Initialization
        P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]
        # 4 actions: change[0] up, change[1] down, change[2] left, change[3] right.
        # The coordinate origin (0, 0) is the top-left corner.
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
        for i in range(self.nrow):
            for j in range(self.ncol):
                for a in range(4):
                    # On the cliff or at the goal: no further interaction is possible, so every action has reward 0
                    if i == self.nrow - 1 and j > 0:
                        P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0,
                                                    True)]
                        continue
                    # All other positions
                    next_x = min(self.ncol - 1, max(0, j + change[a][0]))
                    next_y = min(self.nrow - 1, max(0, i + change[a][1]))
                    next_state = next_y * self.ncol + next_x
                    reward = -1
                    done = False
                    # The next position is on the cliff or at the goal
                    if next_y == self.nrow - 1 and next_x > 0:
                        done = True
                        if next_x != self.ncol - 1:  # the next position is on the cliff
                            reward = -100
                    P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
        return P
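
The transition table above flattens each grid cell (row i, column j) into a single state index `i * ncol + j`. A small sketch of that mapping follows; the helper names `to_index` and `to_cell` are hypothetical and only illustrate the arithmetic for the default 4 x 12 grid.

```python
NROW, NCOL = 4, 12  # default grid size used by CliffWalkingEnv


def to_index(row, col, ncol=NCOL):
    """Flatten a (row, col) cell into a state index."""
    return row * ncol + col


def to_cell(index, ncol=NCOL):
    """Recover (row, col) from a state index."""
    return divmod(index, ncol)


start = to_index(NROW - 1, 0)                                 # bottom-left corner
goal = to_index(NROW - 1, NCOL - 1)                           # bottom-right corner
cliff = [to_index(NROW - 1, c) for c in range(1, NCOL - 1)]   # cells between them
print(start, goal, cliff[0], cliff[-1], to_cell(goal))
```

The cliff and goal indices computed here are the ones passed to `print_agent` as `list(range(37, 47))` and `[47]`.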

In [5]:
class PolicyIteration:
    """Policy iteration algorithm."""
    def __init__(self, env, theta, gamma):
        self.env = env
        self.v = [0] * self.env.ncol * self.env.nrow  # initialize values to 0
        self.pi = [[0.25, 0.25, 0.25, 0.25]
                   for i in range(self.env.ncol * self.env.nrow)]  # initialize to a uniform random policy
        self.theta = theta  # convergence threshold for policy evaluation
        self.gamma = gamma  # discount factor

    def policy_evaluation(self):  # policy evaluation
        cnt = 1  # counter
        while 1:
            max_diff = 0
            new_v = [0] * self.env.ncol * self.env.nrow
            for s in range(self.env.ncol * self.env.nrow):
                qsa_list = []  # compute Q(s, a) for every action in state s
                for a in range(4):
                    qsa = 0
                    for res in self.env.P[s][a]:
                        p, next_state, r, done = res
                        qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                        # The environment in this chapter is special: the reward depends on the next state, so it is multiplied by the transition probability
                    qsa_list.append(self.pi[s][a] * qsa)
                new_v[s] = sum(qsa_list)  # relation between the state-value and action-value functions
                max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
            self.v = new_v
            if max_diff < self.theta: break  # convergence condition met, leave the evaluation loop
            cnt += 1
        print("Strategy evaluation completed after %d round" % cnt)

    def policy_improvement(self):  # policy improvement
        for s in range(self.env.nrow * self.env.ncol):
            qsa_list = []
            for a in range(4):
                qsa = 0
                for res in self.env.P[s][a]:
                    p, next_state, r, done = res
                    qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                qsa_list.append(qsa)
            maxq = max(qsa_list)
            cntq = qsa_list.count(maxq)  # count how many actions attain the maximum Q value
            # Split the probability evenly among those actions
            self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]
        print("Strategy enhancement completed")
        return self.pi

    def policy_iteration(self):  # policy iteration
        while 1:
            self.policy_evaluation()
            old_pi = copy.deepcopy(self.pi)  # deep-copy the list so it can be compared below
            new_pi = self.policy_improvement()
            if old_pi == new_pi: break

In [6]:
def print_agent(agent, action_meaning, disaster=[], end=[]):
    print("Status Value：")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            # For tidy output, keep each value to 6 characters
            print('%6.6s' % ('%.3f' % agent.v[i * agent.env.ncol + j]), end=' ')
        print()

    print("Strategies：")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            # Some special states, e.g. the cliff in Cliff Walking
            if (i * agent.env.ncol + j) in disaster:
                print('****', end=' ')
            elif (i * agent.env.ncol + j) in end:  # goal state
                print('EEEE', end=' ')
            else:
                a = agent.pi[i * agent.env.ncol + j]
                pi_str = ''
                for k in range(len(action_meaning)):
                    pi_str += action_meaning[k] if a[k] > 0 else 'o'
                print(pi_str, end=' ')
        print()


env = CliffWalkingEnv()
action_meaning = ['^', 'v', '<', '>']
theta = 0.001
gamma = 0.9
agent = PolicyIteration(env, theta, gamma)
agent.policy_iteration()
print_agent(agent, action_meaning, list(range(37, 47)), [47])

Strategy evaluation completed after 60 round
Strategy enhancement completed
Strategy evaluation completed after 72 round
Strategy enhancement completed
Strategy evaluation completed after 44 round
Strategy enhancement completed
Strategy evaluation completed after 12 round
Strategy enhancement completed
Strategy evaluation completed after 1 round
Strategy enhancement completed
Status Value：
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000 
-7.458  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000  0.000 
Strategies：
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo 
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo 
^ooo **** **** **** **** **** **** **** **** **** **** EEEE 
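
The converged values can be cross-checked by hand: with a reward of -1 per step, gamma = 0.9, and a terminal step that adds no future value, a state that is n moves from the goal along the greedy path has value -(1 - gamma^n) / (1 - gamma). The sketch below evaluates that closed form; the function name `n_step_value` is hypothetical.

```python
def n_step_value(n, gamma=0.9, step_reward=-1.0):
    """Discounted return of n consecutive steps, each paying step_reward."""
    return step_reward * (1 - gamma ** n) / (1 - gamma)


# 1 to 3 steps cover the cells next to the goal; 13 and 14 are the longest paths.
for n in (1, 2, 3, 13, 14):
    print(n, f"{n_step_value(n):.3f}")
```

Here n = 13 corresponds to the start cell (one move up, eleven right, one down) and n = 14 to the top-left corner (eleven moves right and three down in any order), which agrees with the -7.458 and -7.712 entries in the value table above.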
