diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index 0e789bd4b3..9b1cc0aa89 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -21,6 +21,7 @@ import std.functional; import std.range : empty, front, popFront; import std.string; import std.variant; +import std.typecons : Typedef; import core.atomic; import core.sync.condition; import core.sync.mutex; @@ -226,6 +227,47 @@ void runWorkerTask(alias method, T, ARGS...)(shared(T) object, ARGS args) runWorkerTask_unsafe({ __traits(getMember, object, __traits(identifier, method))(args); }); } +/** + Runs a new asynchronous task in a worker thread, returning the task. + + This function will yield and wait for the new task to be created and started + in the worker thread, then resume and return it. + + + Only function pointers with weakly isolated arguments are allowed to be + able to guarantee thread-safety. +*/ +Task getWorkerTask(R, ARGS...)(R function(ARGS) func, ARGS args) +{ + alias Typedef!(Task, Task.init, __PRETTY_FUNCTION__) PrivateTask; + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + Task caller = Task.getThis(); + runWorkerTask_unsafe({ + PrivateTask callee = Task.getThis(); + caller.prioritySend(callee); + func(args); + }); + Task result; + receive((PrivateTask val) { result = to!Task(val); }); + return result; +} +/// ditto +Task getWorkerTask(alias method, T, ARGS...)(shared(T) object, ARGS args) + if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) +{ + alias Typedef!(Task, Task.init, __PRETTY_FUNCTION__) PrivateTask; + foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); + Task caller = Task.getThis(); + runWorkerTask_unsafe({ + PrivateTask callee = Task.getThis(); + caller.prioritySend(callee); + __traits(getMember, object, __traits(identifier, method))(args); + }); + Task result; + receive((PrivateTask val) { result = to!Task(val); }); + return result; +} + /// Running a worker task using a function unittest { static void workerFunc(int param) @@ -255,6 +297,53 @@ unittest { } } +/// Running a worker task using a function and communicating with it +unittest { + static void workerFunc(Task caller) + { + int counter = 10; + while (receiveOnly!string() == "ping" && --counter) { + logInfo("pong"); + caller.send("pong"); + } + caller.send("goodbye"); + + } + + static void test() + { + Task callee = getWorkerTask(&workerFunc, Task.getThis); + do { + logInfo("ping"); + callee.send("ping"); + } while (receiveOnly!string() == "pong"); + } +} + +/// Running a worker task using a class method and communicating with it +unittest { + static class Test { + void workerMethod(Task caller) shared { + int counter = 10; + while (receiveOnly!string() == "ping" && --counter) { + logInfo("pong"); + caller.send("pong"); + } + caller.send("goodbye"); + } + } + + static void test() + { + auto cls = new shared Test; + Task callee = getWorkerTask!(Test.workerMethod)(cls, Task.getThis()); + do { + logInfo("ping"); + callee.send("ping"); + } while (receiveOnly!string() == "pong"); + } +} + private void runWorkerTask_unsafe(void delegate() del) { setupWorkerThreads();