In [1]:
import pickle
import blosc
import numpy as np
import matplotlib.pyplot as plt

# Load the recorded demonstrations.
# NOTE(review): pickle.load can execute arbitrary code while unpickling, so this
# file must come from a trusted source.
with open('demos/demos2.pkl', 'rb') as f:
    data = pickle.load(f)
    
# Action names per mission; each mission keeps its action objects at index 4.
# NOTE(review): if missions differ in length this nested list is ragged, and
# NumPy >= 1.24 raises ValueError for ragged input unless dtype=object is
# passed -- confirm against the installed NumPy version.
actions = np.array([[action.name for action in mission[4]] for mission in data])

# Replace the first action of every mission with the sentinel 'start'.
for traj in actions:
    traj[0] = 'start'
    
#with open('gridworld-actions.pickle', 'wb') as f:
#    pickle.dump(actions, f, pickle.HIGHEST_PROTOCOL)

# State sequences per mission, stored blosc-packed at index 2 of each mission.
raw_states = np.array([blosc.unpack_array(mission[2]) for mission in data])
# Crop every state to rows 7..14 and columns 7..14.
raw_states = np.array([[s[7:15,7:15] for s in traj] for traj in raw_states])

# Persist the cropped states.
with open('gridworld-states.pickle', 'wb') as f:
    pickle.dump(raw_states, f, pickle.HIGHEST_PROTOCOL)

def get_agent_coordinates(state):
    """Locate the agent in a numeric state.

    A cell is the agent when its first channel equals 10.  The state is scanned
    row by row and the first match is reported.

    Returns:
        An ``(x, y)`` tuple where ``x`` is the index inside the row and ``y`` is
        the row index.

    Raises:
        IndexError: if no cell has a first channel equal to 10.
    """
    matches = []
    for y, row in enumerate(state):
        for x, cell in enumerate(row):
            if cell[0] == 10:
                matches.append((x, y))
    return matches[0]

def keys(state):
    """Return the ``(x, y)`` positions of all cells whose first channel is 5.

    Positions are listed in row-major scan order; the list is empty when no
    such cell exists.
    """
    found = []
    for y, row in enumerate(state):
        found.extend((x, y) for x, cell in enumerate(row) if cell[0] == 5)
    return found

def doors(state):
    """Return the ``(x, y)`` positions of all cells whose first channel is 4.

    Positions are listed in row-major scan order; the list is empty when no
    such cell exists.
    """
    positions = []
    for y, row in enumerate(state):
        for x, cell in enumerate(row):
            if cell[0] != 4:
                continue
            positions.append((x, y))
    return positions

def contains_key(state):
    """Return True when ``keys(state)`` finds at least one cell, else False."""
    return len(keys(state)) != 0
    
def contains_door(state):
    """Return True when ``doors(state)`` finds at least one cell, else False."""
    door_count = len(doors(state))
    return door_count != 0

def get_key_coordinates(state):
    """Return the ``(x, y)`` position of the first key cell in ``state``.

    Returns None when the state contains no key cell.
    """
    return keys(state)[0] if contains_key(state) else None
    
def get_door_coordinates(state):
    """Return the ``(x, y)`` position of the first door cell in ``state``.

    Returns None when the state contains no door cell.
    """
    if not contains_door(state):
        return None
    return doors(state)[0]

def first_personize(state,scope=6,behind=True):
    """Re-centre a state on the agent.

    Every cell of ``state`` is copied onto a 13x13x3 integer canvas so that the
    agent lands on canvas position [6][6]; canvas cells that no state cell maps
    to stay ``[0, 0, 0]``.

    Args:
        state: numeric state containing exactly the cells to copy; the agent is
            located with ``get_agent_coordinates``.
        scope: how many cells around the agent to keep in each direction.
        behind: when True the window spans rows ``6-scope .. 6+scope``; when
            False it starts at the agent's own row (rows ``6 .. 6+scope``).
            Columns always span ``6-scope .. 6+scope``.

    Returns:
        The selected window of the canvas.

    NOTE(review): the canvas size assumes every cell lies within 6 steps of the
    agent.  A larger positive offset raises IndexError, and a larger negative
    offset wraps around to the opposite edge of the canvas.
    """
    agent_x, agent_y = get_agent_coordinates(state)[0], get_agent_coordinates(state)[1]

    # Blank first-person canvas (default integer dtype).
    canvas = np.zeros((13, 13, 3), dtype=int)
    for row_idx, row in enumerate(state):
        for col_idx, cell in enumerate(row):
            canvas[6 + (row_idx - agent_y), 6 + (col_idx - agent_x)] = cell

    first_row = 6 - scope if behind else 6
    return canvas[first_row:7+scope, 6-scope:7+scope]

