Skip to content
This repository was archived by the owner on Feb 6, 2024. It is now read-only.

Commit eb857dc

Browse files
committed
Fix a bug with metric invalidation.
This reverts @76a7527. That change introduced a bug because I forgot {multi: true}, so the collector was only invalidating the first matching first metric for a given event, rather than all of them. Also, by delaying metric invalidation, the evaluator is less likely to encounter the race condition where a new event is collected between the evaluator reading the events and saving the computed metric. Still, this solution is not perfect.
1 parent 6dfc0dd commit eb857dc

File tree

3 files changed

+79
-22
lines changed

3 files changed

+79
-22
lines changed

lib/cube/bisect.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
module.exports = bisect;
2+
3+
function bisect(a, x) {
4+
var lo = 0, hi = a.length;
5+
while (lo < hi) {
6+
var mid = lo + hi >> 1;
7+
if (a[mid] < x) lo = mid + 1;
8+
else hi = mid;
9+
}
10+
return lo;
11+
}

lib/cube/event.js

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
// TODO include the event._id (and define a JSON encoding for ObjectId?)
22
// TODO allow the event time to change when updating (fix invalidation)
3-
// TODO fix race condition between cache invalidation and metric computation
43

54
var mongodb = require("mongodb"),
65
parser = require("./event-expression"),
76
tiers = require("./tiers"),
8-
types = require("./types");
7+
types = require("./types"),
8+
bisect = require("./bisect");
99

1010
var type_re = /^[a-z][a-zA-Z0-9_]+$/,
1111
invalidate = {$set: {i: true}},
@@ -18,10 +18,14 @@ var type_re = /^[a-z][a-zA-Z0-9_]+$/,
1818
var streamDelayDefault = 5000,
1919
streamInterval = 1000;
2020

21+
// How frequently to invalidate metrics after receiving events.
22+
var invalidateInterval = 5000;
23+
2124
exports.putter = function(db) {
2225
var collection = types(db),
2326
knownByType = {},
24-
eventsToSaveByType = {};
27+
eventsToSaveByType = {},
28+
timesToInvalidateByTierByType = {};
2529

2630
function putter(request, callback) {
2731
var time = new Date(request.time),
@@ -78,20 +82,59 @@ exports.putter = function(db) {
7882
});
7983
}
8084

81-
// Save the event of the specified type, and invalidate any cached metrics
82-
// associated with this event type and time.
85+
// Save the event of the specified type, and queue invalidation of any cached
86+
// metrics associated with this event type and time.
87+
//
88+
// We don't invalidate the events immediately. This would cause many redundant
89+
// updates when many events are received simultaneously. Also, having a short
90+
// delay between saving the event and invalidating the metrics reduces the
91+
// likelihood of a race condition between when the events are read by the
92+
// evaluator and when the newly-computed metrics are saved.
8393
function save(type, event) {
84-
type = collection(type);
85-
type.events.save(event, handle);
86-
for (var tier in tiers) {
87-
type.metrics.update({
88-
i: false,
89-
"_id.l": +tier,
90-
"_id.t": tiers[tier].floor(event.t)
91-
}, invalidate, handle);
94+
collection(type).events.save(event, handle);
95+
queueInvalidation(type, event);
96+
}
97+
98+
// Schedule deferred invalidation of metrics for this type.
99+
// For each type and tier, track the metric times to invalidate.
100+
// The times are kept in sorted order for bisection.
101+
function queueInvalidation(type, event) {
102+
var timesToInvalidateByTier = timesToInvalidateByTierByType[type],
103+
time = event.t;
104+
if (timesToInvalidateByTier) {
105+
for (var tier in tiers) {
106+
var tierTimes = timesToInvalidateByTier[tier],
107+
tierTime = tiers[tier].floor(time),
108+
i = bisect(tierTimes, tierTime);
109+
if (tierTimes[i] > tierTime) tierTimes.splice(i, 0, tierTime);
110+
}
111+
} else {
112+
timesToInvalidateByTier = timesToInvalidateByTierByType[type] = {};
113+
for (var tier in tiers) {
114+
timesToInvalidateByTier[tier] = [tiers[tier].floor(time)];
115+
}
92116
}
93117
}
94118

119+
// Process any deferred metric invalidations, flushing the queues. Note that
120+
// the queue (timesToInvalidateByTierByType) is copied-on-write, so while the
121+
// previous batch of events are being invalidated, new events can arrive.
122+
setInterval(function() {
123+
for (var type in timesToInvalidateByTierByType) {
124+
var metrics = collection(type).metrics,
125+
timesToInvalidateByTier = timesToInvalidateByTierByType[type];
126+
for (var tier in tiers) {
127+
metrics.update({
128+
i: false,
129+
"_id.l": +tier,
130+
"_id.t": {$in: timesToInvalidateByTier[tier]}
131+
}, invalidate, multi);
132+
}
133+
flushed = true;
134+
}
135+
timesToInvalidateByTierByType = {}; // copy-on-write
136+
}, invalidateInterval);
137+
95138
return putter;
96139
};
97140

test/metric-test.js

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -138,16 +138,19 @@ function metricTest(request, expected) {
138138
var actual = [],
139139
timeout = setTimeout(function() { cb("Time's up!"); }, 10000),
140140
cb = this.callback,
141-
req = Object.create(request);
141+
req = Object.create(request),
142+
test = arguments[depth];
142143
req.step = step;
143-
arguments[depth](req, function(response) {
144-
if (response.time >= stop) {
145-
clearTimeout(timeout);
146-
cb(null, actual.sort(function(a, b) { return a.time - b.time; }));
147-
} else {
148-
actual.push(response);
149-
}
150-
});
144+
setTimeout(function() {
145+
test(req, function(response) {
146+
if (response.time >= stop) {
147+
clearTimeout(timeout);
148+
cb(null, actual.sort(function(a, b) { return a.time - b.time; }));
149+
} else {
150+
actual.push(response);
151+
}
152+
});
153+
}, depth * 250);
151154
}
152155
};
153156

0 commit comments

Comments
 (0)