Commit

Merge branch 'refactor_protobuf'
Francisco Treacy committed May 5, 2011
2 parents fea240b + 7ca961e commit 72df47b
Showing 8 changed files with 310 additions and 217 deletions.
63 changes: 0 additions & 63 deletions spec/batch_protobuf.coffee
@@ -7,69 +7,6 @@ meta = {}
module.exports =

batches: (db) -> [{

'server info request':
topic: ->
db.serverInfo @callback

'returns info': (data) ->
assert.equal 'riak@127.0.0.1', data.node
assert.ok data.serverVersion.match(/\d+\.\d+/)

'processKeysResponse 1':
topic: ->
db.processKeysResponse {errcode: 1}, keys, meta, @callback

'has an errcode': (data) ->
assert.equal 1, data.errcode

'multiple RpbListKeysResp messages':
topic: ->
db.processKeysResponse {keys: [1]}, keys, meta, @callback

'does not call back': (data) ->
# should never reach here
assert.fail true

'processKeysResponse 2':
topic: ->
db.processKeysResponse {keys: [2], done: true}, keys, meta, @callback

'gets keys back': (data) ->
assert.deepEqual [1,2], data

'RpbMapRedResp error':
topic: ->
resp = phases: []
db.processMapReduceResponse {errcode: 1}, resp, meta, @callback

'errcode is 1': (data) ->
assert.equal 1, data.errcode

'multiple RpbMapRedResp messages':
topic: ->
db.processMapReduceResponse {phase: 0, response: "[1]"}, resp, meta, @callback

'does not call back': (data) ->
# should never reach here
assert.fail true

'processMapReduceResponse 1':
topic: ->
db.processMapReduceResponse {phase: 1, response: "[1]"}, resp, meta, @callback

'does not call back': (data) ->
# should never reach here
assert.fail true

'processMapReduceResponse 2':
topic: ->
db.processMapReduceResponse {phase: 0, response: "[2]", done: true}, resp, meta, @callback

'returns response': (data) ->
assert.deepEqual [0, 1], data.phases
assert.deepEqual [1, 2], data[0]
assert.deepEqual [1], data[1]

'buckets request':
topic: ->
73 changes: 73 additions & 0 deletions spec/simple.coffee
@@ -0,0 +1,73 @@
seq = require 'seq'
assert = require 'assert'
db = require('../src/index').getClient({ api: 'protobuf', debug: false })

bucket = 'riakjs_protobuf'
print = (message) -> console.log "=> " + message

seq([1,2,3,4])

.flatten(fully = false)

.parEach (number) ->
print "saving doc number #{number}"
db.save bucket, "#{number}", { test: 'yes', number: number }, @

.seq ->
print 'retrieving that doc'
db.get bucket, '1', @

.seq (contents) ->
print 'document matches what saved'
assert.deepEqual(contents, { test: 'yes', number: 1})
@()

.seq ->
print 'gimme all buckets'
db.buckets @

.seq (buckets) ->
print "our bucket #{bucket} is in the list of buckets"
assert.ok(bucket in buckets)
@()

.seq ->
print 'now lets map/reduce the hell out of it'
db.add(bucket).map((v) -> v = Riak.mapValuesJson(v)[0]; if 1 < v.number < 4 then [v] else []).run(@)

.seq (results) ->
print "this stuff should be in by now"
assert.deepEqual results, [[{ test: 'yes', number: 2 }, { test: 'yes', number: 3 }]]
@()

.seq ->
print 'now gimme all keys'
db.keys bucket, @

.seq (keys) ->
print 'see if our doc is there...'
assert.ok('1' in keys)
@(null, keys)

.flatten()

.seqEach (key) ->
print 'cleaning up key '+ key
db.remove bucket, key, @

.seq ->
db.buckets @

.seq (buckets) ->
print 'bucket shouldnt exist anymore'
assert.ok(bucket not in buckets)
@()

.seq ->
print "[result] 's all good"
db.end()

.catch (err) ->
print 'omg we fucked up'
console.dir err
db.end()
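
The new spec drives the protobuf client end to end through seq. The same round trip in plain nested callbacks looks roughly like the sketch below, assuming riak-js callbacks follow the (err, result) convention the chain above relies on; the bucket name is illustrative.

# Minimal sketch of a save/get/remove round trip without seq,
# assuming the (err, result) callback convention used by the seq
# chain above. Bucket name is illustrative.
db = require('../src/index').getClient({ api: 'protobuf', debug: false })
bucket = 'riakjs_protobuf_sketch'

db.save bucket, '1', { test: 'yes', number: 1 }, (err) ->
  throw err if err
  db.get bucket, '1', (err, doc) ->
    throw err if err
    console.log doc                     # => { test: 'yes', number: 1 }
    db.remove bucket, '1', (err) ->
      db.end()                          # release the protobuf connections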
2 changes: 1 addition & 1 deletion spec/test_client.coffee
@@ -27,7 +27,7 @@ setupBatch =
db.save bucket, key, value, meta, (err) ->
if index is queue.length-1
# make sure allow_mult is set to false
db.updateProps client.bucket, { allow_mult: false }, done()
db.updateProps client.bucket, { allow_mult: false }, done

'when get keys':
topic: ->
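The one-line fix above hands done itself to updateProps as the callback instead of invoking it immediately. A self-contained sketch of the difference; updateSomething here is illustrative and not part of riak-js.

# Illustrative async function standing in for updateProps.
updateSomething = (props, callback) ->
  process.nextTick -> callback null

done = -> console.log 'props updated, batch can continue'

# Before: done() runs right away and its return value (undefined)
# is what gets passed as the callback.
#   updateSomething { allow_mult: false }, done()

# After: done is passed as the callback and only runs once the
# update has actually finished.
updateSomething { allow_mult: false }, done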
3 changes: 1 addition & 2 deletions src/http_client.coffee
@@ -88,10 +88,9 @@ class HttpClient extends Client

save: (bucket, key, data, options...) ->
[options, callback] = @ensure options
data or= {}

meta = new Meta bucket, key, options
meta.data = data
meta.data = data or {}

verb = options.method or if key then 'PUT' else 'POST'
@execute verb, meta, callback
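The save change simply folds the empty-body default into the Meta assignment, so saving with no document body still stores an empty object. A tiny illustrative sketch of the or fallback it relies on, not riak-js API:

# Sketch of the defaulting the save path keeps (illustrative).
withDefault = (data) -> data or {}

console.log withDefault(null)              # => {}
console.log withDefault({ test: 'yes' })   # => { test: 'yes' }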
18 changes: 10 additions & 8 deletions src/index.coffee
@@ -8,8 +8,8 @@ module.exports =
options ||= {}
pool = options.pool
delete options.pool
pool ||= new @ProtoBufPool options
cli = new @ProtoBufClient options
pool ||= new @ProtobufPool options
cli = new @ProtobufClient options
cli.pool = pool
cli

@@ -23,17 +23,19 @@ module.exports =
getSessionStore: (options) ->
new @SessionStore options

module.exports.__defineGetter__ 'ProtoBufClient', ->
@_pbcClient ||= require './protobuf_client'

module.exports.__defineGetter__ 'ProtoBufPool', ->
@_pbcPool ||= require './protobuf'
# exports

module.exports.__defineGetter__ 'HttpClient', ->
@_httpClient ||= require './http_client'

module.exports.__defineGetter__ 'ProtobufClient', ->
@_pbcClient ||= require './protobuf_client'

module.exports.__defineGetter__ 'ProtobufPool', ->
@_pbcPool ||= require './protobuf'

module.exports.__defineGetter__ 'TestServer', ->
@_testServer ||= require './test_server'

module.exports.__defineGetter__ 'SessionStore', ->
@_sessionStore ||= require './session_store'
@_sessionStore ||= require './session_store'
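
The getters above keep each backend lazily required, so heavier dependencies such as protobuf_for_node are only loaded when the corresponding client is actually used. A minimal standalone sketch of the same lazy-require pattern; the module name is illustrative and not part of riak-js.

# Lazy-require-via-getter sketch: the module is required on first
# access and cached on the exports object afterwards.
module.exports.__defineGetter__ 'HeavyClient', ->
  @_heavyClient ||= require './heavy_client'

# First access triggers the require; later accesses return the cache:
#   client = require('./index').HeavyClient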
82 changes: 52 additions & 30 deletions src/protobuf.coffee
@@ -8,7 +8,7 @@ Buffer = require('buffer').Buffer
# Keeps a pool of Riak socket connections.
class Pool
constructor: (options) ->
@options = options || {}
@options = options or {}
@options.port ||= 8087
@options.host ||= '127.0.0.1'
@options.max ||= 10
@@ -29,10 +29,10 @@ class Pool

@next (conn) ->
if conn.writable
callback conn if callback
callback(conn) if callback
else
conn.on 'connect', ->
callback conn if callback
callback(conn) if callback

true

@@ -51,14 +51,13 @@ class Pool
# data - Object data to be serialized.
#
# Returns anonymous function that takes a single callback.
send: (name, data) ->
(callback) =>
@start (conn) ->
conn.send(name, data) (resp) ->
try
callback resp
finally
conn.finish()
send: (name, data, callback) ->
@start (conn) ->
conn.send name, data, (resp) ->
try
callback resp
finally
conn.finish()

# Public: Returns the Connection back to the Pool. If the Pool is
# inactive, disconnect the Connection.
@@ -124,8 +123,7 @@ class Connection
# data - Object data to be serialized.
#
# Returns anonymous function that takes a single callback.
send: (name, data) ->
(callback) =>
send: (name, data, callback) ->
@callback = callback
@conn.write @prepare(name, data)

@@ -158,12 +156,15 @@
#
# Returns a Buffer that is ready to be sent to Riak.
prepare: (name, data) ->
type = ProtoBuf[name]

type = Protobuf[name]

if data
buf = type.serialize(data)
len = buf.length + 1
else
len = 1

msg = new Buffer(len + 4)
msg[0] = len >>> 24
msg[1] = len >>> 16
@@ -249,7 +250,7 @@
(@chunk[@chunk_pos + 1] << 16) +
(@chunk[@chunk_pos + 2] << 8) +
@chunk[@chunk_pos + 3] - 1
@type = ProtoBuf.type @chunk[@chunk_pos + 4]
@type = Protobuf.type @chunk[@chunk_pos + 4]
@resp = new Buffer(@resp_len)
@resp_pos = 0
@chunk_pos += 5
@@ -274,34 +275,55 @@ Connection.prototype.__defineGetter__ 'writable', ->

Pool.Connection = Connection

ProtoBuf =
types: ["ErrorResp", "PingReq", "PingResp", "GetClientIdReq",
"GetClientIdResp", "SetClientIdReq", "SetClientIdResp", "GetServerInfoReq",
"GetServerInfoResp", "GetReq", "GetResp", "PutReq", "PutResp", "DelReq",
"DelResp", "ListBucketsReq", "ListBucketsResp", "ListKeysReq",
"ListKeysResp", "GetBucketReq", "GetBucketResp", "SetBucketReq",
"SetBucketResp", "MapRedReq", "MapRedResp"]

# Find a ProtoBuf type given its riak code.
Protobuf =
types: [
"ErrorResp" # 0
"PingReq" # 1
"PingResp" # 2
"GetClientIdReq" # 3
"GetClientIdResp" # 4
"SetClientIdReq" # 5
"SetClientIdResp" # 6
"GetServerInfoReq" # 7
"GetServerInfoResp" # 8
"GetReq" # 9
"GetResp" # 10
"PutReq" # 11
"PutResp" # 12
"DelReq" # 13
"DelResp" # 14
"ListBucketsReq" # 15
"ListBucketsResp" # 16
"ListKeysReq" # 17
"ListKeysResp" # 18
"GetBucketReq" # 19
"GetBucketResp" # 20
"SetBucketReq" # 21
"SetBucketResp" # 22
"MapRedReq" # 23
"MapRedResp" # 24
]

# Find a Protobuf type given its riak code.
type: (num) ->
@[@types[num]]

schemaFile: path.join path.dirname(module.filename), 'riak.desc'

# lazily load protobuf schema
ProtoBuf.__defineGetter__ 'schema', ->
@_schema ||= new (require('protobuf_for_node').Schema)(fs.readFileSync(ProtoBuf.schemaFile))
Protobuf.__defineGetter__ 'schema', ->
@_schema ||= new (require('protobuf_for_node').Schema)(fs.readFileSync(Protobuf.schemaFile))

# lazily load protobuf types
ProtoBuf.types.forEach (name) ->
Protobuf.types.forEach (name) ->
cached_name = "_#{name}"

ProtoBuf.__defineGetter__ name, ->
Protobuf.__defineGetter__ name, ->
if @[cached_name]
@[cached_name]
else
code = ProtoBuf.types.indexOf(name)
if sch = ProtoBuf.schema["Rpb#{name}"]
code = Protobuf.types.indexOf(name)
if sch = Protobuf.schema["Rpb#{name}"]
sch.riak_code = code
@[cached_name] = sch
else
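Beyond renaming ProtoBuf to Protobuf and replacing the curried send(name, data)(callback) form with a plain send(name, data, callback), the wire framing stays the same: a 4-byte big-endian length counting the payload plus one code byte, a one-byte Riak message code, then the serialized protobuf payload. A standalone sketch of that frame layout, independent of the real schema:

# Frame layout used by prepare() and the response parser above:
# [4-byte big-endian length][1-byte message code][protobuf payload].
Buffer = require('buffer').Buffer

frame = (code, payload) ->
  payload or= new Buffer(0)
  len = payload.length + 1            # +1 for the message code byte
  msg = new Buffer(len + 4)
  msg[0] = (len >>> 24) & 0xff
  msg[1] = (len >>> 16) & 0xff
  msg[2] = (len >>> 8)  & 0xff
  msg[3] = len & 0xff
  msg[4] = code
  payload.copy msg, 5 if payload.length
  msg

# A PingReq (riak code 1) has no payload, so its frame is five bytes:
console.log frame(1)                  # => <Buffer 00 00 00 01 01>

# Callers of the refactored Pool now pass the callback directly, e.g.:
#   pool.send 'PingReq', null, (resp) -> console.log resp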
