From ea252c107cc48e5bd23e374d962a8facda1f6886 Mon Sep 17 00:00:00 2001 From: Syrus Akbary Date: Wed, 12 Jul 2017 23:08:22 -0700 Subject: [PATCH 1/7] Removed unnecessary context --- promise/async_.py | 28 +++++------- promise/context.py | 58 ----------------------- promise/dataloader.py | 3 +- promise/promise.py | 76 ++++++++++++------------------ promise/promise_list.py | 7 +-- tests/test_awaitable_35.py | 10 ++-- tests/test_benchmark.py | 4 +- tests/test_context.py | 94 -------------------------------------- tests/test_dataloader.py | 1 + 9 files changed, 52 insertions(+), 229 deletions(-) delete mode 100644 promise/context.py delete mode 100644 tests/test_context.py diff --git a/promise/async_.py b/promise/async_.py index b5fae48..dcd80f6 100644 --- a/promise/async_.py +++ b/promise/async_.py @@ -9,7 +9,7 @@ def __init__(self, schedule): self.late_queue = collections.deque() self.normal_queue = collections.deque() self.have_drained_queues = False - self.trampoline_enabled = True + self.trampoline_enabled = False self.schedule = schedule def enable_trampoline(self): @@ -21,27 +21,27 @@ def disable_trampoline(self): def have_items_queued(self): return self.is_tick_used or self.have_drained_queues - def _async_invoke_later(self, fn, context): + def _async_invoke_later(self, fn): self.late_queue.append(fn) - self.queue_tick(context) + self.queue_tick() - def _async_invoke(self, fn, context): + def _async_invoke(self, fn): self.normal_queue.append(fn) - self.queue_tick(context) + self.queue_tick() def _async_settle_promise(self, promise): self.normal_queue.append(promise) - self.queue_tick(context=promise._trace) + self.queue_tick() - def invoke_later(self, fn, context): + def invoke_later(self, fn): if self.trampoline_enabled: - self._async_invoke_later(fn, context) + self._async_invoke_later(fn) else: self.schedule.call_later(0.1, fn) - def invoke(self, fn, context): + def invoke(self, fn): if self.trampoline_enabled: - self._async_invoke(fn, context) + self._async_invoke(fn) else: self.schedule.call( fn @@ -79,16 +79,10 @@ def drain_queues(self): self.have_drained_queues = True self.drain_queue(self.late_queue) - def queue_context_tick(self): + def queue_tick(self): if not self.is_tick_used: self.is_tick_used = True self.schedule.call(self.drain_queues) - def queue_tick(self, context): - if not context: - self.queue_context_tick() - else: - (context._parent or context).on_exit(self.queue_context_tick) - def reset(self): self.is_tick_used = False diff --git a/promise/context.py b/promise/context.py deleted file mode 100644 index b0e57fd..0000000 --- a/promise/context.py +++ /dev/null @@ -1,58 +0,0 @@ -from typing import List, Callable # flake8: noqa - -context_stack = [] - - -class Context(object): - - __slots__ = ('_parent', '_exited', '_exit_fns') - - def __init__(self): - self._parent = self.peek_context() - if self._parent: - self._parent.on_exit(self._exit) - self._exited = False - self._exit_fns = [] # type: List[Callable] - - def push_context(self): - # if self._trace: - # self._trace._promise_created = None - context_stack.append(self) - - def __enter__(self): - self.push_context() - return self - - def __exit__(self, type, value, traceback): - self._exit() - - def _exit(self): - if not self._exited: - self._exited = True - self.pop_context() - self.drain_queue() - - def drain_queue(self): - exit_fns = self._exit_fns - self._exit_fns = [] - for fn in exit_fns: - fn() - - def on_exit(self, fn): - if self._exited: - fn() - else: - self._exit_fns.append(fn) - - def pop_context(self): - context_stack.pop() - # if self._trace: - # trace = context_stack.pop() - # ret = trace._promise_created - # trace._promise_created = None - # return ret - - @classmethod - def peek_context(cls): - if context_stack: - return context_stack[-1] diff --git a/promise/dataloader.py b/promise/dataloader.py index 777e16a..e219b0b 100644 --- a/promise/dataloader.py +++ b/promise/dataloader.py @@ -4,7 +4,6 @@ from typing import List, Sized # flake8: noqa from .promise import Promise, async_instance -from .context import Context def get_chunks(iterable_obj, chunk_size=1): @@ -196,7 +195,7 @@ def enqueue_post_promise_job(fn): if not resolved_promise: resolved_promise = Promise.resolve(None) # queue.invoke(fn) - async_instance.invoke(fn, context=Context.peek_context()) + resolved_promise.then(lambda v: async_instance.invoke(fn)) # Promise.resolve(None).then(lambda v: async.invoke(fn, context=Context.peek_context())) # resolved_promise.then(lambda v: queue.invoke(fn, context=Context.peek_context())) diff --git a/promise/promise.py b/promise/promise.py index 946a31c..71f7b45 100644 --- a/promise/promise.py +++ b/promise/promise.py @@ -12,7 +12,6 @@ from .compat import (Future, ensure_future, iscoroutine, # type: ignore iterate_promise) from .utils import deprecated, integer_types, string_types, text_type, binary_type, warn -from .context import Context from .promise_list import PromiseList from .scheduler import SyncScheduler @@ -66,9 +65,6 @@ def try_catch(handler, *args, **kwargs): return (None, (e, tb)) -peek_context = Context.peek_context - - class Promise(object): """ This is the Promise class that complies @@ -113,7 +109,6 @@ def __init__(self, executor=None): # self._promise0 = None # type: Promise # self._future = None # type: Future # self._event_instance = None # type: Event - self._trace = peek_context() # self._is_waiting = False if executor is not None: @@ -132,11 +127,6 @@ def future(self): return self._future def __iter__(self): - if self._trace: - # If we wait, we drain the queue of the - # callbacks waiting on the context exit - # so we avoid a blocking state - self._trace.drain_queue() return iterate_promise(self._target()) __await__ = __iter__ @@ -164,7 +154,7 @@ def _resolve_callback(self, value): if not self.is_thenable(value): return self._fulfill(value) - promise = self._try_convert_to_promise(value, self)._target() + promise = self._try_convert_to_promise(value)._target() if promise == self: self._reject(make_self_resolution_error()) return @@ -287,10 +277,7 @@ def _settle_promise0(self, handler, value, traceback): self._settle_promise(promise, handler, value, traceback) def _settle_promise_from_handler(self, handler, value, promise): - # promise._push_context() - # with Context(): value, error_with_tb = try_catch(handler, value) # , promise - # promise_created = promise._pop_context() if error_with_tb: error, tb = error_with_tb @@ -400,26 +387,23 @@ def _settle_promises(self): def _resolve_from_executor(self, executor): # self._capture_stacktrace() - # self._push_context() - with Context(): - synchronous = True + synchronous = True - def resolve(value): - self._resolve_callback(value) + def resolve(value): + self._resolve_callback(value) - def reject(reason): - self._reject_callback(reason, synchronous) + def reject(reason): + self._reject_callback(reason, synchronous) - error = None - traceback = None - try: - executor(resolve, reject) - except Exception as e: - traceback = exc_info()[2] - error = e + error = None + traceback = None + try: + executor(resolve, reject) + except Exception as e: + traceback = exc_info()[2] + error = e - synchronous = False - # self._pop_context() + synchronous = False if error is not None: self._reject_callback(error, True, traceback) @@ -442,12 +426,6 @@ def on_resolve_or_reject(_): target._then(on_resolve_or_reject, on_resolve_or_reject) - if target._trace: - # If we wait, we drain the queue of the - # callbacks waiting on the context exit - # so we avoid a blocking state - target._trace.drain_queue() - if timeout is None and IS_PYTHON2: timeout = float("Inf") @@ -533,11 +511,9 @@ def _then(self, did_fulfill=None, did_reject=None): # target._rejection_is_unhandled = False async_instance.invoke( partial(target._settle_promise, promise, handler, value, traceback), - context=target._trace, # target._settle_promise instead? # settler, # target, - # Context(handler, promise, value), ) return promise @@ -632,7 +608,7 @@ def then_all(self, handlers=None): return promises @classmethod - def _try_convert_to_promise(cls, obj, context=None): + def _try_convert_to_promise(cls, obj): _type = obj.__class__ if issubclass(_type, cls): return obj @@ -697,15 +673,21 @@ def executor(resolve, reject): return wrapper - @classmethod - def safe(cls, fn): - from functools import wraps - @wraps(fn) - def wrapper(*args, **kwargs): - with Context(): - return fn(*args, **kwargs) + safe = promisify - return wrapper + # _safe_resolved_promise = None + + # @classmethod + # def safe(cls, fn): + # from functools import wraps + # if not cls._safe_resolved_promise: + # cls._safe_resolved_promise = Promise.resolve(None) + + # @wraps(fn) + # def wrapper(*args, **kwargs): + # return cls._safe_resolved_promise.then(lambda v: fn(*args, **kwargs)) + + # return wrapper @classmethod def all(cls, promises): diff --git a/promise/promise_list.py b/promise/promise_list.py index 846c3bf..7292914 100644 --- a/promise/promise_list.py +++ b/promise/promise_list.py @@ -9,9 +9,6 @@ class PromiseList(object): def __init__(self, values, promise_class): self._promise_class = promise_class self.promise = self._promise_class() - # if (isinstance(values, Promise)): - # # promise._propagate_from(values) - # pass self._values = values self._length = 0 @@ -25,7 +22,7 @@ def _init(self): Promise = self._promise_class values = self._values if Promise.is_thenable(values): - values = Promise._try_convert_to_promise(self._values, self.promise)._target() + values = Promise._try_convert_to_promise(self._values)._target() if values.is_fulfilled: values = values._value() elif values.is_rejected: @@ -59,7 +56,7 @@ def _iterate(self, values): for i, val in enumerate(values): if Promise.is_thenable(val): - maybe_promise = Promise._try_convert_to_promise(val, self.promise)._target() + maybe_promise = Promise._try_convert_to_promise(val)._target() # if is_resolved: # # maybe_promise.suppressUnhandledRejections # pass diff --git a/tests/test_awaitable_35.py b/tests/test_awaitable_35.py index 97c5674..4aa7050 100644 --- a/tests/test_awaitable_35.py +++ b/tests/test_awaitable_35.py @@ -1,6 +1,5 @@ from asyncio import sleep, Future, wait, FIRST_COMPLETED from pytest import mark -from promise.context import Context from promise import Promise, is_thenable @@ -35,11 +34,14 @@ async def test_promisify_future(): @mark.asyncio -async def test_await_in_context(): +async def test_await_in_safe_promise(): async def inner(): - with Context(): + @Promise.safe + def x(): promise = Promise.resolve(True).then(lambda x: x) - return await promise + return promise + + return await x() result = await inner() assert result == True diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py index 4710400..4ff10e0 100644 --- a/tests/test_benchmark.py +++ b/tests/test_benchmark.py @@ -104,11 +104,11 @@ def create_promise(): # unnecessary function call def test_benchmark_promise_all_promise(benchmark): - values = [Promise.resolve(i) for i in range(1000)] + values = [Promise.resolve(i) for i in range(100000)] def create_promise(): # unnecessary function call return Promise.all(values) result = benchmark(create_promise) assert isinstance(result, Promise) - assert result.get() == list(range(1000)) + assert result.get() == list(range(100000)) diff --git a/tests/test_context.py b/tests/test_context.py deleted file mode 100644 index 5527a32..0000000 --- a/tests/test_context.py +++ /dev/null @@ -1,94 +0,0 @@ -from pytest import raises - -from promise import ( - Promise, -) -from mock import Mock -from promise.context import ( - Context, - context_stack -) -import time - - -def test_basic(): - assert Context.peek_context() == None - with Context() as c: - assert context_stack == [c] - assert Context.peek_context() == c - assert c._parent == None - - assert Context.peek_context() == None - - -def test_concatenated(): - with Context() as c1, Context() as c2: - assert context_stack == [c1, c2] - assert Context.peek_context() == c2 - assert c1._parent == None - assert c2._parent == c1 - - -def test_promise_with_context(): - with Context() as c: - on_resolved1 = Mock() - on_resolved2 = Mock() - p1 = Promise.resolve(1).then(on_resolved1) - p2 = Promise.resolve(1).then(on_resolved2) - time.sleep(.1) - on_resolved1.assert_not_called() - on_resolved2.assert_not_called() - - -def test_promise_with_context_2(): - with Context() as c: - on_resolved = Mock() - promises = [Promise.resolve(i).then(on_resolved) for i in range(3)] - on_resolved.assert_not_called() - - -def test_promise_with_context_3(): - with Context() as c: - on_resolved = Mock() - p = Promise.resolve(1) - p.then(on_resolved) - on_resolved.assert_not_called() - -class Counter: - """ - A helper class with some side effects - we can test. - """ - - def __init__(self): - self.count = 0 - - def tick(self): - self.count += 1 - - def value(self): - return self.count - - -# @Promise.safe -# def test_promise_with_context_4(): -# x = Counter() - -# def on_resolved(v): -# x.tick() -# return 2 - -# p = Promise.resolve(1) -# p.then(on_resolved) - -# assert x.value() == 0 -# p._wait() -# assert x.value() == 1 - -# def test_promise_with_context(): -# with Context() as c: -# on_resolved = Mock() -# p = Promise.all([ -# promise_something(x, None).then(lambda y: x*y) for x in (0,1,2,3) -# ]).then(on_resolved) -# on_resolved.assert_not_called() diff --git a/tests/test_dataloader.py b/tests/test_dataloader.py index b4baa59..58a5acb 100644 --- a/tests/test_dataloader.py +++ b/tests/test_dataloader.py @@ -378,6 +378,7 @@ def do_resolve(x): assert a_loader.clear('A1') == a_loader +@Promise.safe def test_wrong_loader_return_type_does_not_block_async_instance(): def do_resolve(x): return x From 09b8234d18221651dcaec7fccbe922f80f9d5b83 Mon Sep 17 00:00:00 2001 From: Syrus Akbary Date: Wed, 12 Jul 2017 23:17:20 -0700 Subject: [PATCH 2/7] Uncommented loader tests --- tests/test_dataloader.py | 62 ++++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/tests/test_dataloader.py b/tests/test_dataloader.py index 58a5acb..7e360fb 100644 --- a/tests/test_dataloader.py +++ b/tests/test_dataloader.py @@ -317,21 +317,20 @@ def test_caches_failed_fetches(): assert load_calls == [] -# It is resilient to job queue ordering - -# @Promise.safe -# def test_batches_loads_occuring_within_promises(): -# identity_loader, load_calls = id_loader() -# values = Promise.all([ -# identity_loader.load('A'), -# Promise.resolve(None).then(lambda v: Promise.resolve(None)).then( -# lambda v: identity_loader.load('B') -# ) -# ]).get() -# assert values == ['A', 'B'] +# It is resilient to job queue ordering +@Promise.safe +def test_batches_loads_occuring_within_promises(): + identity_loader, load_calls = id_loader() + values = Promise.all([ + identity_loader.load('A'), + Promise.resolve(None).then(lambda v: Promise.resolve(None)).then( + lambda v: identity_loader.load('B') + ) + ]).get() -# assert load_calls == [['A', 'B']] + assert values == ['A', 'B'] + assert load_calls == [['A', 'B']] @Promise.safe @@ -347,27 +346,28 @@ def do_resolve(x): assert str(exc_info.value) == "Data loader batch_load_fn function raised an Exception: Exception('AOH!',)" -# @Promise.safe -# def test_can_call_a_loader_from_a_loader(): -# deep_loader, deep_load_calls = id_loader() -# a_loader, a_load_calls = id_loader(resolve=lambda keys:deep_loader.load(tuple(keys))) -# b_loader, b_load_calls = id_loader(resolve=lambda keys:deep_loader.load(tuple(keys))) +@Promise.safe +def test_can_call_a_loader_from_a_loader(): + deep_loader, deep_load_calls = id_loader() + a_loader, a_load_calls = id_loader(resolve=lambda keys:deep_loader.load(tuple(keys))) + b_loader, b_load_calls = id_loader(resolve=lambda keys:deep_loader.load(tuple(keys))) + + a1, b1, a2, b2 = Promise.all([ + a_loader.load('A1'), + b_loader.load('B1'), + a_loader.load('A2'), + b_loader.load('B2') + ]).get() -# a1, b1, a2, b2 = Promise.all([ -# a_loader.load('A1'), -# b_loader.load('B1'), -# a_loader.load('A2'), -# b_loader.load('B2') -# ]).get() + assert a1 == 'A1' + assert b1 == 'B1' + assert a2 == 'A2' + assert b2 == 'B2' -# assert a1 == 'A1' -# assert b1 == 'B1' -# assert a2 == 'A2' -# assert b2 == 'B2' + assert a_load_calls == [['A1', 'A2']] + assert b_load_calls == [['B1', 'B2']] + assert deep_load_calls == [[('A1', 'A2'), ('B1', 'B2')]] -# assert a_load_calls == [['A1', 'A2']] -# assert b_load_calls == [['B1', 'B2']] -# assert deep_load_calls == [[('A1', 'A2'), ('B1', 'B2')]] @Promise.safe def test_dataloader_clear_with_missing_key_works(): From 46fb420f07188afb1a5225fba603ca5c8e47611f Mon Sep 17 00:00:00 2001 From: Syrus Akbary Date: Wed, 12 Jul 2017 23:26:15 -0700 Subject: [PATCH 3/7] Move blocking wait to the scheduler --- promise/promise.py | 34 +++++++++------------------------- promise/scheduler.py | 13 +++++++++++++ 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/promise/promise.py b/promise/promise.py index 71f7b45..68f2f7b 100644 --- a/promise/promise.py +++ b/promise/promise.py @@ -1,7 +1,7 @@ from collections import namedtuple from functools import partial, wraps from sys import version_info, exc_info -from threading import Event, RLock +from threading import RLock from types import TracebackType from six import reraise @@ -87,7 +87,6 @@ class Promise(object): _rejection_handler0 = None # type: Union[Callable, partial] _promise0 = None # type: Promise _future = None # type: Future - _event_instance = None # type: Event _traceback = None # type: TracebackType # _trace = None _is_waiting = False @@ -366,8 +365,6 @@ def _followee(self): def _set_followee(self, promise): assert self._is_following assert not isinstance(self._rejection_handler0, Promise) - if not self._event_instance: - self._event_instance = promise._event_instance self._rejection_handler0 = promise def _settle_promises(self): @@ -407,31 +404,18 @@ def reject(reason): if error is not None: self._reject_callback(error, True, traceback) - - def _event(self): - if not self._event_instance: - self._event_instance = Event() - return self._event_instance - - def _wait(self, timeout=None): - if not self.is_pending: + + @classmethod + def wait(self, promise, timeout=None): + if not promise.is_pending: # We return if the promise is already # fulfilled or rejected return + target = promise._target() + async_instance.schedule.wait(target, timeout) - target = self._target() - - def on_resolve_or_reject(_): - target._event().set() - - target._then(on_resolve_or_reject, on_resolve_or_reject) - - if timeout is None and IS_PYTHON2: - timeout = float("Inf") - - waited = target._event().wait(timeout) - if not waited: - raise Exception("Timeout") + def _wait(self, timeout=None): + self.wait(self, timeout) def get(self, timeout=None): target = self._target() diff --git a/promise/scheduler.py b/promise/scheduler.py index 766cde3..d3c4b79 100644 --- a/promise/scheduler.py +++ b/promise/scheduler.py @@ -1,6 +1,19 @@ +from threading import Event + + class SyncScheduler(object): def call(self, fn): try: fn() except: pass + + def wait(self, promise, timeout=None): + e = Event() + def on_resolve_or_reject(_): + e.set() + + promise._then(on_resolve_or_reject, on_resolve_or_reject) + waited = e.wait(timeout) + if not waited: + raise Exception("Timeout") From f991ae9dc32d973723e700c3c9d16f3b8e187086 Mon Sep 17 00:00:00 2001 From: Syrus Akbary Date: Wed, 12 Jul 2017 23:54:12 -0700 Subject: [PATCH 4/7] Improved Schedulers and added Gevent Scheduler --- promise/dataloader.py | 14 ------ promise/gevent_scheduler.py | 17 +++++++ promise/promise.py | 2 + promise/thread_scheduler.py | 12 ++++- tests/test_extra.py | 91 +++++++++++++++++++------------------ tests/test_spec.py | 4 +- 6 files changed, 79 insertions(+), 61 deletions(-) create mode 100644 promise/gevent_scheduler.py diff --git a/promise/dataloader.py b/promise/dataloader.py index e219b0b..b0932cc 100644 --- a/promise/dataloader.py +++ b/promise/dataloader.py @@ -179,25 +179,11 @@ def prime(self, key, value): # Private: cached resolved Promise instance resolved_promise = None -# def enqueue_post_promise_job(fn): -# # t.run() -# # from threading import Timer -# # t = Timer(0.10, fn) -# # t.run() -# # return fn() -# global resolved_promise -# if not resolved_promise: -# resolved_promise = Promise.resolve(None) -# resolved_promise.then(lambda v: queue.invoke(fn)) # TODO: Change to async - def enqueue_post_promise_job(fn): global resolved_promise if not resolved_promise: resolved_promise = Promise.resolve(None) - # queue.invoke(fn) resolved_promise.then(lambda v: async_instance.invoke(fn)) - # Promise.resolve(None).then(lambda v: async.invoke(fn, context=Context.peek_context())) - # resolved_promise.then(lambda v: queue.invoke(fn, context=Context.peek_context())) def dispatch_queue(loader): diff --git a/promise/gevent_scheduler.py b/promise/gevent_scheduler.py new file mode 100644 index 0000000..9a7925d --- /dev/null +++ b/promise/gevent_scheduler.py @@ -0,0 +1,17 @@ +from gevent.event import Event +import gevent + +class GeventScheduler(object): + def call(self, fn): + # print fn + gevent.spawn(fn) + + def wait(self, promise, timeout=None): + e = Event() + def on_resolve_or_reject(_): + e.set() + + promise._then(on_resolve_or_reject, on_resolve_or_reject) + waited = e.wait(timeout) + if not waited: + raise Exception("Timeout") diff --git a/promise/promise.py b/promise/promise.py index 68f2f7b..9326169 100644 --- a/promise/promise.py +++ b/promise/promise.py @@ -14,6 +14,8 @@ from .utils import deprecated, integer_types, string_types, text_type, binary_type, warn from .promise_list import PromiseList from .scheduler import SyncScheduler +# from .gevent_scheduler import GeventScheduler +# from .thread_scheduler import ThreadScheduler async_instance = Async(SyncScheduler()) diff --git a/promise/thread_scheduler.py b/promise/thread_scheduler.py index ea9ed41..6ae0d1f 100644 --- a/promise/thread_scheduler.py +++ b/promise/thread_scheduler.py @@ -1,7 +1,17 @@ -from threading import Thread +from threading import Thread, Event class ThreadScheduler(object): def call(self, fn): thread = Thread(target=fn) thread.start() + + def wait(self, promise, timeout=None): + e = Event() + def on_resolve_or_reject(_): + e.set() + + promise._then(on_resolve_or_reject, on_resolve_or_reject) + waited = e.wait(timeout) + if not waited: + raise Exception("Timeout") diff --git a/tests/test_extra.py b/tests/test_extra.py index bfae655..07bc9e6 100644 --- a/tests/test_extra.py +++ b/tests/test_extra.py @@ -137,11 +137,11 @@ def test_fake_promise(): # WAIT -def test_wait_when(): - p1 = df(5, 0.01) - assert p1.is_pending - p1._wait() - assert p1.is_fulfilled +# def test_wait_when(): +# p1 = df(5, 0.01) +# assert p1.is_pending +# p1._wait() +# assert p1.is_fulfilled def test_wait_if(): @@ -151,24 +151,24 @@ def test_wait_if(): assert p1.is_fulfilled -def test_wait_timeout(): - p1 = df(5, 0.1) - assert p1.is_pending - with raises(Exception) as exc_info: - p1._wait(timeout=0.05) - assert str(exc_info.value) == "Timeout" - assert p1.is_pending - p1._wait() - assert p1.is_fulfilled +# def test_wait_timeout(): +# p1 = df(5, 0.1) +# assert p1.is_pending +# with raises(Exception) as exc_info: +# p1._wait(timeout=0.05) +# assert str(exc_info.value) == "Timeout" +# assert p1.is_pending +# p1._wait() +# assert p1.is_fulfilled -# GET -def test_get_when(): - p1 = df(5, 0.01) - assert p1.is_pending - v = p1.get() - assert p1.is_fulfilled - assert 5 == v +# # GET +# def test_get_when(): +# p1 = df(5, 0.01) +# assert p1.is_pending +# v = p1.get() +# assert p1.is_fulfilled +# assert 5 == v def test_get_if(): @@ -179,16 +179,16 @@ def test_get_if(): assert 5 == v -def test_get_timeout(): - p1 = df(5, 0.1) - assert p1.is_pending - with raises(Exception) as exc_info: - p1._wait(timeout=0.05) - assert str(exc_info.value) == "Timeout" - assert p1.is_pending - v = p1.get() - assert p1.is_fulfilled - assert 5 == v +# def test_get_timeout(): +# p1 = df(5, 0.1) +# assert p1.is_pending +# with raises(Exception) as exc_info: +# p1._wait(timeout=0.05) +# assert str(exc_info.value) == "Timeout" +# assert p1.is_pending +# v = p1.get() +# assert p1.is_fulfilled +# assert 5 == v # Promise.all @@ -350,36 +350,40 @@ def test_dict_promise_if(promise_for_dict): def test_done(): counter = [0] - e = Event() + r = Promise() def inc(_): counter[0] += 1 - e.set() def dec(_): counter[0] -= 1 - e.set() + + def end(_): + r.do_resolve(None) p = Promise() p.done(inc, dec) p.done(inc, dec) + p.done(end) p.do_resolve(4) - e.wait() + Promise.wait(r) assert counter[0] == 2 + r = Promise() + counter = [0] p = Promise() - e = Event() p.done(inc, dec) p.done(inc, dec) + p.done(None, end) p.do_reject(Exception()) - e.wait() + + Promise.wait(r) assert counter[0] == -2 def test_done_all(): counter = [0] - e = Event() def inc(_): counter[0] += 1 @@ -388,6 +392,7 @@ def dec(_): counter[0] -= 1 p = Promise() + r = Promise() p.done_all() p.done_all([(inc, dec)]) p.done_all([ @@ -397,14 +402,14 @@ def dec(_): 'success': inc, 'failure': dec }, - lambda _: e.set() + lambda _: r.do_resolve(None) ]) p.do_resolve(4) - e.wait() + Promise.wait(r) assert counter[0] == 4 - e = Event() p = Promise() + r = Promise() p.done_all() p.done_all([inc]) p.done_all([(inc, dec)]) @@ -414,10 +419,10 @@ def dec(_): 'success': inc, 'failure': dec }, - (None, lambda _: e.set()) + (None, lambda _: r.do_resolve(None)) ]) p.do_reject(Exception("Uh oh!")) - e.wait() + Promise.wait(r) assert counter[0] == 1 diff --git a/tests/test_spec.py b/tests/test_spec.py index 7c4511b..4f3b549 100644 --- a/tests/test_spec.py +++ b/tests/test_spec.py @@ -541,17 +541,15 @@ def test_promise_resolved_after(): """ c = Counter() - e = Event() def check(v, c): assert v == 5 - e.set() c.tick() p1 = Promise() p2 = p1.then(lambda v: check(v, c)) p1.do_resolve(5) - e.wait() + Promise.wait(p2) assert 1 == c.value() From e4fa2e3c7e46668fd98a5affd8fa98b77edd6ddd Mon Sep 17 00:00:00 2001 From: Syrus Akbary Date: Thu, 13 Jul 2017 00:08:11 -0700 Subject: [PATCH 5/7] Refactored schedulers --- .travis.yml | 3 +-- promise/__init__.py | 9 ++------- promise/async_.py | 4 ++-- promise/promise.py | 9 +++++---- promise/schedulers/__init__.py | 0 promise/schedulers/asyncio.py | 11 +++++++++++ promise/{gevent_scheduler.py => schedulers/gevent.py} | 4 ++++ promise/{scheduler.py => schedulers/immediate.py} | 3 ++- promise/{thread_scheduler.py => schedulers/thread.py} | 1 + 9 files changed, 28 insertions(+), 16 deletions(-) create mode 100644 promise/schedulers/__init__.py create mode 100644 promise/schedulers/asyncio.py rename promise/{gevent_scheduler.py => schedulers/gevent.py} (91%) rename promise/{scheduler.py => schedulers/immediate.py} (91%) rename promise/{thread_scheduler.py => schedulers/thread.py} (99%) diff --git a/.travis.yml b/.travis.yml index 7ad5cac..d5478cf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,9 +2,8 @@ language: python sudo: false python: - 2.7 -- 3.3 -- 3.4 - 3.5 +- 3.6 - pypy before_install: - | diff --git a/promise/__init__.py b/promise/__init__.py index bc9260e..0487986 100644 --- a/promise/__init__.py +++ b/promise/__init__.py @@ -24,11 +24,7 @@ get_default_scheduler, set_default_scheduler ) - from .scheduler import SyncScheduler - try: - from .thread_sheduler import ThreadScheduler - except ImportError: - ThreadScheduler = None + from .schedulers.immediate import ImmediateScheduler __all__ = [ 'Promise', @@ -38,6 +34,5 @@ 'async_instance', 'get_default_scheduler', 'set_default_scheduler', - 'SyncScheduler', - 'ThreadScheduler' + 'ImmediateScheduler' ] diff --git a/promise/async_.py b/promise/async_.py index dcd80f6..b2eed3f 100644 --- a/promise/async_.py +++ b/promise/async_.py @@ -6,8 +6,8 @@ class Async(object): def __init__(self, schedule): self.is_tick_used = False - self.late_queue = collections.deque() - self.normal_queue = collections.deque() + self.late_queue = collections.deque() # type: ignore + self.normal_queue = collections.deque() # type: ignore self.have_drained_queues = False self.trampoline_enabled = False self.schedule = schedule diff --git a/promise/promise.py b/promise/promise.py index 9326169..c88db51 100644 --- a/promise/promise.py +++ b/promise/promise.py @@ -13,11 +13,12 @@ iterate_promise) from .utils import deprecated, integer_types, string_types, text_type, binary_type, warn from .promise_list import PromiseList -from .scheduler import SyncScheduler -# from .gevent_scheduler import GeventScheduler -# from .thread_scheduler import ThreadScheduler +from .schedulers.immediate import ImmediateScheduler +# from .schedulers.gevent import GeventScheduler +# from .schedulers.asyncio import AsyncioScheduler +# from .schedulers.thread import ThreadScheduler -async_instance = Async(SyncScheduler()) +async_instance = Async(ImmediateScheduler()) def get_default_scheduler(): diff --git a/promise/schedulers/__init__.py b/promise/schedulers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/promise/schedulers/asyncio.py b/promise/schedulers/asyncio.py new file mode 100644 index 0000000..0d768e1 --- /dev/null +++ b/promise/schedulers/asyncio.py @@ -0,0 +1,11 @@ +from __future__ import absolute_import + +from asyncio import get_event_loop + + +class AsyncioScheduler(object): + def __init__(self, loop=None): + self.loop = loop or get_event_loop() + + def call(self, fn): + self.loop.call_soon(fn) diff --git a/promise/gevent_scheduler.py b/promise/schedulers/gevent.py similarity index 91% rename from promise/gevent_scheduler.py rename to promise/schedulers/gevent.py index 9a7925d..efc2119 100644 --- a/promise/gevent_scheduler.py +++ b/promise/schedulers/gevent.py @@ -1,6 +1,9 @@ +from __future__ import absolute_import + from gevent.event import Event import gevent + class GeventScheduler(object): def call(self, fn): # print fn @@ -8,6 +11,7 @@ def call(self, fn): def wait(self, promise, timeout=None): e = Event() + def on_resolve_or_reject(_): e.set() diff --git a/promise/scheduler.py b/promise/schedulers/immediate.py similarity index 91% rename from promise/scheduler.py rename to promise/schedulers/immediate.py index d3c4b79..79a8b04 100644 --- a/promise/scheduler.py +++ b/promise/schedulers/immediate.py @@ -1,7 +1,7 @@ from threading import Event -class SyncScheduler(object): +class ImmediateScheduler(object): def call(self, fn): try: fn() @@ -10,6 +10,7 @@ def call(self, fn): def wait(self, promise, timeout=None): e = Event() + def on_resolve_or_reject(_): e.set() diff --git a/promise/thread_scheduler.py b/promise/schedulers/thread.py similarity index 99% rename from promise/thread_scheduler.py rename to promise/schedulers/thread.py index 6ae0d1f..a83f9a0 100644 --- a/promise/thread_scheduler.py +++ b/promise/schedulers/thread.py @@ -8,6 +8,7 @@ def call(self, fn): def wait(self, promise, timeout=None): e = Event() + def on_resolve_or_reject(_): e.set() From 579cf4676945d0f89e6b19166b6a366a92be09a8 Mon Sep 17 00:00:00 2001 From: Syrus Akbary Date: Thu, 13 Jul 2017 00:22:10 -0700 Subject: [PATCH 6/7] Improved asyncio scheduler --- promise/schedulers/asyncio.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/promise/schedulers/asyncio.py b/promise/schedulers/asyncio.py index 0d768e1..34285fa 100644 --- a/promise/schedulers/asyncio.py +++ b/promise/schedulers/asyncio.py @@ -1,6 +1,6 @@ from __future__ import absolute_import -from asyncio import get_event_loop +from asyncio import get_event_loop, Event class AsyncioScheduler(object): @@ -9,3 +9,14 @@ def __init__(self, loop=None): def call(self, fn): self.loop.call_soon(fn) + + def wait(self, promise, timeout=None): + e = Event() + + def on_resolve_or_reject(_): + e.set() + + promise._then(on_resolve_or_reject, on_resolve_or_reject) + + # We can't use the timeout in Asyncio event + e.wait() From 50902a9085bd8222e21aa92b48ae1f25d9e661a4 Mon Sep 17 00:00:00 2001 From: Syrus Akbary Date: Thu, 13 Jul 2017 18:50:43 -0700 Subject: [PATCH 7/7] Removed support for promise-like thenables (then) Making objects with `.then` method was causing some issues with rx.Observable --- promise/promise.py | 46 ++++++++++++++++++++++----------------------- tests/test_extra.py | 27 +------------------------- 2 files changed, 23 insertions(+), 50 deletions(-) diff --git a/promise/promise.py b/promise/promise.py index c88db51..4060220 100644 --- a/promise/promise.py +++ b/promise/promise.py @@ -90,7 +90,7 @@ class Promise(object): _rejection_handler0 = None # type: Union[Callable, partial] _promise0 = None # type: Promise _future = None # type: Future - _traceback = None # type: TracebackType + _traceback = None # type: TracebackType # _trace = None _is_waiting = False @@ -226,7 +226,8 @@ def _ensure_possible_rejection_handled(self): pass def _reject_callback(self, reason, synchronous=False, traceback=None): - assert isinstance(reason, Exception), "A promise was rejected with a non-error: {}".format(reason) + assert isinstance( + reason, Exception), "A promise was rejected with a non-error: {}".format(reason) # trace = ensure_error_object(reason) # has_stack = trace is reason # self._attach_extratrace(trace, synchronous and has_stack) @@ -376,7 +377,8 @@ def _settle_promises(self): if self._state == STATE_REJECTED: reason = self._fulfillment_handler0 traceback = self._traceback - self._settle_promise0(self._rejection_handler0, reason, traceback) + self._settle_promise0( + self._rejection_handler0, reason, traceback) self._reject_promises(length, reason) else: value = self._rejection_handler0 @@ -407,7 +409,7 @@ def reject(reason): if error is not None: self._reject_callback(error, True, traceback) - + @classmethod def wait(self, promise, timeout=None): if not promise.is_pending: @@ -425,7 +427,6 @@ def get(self, timeout=None): self._wait(timeout or DEFAULT_TIMEOUT) return self._target_settled_value(_raise=True) - def _target_settled_value(self, _raise=False): return self._target()._settled_value(_raise) @@ -497,7 +498,8 @@ def _then(self, did_fulfill=None, did_reject=None): handler = did_reject # target._rejection_is_unhandled = False async_instance.invoke( - partial(target._settle_promise, promise, handler, value, traceback), + partial(target._settle_promise, promise, + handler, value, traceback), # target._settle_promise instead? # settler, # target, @@ -597,7 +599,9 @@ def then_all(self, handlers=None): @classmethod def _try_convert_to_promise(cls, obj): _type = obj.__class__ - if issubclass(_type, cls): + if issubclass(_type, Promise): + if cls is not Promise: + return cls(obj.then) return obj if iscoroutine(obj): @@ -609,14 +613,12 @@ def executor(resolve, reject): if obj.done(): _process_future_result(resolve, reject)(obj) else: - obj.add_done_callback(_process_future_result(resolve, reject)) + obj.add_done_callback( + _process_future_result(resolve, reject)) # _process_future_result(resolve, reject)(obj) promise = cls(executor) promise._future = obj return promise - - if is_promise_like(_type): - return cls(obj.then) return obj @@ -648,7 +650,8 @@ def resolve(cls, obj): @classmethod def promisify(cls, f): if not callable(f): - warn("Promise.promisify is now a function decorator, please use Promise.resolve instead.") + warn( + "Promise.promisify is now a function decorator, please use Promise.resolve instead.") return cls.resolve(f) @wraps(f) @@ -672,7 +675,7 @@ def executor(resolve, reject): # @wraps(fn) # def wrapper(*args, **kwargs): - # return cls._safe_resolved_promise.then(lambda v: fn(*args, **kwargs)) + # return cls._safe_resolved_promise.then(lambda v: fn(*args, **kwargs)) # return wrapper @@ -710,26 +713,21 @@ def is_thenable(cls, obj): if obj is None or _type in BASE_TYPES: return False - return issubclass(_type, cls) or \ + return issubclass(_type, Promise) or \ iscoroutine(obj) or \ - is_future_like(_type) or \ - is_promise_like(_type) + is_future_like(_type) _type_done_callbacks = {} # type: Dict[type, bool] + + def is_future_like(_type): if _type not in _type_done_callbacks: - _type_done_callbacks[_type] = callable(getattr(_type, 'add_done_callback', None)) + _type_done_callbacks[_type] = callable( + getattr(_type, 'add_done_callback', None)) return _type_done_callbacks[_type] -_type_then_callbacks = {} # type: Dict[type, bool] -def is_promise_like(_type): - if _type not in _type_then_callbacks: - _type_then_callbacks[_type] = callable(getattr(_type, 'then', None)) - return _type_then_callbacks[_type] - - promisify = Promise.promisify promise_for_dict = Promise.for_dict is_thenable = Promise.is_thenable diff --git a/tests/test_extra.py b/tests/test_extra.py index 07bc9e6..ff02197 100644 --- a/tests/test_extra.py +++ b/tests/test_extra.py @@ -129,13 +129,6 @@ def after_throws(v): assert assert_exc.traceback[-1].path.strpath == __file__ -def test_fake_promise(): - p = Promise() - p.do_resolve(FakeThenPromise()) - assert p.is_rejected - assert_exception(p.reason, Exception, "FakeThenPromise raises in 'then'") - - # WAIT # def test_wait_when(): # p1 = df(5, 0.01) @@ -493,7 +486,7 @@ def test_is_thenable_promise(): def test_is_thenable_then_object(): promise = FakeThenPromise() - assert is_thenable(promise) + assert not is_thenable(promise) def test_is_thenable_future(): @@ -521,13 +514,6 @@ def test_resolve_then_object(resolve): assert isinstance(p, Promise) -def test_resolve_then_object_exception(resolve): - promise = FakeThenPromise() - with raises(Exception) as excinfo: - resolve(promise).get() - assert str(excinfo.value) == "FakeThenPromise raises in 'then'" - - def test_resolve_future(resolve): future = Future() promise = resolve(future) @@ -602,17 +588,6 @@ def executor(resolve, reject): assert p.get(.1) == 2 -def test_resolve_promise_like(resolve): - class CustomThenable(object): - def then(self, resolve, reject): - resolve(True) - - instance = CustomThenable() - - promise = resolve(instance) - assert promise.get() == True - - def test_resolve_future_like(resolve): class CustomThenable(object): def add_done_callback(self, f):