def display_state(state):
    """Draw ``state`` with matplotlib.

    The state is flipped along axis 0 (so row 0 is drawn at the bottom) and its
    values are divided by 10 before being handed to ``plt.imshow``.
    """
    plt.imshow(np.flip(state, axis=0) / 10)

In [2]:
def get_orientation(xdiff,ydiff):
    """Map the position change caused by a 'forward' action to a heading.

    ``xdiff`` is checked first: +1 gives 'E', -1 gives 'W'.  Otherwise ``ydiff``
    decides: +1 gives 'N', -1 gives 'S'.  Any other combination returns None.
    """
    for delta, (positive, negative) in ((xdiff, ('E', 'W')), (ydiff, ('N', 'S'))):
        if delta == 1:
            return positive
        if delta == -1:
            return negative

    return None

def adjust_direction(orientation,direction):
    """Turn a heading by a quarter turn.

    ``direction == 'left'`` turns N -> W -> S -> E -> N; any other ``direction``
    value is treated as a right turn (N -> E -> S -> W -> N).  An ``orientation``
    that is not one of 'N', 'E', 'S', 'W' yields None.
    """
    after_left = {'N': 'W', 'E': 'N', 'S': 'E', 'W': 'S'}
    after_right = {'N': 'E', 'E': 'S', 'S': 'W', 'W': 'N'}
    table = after_left if direction == 'left' else after_right
    return table.get(orientation)

def get_initial_orientation(actionT,stateT):
    """Infer the heading the agent had at the first step of a mission.

    The heading is read off the first 'forward' action that actually moved the
    agent (the position difference between the state before and after it), and
    every 'left'/'right' action that preceded that step is then undone in
    reverse order.

    Args:
        actionT: action names of the mission; ``actionT[i]`` is treated as the
            action that led to ``stateT[i]``.
        stateT: states of the mission, aligned with ``actionT``.

    Returns:
        One of 'N', 'E', 'S', 'W'.  Falls back to 'N' when no forward action
        moved the agent, because the heading cannot be inferred then.
    """
    for step, action in enumerate(actionT):
        # Step 0 has no predecessor state; stateT[step-1] would wrap to the last state.
        if step == 0 or action != 'forward':
            continue

        before = get_agent_coordinates(stateT[step-1])
        after = get_agent_coordinates(stateT[step])
        orientation = get_orientation(after[0]-before[0],after[1]-before[1])
        if orientation is None:
            # The agent did not move on this step, so it reveals nothing about
            # the heading; try the next forward action instead of returning None.
            continue

        # Undo the turns taken before this step, most recent first.
        for earlier in actionT[1:step][::-1]:
            if earlier == 'left':
                orientation = adjust_direction(orientation,'right')
            elif earlier == 'right':
                orientation = adjust_direction(orientation,'left')
        return orientation

    return 'N'

def rotate_state(state,orientation):
    """Rotate a state according to the agent's heading.

    'E' applies three quarter turns of ``np.rot90``, 'S' two and 'W' one.  For
    'N' or any unrecognised heading the state is returned unchanged (the very
    same object, not a copy).
    """
    quarter_turns = {'E': 3, 'S': 2, 'W': 1}
    k = quarter_turns.get(orientation)
    if k is None:
        return state
    return np.rot90(state, k)

def get_orientations(actionT,initial_orientation):
    """Compute the agent's heading after every action of a mission.

    The result array starts out filled with ``initial_orientation``.  Each entry
    is then derived from the previous entry: 'left' and 'right' actions turn it,
    every other action keeps it.  For the first action the "previous" entry is
    index -1, i.e. the last slot, which still holds the initial heading.

    Returns:
        A NumPy array with one heading per action.
    """
    headings = np.repeat(initial_orientation,len(actionT))
    for step in range(len(actionT)):
        move = actionT[step]
        previous = headings[step-1]
        headings[step] = adjust_direction(previous,move) if move in ('left','right') else previous

    return headings

