Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ruby EventMachine Handler callback API #4630

Open
nviennot opened this issue Aug 5, 2015 · 16 comments
Open

Ruby EventMachine Handler callback API #4630

nviennot opened this issue Aug 5, 2015 · 16 comments

Comments

@nviennot
Copy link
Contributor

nviennot commented Aug 5, 2015

Hi :)

I was trying to abort a ongoing query from an em_run() call, but the only way I found was to save the caller variable (which is a QueryHandle) from any of these callbacks:

def on_open(caller)
end
def on_close(caller)
end
def on_wait_complete(caller)
end
def on_error(err, caller)
raise err
end
def on_val(val, caller)
end
def on_array(arr, caller)
if method(:on_atom).owner != Handler
handle(:on_atom, [arr], caller)
else
arr.each {|x|
break if stopped?
handle(:on_stream_val, [x], caller)
}
end
end
def on_atom(val, caller)
handle(:on_val, [val], caller)
end
def on_stream_val(val, caller)
handle(:on_val, [val], caller)
end
def on_unhandled_change(val, caller)
and then call close() on the query_handle.

This might not be ideal, because these callbacks might not have had the time to be called, but EM.next_tick() did. Maybe an EM callback would want to close the query_handle already.

Here's what I suggest we do:

  1. Remove all the caller arguments from the Handler callbacks.
  2. Add a new callback on_dispatch(query_handle) that would be called right around here:
    register_query(token, all_opts, callback)

This way we don't need a response from the server to terminate an ongoing query.


I've made NoBrainer compatible with EventMachine through the use of Fibers (so the code is still written in an asynchronous manner). If you are curious about how I use the Handler callbacks, check it out here: https://github.com/nviennot/nobrainer/blob/master/lib/no_brainer/query_runner/em_driver.rb

Further, I've written a self-contained Goliath server to demonstrate the usefulness of the Fiber abstraction: https://gist.github.com/nviennot/67a0795db7ccad973885
Maybe we could add this as a real-time tutorial to complement http://www.rethinkdb.com/docs/rails/

@nviennot
Copy link
Contributor Author

nviennot commented Aug 5, 2015

Nevermind, em_run() returns the query_handle. I feel dumb.
Well, I maintain my suggestion -- but no strong opinion :)

@mlucy mlucy added this to the 2.2-polish milestone Aug 5, 2015
@mlucy
Copy link
Member

mlucy commented Aug 5, 2015

Thanks for the suggestion. Tagging as a ReQL proposal since it's an API change and scheduling it for 2.2-polish so we can discuss it.

@nviennot
Copy link
Contributor Author

nviennot commented Aug 5, 2015

