
Fix a bug with metric invalidation.

This reverts commit 76a7527. That change introduced a bug because I forgot
{multi: true}, so the collector was only invalidating the first matching
metric for a given event, rather than all of them.

Also, by delaying metric invalidation, the evaluator is less likely to encounter
the race condition where a new event is collected between the evaluator reading
the events and saving the computed metric. Still, this solution is not perfect.
commit eb857dc2bc49526d2448492ba48261b3ae2c7166 (1 parent: 6dfc0dd)
Authored by @mbostock
Showing 79 additions and 22 deletions across 3 files:
  1. +11 −0 lib/cube/bisect.js
  2. +56 −13 lib/cube/event.js
  3. +12 −9 test/metric-test.js
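
A note on the {multi: true} mentioned in the commit message: with the node-mongodb driver Cube uses, collection.update() modifies only the first document matching the selector unless the multi option is passed, so an invalidation that should touch every cached metric for a given tier and time must request a multi-document update explicitly. A minimal sketch of the difference, mirroring the selector and update document in event.js below (metrics stands for a type's metrics collection and handle for an error callback, as in Cube):

    var invalidate = {$set: {i: true}},              // flags a cached metric as stale
        tenSecond = 1e4,                             // the ten-second tier's key
        tierTime = new Date("2012-06-01T00:00:10Z"); // a tier-floored metric time

    // Default behavior: only the FIRST metric matching the selector is flagged.
    metrics.update({i: false, "_id.l": tenSecond, "_id.t": tierTime}, invalidate, handle);

    // With {multi: true}: every matching metric is flagged, which is what the
    // collector needs when several expressions are cached for the same tier and time.
    metrics.update({i: false, "_id.l": tenSecond, "_id.t": tierTime}, invalidate, {multi: true}, handle);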
lib/cube/bisect.js
@@ -0,0 +1,11 @@
+module.exports = bisect;
+
+function bisect(a, x) {
+  var lo = 0, hi = a.length;
+  while (lo < hi) {
+    var mid = lo + hi >> 1;
+    if (a[mid] < x) lo = mid + 1;
+    else hi = mid;
+  }
+  return lo;
+}
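
bisect() returns the lower-bound insertion index of x in the sorted array a, which is what queueInvalidation in event.js below relies on to keep each tier's list of pending invalidation times sorted and free of duplicates. A small usage sketch, assuming the module above is available as ./bisect:

    var bisect = require("./bisect");

    var times = [10000, 30000, 40000];  // tier-floored times, already sorted

    // Insert 20000 while keeping the array sorted and duplicate-free.
    var i = bisect(times, 20000);                       // i === 1
    if (times[i] !== 20000) times.splice(i, 0, 20000);  // [10000, 20000, 30000, 40000]

    // A value that is already queued is left alone.
    i = bisect(times, 30000);                           // i === 2
    if (times[i] !== 30000) times.splice(i, 0, 30000);  // no change

Comparing with !== rather than > also covers the append case, where the new value is larger than everything queued so far and bisect returns times.length.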
lib/cube/event.js
@@ -1,11 +1,11 @@
 // TODO include the event._id (and define a JSON encoding for ObjectId?)
 // TODO allow the event time to change when updating (fix invalidation)
-// TODO fix race condition between cache invalidation and metric computation

 var mongodb = require("mongodb"),
     parser = require("./event-expression"),
     tiers = require("./tiers"),
-    types = require("./types");
+    types = require("./types"),
+    bisect = require("./bisect");

 var type_re = /^[a-z][a-zA-Z0-9_]+$/,
     invalidate = {$set: {i: true}},
@@ -18,10 +18,14 @@ var type_re = /^[a-z][a-zA-Z0-9_]+$/,
 var streamDelayDefault = 5000,
     streamInterval = 1000;

+// How frequently to invalidate metrics after receiving events.
+var invalidateInterval = 5000;
+
 exports.putter = function(db) {
   var collection = types(db),
       knownByType = {},
-      eventsToSaveByType = {};
+      eventsToSaveByType = {},
+      timesToInvalidateByTierByType = {};

   function putter(request, callback) {
     var time = new Date(request.time),
@@ -78,20 +82,59 @@
     });
   }

-  // Save the event of the specified type, and invalidate any cached metrics
-  // associated with this event type and time.
+  // Save the event of the specified type, and queue invalidation of any cached
+  // metrics associated with this event type and time.
+  //
+  // We don't invalidate the events immediately. This would cause many redundant
+  // updates when many events are received simultaneously. Also, having a short
+  // delay between saving the event and invalidating the metrics reduces the
+  // likelihood of a race condition between when the events are read by the
+  // evaluator and when the newly-computed metrics are saved.
   function save(type, event) {
-    type = collection(type);
-    type.events.save(event, handle);
-    for (var tier in tiers) {
-      type.metrics.update({
-        i: false,
-        "_id.l": +tier,
-        "_id.t": tiers[tier].floor(event.t)
-      }, invalidate, handle);
+    collection(type).events.save(event, handle);
+    queueInvalidation(type, event);
+  }
+
+  // Schedule deferred invalidation of metrics for this type.
+  // For each type and tier, track the metric times to invalidate.
+  // The times are kept in sorted order for bisection.
+  function queueInvalidation(type, event) {
+    var timesToInvalidateByTier = timesToInvalidateByTierByType[type],
+        time = event.t;
+    if (timesToInvalidateByTier) {
+      for (var tier in tiers) {
+        var tierTimes = timesToInvalidateByTier[tier],
+            tierTime = tiers[tier].floor(time),
+            i = bisect(tierTimes, tierTime);
+        if (tierTimes[i] > tierTime) tierTimes.splice(i, 0, tierTime);
+      }
+    } else {
+      timesToInvalidateByTier = timesToInvalidateByTierByType[type] = {};
+      for (var tier in tiers) {
+        timesToInvalidateByTier[tier] = [tiers[tier].floor(time)];
+      }
     }
   }

+  // Process any deferred metric invalidations, flushing the queues. Note that
+  // the queue (timesToInvalidateByTierByType) is copied-on-write, so while the
+  // previous batch of events are being invalidated, new events can arrive.
+  setInterval(function() {
+    for (var type in timesToInvalidateByTierByType) {
+      var metrics = collection(type).metrics,
+          timesToInvalidateByTier = timesToInvalidateByTierByType[type];
+      for (var tier in tiers) {
+        metrics.update({
+          i: false,
+          "_id.l": +tier,
+          "_id.t": {$in: timesToInvalidateByTier[tier]}
+        }, invalidate, multi);
+      }
+      flushed = true;
+    }
+    timesToInvalidateByTierByType = {}; // copy-on-write
+  }, invalidateInterval);
+
   return putter;
 };
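
Taken together, the putter changes follow a general batching pattern: buffer invalidations in memory, flush them on a timer, and replace the buffer wholesale so that events arriving mid-flush land in the next batch. A stripped-down sketch of that pattern, independent of the MongoDB calls above (enqueue, flushBatch and the buffer layout are illustrative, not part of Cube's API; bisect is the helper added by this commit):

    var bisect = require("./bisect");

    var flushIntervalMs = 5000,  // plays the role of invalidateInterval
        pending = {};            // type -> tier -> sorted array of times

    function enqueue(type, tier, time) {
      var byTier = pending[type] || (pending[type] = {}),
          times = byTier[tier] || (byTier[tier] = []),
          i = bisect(times, time);
      if (times[i] !== time) times.splice(i, 0, time);  // keep sorted, no duplicates
    }

    function flushBatch(type, tier, times) {
      // In Cube this is one multi-document metrics.update per type and tier;
      // here we just log the batch that would be invalidated.
      console.log("invalidate", type, tier, times);
    }

    setInterval(function() {
      var batch = pending;
      pending = {};  // copy-on-write: later events accumulate in a fresh object
      for (var type in batch) {
        for (var tier in batch[type]) {
          flushBatch(type, tier, batch[type][tier]);
        }
      }
    }, flushIntervalMs);

Swapping the buffer before issuing the updates (as here) and reassigning it after the loop (as in the commit) are equivalent in practice, since the whole flush runs in a single tick of the event loop and no new events can be queued in between.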
test/metric-test.js
@@ -138,16 +138,19 @@ function metricTest(request, expected) {
       var actual = [],
           timeout = setTimeout(function() { cb("Time's up!"); }, 10000),
           cb = this.callback,
-          req = Object.create(request);
+          req = Object.create(request),
+          test = arguments[depth];
       req.step = step;
-      arguments[depth](req, function(response) {
-        if (response.time >= stop) {
-          clearTimeout(timeout);
-          cb(null, actual.sort(function(a, b) { return a.time - b.time; }));
-        } else {
-          actual.push(response);
-        }
-      });
+      setTimeout(function() {
+        test(req, function(response) {
+          if (response.time >= stop) {
+            clearTimeout(timeout);
+            cb(null, actual.sort(function(a, b) { return a.time - b.time; }));
+          } else {
+            actual.push(response);
+          }
+        });
+      }, depth * 250);
     }
   };