In [1]:
import numpy as np

In [2]:
# ---------------------------------------------------------------------------
# Moves. Position i of ACTIONS, DELTA and TARGET_ACTION_PROB all describe the
# same move; ACTION_MAP gives the position of each move name.
# ---------------------------------------------------------------------------
ACTIONS = ['UP', 'RIGHT', 'DOWN', 'LEFT', 'STAY']
ACTION_MAP = {name: index for index, name in enumerate(ACTIONS)}
DELTA = [3, 1, -3, -1, 0]                            # cell-index change per move
TARGET_ACTION_PROB = [0.15, 0.15, 0.15, 0.15, 0.40]  # weight of each move for the target

# Probability that an agent move goes in the requested direction; the
# remaining 1 - X sends it in the opposite direction (see move_agent).
X = 0.89

# Values of the call flag.
NOT_CALL = 0
CALL = 1

# ---------------------------------------------------------------------------
# Sizes of the individual state / action components.
# ---------------------------------------------------------------------------
AGENT_STATES = 3 * 3   # agent cell index, 0..8
TARGET_STATES = 3 * 3  # target cell index, 0..8
CALL_STATES = 2        # call flag: NOT_CALL or CALL

AGENT_ACTIONS = len(ACTIONS)
CALL_ACTIONS = 2  # (IF ON->(ON, OFF)  OFF->(OFF, ON)) 0-> OFF, 1 -> ON

# Allowed values of every component, used for validation by State / Action.
AGENT_STATE_VALUES = tuple(range(AGENT_STATES))
TARGET_STATE_VALUES = tuple(range(TARGET_STATES))
CALL_STATE_VALUES = tuple(range(CALL_STATES))
AGENT_ACTION_VALUES = tuple(range(AGENT_ACTIONS))
CALL_ACTION_VALUES = tuple(range(CALL_ACTIONS))

# ---------------------------------------------------------------------------
# Sizes of the POMDP itself. Only the agent's move counts as an action.
# ---------------------------------------------------------------------------
NUM_STATES = AGENT_STATES * TARGET_STATES * CALL_STATES
NUM_ACTIONS = AGENT_ACTIONS
NUM_OBSERVATIONS = 6

In [3]:
class State:
    """One joint world state: agent cell, target cell and call flag.

    Each state maps to a unique integer hash in ``range(NUM_STATES)``; the
    agent is the most significant component, then the target, then the call
    flag (see ``getHash`` / ``fromHash``).
    """

    def __init__(self, agent, target, call):
        """Validate and store the three components.

        Raises:
            ValueError: if a component is not one of its allowed values
                (AGENT_STATE_VALUES, TARGET_STATE_VALUES, CALL_STATE_VALUES).
        """
        if agent not in AGENT_STATE_VALUES:
            raise ValueError("invalid agent state: {!r}".format(agent))
        if target not in TARGET_STATE_VALUES:
            raise ValueError("invalid target state: {!r}".format(target))
        if call not in CALL_STATE_VALUES:
            raise ValueError("invalid call state: {!r}".format(call))
        self.agent = agent
        self.target = target
        self.call = call

    def __repr__(self):
        return "{}(agent={!r}, target={!r}, call={!r})".format(
            type(self).__name__, self.agent, self.target, self.call)

    def asTuple(self):
        """Return ``(agent, target, call)``."""
        return (self.agent, self.target, self.call)

    def asList(self):
        """Return ``[agent, target, call]``."""
        return [self.agent, self.target, self.call]

    def getHash(self):
        """Return the integer index of this state (mixed-radix encoding)."""
        return (self.agent * TARGET_STATES * CALL_STATES +
                self.target * CALL_STATES +
                self.call)

    @classmethod
    def fromHash(cls, num):
        """Build the state whose ``getHash()`` equals ``num``.

        Raises:
            ValueError: if ``num`` is not exactly an ``int`` or lies outside
                ``range(NUM_STATES)``.
        """
        # Exact type check kept on purpose: bools and floats are rejected.
        if type(num) is not int:
            raise ValueError(
                "state hash must be an int, got {}".format(type(num).__name__))
        if not 0 <= num < NUM_STATES:
            raise ValueError(
                "state hash {} outside range(0, {})".format(num, NUM_STATES))

        agent, rest = divmod(num, TARGET_STATES * CALL_STATES)
        target, call = divmod(rest, CALL_STATES)
        # Build through cls so that subclasses decode to their own type.
        return cls(agent, target, call)
            