Also, it would be useful if the ruby driver could support async run() without requiring eventmachine. Or just support Fibers (e.g. switch the regular condition variable used in @waiters with https://github.com/igrigorik/em-synchrony/blob/master/lib/em-synchrony/thread.rb#L70-L125). Further, some people are fan of Celluloid, we might want to be able to integrate with such framework. So maybe the "wait" functionality should be abstracted away.

@mlucy
Copy link
Member

mlucy commented Aug 5, 2015

@nviennot -- could you elaborate a little more on the fiber support? I confess I've never really used them.

@nviennot
Copy link
Contributor Author

nviennot commented Aug 5, 2015

We shouldn't try to support all the different I/O flavors the user would want.
I think we should support an asynchronous flavored run, which the blocking behavior is implemented by the callbacks. The original blocking run() would be implemented using it.

Practically, I would rename em_run to async_run, and remove any EventMachine references in the code. It is not useful for the RethinkDB driver. The driver would operate as such:

def async_run(query, options, handler=SyncHandler.new)
  ...schedule query to run...
  return handler
end

def run(query, options, handler=SyncHandler.new)
  async_run(query, options, handler).wait
end

Support we have a Handler interface defined as such:

# Handler interface
class Handler
  # For simplicity, I'm assuming we can only get back a single value from the
  # database, excluding streams, so we don't have to deal with cursors in the
  # following code.

  def on_value(val)
    # called from the RethinkDB thread
  end

  def on_error(err)
    # called from the RethinkDB thread
  end

  def wait
    # not a callback, but a way to block until we have some values.
    # This is called by the synchronous version of run().
    # It should return the received value, or raise received errors.
    # Just like futures/promises.
  end
end

To provide the original behavior of run(), the driver could internally use the following synchronous handler.

class SyncHandler < Handler
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  def on value(val)
    @mutex.synchronize do
      @value = val
      @cond.signal
    end
  end

  def on error(err)
    @mutex.synchronize do
      @error = err
      @cond.signal
    end
  end

  def wait
    @mutex.synchronize do
      @cond.wait(@mutex)
    end

    raise @error if @error
    return @value
  end
end

The user could implement such Handler when using EventMachine (This could be provided by the RethinkDB driver as well):

class EventMachineHandler < Handler
  def initialize(success_cb, error_cb)
    @success_cb = success_cb
    @error_cb = error_cb
  end

  def on value(val)
    EM.schedule { @success_cb.call(val) }
  end

  def on error(err)
    EM.schedule { @error_cb.call(err) }
  end

  def wait
    raise NotImplementedError
  end
end

Now with EventMachine and em-synchony (uses Fibers):

class EventMachineSyncHandler < Handler
  def initialize
    @ready = EventMachine::DefaultDeferrable.new
  end

  def on value(val)
    @ready.succeed(val)
  end

  def on error(err)
    @ready.succeed(err)
  end

  def wait
    value = EM::Synchrony.sync(@ready)
    raise value if value.is_a?(Exception)
    value
  end
end

The driver should not worry about what the user wants in terms of blocking
semantics. It should be abstracted away in the Handler interface. Note that
this also provides asynchronous capabilities without the use of EventMachine.
The following example would run two queries in parallel with the SyncHandler:

handler1 = query.async_run()
handler2 = query.async_run()

value1 = handler1.wait
value2 = handler2.wait

@danielmewes
Copy link
Member

I think this is really nice. We could keep the current interface intact as a thin layer on top of this API, and allow for easy extensibility for users who want to use a different async method.

@mlucy have you had a chance to check if this would work as a replacement for how our em_run() is currently implemented?

@danielmewes
Copy link
Member

Unfortunately we didn't end up properly discussing this in the 2.2 ReQL discussion period.
Moving into subsequent. We'll give it another shot for 2.3.

@tinco
Copy link
Contributor

tinco commented Apr 22, 2016

Can we bump this issue? The dependency on EM is really awkward especially as EM does not fit well in the Ruby ecosystem and has fallen out of favor in the community for that reason. Big frameworks like RoR actively avoid EM.

Would you accept a PR for this? Does the team approve of the name #async_run? A nice move would be to mark #em_run deprecated and keep it functional until 3.0.

@danielmewes danielmewes modified the milestones: 2.4, subsequent Apr 22, 2016
@danielmewes
Copy link
Member

@tinco We'll look into it for RethinkDB 2.4 (the next major release).

@mlucy
Copy link
Member

mlucy commented Apr 24, 2016

I like @nviennot's proposal for a generic abstraction over different types of asynchronous behavior. Two things I'd change, though:

  • I wouldn't call the class Handler. For backwards-compatibility reasons we probably want a Handler to continue being the thing it is now. Maybe AsyncHandler is a better name?
  • I don't think wait should be part of the signature since it will never be called internally by the code receiving the AsyncHandler, and only makes sense for the one synchronous handler. The signature should probably literally only require that an AsyncHandler define what to do on a value and what to do on an error.

I think we should continue to support the existing em_run interface with no changes, but I wouldn't be opposed to re-implementing it on top of the more generic architecture. We should also consider offering something like a fiber_run that provides all of the nice syntactic sugar of em_run but for fibers.

@tinco -- since this is an API change it needs to go through the ReQL proposal process for 2.4 in case anyone has objections. The 2.4 proposal process should be happening pretty soon though, and we'd be happy to accept a pull request. We have a lot of tests using em_run, so if you re-implemented em_run on top of the more generic architecture that would be a good way to test whether it works in edge cases.

@mlucy
Copy link
Member

mlucy commented Jun 13, 2016

I read over everything above. Here's a slight variant of the proposal that I think does what we want:

  • We leave Handler and em_run the way they are.
  • We add a new type AsyncHandler.
    • An AsyncHandler supports a single method handle which accepts a block.
      • handle will run that block inside of whatever asynchronous thingy it's doing, getting back three things: a method name, arguments to that method, and the current QueryHandle. (An example would be [:on_change, [OLD_VAL, NEW_VAL], CURRENT_QUERY_HANDLE].) It will then pass those three things on to a Handler (or maybe do something entirely different if the person writing it wants).
      • handle accepts a block because we might not want to do all the parsing work on the main thread (and in fact the current implementation delays some of the parsing work and does it inside an EventMachine tick).
  • There will be a function async_run which is like em_run except it accepts an AsyncHandler.
  • We will probably re-implement em_run to call async_run with an AsyncHandler that knows how to talk to EventMachine. (Basically the implementation will be that it gets the block, and schedules a tick in EventMachine that evaluates the block and then calls handle on the Handler.)
  • Optionally, we will add a fiber_run which is like em_run -- an AsyncHandler we wrote ourselves that plays nicely with fibers.

@tinco
Copy link
Contributor

tinco commented Jun 13, 2016

Sounds great, would be excellent if we could get it in together with the Rails 5.0 release. A well timed blog post on Rails 5.0 support would maybe get RethinkDB some exposure :)

@danielmewes
Copy link
Member

@mlucy This interface sounds a bit technical to me (e.g. it's not obvious what the purpose of the block passed to handle is without additional explanation, and it took me a while to understand), but it sounds like it will do everything we want and it makes adding additional async frameworks very easy. I think it's good.

@danielmewes
Copy link
Member

@tinco Do you know if there's an ETA for Rails 5.0?

@tinco
Copy link
Contributor

tinco commented Jun 16, 2016

@danielmewes they usually do 2rc's with about a month between them, and the full release about a week after the 2nd rc, so if history is any indicator it could be within a week or two.

The interface will only be used by library implementors so I don't think it being a little bit technical should be an issue.

I would offer to help with the implementation but unfortunately I'm moving next wednesday so I have a super busy weekend ahead of me :)

@danielmewes
Copy link
Member

Marking settled on the proposal from this comment #4630 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants