Skip to content

Commit

Permalink
Merge ca3a1fc into 9539d3b
Browse files Browse the repository at this point in the history
  • Loading branch information
syrusakbary committed Mar 7, 2017
2 parents 9539d3b + ca3a1fc commit 9add8e5
Show file tree
Hide file tree
Showing 20 changed files with 2,094 additions and 507 deletions.
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[run]
omit = promise/compat.py,promise/iterate_promise.py
omit = promise/compat.py,promise/iterate_promise.py,promise/utils.py
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ matrix:
script: flake8
- python: '3.5'
script: |
pip install mypy-lang
mypy promise/ --check-untyped-defs
pip install mypy
mypy promise/ --check-untyped-defs --ignore-missing-imports
4 changes: 2 additions & 2 deletions promise/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .promise import Promise, promise_for_dict, promisify, is_thenable
from .promise import Promise, promise_for_dict, promisify, is_thenable, async_instance

__all__ = ['Promise', 'promise_for_dict', 'promisify', 'is_thenable']
__all__ = ['Promise', 'promise_for_dict', 'promisify', 'is_thenable', 'async_instance']
126 changes: 126 additions & 0 deletions promise/async_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# from threading import Timer, Thread

from .compat import Queue
# from .context import Context

# Based on https://github.com/petkaantonov/bluebird/blob/master/src/async.js


class Scheduler(object):

def call(self, fn):
# thread = Thread(target=fn)
# thread = Timer(0.001, fn)
# fn = thread.start
try:
# c = Context.peek_context()
# if not c:
fn()
# else:
# c.on_exit(fn)
except:
pass
# thread = Thread(target=fn)
# thread = Timer(0.001, fn)
# thread.start()


def get_default_scheduler():
return Scheduler()


# https://docs.python.org/2/library/queue.html#Queue.Queue
LATE_QUEUE_CAPACITY = 0 # The queue size is infinite
NORMAL_QUEUE_CAPACITY = 0 # The queue size is infinite


class Async(object):

def __init__(self, schedule=None):
self.is_tick_used = False
self.late_queue = Queue(LATE_QUEUE_CAPACITY)
self.normal_queue = Queue(NORMAL_QUEUE_CAPACITY)
self.have_drained_queues = False
self.trampoline_enabled = True
self.schedule = schedule or get_default_scheduler()

def enable_trampoline(self):
self.trampoline_enabled = True

def disable_trampoline(self):
self.trampoline_enabled = False

def have_items_queued(self):
return self.is_tick_used or self.have_drained_queues

def _async_invoke_later(self, fn, context):
self.late_queue.put(fn)
self.queue_tick(context)

def _async_invoke(self, fn, context):
self.normal_queue.put(fn)
self.queue_tick(context)

def _async_settle_promise(self, promise):
self.normal_queue.put(promise)
self.queue_tick(context=promise._trace)

def invoke_later(self, fn, context):
if self.trampoline_enabled:
self._async_invoke_later(fn, context)
else:
self.schedule.call_later(0.1, fn)

def invoke(self, fn, context):
if self.trampoline_enabled:
self._async_invoke(fn, context)
else:
self.schedule.call(
fn
)

def settle_promises(self, promise):
if self.trampoline_enabled:
self._async_settle_promise(promise)
else:
self.schedule.call(
promise._settle_promises
)

def throw_later(self, reason):
def fn():
raise reason

self.schedule.call(fn)

fatal_error = throw_later

def drain_queue(self, queue):
from .promise import Promise
while not queue.empty():
fn = queue.get()
if (isinstance(fn, Promise)):
fn._settle_promises()
continue
fn()

def drain_queues(self):
assert self.is_tick_used
self.drain_queue(self.normal_queue)
self.reset()
self.have_drained_queues = True
self.drain_queue(self.late_queue)

def queue_context_tick(self):
if not self.is_tick_used:
self.is_tick_used = True
self.schedule.call(self.drain_queues)

def queue_tick(self, context):
if not context:
self.queue_context_tick()
else:
context.on_exit(self.queue_context_tick)

def reset(self):
self.is_tick_used = False
5 changes: 5 additions & 0 deletions promise/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
except ImportError:

class Future: # type: ignore

def __init__(self):
raise Exception("You need asyncio for using Futures")

Expand All @@ -18,6 +19,10 @@ def ensure_future(): # type: ignore
def iscoroutine(obj): # type: ignore
return False

try:
from Queue import Queue # flake8: noqa
except ImportError:
from queue import Queue # flake8: noqa

try:
from .iterate_promise import iterate_promise
Expand Down
59 changes: 59 additions & 0 deletions promise/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from typing import List, Callable # flake8: noqa

context_stack = []


class Context(object):

__slots__ = ('_parent', '_exited', '_exit_fns')

def __init__(self):
self._parent = self.peek_context()
if self._parent:
self._parent.on_exit(self._exit)
self._exited = False
self._exit_fns = [] # type: List[Callable]

def push_context(self):
# if self._trace:
# self._trace._promise_created = None
context_stack.append(self)

def __enter__(self):
self.push_context()
return self

def __exit__(self, type, value, traceback):
assert not self._exited, "Can't exit a Context twice"
self._exit()

def _exit(self):
if not self._exited:
self._exited = True
self.pop_context()
self.drain_queue()

def drain_queue(self):
exit_fns = self._exit_fns
self._exit_fns = []
for fn in exit_fns:
fn()

def on_exit(self, fn):
if self._exited:
fn()
else:
self._exit_fns.append(fn)

def pop_context(self):
context_stack.pop()
# if self._trace:
# trace = context_stack.pop()
# ret = trace._promise_created
# trace._promise_created = None
# return ret

@classmethod
def peek_context(cls):
if context_stack:
return context_stack[-1]
Loading

0 comments on commit 9add8e5

Please sign in to comment.