Skip to content

Commit

Permalink
Merge pull request #601 from lultimouomo/getWorkerTask
Browse files Browse the repository at this point in the history
Add getWorkerTask functions
  • Loading branch information
s-ludwig committed Apr 12, 2014
2 parents dcdea06 + 6bbfa85 commit a5eee15
Showing 1 changed file with 89 additions and 0 deletions.
89 changes: 89 additions & 0 deletions source/vibe/core/core.d
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import std.functional;
import std.range : empty, front, popFront;
import std.string;
import std.variant;
import std.typecons : Typedef;
import core.atomic;
import core.sync.condition;
import core.sync.mutex;
Expand Down Expand Up @@ -236,6 +237,47 @@ void runWorkerTask(alias method, T, ARGS...)(shared(T) object, ARGS args)
runWorkerTask_unsafe({ __traits(getMember, object, __traits(identifier, method))(args); });
}

/**
Runs a new asynchronous task in a worker thread, returning the task.
This function will yield and wait for the new task to be created and started
in the worker thread, then resume and return it.
Only function pointers with weakly isolated arguments are allowed to be
able to guarantee thread-safety.
*/
Task getWorkerTask(R, ARGS...)(R function(ARGS) func, ARGS args)
{
alias Typedef!(Task, Task.init, __PRETTY_FUNCTION__) PrivateTask;
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
Task caller = Task.getThis();
runWorkerTask_unsafe({
PrivateTask callee = Task.getThis();
caller.prioritySend(callee);
func(args);
});
Task result;
receive((PrivateTask val) { result = to!Task(val); });
return result;
}
/// ditto
Task getWorkerTask(alias method, T, ARGS...)(shared(T) object, ARGS args)
if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
alias Typedef!(Task, Task.init, __PRETTY_FUNCTION__) PrivateTask;
foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
Task caller = Task.getThis();
runWorkerTask_unsafe({
PrivateTask callee = Task.getThis();
caller.prioritySend(callee);
__traits(getMember, object, __traits(identifier, method))(args);
});
Task result;
receive((PrivateTask val) { result = to!Task(val); });
return result;
}

/// Running a worker task using a function
unittest {
static void workerFunc(int param)
Expand Down Expand Up @@ -265,6 +307,53 @@ unittest {
}
}

/// Running a worker task using a function and communicating with it
unittest {
static void workerFunc(Task caller)
{
int counter = 10;
while (receiveOnly!string() == "ping" && --counter) {
logInfo("pong");
caller.send("pong");
}
caller.send("goodbye");

}

static void test()
{
Task callee = getWorkerTask(&workerFunc, Task.getThis);
do {
logInfo("ping");
callee.send("ping");
} while (receiveOnly!string() == "pong");
}
}

/// Running a worker task using a class method and communicating with it
unittest {
static class Test {
void workerMethod(Task caller) shared {
int counter = 10;
while (receiveOnly!string() == "ping" && --counter) {
logInfo("pong");
caller.send("pong");
}
caller.send("goodbye");
}
}

static void test()
{
auto cls = new shared Test;
Task callee = getWorkerTask!(Test.workerMethod)(cls, Task.getThis());
do {
logInfo("ping");
callee.send("ping");
} while (receiveOnly!string() == "pong");
}
}

private void runWorkerTask_unsafe(void delegate() del)
{
setupWorkerThreads();
Expand Down

0 comments on commit a5eee15

Please sign in to comment.