Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Ensure `end` event isn't call after `error`; Fully implement ReadStre…

…am API
  • Loading branch information...
commit c919371baf08037806676f6771542a98ba5d9a10 1 parent bc33845
@wdavidw authored
Showing with 25 additions and 37 deletions.
  1. +15 −12 lib/hive.coffee
  2. +10 −25 test/QueryTest.coffee
View
27 lib/hive.coffee
@@ -1,4 +1,6 @@
+Stream = require 'stream'
+
thrift = require 'thrift'
transport = require 'thrift/lib/thrift/transport'
EventEmitter = require('events').EventEmitter
@@ -24,35 +26,36 @@ module.exports.createClient = (options = {}) ->
size = -1
client.execute query, (err) ->
if err
+ emitter.readable = false
emitter.emit 'error', err if emitter.listeners('error').length
- emitter.emit 'end', err
return
fetch()
- closed = false
- emitter = new EventEmitter
+ buffer = []
+ #emitter = new EventEmitter
+ emitter = new Stream
+ emitter.readable = true
emitter.paused = 0
emitter.pause = ->
- @paused++
+ @paused = 1
emitter.resume = ->
- @paused--
- fetch() if @paused is 0
+ @was = @paused
+ @paused = 0
+ fetch() if @was
handle = (err, rows) =>
if err
- closed = true
+ emitter.readable = false
emitter.emit 'error', err
- emitter.emit 'end', err
return
rows = rows.map (row) -> row.split '\t'
for row in rows
emitter.emit 'row', row
if rows.length is size
- fetch()
+ fetch() unless emitter.paused
else
- closed = true
- emitter.emit 'success', err
+ emitter.readable = false
emitter.emit 'end'
fetch = ->
- return if emitter.paused or closed
+ return if emitter.paused or not emitter.readable
if size
then client.fetchN size, handle
else client.fetchAll handle
View
35 test/QueryTest.coffee
@@ -40,45 +40,29 @@ module.exports =
assert.ok Array.isArray row
assert.eql row.length, 3
.on 'error', (err) ->
- assert.ifError err
- .on 'end', (err) ->
- assert.ifError err
+ assert.ok false
+ .on 'end', ->
assert.eql count, 54
next()
'Query # n': (next) ->
count = 0
- success_called = false
client.query("select * from #{table}", 10)
.on 'row', (row) ->
count++
.on 'error', (err) ->
- assert.ifError err
- .on 'success', (err) ->
- success_called = true
- .on 'end', (err) ->
- assert.ifError err
+ assert.ok false
+ .on 'end', ->
assert.eql count, 54
- next() if success_called
+ next()
'Query # error': (next) ->
- error_called = false
client.query("select * from undefined_table", 10)
.on 'row', (row) ->
assert.ok false
.on 'error', (err) ->
assert.ok err instanceof Error
- error_called = true
- .on 'success', ->
- assert.ok false
- .on 'end', (err) ->
- assert.ok err instanceof Error
- next() if error_called
- 'Query # error # no error callback': (next) ->
- client.query("select * from undefined_table", 10)
- .on 'row', (row) ->
- assert.ok false
- .on 'end', (err) ->
- assert.ok err instanceof Error
next()
+ .on 'end', ->
+ assert.ok false
'Query # pause/resume': (next) ->
count = 0
query = client.query("select * from #{table}", 10)
@@ -88,8 +72,9 @@ module.exports =
setTimeout ->
query.resume()
, 10
- .on 'end', (err) ->
- assert.ifError err
+ .on 'error', (err) ->
+ assert.ok false
+ .on 'end', ->
assert.eql count, 54
next()
'Close': (next) ->
Please sign in to comment.
Something went wrong with that request. Please try again.