Skip to content

Commit

Permalink
rethinkql -- basic change feed support working in the browser :-)
Browse files Browse the repository at this point in the history
  • Loading branch information
williamstein committed Jun 26, 2015
1 parent 7a39c42 commit 4d3a824
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 17 deletions.
21 changes: 14 additions & 7 deletions salvus/client.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ class exports.Connection extends EventEmitter
cb(mesg.error)
else
cb(undefined, mesg)
delete @call_callbacks[id]
if not mesg.multi_response
delete @call_callbacks[id]

# Finally, give other listeners a chance to do something with this message.
@emit('message', mesg)
Expand Down Expand Up @@ -2417,14 +2418,20 @@ class exports.Connection extends EventEmitter
# Queries directly to the database (sort of like Facebook's GraphQL)
query: (opts) =>
opts = defaults opts,
query : required
query : required
changes : undefined
options : undefined
cb : required
@call
message : message.query(query:opts.query, options:opts.options)
error_event : true
cb : required
mesg = message.query
query : opts.query
options : opts.options
changes : opts.changes
multi_response : opts.changes
@call
message : mesg
error_event : true
cb : (err, resp) =>
opts.cb(err, if not err then resp.query)
opts.cb(err, if not err then resp)


#################################################
Expand Down
15 changes: 10 additions & 5 deletions salvus/hub.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -3426,18 +3426,23 @@ class Client extends EventEmitter
return
dbg = @dbg("user_query")
dbg("account_id=#{@account_id} makes query='#{misc.to_json(query)}'")
options = mesg.options
if not options? or options.length == 0
options = [{limit: 100}]
first = true
database.user_query
account_id : @account_id
query : query
options : options
options : mesg.options
changes : if mesg.changes then mesg.id
cb : (err, result) =>
if err
@error_to_client(id:mesg.id, error:err)
else
mesg.query = result
if mesg.changes and not first
delete mesg.query
for k, v of result
mesg[k] = v
else
first = false
mesg.query = result
@push_to_client(mesg)


Expand Down
8 changes: 5 additions & 3 deletions salvus/message.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -1779,9 +1779,11 @@ message
#############

message
event : 'query'
id : undefined
query : required
event : 'query'
id : undefined
query : required
changes : undefined
multi_response : false
options : undefined


Expand Down
23 changes: 21 additions & 2 deletions salvus/rethink.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -1724,7 +1724,8 @@ class RethinkDB
opts = defaults opts,
account_id : required
query : required
options : {}
options : []
changes : undefined
cb : required # cb(err, result)
if misc.is_array(opts.query)
# array of queries
Expand All @@ -1742,6 +1743,9 @@ class RethinkDB
subs =
'{account_id}' : opts.account_id

if opts.changes
changes = opts.cb

# individual query
result = {}
f = (table, cb) =>
Expand All @@ -1764,6 +1768,7 @@ class RethinkDB
query : query
options : opts.options
multi : multi
changes : changes
cb : (err, x) =>
result[table] = x; cb(err)
else
Expand Down Expand Up @@ -2096,6 +2101,7 @@ class RethinkDB
query : required
multi : required
options : required
changes : undefined # if given, should be a function to call with additional changes
cb : required # cb(err, result)
###
# User queries are of the form
Expand All @@ -2105,6 +2111,9 @@ class RethinkDB
Using the whitelist rules specified in SCHEMA, we
determine each of get_all, filter, pluck, and options,
then run the query.
If no error in query, and changes is a given uuid, then sets up a change
feed that calls opts.cb on changes as well.
###

# get data about user queries on this table
Expand Down Expand Up @@ -2162,7 +2171,17 @@ class RethinkDB
else if limit and x.length == limit
x.push('...')
opts.cb(undefined, x)

if opts.changes?
# no errors -- setup changefeed now
winston.debug("FEED -- setting up a feed")
db_query.changes().run (err, feed) =>
if err
winston.debug("FEED -- error setting up #{misc.to_json(err)}")
opts.cb(err)
else
feed.each (err, x) =>
winston.debug("FEED -- saw a change! #{misc.to_json([err,x])}")
opts.changes(err, x)


has_null_leaf = (obj) ->
Expand Down

0 comments on commit 4d3a824

Please sign in to comment.