Navigation Menu

Skip to content

Commit

Permalink
Added tailf-like functionality to both stores
Browse files Browse the repository at this point in the history
  • Loading branch information
tj committed Mar 27, 2012
1 parent 28d2f41 commit 2c93018
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 16 deletions.
8 changes: 6 additions & 2 deletions Makefile
Expand Up @@ -2,7 +2,11 @@
test:
@./node_modules/.bin/mocha \
--timeout 500 \
--reporter dot \
--require should
--reporter spec \
--require should \
test/jog.js \
test/FileStore.js \
test/RedisStore.js \
--grep "remain open"

.PHONY: test
7 changes: 7 additions & 0 deletions Readme.md
Expand Up @@ -12,12 +12,17 @@
-h, --help output usage information
-V, --version output the version number
<<<<<<< HEAD
=======
-q, --query <str> query with the given string
>>>>>>> tailf
-F, --file <path> load from the given <path>
-R, --redis load from redis store
-s, --select <fn> use the given <fn> for filtering
-m, --map <fn> use the given <fn> for mapping
```

<<<<<<< HEAD
### Examples

View all logs from tobi:
Expand Down Expand Up @@ -71,6 +76,8 @@ var jog = require('jog');
var log = jog(new jog.RedisStore);
```

=======
>>>>>>> tailf
## License

(The MIT License)
Expand Down
16 changes: 16 additions & 0 deletions examples/tail-redis.js
@@ -0,0 +1,16 @@

var Jog = require('..')
, log = new Jog(new Jog.RedisStore)
, id = 0;

function again() {
log.info('something happend', { id: ++id });
setTimeout(again, Math.random() * 100 | 0);
}

again();

log.stream({ end: false, interval: 100 })
.on('data', function(line){
console.log(line);
});
16 changes: 16 additions & 0 deletions examples/tail.js
@@ -0,0 +1,16 @@

var Jog = require('..')
, log = new Jog(new Jog.FileStore('/tmp/tail'))
, id = 0;

function again() {
log.info('something happend', { id: ++id });
setTimeout(again, Math.random() * 100 | 0);
}

again();

log.stream({ end: false, interval: 500 })
.on('data', function(line){
console.log(line);
});
7 changes: 5 additions & 2 deletions lib/jog.js
Expand Up @@ -112,12 +112,15 @@ Jog.prototype.write = function(level, msg, attrs){
/**
* Stream data from the store.
*
* @param {Object} options
* @return {EventEmitter}
* @api public
*/

Jog.prototype.stream = function(){
return this.store.stream();
Jog.prototype.stream = function(options){
options = options || {};
options.interval = options.interval || 2000;
return this.store.stream(options);
};

/**
Expand Down
32 changes: 27 additions & 5 deletions lib/stores/FileStore.js
Expand Up @@ -61,21 +61,34 @@ FileStore.prototype.clear = function(fn){
* Return an `EventEmitter` which emits "data"
* and "end" events.
*
* @param {Object} options
* @return {EventEmitter}
* @api private
*/

FileStore.prototype.stream = function(){
var emitter = new EventEmitter
, stream = fs.createReadStream(this.path, { flags: 'a+' })
, buf = ''
FileStore.prototype.stream = function(options){
var emitter = options.emitter || new EventEmitter
, options = options || {}
, buf = options.buf || ''
, self = this
, substr
, obj
, i;

// options
options.offset = options.offset || 0;

// stream
var stream = fs.createReadStream(this.path, {
flags: 'a+',
start: options.offset
});

debug('offset %d', options.offset);
stream.setEncoding('utf8');
stream.on('data', function(chunk){
buf += chunk
options.offset += chunk.length;
while (~(i = buf.indexOf('\n'))) {
substr = buf.slice(0, i);
if ('' == substr) break;
Expand All @@ -86,7 +99,16 @@ FileStore.prototype.stream = function(){
});

stream.on('end', function(){
emitter.emit('end');
if (false === options.end) {
setTimeout(function(){
debug('polling');
options.buf = buf;
options.emitter = emitter;
self.stream(options);
}, options.interval);
} else {
emitter.emit('end');
}
});

return emitter;
Expand Down
25 changes: 18 additions & 7 deletions lib/stores/RedisStore.js
Expand Up @@ -55,28 +55,39 @@ RedisStore.prototype.clear = function(fn){
* Return an `EventEmitter` which emits "data"
* and "end" events.
*
* @param {Object} options
* @return {EventEmitter}
* @api private
*/

RedisStore.prototype.stream = function(){
RedisStore.prototype.stream = function(options){
var emitter = new EventEmitter
, options = options || {}
, size = this.rangeSize
, key = this.key
, db = this.db
, self = this
, start = 0;

function emit(vals) {
vals.forEach(function(val){
emitter.emit('data', JSON.parse(val));
});
}

function fetch() {
var stop = start + size;
debug('lrange %s %s..%s', key, start, stop);
db.lrange(key, start, stop, function(err, vals){
if (err) return emitter.emit('error', err);
if (!vals.length) return emitter.emit('end');
vals.forEach(function(val){
emitter.emit('data', JSON.parse(val));
});
start = stop + 1;
fetch();
emit(vals);
start += vals.length;
if (false === options.end) {
setTimeout(fetch, options.interval);
} else {
if (vals.length) return fetch();
else emitter.emit('end');
}
});
}

Expand Down
21 changes: 21 additions & 0 deletions test/shared/Store.js
Expand Up @@ -23,6 +23,27 @@ module.exports = function(store){
done();
});
})

describe('when "end" is false', function(){
it('should remain open', function(done){
var log = new Jog(store)
, stream = log.stream({ end: false, interval: 100 })
, n = 0;

var id = setInterval(function(){
log.write('info', 'compiling video', { vid: ++n });
}, 2);

stream.on('data', function(line){
if (line.vid == 20) {
clearInterval(id);
done();
}
}).on('end', function(){
done(new Error('called end'));
});
})
})
})

describe('#clear(fn)', function(){
Expand Down

0 comments on commit 2c93018

Please sign in to comment.