This repository has been archived by the owner on Sep 3, 2019. It is now read-only.
/
changes.js
130 lines (102 loc) · 2.96 KB
/
changes.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
var events = require('events');
var async = require('async');
var noop = function () {};
module.exports = function (capot, cb) {
var couch = capot.couch;
var changesDb = couch.db('changes');
var feed = capot.changes = new events.EventEmitter();
var since = {};
feed.start = function () {
getAllChanges(listen);
};
function getAllChanges(cb) {
couch.get('/_all_dbs', function (err, dbs) {
if (err) { return console.error(err); }
async.eachLimit(dbs, 10, getChanges, cb);
});
}
function getChanges(db, cb) {
cb = cb || noop;
if (db === 'changes') { return cb(); }
var params = { since: since[db] || 0, include_docs: true };
couch.db(db).changes(params)
.on('error', function (err) {
console.error('changes:error', err);
//err.db = db;
//feed.emit('error', err);
cb();
})
.on('complete', function (data) {
//console.log('changes:complete', data);
if (data.last_seq > params.since) {
// Keep track of last seq for this db, so future requests only get
// updates since this last sequence number.
since[db] = data.last_seq;
feed.emit('change', db, data);
feed.emit('change:' + db, data);
return updateSince(db, data.last_seq, cb);
}
cb();
});
}
function listen() {
couch.get('/_db_updates', function (err, data) {
// Immediately listen for more changes.
listen();
if (err) { return feed.emit('error', err); }
if (!data || !data.db_name || !data.type) { return; }
var db = data.db_name;
var eventName = data.type;
if (eventName === 'created') {
feed.emit('add', db);
} else if (eventName === 'deleted') {
feed.emit('remove', db);
} else if (eventName === 'updated') {
getChanges(db);
}
});
}
function ensureChangesDb(cb) {
couch.get('/changes', function (err) {
if (err && err.statusCode !== 404) {
cb(err);
} else if (err) { // Not found
couch.put('/changes', cb);
} else {
cb();
}
});
}
function loadSince(cb) {
changesDb.allDocs({
startkey: 'since/',
endkey: 'since0',
include_docs: true
}, function (err, data) {
if (err) { return cb(err); }
data.rows.forEach(function (row) {
var db = row.id.split('/').slice(1).join('/');
since[db] = row.doc.seq;
});
cb();
});
}
function updateSince(db, seq, cb) {
var sinceDoc = { _id: 'since/' + db, type: 'since', seq: seq };
changesDb.get(sinceDoc._id, function (err, data) {
if (err && err.status !== 404) {
return console.error(err);
} else if (err) { // not found
} else if (seq === data.seq) {
return cb();
} else {
sinceDoc._rev = data._rev;
}
changesDb.put(sinceDoc, cb);
});
}
async.series([
ensureChangesDb,
loadSince
], cb);
};