Skip to content

Commit

Permalink
Fix a bug with metric invalidation.
Browse files Browse the repository at this point in the history
This reverts @76a7527. That change introduced a bug because I forgot {multi:
true}, so the collector was only invalidating the first matching first metric
for a given event, rather than all of them.

Also, by delaying metric invalidation, the evaluator is less likely to encounter
the race condition where a new event is collected between the evaluator reading
the events and saving the computed metric. Still, this solution is not perfect.
  • Loading branch information
mbostock committed Apr 20, 2012
1 parent 6dfc0dd commit eb857dc
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 22 deletions.
11 changes: 11 additions & 0 deletions lib/cube/bisect.js
@@ -0,0 +1,11 @@
module.exports = bisect;

function bisect(a, x) {
var lo = 0, hi = a.length;
while (lo < hi) {
var mid = lo + hi >> 1;
if (a[mid] < x) lo = mid + 1;
else hi = mid;
}
return lo;
}
69 changes: 56 additions & 13 deletions lib/cube/event.js
@@ -1,11 +1,11 @@
// TODO include the event._id (and define a JSON encoding for ObjectId?)
// TODO allow the event time to change when updating (fix invalidation)
// TODO fix race condition between cache invalidation and metric computation

var mongodb = require("mongodb"),
parser = require("./event-expression"),
tiers = require("./tiers"),
types = require("./types");
types = require("./types"),
bisect = require("./bisect");

var type_re = /^[a-z][a-zA-Z0-9_]+$/,
invalidate = {$set: {i: true}},
Expand All @@ -18,10 +18,14 @@ var type_re = /^[a-z][a-zA-Z0-9_]+$/,
var streamDelayDefault = 5000,
streamInterval = 1000;

// How frequently to invalidate metrics after receiving events.
var invalidateInterval = 5000;

exports.putter = function(db) {
var collection = types(db),
knownByType = {},
eventsToSaveByType = {};
eventsToSaveByType = {},
timesToInvalidateByTierByType = {};

function putter(request, callback) {
var time = new Date(request.time),
Expand Down Expand Up @@ -78,20 +82,59 @@ exports.putter = function(db) {
});
}

// Save the event of the specified type, and invalidate any cached metrics
// associated with this event type and time.
// Save the event of the specified type, and queue invalidation of any cached
// metrics associated with this event type and time.
//
// We don't invalidate the events immediately. This would cause many redundant
// updates when many events are received simultaneously. Also, having a short
// delay between saving the event and invalidating the metrics reduces the
// likelihood of a race condition between when the events are read by the
// evaluator and when the newly-computed metrics are saved.
function save(type, event) {
type = collection(type);
type.events.save(event, handle);
for (var tier in tiers) {
type.metrics.update({
i: false,
"_id.l": +tier,
"_id.t": tiers[tier].floor(event.t)
}, invalidate, handle);
collection(type).events.save(event, handle);
queueInvalidation(type, event);
}

// Schedule deferred invalidation of metrics for this type.
// For each type and tier, track the metric times to invalidate.
// The times are kept in sorted order for bisection.
function queueInvalidation(type, event) {
var timesToInvalidateByTier = timesToInvalidateByTierByType[type],
time = event.t;
if (timesToInvalidateByTier) {
for (var tier in tiers) {
var tierTimes = timesToInvalidateByTier[tier],
tierTime = tiers[tier].floor(time),
i = bisect(tierTimes, tierTime);
if (tierTimes[i] > tierTime) tierTimes.splice(i, 0, tierTime);
}
} else {
timesToInvalidateByTier = timesToInvalidateByTierByType[type] = {};
for (var tier in tiers) {
timesToInvalidateByTier[tier] = [tiers[tier].floor(time)];
}
}
}

// Process any deferred metric invalidations, flushing the queues. Note that
// the queue (timesToInvalidateByTierByType) is copied-on-write, so while the
// previous batch of events are being invalidated, new events can arrive.
setInterval(function() {
for (var type in timesToInvalidateByTierByType) {
var metrics = collection(type).metrics,
timesToInvalidateByTier = timesToInvalidateByTierByType[type];
for (var tier in tiers) {
metrics.update({
i: false,
"_id.l": +tier,
"_id.t": {$in: timesToInvalidateByTier[tier]}
}, invalidate, multi);
}
flushed = true;
}
timesToInvalidateByTierByType = {}; // copy-on-write
}, invalidateInterval);

return putter;
};

Expand Down
21 changes: 12 additions & 9 deletions test/metric-test.js
Expand Up @@ -138,16 +138,19 @@ function metricTest(request, expected) {
var actual = [],
timeout = setTimeout(function() { cb("Time's up!"); }, 10000),
cb = this.callback,
req = Object.create(request);
req = Object.create(request),
test = arguments[depth];
req.step = step;
arguments[depth](req, function(response) {
if (response.time >= stop) {
clearTimeout(timeout);
cb(null, actual.sort(function(a, b) { return a.time - b.time; }));
} else {
actual.push(response);
}
});
setTimeout(function() {
test(req, function(response) {
if (response.time >= stop) {
clearTimeout(timeout);
cb(null, actual.sort(function(a, b) { return a.time - b.time; }));
} else {
actual.push(response);
}
});
}, depth * 250);
}
};

Expand Down

0 comments on commit eb857dc

Please sign in to comment.