In [8]:
import sys
sys.path.append('../scripts/')
from puddle_world import *
import itertools
import collections
from copy import copy

In [9]:
class DynamicProgramming:
    def __init__(self, widths, goal, puddles, time_interval, sampling_num, \
                 puddle_coef=100.0, lowerleft=np.array([-4, -4]).T, upperright=np.array([4, 4]).T):
        self.pose_min = np.r_[lowerleft, 0]
        self.pose_max = np.r_[upperright, math.pi*2]
        self.widths = widths
        self.goal = goal
        
        self.index_nums = ((self.pose_max - self.pose_min)/self.widths).astype(int)
        nx, ny, nt = self.index_nums
        self.indexes = list(itertools.product(range(nx), range(ny), range(nt)))
        
        self.value_function, self.final_state_flags = self.init_value_function()
        self.policy = self.init_policy()
        self.actions = list(set([tuple(self.policy[i]) for i in self.indexes]))
        
        self.state_transition_probs = self.init_state_transition_probs(time_interval, sampling_num)
        self.depths = self.depth_means(puddles, sampling_num)
        
        self.time_interval = time_interval
        self.puddle_coef = puddle_coef
        
    def value_iteration_sweep(self):
        max_delta = 0.0
        for index in self.indexes:
            if not self.final_state_flags[index]:
                max_q = -1e100
                max_a = None
                qs = [self.action_value(a, index) for a in self.actions]
                max_q = max(qs)
                max_a = self.actions[np.argmax(qs)]
                
                delta = abs(self.value_function[index] - max_q)
                max_delta = delta if delta > max_delta else max_delta
                
                self.value_function[index] = max_q
                self.policy[index] = np.array(max_a).T
                
        return max_delta
    
    def policy_evaluation_sweep(self):
        max_delta = 0.0
        for index in self.indexes:
            if not self.final_state_flags[index]:
                q = self.action_value(tuple(self.policy[index]), index)
                
                delta = abs(self.value_function[index] - q)
                max_delta = delta if delta > max_delta else max_delta
                
                self.value_function[index] = q
                
        return max_delta

    def action_value(self, action, index, out_penalty=True):
        value = 0.0
        for delta, prob in self.state_transition_probs[(action, index[2])]:
            after, out_reward = self.out_correction(np.array(index).T + delta)
            after = tuple(after)
            reward = - self.time_interval * self.depths[(after[0], after[1])] * self.puddle_coef - self.time_interval + out_reward * out_penalty
            value += (self.value_function[after] + reward) * prob
            
        return value
    
    def out_correction(self, index):
        out_reward = 0.0
        index[2] = (index[2] + self.index_nums[2]) % self.index_nums[2]
        
        for i in range(2):
            if index[i] < 0:
                index[i] = 0
                out_reward = -1e100
            elif index[i] >= self.index_nums[i]:
                index[i] = self.index_nums[i] - 1
                out_reward = -1e100
        
        return index, out_reward
    
    def depth_means(self, puddles, sampling_num):
        dx = np.linspace(0, self.widths[0], sampling_num)
        dy = np.linspace(0, self.widths[1], sampling_num)
        samples = list(itertools.product(dx, dy))
        
        tmp = np.zeros(self.index_nums[0:2])
        for xy in itertools.product(range(self.index_nums[0]), range(self.index_nums[1])):
            for s in samples:
                pose = self.pose_min + self.widths*np.array([xy[0], xy[1], 0]).T + np.array([s[0], s[1], 0]).T
                for p in puddles:
                    tmp[xy] += p.depth * p.inside(pose)
                    
            tmp[xy] /= sampling_num**2
            
        return tmp
        
    def init_state_transition_probs(self, time_interval, sampling_num):
        dx = np.linspace(0.001, self.widths[0]*0.999, sampling_num)
        dy = np.linspace(0.001, self.widths[1]*0.999, sampling_num)
        dt = np.linspace(0.001, self.widths[2]*0.999, sampling_num)
        samples = list(itertools.product(dx, dy, dt))
        
        tmp = {}
        for a in self.actions:
            for i_t in range(self.index_nums[2]):
                transitions = []
                for s in samples:
                    before = np.array([s[0], s[1], s[2] + i_t*self.widths[2]]).T + self.pose_min
                    before_index = np.array([0, 0, i_t]).T
                    
                    after = IdealRobot.state_transition(a[0], a[1], time_interval, before)
                    after_index = np.floor((after - self.pose_min)/self.widths).astype(int)
                    
                    transitions.append(after_index - before_index)
                    
                unique, count = np.unique(transitions, axis=0, return_counts=True)
                probs = [c/sampling_num**3 for c in count]
                tmp[a, i_t] = list(zip(unique, probs))
                
        return tmp
        
    def init_policy(self):
        tmp = np.zeros(np.r_[self.index_nums, 2])
        for index in self.indexes:
            center = self.pose_min + self.widths*(np.array(index).T + 0.5)
            tmp[index] = PuddleIgnoreAgent.policy(center, self.goal)
            
        return tmp
        
    def init_value_function(self):
        v = np.empty(self.index_nums)
        f = np.zeros(self.index_nums)
        
        for index in self.indexes:
            f[index] = self.final_state(np.array(index).T)
            v[index] = self.goal.value if f[index] else - 100.0
            
        return v, f
    
    def final_state(self, index):
        x_min, y_min, _ = self.pose_min + self.widths*index
        x_max, y_max, _ = self.pose_min + self.widths*(index + 1)
        
        corners = [[x_min, y_min, _], [x_min, y_max, _], [x_max, y_min, _], [x_max, y_max, _]]
        return all([self.goal.inside(np.array(c).T) for c in corners])

