Skip to content
Browse files

Match the stream API's .write() return values when pausing and resuming

  • Loading branch information...
1 parent 4483eca commit 7b8a07ab4eb92ce038afb21853c36499a4534d13 @jhs jhs committed Apr 3, 2012
Showing with 80 additions and 3 deletions.
  1. +4 −3 lib/stream.js
  2. +76 −0 test/stream.js
View
7 lib/stream.js
@@ -169,11 +169,12 @@ Changes.prototype.write_continuous = function(data) {
self.changes.push(json)
}
- // Remember the unused data and send all known good changes (or heartbeats).
+ // Remember the unused data and send all known good changes (or heartbeats). The data (or heartbeat)
+ // event listeners may call .pause() so remember the is_sending state now before calling them.
+ var was_sending = self.is_sending
self.buf = buf
self.emit_changes()
-
- return self.is_sending
+ return was_sending
}
View
76 test/stream.js
@@ -279,6 +279,82 @@ test('Continuous pause', function(t) {
}
})
+test('Paused while heartbeats are arriving', function(t) {
+ var feed = new follow.Changes({'feed':'continuous'})
+ , start = new Date
+
+ var events = []
+
+ feed.on('data', function(change) {
+ change = JSON.parse(change)
+ change.elapsed = new Date - start
+ events.push(change)
+ })
+
+ feed.on('heartbeat', function(hb) {
+ var hb = {'heartbeat':true, 'elapsed':(new Date) - start}
+ events.push(hb)
+ })
+
+ feed.once('data', function(data) {
+ t.equal(data, '{"change":1}', 'First data event was the first change')
+ feed.pause()
+ t.equal(feed.readable, true, 'Feed is readable after pause()')
+
+ setTimeout(unpause, 350)
+ function unpause() {
+ t.equal(feed.readable, true, 'Feed is readable just before resume()')
+ feed.resume()
+ }
+ })
+
+ // Simulate a change, then a couple heartbeats every 100ms, then more changes.
+ var writes = []
+ function write(data) {
+ var result = feed.write(data)
+ writes.push(result)
+ }
+
+ setTimeout(function() { write('{"change":1}\n') }, 0)
+ setTimeout(function() { write('\n') }, 100)
+ setTimeout(function() { write('\n') }, 200)
+ setTimeout(function() { write('\n') }, 300)
+ setTimeout(function() { write('\n') }, 400)
+ setTimeout(function() { write('{"change":2}\n') }, 415)
+ setTimeout(function() { write('{"change":3}\n') }, 430)
+
+ setTimeout(check_events, 500)
+ function check_events() {
+ t.ok(events[0].change , 'First event was data (a change)')
+ t.ok(events[1].heartbeat, 'Second event was a heartbeat')
+ t.ok(events[2].heartbeat, 'Third event was a heartbeat')
+ t.ok(events[3].heartbeat, 'Fourth event was a heartbeat')
+ t.ok(events[4].heartbeat, 'Fifth event was a heartbeat')
+ t.ok(events[5].change , 'Sixth event was data')
+ t.ok(events[6].change , 'Seventh event was data')
+
+ t.ok(events[0].elapsed < 10, 'Immediate emit first data event')
+ t.ok(events[1].elapsed > 350, 'First heartbeat comes after the pause')
+ t.ok(events[2].elapsed > 350, 'Second heartbeat comes after the pause')
+ t.ok(events[3].elapsed > 350, 'Third heartbeat comes after the pause')
+ t.ok(events[4].elapsed > 350, 'Fourth heartbeat comes after the pause')
+
+ t.ok(events[3].elapsed < 360, 'Third heartbeat came right after the resume')
+ t.ok(events[4].elapsed >= 400, 'Fourth heartbeat came at the normal time')
+
+ t.equal(writes[0], true , 'First write flushed')
+ t.equal(writes[1], false, 'Second write (first heartbeat) did not flush because the feed was paused')
+ t.equal(writes[2], false, 'Third write did not flush due to pause')
+ t.equal(writes[3], false, 'Fourth write did not flush due to pause')
+ t.equal(writes[4], true , 'Fifth write (fourth heartbeat) flushed due to resume()')
+ t.equal(writes[5], true , 'Sixth write flushed normally')
+ t.equal(writes[6], true , 'Seventh write flushed normally')
+
+ feed.end()
+ t.end()
+ }
+})
+
test('Feeds from couch', function(t) {
t.ok(couch.rtt(), 'RTT to couch is known')

0 comments on commit 7b8a07a

Please sign in to comment.
Something went wrong with that request. Please try again.