Skip to content

Commit

Permalink
Fixed dataloader
Browse files Browse the repository at this point in the history
  • Loading branch information
syrusakbary committed Jul 22, 2017
1 parent 5177e53 commit 65dbcbf
Show file tree
Hide file tree
Showing 3 changed files with 321 additions and 244 deletions.
37 changes: 36 additions & 1 deletion promise/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def __init__(self, schedule):
self.late_queue = collections.deque() # type: ignore
self.normal_queue = collections.deque() # type: ignore
self.have_drained_queues = False
self.trampoline_enabled = False
self.trampoline_enabled = True
self.schedule = schedule

def enable_trampoline(self):
Expand Down Expand Up @@ -72,6 +72,41 @@ def drain_queue(self, queue):
continue
fn()

def drain_queue_until_resolved(self, promise):
from .promise import Promise
queue = self.normal_queue
while queue:
if not promise.is_pending:
return
fn = queue.popleft()
if (isinstance(fn, Promise)):
fn._settle_promises()
continue
fn()

self.reset()
self.have_drained_queues = True
self.drain_queue(self.late_queue)

def wait(self, promise, timeout=None):
if not promise.is_pending:
# We return if the promise is already
# fulfilled or rejected
return

target = promise._target()

if self.trampoline_enabled:
if self.is_tick_used:
self.drain_queue_until_resolved(target)

if not promise.is_pending:
# We return if the promise is already
# fulfilled or rejected
return

self.schedule.wait(target, timeout)

def drain_queues(self):
assert self.is_tick_used
self.drain_queue(self.normal_queue)
Expand Down
31 changes: 12 additions & 19 deletions promise/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,13 +411,8 @@ def reject(reason):
self._reject_callback(error, True, traceback)

@classmethod
def wait(self, promise, timeout=None):
if not promise.is_pending:
# We return if the promise is already
# fulfilled or rejected
return
target = promise._target()
async_instance.schedule.wait(target, timeout)
def wait(cls, promise, timeout=None):
return async_instance.wait(promise, timeout)

def _wait(self, timeout=None):
self.wait(self, timeout)
Expand Down Expand Up @@ -663,21 +658,19 @@ def executor(resolve, reject):

return wrapper

safe = promisify

# _safe_resolved_promise = None
_safe_resolved_promise = None

# @classmethod
# def safe(cls, fn):
# from functools import wraps
# if not cls._safe_resolved_promise:
# cls._safe_resolved_promise = Promise.resolve(None)
@classmethod
def safe(cls, fn):
from functools import wraps
if not cls._safe_resolved_promise:
cls._safe_resolved_promise = Promise.resolve(None)

# @wraps(fn)
# def wrapper(*args, **kwargs):
# return cls._safe_resolved_promise.then(lambda v: fn(*args, **kwargs))
@wraps(fn)
def wrapper(*args, **kwargs):
return cls._safe_resolved_promise.then(lambda v: fn(*args, **kwargs))

# return wrapper
return wrapper

@classmethod
def all(cls, promises):
Expand Down
Loading

1 comment on commit 65dbcbf

@tomas-sk
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@syrusakbary, was the trampoline re-allowed on purpose in this commit, or is it a bug? It was earlier disabled by default in PR #37.

We use graphene in wsgi threaded server and with sqlalchemy scoped thread local sessions and we rely on executing whole graphene query (all resolvers) in the same thread. It would help if the docs was clearer on execution, even better configurable through schema execute method.

I figured out that totally synchronous thread safe execution can only be achieved with combination of SyncExecutor in graphene, ImmediateScheduler and disabled trampoline in promises async_instance.

Please sign in to comment.