In [4]:
class Action:
    """A joint action: the agent's move plus the call action.

    Each action maps to a unique integer hash in
    ``range(AGENT_ACTIONS * CALL_ACTIONS)`` with the agent's move as the more
    significant component.
    """

    def __init__(self, agent, call):
        """Validate and store both components.

        Raises:
            ValueError: if a component is not one of its allowed values
                (AGENT_ACTION_VALUES, CALL_ACTION_VALUES).
        """
        if agent not in AGENT_ACTION_VALUES:
            raise ValueError("invalid agent action: {!r}".format(agent))
        if call not in CALL_ACTION_VALUES:
            raise ValueError("invalid call action: {!r}".format(call))
        self.agent = agent
        self.call = call

    def __repr__(self):
        return "{}(agent={!r}, call={!r})".format(
            type(self).__name__, self.agent, self.call)

    def asTuple(self):
        """Return ``(agent, call)``."""
        return (self.agent, self.call)

    def asList(self):
        """Return ``[agent, call]``."""
        return [self.agent, self.call]

    def getHash(self):
        """Return the integer index of this action."""
        return (self.agent * CALL_ACTIONS +
                self.call)

    @classmethod
    def fromHash(cls, num):
        """Build the action whose ``getHash()`` equals ``num``.

        Raises:
            ValueError: if ``num`` is not exactly an ``int`` or lies outside
                ``range(AGENT_ACTIONS * CALL_ACTIONS)``.
        """
        # Exact type check kept on purpose: bools and floats are rejected.
        if type(num) is not int:
            raise ValueError(
                "action hash must be an int, got {}".format(type(num).__name__))
        # The bound is the number of joint actions, not the number of states.
        num_hashes = AGENT_ACTIONS * CALL_ACTIONS
        if not 0 <= num < num_hashes:
            raise ValueError(
                "action hash {} outside range(0, {})".format(num, num_hashes))

        agent, call = divmod(num, CALL_ACTIONS)
        # Build through cls so that subclasses decode to their own type.
        return cls(agent, call)

In [5]:
def action_valid(current_state, action):
    """Tell whether ``action`` may be taken from cell ``current_state``.

    A move is rejected only when the cell sits on the edge the move points at:
    UP from cells 6-8, RIGHT from 2/5/8, LEFT from 0/3/6, DOWN from 0-2.
    Every other action value (STAY included) is always allowed.
    """
    blocked_cells_by_move = (
        ('UP', (6, 7, 8)),
        ('RIGHT', (2, 5, 8)),
        ('LEFT', (0, 3, 6)),
        ('DOWN', (0, 1, 2)),
    )
    for move_name, blocked_cells in blocked_cells_by_move:
        if action == ACTION_MAP[move_name]:
            return current_state not in blocked_cells
    return True

In [6]:
def move_target(current_state):
    """Return the target's next-cell distribution as ``{cell: probability}``.

    Every move in ACTIONS contributes its TARGET_ACTION_PROB weight to the
    cell it leads to; a move that ``action_valid`` rejects leaves the target
    where it is, so its weight is added to ``current_state`` instead.
    """
    distribution = {}
    for move in range(len(ACTIONS)):
        if action_valid(current_state, move):
            landing = current_state + DELTA[move]
        else:
            landing = current_state
        distribution[landing] = distribution.get(landing, 0) + TARGET_ACTION_PROB[move]

    return distribution

In [7]:
def move_agent(curr_state, action):
    """Return the agent's outcomes as a list of ``[probability, cell]`` pairs.

    STAY keeps the agent in place with probability 1.0. A directional move is
    attempted in the requested direction with probability X and in the
    opposite direction with probability 1 - X; whenever the attempted
    direction is rejected by ``action_valid`` the agent stays on ``curr_state``.
    An action that is none of the five known moves yields an empty list.
    """
    if action == ACTION_MAP['STAY']:
        return [[1.0, curr_state]]

    def landing_cell(direction):
        # Cell reached when trying to move in `direction` from curr_state.
        if action_valid(curr_state, direction):
            return curr_state + DELTA[direction]
        return curr_state

    # (requested move, direction the agent goes when the move misfires)
    opposite_of = (
        ('UP', 'DOWN'),
        ('DOWN', 'UP'),
        ('RIGHT', 'LEFT'),
        ('LEFT', 'RIGHT'),
    )
    for requested, misfire in opposite_of:
        if action == ACTION_MAP[requested]:
            return [
                [X, landing_cell(action)],
                [1-X, landing_cell(ACTION_MAP[misfire])],
            ]

    return []

In [8]:
def generateTransitions():
    '''
    Generates a list of ['action', 'current-state' , 'end-state', 'probability']

    For every (state, action) pair the agent, the target and the call flag
    move independently, so each row's probability is the product of the three
    individual probabilities. States are given as State hashes; the action is
    the agent's move index (0 .. NUM_ACTIONS - 1).
    '''
    # Call-flag dynamics: current flag -> [probability, next flag] pairs.
    call_transitions = {
        NOT_CALL: [[0.4, CALL], [0.6, NOT_CALL]],
        CALL: [[0.2, NOT_CALL], [0.8, CALL]],
    }

    result = []
    for state in range(NUM_STATES):
        current = State.fromHash(state)

        # Target and call dynamics do not depend on the agent's action, so
        # they are computed once per state rather than once per action.
        target_outcomes = move_target(current.target)
        call_outcomes = call_transitions.get(current.call, [])

        for action in range(NUM_ACTIONS):
            agent_outcomes = move_agent(current.agent, action)

            for next_target, target_prob in target_outcomes.items():
                for agent_prob, next_agent in agent_outcomes:
                    for call_prob, next_call in call_outcomes:
                        end_state = State(next_agent, next_target, next_call)
                        prob = agent_prob * target_prob * call_prob
                        result.append([action, state, end_state.getHash(), prob])

    return result
            
            

