forked from square/cube
/
event.js
120 lines (99 loc) · 3.38 KB
/
event.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// TODO report failures?
// TODO include the event._id (and define a JSON encoding for ObjectId?)
// TODO allow the event time to change when updating (fix invalidation)
// TODO fix race condition between cache invalidation and metric computation
var util = require("util"),
mongodb = require("mongodb"),
parser = require("./event-expression"),
tiers = require("./tiers"),
types = require("./types");
var type_re = /^[a-z][a-zA-Z0-9_]+$/,
invalidate = {$set: {i: true}},
multi = {multi: true},
event_options = {sort: {t: -1}, batchSize: 1000};
exports.putter = function(db) {
var collection = types(db),
flushInterval,
flushTypes = {},
flushDelay = 5000;
function endpoint(request) {
var time = new Date(request.time),
type = request.type;
// Validate the date and type.
if (!type_re.test(type)) return util.log("invalid type: " + request.type);
if (isNaN(time)) return util.log("invalid time: " + request.time);
// Create the event object, to control which fields are addressible.
var event = {t: time, d: request.data};
// If an id is specified, promote it to Mongo's primary key.
if ("id" in request) event._id = request.id;
// Save the event.
collection(type).events.save(event);
// Queue invalidation of metrics for this type.
var times = flushTypes[type] || (flushTypes[type] = [time, time]);
if (time < times[0]) times[0] = time;
if (time > times[1]) times[1] = time;
}
// Invalidate cached metrics.
endpoint.flush = function() {
var types = [];
for (var type in flushTypes) {
var metrics = collection(type).metrics,
times = flushTypes[type];
types.push(type);
for (var tier in tiers) {
var floor = tiers[tier].floor;
metrics.update({
i: false,
l: +tier,
t: {
$gte: floor(times[0]),
$lte: floor(times[1])
}
}, invalidate, multi);
}
}
if (types.length) util.log("flush " + types.join(", "));
flushTypes = {};
};
flushInterval = setInterval(endpoint.flush, flushDelay);
return endpoint;
};
exports.getter = function(db) {
var collection = types(db);
function getter(request, callback) {
var start = new Date(request.start),
stop = new Date(request.stop);
// Validate the dates.
if (isNaN(start)) return util.log("invalid start: " + request.start);
if (isNaN(stop)) return util.log("invalid stop: " + request.stop);
// Parse the expression.
var expression;
try {
expression = parser.parse(request.expression);
} catch (error) {
return util.log("invalid expression: " + error);
}
// Copy any expression filters into the query object.
var filter = {t: {$gte: start, $lt: stop}};
expression.filter(filter);
// Request any needed fields.
var fields = {t: 1};
expression.fields(fields);
// Query for the desired events.
collection(expression.type).events.find(filter, fields, event_options, function(error, cursor) {
if (error) throw error;
cursor.each(function(error, event) {
if (callback.closed) return cursor.close();
if (error) throw error;
if (event) callback({
time: event.t,
data: event.d
});
});
});
}
getter.close = function(callback) {
callback.closed = true;
};
return getter;
};