Skip to content

Commit

Permalink
protobuf parsing is now more robust.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrismoos committed Jan 16, 2011
1 parent 8222134 commit 7d299d9
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 3 deletions.
80 changes: 80 additions & 0 deletions spec/test_protobuf.coffee
@@ -0,0 +1,80 @@
vows = require 'vows'
assert = require 'assert'
pb = require '../src/protobuf';
sys = require 'sys'

suite = vows.describe "Riak protobuf parsing"

suite.addBatch({
'a connection':
topic: ->
pool = new pb
pool.getConnection()

'parses a full response': (conn) ->
pong = false
conn.callback = (p) ->
pong = true

conn.receive(new Buffer('\x00\x00\x00\x01\x02'))
assert.ok pong

'parses partial responses': (conn) ->
pong = false
conn.callback = (p) ->
pong = true

conn.receive(new Buffer('\x00\x00\x00\x01'))
conn.receive(new Buffer('\x02'))
assert.ok pong

'parses partial header and response': (conn) ->
pong = false
conn.callback = (p) ->
pong = true

conn.receive(new Buffer('\x00\x00'))
conn.receive(new Buffer('\x00\x01'))
conn.receive(new Buffer('\x02'))
assert.ok pong

'parses partial header and partial response': (conn) ->
pong = false
conn.callback = (p) ->
pong = true

conn.receive(new Buffer('\x00\x00'))
conn.receive(new Buffer('\x00\x07'))
conn.receive(new Buffer('\x04'))
conn.receive(new Buffer('\x0a\x04'))
conn.receive(new Buffer('\x01\x65'))
conn.receive(new Buffer('\x01\x39'))

assert.ok pong

'parses multiple responses, with the second response partial': (conn) ->
pongs = 0
conn.callback = (p) ->
pongs++

conn.receive(new Buffer('\x00\x00\x00\x01\x02\x00\x00\x00\x01'))
conn.receive(new Buffer('\x02'))

assert.equal pongs, 2

'parses multiple responses, with the partial headers and messages': (conn) ->
pongs = 0
conn.callback = (p) ->
pongs++

conn.receive(new Buffer('\x00\x00\x00\x01\x02\x00\x00'))
conn.receive(new Buffer('\x00'))
conn.receive(new Buffer('\x07'))
conn.receive(new Buffer('\x04'))
conn.receive(new Buffer('\x0a\x04'))
conn.receive(new Buffer('\x01\x65'))
conn.receive(new Buffer('\x01\x39\x00\x00\x00\x01\x02'))

assert.equal pongs, 3

}).export(module)
31 changes: 28 additions & 3 deletions src/protobuf.coffee
Expand Up @@ -107,6 +107,8 @@ class Pool

# A single Riak socket connection.
class Connection
PB_HEADER_LENGTH = 5

constructor: (pool) ->
@conn = net.createConnection pool.options.port, pool.options.host
@pool = pool
Expand Down Expand Up @@ -172,13 +174,21 @@ class Connection
buf.copy msg, 5, 0
msg

chunk_append: (buf) ->
@new_buf = new Buffer(@chunk.length + buf.length)
@chunk.copy(@new_buf, 0, 0)
buf.copy(@new_buf, @chunk.length, 0)
@chunk = @new_buf

# Parses the received chunk for one or more messages. If there is no data
# from a Riak response left to read from the chunk, release this connection
# to the pool.
#
# Returns nothing.
receive: (chunk) ->
@chunk = chunk
# Append the chunk to the current buffer
@chunk_append(chunk)

if @attempt_parse()
if @pool.running? then @reset() else @end()

Expand Down Expand Up @@ -218,8 +228,23 @@ class Connection
@chunk_pos += bytes_read

# are we there yet?
@type.parse @resp if @resp_pos == @resp_len
if @resp_pos >= @resp_len
resp = @type.parse @resp

# slice the chunk to leave any remaining bytes
remaining = @chunk.slice(@resp_len + PB_HEADER_LENGTH, @chunk.length)

# reset because a full response has been read
@reset()

# set the current chunk to the remaining part
@chunk = remaining

resp
else
if @chunk.length < PB_HEADER_LENGTH
return

@resp_len = (@chunk[@chunk_pos + 0] << 24) +
(@chunk[@chunk_pos + 1] << 16) +
(@chunk[@chunk_pos + 2] << 8) +
Expand All @@ -236,7 +261,7 @@ class Connection
reset: ->
@type = null
@resp = null
@chunk = null
@chunk = new Buffer(0)
@chunk_pos = 0 # byte position of the chunk buffer
@resp_pos = 0 # byte position of the response buffer
@resp_len = 0 # expected response length
Expand Down

0 comments on commit 7d299d9

Please sign in to comment.