
Commit

 - Fix *_unordered; add iwait & wait.
 - Add may_block context manager.
 - Add utils.Pool to limit concurrent work.
Mike Kaplinskiy committed Oct 22, 2014
1 parent 33967c0 commit ac8af0c
Showing 6 changed files with 374 additions and 18 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
## 0.4
- Add a may_block context manager that makes it possible to use gevent primitives between batch greenlets. For an example, see the iwait & wait implementations.
- Add versions of iwait & wait that work in a batch context.
- Fix *_unordered - it was previously using gevent.iwait.
- Add a Pool implementation that mirrors gevent.pool.Pool but works with batch greenlets. Unfortunately it was not as simple as setting greenlet_class=BatchGreenlet.

## 0.3.1
- Fix redis *scan functions.

6 changes: 5 additions & 1 deletion README.md
@@ -57,6 +57,10 @@ Mini docs:
- `pget(iterable)`: a quick way to `.get()` all the arguments passed.
- `pmap(fn, iterable)`: same as `map(fn, iterable)`, except it runs in parallel. Note: keyword arguments to pmap are passed through to fn for each element.
- `pfilter(fn, iterable)`: same as `filter(fn, iterable)`, except it runs in parallel.
- `immediate(v)`: returns an `AsyncResult`-like object that is immediately ready and `immediate(v).get() is v`.
- `Pool(size)`: same as `gevent.pool.Pool` - a way to limit the amount of concurrent work (a usage sketch follows this list).
- `iwait(greenlets)`: same as `gevent.iwait`, but works with batch greenlets. Using `gevent.iwait` directly with batch greenlets is strongly discouraged and will lead to mysterious hangs.
- `wait(greenlets, timeout, count)`: same as `gevent.wait`.
- `immediate(v)`: returns an `AsyncResult`-like object that is immediately ready and `immediate(v).get() is v == True`.
- `immediate_exception(exc)`: same as `immediate`, but raises `exc`.
- `with may_block()`: a low-level primitive for when you need to make a gevent-native blocking call between calls to `@batched` functions (e.g. `gevent.queue` operations).
- `transform(pending, fn)`: a somewhat low-level, but performant way to take an `AsyncResult`-like object and run `immediate(fn(pending.get()))`. Note that fn must be pure - it cannot interact with greenlets. Any extra kwargs will be passed to `fn`.
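
A minimal usage sketch (not part of this commit) tying the new helpers together, assuming the exports from gbatchy/__init__.py shown below; `work` and `items` are hypothetical stand-ins for a function that calls @batched helpers internally and its inputs:

```
from gbatchy import Pool, iwait, wait

pool = Pool(10)  # run at most 10 work() greenlets at a time
greenlets = [pool.spawn(work, item) for item in items]

# Consume results as each greenlet finishes; safe with batch greenlets,
# unlike calling gevent.iwait directly.
for g in iwait(greenlets):
    print(g.get())

# Or simply block until everything has finished.
wait(greenlets)
```
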
2 changes: 1 addition & 1 deletion gbatchy/__init__.py
@@ -6,7 +6,7 @@
from .batch import batched, class_batched
from .scheduler import Raise
from .utils import (pmap, pmap_unordered, pfilter, pfilter_unordered, pget, immediate,
immediate_exception, transform, spawn_proxy)
immediate_exception, transform, spawn_proxy, iwait, wait, Pool)

# Set up the default scheduler.
from .scheduler import AllAtOnceScheduler as DefaultScheduler
73 changes: 71 additions & 2 deletions gbatchy/context.py
@@ -1,8 +1,12 @@
from collections import OrderedDict
from contextlib import contextmanager
from functools import wraps
from gevent import Greenlet as _GeventGreenlet, getcurrent, Timeout, get_hub
from gevent import Greenlet as _GeventGreenlet, getcurrent, Timeout, get_hub, iwait as _gevent_iwait
import logging
import sys

logger = logging.getLogger(__name__)

DEFAULT_SCHEDULER = None
def set_default_scheduler(cls):
    global DEFAULT_SCHEDULER
@@ -166,7 +170,7 @@ def awaiting_batch(self):

    def switch(self, *args, **kwargs):
        if self.is_blocked:  # There are other reasons we may be switched into, e.g. gevent.sleep().
            # We want to ignore those (which is why we use awaiting_batch instead of switch_out).
            # We want to ignore those (which is why we use awaiting_batch instead of switch_out, unless may_block is used).
            self.is_blocked = False
            self.context.greenlet_unblocked(self)
        return super(BatchGreenlet, self).switch(*args, **kwargs)
@@ -231,6 +235,71 @@ def join(self, timeout=None):
    wait = join  # Compat with AsyncResult.


@contextmanager
def may_block():
    """A context manager for operations that may block waiting for another batch operation.

    This usually happens when using gevent's primitives (e.g. iwait) directly. You need to
    wrap such operations in a `with may_block():` block. For example, to use gevent.wait:

    ```
    def my_wait(objs, count=1):
        with may_block():
            return gevent.wait(objs, count=count)
    ```

    While under this context manager, ANY context switch out of this greenlet is treated as
    waiting for a batch. For example, `with may_block(): gevent.sleep(1)` will cause queued
    @batched() functions to execute.

    Be careful when combining this context manager with generators, since the blocking
    behavior may leak out of the generator. For example:

    ```
    def my_iwait(objs):
        it = iwait(objs)
        while True:
            with may_block():
                yield next(it)

    for v in my_iwait([g1, g2, g3]):
        gevent.sleep(0.1)
    ```

    would cause a batch flush during the gevent.sleep, which is probably not intentional.
    You should instead do:

    ```
    def my_iwait(objs):
        it = iwait(objs)
        while True:
            with may_block():
                v = next(it)
            yield v
    ```
    """
    current = getcurrent()
    current_awaiting_batch = getattr(current, 'awaiting_batch', None)
    if not current_awaiting_batch:
        yield
        return

    original_switch_out = getattr(current, 'switch_out', None)
    if original_switch_out is None:
        def switch_out():
            current.awaiting_batch()
    else:
        def switch_out():
            current.awaiting_batch()
            original_switch_out()

    current.switch_out = switch_out
    try:
        yield
    finally:
        current.switch_out = original_switch_out


class BatchAsyncResult(object):
"""A slight wrapper around AsyncResult that notifies the greenlet that it's waiting for a batch result."""

150 changes: 148 additions & 2 deletions gbatchy/utils.py
@@ -1,11 +1,27 @@
from gevent import iwait, Timeout
from collections import deque, OrderedDict
from gevent import Timeout, iwait as _gevent_iwait, wait as _gevent_wait, pool as _gevent_pool, queue as _gevent_queue, sleep
try:
    from peak.util.proxies import LazyProxy
except ImportError:
    from objproxies import LazyProxy
import sys

from .context import batch_context, spawn, add_exc_info_container, raise_exc_info_from_container
from .context import batch_context, spawn, add_exc_info_container, raise_exc_info_from_container, may_block, BatchGreenlet

@batch_context
def iwait(*args, **kwargs):
    """Same as gevent.iwait, but works with BatchGreenlets."""
    waiter = _gevent_iwait(*args, **kwargs)
    while True:
        with may_block():
            v = next(waiter)
        yield v

@batch_context
def wait(*args, **kwargs):
    """Same as gevent.wait, but works with BatchGreenlets."""
    with may_block():
        return _gevent_wait(*args, **kwargs)

@batch_context
def pget(lst):
@@ -180,3 +196,133 @@ def rawlink(self, callback):

    def unlink(self):
        raise NotImplementedError()


class Queue(_gevent_queue.Queue):
    def put(self, *args, **kwargs):
        with may_block():
            return super(Queue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
        with may_block():
            return super(Queue, self).get(*args, **kwargs)

    def peek(self, *args, **kwargs):
        with may_block():
            return super(Queue, self).peek(*args, **kwargs)


class Pool(_gevent_pool.Pool):
    greenlet_class = BatchGreenlet

    def join(self, *args, **kwargs):
        with may_block():
            return super(Pool, self).join(*args, **kwargs)

    def kill(self, *args, **kwargs):
        with may_block():
            return super(Pool, self).kill(*args, **kwargs)

    def killone(self, *args, **kwargs):
        with may_block():
            return super(Pool, self).killone(*args, **kwargs)

    def add(self, greenlet):
        with may_block():
            return super(Pool, self).add(greenlet)

    def wait_available(self):
        with may_block():
            return super(Pool, self).wait_available()

    def apply_async(self, func, args=None, kwds=None, callback=None):
        if args is None:
            args = ()
        if kwds is None:
            kwds = {}
        if self.full():
            # cannot call spawn() directly because it will block
            return self.greenlet_class.spawn(self.apply_cb, func, args, kwds, callback)
        else:
            greenlet = self.spawn(func, *args, **kwds)
            if callback is not None:
                greenlet.link(_gevent_pool.pass_value(callback))
            return greenlet

    def map_async(self, func, iterable, callback=None):
        """A variant of the map() method which returns a Greenlet object.

        If callback is specified then it should be a callable which accepts a
        single argument.
        """
        return self.greenlet_class.spawn(self.map_cb, func, iterable, callback)

    def imap(self, func, iterable, **kwargs):
        """An equivalent of itertools.imap()."""
        iterable = iter(iterable)
        queue = Queue(None)

        def fill_queue():
            try:
                while True:
                    try:
                        v = next(iterable)
                    except StopIteration:
                        break
                    else:
                        queue.put(self.spawn(func, v, **kwargs))
            finally:
                queue.put(None)

        filler = self.greenlet_class.spawn(fill_queue)

        while True:
            value = queue.get()
            if value is None:
                break

            yield value.get()

        filler.get()

    def imap_unordered(self, func, iterable, **kwargs):
        """The same as imap() except that the ordering of the results from the
        returned iterator should be considered arbitrary."""
        iterable = iter(iterable)
        results_queue = Queue()
        num_running = [0]

        def fill_queue():
            try:
                while True:
                    try:
                        v = next(iterable)
                    except StopIteration:
                        break
                    else:
                        self.spawn(func, v, **kwargs).rawlink(results_queue.put)
                        num_running[0] += 1
            finally:
                results_queue.put(None)

        filler = self.greenlet_class.spawn(fill_queue)
        num_running[0] += 1

        while num_running[0]:
            value = results_queue.get()
            num_running[0] -= 1

            if value is None:
                continue

            yield value.get()

        filler.get()

    pmap_unordered = imap_unordered

    def pmap(self, *args, **kwargs):
        return list(self.imap(*args, **kwargs))
    map = pmap
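
As a quick illustration of the Pool above (a sketch, not repository code, assuming the gbatchy package exports Pool as in __init__.py; `lookup` and `ids` are hypothetical):

```
from gbatchy import Pool

pool = Pool(5)  # run at most 5 lookup() greenlets concurrently

# imap_unordered yields plain results in completion order rather than input
# order; if a lookup raised, the exception propagates out of this loop.
for result in pool.imap_unordered(lookup, ids):
    print(result)
```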

