Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP/RFC] allow polling on a provided fd #72

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 38 additions & 9 deletions neovim/api/nvim.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,20 @@ def new_highlight_source(self):
"""Return new src_id for use with Buffer.add_highlight."""
return self.current.buffer.add_highlight("", 0, src_id=0)

def _error_wrapper(self, fn, call_point, *args, **kwargs):
if fn is None:
return None
def handler():
try:
fn(*args, **kwargs)
except Exception as err:
msg = ("error caught while executing async callback:\n"
"{0!r}\n{1}\n \nthe call was requested at\n{2}"
.format(err, format_exc_skip(1, 5), call_point))
self._err_cb(msg)
raise
return handler

def async_call(self, fn, *args, **kwargs):
"""Schedule `fn` to be called by the event loop soon.

Expand All @@ -333,18 +347,33 @@ def async_call(self, fn, *args, **kwargs):
that shouldn't block neovim.
"""
call_point = ''.join(format_stack(None, 5)[:-1])
handler = self._error_wrapper(fn, call_point, *args, **kwargs)

def handler():
try:
fn(*args, **kwargs)
except Exception as err:
msg = ("error caught while executing async callback:\n"
"{0!r}\n{1}\n \nthe call was requested at\n{2}"
.format(err, format_exc_skip(1, 5), call_point))
self._err_cb(msg)
raise
self._session.threadsafe_call(handler)

def poll_fd(self, fd, on_readable=None, on_writable=None, greenlet=True):
"""
Invoke callbacks when the fd is ready for reading and/or writing. if
`on_readable` is not None, it should be callback, which will be invoked
(with no arguments) when the fd is ready for writing. Similarily if
`on_writable` is not None it will be invoked when the fd is ready for
writing.

Only one callback (of each kind) can be registered on the same fd at a
time. If both readability and writability should be monitored, both
callbacks must be registered by the same `poll_fd` call.

By default, the function is invoked in a greenlet, just like a callback
scheduled by async_call.

Returns a function that deactivates the callback(s).
"""
call_point = ''.join(format_stack(None, 5)[:-1])
on_readable = self._error_wrapper(on_readable, call_point)
on_writable = self._error_wrapper(on_writable, call_point)
return self._session.poll_fd(fd, on_readable, on_writable, greenlet)



class Buffers(object):

Expand Down
4 changes: 4 additions & 0 deletions neovim/msgpack_rpc/async_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def threadsafe_call(self, fn):
"""Wrapper around `MsgpackStream.threadsafe_call`."""
self._msgpack_stream.threadsafe_call(fn)

def poll_fd(self, fd, on_readable, on_writable):
"""Wrapper around `BaseEventLoop.poll_fd`."""
return self._msgpack_stream.poll_fd(fd, on_readable, on_writable)

def request(self, method, args, response_cb):
"""Send a msgpack-rpc request to Nvim.

Expand Down
12 changes: 12 additions & 0 deletions neovim/msgpack_rpc/event_loop/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ def _stop(self):
def _threadsafe_call(self, fn):
self._loop.call_soon_threadsafe(fn)

def _poll_fd(self, fd, on_readable, on_writable):
if on_readable is not None:
self._loop.add_reader(fd, on_readable)
if on_writable is not None:
self._loop.add_writer(fd, on_writable)
def cancel():
if on_readable is not None:
self._loop.remove_reader(fd)
if on_writable is not None:
self._loop.remove_writer(fd)
return cancel

def _setup_signals(self, signals):
if os.name == 'nt':
# add_signal_handler is not supported in win32
Expand Down
18 changes: 18 additions & 0 deletions neovim/msgpack_rpc/event_loop/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ def threadsafe_call(self, fn):
"""
self._threadsafe_call(fn)

def poll_fd(self, fd, on_readable=None, on_writable=None):
"""
Invoke callbacks when the fd is ready for reading and/or writing. if
`on_readable` is not None, it should be callback, which will be invoked
(with no arguments) when the fd is ready for writing. Similarily if
`on_writable` is not None it will be invoked when the fd is ready for
writing.

Only one callback (of each kind) can be registered on the same fd at a
time. If both readability and writability should be monitored, both
callbacks must be registered by the same `poll_fd` call.

Returns a function that deactivates the callback(s).
"""
if on_readable is None and on_writable is None:
raise ValueError("poll_fd: At least one of `on_readable` and `on_writable` must be present")
return self._poll_fd(fd, on_readable, on_writable)

def run(self, data_cb):
"""Run the event loop."""
if self._error:
Expand Down
16 changes: 16 additions & 0 deletions neovim/msgpack_rpc/event_loop/uv.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,22 @@ def _on_async(self, handle):
while self._callbacks:
self._callbacks.popleft()()

def _poll_fd(self, fd, on_readable, on_writable):
poll = pyuv.Poll(self._loop, fd)
events = 0
if on_readable is not None:
events |= pyuv.UV_READABLE
if on_writable is not None:
events |= pyuv.UV_WRITABLE
def callback(poll_handle, evts, errorno):
if evts & pyuv.UV_READABLE:
on_readable()
if evts & pyuv.UV_WRITABLE:
on_writable()

poll.start(events, callback)
return poll.stop

def _setup_signals(self, signals):
self._signal_handles = []

Expand Down
4 changes: 4 additions & 0 deletions neovim/msgpack_rpc/msgpack_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ def threadsafe_call(self, fn):
"""Wrapper around `BaseEventLoop.threadsafe_call`."""
self._event_loop.threadsafe_call(fn)

def poll_fd(self, fd, on_readable, on_writable):
"""Wrapper around `BaseEventLoop.poll_fd`."""
return self._event_loop.poll_fd(fd, on_readable, on_writable)

def send(self, msg):
"""Queue `msg` for sending to Nvim."""
debug('sent %s', msg)
Expand Down
21 changes: 19 additions & 2 deletions neovim/msgpack_rpc/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ def __init__(self, async_session):
self._is_running = False
self._setup_exception = None

def threadsafe_call(self, fn, *args, **kwargs):
"""Wrapper around `AsyncSession.threadsafe_call`."""
def _wrap_greenlet(self, fn, *args, **kwargs):
if fn is None:
return None

def handler():
try:
fn(*args, **kwargs)
Expand All @@ -41,8 +43,23 @@ def greenlet_wrapper():
gr = greenlet.greenlet(handler)
gr.switch()

return greenlet_wrapper

def threadsafe_call(self, fn, *args, **kwargs):
"""Wrapper around `AsyncSession.threadsafe_call`."""

greenlet_wrapper = self._wrap_greenlet(fn, *args, **kwargs)
self._async_session.threadsafe_call(greenlet_wrapper)

def poll_fd(self, fd, on_readable, on_writable, greenlet=True):
"""Wrapper around `AsyncSession.threadsafe_call`."""
if greenlet:
on_readable = self._wrap_greenlet(on_readable)
on_writable = self._wrap_greenlet(on_writable)

return self._async_session.poll_fd(fd, on_readable, on_writable)


def next_message(self):
"""Block until a message(request or notification) is available.

Expand Down