Implement local scheduler task queues using C++ data structures #392

Merged
robertnishihara merged 9 commits into ray-project:master from stephanie-wang:cpp-task-queues
Mar 30, 2017

@stephanie-wang
Contributor

This replaces the utarray and utlist C library implementation for the local scheduler's task queue with an implementation using the C++ STL.

@robertnishihara
Collaborator

This is related to #324 (which does something similar for Plasma), though #324 needs to be rebased.

@robertnishihara
Collaborator

FYI @rshin.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/395/
Test PASSed.

void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id);

typedef struct task_queue_entry {
typedef struct TaskQueueEntry {
Contributor

typedef is unnecessary in C++

object_entry *remote_objects;
};

TaskQueueEntry *TaskQueueEntry_init(TaskSpec *spec, int64_t task_spec_size) {
Contributor

You might want to replace these with a constructor and a destructor.

/** A queue of tasks to be executed on this actor. The tasks will be sorted by
* the order of their actor counters. */
task_queue_entry *task_queue;
std::list<TaskQueueEntry *> *task_queue;
Contributor

If you drop the *s (std::list<TaskQueueEntry> task_queue) then you don't need the allocation/deallocation code.

You should probably use std::vector instead of std::list as it has better data locality

Contributor Author

I think it may actually make more sense to use std::list (or maybe a deque) in this case, since we remove elements from the head and occasionally the middle.
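To make the trade-off in this thread concrete, here is a minimal sketch. It assumes a simplified, hypothetical `Entry` type rather than the real `TaskQueueEntry`. Erasing from a `std::list` leaves iterators to the remaining elements valid, whereas erasing from a `std::vector` shifts later elements and invalidates iterators at or after the erase point.

```cpp
#include <cstdint>
#include <iterator>
#include <list>

// Hypothetical stand-in for TaskQueueEntry; the real struct holds a TaskSpec.
struct Entry {
  int64_t id;
};

// Remove the head and one middle element while holding an iterator to the
// last element. With std::list that iterator stays valid throughout.
// Assumes the queue holds at least three entries.
inline int64_t erase_head_and_middle(std::list<Entry> &queue) {
  auto last = std::prev(queue.end());
  queue.pop_front();                      // O(1) removal from the head
  queue.erase(std::next(queue.begin()));  // O(1) removal given an iterator
  return last->id;                        // still refers to the same entry
}
```

A `std::deque` would also give cheap removal from the head, but erasing from its middle invalidates iterators, so it is not a drop-in replacement if iterators are stored elsewhere.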

free(elt->spec);
free(elt);
for (auto it = algorithm_state->dispatch_task_queue->begin();
it != algorithm_state->dispatch_task_queue->end(); ++it) {
Contributor

Use for (auto &entry : algorithm_state->dispatch_task_queue) {
(but this can also be avoided by not putting pointers in the list)
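A small sketch of the range-based loop suggested here, using a hypothetical `Entry` type. One detail is an assumption drawn from the diff context: since the queue is accessed with `->begin()`, it appears to be a pointer at this commit, in which case it has to be dereferenced before it can be used in a range-based `for`.

```cpp
#include <cstdint>
#include <list>

struct Entry {  // hypothetical stand-in for TaskQueueEntry
  int64_t task_spec_size;
};

// Sum a field over every queued entry. Because `queue` is a pointer here, it
// is dereferenced so that range-for can call begin() and end() on the list.
inline int64_t total_size(const std::list<Entry> *queue) {
  int64_t total = 0;
  for (const auto &entry : *queue) {
    total += entry.task_spec_size;
  }
  return total;
}
```

If the list were stored by value instead, the loop header would simply name the member without the `*`.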

HASH_ITER(hh, algorithm_state->remote_objects, obj_entry, tmp_obj_entry) {
HASH_DELETE(hh, algorithm_state->remote_objects, obj_entry);
utarray_free(obj_entry->dependent_tasks);
delete obj_entry->dependent_tasks;
In modern C++ code, it is a good idea to avoid calling new or delete directly. It is better to use std::unique_ptr or another smart pointer to handle deallocation in an error-safe way (this idiom is usually called RAII).
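As an illustration of the RAII idea, here is a sketch under stated assumptions: `ObjectEntry` is a hypothetical stand-in for `object_entry`, and `int` stands in for whatever the vector really stores. The owning pointer is released when the entry is destroyed, so no matching `delete` has to be written.

```cpp
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for the object_entry struct in the diff.
struct ObjectEntry {
  // Owned by the entry; freed automatically when the entry is destroyed.
  std::unique_ptr<std::vector<int>> dependent_tasks;

  ObjectEntry() : dependent_tasks(new std::vector<int>()) {}
};

inline std::size_t make_and_drop_entry() {
  ObjectEntry entry;
  entry.dependent_tasks->push_back(42);
  return entry.dependent_tasks->size();
}  // `entry` goes out of scope here; no explicit delete is needed.
```

This only works if the enclosing struct is itself constructed and destroyed as a C++ object; memory obtained with `malloc` and released with `free` never runs the member's destructor.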

entry->task_counter = 0;
/* Initialize the doubly-linked list to NULL. */
entry->task_queue = NULL;
entry->task_queue = new std::list<TaskQueueEntry *>();
See comments above. Perhaps the task queue would be better encapsulated in a class?

@robertnishihara
Collaborator

robertnishihara commented Mar 24, 2017

Thanks @rshin and @wesm! Those are good points. And I agree it'd be nice to find a way to avoid the std::list.

@stephanie-wang, I guess the question is: should we try to convert from C to C++ gradually? Or should we just go ahead and convert the whole local scheduler right now (e.g., switch to using classes, no calls to new/delete, etc.)?

Do you have a sense of how long that conversion would take?

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/400/
Test PASSed.

Collaborator

should we also do free(entry) here?

Contributor Author

Since we're using std::list<TaskQueueEntry>, entries that are removed from the list will automatically get freed (in contrast to if we had a std::list<TaskQueueEntry *> like in a previous commit).
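The behavior described here can be observed with a small sketch. `CountedEntry` is a hypothetical type that counts live instances; it is not part of the PR. Note that the list only destroys the entry object itself: anything the entry merely points to (such as a `malloc`'d spec) is released only if the entry's destructor does so.

```cpp
#include <list>

// Hypothetical entry type that counts how many instances are alive, so the
// effect of erase() is observable.
struct CountedEntry {
  static int alive;
  CountedEntry() { ++alive; }
  CountedEntry(const CountedEntry &) { ++alive; }
  ~CountedEntry() { --alive; }
};
int CountedEntry::alive = 0;

inline int alive_after_erase() {
  std::list<CountedEntry> queue(3);  // three entries stored by value
  queue.erase(queue.begin());        // destroys the erased entry
  return CountedEntry::alive;        // the two remaining entries
}
```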

Collaborator

Can we do this? We just memset algorithm_state to all 0's and now we're calling a method of one of the fields?

Contributor Author

This has to do with the way that std::unique_ptr's work. I think that zeroing out a std::unique_ptr is like initializing it to hold a NULL pointer. You can then replace the pointer that it holds with a pointer to an actual object, like we're doing here.

These docs were useful: http://www.cplusplus.com/reference/memory/unique_ptr/

Contributor

I haven't seen the whole code, but generally speaking you should (almost) never need std::unique_ptr<std::vector<T> >; you should just use std::vector<T> and then call vec1.swap(vec2) if you need to move the vectors around for some reason. Note that swap() doesn't invalidate iterators/pointers/references to elements for std::vector, so it's not a problem if that's what you're worried about (this behavior differs from that of std::basic_string).
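A short sketch of the `swap()` property mentioned above, using plain `int` elements for illustration: a pointer taken to an element before the swap keeps pointing at that element, which afterwards belongs to the other vector.

```cpp
#include <vector>

// Swap the contents of two vectors and report whether a pointer taken to an
// element before the swap still points at the same value afterwards.
inline bool pointer_survives_swap() {
  std::vector<int> a = {1, 2, 3};
  std::vector<int> b = {9};
  int *first_of_a = &a[0];
  a.swap(b);  // constant time: the vectors exchange their internal buffers
  // The element now lives in `b`, at the same address as before.
  return first_of_a == &b[0] && *first_of_a == 1;
}
```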

Contributor

Oh and the same goes for std::unique_ptr<std::list<T> >.

Collaborator

Why did you introduce the calls to memset? I imagine it crashes without the memset?

Contributor Author

Yeah, this is a bit awkward, but it has to be this way for now while we're mixing C structs and C++ objects. When we malloc the LocalActorInfo, it might get initialized with garbage values. A std::unique_ptr that is a member of the struct will also get initialized with garbage values, and it will seem to hold a pointer, but the pointer isn't valid. When we reset the std::unique_ptr to point to the actual object we want it to point to, it tries to clean up the invalid pointer under the hood. So, I used memset so that it gets initialized to hold a NULL pointer.

@mehrdadn
Contributor

mehrdadn commented Mar 28, 2017

The (semi-)C++ way to do this would be LocalActorInfo *entry = new LocalActorInfo(), and to then define LocalActorInfo::LocalActorInfo() : member1(), member2(), member3(), ... { }, and finally call delete after. As-is, this is a bad idea because if LocalActorInfo ever ends up with anything that has a destructor, you'll either have to call them manually (at which point you might as well define the destructor and just call delete) or they won't be called at all.

(The full C++ way would use something like std::unique_ptr etc., which I'd recommend gradually switching to wherever you find yourself making other changes like these.)
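A minimal sketch of the constructor-based approach described in this comment. `ActorInfo` and `Entry` are hypothetical, simplified stand-ins for `LocalActorInfo` and `TaskQueueEntry`. The point is that `new` runs the constructor, so members start in a valid state without `memset`, which the C++ standard does not guarantee to be safe for types with non-trivial members.

```cpp
#include <cstdint>
#include <list>
#include <memory>

struct Entry {  // hypothetical stand-in for TaskQueueEntry
  int64_t counter;
};

// Hypothetical, simplified version of LocalActorInfo.
struct ActorInfo {
  int64_t task_counter;
  std::list<Entry> task_queue;

  // The constructor initializes every member, so no memset is required.
  ActorInfo() : task_counter(0), task_queue() {}
};

inline bool fresh_actor_is_empty() {
  // new runs the constructor; unique_ptr later runs the destructor and frees.
  std::unique_ptr<ActorInfo> info(new ActorInfo());
  return info->task_counter == 0 && info->task_queue.empty();
}
```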

Contributor

I'd just use size_t (or maybe auto, though I'd be careful where I sprinkle auto) here. No need to change the signedness unnecessarily.

Contributor

You should never have to explicitly free resources anywhere but inside a destructor in C++ code.
It seems like what you really want is std::list<std::unique_ptr<TaskQueueEntry> >, with the destructor TaskQueueEntry::~TaskQueueEntry defined appropriately. Then you can take out this for loop and just call algorithm_state->waiting_task_queue.reset() if needed.

Contributor

(Or maybe just std::list<TaskQueueEntry> -- depends on what you're trying to do.)
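One way the destructor-based cleanup could look, as a sketch only. `OwningEntry` is hypothetical and loosely modelled on an entry that owns a `malloc`'d `spec`; the `freed` counter exists purely to make the cleanup observable.

```cpp
#include <cstdlib>
#include <list>
#include <memory>

// Hypothetical entry that owns a malloc'd buffer, loosely modelled on the
// `spec` field of TaskQueueEntry.
struct OwningEntry {
  void *spec;
  static int freed;  // counts destructor calls, for illustration only

  explicit OwningEntry(std::size_t size) : spec(std::malloc(size)) {}
  ~OwningEntry() {
    std::free(spec);
    ++freed;
  }
  // Copying would free `spec` twice, so it is disabled.
  OwningEntry(const OwningEntry &) = delete;
  OwningEntry &operator=(const OwningEntry &) = delete;
};
int OwningEntry::freed = 0;

inline int clear_queue() {
  std::list<std::unique_ptr<OwningEntry>> queue;
  queue.push_back(std::unique_ptr<OwningEntry>(new OwningEntry(16)));
  queue.push_back(std::unique_ptr<OwningEntry>(new OwningEntry(32)));
  queue.clear();  // each unique_ptr deletes its entry, which frees its spec
  return OwningEntry::freed;
}
```

With this shape, no loop over the queue is needed at teardown; destroying or clearing the list is enough.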

Contributor

C++ way would be to just define the constructor TaskQueueEntry::TaskQueueEntry appropriately; no init methods anywhere.

@mehrdadn
Contributor

mehrdadn commented Mar 28, 2017

C++ way would be to express the algorithm at a higher level like this:

std::find_if(entry->task_queue->begin(), entry->task_queue->end(), [&](TaskQueueEntry &v)
{
	return task_counter > TaskSpec_actor_counter(v);
})
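For reference, a self-contained sketch of a sorted insert built on `std::find_if`. It makes several assumptions: `Entry` is a hypothetical type with the counter stored directly (the real code reads it through `TaskSpec_actor_counter`), and the queue is meant to stay in ascending counter order, so the predicate looks for the first queued entry whose counter is not smaller than the new one.

```cpp
#include <algorithm>
#include <cstdint>
#include <list>

struct Entry {  // hypothetical stand-in for TaskQueueEntry
  int64_t actor_counter;
};

// Insert `entry` so the queue stays sorted by ascending actor_counter.
inline void insert_sorted(std::list<Entry> &queue, const Entry &entry) {
  // Find the first queued entry whose counter is not smaller than the new one.
  auto it = std::find_if(queue.begin(), queue.end(), [&](const Entry &queued) {
    return queued.actor_counter >= entry.actor_counter;
  });
  // std::list::insert places the new element immediately before `it`; if no
  // such entry exists, `it` is end() and the new element is appended.
  queue.insert(it, entry);
}
```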

@atumanov
Contributor

Let's do a sanity check here: is this the right time to do this C to C++ conversion?

Contributor

Higher-level comment (not C++-related, maybe for later), but a linear scan seems like something you should never want to do. Perhaps a priority queue?
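A rough sketch of what the priority-queue idea might look like, assuming the only ordering key is an integer counter. `CounterQueue` and `next_counter` are hypothetical names. One caveat: `std::priority_queue` offers no way to remove an arbitrary element, which matters if tasks must sometimes be removed from the middle of the queue.

```cpp
#include <cstdint>
#include <functional>
#include <queue>
#include <vector>

// Min-heap of actor counters: the smallest counter is always on top.
using CounterQueue =
    std::priority_queue<int64_t, std::vector<int64_t>, std::greater<int64_t>>;

// Pop and return the smallest counter. Assumes the queue is non-empty.
inline int64_t next_counter(CounterQueue &queue) {
  int64_t counter = queue.top();  // O(1) access to the smallest element
  queue.pop();                    // O(log n) removal
  return counter;
}
```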

Contributor

It seems you don't need to copy this? You can just use it directly right?

Contributor

Never use int to denote a size. :)

@mehrdadn
Contributor

(P.S. sorry, I just saw some of my comments were repeats of earlier ones by others... didn't see that until now.)

@robertnishihara
Collaborator

@mehrdadn thanks for the review! I think this discussion makes it clear that there is a ton of work to do to convert the project to a C++ style (#406). It's probably not desirable to radically change the style locally (in this PR) without doing it more globally as well. That is, if we're using malloc/free for memory management everywhere else in the project, and we need to use something in the C++ STL, then it looks like the simplest thing to do is to do a direct translation from malloc/free to new/delete and to defer the usage of unique_ptr a little further down the road when we do a project-wide change.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/406/
Test PASSed.

++it;
}
DL_APPEND_ELEM(entry->task_queue, current_entry, elt);
entry->task_queue->insert(it, elt);
Collaborator

This is a change in semantics (insert prepends whereas before we were appending). It looks like the previous approach was incorrect, although maybe the problem never arose because actor methods always arrived in order.
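The positional semantics of `std::list::insert` can be checked with a tiny sketch using plain `int` values: the new element goes immediately before the iterator passed in, and passing `end()` appends.

```cpp
#include <iterator>
#include <list>
#include <vector>

inline std::vector<int> insert_before_second() {
  std::list<int> queue = {10, 30};
  auto it = std::next(queue.begin());  // points at 30
  queue.insert(it, 20);                // 20 goes before 30
  queue.insert(queue.end(), 40);       // inserting at end() appends
  return std::vector<int>(queue.begin(), queue.end());
}
```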

TaskSpec *spec,
int64_t task_spec_size,
bool from_global_scheduler) {
TaskQueueEntry *queue_task(LocalSchedulerState *state,
Collaborator

should return void.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/408/
Test PASSed.

task_queue_entry *elt;
int count;
DL_COUNT(algorithm_state->dispatch_task_queue, elt, count);
TaskQueueEntry *elt;
Contributor

can get rid of this, maybe get rid of the whole method now

task_queue_entry *elt;
int count;
DL_COUNT(algorithm_state->waiting_task_queue, elt, count);
TaskQueueEntry *elt;
Contributor

can get rid of this, maybe can get rid of the whole method now

Contributor Author

Hmm I think we still need it for testing purposes (definition of SchedulingAlgorithmState isn't exposed).

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/409/
Test PASSed.

/** A vector of the tasks dependent on this object. */
std::vector<TaskQueueEntry> *dependent_tasks;
/** A vector of tasks dependent on this object. These tasks are a subset of
* the tasks in the waiting queue. Each element is actually store a reference
Collaborator

these lines should have two spaces after the *

fetch_missing_dependencies(state, algorithm_state, &task_entry);
/* There must be at least one missing dependency, so record it. The task was
* just queued, so we can get a reference to it by going to the last element
* in the waiting queue. */
Collaborator

This comment is a bit scary.

Can we avoid this by passing in the iterator?

If we really need to do this, maybe we should do CHECK(it->spec == spec) or something like that.

Contributor Author

We have to get the iterator somehow, and I think this is the best way to do it. I changed this so that queue_task returns the correct iterator, though, so we can be sure that no one modified the list in between.

Collaborator

Great, this looks better since it should no longer depend on the implementation of queue_task.
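The pattern agreed on in this thread could be sketched as follows. `queue_entry` is a hypothetical, much simplified stand-in for `queue_task`, and `Entry` stands in for `TaskQueueEntry`. `std::list::insert` returns an iterator to the element it just inserted, so the caller receives the position directly instead of inferring it.

```cpp
#include <cstdint>
#include <list>

struct Entry {  // hypothetical stand-in for TaskQueueEntry
  int64_t id;
};

using Queue = std::list<Entry>;

// Hypothetical, simplified queue_task: append the entry and hand back an
// iterator to it, so callers never have to guess where it ended up.
inline Queue::iterator queue_entry(Queue &queue, const Entry &entry) {
  return queue.insert(queue.end(), entry);
}
```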

* spec. */
it = algorithm_state->dispatch_task_queue->erase(it);
} else {
/* The task can still run,so continue to the next task. */
Collaborator

add a space after ,

DL_DELETE(algorithm_state->dispatch_task_queue, elt);
DL_APPEND(algorithm_state->waiting_task_queue, elt);
fetch_missing_dependency(state, algorithm_state, elt,
fetch_missing_dependency(state, algorithm_state, it,
Collaborator

This doesn't necessarily need to be addressed in this PR, but are we potentially calling fetch_missing_dependency a bunch of times on the same object ID?

Collaborator

If so, could you at least add a comment with a TODO to address this?

Contributor Author

Yes, we want to do that, since multiple tasks may depend on the same object ID. The same task will never appear more than once under the same object ID though.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/416/
Test FAILed.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/417/
Test PASSed.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/421/
Test PASSed.

/** An array of the tasks dependent on this object. */
UT_array *dependent_tasks;
/** A vector of tasks dependent on this object. These tasks are a subset of
* the tasks in the waiting queue. Each element is actually store a
Contributor

nit: Each element actually stores a reference to the corresponding...

/* Assign as many tasks as we can, while there are workers available. */
DL_FOREACH_SAFE(algorithm_state->dispatch_task_queue, elt, tmp) {
auto it = algorithm_state->dispatch_task_queue->begin();
while (it != algorithm_state->dispatch_task_queue->end()) {
Contributor

this could be written as a for loop:
for (auto it = algorithm_state->dispatch_task_queue->begin(); it != algorithm_state->dispatch_task_queue->end();) {
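The `for` loop form suggested here leaves the increment clause empty because the body decides how to advance. A minimal sketch with `int` elements and a hypothetical removal condition:

```cpp
#include <list>

// Remove every even value. erase() returns the iterator following the removed
// element, so the loop only advances manually when nothing was erased.
inline void remove_even(std::list<int> &queue) {
  for (auto it = queue.begin(); it != queue.end();) {
    if (*it % 2 == 0) {
      it = queue.erase(it);
    } else {
      ++it;
    }
  }
}
```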

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/429/
Test PASSed.

@robertnishihara robertnishihara merged commit 036b873 into ray-project:master Mar 30, 2017
@robertnishihara robertnishihara deleted the cpp-task-queues branch March 30, 2017 07:40
atumanov pushed a commit to atumanov/ray that referenced this pull request Apr 4, 2017
…project#392)

* Switch to using C++ lists for task queues

* Init and free methods for TaskQueueEntry

* Switch from utarray to c++ vector for TaskQueueEntry

* Get rid of some pointers

* Back to O(1) deletion from waiting_task_queue

* Fix comments

* Cut code

* Non const iterators

* Fix Alexey's comments

task_queue->push_back(*task_entry);
/* Since we just queued the task, we can get a reference to it by going to
* the last element in the queue. */
auto it = state->algorithm_state->waiting_task_queue->end();
Collaborator

This looks like a bug, it should be

auto it = task_queue->end();

Is that right @stephanie-wang?

Collaborator

This should be fixed in #443.

Contributor Author

Ah, yes. Good catch!
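For context on the bug discussed in this thread, a small sketch with a hypothetical `push_and_locate` helper and `int` elements. `end()` refers to one past the last element, so it has to be stepped back once to reach the element just pushed, and it must be taken from the same list that received the `push_back`.

```cpp
#include <iterator>
#include <list>

// Append a value and return an iterator to it. std::prev steps back from the
// past-the-end position to the element that push_back just added.
inline std::list<int>::iterator push_and_locate(std::list<int> &queue, int v) {
  queue.push_back(v);
  return std::prev(queue.end());
}
```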

8 participants