In [1]:
import sys
sys.path.append('scripts/')
from puddle_world import *
# NOTE: the imports below intentionally stay after the star import so that
# these names are bound to the modules/functions named here.
import itertools
import collections
import os
from copy import copy
import cv2
import seaborn as sns

In [2]:
class DynamicProgramming:
    """Value iteration on a grid whose cells are the pixels of a map image.

    Every pixel of the (grayscale) map image is one discrete state. An action
    is a tuple ``(nu, theta)``: move in direction ``theta`` with speed ``nu``
    for ``time_interval``. The per-step reward is ``-time_interval`` minus a
    pixel cost of ``(255 - pixel) / 25.5`` (0 for a pixel value of 255, 10 for
    a pixel value of 0), plus a very large negative reward when a transition
    would leave the grid.

    Attributes set by ``__init__``:
        map_image: the input image transposed and flipped so that it is
            indexed as ``[x_index, y_index]``.
        index_nums: ``np.array([x_cells, y_cells])``.
        indexes: list of every ``(x_index, y_index)`` tuple.
        pose_min, pose_max: corners of the covered area in world coordinates.
        actions: list of ``(nu, theta)`` tuples.
        value_function, final_state_flags: arrays of shape ``index_nums``.
        policy: array of shape ``(x_cells, y_cells, 2)`` holding the chosen
            ``(nu, theta)`` per cell.
        state_transition_probs: dict mapping each action to a list of
            ``(index_delta, probability)`` pairs.
    """

    def __init__(self, map_image, widths, goal, time_interval, sampling_num):
        """Build the grid, the value function and the transition model.

        Args:
            map_image: 2-D grayscale image array (rows x columns).
            widths: cell size along x and y (two numbers; a NumPy array in
                typical use so that it broadcasts against index arrays).
            goal: object exposing ``inside(pose) -> bool``; a cell is a final
                state when all four of its corners are inside the goal.
            time_interval: duration of one action.
            sampling_num: samples per axis inside a cell used to estimate the
                transition probabilities (``sampling_num**2`` points total).

        Raises:
            ValueError: if ``map_image`` is ``None`` or is not 2-D.
        """
        if map_image is None:
            raise ValueError("map_image is None (the image could not be loaded)")
        if map_image.ndim != 2:
            raise ValueError("map_image must be a 2-D grayscale image")

        # Transpose so that axis 0 is x, then reverse axis 1 to flip the y axis.
        self.map_image = map_image.T[:, ::-1]
        # Number of cells per axis. Taken from the *transposed* image so that
        # the value function and policy match the array that is indexed in
        # action_value(), also for non-square images.
        x_pixel, y_pixel = self.map_image.shape
        self.index_nums = np.array([x_pixel, y_pixel])
        self.indexes = list(itertools.product(range(x_pixel), range(y_pixel)))

        self.pose_min = np.array([0, 0])
        self.pose_max = np.array([x_pixel*widths[0], y_pixel*widths[1]])

        self.widths = widths
        self.goal = goal
        self.time_interval = time_interval

        # Speed 0.1 in 36 evenly spaced directions.
        self.actions = self.generate_actions(0.1, 36)

        self.value_function, self.final_state_flags = self.init_value_function()
        self.policy = self.init_policy()

        self.state_transition_probs = self.init_state_transition_probs(time_interval, sampling_num)

    def value_iteration_sweep(self):
        """Run one in-place value-iteration sweep over all non-final cells.

        For each cell the action values of all actions are computed, the
        value function is set to the largest one and the policy to the
        corresponding action.

        Returns:
            The largest absolute change of the value function in this sweep.
        """
        max_delta = 0.0
        for index in self.indexes:
            if self.final_state_flags[index]:
                continue

            # Action value of every action from this cell.
            qs = [self.action_value(a, index) for a in self.actions]
            best = int(np.argmax(qs))  # first maximal entry, same as max(qs)
            max_q = qs[best]

            # Track the largest change seen during the sweep.
            delta = abs(self.value_function[index] - max_q)
            max_delta = max(max_delta, delta)

            self.value_function[index] = max_q
            self.policy[index] = np.array(self.actions[best]).T

        return max_delta

    def action_value(self, action, index):
        """Expected value of taking ``action`` from cell ``index``.

        Args:
            action: one of the tuples in ``self.actions``.
            index: ``(x_index, y_index)`` of the starting cell.

        Returns:
            Sum over the modelled transitions of
            ``(value of destination + reward) * probability``.
        """
        value = 0.0
        for delta, prob in self.state_transition_probs[action]:
            # Clamp the destination to the grid; leaving it is penalised.
            after, edge_reward = self.edge_correction(np.array(index).T + delta)
            after = tuple(after)
            # Explicit form of the former `~pixel / 25.5`, which relied on the
            # image being uint8 (where ~p == 255 - p).
            pixel_cost = (255.0 - float(self.map_image[after])) / 25.5
            reward = - self.time_interval - pixel_cost + edge_reward
            value += (self.value_function[after] + reward) * prob

        return value

    def edge_correction(self, index):
        """Clamp ``index`` into the grid and report an out-of-grid penalty.

        ``index`` is modified in place. The penalty is not cumulative: it is
        ``-1e100`` if at least one axis was out of range, otherwise ``0.0``.

        Returns:
            ``(index, edge_reward)``.
        """
        edge_reward = 0.0

        for i in range(2):
            if index[i] < 0:
                index[i] = 0
                edge_reward = -1e100
            elif index[i] >= self.index_nums[i]:
                index[i] = self.index_nums[i]-1
                edge_reward = -1e100

        return index, edge_reward

    def init_state_transition_probs(self, time_interval, sampling_num):
        """Estimate, per action, the distribution of cell-index changes.

        ``sampling_num**2`` start points are laid out on a regular lattice
        inside one cell, each is moved with ``state_transition`` and the
        resulting index offset is recorded. The relative frequency of each
        distinct offset is used as its probability.

        Returns:
            dict ``{action: [(index_delta, probability), ...]}``.
        """
        # Sample points inside a cell; the range starts at an absolute 0.001
        # and ends at 0.999 of the cell width so that the samples stay off
        # the cell borders.
        dx = np.linspace(0.001, self.widths[0]*0.999, sampling_num)
        dy = np.linspace(0.001, self.widths[1]*0.999, sampling_num)
        samples = list(itertools.product(dx, dy))

        transition_probs = {}
        for a in self.actions:
            transitions = []
            for s in samples:
                before = np.array(s).T + self.pose_min

                after = self.state_transition(a[0], a[1], time_interval, before)
                after_index = np.floor((after - self.pose_min)/self.widths).astype(int)

                transitions.append(after_index)
            # Distinct index offsets and how often each one occurred.
            unique, count = np.unique(transitions, axis=0, return_counts=True)
            probs = [c/sampling_num**2 for c in count]
            transition_probs[a] = list(zip(unique, probs))

        return transition_probs

    def init_policy(self):
        """Return an all-zero policy array of shape ``(x_cells, y_cells, 2)``."""
        return np.zeros(np.r_[self.index_nums, 2])

    def init_value_function(self):
        """Create the initial value function and the final-state flags.

        Returns:
            ``(v, f)``: ``f[index]`` is the result of ``final_state`` for that
            cell; ``v[index]`` is ``0.0`` for final states and ``-100.0``
            otherwise.
        """
        v = np.empty(self.index_nums)  # one entry per discrete state
        f = np.zeros(self.index_nums)

        for index in self.indexes:
            f[index] = self.final_state(np.array(index).T)
            v[index] = 0.0 if f[index] else -100.0

        return v, f

    def final_state(self, index):
        """Return True when all four corners of cell ``index`` are in the goal."""
        x_min, y_min = self.pose_min + self.widths*index        # lower-left corner
        x_max, y_max = self.pose_min + self.widths*(index + 1)  # upper-right corner

        corners = [[x_min, y_min], [x_min, y_max], [x_max, y_min], [x_max, y_max]]
        return all(self.goal.inside(np.array(c).T) for c in corners)

    def state_transition(self, nu, theta, time, pose):
        """Move ``pose`` straight in direction ``theta`` at speed ``nu`` for ``time``."""
        return pose + np.array([nu*math.cos(theta),
                                nu*math.sin(theta)]) * time

    def generate_actions(self, nu, angle_num):
        """Return ``angle_num`` actions ``(nu, theta)`` with theta evenly spaced in [0, 2*pi)."""
        angles = np.arange(0, 2*math.pi, 2*math.pi/angle_num)
        return [(nu, a) for a in angles]

