/
task_spec.cc
268 lines (216 loc) · 8.85 KB
/
task_spec.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
#include <sstream>
#include "ray/common/task/task_spec.h"
#include "ray/util/logging.h"
namespace ray {
// Definitions of the static members that back the process-wide scheduling
// class registry.

// Acquired by GetSchedulingClassDescriptor() and ComputeResources() before
// they touch the registry maps or the id counter below.
absl::Mutex TaskSpecification::mutex_;
// Forward map: scheduling class descriptor -> integer scheduling class id.
std::unordered_map<SchedulingClassDescriptor, SchedulingClass>
TaskSpecification::sched_cls_to_id_;
// Reverse map: integer scheduling class id -> scheduling class descriptor.
std::unordered_map<SchedulingClass, SchedulingClassDescriptor>
TaskSpecification::sched_id_to_cls_;
// Last id handed out. As a static it starts at zero and ComputeResources()
// pre-increments it, so the first id assigned is 1.
int TaskSpecification::next_sched_id_;
// Looks up the descriptor registered for scheduling class `id`.
//
// The registry mutex is held for the duration of the lookup. The process is
// aborted through RAY_CHECK if `id` was never registered.
//
// \param id A scheduling class id previously assigned by ComputeResources().
// \return A reference to the descriptor stored in the id -> descriptor map.
SchedulingClassDescriptor &TaskSpecification::GetSchedulingClassDescriptor(
    SchedulingClass id) {
  absl::MutexLock registry_guard(&mutex_);
  // NOTE: the iterator keeps its short name because it is spelled out in the
  // RAY_CHECK expression below.
  const auto it = sched_id_to_cls_.find(id);
  RAY_CHECK(it != sched_id_to_cls_.end()) << "invalid id: " << id;
  SchedulingClassDescriptor &descriptor = it->second;
  return descriptor;
}
// Populates the cached resource sets and the scheduling class id of this
// task from the underlying message.
//
// 1. Reads `required_resources` and `required_placement_resources` from the
//    message. When no placement resources are given, the required resources
//    are used for placement as well.
// 2. Maps the (required resources, function descriptor) pair to an integer
//    scheduling class id, registering a new id under the registry mutex the
//    first time a given pair is seen.
void TaskSpecification::ComputeResources() {
  auto required_resources = MapFromProtobuf(message_->required_resources());
  auto required_placement_resources =
      MapFromProtobuf(message_->required_placement_resources());
  if (required_placement_resources.empty()) {
    required_placement_resources = required_resources;
  }
  required_resources_.reset(new ResourceSet(required_resources));
  required_placement_resources_.reset(new ResourceSet(required_placement_resources));

  // Map the scheduling class descriptor to an integer for performance.
  auto sched_cls = std::make_pair(GetRequiredResources(), FunctionDescriptor());
  absl::MutexLock lock(&mutex_);
  auto it = sched_cls_to_id_.find(sched_cls);
  if (it == sched_cls_to_id_.end()) {
    sched_cls_id_ = ++next_sched_id_;
    // TODO(ekl) we might want to try cleaning up task types in these cases
    // The higher threshold must be tested first: every id above 1000 is also
    // above 100, so testing `> 100` first would make the ERROR branch
    // unreachable.
    if (sched_cls_id_ > 1000) {
      RAY_LOG(ERROR) << "More than " << sched_cls_id_
                     << " types of tasks seen, this may reduce performance.";
    } else if (sched_cls_id_ > 100) {
      RAY_LOG(WARNING) << "More than " << sched_cls_id_
                       << " types of tasks seen, this may reduce performance.";
    }
    // Record the new class in both directions of the registry.
    sched_cls_to_id_[sched_cls] = sched_cls_id_;
    sched_id_to_cls_[sched_cls_id_] = sched_cls;
  } else {
    sched_cls_id_ = it->second;
  }
}
// Task specification getter methods.
// Returns the ID of this task, decoded from the message's binary `task_id`
// field.
TaskID TaskSpecification::TaskId() const {
  const auto &binary_id = message_->task_id();
  return TaskID::FromBinary(binary_id);
}
// Returns the ID of the job this task belongs to, decoded from the message's
// binary `job_id` field.
JobID TaskSpecification::JobId() const {
  const auto &binary_id = message_->job_id();
  return JobID::FromBinary(binary_id);
}
// Returns the ID of the parent task, decoded from the message's binary
// `parent_task_id` field.
TaskID TaskSpecification::ParentTaskId() const {
  const auto &binary_id = message_->parent_task_id();
  return TaskID::FromBinary(binary_id);
}
// Returns the message's `parent_counter` field as a size_t.
size_t TaskSpecification::ParentCounter() const {
  const size_t counter = message_->parent_counter();
  return counter;
}
// Returns a copy of the message's repeated `function_descriptor` field as a
// vector of strings.
std::vector<std::string> TaskSpecification::FunctionDescriptor() const {
  const auto &descriptor_field = message_->function_descriptor();
  return VectorFromProtobuf(descriptor_field);
}
// Returns the integer scheduling class id cached in `sched_cls_id_`.
//
// Aborts through RAY_CHECK unless the id is positive. ComputeResources() is
// what assigns it, and assigned ids start at 1.
const SchedulingClass TaskSpecification::GetSchedulingClass() const {
  RAY_CHECK(sched_cls_id_ > 0);
  const SchedulingClass scheduling_class = sched_cls_id_;
  return scheduling_class;
}
// Returns the number of entries in the message's `args` field.
size_t TaskSpecification::NumArgs() const {
  const size_t arg_count = message_->args_size();
  return arg_count;
}
// Returns the message's `num_returns` field as a size_t.
size_t TaskSpecification::NumReturns() const {
  const size_t return_count = message_->num_returns();
  return return_count;
}
// Builds the object ID of one of this task's return values.
//
// \param return_index Index of the return value. It is incremented by one
//        before being handed to ObjectID::ForTaskReturn.
// \param transport_type Transport type, passed on as a uint8_t.
// \return The object ID produced by ObjectID::ForTaskReturn for this task.
ObjectID TaskSpecification::ReturnId(size_t return_index,
                                     TaskTransportType transport_type) const {
  const auto shifted_index = return_index + 1;
  const auto transport_flag = static_cast<uint8_t>(transport_type);
  return ObjectID::ForTaskReturn(TaskId(), shifted_index, transport_flag);
}
// Returns true when the argument at `arg_index` carries at least one object
// ID, i.e. ArgIdCount(arg_index) is non-zero.
bool TaskSpecification::ArgByRef(size_t arg_index) const {
  const size_t id_count = ArgIdCount(arg_index);
  return id_count != 0;
}
// Returns how many object IDs are attached to the argument at `arg_index`.
size_t TaskSpecification::ArgIdCount(size_t arg_index) const {
  const auto &arg = message_->args(arg_index);
  return arg.object_ids_size();
}
// Returns the `id_index`-th object ID of the argument at `arg_index`,
// decoded from its binary form.
ObjectID TaskSpecification::ArgId(size_t arg_index, size_t id_index) const {
  const auto &arg = message_->args(arg_index);
  const auto &binary_id = arg.object_ids(id_index);
  return ObjectID::FromBinary(binary_id);
}
// Returns a byte pointer to the `data` payload of the argument at
// `arg_index`. Use ArgDataSize() for its length.
// NOTE(review): the pointer presumably refers to storage owned by the
// message - confirm before holding it past the message's lifetime.
const uint8_t *TaskSpecification::ArgData(size_t arg_index) const {
  const auto &payload = message_->args(arg_index).data();
  return reinterpret_cast<const uint8_t *>(payload.data());
}
// Returns the size of the `data` payload of the argument at `arg_index`.
size_t TaskSpecification::ArgDataSize(size_t arg_index) const {
  const auto &payload = message_->args(arg_index).data();
  return payload.size();
}
// Returns a byte pointer to the `metadata` payload of the argument at
// `arg_index`. Use ArgMetadataSize() for its length.
// NOTE(review): the pointer presumably refers to storage owned by the
// message - confirm before holding it past the message's lifetime.
const uint8_t *TaskSpecification::ArgMetadata(size_t arg_index) const {
  const auto &metadata = message_->args(arg_index).metadata();
  return reinterpret_cast<const uint8_t *>(metadata.data());
}
// Returns the size of the `metadata` payload of the argument at `arg_index`.
size_t TaskSpecification::ArgMetadataSize(size_t arg_index) const {
  const auto &metadata = message_->args(arg_index).metadata();
  return metadata.size();
}
// Returns the resource set cached in `required_resources_`, which
// ComputeResources() fills in.
const ResourceSet &TaskSpecification::GetRequiredResources() const {
  const ResourceSet &resources = *required_resources_;
  return resources;
}
std::vector<ObjectID> TaskSpecification::GetDependencies() const {
std::vector<ObjectID> dependencies;
for (size_t i = 0; i < NumArgs(); ++i) {
int count = ArgIdCount(i);
for (int j = 0; j < count; j++) {
dependencies.push_back(ArgId(i, j));
}
}
if (IsActorTask()) {
dependencies.push_back(PreviousActorTaskDummyObjectId());
}
return dependencies;
}
// Returns the resource set cached in `required_placement_resources_`, which
// ComputeResources() fills in.
const ResourceSet &TaskSpecification::GetRequiredPlacementResources() const {
  const ResourceSet &placement_resources = *required_placement_resources_;
  return placement_resources;
}
// Returns true when this is a driver task. Driver tasks are empty tasks with
// no function ID set, which shows up as an empty function descriptor.
bool TaskSpecification::IsDriverTask() const {
  const auto descriptor = FunctionDescriptor();
  return descriptor.empty();
}
// Returns the message's `language` field.
Language TaskSpecification::GetLanguage() const {
  const Language language = message_->language();
  return language;
}
// Returns true when the message's task type is NORMAL_TASK.
bool TaskSpecification::IsNormalTask() const {
  const auto task_type = message_->type();
  return task_type == TaskType::NORMAL_TASK;
}
// Returns true when the message's task type is ACTOR_CREATION_TASK.
bool TaskSpecification::IsActorCreationTask() const {
  const auto task_type = message_->type();
  return task_type == TaskType::ACTOR_CREATION_TASK;
}
// Returns true when the message's task type is ACTOR_TASK.
bool TaskSpecification::IsActorTask() const {
  const auto task_type = message_->type();
  return task_type == TaskType::ACTOR_TASK;
}
// === Below are getter methods specific to actor creation tasks.
// Returns the ID of the actor this task creates, decoded from the actor
// creation spec. Aborts through RAY_CHECK unless this is an actor creation
// task.
ActorID TaskSpecification::ActorCreationId() const {
  RAY_CHECK(IsActorCreationTask());
  const auto &creation_spec = message_->actor_creation_task_spec();
  return ActorID::FromBinary(creation_spec.actor_id());
}
// Returns the `max_actor_reconstructions` value of the actor creation spec.
// Aborts through RAY_CHECK unless this is an actor creation task.
uint64_t TaskSpecification::MaxActorReconstructions() const {
  RAY_CHECK(IsActorCreationTask());
  const auto &creation_spec = message_->actor_creation_task_spec();
  return creation_spec.max_actor_reconstructions();
}
// Returns a copy of the actor creation spec's repeated
// `dynamic_worker_options` field as a vector of strings. Aborts through
// RAY_CHECK unless this is an actor creation task.
std::vector<std::string> TaskSpecification::DynamicWorkerOptions() const {
  RAY_CHECK(IsActorCreationTask());
  const auto &creation_spec = message_->actor_creation_task_spec();
  return VectorFromProtobuf(creation_spec.dynamic_worker_options());
}
// Returns the caller's task ID, decoded from the message's binary
// `caller_id` field. No task-type check is performed here.
TaskID TaskSpecification::CallerId() const {
  const auto &binary_id = message_->caller_id();
  return TaskID::FromBinary(binary_id);
}
// === Below are getter methods specific to actor tasks.
// Returns the ID of the actor this task runs on, decoded from the actor task
// spec. Aborts through RAY_CHECK unless this is an actor task.
ActorID TaskSpecification::ActorId() const {
  RAY_CHECK(IsActorTask());
  const auto &actor_spec = message_->actor_task_spec();
  return ActorID::FromBinary(actor_spec.actor_id());
}
// Returns the `actor_counter` value of the actor task spec. Aborts through
// RAY_CHECK unless this is an actor task.
uint64_t TaskSpecification::ActorCounter() const {
  RAY_CHECK(IsActorTask());
  const auto &actor_spec = message_->actor_task_spec();
  return actor_spec.actor_counter();
}
// Returns the actor creation dummy object ID, decoded from the actor task
// spec. Aborts through RAY_CHECK unless this is an actor task.
ObjectID TaskSpecification::ActorCreationDummyObjectId() const {
  RAY_CHECK(IsActorTask());
  const auto &actor_spec = message_->actor_task_spec();
  return ObjectID::FromBinary(actor_spec.actor_creation_dummy_object_id());
}
// Returns the previous actor task's dummy object ID, decoded from the actor
// task spec. Aborts through RAY_CHECK unless this is an actor task.
ObjectID TaskSpecification::PreviousActorTaskDummyObjectId() const {
  RAY_CHECK(IsActorTask());
  const auto &actor_spec = message_->actor_task_spec();
  return ObjectID::FromBinary(actor_spec.previous_actor_task_dummy_object_id());
}
// Returns the object ID of this task's last return value (index
// NumReturns() - 1), using the RAYLET transport type. Aborts through
// RAY_CHECK unless this is an actor task or an actor creation task.
ObjectID TaskSpecification::ActorDummyObject() const {
  RAY_CHECK(IsActorTask() || IsActorCreationTask());
  const size_t last_return_index = NumReturns() - 1;
  return ReturnId(last_return_index, TaskTransportType::RAYLET);
}
// Returns the message's top-level `is_direct_call` flag.
bool TaskSpecification::IsDirectCall() const {
  const bool direct_call = message_->is_direct_call();
  return direct_call;
}
// Returns the `is_direct_call` flag of the actor creation spec, or false
// when this is not an actor creation task.
bool TaskSpecification::IsDirectActorCreationCall() const {
  if (!IsActorCreationTask()) {
    return false;
  }
  const auto &creation_spec = message_->actor_creation_task_spec();
  return creation_spec.is_direct_call();
}
// Returns the `max_concurrency` value of the actor creation spec. Aborts
// through RAY_CHECK unless this is an actor creation task.
int TaskSpecification::MaxActorConcurrency() const {
  RAY_CHECK(IsActorCreationTask());
  const auto &creation_spec = message_->actor_creation_task_spec();
  return creation_spec.max_concurrency();
}
// Returns the `is_asyncio` flag of the actor creation spec. Aborts through
// RAY_CHECK unless this is an actor creation task.
bool TaskSpecification::IsAsyncioActor() const {
  RAY_CHECK(IsActorCreationTask());
  const auto &creation_spec = message_->actor_creation_task_spec();
  return creation_spec.is_asyncio();
}
// Returns the `is_detached` flag of the actor creation spec, or false when
// this is not an actor creation task. This getter never aborts on the task
// type.
bool TaskSpecification::IsDetachedActor() const {
  if (!IsActorCreationTask()) {
    return false;
  }
  return message_->actor_creation_task_spec().is_detached();
}
// Renders a one-line, human-readable summary of this task spec.
//
// The summary lists the task type, language, function descriptor, task and
// job IDs, and argument/return counts. Actor creation tasks and actor tasks
// get an extra section with their own fields.
//
// \return The formatted description.
std::string TaskSpecification::DebugString() const {
  std::ostringstream out;
  out << "Type=" << TaskType_Name(message_->type());
  out << ", Language=" << Language_Name(message_->language());
  out << ", function_descriptor=";

  // Emit the function descriptor as a comma-separated list. The 4th entry is
  // the code hash, which is binary bits, so at most the first three entries
  // are printed.
  const auto descriptor = VectorFromProtobuf(message_->function_descriptor());
  const size_t max_printed = 3;
  const size_t printed = descriptor.size() < max_printed ? descriptor.size() : max_printed;
  const char *separator = "";
  for (size_t idx = 0; idx < printed; ++idx) {
    out << separator << descriptor[idx];
    separator = ",";
  }

  out << ", task_id=" << TaskId();
  out << ", job_id=" << JobId();
  out << ", num_args=" << NumArgs();
  out << ", num_returns=" << NumReturns();

  if (IsActorCreationTask()) {
    // Section for actor creation tasks.
    out << ", actor_creation_task_spec={actor_id=" << ActorCreationId();
    out << ", max_reconstructions=" << MaxActorReconstructions();
    out << ", is_direct_call=" << IsDirectCall();
    out << ", max_concurrency=" << MaxActorConcurrency();
    out << ", is_asyncio_actor=" << IsAsyncioActor();
    out << ", is_detached=" << IsDetachedActor() << "}";
  } else if (IsActorTask()) {
    // Section for actor tasks.
    out << ", actor_task_spec={actor_id=" << ActorId();
    out << ", actor_caller_id=" << CallerId();
    out << ", actor_counter=" << ActorCounter();
    out << "}";
  }
  return out.str();
}
} // namespace ray