In [1]:
import sys 
sys.path.append('../scripts/')
from dynamic_programming import *

In [2]:
class BeliefDynamicProgramming(DynamicProgramming): 
    def __init__(self, widths, goal, puddles, time_interval, sampling_num, puddle_coef=100.0, \
                 lowerleft=np.array([-4, -4]).T, upperright=np.array([4, 4]).T, dev_borders=[0.1,0.2,0.4,0.8]): 
        super().__init__(widths, goal, puddles, time_interval, sampling_num, puddle_coef, lowerleft, upperright)
        
        self.index_nums = np.array([*self.index_nums, len(dev_borders) + 1])
        nx, ny, nt, nh = self.index_nums
        self.indexes = list(itertools.product(range(nx), range(ny), range(nt), range(nh)))
        
        self.value_function, self.final_state_flags =  self.init_belief_value_function()
        self.policy = np.zeros(np.r_[self.index_nums,2]) 
        
        self.dev_borders = dev_borders ###beliefdp3（init_entropy_transition_probsまで） #以下追加
        tmp_borders = [dev_borders[0]/10, *dev_borders, dev_borders[-1]*10] #dev_bordersに両側を加える
        self.entropy_state_transition_probs = {}
        for a in self.actions:
            for i in range(len(dev_borders)+1):
                probs = self.init_entropy_transition_probs(time_interval, 100, tmp_borders[i], tmp_borders[i+1], a)
                self.entropy_state_transition_probs[(i, a)] = probs
            
    def sigma_to_index(self, sigma):
        for i, e in enumerate(self.dev_borders):
            if sigma < e: return i
            
        return len(self.dev_borders)
        
    def init_entropy_transition_probs(self, time_interval, sampling_num, min_sigma, max_sigma, action):
        nu, omega = action
        if abs(omega) < 1e-5: omega = 1e-5
            
        F = np.diag([1.0, 1.0, 1.0])
        F[0, 2] = nu / omega * (math.cos(omega * time_interval) - 1.0)#ロボットの向きは関係ないので0[deg]で固定で
        F[1, 2] = nu / omega * (math.sin(omega * time_interval))
        
        v ={"nn":0.19, "no":0.001, "on":0.13, "oo":0.2} #移動の誤差モデル（カルマンフィルタのものをコピペ）
        M = np.diag([v["nn"]**2*abs(nu)/time_interval + v["no"]**2*abs(omega)/time_interval, 
                         v["on"]**2*abs(nu)/time_interval + v["oo"]**2*abs(omega)/time_interval])

        stw, ctw = math.sin(omega*time_interval), math.cos(omega*time_interval)
        A = np.array([[stw/omega,    -nu/(omega**2)*stw + nu/omega*time_interval*ctw],
                                     [(-ctw + 1.0)/omega, -nu/(omega**2)*(-ctw + 1.0) + nu/omega*time_interval*stw],
                                     [0,                                time_interval]] )
        
        ans = {}
        for sigma in np.linspace(min_sigma, max_sigma*0.999, sampling_num):
            mat = sigma*sigma*F.dot(F.T) + A.dot(M).dot(A.T)
            h = 1.5*np.log(2*np.pi) + 0.5*np.log(np.linalg.det(mat)) + 1.5 #sigmaが小さすぎると行列式が0以下になるので注意
            sigma_after = np.power(np.e, h/3 - 0.5*np.log(2*np.pi) - 0.5)
            index_after = self.sigma_to_index(sigma_after)
            
            if index_after not in ans:
                ans[index_after] = 0

            ans[index_after] += 1
                
        for e in ans:
            ans[e] /= sampling_num

        return ans
    
    def init_belief_value_function(self): 
        v = np.empty(self.index_nums)
        f = np.zeros(self.index_nums) 
        
        for index in self.indexes:
            f[index] = self.belief_final_state(np.array(index).T)
            v[index] = self.goal.value if f[index] else -100.0
                
        return v, f
        
    def belief_final_state(self, index):
        x_min, y_min, _ = self.pose_min + self.widths*index[0:3] 
        x_max, y_max, _ = self.pose_min + self.widths*(index[0:3] + 1) 
        
        corners = [[x_min, y_min, _], [x_min, y_max, _], [x_max, y_min, _], [x_max, y_max, _] ] 
        return all([self.goal.inside(np.array(c).T) for c in corners ]) and index[3] == 0
    
    def action_value(self, action, index, out_penalty=True):  ###beliefdp2
        value = 0.0
        for delta, prob in self.state_transition_probs[(action, index[2])]: 
            after, out_reward = self.out_correction(np.array(index[0:3]).T + delta) #indexを4次元から3次元に
            after = tuple([*after, 0])                                                                     #エントロピーのインデックスを加える（とりあえず0で）
            reward = - self.time_interval * self.depths[(after[0], after[1])] * self.puddle_coef - self.time_interval + out_reward*out_penalty
            value += (self.value_function[after] + reward) * prob

        return value

In [3]:
puddles = [Puddle((-2, 0), (0, 2), 0.1), Puddle((-0.5, -2), (2.5, 1), 0.1)]
dp = BeliefDynamicProgramming(np.array([0.2, 0.2, math.pi/18]).T, Goal(-3,-3), puddles, 0.1, 10)  #クラスを派生したものに

In [None]:
delta = dp.value_iteration_sweep()  ###beliefdp2exec

In [None]:
import seaborn as sns
v = dp.value_function[:, :, 18, 1]
sns.heatmap(np.rot90(v), square=False)
plt.show()

In [None]:
dp.entropy_state_transition_probs