In [None]:
# --- Map selection -----------------------------------------------------------
map_name = 'NoWall_200x200_'
# map_name = 'CorridorGimp_200x200'
# map_name = 'CorridorGimp_100x100'
# map_name = 'CorridorGimp_20x20'
# map_name = 'NoWall_50x50'
map_path = 'map/' + map_name + '.png'
map_image = cv2.imread(map_path, cv2.IMREAD_GRAYSCALE)
# cv2.imread does not raise on a missing/unreadable file; it returns None.
# Fail here with the offending path instead of deep inside the constructor.
if map_image is None:
    raise FileNotFoundError("could not read map image: " + map_path)

# --- Problem setup: cell widths, goal, time interval, samples per axis ---------
dp = DynamicProgramming(map_image, np.array([0.05, 0.05]).T, Goal(5.0, 7.0, radius=0.1), 0.5, 10)
# dp = DynamicProgramming(map_image, np.array([0.05, 0.05]).T, Goal(6.75, 8.0, radius=0.1), 0.5, 10)
# dp = DynamicProgramming(map_image, np.array([0.05, 0.05]).T, Goal(3.5, 3.75, radius=0.1), 0.5, 10)
# dp = DynamicProgramming(map_image, np.array([0.05, 0.05]).T, Goal(0.7, 0.75, radius=0.1), 0.5, 10)
#dp = DynamicProgramming(map_image, np.array([0.02, 0.02]).T, Goal(0.5, 0.75, radius=0.1), 0.2, 10)