def getQuadrant(coords):
    """Return the quadrant number (1-4) of an ``(x, y)`` coordinate.

    Quadrants are numbered 1 (x <= 3, y < 4), 2 (x > 3, y < 4),
    3 (x <= 3, y >= 4) and 4 (x > 3, y >= 4).
    """
    x, y = coords[0], coords[1]
    column_part = 2 if x > 3 else 1
    row_part = 0 if y < 4 else 2
    return column_part + row_part

# Starting orientation of the agent in each mission, inferred from the
# mission's actions and states.
initial_orientations = np.array([get_initial_orientation(actionT,stateT) for actionT,stateT in zip(actions,raw_states)])
# Orientation after every step of each mission, seeded with the mission's
# starting orientation.
orientations = np.array([get_orientations(actionT,io) for actionT,io in zip(actions,initial_orientations)])
# Every raw state rotated according to the agent's orientation at that step.
# NOTE(review): like the arrays above, this nests per-mission lists; missions of
# different length make the input ragged -- confirm np.array accepts it.
rot_states = np.array([[rotate_state(s,o) for s,o in zip(sT,oT)] for sT,oT in zip(raw_states,orientations)])

In [3]:
# Agent-centred versions of every rotated state.
# Default scope (6) with behind=True keeps the whole 13x13 canvas.
fp_states = np.array([[first_personize(s) for s in stateT] for stateT in rot_states])
# scope=1 keeps only the 3x3 neighbourhood around the agent.
fp_states1 = np.array([[first_personize(s,scope=1) for s in stateT] for stateT in rot_states])
# behind=False drops the rows before the agent's own row (7 rows x 13 columns).
fp_states_view = np.array([[first_personize(s,behind=False) for s in stateT] for stateT in rot_states])

In [4]:
def qAgent(state):
    """Return the quadrant (1-4) that contains the agent."""
    agent_position = get_agent_coordinates(state)
    return getQuadrant(agent_position)
    
def qDoor(state):
    """Return the quadrant (1-4) of the first door, or 0 when there is no door."""
    if not contains_door(state):
        return 0
    return getQuadrant(get_door_coordinates(state))

def qKey(state):
    """Return the quadrant (1-4) of the first key, or 0 when there is no key."""
    return getQuadrant(get_key_coordinates(state)) if contains_key(state) else 0

In [5]:
def a12(state):
    """First-channel value of the cell at 12 o'clock: one row ahead (Y+1), same column."""
    col, row = get_agent_coordinates(state)
    return state[row + 1][col][0]

def a6(state):
    """First-channel value of the cell at 6 o'clock: directly behind the agent.

    "Behind" is one row back (Y-1) in the agent's own column.  Negative indices
    wrap, so an agent in row 0 reads from the last row.
    """
    X, Y = get_agent_coordinates(state)
    # Same column as the agent.  The previous X-1 offset made this identical to
    # a730 (behind-left) instead of reading the cell straight behind.
    return state[Y-1][X][0]

def a3(state):
    """First-channel value of the cell at 3 o'clock: same row, one column to the right (X+1)."""
    position = get_agent_coordinates(state)
    col, row = position[0], position[1]
    return state[row][col + 1][0]

def a9(state):
    """First-channel value of the cell at 9 o'clock: same row, one column to the left (X-1)."""
    col, row = get_agent_coordinates(state)
    left_cell = state[row][col - 1]
    return left_cell[0]

def a130(state):
    """First-channel value of the cell at 1:30: one row ahead (Y+1), one column right (X+1)."""
    col, row = get_agent_coordinates(state)
    return state[row + 1][col + 1][0]

def a1030(state):
    """First-channel value of the cell at 10:30: one row ahead (Y+1), one column left (X-1)."""
    position = get_agent_coordinates(state)
    row_ahead = state[position[1] + 1]
    return row_ahead[position[0] - 1][0]

def a430(state):
    """First-channel value of the cell at 4:30: one row back (Y-1), one column right (X+1)."""
    col, row = get_agent_coordinates(state)
    return state[row - 1][col + 1][0]

