Permalink
Browse files

Invalidate associated metrics immediately.

The previous approach of aggregating events into five-second intervals and
performing bulk invalidation smacks of premature optimization. Much simpler to
just invalidate any associated metrics immediately as the events are received.
This appears to have minimal performance impact, besides.
  • Loading branch information...
1 parent 9e4a2c8 commit 76a75274eb4b012a6e9a8b8eccd59f7d658c36b0 @mbostock mbostock committed Apr 15, 2012
Showing with 18 additions and 57 deletions.
  1. +18 −57 lib/cube/server/event.js
@@ -17,11 +17,8 @@ var type_re = /^[a-z][a-zA-Z0-9_]+$/,
exports.putter = function(db) {
var collection = types(db),
- eventCollectionByType = {},
- eventsToSaveByType = {},
- timesToInvalidateByTierByType = {},
- flushInterval,
- flushDelay = 5000;
+ knownByType = {},
+ eventsToSaveByType = {};
function putter(request) {
var time = new Date(request.time),
@@ -36,7 +33,7 @@ exports.putter = function(db) {
if ("id" in request) event._id = request.id;
// If this is a known event type, save immediately.
- if (type in eventCollectionByType) return save(type, event);
+ if (type in knownByType) return save(type, event);
// If someone is already creating the event collection for this new type,
// then append this event to the queue for later save.
@@ -55,7 +52,7 @@ exports.putter = function(db) {
// or add custom indexes, you can still do all that by hand.
db.collectionNames(type + "_events", function(error, names) {
var events = collection(type).events;
- if (names.length) return flush();
+ if (names.length) return saveEvents();
// Events are indexed by time.
events.ensureIndex({"t": 1}, handle);
@@ -66,68 +63,32 @@ exports.putter = function(db) {
handle(error);
metrics.ensureIndex({"i": 1, "_id.e": 1, "_id.l": 1, "_id.t": 1}, handle);
metrics.ensureIndex({"i": 1, "_id.l": 1, "_id.t": 1}, handle);
- flush();
+ saveEvents();
});
- // Flush any pending events to the new collection.
- function flush() {
- eventCollectionByType[type] = events;
+ // Save any pending events to the new collection.
+ function saveEvents() {
+ knownByType[type] = true;
eventsToSaveByType[type].forEach(function(event) { save(type, event); });
delete eventsToSaveByType[type];
}
});
}
- // Save the event of the specified type.
+ // Save the event of the specified type, and invalidate any cached metrics
+ // associated with this event type and time.
function save(type, event) {
- eventCollectionByType[type].save(event, handle);
- queueInvalidation(type, event);
- }
-
- // Schedule deferred invalidation of metrics for this type.
- // For each type and tier, track the metric times to invalidate.
- // The times are kept in sorted order for bisection.
- function queueInvalidation(type, event) {
- var timesToInvalidateByTier = timesToInvalidateByTierByType[type],
- time = event.t;
- if (timesToInvalidateByTier) {
- for (var tier in tiers) {
- var tierTimes = timesToInvalidateByTier[tier],
- tierTime = tiers[tier].floor(time),
- i = bisect(tierTimes, tierTime);
- if (tierTimes[i] > tierTime) tierTimes.splice(i, 0, tierTime);
- }
- } else {
- timesToInvalidateByTier = timesToInvalidateByTierByType[type] = {};
- for (var tier in tiers) {
- timesToInvalidateByTier[tier] = [tiers[tier].floor(time)];
- }
+ type = collection(type);
+ type.events.save(event, handle);
+ for (var tier in tiers) {
+ type.metrics.update({
+ i: false,
+ "_id.l": +tier,
+ "_id.t": tiers[tier].floor(event.t)
+ }, invalidate, handle);
}
}
- // Process any deferred metric invalidations, flushing the queues. Note that
- // the queue (timesToInvalidateByTierByType) is copied-on-write, so while the
- // previous batch of events are being invalidated, new events can arrive.
- function flush() {
- var flushed = false;
- for (var type in timesToInvalidateByTierByType) {
- var metrics = collection(type).metrics,
- timesToInvalidateByTier = timesToInvalidateByTierByType[type];
- for (var tier in tiers) {
- metrics.update({
- i: false,
- "_id.l": +tier,
- "_id.t": {$in: timesToInvalidateByTier[tier]}
- }, invalidate, multi);
- }
- flushed = true;
- }
- if (flushed) util.log("flush " + Object.keys(timesToInvalidateByTierByType));
- timesToInvalidateByTierByType = {}; // copy-on-write
- };
-
- flushInterval = setInterval(flush, flushDelay);
-
return putter;
};

0 comments on commit 76a7527

Please sign in to comment.