/
env.py
executable file
·227 lines (206 loc) · 9.62 KB
/
env.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
''' Batched Room-to-Room navigation environment '''
import sys
sys.path.append('build')
import MatterSim
import csv
import numpy as np
import math
import base64
import json
import random
import networkx as nx
from utils import load_datasets, load_nav_graphs
# Raise the CSV field size limit to the maximum: each row of the feature TSV
# carries a very large base64-encoded feature blob that would otherwise
# overflow the default field-size limit.
csv.field_size_limit(sys.maxsize)
class EnvBatch():
    ''' A simple wrapper for a batch of MatterSim environments,
        using discretized viewpoints and pretrained features '''

    def __init__(self, feature_store=None, batch_size=100):
        # Load precomputed CNN features when a feature store is supplied;
        # otherwise run without features using default camera parameters.
        if feature_store:
            print('Loading image features from %s' % feature_store)
            fieldnames = ['scanId', 'viewpointId', 'image_w', 'image_h', 'vfov', 'features']
            self.features = {}
            with open(feature_store, "rt") as tsv_file:
                for record in csv.DictReader(tsv_file, delimiter='\t', fieldnames=fieldnames):
                    # Camera parameters are repeated on every row; the last row wins.
                    self.image_h = int(record['image_h'])
                    self.image_w = int(record['image_w'])
                    self.vfov = int(record['vfov'])
                    key = self._make_id(record['scanId'], record['viewpointId'])
                    blob = base64.b64decode(record['features'])
                    # One (36 views x 2048-dim) feature matrix per viewpoint.
                    self.features[key] = np.frombuffer(blob, dtype=np.float32).reshape((36, 2048))
        else:
            print('Image features not provided')
            self.features = None
            self.image_w = 640
            self.image_h = 480
            self.vfov = 60
        self.batch_size = batch_size
        # One batched simulator instance serves all episodes.
        self.sim = MatterSim.Simulator()
        self.sim.setRenderingEnabled(False)
        self.sim.setDiscretizedViewingAngles(True)
        self.sim.setBatchSize(self.batch_size)
        self.sim.setCameraResolution(self.image_w, self.image_h)
        self.sim.setCameraVFOV(math.radians(self.vfov))
        self.sim.initialize()

    def _make_id(self, scanId, viewpointId):
        # Key used to index the feature store: "<scan>_<viewpoint>".
        return '_'.join([scanId, viewpointId])

    def newEpisodes(self, scanIds, viewpointIds, headings):
        # All episodes start with zero elevation.
        self.sim.newEpisode(scanIds, viewpointIds, headings, [0] * self.batch_size)

    def getStates(self):
        ''' Get list of states augmented with precomputed image features. rgb field will be empty. '''
        augmented = []
        for state in self.sim.getState():
            key = self._make_id(state.scanId, state.location.viewpointId)
            # Select the feature row for the view the agent is currently facing.
            feature = self.features[key][state.viewIndex, :] if self.features else None
            augmented.append((feature, state))
        return augmented

    def makeActions(self, actions):
        ''' Take an action using the full state dependent action interface (with batched input).
            Every action element should be an (index, heading, elevation) tuple. '''
        indices, headings, elevations = [], [], []
        for idx, hdg, elev in actions:
            indices.append(int(idx))
            headings.append(float(hdg))
            elevations.append(float(elev))
        self.sim.makeAction(indices, headings, elevations)

    def makeSimpleActions(self, simple_indices):
        ''' Take an action using a simple interface: 0-forward, 1-turn left, 2-turn right, 3-look up, 4-look down.
            All viewpoint changes are 30 degrees. Forward, look up and look down may not succeed - check state.
            WARNING - Very likely this simple interface restricts some edges in the graph. Parts of the
            environment may not longer be navigable. '''
        lookup = {
            0: (1, 0, 0),   # forward
            1: (0, -1, 0),  # turn left
            2: (0, 1, 0),   # turn right
            3: (0, 0, 1),   # look up
            4: (0, 0, -1),  # look down
        }
        actions = []
        for index in simple_indices:
            if index not in lookup:
                sys.exit("Invalid simple action")
            actions.append(lookup[index])
        self.makeActions(actions)