def a730(state):
    """First-channel value of the cell at 7:30: one row back (Y-1), one column left (X-1)."""
    position = get_agent_coordinates(state)
    row_behind = state[position[1] - 1]
    return row_behind[position[0] - 1][0]

In [6]:
# Integer code for each heading, numbered in the order N, E, S, W.
numericOrientation = {
    heading: code for code, heading in enumerate(("N", "E", "S", "W"))
}

# Integer code for each action name, numbered in the order listed.
# NOTE(review): looking up any other action name raises KeyError -- confirm the
# demonstrations contain only these actions.
numericAction = {
    name: code
    for code, name in enumerate(("start", "forward", "left", "right", "pickup", "toggle"))
}
    
def priorAction(m,s):
    """Return the numeric code of the action recorded at step ``s`` of mission ``m``.

    Raises KeyError when the action name is not in ``numericAction``.
    """
    return numericAction[actions[m][s]]

def agentOrientation(m,s):
    """Return the numeric code of the agent's heading at step ``s`` of mission ``m``.

    Raises KeyError when the heading is not in ``numericOrientation``.
    """
    heading = orientations[m][s]
    code = numericOrientation[heading]
    return code

In [7]:
def getView(state):
    """Return the first seven columns of the agent-centred version of ``state``."""
    return first_personize(state)[:,:7]

def canSeeDoor(state):
    """Return 1 when ``state`` contains a door cell, otherwise 0."""
    if contains_door(state):
        return 1
    return 0
        
def canSeeKey(state):
    """Return 1 when ``state`` contains a key cell, otherwise 0."""
    if contains_key(state):
        return 1
    return 0
    
def distanceInFront(state):
    """Count the cells straight ahead of the agent before the first obstacle.

    The line of sight is the agent's column, from the row after the agent
    (Y+1) to the last row.  A cell counts as free when its first channel is 1
    (presumably the "empty" object code -- confirm against the environment).

    Args:
        state: NumPy array indexed as ``state[row, column, channel]``.

    Returns:
        The number of leading free cells.  When no cell ahead is an obstacle
        (including when there are no cells ahead at all) the length of the line
        of sight is returned.
    """
    X, Y = get_agent_coordinates(state)

    line_of_sight = state[Y+1:,X,0]
    blocked = line_of_sight != 1
    if not blocked.any():
        # argmax of an all-False mask is 0, which would be indistinguishable
        # from an obstacle directly ahead, and argmax raises on an empty mask.
        return len(line_of_sight)
    return blocked.argmax()

In [9]:
def makeEvent(m,s):
    """Build the full event record for step ``s`` of mission ``m``.

    The record starts with fixed music-style attributes (onset, duration,
    pitch, ...) and continues with the gridworld features from ``makeEvent2``.
    Key order matters: ``lispifyEvent`` pairs the values positionally with ``vp``.

    ``phrase`` is 1 on the first step of the mission, -1 on the last step and 0
    in between; a single-step mission gets -1 because that check runs last.
    """
    phrase = 0
    if s == 0:
        phrase = 1

    # Compare against the number of steps in the mission.  The previous check
    # used len(raw_states[m][s]) -- the height of a single state grid -- so the
    # end marker landed on a fixed step index regardless of mission length.
    if s == len(raw_states[m])-1:
        phrase = -1

    dic = {
        "onset": int(s*24),
        "deltast": 0,
        "bioi": 24 if s > 0 else 0,
        "duration": 24,
        "cpitch": 69,
        "mpitch": 40,
        "accidental": 0,
        "keysig": 2,
        "mode": 0,
        "barlength": 96,
        "pulses": 4,
        "phrase": phrase,
        "voice": 1,
        "ornament": 0,
        "comma": 0,
        "vertint": "NIL",
        "articulation": 0,
        "dyn": 0,
    }
    # The gridworld features are exactly the makeEvent2 record (same keys, same
    # order), appended after the music-style attributes.
    dic.update(makeEvent2(m,s))
    return dic

