In [113]:
import numpy as np
import json
import PyGnuplot as gp

In [114]:
# Load the problem definition. JSON text is UTF-8 by specification, so the
# encoding is given explicitly instead of relying on the platform default.
with open("input.json", "r", encoding="utf-8") as read_file:
    data = json.load(read_file)
data

{'world': {'size': {'N': 4, 'M': 3},
  'states': {'S': [1, 1], 'T': [[4, 2], [4, 3]], 'F': [[2, 2]], 'B': []}},
 'transition_rates': {'p1': 0.8, 'p2': 0.1, 'p3': 0.1},
 'reward_function': {'r': -0.04, 'r_T': [-1.0, 1.0], 'r_B': []},
 'gamma': 1.0}

In [115]:
class MarkovDecisionProcess:
    """Plain container for the grid-world parameters read from the input file.

    All constructor arguments are keyword-only and are stored on the instance
    under the same names. The cell lists ``T``, ``F`` and ``B`` are converted
    from lists of two-element lists to lists of ``(a, b)`` tuples so they can
    be used as dictionary keys and in membership tests; ``S`` is stored as
    given.
    """

    def __init__(self, *, N, M, S, T, F, B, p1, p2, p3, r, r_T, r_B, gamma):
        # World dimensions.
        self.N = N
        self.M = M
        # Stored unchanged (presumably the start cell -- confirm against the input format).
        self.S = S
        # Cell lists, normalised to tuples.
        self.T, self.F, self.B = map(self.lists_to_tuples, (T, F, B))
        # Transition probabilities.
        self.p1 = p1
        self.p2 = p2
        self.p3 = p3
        # Default reward plus the per-cell rewards that pair up with T and B.
        self.r = r
        self.r_T = r_T
        self.r_B = r_B
        self.gamma = gamma

    def lists_to_tuples(self, I):
        """Return a list with every two-element item of ``I`` turned into a tuple."""
        pairs = []
        for first, second in I:
            pairs.append((first, second))
        return pairs

# Flatten the nested sections of the loaded JSON into the keyword-only
# constructor arguments; the keys of every section must match the parameter
# names of MarkovDecisionProcess.__init__.
mdp = MarkovDecisionProcess(**data['world']['size'],
                            **data['world']['states'],
                            **data['transition_rates'],
                            **data['reward_function'],
                            gamma=data['gamma'])

In [116]:
# Every cell of the N x M grid (1-based coordinates) except those listed in mdp.F.
states = {
    (x, y)
    for x in range(1, mdp.N + 1)
    for y in range(1, mdp.M + 1)
    if (x, y) not in mdp.F
}

# Start from the default reward everywhere, then override the cells in
# mdp.T and mdp.B with their own rewards (paired up by position).
rewards = dict.fromkeys(states, mdp.r)
rewards.update(zip(mdp.T, mdp.r_T))
rewards.update(zip(mdp.B, mdp.r_B))

In [117]:
def invalid_state(x, y):
    """Return True when cell (x, y) is listed in ``mdp.F`` or lies outside the grid."""
    if (x, y) in mdp.F:
        return True
    inside_world = 1 <= x <= mdp.N and 1 <= y <= mdp.M
    return not inside_world

def generate_transitions(states):
    """Build the transition table for the four moves from every given state.

    Returns a dict ``{(x, y): {move: [(next_state, probability), ...]}}`` where
    ``move`` is one of ``'U'``, ``'L'``, ``'R'``, ``'D'``. For each move the
    four outcomes are listed in the order: intended cell, cell to the left of
    the move direction, cell to the right, opposite cell. They carry the
    probabilities ``p1``, ``p2``, ``p3`` and the remainder ``1 - (p1 + p2 + p3)``.
    An outcome that would land on an invalid cell keeps the agent in place.
    """
    # (dx, dy) offsets per move, ordered as: intended, left, right, opposite.
    move_offsets = (
        ('U', ((0, 1), (-1, 0), (1, 0), (0, -1))),
        ('L', ((-1, 0), (0, -1), (0, 1), (1, 0))),
        ('R', ((1, 0), (0, 1), (0, -1), (-1, 0))),
        ('D', ((0, -1), (1, 0), (-1, 0), (0, 1))),
    )
    probabilities = [mdp.p1, mdp.p2, mdp.p3, 1 - (mdp.p1 + mdp.p2 + mdp.p3)]

    transitions = {}
    for x, y in states:
        here = (x, y)
        moves_from_here = transitions.setdefault(here, {})
        for move, offsets in move_offsets:
            outcomes = moves_from_here.setdefault(move, [])
            for (dx, dy), transition_rate in zip(offsets, probabilities):
                target = (x + dx, y + dy)
                # Bumping into an invalid cell leaves the agent where it is.
                if invalid_state(*target):
                    target = here
                outcomes.append((target, transition_rate))
    return transitions