class R2RBatch():
    ''' Implements the Room to Room navigation task, using discretized viewpoints
        and pretrained features.

        Serves shuffled minibatches of (instruction, path) episodes through an
        EnvBatch, and provides shortest-path "teacher" actions for supervised
        training.
    '''

    def __init__(self, feature_store, batch_size=100, seed=10, splits=None, tokenizer=None):
        ''' Args:
                feature_store: path to the precomputed image feature TSV (or None).
                batch_size: number of parallel episodes.
                seed: RNG seed used for the initial data shuffle.
                splits: list of dataset split names to load (defaults to ['train']).
                tokenizer: optional; if given, each instruction is encoded and
                    stored under 'instr_encoding'.
        '''
        # None sentinel instead of a mutable default argument (splits=['train']).
        if splits is None:
            splits = ['train']
        self.env = EnvBatch(feature_store=feature_store, batch_size=batch_size)
        self.data = []
        self.scans = []
        for item in load_datasets(splits):
            # Split multiple instructions into separate entries, one per instruction.
            for j, instr in enumerate(item['instructions']):
                self.scans.append(item['scan'])
                new_item = dict(item)
                new_item['instr_id'] = '%s_%d' % (item['path_id'], j)
                new_item['instructions'] = instr
                if tokenizer:
                    new_item['instr_encoding'] = tokenizer.encode_sentence(instr)
                self.data.append(new_item)
        self.scans = set(self.scans)
        self.splits = splits
        self.seed = seed
        random.seed(self.seed)
        random.shuffle(self.data)
        self.ix = 0
        self.batch_size = batch_size
        self._load_nav_graphs()
        print('R2RBatch loaded with %d instructions, using splits: %s' % (len(self.data), ",".join(splits)))

    def _load_nav_graphs(self):
        ''' Load connectivity graph for each scan, useful for reasoning about shortest paths '''
        print('Loading navigation graphs for %d scans' % len(self.scans))
        self.graphs = load_nav_graphs(self.scans)
        self.paths = {}
        self.distances = {}
        # Precompute all-pairs shortest paths and path lengths in a single pass.
        for scan, G in self.graphs.items():
            self.paths[scan] = dict(nx.all_pairs_dijkstra_path(G))
            self.distances[scan] = dict(nx.all_pairs_dijkstra_path_length(G))

    def _next_minibatch(self):
        ''' Advance self.ix and store the next batch in self.batch, reshuffling
            and wrapping around at the end of an epoch. '''
        batch = self.data[self.ix:self.ix+self.batch_size]
        if len(batch) < self.batch_size:
            # Epoch boundary: reshuffle and top up the batch from the start.
            random.shuffle(self.data)
            self.ix = self.batch_size - len(batch)
            batch += self.data[:self.ix]
        else:
            self.ix += self.batch_size
        self.batch = batch

    def reset_epoch(self):
        ''' Reset the data index to beginning of epoch. Primarily for testing.
            You must still call reset() for a new episode. '''
        self.ix = 0

    def _shortest_path_action(self, state, goalViewpointId):
        ''' Determine next action on the shortest path to goal, for supervised training.

            Returns an (index, heading, elevation) action tuple: (0,0,0) to stop,
            turn/look steps to face the next viewpoint, or (i,0,0) to move to
            navigableLocations[i].
        '''
        if state.location.viewpointId == goalViewpointId:
            return (0, 0, 0)  # do nothing
        path = self.paths[state.scanId][state.location.viewpointId][goalViewpointId]
        nextViewpointId = path[1]
        # Can we see the next viewpoint?
        for i, loc in enumerate(state.navigableLocations):
            if loc.viewpointId == nextViewpointId:
                # Look directly at the viewpoint before moving.
                # viewIndex//12 is the elevation row of the 36 discretized views
                # (0 = down, 1 = level, 2 = up), so clamp look up/down at the ends.
                if loc.rel_heading > math.pi/6.0:
                    return (0, 1, 0)  # Turn right
                elif loc.rel_heading < -math.pi/6.0:
                    return (0, -1, 0)  # Turn left
                elif loc.rel_elevation > math.pi/6.0 and state.viewIndex//12 < 2:
                    return (0, 0, 1)  # Look up
                elif loc.rel_elevation < -math.pi/6.0 and state.viewIndex//12 > 0:
                    return (0, 0, -1)  # Look down
                else:
                    return (i, 0, 0)  # Move
        # Can't see it - first neutralize camera elevation
        if state.viewIndex//12 == 0:
            return (0, 0, 1)  # Look up
        elif state.viewIndex//12 == 2:
            return (0, 0, -1)  # Look down
        # Otherwise decide which way to turn towards the target position.
        pos = [state.location.x, state.location.y, state.location.z]
        # G.nodes[...] replaces G.node[...], which was removed in networkx 2.4.
        target_rel = self.graphs[state.scanId].nodes[nextViewpointId]['position'] - pos
        target_heading = math.pi/2.0 - math.atan2(target_rel[1], target_rel[0])  # convert to rel to y axis
        if target_heading < 0:
            target_heading += 2.0*math.pi
        # Turn whichever way gives the smaller rotation to the target heading.
        if state.heading > target_heading and state.heading - target_heading < math.pi:
            return (0, -1, 0)  # Turn left
        if target_heading > state.heading and target_heading - state.heading > math.pi:
            return (0, -1, 0)  # Turn left
        return (0, 1, 0)  # Turn right

    def _get_obs(self):
        ''' Build the observation dict for each episode in the current batch,
            pairing simulator state with its instruction and teacher action. '''
        obs = []
        for i, (feature, state) in enumerate(self.env.getStates()):
            item = self.batch[i]
            obs.append({
                'instr_id': item['instr_id'],
                'scan': state.scanId,
                'viewpoint': state.location.viewpointId,
                'viewIndex': state.viewIndex,
                'heading': state.heading,
                'elevation': state.elevation,
                'feature': feature,
                'step': state.step,
                'navigableLocations': state.navigableLocations,
                'instructions': item['instructions'],
                # Supervised target: next action on the shortest path to the goal.
                'teacher': self._shortest_path_action(state, item['path'][-1]),
            })
            if 'instr_encoding' in item:
                obs[-1]['instr_encoding'] = item['instr_encoding']
        return obs

    def reset(self):
        ''' Load a new minibatch / episodes. Returns initial observations. '''
        self._next_minibatch()
        scanIds = [item['scan'] for item in self.batch]
        viewpointIds = [item['path'][0] for item in self.batch]
        headings = [item['heading'] for item in self.batch]
        self.env.newEpisodes(scanIds, viewpointIds, headings)
        return self._get_obs()

    def step(self, actions):
        ''' Take action (same interface as makeActions). Returns new observations. '''
        self.env.makeActions(actions)
        return self._get_obs()