Skip to content
Browse files

Support .pause() and .resume() in the main feed

  • Loading branch information...
1 parent 7b8a07a commit c5ca50062fb29bae1f241f853e24af9378e4cfa0 @jhs jhs committed Apr 3, 2012
Showing with 130 additions and 0 deletions.
  1. +37 −0 lib/feed.js
  2. +93 −0 test/follow.js
View
37 lib/feed.js
@@ -49,6 +49,7 @@ function Feed (opts) {
self.request = {}; // Extra options for potentially future versions of request. The caller can supply them.
self.since = 0;
+ self.is_paused = false
self.caught_up = false
self.retry_delay = INITIAL_RETRY_DELAY; // ms
@@ -258,6 +259,12 @@ Feed.prototype.prep = function prep_request(changes_stream) {
self.pending.activity_at = now;
self.pending.wait_timer = null;
+ // Just re-run the pause or resume to do the needful on changes_stream (self.pending.request).
+ if(self.is_paused)
+ self.pause()
+ else
+ self.resume()
+
// The inactivity timer is for time between *changes*, or time between the
// initial connection and the first change. Therefore it goes here.
self.change_at = now;
@@ -331,6 +338,36 @@ Feed.prototype.got_activity = function() {
self.pending.activity_at = new Date
}
+
+Feed.prototype.pause = function() {
+ var self = this
+ , was_paused = self.is_paused
+
+ self.is_paused = true
+ if(self.pending && self.pending.request && self.pending.request.pause)
+ self.pending.request.pause()
+ else
+ self.log.warn('No pending request to pause')
+
+ if(!was_paused)
+ self.emit('pause')
+}
+
+Feed.prototype.resume = function() {
+ var self = this
+ , was_paused = self.is_paused
+
+ self.is_paused = false
+ if(self.pending && self.pending.request && self.pending.request.resume)
+ self.pending.request.resume()
+ else
+ self.log.warn('No pending request to resume')
+
+ if(was_paused)
+ self.emit('resume')
+}
+
+
Feed.prototype.on_couch_heartbeat = function on_couch_heartbeat() {
var self = this
View
93 test/follow.js
@@ -91,6 +91,99 @@ test('Heartbeats', function(t) {
})
})
+test('Data due on a paused feed', function(t) {
+ t.ok(couch.rtt(), 'The couch RTT is known')
+ var HB = couch.rtt() * 5
+ , HB_DUE = HB * 1.25 // This comes from the hard-coded constant in feed.js.
+
+ var events = []
+ , last_event = new Date
+
+ function ev(type) {
+ var now = new Date
+ events.push({'elapsed':now - last_event, 'event':type})
+ last_event = now
+ }
+
+ var feed = follow(couch.DB, function() {})
+ feed.heartbeat = HB
+ feed.on('response', function() { feed.retry_delay = 1 })
+ // TODO
+ // feed.pause()
+
+ feed.on('heartbeat', function() { ev('heartbeat') })
+ feed.on('change' , function() { ev('change') })
+ feed.on('timeout' , function() { ev('timeout') })
+ feed.on('retry' , function() { ev('retry') })
+ feed.on('pause' , function() { ev('pause') })
+ feed.on('resume' , function() { ev('resume') })
+ feed.on('stop' , function() { ev('stop') })
+
+ feed.on('change', function(change) {
+ console.error('CHANGE: %j', change)
+ if(change.seq == 1) {
+ feed.pause()
+ // Stay paused long enough for three heartbeats to be overdue.
+ setTimeout(function() { feed.resume() }, HB_DUE * 3 * 1.1)
+ }
+
+ if(change.seq == 3) {
+ feed.stop()
+ setTimeout(check_results, couch.rtt())
+ }
+ })
+
+ function check_results() {
+ console.error('rtt=%j HB=%j', couch.rtt(), HB)
+ events.forEach(function(event, i) { console.error('Event %d: %j', i, event) })
+
+ t.equal(events[0].event, 'change', 'First event was a change')
+ t.ok(events[0].elapsed < couch.rtt(), 'First change came quickly')
+
+ t.equal(events[1].event, 'pause', 'Second event was a pause, reacting to the first change')
+ t.ok(events[1].elapsed < 10, 'Pause called/fired immediately after the change')
+
+ t.equal(events[2].event, 'timeout', 'Third event was the first timeout')
+ t.ok(percent(events[2].elapsed, HB_DUE) > 95, 'First timeout fired when the heartbeat was due')
+
+ t.equal(events[3].event, 'retry', 'Fourth event is a retry')
+ t.ok(events[3].elapsed < 10, 'First retry fires immediately after the first timeout')
+
+ t.equal(events[4].event, 'timeout', 'Fifth event was the second timeout')
+ t.ok(percent(events[4].elapsed, HB_DUE) > 95, 'Second timeout fired when the heartbeat was due')
+
+ t.equal(events[5].event, 'retry', 'Sixth event is a retry')
+ t.ok(events[5].elapsed < 10, 'Second retry fires immediately after the second timeout')
+
+ t.equal(events[6].event, 'timeout', 'Seventh event was the third timeout')
+ t.ok(percent(events[6].elapsed, HB_DUE) > 95, 'Third timeout fired when the heartbeat was due')
+
+ t.equal(events[7].event, 'retry', 'Eighth event is a retry')
+ t.ok(events[7].elapsed < 10, 'Third retry fires immediately after the third timeout')
+
+ t.equal(events[8].event, 'resume', 'Ninth event resumed the feed')
+
+ t.equal(events[9].event, 'change', 'Tenth event was the second change')
+ t.ok(events[9].elapsed < 10, 'Second change came immediately after resume')
+
+ t.equal(events[10].event, 'change', 'Eleventh event was the third change')
+ t.ok(events[10].elapsed < 10, 'Third change came immediately after the second change')
+
+ t.equal(events[11].event, 'stop', 'Twelfth event was the feed stopping')
+ t.ok(events[11].elapsed < 10, 'Stop event came immediately in response to the third change')
+
+ t.notOk(events[12], 'No thirteenth event')
+
+ return t.end()
+ }
+
+ function percent(a, b) {
+ var num = Math.min(a, b)
+ , denom = Math.max(a, b)
+ return 100 * num / denom
+ }
+})
+
test('Events for DB confirmation and hitting the original seq', function(t) {
var feed = follow(couch.DB, on_change)

0 comments on commit c5ca500

Please sign in to comment.
Something went wrong with that request. Please try again.