diff --git a/asyncio_slow/asyncio_slow.py b/asyncio_slow/asyncio_slow.py new file mode 100644 index 000000000..55f1347f1 --- /dev/null +++ b/asyncio_slow/asyncio_slow.py @@ -0,0 +1,151 @@ +import time +import logging + + +log = logging.getLogger("asyncio") + + +# Workaround for not being able to subclass builtin types +class LoopStop(Exception): + pass + +class InvalidStateError(Exception): + pass + +# Object not matching any other object +_sentinel = [] + + +class EventLoop: + + def __init__(self): + self.q = [] + + def call_soon(self, c, *args): + self.q.append((c, args)) + + def call_later(self, delay, c, *args): + def _delayed(c, args, delay): + yield from sleep(delay) + self.call_soon(c, *args) + Task(_delayed(c, args, delay)) + + def run_forever(self): + while self.q: + c = self.q.pop(0) + try: + c[0](*c[1]) + except LoopStop: + return + # I mean, forever + while True: + time.sleep(1) + + def stop(self): + def _cb(): + raise LoopStop + self.call_soon(_cb) + + def run_until_complete(self, coro): + t = async(coro) + t.add_done_callback(lambda a: self.stop()) + self.run_forever() + + def close(self): + pass + + +_def_event_loop = EventLoop() + + +class Future: + + def __init__(self, loop=_def_event_loop): + self.loop = loop + self.res = _sentinel + self.cbs = [] + + def result(self): + if self.res is _sentinel: + raise InvalidStateError + return self.res + + def add_done_callback(self, fn): + if self.res is _sentinel: + self.cbs.append(fn) + else: + self.loop.call_soon(fn, self) + + def set_result(self, val): + self.res = val + for f in self.cbs: + f(self) + + +class Task(Future): + + def __init__(self, coro, loop=_def_event_loop): + super().__init__() + self.loop = loop + self.c = coro + # upstream asyncio forces task to be scheduled on instantiation + self.loop.call_soon(self) + + def __call__(self): + try: + next(self.c) + self.loop.call_soon(self) + except StopIteration as e: + log.debug("Coro finished: %s", self.c) + self.set_result(None) + + +def get_event_loop(): + return _def_event_loop + + +# Decorator +def coroutine(f): + return f + + +def async(coro): + if isinstance(coro, Future): + return coro + return Task(coro) + + +class _Wait(Future): + + def __init__(self, n): + Future.__init__(self) + self.n = n + + def _done(self): + self.n -= 1 + log.debug("Wait: remaining tasks: %d", self.n) + if not self.n: + self.set_result(None) + + def __call__(self): + pass + + +def wait(coro_list, loop=_def_event_loop): + + w = _Wait(len(coro_list)) + + for c in coro_list: + t = async(c) + t.add_done_callback(lambda val: w._done()) + + return w + + +def sleep(secs): + t = time.time() + log.debug("Started sleep at: %s, targetting: %s", t, t + secs) + while time.time() < t + secs: + time.sleep(0.01) + yield + log.debug("Finished sleeping %ss", secs) diff --git a/asyncio_slow/test_chain.py b/asyncio_slow/test_chain.py new file mode 100644 index 000000000..8d6b9a615 --- /dev/null +++ b/asyncio_slow/test_chain.py @@ -0,0 +1,18 @@ +#https://docs.python.org/3.4/library/asyncio-task.html#example-chain-coroutines +#import asyncio +import asyncio_slow as asyncio + +@asyncio.coroutine +def compute(x, y): + print("Compute %s + %s ..." % (x, y)) + yield from asyncio.sleep(1.0) + return x + y + +@asyncio.coroutine +def print_sum(x, y): + result = yield from compute(x, y) + print("%s + %s = %s" % (x, y, result)) + +loop = asyncio.get_event_loop() +loop.run_until_complete(print_sum(1, 2)) +loop.close() diff --git a/asyncio_slow/test_future.py b/asyncio_slow/test_future.py new file mode 100644 index 000000000..53026c8d0 --- /dev/null +++ b/asyncio_slow/test_future.py @@ -0,0 +1,15 @@ +#https://docs.python.org/3.4/library/asyncio-task.html#example-chain-coroutines +#import asyncio +import asyncio_slow as asyncio + +@asyncio.coroutine +def slow_operation(future): + yield from asyncio.sleep(1) + future.set_result('Future is done!') + +loop = asyncio.get_event_loop() +future = asyncio.Future() +asyncio.Task(slow_operation(future)) +loop.run_until_complete(future) +print(future.result()) +loop.close() diff --git a/asyncio_slow/test_future2.py b/asyncio_slow/test_future2.py new file mode 100644 index 000000000..8ba03ef85 --- /dev/null +++ b/asyncio_slow/test_future2.py @@ -0,0 +1,21 @@ +#https://docs.python.org/3.4/library/asyncio-task.html#example-future-with-run-forever +#import asyncio +import asyncio_slow as asyncio + +@asyncio.coroutine +def slow_operation(future): + yield from asyncio.sleep(1) + future.set_result('Future is done!') + +def got_result(future): + print(future.result()) + loop.stop() + +loop = asyncio.get_event_loop() +future = asyncio.Future() +asyncio.Task(slow_operation(future)) +future.add_done_callback(got_result) +try: + loop.run_forever() +finally: + loop.close() \ No newline at end of file diff --git a/asyncio_slow/test_hello_world.py b/asyncio_slow/test_hello_world.py new file mode 100644 index 000000000..fab558134 --- /dev/null +++ b/asyncio_slow/test_hello_world.py @@ -0,0 +1,12 @@ +#https://docs.python.org/3.4/library/asyncio-task.html#example-hello-world-coroutine +#import asyncio +import asyncio_slow as asyncio + +@asyncio.coroutine +def greet_every_two_seconds(): + while True: + print('Hello World') + yield from asyncio.sleep(2) + +loop = asyncio.get_event_loop() +loop.run_until_complete(greet_every_two_seconds()) diff --git a/asyncio_slow/test_hello_world_bare.py b/asyncio_slow/test_hello_world_bare.py new file mode 100644 index 000000000..1f8d9702f --- /dev/null +++ b/asyncio_slow/test_hello_world_bare.py @@ -0,0 +1,12 @@ +#import asyncio +import asyncio_slow as asyncio + +@asyncio.coroutine +def greet_every_two_seconds(): + while True: + print('Hello World') + yield from asyncio.sleep(2) + +loop = asyncio.get_event_loop() +asyncio.Task(greet_every_two_seconds()) +loop.run_forever() diff --git a/asyncio_slow/test_hello_world_callback.py b/asyncio_slow/test_hello_world_callback.py new file mode 100644 index 000000000..9836ffd7b --- /dev/null +++ b/asyncio_slow/test_hello_world_callback.py @@ -0,0 +1,11 @@ +# https://docs.python.org/3.4/library/asyncio-eventloop.html#example-hello-world-callback +#import asyncio +import asyncio_slow as asyncio + +def print_and_repeat(loop): + print('Hello World') + loop.call_later(2, print_and_repeat, loop) + +loop = asyncio.get_event_loop() +loop.call_soon(print_and_repeat, loop) +loop.run_forever() diff --git a/asyncio_slow/test_parallel.py b/asyncio_slow/test_parallel.py new file mode 100644 index 000000000..48a187b87 --- /dev/null +++ b/asyncio_slow/test_parallel.py @@ -0,0 +1,21 @@ +#https://docs.python.org/3.4/library/asyncio-task.html#example-parallel-execution-of-tasks +#import asyncio +import asyncio_slow as asyncio + +@asyncio.coroutine +def factorial(name, number): + f = 1 + for i in range(2, number+1): + print("Task %s: Compute factorial(%s)..." % (name, i)) + yield from asyncio.sleep(1) + f *= i + print("Task %s: factorial(%s) = %s" % (name, number, f)) + +tasks = [ + asyncio.Task(factorial("A", 2)), + asyncio.Task(factorial("B", 3)), + asyncio.Task(factorial("C", 4))] + +loop = asyncio.get_event_loop() +loop.run_until_complete(asyncio.wait(tasks)) +loop.close() diff --git a/uasyncio/metadata.txt b/uasyncio/metadata.txt new file mode 100644 index 000000000..d0901ec75 --- /dev/null +++ b/uasyncio/metadata.txt @@ -0,0 +1,6 @@ +srctype = micropython-lib +type = module +version = 0.6.1 +author = Paul Sokolovsky +long_desc = Lightweight asyncio-like library built around native Python coroutines, not around un-Python devices like callback mess. +depends = heapq, errno, select, logging diff --git a/uasyncio/setup.py b/uasyncio/setup.py new file mode 100644 index 000000000..0e56c7549 --- /dev/null +++ b/uasyncio/setup.py @@ -0,0 +1,19 @@ +import sys +# Remove current dir from sys.path, otherwise setuptools will peek up our +# module instead of system. +sys.path.pop(0) +from setuptools import setup + + +setup(name='micropython-uasyncio', + version='0.6.1', + description='uasyncio module for MicroPython', + long_description='Lightweight asyncio-like library built around native Python coroutines, not around un-Python devices like callback mess.', + url='https://github.com/micropython/micropython/issues/405', + author='Paul Sokolovsky', + author_email='micro-python@googlegroups.com', + maintainer='MicroPython Developers', + maintainer_email='micro-python@googlegroups.com', + license='MIT', + py_modules=['uasyncio'], + install_requires=['micropython-heapq', 'micropython-errno', 'micropython-select', 'micropython-logging']) diff --git a/uasyncio/test_call_soon.py b/uasyncio/test_call_soon.py new file mode 100644 index 000000000..99ccfefbc --- /dev/null +++ b/uasyncio/test_call_soon.py @@ -0,0 +1,13 @@ +import uasyncio as asyncio +import time + + +def cb(): + print("callback") + time.sleep(0.5) + loop.call_soon(cb) + + +loop = asyncio.get_event_loop() +loop.call_soon(cb) +loop.run_forever() diff --git a/uasyncio/test_http_client.py b/uasyncio/test_http_client.py new file mode 100644 index 000000000..0b4ff83b7 --- /dev/null +++ b/uasyncio/test_http_client.py @@ -0,0 +1,25 @@ +import uasyncio as asyncio + +@asyncio.coroutine +def print_http_headers(url): + reader, writer = yield from asyncio.open_connection(url, 80) + print(reader, writer) + print("================") + query = "GET / HTTP/1.0\r\n\r\n" + yield from writer.awrite(query.encode('latin-1')) + while True: + line = yield from reader.readline() + if not line: + break + if line: + print(line.rstrip()) + +import logging +logging.basicConfig(level=logging.INFO) +url = "google.com" +loop = asyncio.get_event_loop() +#task = asyncio.async(print_http_headers(url)) +#loop.run_until_complete(task) +loop.call_soon(print_http_headers(url)) +loop.run_forever() +loop.close() diff --git a/uasyncio/test_http_server.py b/uasyncio/test_http_server.py new file mode 100644 index 000000000..51654d1d2 --- /dev/null +++ b/uasyncio/test_http_server.py @@ -0,0 +1,21 @@ +import uasyncio as asyncio + +@asyncio.coroutine +def serve(reader, writer): + print(reader, writer) + print("================") + print((yield from reader.read())) + yield from writer.awrite("HTTP/1.0 200 OK\r\n\r\nHello.\r\n") + print("After response write") + yield from writer.close() + print("Finished processing request") + + +import logging +#logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.DEBUG) +loop = asyncio.get_event_loop() +mem_info() +loop.call_soon(asyncio.start_server(serve, "127.0.0.1", 8081)) +loop.run_forever() +loop.close() diff --git a/uasyncio/uasyncio.py b/uasyncio/uasyncio.py new file mode 100644 index 000000000..f34123e65 --- /dev/null +++ b/uasyncio/uasyncio.py @@ -0,0 +1,288 @@ +import __main__ +import time +import heapq +import errno +import logging + + +log = logging.getLogger("asyncio") + +type_gen = type((lambda: (yield))()) + +class EventLoop: + + def __init__(self): + self.q = [] + self.cnt = 0 + + def time(self): + return time.time() + + def call_soon(self, callback, *args): + self.call_at(0, callback, *args) + + def call_later(self, delay, callback, *args): + self.call_at(self.time() + delay, callback, *args) + + def call_at(self, time, callback, *args): + # Including self.cnt is a workaround per heapq docs + log.debug("Scheduling %s", (time, self.cnt, callback, args)) + heapq.heappush(self.q, (time, self.cnt, callback, args)) +# print(self.q) + self.cnt += 1 + + def wait(self, delay): + # Default wait implementation, to be overriden in subclasses + # with IO scheduling + log.debug("Sleeping for: %s", delay) + time.sleep(delay) + + def run_forever(self): + while True: + if self.q: + t, cnt, cb, args = heapq.heappop(self.q) + log.debug("Next coroutine to run: %s", (t, cnt, cb, args)) +# __main__.mem_info() + tnow = self.time() + delay = t - tnow + if delay > 0: + self.wait(delay) + else: + self.wait(-1) + # Assuming IO completion scheduled some tasks + continue + if callable(cb): + cb(*args) + else: + delay = 0 + try: + if args == (): + args = (None,) + log.debug("Coroutine %s send args: %s", cb, args) + ret = cb.send(*args) + log.debug("Coroutine %s yield result: %s", cb, ret) + if isinstance(ret, SysCall): + arg = ret.args[0] + if isinstance(ret, Sleep): + delay = arg + elif isinstance(ret, IORead): +# self.add_reader(ret.obj.fileno(), lambda self, c, f: self.call_soon(c, f), self, cb, ret.obj) +# self.add_reader(ret.obj.fileno(), lambda c, f: self.call_soon(c, f), cb, ret.obj) + self.add_reader(arg.fileno(), lambda cb, f: self.call_soon(cb, f), cb, arg) + continue + elif isinstance(ret, IOWrite): + self.add_writer(arg.fileno(), lambda cb, f: self.call_soon(cb, f), cb, arg) + continue + elif isinstance(ret, IOReadDone): + self.remove_reader(arg.fileno()) + elif isinstance(ret, IOWriteDone): + self.remove_writer(arg.fileno()) + elif isinstance(ret, StopLoop): + return arg + elif isinstance(ret, type_gen): + self.call_soon(ret) + elif ret is None: + # Just reschedule + pass + else: + assert False, "Unsupported coroutine yield value: %r (of type %r)" % (ret, type(ret)) + except StopIteration as e: + log.debug("Coroutine finished: %s", cb) + continue + self.call_later(delay, cb, *args) + + def run_until_complete(self, coro): + def _run_and_stop(): + yield from coro + yield StopLoop(0) + self.call_soon(_run_and_stop()) + self.run_forever() + + def close(self): + pass + + +import select + +class EpollEventLoop(EventLoop): + + def __init__(self): + EventLoop.__init__(self) + self.poller = select.epoll(1) + + def add_reader(self, fd, cb, *args): + log.debug("add_reader%s", (fd, cb, args)) + self.poller.register(fd, select.EPOLLIN, (cb, args)) + + def remove_reader(self, fd): + log.debug("remove_reader(%s)", fd) + self.poller.unregister(fd) + + def add_writer(self, fd, cb, *args): + log.debug("add_writer%s", (fd, cb, args)) + self.poller.register(fd, select.EPOLLOUT, (cb, args)) + + def remove_writer(self, fd): + log.debug("remove_writer(%s)", fd) + self.poller.unregister(fd) + + def wait(self, delay): + log.debug("epoll.wait(%d)", delay) + if delay == -1: + res = self.poller.poll(-1) + else: + res = self.poller.poll(int(delay * 1000)) + log.debug("epoll result: %s", res) + for cb, ev in res: + log.debug("Calling IO callback: %s%s", cb[0], cb[1]) + cb[0](*cb[1]) + + +class SysCall: + + def __init__(self, *args): + self.args = args + + def handle(self): + raise NotImplementedError + +class Sleep(SysCall): + pass + +class StopLoop(SysCall): + pass + +class IORead(SysCall): + pass + +class IOWrite(SysCall): + pass + +class IOReadDone(SysCall): + pass + +class IOWriteDone(SysCall): + pass + + +def get_event_loop(): + return EpollEventLoop() + +def coroutine(f): + return f + +def async(coro): + # We don't have Task bloat, so op is null + return coro + +def sleep(secs): + yield Sleep(secs) + + +import usocket as _socket + +class StreamReader: + + def __init__(self, s): + self.s = s + + def read(self, n=-1): + s = yield IORead(self.s) + while True: + res = self.s.read(n) + if res is not None: + break + log.warn("Empty read") + if not res: + yield IOReadDone(self.s) + return res + + def readline(self): + log.debug("StreamReader.readline()") + s = yield IORead(self.s) + log.debug("StreamReader.readline(): after IORead: %s", s) + while True: + res = self.s.readline() + if res is not None: + break + log.warn("Empty read") + if not res: + yield IOReadDone(self.s) + log.debug("StreamReader.readline(): res: %s", res) + return res + + def __repr__(self): + return "<StreamReader %r>" % self.s + + +class StreamWriter: + + def __init__(self, s): + self.s = s + + def awrite(self, buf): + # This method is called awrite (async write) to not proliferate + # incompatibility with original asyncio. Unlike original asyncio + # whose .write() method is both not a coroutine and guaranteed + # to return immediately (which means it has to buffer all the + # data), this method is a coroutine. + sz = len(buf) + log.debug("StreamWriter.awrite(): spooling %d bytes", sz) + while True: + res = self.s.write(buf) + # If we spooled everything, return immediately + if res == sz: + log.debug("StreamWriter.awrite(): completed spooling %d bytes", res) + return + if res is None: + res = 0 + log.debug("StreamWriter.awrite(): spooled partial %d bytes", res) + assert res < sz + buf = buf[res:] + sz -= res + s = yield IOWrite(self.s) + log.debug("StreamWriter.awrite(): can write more") + + def close(self): + yield IOWriteDone(self.s) + self.s.close() + + def __repr__(self): + return "<StreamWriter %r>" % self.s + + +def open_connection(host, port): + log.debug("open_connection(%s, %s)", host, port) + s = _socket.socket() + s.setblocking(False) + ai = _socket.getaddrinfo(host, port) + addr = ai[0][4] + try: + s.connect(addr) + except OSError as e: + if e.args[0] != errno.EINPROGRESS: + raise + log.debug("open_connection: After connect") + s = yield IOWrite(s) + log.debug("open_connection: After iowait: %s", s) + return StreamReader(s), StreamWriter(s) + + +def start_server(client_coro, host, port): + log.debug("start_server(%s, %s)", host, port) + s = _socket.socket() + s.setblocking(False) + + ai = _socket.getaddrinfo(host, port) + addr = ai[0][4] + s.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1) + s.bind(addr) + s.listen(10) + while True: + log.debug("start_server: Before accept") + yield IORead(s) + log.debug("start_server: After iowait") + s2, client_addr = s.accept() + s2.setblocking(False) + log.debug("start_server: After accept: %s", s2) + yield client_coro(StreamReader(s2), StreamWriter(s2))