transitions = generate_transitions(states)
# Cells in mdp.T get a single 'END' action whose only outcome has probability 0,
# replacing the four moves generated above.
transitions.update((cell, {'END': [(cell, 0)]}) for cell in mdp.T)

In [118]:
class ValueIteration:
    """Value iteration over an MDP described by plain dictionaries.

    Parameters
    ----------
    states : iterable of hashable
        All states of the MDP.
    rewards : dict
        Maps each state to its reward.
    transitions : dict
        ``{state: {action: [(next_state, probability), ...]}}``.
    stop_cond : float
        Iteration stops once the largest utility change in a sweep drops
        below this value.
    gamma : float or None
        Discount factor. When left as ``None`` the notebook-level
        ``mdp.gamma`` is used, which keeps existing three-argument calls
        working unchanged.

    Attributes
    ----------
    t : int
        Number of sweeps performed by the most recent ``iterate()`` call.
    converged : bool
        Whether the most recent ``iterate()`` call reached ``stop_cond``.
    """

    def __init__(self, states, rewards, transitions, stop_cond=0.0001, gamma=None):
        self.states = states
        self.rewards = rewards
        self.transitions = transitions
        self.stop_cond = stop_cond
        self.gamma = gamma
        self.t = 0  # number of iterations
        self.converged = False

    def R(self, state):
        """Return the reward of ``state``."""
        return self.rewards[state]

    def T(self, state, action):
        """Return the ``(next_state, probability)`` outcomes of ``action`` in ``state``."""
        return self.transitions[state][action]

    def actions(self, state):
        """Return the actions available in ``state``."""
        return self.transitions[state].keys()

    def iterate(self):
        """Run value iteration until the largest update is below ``stop_cond``.

        Returns
        -------
        (U, utility_history)
            ``U`` is the utility dict the final sweep started from (also the
            last entry of ``utility_history``); ``utility_history`` holds the
            utility dict at the start of every sweep.
        """
        # Fall back to the notebook-level MDP only when no discount was given.
        gamma = mdp.gamma if self.gamma is None else self.gamma

        # Reset per-run state so the method can safely be called more than once.
        self.t = 0
        self.converged = False

        utility_history = []
        U1 = {s: 0 for s in self.states}
        while not self.converged:
            self.t += 1
            U = U1.copy()
            utility_history.append(U)
            delta = 0
            for s in self.states:
                best = max(self.expected_utility(a, s, U) for a in self.actions(s))
                U1[s] = self.R(s) + gamma * best
                delta = max(delta, abs(U1[s] - U[s]))
            self.converged = delta < self.stop_cond
        return U, utility_history

    def best_policy(self, U):
        """Return ``{state: action}`` choosing the action with the highest expected utility under ``U``."""
        pi = {}
        for s in self.states:
            pi[s] = max(self.actions(s), key=lambda a: self.expected_utility(a, s, U))
        return pi

    def expected_utility(self, a, s, U):
        """Return the probability-weighted utility of taking action ``a`` in state ``s``."""
        return sum([p * U[s1] for (s1, p) in self.T(s, a)])

# Solve the MDP, derive the greedy policy from the resulting utilities and
# report how many sweeps were needed.
vi = ValueIteration(states=states, rewards=rewards, transitions=transitions)
u, history = vi.iterate()
pi = vi.best_policy(u)
print("Liczba iteracji: ", vi.t)

Liczba iteracji:  23


In [119]:
# Print the utilities as a grid, top row (largest y) first; cells without a
# utility (e.g. those excluded from `states`) are shown as 0.
for grid_y in range(mdp.M, 0, -1):
    print(''.join("{:.4f}".format(u.get((grid_x, grid_y), 0.)) + ' '
                  for grid_x in range(1, mdp.N + 1)))

print()

0.8116 0.8678 0.9178 1.0000 
0.7616 0.0000 0.6603 -1.0000 
0.7053 0.6553 0.6113 0.3878 



In [120]:
# Symbol shown for each move; any other action (or a missing cell) prints as a blank.
arrows = {'U': '^', 'L': '<', 'R': '>', 'D': 'v'}

# Print the policy as a grid, top row (largest y) first.
for grid_y in range(mdp.M, 0, -1):
    symbols = (arrows.get(pi.get((grid_x, grid_y), ' '), ' ')
               for grid_x in range(1, mdp.N + 1))
    print(''.join("{}".format(symbol) + ' ' for symbol in symbols))

print()

> > >   
^   ^   
^ < < < 



In [121]:
# Reshape `history` (one utility dict per sweep) into one list of values per
# state, which is the layout needed to build one column per state.
# The loop variable is deliberately not called `u`, so the final utilities
# returned by the solver are not rebound by this cell.
state_history = {}
for snapshot in history:
    for state, value in snapshot.items():
        state_history.setdefault(state, []).append(value)
state_history

{(1, 2): [0,
  -0.04,
  -0.08000000000000002,
  -0.12000000000000002,
  -0.16000000000000006,
  0.22598399999999993,
  0.4579584,
  0.60559616,
  0.6837125120000002,
  0.7239547904000001,
  0.7437228851200001,
  0.7532310036480001,
  0.7577137516544001,
  0.7597995304550402,
  0.7607594585456643,
  0.7611975703728129,
  0.7613961646202472,
  0.7614856919958242,
  0.7615258679570392,
  0.7615438291869743,
  0.7615518336225691,
  0.7615553913052793,
  0.7615569689976042],
 (3, 2): [0,
  -0.04,
  -0.08000000000000002,
  0.45360000000000006,
  0.5671200000000002,
  0.627176,
  0.6471336000000001,
  0.6553290400000001,
  0.65836516,
  0.6595460648,
  0.65999477416,
  0.660167179368,
  0.6602330700648,
  0.6602583165687199,
  0.66026797821828,
  0.6602716778034665,
  0.6602730940359729,
  0.6602736362534373,
  0.6602738438332055,
  0.6602739233043817,
  0.6602739537292008,
  0.660273965377147,
  0.6602739698364736],
 (1, 3): [0,
  -0.04,
  -0.08000000000000002,
  -0.12000000000000002,
  0.37

In [122]:
# Build the list of columns and the order in which the states appear; the
# latter is used when composing the gnuplot command.
state_order = list(state_history)
columns_to_concatenate = [
    np.array(state_history[s]).reshape(-1, 1) for s in state_order
]

# Join the columns side by side and prepend the iteration numbers as the
# first column; the result is transposed so that each row holds one column.
A = np.hstack(columns_to_concatenate)
A = np.hstack((np.arange(len(history)).reshape(-1, 1), A)).T

In [123]:
# Define the file name and save the table to that file.
filename = "value_iteration.dat"
gp.s(A, filename)

In [130]:
# Build the gnuplot command: one "using 1:<column>" clause per state, where
# column 1 holds the iteration number and the state columns start at 2.
# The clauses are joined with ", " so the command does not end with a
# dangling separator.
plot_clauses = [
    "'{}' using 1:{} title '{}' with lines".format(filename, column, state)
    for column, state in enumerate(state_order, start=2)
]
command = 'plot ' + ', '.join(plot_clauses)

# Draw the plot.
gp.c(command)

# Save the command to a file.
with open('command.txt', 'w') as f:
    f.write(command)