def makeEvent2(m,s):
    """Build the gridworld-only event record for step ``s`` of mission ``m``.

    Position and quadrant features come from the raw state, the eight
    neighbour features from the rotated state, and the visibility/distance
    features from the forward-only first-person view.
    """
    raw = raw_states[m][s]
    rotated = rot_states[m][s]
    view = fp_states_view[m][s]

    event = {
        "priorAction": priorAction(m,s),
        "orientation": agentOrientation(m,s),
    }

    agent_position = get_agent_coordinates(raw)
    event["agentX"] = agent_position[0]
    event["agentY"] = agent_position[1]
    event["qAgent"] = qAgent(raw)
    event["qKey"] = qKey(raw)
    event["qDoor"] = qDoor(raw)

    # Neighbour cells around the agent, in the order the record expects.
    neighbour_probes = (
        ("a12", a12),
        ("a6", a6),
        ("a3", a3),
        ("a9", a9),
        ("a1030", a1030),
        ("a130", a130),
        ("a430", a430),
        ("a730", a730),
    )
    for label, probe in neighbour_probes:
        event[label] = probe(rotated)

    event["canSeeDoor"] = canSeeDoor(view)
    event["canSeeKey"] = canSeeKey(view)
    event["distance"] = distanceInFront(view)
    return event

# Build one event per step of every mission.  `events` carries the music-style
# attributes plus the gridworld features; `events2` only the gridworld features.
events = np.array([[makeEvent(m,s) for s,S in enumerate(M)] for m,M in enumerate(raw_states)])
events2 = np.array([[makeEvent2(m,s) for s,S in enumerate(M)] for m,M in enumerate(raw_states)])

# Persist both variants.
with open('gridworld-events-melody.pickle', 'wb') as f:
    pickle.dump(events, f, pickle.HIGHEST_PROTOCOL)
    
with open('gridworld-events.pickle', 'wb') as f:
    pickle.dump(events2, f, pickle.HIGHEST_PROTOCOL)

In [None]:
# Labels written for each event value by lispifyEvent.  They are matched to the
# values by position, so the order here must follow the key order of the dicts
# built by makeEvent (18 music-style attributes, then 18 gridworld features).
vp = [":ONSET", ":DELTAST", ":BIOI",":DUR",":CPITCH",":MPITCH",":ACCIDENTAL",":KEYSIG",":MODE",":BARLENGTH",":PULSES",":PHRASE",
      ":VOICE",":ORNAMENT",":COMMA",":VERTINT12",":ARTICULATION",":DYN",":ACTION",":ORIENTATION",":AGENTX",":AGENTY",
      ":AGENTQUAD",":KEYQUAD",":DOORQUAD",":AGENTINFRONT",":AGENTBEHIND",":AGENTRIGHT",":AGENTLEFT",":AGENTINFRONTLEFT",
      ":AGENTINFRONTRIGHT",":AGENTBEHINDRIGHT",":AGENTBEHINDLEFT",":CANSEEDOOR",":CANSEEKEY",":DISTANCE"]

def lispifyEvent(event):
    """Render one event dict as a parenthesised list of ``(LABEL value)`` pairs.

    Values are paired with the labels in ``vp`` by position, in the dict's
    iteration order, and the pairing stops at the shorter of the two.  The
    result ends with a newline.

    NOTE(review): a ``makeEvent2`` dict has only the 18 gridworld values, so it
    would be paired with the first 18 (music-style) labels.
    """
    fields = [
        "(" + label + " " + str(value) + ")"
        for label, value in zip(vp, event.values())
    ]
    return "(" + "".join(fields) + ")\n"

def lispifyMission(eventT,missionName):
    """Render a mission as ``("name"<event><event>...)`` followed by a newline.

    Each event is rendered with ``lispifyEvent``; the mission name is wrapped in
    double quotes and written first.
    """
    rendered_events = "".join(lispifyEvent(event) for event in eventT)
    return "(" + "\"" + missionName + "\"" + rendered_events + ")\n"

#missionNames = np.array(["m"+str(i) for i,m in enumerate(raw_states)])

#lisp_missions = np.array([lispifyMission(eventT,missionName) for eventT,missionName in zip(events,missionNames)])

#lisp_data = "(\"gridworld data\" 96 60 \n"
#for m in lisp_missions:
#    lisp_data+=m
    
#lisp_data+=")\n"

#with open("lisp-data31.lisp", "w") as text_file:
#    text_file.write(lisp_data)

In [None]:
# Spot check: draw one rotated state and print the event built for the same
# mission and step.
missionId = 20
stateId = 8
S = rot_states[missionId][stateId]
E = events[missionId][stateId]
display_state(S)
print(E)