Skip to content

Commit

Permalink
Invalidate associated metrics immediately.
Browse files Browse the repository at this point in the history
The previous approach of aggregating events into five-second intervals and
performing bulk invalidation smacks of premature optimization. Much simpler to
just invalidate any associated metrics immediately as the events are received.
Besides, this appears to have minimal performance impact.
  • Loading branch information
mbostock committed Apr 15, 2012
1 parent 9e4a2c8 commit 76a7527
Showing 1 changed file with 18 additions and 57 deletions.
75 changes: 18 additions & 57 deletions lib/cube/server/event.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@ var type_re = /^[a-z][a-zA-Z0-9_]+$/,

exports.putter = function(db) {
var collection = types(db),
eventCollectionByType = {},
eventsToSaveByType = {},
timesToInvalidateByTierByType = {},
flushInterval,
flushDelay = 5000;
knownByType = {},
eventsToSaveByType = {};

function putter(request) {
var time = new Date(request.time),
Expand All @@ -36,7 +33,7 @@ exports.putter = function(db) {
if ("id" in request) event._id = request.id;

// If this is a known event type, save immediately.
if (type in eventCollectionByType) return save(type, event);
if (type in knownByType) return save(type, event);

// If someone is already creating the event collection for this new type,
// then append this event to the queue for later save.
Expand All @@ -55,7 +52,7 @@ exports.putter = function(db) {
// or add custom indexes, you can still do all that by hand.
db.collectionNames(type + "_events", function(error, names) {
var events = collection(type).events;
if (names.length) return flush();
if (names.length) return saveEvents();

// Events are indexed by time.
events.ensureIndex({"t": 1}, handle);
Expand All @@ -66,68 +63,32 @@ exports.putter = function(db) {
handle(error);
metrics.ensureIndex({"i": 1, "_id.e": 1, "_id.l": 1, "_id.t": 1}, handle);
metrics.ensureIndex({"i": 1, "_id.l": 1, "_id.t": 1}, handle);
flush();
saveEvents();
});

// Flush any pending events to the new collection.
function flush() {
eventCollectionByType[type] = events;
// Save any pending events to the new collection.
function saveEvents() {
knownByType[type] = true;
eventsToSaveByType[type].forEach(function(event) { save(type, event); });
delete eventsToSaveByType[type];
}
});
}

// Save the event of the specified type.
// Save the event of the specified type, and invalidate any cached metrics
// associated with this event type and time.
function save(type, event) {
eventCollectionByType[type].save(event, handle);
queueInvalidation(type, event);
}

// Schedule deferred invalidation of metrics for this type.
// For each type and tier, track the metric times to invalidate.
// The times are kept in sorted order for bisection.
function queueInvalidation(type, event) {
var timesToInvalidateByTier = timesToInvalidateByTierByType[type],
time = event.t;
if (timesToInvalidateByTier) {
for (var tier in tiers) {
var tierTimes = timesToInvalidateByTier[tier],
tierTime = tiers[tier].floor(time),
i = bisect(tierTimes, tierTime);
if (tierTimes[i] > tierTime) tierTimes.splice(i, 0, tierTime);
}
} else {
timesToInvalidateByTier = timesToInvalidateByTierByType[type] = {};
for (var tier in tiers) {
timesToInvalidateByTier[tier] = [tiers[tier].floor(time)];
}
type = collection(type);
type.events.save(event, handle);
for (var tier in tiers) {
type.metrics.update({
i: false,
"_id.l": +tier,
"_id.t": tiers[tier].floor(event.t)
}, invalidate, handle);
}
}

// Process any deferred metric invalidations, flushing the queues. Note that
// the queue (timesToInvalidateByTierByType) is copied-on-write, so while the
// previous batch of events are being invalidated, new events can arrive.
function flush() {
var flushed = false;
for (var type in timesToInvalidateByTierByType) {
var metrics = collection(type).metrics,
timesToInvalidateByTier = timesToInvalidateByTierByType[type];
for (var tier in tiers) {
metrics.update({
i: false,
"_id.l": +tier,
"_id.t": {$in: timesToInvalidateByTier[tier]}
}, invalidate, multi);
}
flushed = true;
}
if (flushed) util.log("flush " + Object.keys(timesToInvalidateByTierByType));
timesToInvalidateByTierByType = {}; // copy-on-write
};

flushInterval = setInterval(flush, flushDelay);

return putter;
};

Expand Down

0 comments on commit 76a7527

Please sign in to comment.