In [10]:
import seaborn as sns

puddles = [Puddle((-2, 0), (0, 2), 0.1), Puddle((-0.5, -2), (2.5, 1), 0.1)]
dp = DynamicProgramming(np.array([0.2, 0.2, math.pi/18]).T, Goal(-3, -3), puddles, 0.1, 10)
counter = 0

In [11]:
delta = 1e100

while delta > 0.01:
    delta = dp.value_iteration_sweep()
    counter += 1
    print(counter, delta)

1 54.12620000000001
2 52.540741168171046
3 27.091733819809406
4 24.39709159867244
5 19.958651189319355
6 18.650774772789774
7 16.225165985426358
8 15.781952896530726
9 14.141842898129966
10 13.270731255868625
11 12.497393357370377
12 11.732667732727286
13 11.238231423228726
14 10.004223508516631
15 9.156190280697437
16 8.713907413060952
17 8.311729090172499
18 8.003867161321743
19 7.670727509824289
20 7.386915336036694
21 7.137289788630262
22 6.918135696700659
23 6.8608402702996045
24 6.710750534605353
25 6.55310883437339
26 6.49657837154642
27 6.179852073016036
28 6.10790245886345
29 5.840942349519494
30 5.702705930557386
31 5.593934283599893
32 5.45512894903807
33 5.332177442427394
34 5.235898193331209
35 5.145362578982962
36 5.059359919967868
37 4.977445179187143
38 4.900195466992585
39 4.827392749510928
40 4.684750759563244
41 4.479900776997326
42 4.223964280239741
43 3.9409767020735416
44 3.8374174227343474
45 3.7333358234886873
46 3.5736119095796255
47 3.370378819190506
48 3.1652

In [5]:
with open("policy.txt", "w") as f:
    for index in dp.indexes:
        p = dp.policy[index]
        f.write("{} {} {} {} {}\n".format(index[0], index[1], index[2], p[0], p[1]))
        
with open("value.txt", "w") as f:
    for index in dp.indexes:
        p = dp.value_function[index]
        f.write("{} {} {} {}\n".format(index[0], index[1], index[2], p))

In [6]:
v =dp.value_function[:, :, 18]
sns.heatmap(np.rot90(v), square=False)
plt.show()

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [7]:
p = np.zeros(dp.index_nums)
for i in dp.indexes:
    p[i] = sum(dp.policy[i])
    
sns.heatmap(np.rot90(p[:, :, 18]), square=False)
plt.show()