In [9]:
def generateObservation():
    '''
    Generates a list of ['end-state', 'observation', 'probability'].

    Observations (index -> meaning):
    0 (O1) -> TARGET == AGENT
    1 (O2) -> TARGET RIGHT OF AGENT   (target == agent + 1, same row)
    2 (O3) -> TARGET BELOW OF AGENT   (target == agent + 3)
    3 (O4) -> TARGET LEFT OF AGENT    (target == agent - 1, same row)
    4 (O5) -> TARGET ABOVE OF AGENT   (target == agent - 3)
    5 (O6) -> NONE OF THE ABOVE

    Exactly one observation has probability 1.0 in every state; the call flag
    has no influence on the observation.

    NOTE(review): DELTA moves the agent by +3 for UP, while O3 ("below") is
    reported for target == agent + 3. Only the labels are affected, the
    observation indices are kept as they were - confirm the intended wording.
    '''
    result = []
    for state in range(NUM_STATES):
        st = State.fromHash(state)
        offset = st.target - st.agent

        # Cell indices wrap at the end of a row (e.g. cell 2 and cell 3 differ
        # by one but are not neighbours), so a horizontal neighbour only
        # counts when the agent could actually step in that direction.
        target_right = offset == 1 and action_valid(st.agent, ACTION_MAP['RIGHT'])
        target_left = offset == -1 and action_valid(st.agent, ACTION_MAP['LEFT'])

        seen = [offset == 0, target_right, offset == 3, target_left, offset == -3]
        seen.append(not any(seen))  # O6: none of the above

        for obs, is_seen in zip(range(NUM_OBSERVATIONS), seen):
            result.append([state, obs, 1.0 if is_seen else 0.0])

    return result
                
    

In [10]:
def generateReward():
    '''
    Returns a list of ['action', 'state', 'reward'] with one entry per
    (state, action) pair, ordered by state and then by action.

    IF AGENT_POS == TARGET_POS: REWARD = 20 for STAY, 19 for any other action.
    ELSE: REWARD = -1.
    '''
    stay = ACTION_MAP['STAY']
    result = []
    for state in range(NUM_STATES):
        decoded = State.fromHash(state)
        on_target = decoded.agent == decoded.target
        for action in range(NUM_ACTIONS):
            if not on_target:
                reward = -1.0
            elif action == stay:
                reward = 20.0
            else:
                reward = 19.0
            result.append([action, state, reward])

    return result

In [11]:
def execute(path="./robo.pomdp"):
    '''
    WRITES TO THE FILE. CREATE .POMDP FILE

    Generates the transition, observation and reward tables and writes them,
    preceded by the model header, to ``path`` (default "./robo.pomdp").
    An existing file at ``path`` is overwritten.
    '''
    trans = generateTransitions()
    obs = generateObservation()
    reward = generateReward()

    # The context manager closes the file even if a write or format fails.
    with open(path, "w") as f:
        f.write("discount: 0.5\n")
        f.write("values: reward\n")
        # Header counts come from the same constants the generators loop
        # over, so the header cannot drift from the tables below it.
        f.write("states: {:d}\n".format(NUM_STATES))
        f.write("actions: {:d}\n".format(NUM_ACTIONS))
        f.write("observations: {:d}\n".format(NUM_OBSERVATIONS))
        # With the current state encoding these hashes are: target on cell 4,
        # agent on cell 0, 2, 6 or 8, call flag either off or on.
        f.write("start include: 8 9 44 45 116 117 152 153\n")

        for t in trans:
            f.write("T: {action:d} : {currState:d} : {endState:d} {prob:f}\n".format(action=t[0], currState=t[1], endState=t[2], prob=t[3]))

        # Observations do not depend on the action, hence the '*' wildcard.
        for o in obs:
            f.write("O: * : {endSt:d} : {observation:d} {prob:f}\n".format(endSt=o[0], observation=o[1], prob=o[2]))

        # The state of each reward entry is written in the end-state field.
        for r in reward:
            f.write("R: {act:d} : * : {endSt:d} : * {rew:f}\n".format(act=r[0], endSt=r[1], rew=r[2]))

In [12]:
# Build the model tables and write the POMDP file (./robo.pomdp).
execute()