# --- Value iteration -----------------------------------------------------------
# Sweep until the largest change of the value function in a sweep is at most
# the threshold; the sweep number and that change are printed each time.
convergence_threshold = 0.01
counter = 0
delta = 1e100

while delta > convergence_threshold:
    delta = dp.value_iteration_sweep()
    counter += 1
    print(counter, delta)

In [None]:
# Show the value function as a heat map.
# (Alternative kept for reference: plot a previously loaded array with
#  sns.heatmap(np.rot90(loaded_array), vmin=vmin, vmax=vmax, square=True, cmap='bwr_r'))
vmin, vmax = -100, 0
sns.heatmap(
    np.rot90(dp.value_function),  # rotate the array by 90 degrees for display
    vmin=vmin,
    vmax=vmax,
    square=True,
    cmap='Oranges_r',
)
plt.show()

In [None]:
save_fine_name = map_name

# The output directories may not exist on a fresh checkout; open(..., 'w')
# would then fail, so create them first (no-op when they already exist).
os.makedirs('value', exist_ok=True)
os.makedirs('policy', exist_ok=True)

# Write the value function: one line "x_index y_index value" per cell.
with open('value/' + save_fine_name + '.value', 'w') as f:
    for index in dp.indexes:
        v = dp.value_function[index]
        f.write("{} {} {}\n".format(index[0], index[1], v))

# Write the policy: one line "x_index y_index p0 p1" per cell, where p0 and p1
# are the two stored policy components of that cell.
with open('policy/' + save_fine_name + '.policy', 'w') as f:
    for index in dp.indexes:
        p = dp.policy[index]
        f.write("{} {} {} {}\n".format(index[0], index[1], p[0], p[1]))