Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncIO support #37

Open
rgalanakis opened this issue Aug 31, 2014 · 8 comments
Open

AsyncIO support #37

rgalanakis opened this issue Aug 31, 2014 · 8 comments

Comments

@rgalanakis
Copy link
Owner

I am not sure what'd be involved, but it would be nice if goless had asyncio support. I'm not sure what that means since I haven't looked into it much, so please educate me (or start a pull request).

@pothos
Copy link

pothos commented Feb 14, 2016

Python 3.5 brings everything to par up with Go, but it looks quite different. Except that synchronous channels (aka queues) have to be simulated by a handshake and closing a queue is also not supported. For "select" I don't know.

#!/usr/bin/env python3.5

import asyncio

async def give(start, end, c, k):
  for i in range(start, end):
    await c.put(i)
    await k.get() # sync send
  await c.put(None) # simulate closing of channel

async def consume(c, k):
  while True:
    x = await c.get()
    await k.put(True) # sync recv
    if x is None: # channel closed
      return
    print(x)

loop = asyncio.get_event_loop()
c, k = asyncio.Queue(), asyncio.Queue()
tasks = [asyncio.ensure_future(give(3, 7, c, k)), asyncio.ensure_future(consume(c, k))]
try:
  loop.run_until_complete(asyncio.wait(tasks))
finally:
  loop.close()

But as it is shown in http://www.snarky.ca/how-the-heck-does-async-await-work-in-python-3-5 the whole asyncio library can be ignored by writing an own event loop with only async and await. But I think that using asyncio with some wrapper would be good enough if the goless.chan implementation is working like Queue to support await and goless.go is adding something to tasks. Yet it would require to start the loop at the end and use "async def" and await c.recv/send

@pothos
Copy link

pothos commented Feb 14, 2016

#!/usr/bin/env python3.5

import asyncio
# @TODO: goless.select via loop through send_ready, recv_ready
class Chan:
  q = None  # data channel
  x = None  # sync channel for size=0
  size = None
  is_closed = False
  close = "{}{}".format(hash("Chan.closed"), "Chan.closed")  # magic string as last element
  def __init__(self, size=0):
    """size 0 or None indicates a blocking channel (handshake)
    size -1 indicates an unlimited buffer size
    otherwise send will block when buffer size is reached"""
    if size == 0:
      self.q = asyncio.Queue(1)
      self.x = asyncio.Queue(1)
    elif size == -1:
      self.q = asyncio.Queue(0)
    else:
      self.q = asyncio.Queue(size)
    self.size = size

  @asyncio.coroutine
  def close(self):
    self.is_closed = True
    yield from self.q.put(self.close)

  @asyncio.coroutine
  def send(self, item):
    if self.is_closed:
      raise Exception
    yield from self.q.put(item)
    if self.size == 0:
      yield from self.x.get()

  def send_ready(self):
    return not self.q.full()

  def recv_ready(self):
    return not self.q.empty()

  @asyncio.coroutine
  def recv(self):
    if self.is_closed and self.q.empty():
      self.put_nowait(self.close)
      raise Exception
    g = yield from self.q.get()
    if self.is_closed and self.q.empty() and g == self.close:
      self.q.put_nowait(self.close)  # push back
      raise Exception
    if self.size == 0:
      yield from self.x.put(True)
    return g

  async def __aiter__(self):
    return self

  async def __anext__(self):
    try:
      return await self.recv()
    except:
      raise StopAsyncIteration

go_tasks = []
def go(f, *args, **kwargs):
  go_tasks.append(asyncio.ensure_future(f(*args, **kwargs)))
def run():
  loop = asyncio.get_event_loop()
  try:
    loop.run_until_complete(asyncio.wait(go_tasks))
  finally:
    loop.stop()


# EXAMPLE:

async def give(start, end, c):
  for i in range(start, end):
    await c.send(i)
  await c.close()

async def consume_iter(t, c):
  async for i in c:
    print(t, i)
  return

async def consume(t, c):
  while True:
    try:
      x = await c.recv()
    except:
      break
    print(t, x)

c = Chan()
go(give, 1, 21, c)
go(consume, "A", c)
go(consume_iter, "B", c)
go(consume, "C", c)

run()

@rgalanakis
Copy link
Owner Author

Hmmm, this is very interesting!

So the next step would be to convert that into a pluggable backend. The main difficulty is creating a channel with send and receive methods that behave as if they are blocking, but it looks like that's what you have above? I don't think you'll need the internal event loop bookkeeping above, but it could be added (possibly for all backends though would need to refresh my memory). After seeing your examples I'm pretty sure an asyncio backend wouldn't be too difficult. Do check out https://github.com/rgalanakis/goless/blob/master/goless/backends.py for the two current backend examples.

Will you be at PyCon in Portland, and want to help on this? I won't be attending the conference since I'm not writing any Python nowadays, but since I'm in Portland I was planning on going to the sprints and hacking on something.

@pothos
Copy link

pothos commented Feb 16, 2016

Hey,
I really don't know if it would be possible to use normal functions without async def or receiving/sending without await. The rest should be okay for writing a backend. I'm not in the US and so won't attend, but thank you for your invitation! ;)
Anyway I've added support for select and published a draft here: https://github.com/pothos/awaitchannel

@pothos
Copy link

pothos commented Feb 18, 2016

It was not as nice as I expected, but the run() at the end can be omitted by using a separate thread. Code is updated in the repo.

@rgalanakis
Copy link
Owner Author

That looks really promising! I will report back if I get a chance to hack on it, otherwise if someone else takes this on it will be very useful for them.

@runflowcode
Copy link

Sorry for jumping in on the thread. Just wondering if this is still on the backlog ?

@rgalanakis
Copy link
Owner Author

Not unless someone is running this in production and/or wants to develop it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants