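# Grid System

Cells are addressed as `[row][column]`: the row index grows downward and the column index grows to the right.
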
|  |  |  |
| ---  | --- | --- |
| [0][0] | [0][1] | [0][2] |
| [1][0] | [1][1] | [1][2] |
| [2][0] | [2][1] | [2][2] |

In [1]:
# Identifier from which MV_PROB and REACH_REWARD are derived.
ROLL_NO = 2018113012
# Probability that a non-STAY agent move goes in the requested direction:
# 1 - ((last three digits of ROLL_NO) % 40 + 1) / 100, i.e. 0.87 for this ROLL_NO.
MV_PROB = 1 - (((int(str(ROLL_NO)[-3:]) % 40)+1) / 100)
# Probability that the call switches from off to on in one step.
CALL_ON_PROB = 0.4
# Probability that the call switches from on to off in one step
# (used when agent and target are in different cells).
CALL_OFF_PROB = 0.2
# Base probability that the target stays in its cell.
TGT_STAY_PROB = 0.4
# Probability that the target moves to one particular neighbouring cell.
TGT_MOVE_PROB = 0.15
# Grid size: valid rows are 0..ROW_LIMIT-1, valid columns 0..COLUMN_LIMIT-1.
ROW_LIMIT = 3
COLUMN_LIMIT = 3
# Reward for any action other than STAY when the reach condition does not hold.
STEP_REWARD = -1
# Reward when agent and target share a cell while the call is on (22 for this ROLL_NO).
REACH_REWARD = (ROLL_NO%100) + 10
# Observation names "o1".."o6"; gen_observation_prob maps them to the target's
# position relative to the agent (same cell, right, down, left, up, anything else).
OBSERVATION_SET = ["o" + str(i) for i in range(1, 7)]
# Discount factor written to the header of the generated file.
DISCOUNT = 0.5
# Agent moves; each one becomes a single-element action in gen_actions.
AGENT_MOVE_ACTIONS = [ "STAY", "UP", "DOWN", "LEFT", "RIGHT" ]
# NOTE(review): not referenced by any other cell visible in this notebook.
TARGET_MOVE_ACTIONS = [ "STAY", "MOVE" ]
# Start states are those with the target at START_TARGET_POS and the agent at
# one of START_AGENT_POS_LIST (see POMDPGenerator.write_initial_state).
START_TARGET_POS = (1, 1)
START_AGENT_POS_LIST = [(0, 0), (0, 2), (2, 0), (2, 2)]

In [2]:
# Display the move-success probability derived from ROLL_NO.
MV_PROB

0.87

In [3]:
# WORKS
def relative_pos (i_pos, f_pos):
    """Name the single grid step that leads from i_pos to f_pos.

    Positions are indexed as (row, column). Returns "DOWN", "UP", "RIGHT" or
    "LEFT" when f_pos is exactly one cell away from i_pos in that direction,
    "STAY" when the two positions compare equal, and False for any other pair.
    """
    # (row offset, column offset, label) for the four one-cell moves.
    unit_steps = (
        (1, 0, "DOWN"),
        (-1, 0, "UP"),
        (0, 1, "RIGHT"),
        (0, -1, "LEFT"),
    )
    for d_row, d_col, label in unit_steps:
        if i_pos[0] + d_row == f_pos[0] and i_pos[1] + d_col == f_pos[1]:
            return label
    return "STAY" if i_pos == f_pos else False

In [4]:
# WORKS
def gen_call_prob (call_change, same_pos):
    """Probability of the call flag going from call_change[0] to call_change[1].

    call_change: (initial_call, final_call) pair of flags.
    same_pos: truthy when agent and target start the step in the same cell.

    While the call is off it turns on with CALL_ON_PROB. While it is on and
    the two are apart it turns off with CALL_OFF_PROB. While it is on and the
    two share a cell it turns off with probability 1.
    """
    was_on = call_change[0] == True
    is_on = call_change[1] == True

    if not was_on:
        return CALL_ON_PROB if is_on else round(1 - CALL_ON_PROB, 1)
    if same_pos:
        # Agent and target in the same cell: the call always turns off.
        return 0 if is_on else 1
    return round(1 - CALL_OFF_PROB, 1) if is_on else CALL_OFF_PROB

In [5]:
# Testing gen_call_prob: call stays off (off -> off). same_pos is passed as
# True, but the function only consults it when the initial call is on.
call_change = (False, False)
print(gen_call_prob(call_change, True))

0.6


In [6]:
# WORKS
def on_border (pos):
    """Report which grid edges the cell pos = (row, column) touches.

    Returns a list drawn, in this order, from "UP" (row 0), "DOWN" (last row),
    "LEFT" (column 0) and "RIGHT" (last column), or False when the cell
    touches no edge.
    """
    edge_checks = (
        (pos[0] == 0, "UP"),
        (pos[0] == ROW_LIMIT-1, "DOWN"),
        (pos[1] == 0, "LEFT"),
        (pos[1] == COLUMN_LIMIT-1, "RIGHT"),
    )
    touching = [label for at_edge, label in edge_checks if at_edge]
    return touching if touching else False

In [7]:
# WORKS
def get_opposite (dir1):
    """Return the direction opposite to dir1.

    "LEFT" and "RIGHT" map to each other, as do "UP" and "DOWN". "STAY" and
    any other value yield None.
    """
    for first, second in (("LEFT", "RIGHT"), ("UP", "DOWN")):
        if dir1 == first:
            return second
        if dir1 == second:
            return first
    return None

# Quick check: the opposite of LEFT is expected to print RIGHT.
print(get_opposite("LEFT"))

RIGHT


In [8]:
# WORKS
def is_opposite (dir1, dir2):
    """Return True when dir2 equals the opposite of dir1, else False.

    The opposite is taken from get_opposite, which yields None for "STAY" and
    for unknown values, so such a dir1 matches no direction string.
    """
    flipped = get_opposite(dir1)
    return bool(flipped == dir2)

# Quick checks: two opposite pairs (expected True) interleaved with two
# perpendicular pairs (expected False).
print(is_opposite("LEFT", "RIGHT"))
print(is_opposite("LEFT", "DOWN"))
print(is_opposite("UP", "DOWN"))
print(is_opposite("UP", "LEFT"))

True
False
True
False


In [9]:
def in_limits (pos):
    """Return True when pos = (row, column) lies inside the grid.

    A row is valid in 0..ROW_LIMIT-1 and a column in 0..COLUMN_LIMIT-1; the
    column is only inspected once the row has passed.
    """
    return (0 <= pos[0] < ROW_LIMIT) and (0 <= pos[1] < COLUMN_LIMIT)

In [10]:
# WORKS
def gen_target_move_prob (target_pos):
    """Probability that the target goes from target_pos[0] to target_pos[1].

    target_pos: (initial position, final position), each as (row, column).

    Staying put has probability TGT_STAY_PROB plus TGT_MOVE_PROB for every
    grid edge the initial cell touches. A step to an adjacent cell has
    probability TGT_MOVE_PROB. Any other pair, and any final position outside
    the grid, has probability 0.
    """
    start, end = target_pos[0], target_pos[1]
    step = relative_pos(start, end)
    inside_grid = in_limits(end)
    walls = on_border(start)

    if not inside_grid:
        return 0
    if step == "STAY":
        if walls:
            # Moves that would leave the grid are folded into staying put.
            return TGT_STAY_PROB + TGT_MOVE_PROB * len(walls)
        return TGT_STAY_PROB
    return TGT_MOVE_PROB if step else 0

In [11]:
# Testing target_move_prob for all possibilities
def test_target_move_prob():
    """Check that the target's move probabilities form a distribution.

    For every start cell of the grid, sums gen_target_move_prob over all end
    cells and prints an error line when the total is not 1. Prints
    "No errors detected." when every start cell passes. Returns None.
    """
    # Local import so this cell does not depend on imports made by other cells.
    import math

    cells = [(row, col) for row in range(ROW_LIMIT) for col in range(COLUMN_LIMIT)]
    errors = False
    for i_pos in cells:
        tot = sum(gen_target_move_prob((i_pos, f_pos)) for f_pos in cells)
        # The total is a sum of floats, so compare with a tolerance instead of
        # exact equality; otherwise rounding noise could be reported as an error.
        if not math.isclose(tot, 1.0, rel_tol=1e-9):
            errors = True
            print("Error on " + str(i_pos) + ". tot = " + str(tot))
    if errors == False:
        print("No errors detected.")
    return

# Run the exhaustive check: for every start cell, the target's probabilities
# over all end cells should total 1.
test_target_move_prob()

# Testing target_move_prob
# Spot check: a target at (1, 1) stepping right to (1, 2) should have
# probability TGT_MOVE_PROB.
pos = [ (1, 1), (1, 2) ]
prob = gen_target_move_prob(pos)
print(prob)
# WORKS

No errors detected.
0.15


In [12]:
# WORKS
def gen_agent_move_prob (agent_pos, agent_move):
    """Probability that the agent goes from agent_pos[0] to agent_pos[1].

    agent_pos: (initial position, final position), each as (row, column).
    agent_move: one of "STAY", "UP", "DOWN", "LEFT", "RIGHT".

    "STAY" keeps the agent in place with probability 1. Any other move goes
    in the requested direction with MV_PROB and in the opposite direction
    with 1 - MV_PROB (rounded to 2 places). When the grid edge blocks either
    of those two outcomes, that outcome leaves the agent in its cell instead.
    All other (final position, move) combinations have probability 0.
    """
    i_pos = agent_pos[0]
    f_pos = agent_pos[1]
    r_pos = relative_pos(i_pos, f_pos)
    bordered = on_border(i_pos)

    if agent_move == "STAY":
        return 1 if r_pos == "STAY" else 0

    # Staying in place is only possible when an edge blocked the move.
    held_by_edge = bordered and r_pos == "STAY"

    # Intended outcome: moved as requested, or the requested way was blocked.
    if r_pos == agent_move or (held_by_edge and agent_move in bordered):
        return MV_PROB
    # Slip outcome: moved the opposite way, or the opposite way was blocked.
    if is_opposite(r_pos, agent_move) or (held_by_edge and get_opposite(agent_move) in bordered):
        return round(1 - MV_PROB, 2)
    return 0

In [13]:
# WORKS
def test_agent_move_prob():
    """Check that the agent's move probabilities form a distribution.

    For every start cell and every move in AGENT_MOVE_ACTIONS, sums
    gen_agent_move_prob over all end cells and prints an error line when the
    total is not 1. Prints "No errors detected." when every combination
    passes. Returns None.
    """
    # Local import so this cell does not depend on imports made by other cells.
    import math

    cells = [(row, col) for row in range(ROW_LIMIT) for col in range(COLUMN_LIMIT)]
    errors = False
    for i_pos in cells:
        for mov in AGENT_MOVE_ACTIONS:
            tot = sum(gen_agent_move_prob((i_pos, f_pos), mov) for f_pos in cells)
            # The total is a sum of floats, so compare with a tolerance
            # instead of exact equality.
            if not math.isclose(tot, 1.0, rel_tol=1e-9):
                errors = True
                print("ERROR. i_pos : " + str(i_pos) + ". Sum = " + str(tot))
    if errors == False:
        print("No errors detected.")
    return

# Run the exhaustive check over every start cell and every agent move.
test_agent_move_prob()

No errors detected.


In [14]:
# WORKS
def gen_transition_prob (i_state, action, f_state):
    '''
     Probability of reaching f_state from i_state under action.

     state  = (agent position, target position, call flag)
     action = sequence whose element [0] is the agent move, one of
              [ Stay, Up, Down, Left, Right ]; nothing else is read from it.

     The result is the product of the agent-move, target-move and call-change
     probabilities, rounded to 4 decimal places.
    '''
    # The call-change rule depends on whether both start in the same cell.
    co_located = i_state[0] == i_state[1]

    p_agent = gen_agent_move_prob([i_state[0], f_state[0]], action[0])
    p_target = gen_target_move_prob([i_state[1], f_state[1]])
    p_call = gen_call_prob([i_state[2], f_state[2]], co_located)

    # Max precision of a transition_prob value is 4
    return round(p_agent * p_target * p_call, 4)

In [15]:
# WORKS
def gen_transitions (states, actions):
    """Build the full transition table.

    Returns a list of [i_state, action, f_state, transition_prob] entries, one
    for every combination of start state, end state and action (in that
    nesting order), including combinations whose probability is 0.
    """
    return [
        [src, action, dst, gen_transition_prob(src, action, dst)]
        for src in states
        for dst in states
        for action in actions
    ]

In [16]:
from decimal import Decimal
def test_transitions (states, actions, transitions):
    errors = False
    for state in states:
        for action in actions:
#             print("For state = " + str(state) + " and action = " + str(action))
            tot = Decimal(0)
            for transition in transitions:
                if transition[0] == state and transition[1] == action:
#                     print("To final state = " + str(transition[2]) + ", prob = " + str(transition[3]))
                    cur = transition[3]
#                     if cur != 0:
#                         print("cur = " + str(cur))
                    tot = Decimal(str(tot + Decimal(str(transition[3]))))
            if tot != 1:
                errors = True
                print("Error on initial state = " + str(state) + " and action = " + str(action)\
                     + ". tot = " + str(tot))
    if errors == False:
        print("No errors found.")
    return errors

In [17]:
# WORKS
def gen_states():
    """Enumerate every state of the model.

    A state is ((agent row, agent col), (target row, target col), call). The
    list runs over all agent cells, then all target cells, then call = True
    followed by call = False.
    """
    return [
        ((a0, a1), (t0, t1), call)
        for a0 in range(ROW_LIMIT)
        for a1 in range(COLUMN_LIMIT)
        for t0 in range(ROW_LIMIT)
        for t1 in range(COLUMN_LIMIT)
        for call in [True, False]
    ]

In [18]:
# WORKS
def gen_actions():
    """Return one single-element action list per entry of AGENT_MOVE_ACTIONS."""
    return [[move] for move in AGENT_MOVE_ACTIONS]

In [19]:
# WORKS
def gen_reward_val (state, action):
    """Reward for taking action in state.

    state: (agent position, target position, call flag).
    action: sequence whose element [0] is the agent move.

    Returns REACH_REWARD when agent and target share a cell while the call is
    on, regardless of the action. Otherwise "STAY" earns 0 and every other
    move earns STEP_REWARD.
    """
    agent_pos, target_pos, call = state[0], state[1], state[2]
    if (agent_pos == target_pos) and (call == True):
        return REACH_REWARD
    return 0 if action[0] == "STAY" else STEP_REWARD

In [20]:
# WORKS
def gen_rewards(states, actions):
    """Build the reward table.

    Returns a list of [initial_state, action, reward_val] entries, one per
    (state, action) combination, with states as the outer loop.
    """
    return [
        [state, action, gen_reward_val(state, action)]
        for state in states
        for action in actions
    ]

In [21]:
def test_rewards (rewards):
    errors = False
    for item in rewards:
        state = item[0]
        reward = item[2]
        if state[0] == state[1] and state[2] == True:
            if reward == -1:
                errors = True
        elif reward != -1:
            errors = True
    if errors == False:
        print("No errors found.")
    return errors

In [22]:
def gen_observation_prob (end_state, action, observation):
    """Probability (1 or 0) of seeing observation in end_state.

    end_state: (agent position, target position, ...); only the two positions
        are read.
    action: accepted but not used.
    observation: an observation name such as "o1".

    The observation is determined by where the target is relative to the
    agent: same cell -> "o1", right -> "o2", below -> "o3", left -> "o4",
    above -> "o5", anywhere else -> "o6".
    """
    offset = relative_pos(end_state[0], end_state[1])
    expected_pairs = (
        ("STAY", "o1"),
        ("RIGHT", "o2"),
        ("DOWN", "o3"),
        ("LEFT", "o4"),
        ("UP", "o5"),
        (False, "o6"),
    )
    for where, symbol in expected_pairs:
        if offset == where and observation == symbol:
            return 1
    return 0

In [23]:
def gen_observations (states, actions, observation_set):
    """Build the observation table.

    Returns a list of [state, action, observation, prob] entries, one per
    combination of end state, action and observation name (in that nesting
    order), including entries whose probability is 0.
    """
    return [
        [end_state, action, observation,
         gen_observation_prob(end_state, action, observation)]
        for end_state in states
        for action in actions
        for observation in observation_set
    ]

In [24]:
# All (agent position, target position, call) combinations; a state's index
# in this list is later used as its id in the generated file.
states = gen_states()

In [25]:
# One single-element action per agent move.
actions = gen_actions()

In [26]:
# Every (start state, action, end state) triple with its probability,
# zero-probability triples included.
transitions = gen_transitions(states, actions)

In [27]:
# Verify that the outgoing probabilities of every (state, action) pair sum
# to 1; the call returns False when no errors are found.
test_transitions(states, actions, transitions)

No errors found.


False

In [28]:
# One reward entry per (state, action) pair.
rewards = gen_rewards(states, actions)

In [29]:
# Validate the reward table; the call returns True when errors were found.
# NOTE(review): the recorded output of this cell is True (errors flagged) --
# confirm that test_rewards agrees with gen_reward_val's zero-reward STAY rule.
test_rewards(rewards)

True

In [30]:
# Every (end state, action, observation) triple with its probability.
observations = gen_observations(states, actions, OBSERVATION_SET)

In [31]:
class POMDPGenerator:
    """Render a POMDP model as the text of a `.pomdp` file.

    The complete text is assembled once, in ``__init__``, and kept in
    ``self.file``; ``write_to_file`` saves it to disk. States are written as
    their integer index in ``states``; actions and observations by name.

    NOTE(review): the layout (``discount:``, ``T:``, ``O:`` and ``R:`` lines)
    looks like Cassandra's POMDP file format -- confirm against the solver
    that consumes the output.
    """

    def __init__ (self, states, actions, observation_set,
                  observations, transitions, rewards, discount,
                  start_target_pos, start_agent_pos_list):
        """Store the model and build the file text.

        states: list of states; a state's position in the list is its id.
        actions: list of actions; only element [0] (the name) is written.
        observation_set: list of observation names.
        observations: entries [end_state, action, observation, prob].
        transitions: entries [i_state, action, f_state, prob].
        rewards: entries [state, action, value].
        discount: value written on the "discount:" line.
        start_target_pos, start_agent_pos_list: a state is listed as a start
            state when state[1] equals start_target_pos and state[0] is in
            start_agent_pos_list.
        """
        self.states = states
        self.actions = actions
        self.observation_set = observation_set
        self.observations = observations
        self.transitions = transitions
        self.rewards = rewards
        self.discount = discount
        self.start_target_pos = start_target_pos
        self.start_agent_pos_list = start_agent_pos_list
        # Header lines first, then one blank-line-terminated section each for
        # the start states, transitions, observations and rewards.
        self.file = "".join([
            f"discount: {self.discount}\n",
            "values: reward\n",
            self.write_states(),
            self.write_actions(),
            self.write_observation_set(),
            self.write_initial_state() + "\n",
            self.write_transitions() + "\n",
            self.write_observations() + "\n",
            self.write_rewards() + "\n",
        ])

    def state_map (self, state):
        """Return the id of state: its first index in self.states.

        Raises ValueError when the state is not in the list.
        """
        return self.states.index(state)

    def write_states(self):
        """Return the "states:" line, giving the number of states."""
        # Use the instance's own states, not a module-level variable, so the
        # class works with whatever model it was constructed with.
        return "states: " + str(len(self.states)) + " \n"

    def write_actions(self):
        """Return the "actions:" line listing every action name."""
        names = "".join(str(action[0]) + " " for action in self.actions)
        return "actions: " + names + "\n"

    def write_observation_set(self):
        """Return the "observations:" line listing every observation name."""
        names = "".join(str(obs) + " " for obs in self.observation_set)
        return "observations: " + names + "\n"

    def write_initial_state(self):
        """Return the "start include:" line listing the ids of the start states."""
        start_ids = [
            self.state_map(state)
            for state in self.states
            if state[1] == self.start_target_pos
            and state[0] in self.start_agent_pos_list
        ]
        return "start include: " + "".join(f"{num} " for num in start_ids) + "\n"

    def write_transitions(self):
        """Return one "T:" line per transition with non-zero probability."""
        lines = []
        for transition in self.transitions:
            prob = transition[3]
            # Skip zero entries before resolving state ids: most of the table
            # is zero and each id lookup scans the state list.
            if prob == 0:
                continue
            i_state = self.state_map(transition[0])
            action = transition[1][0]
            f_state = self.state_map(transition[2])
            lines.append(f"T: {action} : {i_state} : {f_state} {prob} \n")
        return "".join(lines)

    def write_observations(self):
        """Return one "O:" line per observation entry, zero entries included."""
        lines = []
        for observation in self.observations:
            end_state = self.state_map(observation[0])
            action = observation[1][0]
            obs = observation[2]
            prob = observation[3]
            lines.append(f"O: {action} : {end_state} : {obs} {prob} \n")
        return "".join(lines)

    def write_rewards(self):
        """Return one "R:" line per reward entry.

        End state and observation are written as the wildcard "*".
        """
        lines = []
        for reward in self.rewards:
            initial_state = self.state_map(reward[0])
            action = reward[1][0]
            val = reward[2]
            end_state = "*"
            observation = "*"
            lines.append(f"R: {action} : {initial_state} : {end_state} : {observation} {val} \n")
        return "".join(lines)

    def write_to_file (self, outfile):
        """Write the assembled text to the path outfile, overwriting it."""
        with open(outfile, "w") as f:
            f.write(self.file)
        return


In [32]:
# Assemble the full file text from the generated model; the text is built
# inside the constructor and kept in pomdp.file.
pomdp = POMDPGenerator(states, actions, OBSERVATION_SET, observations, \
                       transitions, rewards, DISCOUNT, START_TARGET_POS,\
                       START_AGENT_POS_LIST)

In [33]:
# Save the generated text to problem.pomdp in the current working directory.
pomdp.write_to_file("problem.pomdp")