-
Notifications
You must be signed in to change notification settings - Fork 10
/
FileStore.js
114 lines (98 loc) · 2.17 KB
/
FileStore.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/**
* Module dependencies.
*/
var EventEmitter = require('events').EventEmitter
, debug = require('debug')('jog:file')
, path = require('path')
, fs = require('fs');
/**
* Expose `FileStore`.
*/
module.exports = FileStore;
/**
* Initialize a `FileStore` with the given `path`.
*
* @param {String} path
* @api public
*/
function FileStore(path) {
debug('filestore %s', path);
if (!path) throw new Error('path required');
this.path = path;
this.write = fs.createWriteStream(path, { flags: 'a' });
}
/**
* Add `obj` to the file.
*
* @param {Object} obj
* @api private
*/
FileStore.prototype.add = function(obj){
debug('add %j', obj);
this.write.write(JSON.stringify(obj) + '\n');
};
/**
* Clear and invoke `fn()`.
*
* @param {Function} fn
* @api private
*/
FileStore.prototype.clear = function(fn){
var self = this;
debug('clear');
fs.exists(this.path, function(yes){
if (!yes) return fn();
debug('unlink %s', self.path);
fs.unlink(self.path, fn)
});
};
/**
* Return an `EventEmitter` which emits "data"
* and "end" events.
*
* @param {Object} options
* @return {EventEmitter}
* @api private
*/
FileStore.prototype.stream = function(options){
var options = options || {}
, emitter = options.emitter || new EventEmitter
, buf = options.buf || ''
, self = this
, substr
, obj
, i;
// options
options.offset = options.offset || 0;
// stream
var stream = fs.createReadStream(this.path, {
flags: 'a+',
start: options.offset
});
debug('offset %d', options.offset);
stream.setEncoding('utf8');
stream.on('data', function(chunk){
buf += chunk
options.offset += chunk.length;
while (~(i = buf.indexOf('\n'))) {
substr = buf.slice(0, i);
if ('' == substr) break;
obj = JSON.parse(substr);
emitter.emit('data', obj);
buf = buf.slice(i + 1);
}
});
stream.on('end', function(){
if (false === options.end) {
setTimeout(function(){
debug('polling');
options.buf = buf;
options.emitter = emitter;
self.stream(options);
}, options.interval);
} else {
emitter.emit('end');
}
});
return emitter;
};