Permalink
Browse files

New class Meteor._SynchronousQueue, which runs tasks serially.

Will be used for Meteor._Mongo._LiveResultsSet de-duping.
  • Loading branch information...
1 parent 11fe37a commit c7710ccd3ec7a0b2100892e5af6460af616fa886 @glasser glasser committed Nov 11, 2012
Showing with 182 additions and 0 deletions.
  1. +117 −0 packages/meteor/fiber_helpers.js
  2. +63 −0 packages/meteor/fiber_helpers_test.js
  3. +2 −0 packages/meteor/package.js
@@ -24,4 +24,121 @@ Meteor._noYieldsAllowed = function (f) {
// js2-mode AST blows up when parsing 'future.return()', so alias.
Future.prototype.ret = Future.prototype.return;
+// Meteor._SynchronousQueue is a queue which runs task functions serially.
+// Tasks are assumed to be synchronous: ie, it's assumed that they are
+// done when they return.
+//
+// It has two methods:
+// - queueTask queues a task to be run, and returns immediately.
+// - runTask queues a task to be run, and then yields. It returns
+// when the task finishes running.
+//
+// It's safe to call queueTask from within a task, but not runTask (unless
+// you're calling runTask from a nested Fiber).
+//
+// Somewhat inspired by async.queue, but specific to blocking tasks.
+// XXX break this out into an NPM module?
+// XXX could maybe use the npm 'schlock' module instead, which would
+// also support multiple concurrent "read" tasks
+Meteor._SynchronousQueue = function () {
+ var self = this;
+ // List of tasks to run (not including a currently-running task if any). Each
+ // is an object with field 'task' (the task function to run) and 'future' (the
+ // Future associated with the blocking runTask call that queued it, or null if
+ // called from queueTask).
+ self._taskHandles = [];
+ // This is true if self._run() is either currently executing or scheduled to
+ // do so soon.
+ self._runningOrRunScheduled = false;
+ // During the execution of a task, this is set to the fiber used to execute
+ // that task. We use this to throw an error rather than deadlocking if the
+ // user calls runTask from within a task on the same fiber.
+ self._currentTaskFiber = undefined;
+};
+
+_.extend(Meteor._SynchronousQueue.prototype, {
+ runTask: function (task) {
+ var self = this;
+
+ if (!Fiber.current)
+ throw new Error("Can only call runTask in a Fiber");
+ if (self._currentTaskFiber === Fiber.current)
+ throw new Error("Can't runTask from another task in the same fiber");
+
+ var fut = new Future;
+ self._taskHandles.push({task: task, future: fut});
+ self._scheduleRun();
+ // Yield. We'll get back here after the task is run (and will throw if the
+ // task throws).
+ fut.wait();
+ },
+ queueTask: function (task) {
+ var self = this;
+ self._taskHandles.push({task: task});
+ self._scheduleRun();
+ // No need to block.
+ },
+ taskRunning: function () {
+ var self = this;
+ return self._taskRunning;
+ },
+ _scheduleRun: function () {
+ var self = this;
+
+ // Already running or scheduled? Do nothing.
+ if (self._runningOrRunScheduled)
+ return;
+
+ self._runningOrRunScheduled = true;
+
+ process.nextTick(function () {
+ Fiber(function () {
+ self._run();
+ }).run();
+ });
+ },
+ _run: function () {
+ var self = this;
+
+ if (!self._runningOrRunScheduled)
+ throw new Error("expected to be _runningOrRunScheduled");
+
+ if (_.isEmpty(self._taskHandles)) {
+ // Done running tasks! Don't immediately schedule another run, but
+ // allow future tasks to do so.
+ self._runningOrRunScheduled = false;
+ return;
+ }
+ var taskHandle = self._taskHandles.shift();
+
+ // Run the task.
+ self._currentTaskFiber = Fiber.current;
+ var exception = undefined;
+ try {
+ taskHandle.task();
+ } catch (err) {
+ if (taskHandle.future) {
+ // We'll throw this exception through runTask.
+ exception = err;
+ } else {
+ Meteor._debug("Exception in queued task: " + err);
+ }
+ }
+ self._currentTaskFiber = undefined;
+
+ // Soon, run the next task, if there is any.
+ self._runningOrRunScheduled = false;
+ self._scheduleRun();
+
+ // If this was queued with runTask, let the runTask call return (throwing if
+ // the task threw).
+ if (taskHandle.future) {
+ if (exception)
+ taskHandle.future.throw(exception);
+ else
+ taskHandle.future.ret();
+ }
+ }
+});
+
})();
@@ -0,0 +1,63 @@
+Tinytest.add("fibers - synchronous queue", function (test) {
+ var q = new Meteor._SynchronousQueue;
+ var output = [];
+ var pusher = function (n) {
+ return function () {
+ output.push(n);
+ };
+ };
+ var outputIsUpTo = function (n) {
+ test.equal(output, _.range(1, n+1));
+ };
+
+ // Queue a task. It cannot run until we yield.
+ q.queueTask(pusher(1));
+ outputIsUpTo(0);
+
+ // Run another task. After queueing it, the fiber constructed here will yield
+ // back to this outer function. No task can have run yet since the main test
+ // fiber still will not have yielded.
+ var runTask2Done = false;
+ Fiber(function () {
+ q.runTask(pusher(2));
+ runTask2Done = true;
+ }).run();
+ outputIsUpTo(0);
+ test.isFalse(runTask2Done);
+
+ // Queue a third task. Still no outer yields, so still no runs.
+ q.queueTask(function () {
+ output.push(3);
+ // This task gets queued once we actually start running functions, which
+ // isn't until the runTask(pusher(4)), so it gets queued after Task #4.
+ q.queueTask(pusher(5));
+ });
+ outputIsUpTo(0);
+ test.isFalse(runTask2Done);
+
+ // Run a task and block for it to be done. All queued tasks up to this one
+ // will now be run.
+ q.runTask(pusher(4));
+ outputIsUpTo(4);
+ test.isTrue(runTask2Done);
+
+ // Task #5 is still in the queue. Run another task synchronously.
+ q.runTask(pusher(6));
+ outputIsUpTo(6);
+
+ // Queue a task that throws. It'll write some debug output, but that's it.
+ Meteor._suppress_log(1);
+ q.queueTask(function () {
+ throw new Error("bla");
+ });
+ // let it run.
+ q.runTask(pusher(7));
+ outputIsUpTo(7);
+
+ // Run a task that throws. It should throw from runTask.
+ test.throws(function () {
+ q.runTask(function () {
+ throw new Error("this is thrown");
+ });
+ });
+});
@@ -57,5 +57,7 @@ Package.on_test(function (api) {
api.add_files('helpers_test.js', ['client', 'server']);
api.add_files('dynamics_test.js', ['client', 'server']);
+ api.add_files('fiber_helpers_test.js', ['server']);
+
api.add_files('url_tests.js', ['client', 'server']);
});

0 comments on commit c7710cc

Please sign in to comment.