-
Notifications
You must be signed in to change notification settings - Fork 23
/
abstract.py
155 lines (128 loc) · 4.64 KB
/
abstract.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from costar_task_plan.abstract import NullReward
from costar_task_plan.simulation.world import *
from costar_task_plan.simulation.camera import *
import pybullet as pb
import rospkg
import os
class AbstractTaskDefinition(object):
'''
Defines how we create the world, gives us the reward function, etc. This
builds off of the costar task plan library: it will allow us to create a
World object that represents a particular environment, based off of the
basic BulletWorld.
'''
def __init__(self, robot, features, seed=None, option=None, save=False, *args, **kwargs):
'''
We do not create a world here, but we may need to cache things or read
them off of the ROS parameter server as necessary.
'''
self.seed = seed
self.option = option
self.robot = robot
self.world = None
self.features = features
self.save_world = save
# local storage for object info
self._objs_by_type = {}
self._type_and_name_by_obj = {}
self._cameras = []
def clear(self):
self.world = None
self._objs_by_type = {}
self._type_and_name_by_obj = {}
def addCamera(self, camera):
assert isinstance(camera, Camera)
self._cameras.append(camera)
def capture(self):
imgs = []
for camera in self._cameras:
imgs.append(camera.capture())
return imgs
def addObject(self, typename, objname, obj_id):
'''
Create an object and add it to the world. This will automatically track
and update useful information based on the object's current position
during each world update.
'''
if typename not in self._objs_by_type:
self._objs_by_type[typename] = [obj_id]
num = 0
else:
num = len(self._objs_by_type[typename])
self._objs_by_type[typename].append(obj_id)
self._type_and_name_by_obj[obj_id] = (typename, objname)
def getName(self):
raise NotImplementedError('should return name describing this task')
def makeWorld(self):
'''
Helper function to create the world
'''
world = SimulationWorld(
dt=0.1,
simulation_step=0.01,
task_name=self.getName(),
history_length=0,
cameras=self._cameras)
world.features = self.features
return world
def setup(self):
'''
Create task by adding objects to the scene, including the robot.
'''
rospack = rospkg.RosPack()
path = rospack.get_path('costar_simulation')
static_plane_path = os.path.join(path, 'meshes', 'world', 'plane.urdf')
pb.loadURDF(static_plane_path)
self.world = self.makeWorld()
self.task = self._makeTask()
self._setup()
handle = self.robot.load()
pb.setGravity(0, 0, -9.807)
self._setupRobot(handle)
state = self.robot.getState()
self.world.addActor(SimulationRobotActor(
robot=self.robot,
dynamics=SimulationDynamics(self.world),
policy=NullPolicy(),
state=state))
self._updateWorld()
for handle, (obj_type, obj_name) in self._type_and_name_by_obj.items():
# Create an object and add it to the World
state = GetObjectState(handle)
self.world.addObject(obj_name, obj_type, handle, state)
self.task.compile(self.getObjects())
def getObjects(self):
'''
By default, use the world's configured list of objects.
'''
return self.world.getObjects()
def reset(self):
'''
Reset the whole simulation into a working configuration.
'''
raise NotImplementedError('Task must override the reset() function!')
def getReward(self):
return NullReward()
def _setup(self):
'''
Setup any world objects after the robot has been created.
'''
raise NotImplementedError('Must override the _setup() function!')
def _setupRobot(self, handle):
'''
Do anything you need to do to the robot before it
'''
raise NotImplementedError('Must override the _setupRobot() function!')
def _updateWorld(self):
'''
Do anything necessary to add other agents to the world.
'''
pass
def _makeTask(self):
'''
Create the task model that we are attempting to learn policies for.
'''
raise NotImplementedError('Must override the _makeTask() function!')
def cloneRobot(self):
robot_type = type(self.robot)
return robot_type()