Skip to content

Commit

Permalink
Merge branch 'fix_catchup'
Browse files Browse the repository at this point in the history
Move the code to check for is_db_updates to self.check_for_catchup()

Conflicts:
	lib/feed.js
	package.json
  • Loading branch information
jhs committed May 27, 2014
2 parents c798adf + 67cc898 commit 61d7add
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 15 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Once you've got one, you can subscribe to these events:
* **confirm_request** | `function(req)` | The database confirmation request is sent; passed the `request` object
* **confirm** | `function(db_obj)` | The database is confirmed; passed the couch database object
* **change** | `function(change)` | A change occured; passed the change object from CouchDB
* **catchup** | `function(seq_id)` | The feed has caught up to the *update_seq* from the confirm step. Assuming no subsequent changes, you have seen all the data. *Always fires before the final **change** event.*
* **catchup** | `function(seq_id)` | The feed has caught up to the *update_seq* from the confirm step. Assuming no subsequent changes, you have seen all the data.
* **wait** | Follow is idle, waiting for the next data chunk from CouchDB
* **timeout** | `function(info)` | Follow did not receive a heartbeat from couch in time. The passed object has `.elapsed_ms` set to the elapsed time
* **retry** | `function(info)` | A retry is scheduled (usually after a timeout or disconnection). The passed object has
Expand Down
27 changes: 21 additions & 6 deletions lib/feed.js
Original file line number Diff line number Diff line change
Expand Up @@ -553,11 +553,6 @@ Feed.prototype.on_change = function on_change(change) {
return destroy_req(self.pending.request);
}

if(!self.is_db_updates && !self.caught_up && change.seq == self.original_db_seq) {
self.caught_up = true
self.emit('catchup', change.seq)
}

if(typeof self.filter !== 'function')
return self.on_good_change(change);

Expand All @@ -579,8 +574,12 @@ Feed.prototype.on_change = function on_change(change) {
if(result) {
self.log.debug('Builtin filter PASS for change: ' + change.seq);
return self.on_good_change(change);
} else
} else {
self.log.debug('Builtin filter FAIL for change: ' + change.seq);

// Even with a filtered change, a "catchup" event might still be appropriate.
self.check_for_catchup(change.seq)
}
}

Feed.prototype.on_good_change = function on_good_change(change) {
Expand All @@ -600,6 +599,22 @@ Feed.prototype.on_good_change = function on_good_change(change) {
self.since = change.seq;

self.emit('change', change);

self.check_for_catchup(change.seq)
}

Feed.prototype.check_for_catchup = function check_for_catchup(seq) {
var self = this

if (self.is_db_updates)
return
if(self.caught_up)
return
if(seq < self.original_db_seq)
return

self.caught_up = true
self.emit('catchup', seq)
}

Feed.prototype.on_inactivity = function on_inactivity() {
Expand Down
3 changes: 3 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ Changes.prototype.destroy = function() {
self.is_ending = false
self.is_sending = false

if(self.source && typeof self.source.abort == 'function')
return self.source.abort()

if(self.source && typeof self.source.destroy === 'function')
self.source.destroy()

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
, "version": "0.11.2"
, "author": { "name": "Jason Smith"
, "email": "jhs@iriscouch.com" }
, "contributors": [ "Jarrett Cruger <jcrugzz@gmail.com>" ]
, "description": "Extremely robust, fault-tolerant CouchDB changes follower"
, "license": "Apache 2.0"
, "keywords": ["couchdb", "changes", "sleep", "sleepy"]
Expand Down
33 changes: 25 additions & 8 deletions test/follow.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,21 @@ test('Heartbeats', function(t) {
})
})

test('Catchup events', function(t) {
t.ok(couch.rtt(), 'The couch RTT is known')

var feed = follow(couch.DB, function() {})
var last_seen = 0

feed.on('change', function(change) { last_seen = change.seq })
feed.on('catchup', function(id) {
t.equal(last_seen, 3, 'The catchup event fires after the change event that triggered it')
t.equal(id , 3, 'The catchup event fires on the seq_id of the latest change')
feed.stop()
t.end()
})
})

test('Data due on a paused feed', function(t) {
t.ok(couch.rtt(), 'The couch RTT is known')
var HB = couch.rtt() * 5
Expand Down Expand Up @@ -121,7 +136,6 @@ test('Data due on a paused feed', function(t) {
feed.on('stop' , function() { ev('stop') })

feed.on('change', function(change) {
console.error('CHANGE: %j', change)
if(change.seq == 1) {
feed.pause()
// Stay paused long enough for three heartbeats to be overdue.
Expand Down Expand Up @@ -186,25 +200,28 @@ test('Data due on a paused feed', function(t) {
})

test('Events for DB confirmation and hitting the original seq', function(t) {
t.plan(7)
var feed = follow(couch.DB, on_change)

var events = { 'confirm':null, 'catchup':null }
var events = { 'confirm':null }
feed.on('confirm', function(db) { events.confirm = db })
feed.on('catchup', function(seq) { events.catchup = seq })
feed.on('catchup', caught_up)

// This will run 3 times.
function on_change(er, ch) {
t.false(er, 'No problem with the feed')
if(ch.seq == 3) {
t.ok(events.confirm, 'Confirm event fired')
t.equal(events.confirm && events.confirm.db_name, 'follow_test', 'Confirm event returned the Couch DB object')
t.equal(events.confirm && events.confirm.update_seq, 3, 'Confirm event got the update_seq right')
}
}

t.ok(events.catchup, 'Catchup event fired')
t.equal(events.catchup, 3, 'Catchup event fired on update 3')
function caught_up(seq) {
t.equal(seq, 3, 'Catchup event fired on update 3')

feed.stop()
t.end()
}
feed.stop()
t.end()
}
})

Expand Down
33 changes: 33 additions & 0 deletions test/issues/43.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
var tap = require('tap')
var test = tap.test
var util = require('util')

var lib = require('../../lib')
var couch = require('../couch')
var follow = require('../../api')

couch.setup(test)

test('Issue #43', function(t) {
var changes = 0

var feed = follow({'db':couch.DB, 'inactivity_ms':couch.rtt()*3}, on_change)
feed.on('stop', on_stop)
feed.on('error', on_err)

function on_err(er) {
t.false(er, 'Error event should never fire')
t.end()
}

function on_change(er, change) {
changes += 1
this.stop()
}

function on_stop(er) {
t.false(er, 'No problem stopping')
t.equal(changes, 1, 'Only one change seen since stop was called in its callback')
t.end()
}
})

0 comments on commit 61d7add

Please sign in to comment.