Skip to content

Commit

Permalink
webassembly/asyncio: Support top-level await of asyncio Task and Event.
Browse files Browse the repository at this point in the history
This change allows doing a top-level await on an asyncio primitive like
Task and Event.

This feature enables a better interaction and synchronisation between
JavaScript and Python, because `api.runPythonAsync` can now be used (called
from JavaScript) to await on the completion of asyncio primitives.

Signed-off-by: Damien George <damien@micropython.org>
  • Loading branch information
dpgeorge committed Jun 18, 2024
1 parent a053e63 commit e9c898c
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 22 deletions.
24 changes: 18 additions & 6 deletions ports/webassembly/asyncio/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ def __next__(self):
# Pause task execution for the given time (integer in milliseconds, uPy extension)
# Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()):
if cur_task is None:
# Support top-level asyncio.sleep, via a JavaScript Promise.
return jsffi.async_timeout_ms(t)
assert sgen.state is None
sgen.state = ticks_add(ticks(), max(0, t))
return sgen
Expand All @@ -69,6 +66,18 @@ def sleep(t):
asyncio_timer = None


class TopLevelCoro:
@staticmethod
def set(resolve, reject):
TopLevelCoro.resolve = resolve
TopLevelCoro.reject = reject
_schedule_run_iter(0)

@staticmethod
def send(value):
TopLevelCoro.resolve()


class ThenableEvent:
def __init__(self, thenable):
self.result = None # Result of the thenable
Expand Down Expand Up @@ -122,12 +131,12 @@ def _run_iter():
dt = max(0, ticks_diff(t.ph_key, ticks()))
else:
# No tasks can be woken so finished running
cur_task = None
cur_task = _top_level_task
return

if dt > 0:
# schedule to call again later
cur_task = None
cur_task = _top_level_task
_schedule_run_iter(dt)
return

Expand Down Expand Up @@ -198,11 +207,14 @@ def create_task(coro):
return t


# Task used to suspend and resume top-level await.
_top_level_task = Task(TopLevelCoro, globals())

################################################################################
# Event loop wrapper


cur_task = None
cur_task = _top_level_task


class Loop:
Expand Down
15 changes: 0 additions & 15 deletions ports/webassembly/modjsffi.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,6 @@ static mp_obj_t mp_jsffi_to_js(mp_obj_t arg) {
}
static MP_DEFINE_CONST_FUN_OBJ_1(mp_jsffi_to_js_obj, mp_jsffi_to_js);

// *FORMAT-OFF*
EM_JS(void, promise_with_timeout_ms, (double ms, uint32_t * out), {
const ret = new Promise((resolve) => setTimeout(resolve, ms));
proxy_convert_js_to_mp_obj_jsside(ret, out);
});
// *FORMAT-ON*

static mp_obj_t mp_jsffi_async_timeout_ms(mp_obj_t arg) {
uint32_t out[PVN];
promise_with_timeout_ms(mp_obj_get_float_to_d(arg), out);
return proxy_convert_js_to_mp_obj_cside(out);
}
static MP_DEFINE_CONST_FUN_OBJ_1(mp_jsffi_async_timeout_ms_obj, mp_jsffi_async_timeout_ms);

