Skip to content

Commit

Permalink
uasyncio.core: test_cancel_sleep: Make compatible with CPython.
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Sokolovsky <pfalcon@users.sourceforge.net>
  • Loading branch information
pfalcon committed Oct 26, 2020
1 parent ae1e26a commit dc9d5cf
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions uasyncio.core/test_cancel_sleep.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,46 @@
import utime
# Runs both in Pycopy and CPython.
import time
try:
import uasyncio.core as asyncio
is_uasyncio = True
except ImportError:
import asyncio
is_uasyncio = False


requested = None
cancelled = False

async def coro():
global cancelled
try:
await asyncio.sleep(1)
assert False, "sleep wasn't cancelled"
except asyncio.CancelledError:
now = utime.ticks_ms()
diff = utime.ticks_diff(now, requested)
now = time.time()
diff = now - requested
print("cancelled at: %s, delay: %s" % (now, diff))
assert diff < 10
assert diff < 0.01
cancelled = True


async def canceller(task):
global requested
asyncio.cancel(task)
requested = utime.ticks_ms()
if is_uasyncio:
asyncio.cancel(task)
else:
task.cancel()
requested = time.time()
print("requested cancel at:", requested)
await asyncio.sleep(1)
await asyncio.sleep(0)


loop = asyncio.get_event_loop()

task = coro()
loop.create_task(task)
if is_uasyncio:
loop.create_task(task)
else:
task = loop.create_task(task)
loop.run_until_complete(canceller(task))
assert cancelled

0 comments on commit dc9d5cf

Please sign in to comment.