Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add getWorkerTask functions #601

Merged
merged 1 commit into from
Apr 12, 2014
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions source/vibe/core/core.d
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import std.functional;
import std.range : empty, front, popFront;
import std.string;
import std.variant;
import std.typecons : Typedef;
import core.atomic;
import core.sync.condition;
import core.sync.mutex;
Expand Down Expand Up @@ -226,6 +227,47 @@ void runWorkerTask(alias method, T, ARGS...)(shared(T) object, ARGS args)
runWorkerTask_unsafe({ __traits(getMember, object, __traits(identifier, method))(args); });
}

/**
Runs a new asynchronous task in a worker thread, returning the task.

This function will yield and wait for the new task to be created and started
in the worker thread, then resume and return it.


Only function pointers with weakly isolated arguments are allowed to be
able to guarantee thread-safety.
*/
Task getWorkerTask(R, ARGS...)(R function(ARGS) func, ARGS args)
{
alias Typedef!(Task, Task.init, __PRETTY_FUNCTION__) PrivateTask;
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
Task caller = Task.getThis();
runWorkerTask_unsafe({
PrivateTask callee = Task.getThis();
caller.prioritySend(callee);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You send the Task as first message with priority, so it will be the first one that is received and
you don't need to use a Typedef, receive((Task t) { result = t; }); should work just as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except if someone else priority-sent something that hasn't yet been received before calling getWorkerTask. Because it could come from any thread, it would at least be a race condition. Or am I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As Ludwig said, the typedef is necessary because the caller task could receive a Task message at any moment before the callee task is even started. This could happen even without threads: the caller task creates the worker delegate and passes it to runWorkerTask_unsafe, than sits waiting for a Task message; a different task sends (even without priority) a Task message before the worker delegate has been popped from the worker queue; the caller task receives it and mistakes it for the callee task.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, I forgot that it's receive not receiveFrom.

func(args);
});
Task result;
receive((PrivateTask val) { result = to!Task(val); });
return result;
}
/// ditto
Task getWorkerTask(alias method, T, ARGS...)(shared(T) object, ARGS args)
if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
alias Typedef!(Task, Task.init, __PRETTY_FUNCTION__) PrivateTask;
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
Task caller = Task.getThis();
runWorkerTask_unsafe({
PrivateTask callee = Task.getThis();
caller.prioritySend(callee);
__traits(getMember, object, __traits(identifier, method))(args);
});
Task result;
receive((PrivateTask val) { result = to!Task(val); });
return result;
}

/// Running a worker task using a function
unittest {
static void workerFunc(int param)
Expand Down Expand Up @@ -255,6 +297,53 @@ unittest {
}
}

/// Running a worker task using a function and communicating with it
unittest {
static void workerFunc(Task caller)
{
int counter = 10;
while (receiveOnly!string() == "ping" && --counter) {
logInfo("pong");
caller.send("pong");
}
caller.send("goodbye");

}

static void test()
{
Task callee = getWorkerTask(&workerFunc, Task.getThis);
do {
logInfo("ping");
callee.send("ping");
} while (receiveOnly!string() == "pong");
}
}

/// Running a worker task using a class method and communicating with it
unittest {
static class Test {
void workerMethod(Task caller) shared {
int counter = 10;
while (receiveOnly!string() == "ping" && --counter) {
logInfo("pong");
caller.send("pong");
}
caller.send("goodbye");
}
}

static void test()
{
auto cls = new shared Test;
Task callee = getWorkerTask!(Test.workerMethod)(cls, Task.getThis());
do {
logInfo("ping");
callee.send("ping");
} while (receiveOnly!string() == "pong");
}
}

private void runWorkerTask_unsafe(void delegate() del)
{
setupWorkerThreads();
Expand Down