// *FORMAT-OFF*
EM_JS(void, js_get_proxy_js_ref_info, (uint32_t * out), {
let used = 0;
Expand Down Expand Up @@ -121,7 +107,6 @@ static const mp_rom_map_elem_t mp_module_jsffi_globals_table[] = {
{ MP_ROM_QSTR(MP_QSTR_JsException), MP_ROM_PTR(&mp_type_JsException) },
{ MP_ROM_QSTR(MP_QSTR_create_proxy), MP_ROM_PTR(&mp_jsffi_create_proxy_obj) },
{ MP_ROM_QSTR(MP_QSTR_to_js), MP_ROM_PTR(&mp_jsffi_to_js_obj) },
{ MP_ROM_QSTR(MP_QSTR_async_timeout_ms), MP_ROM_PTR(&mp_jsffi_async_timeout_ms_obj) },
{ MP_ROM_QSTR(MP_QSTR_mem_info), MP_ROM_PTR(&mp_jsffi_mem_info_obj) },
};
static MP_DEFINE_CONST_DICT(mp_module_jsffi_globals, mp_module_jsffi_globals_table);
Expand Down
33 changes: 32 additions & 1 deletion ports/webassembly/proxy_c.c
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,12 @@ EM_JS(void, js_then_continue, (int jsref, uint32_t * py_resume, uint32_t * resol
});
// *FORMAT-ON*

EM_JS(void, create_promise, (uint32_t * out_set, uint32_t * out_promise), {
const out_set_js = proxy_convert_mp_to_js_obj_jsside(out_set);
const promise = new Promise(out_set_js);
proxy_convert_js_to_mp_obj_jsside(promise, out_promise);
});

static mp_obj_t proxy_resume_execute(mp_obj_t self_in, mp_obj_t send_value, mp_obj_t throw_value, mp_obj_t resolve, mp_obj_t reject) {
if (throw_value != MP_OBJ_NULL && throw_value != mp_const_none) {
if (send_value == mp_const_none) {
Expand All @@ -483,6 +489,9 @@ static mp_obj_t proxy_resume_execute(mp_obj_t self_in, mp_obj_t send_value, mp_o
}
} else {
throw_value = MP_OBJ_NULL;
if (send_value == mp_const_undefined) {
send_value = mp_const_none;
}
}

mp_obj_t ret_value;
Expand All @@ -496,7 +505,29 @@ static mp_obj_t proxy_resume_execute(mp_obj_t self_in, mp_obj_t send_value, mp_o
js_then_resolve(out_ret_value, out_resolve);
return mp_const_none;
} else if (ret_kind == MP_VM_RETURN_YIELD) {
// ret_value should be a JS thenable
// If ret_value is None then there has been a top-level await of an asyncio primitive.
// Otherwise, ret_value should be a JS thenable.

if (ret_value == mp_const_none) {
// Waiting on an asyncio primitive to complete, eg a Task or Event.
//
// Completion of this primitive will occur when the asyncio.core._top_level_task
// Task is made runable and its coroutine's send() method is called. Need to
// construct a Promise that resolves when that send() method is called, because
// that will resume the top-level await from the JavaScript side.
//
// This is accomplished via the asyncio.core.TopLevelCoro class and its methods.
mp_obj_t asyncio = mp_import_name(MP_QSTR_asyncio_dot_core, mp_const_none, MP_OBJ_NEW_SMALL_INT(0));
mp_obj_t asyncio_core = mp_load_attr(asyncio, MP_QSTR_core);
mp_obj_t top_level_coro = mp_load_attr(asyncio_core, MP_QSTR_TopLevelCoro);
mp_obj_t top_level_coro_set = mp_load_attr(top_level_coro, MP_QSTR_set);
uint32_t out_set[PVN];
proxy_convert_mp_to_js_obj_cside(top_level_coro_set, out_set);
uint32_t out_promise[PVN];
create_promise(out_set, out_promise);
ret_value = proxy_convert_js_to_mp_obj_cside(out_promise);
}

mp_obj_t py_resume = mp_obj_new_bound_meth(MP_OBJ_FROM_PTR(&resume_obj), self_in);
int ref = mp_obj_jsproxy_get_ref(ret_value);
uint32_t out_py_resume[PVN];
Expand Down
1 change: 1 addition & 0 deletions ports/webassembly/qstrdefsport.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// qstrs specific to this port
// *FORMAT-OFF*
Q(/lib)
Q(asyncio.core)
25 changes: 25 additions & 0 deletions tests/ports/webassembly/asyncio_top_level_await.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Test top-level await on asyncio primitives: Task, Event.

const mp = await (await import(process.argv[2])).loadMicroPython();

await mp.runPythonAsync(`
import asyncio
async def task(event):
print("task set event")
event.set()
print("task sleep")
await asyncio.sleep(0.1)
print("task end")
event = asyncio.Event()
t = asyncio.create_task(task(event))
print("top-level wait event")
await event.wait()
print("top-level wait task")
await t
print("top-level end")
`);

console.log("finished");
7 changes: 7 additions & 0 deletions tests/ports/webassembly/asyncio_top_level_await.mjs.exp
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
top-level wait event
task set event
task sleep
top-level wait task
task end
top-level end
finished

0 comments on commit e9c898c

Please sign in to comment.