Skip to content
This repository has been archived by the owner on Nov 23, 2020. It is now read-only.

Commit

Permalink
make sure @asyncio.coroutine is set for python 3.5 compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
lsbardel committed Feb 17, 2016
1 parent 4a5d6cf commit 709f9ff
Show file tree
Hide file tree
Showing 16 changed files with 48 additions and 12 deletions.
5 changes: 3 additions & 2 deletions pulsar/apps/greenio/__init__.py
Expand Up @@ -139,8 +139,8 @@ def example():
"""
import threading
import logging
import asyncio
from collections import deque
from functools import wraps

from greenlet import greenlet, getcurrent

Expand Down Expand Up @@ -180,7 +180,7 @@ def run_in_greenlet(callable):
A ``callable`` decorated with this decorator returns a coroutine
"""
@wraps(callable)
@asyncio.coroutine
def _(*args, **kwargs):
green = greenlet(callable)
# switch to the new greenlet
Expand Down Expand Up @@ -288,6 +288,7 @@ def _check_queue(self):
ensure_future(self._green_task(self._available.pop(), task),
loop=self._loop)

@asyncio.coroutine
def _green_task(self, green, task):
# Coroutine executing the in main greenlet
# This coroutine is executed for every task put into the queue
Expand Down
2 changes: 2 additions & 0 deletions pulsar/apps/http/__init__.py
Expand Up @@ -781,6 +781,7 @@ def _write_body_data(self, transport, data, finish=False):
for chunk in data:
transport.write(chunk)

@asyncio.coroutine
def _write_streamed_data(self, transport):
for data in self.data:
if is_async(data):
Expand Down Expand Up @@ -1288,6 +1289,7 @@ def ssl_context(self, verify=True, cert_reqs=None,
cafile=cafile, capath=capath,
cadata=cadata)

@asyncio.coroutine
def _connect(self, host, port, ssl):
_, connection = yield from self._loop.create_connection(
self.create_protocol, host, port, ssl=ssl)
Expand Down
2 changes: 2 additions & 0 deletions pulsar/apps/http/plugins.py
Expand Up @@ -35,6 +35,7 @@ def _consumer(response, consumer):
return consumer


@asyncio.coroutine
def start_request(request, conn):
response = conn.current_consumer()
# bind request-specific events
Expand Down Expand Up @@ -241,6 +242,7 @@ def on_headers(self, response, exc=None):
response.request_again = self._tunnel_request
response.finished()

@asyncio.coroutine
def _tunnel_request(self, response):
request = response.request.request
connection = response.connection
Expand Down
1 change: 1 addition & 0 deletions pulsar/apps/http/stream.py
Expand Up @@ -24,6 +24,7 @@ def __repr__(self):
def done(self):
return self._response.on_finished.fired()

@asyncio.coroutine
def read(self, n=None):
'''Read all content'''
if self._streamed:
Expand Down
7 changes: 5 additions & 2 deletions pulsar/apps/rpc/jsonrpc.py
@@ -1,8 +1,8 @@
import sys
import json
import logging
import asyncio
from collections import namedtuple
from asyncio import gather

from pulsar import AsyncObject, as_coroutine, new_event_loop, ensure_future
from pulsar.utils.string import gen_unique_id
Expand Down Expand Up @@ -38,6 +38,7 @@ class JSONRPC(RpcHandler):
def __call__(self, request):
return ensure_future(self._execute_request(request))

@asyncio.coroutine
def _execute_request(self, request):
response = request.response

Expand All @@ -52,14 +53,15 @@ def _execute_request(self, request):
status = 200

tasks = [self._call(request, each) for each in data]
result = yield from gather(*tasks)
result = yield from asyncio.gather(*tasks)
res = [r[0] for r in result]
else:
res, status = yield from self._call(request, data)

response.status_code = status
return Json(res).http_response(request)

@asyncio.coroutine
def _call(self, request, data):
exc_info = None
proc = None
Expand Down Expand Up @@ -328,6 +330,7 @@ def _call(self, name, *args, **kwargs):
self._batch.append(body)
return data['id']

@asyncio.coroutine
def __call__(self):
if not self._batch:
return
Expand Down
2 changes: 2 additions & 0 deletions pulsar/apps/socket/__init__.py
Expand Up @@ -214,6 +214,7 @@ def protocol_factory(self):
'''
return partial(Connection, self.cfg.callable)

@asyncio.coroutine
def monitor_start(self, monitor):
'''Create the socket listening to the ``bind`` address.
Expand Down Expand Up @@ -332,6 +333,7 @@ def protocol_factory(self):
'''
return self.cfg.callable

@asyncio.coroutine
def monitor_start(self, monitor):
'''Create the socket listening to the ``bind`` address.
Expand Down
4 changes: 4 additions & 0 deletions pulsar/apps/test/runner.py
Expand Up @@ -66,6 +66,7 @@ def _next(self):
exit_code = 0
self._loop.call_soon(self._exit, exit_code)

@asyncio.coroutine
def _run_testcls(self, testcls, all_tests):
cfg = testcls.cfg
seq = getattr(testcls, '_sequential_execution', cfg.sequential)
Expand Down Expand Up @@ -112,6 +113,7 @@ def _run_testcls(self, testcls, all_tests):
self.logger.info('Finished Tests from %s', testcls)
self._loop.call_soon(self._next)

@asyncio.coroutine
def _run(self, method, test_timeout):
self._check_abort()
coro = method()
Expand All @@ -120,6 +122,7 @@ def _run(self, method, test_timeout):
test_timeout = get_test_timeout(method, test_timeout)
yield from asyncio.wait_for(coro, test_timeout, loop=self._loop)

@asyncio.coroutine
def _run_test(self, test, test_timeout):
'''Run a ``test`` function using the following algorithm
Expand Down Expand Up @@ -149,6 +152,7 @@ def _run_test(self, test, test_timeout):
runner.stopTest(test)
yield None # release the loop

@asyncio.coroutine
def _run_safe(self, test, method_name, test_timeout, error=None):
self._check_abort()
try:
Expand Down
9 changes: 6 additions & 3 deletions pulsar/apps/test/utils.py
Expand Up @@ -29,7 +29,7 @@
'''
import logging
import unittest
from asyncio import Future
import asyncio

import pulsar
from pulsar import (get_actor, send, new_event_loop, as_gather,
Expand Down Expand Up @@ -123,7 +123,7 @@ class AsyncAssert:
class MyTest(unittest.TestCase):
def test1(self):
yield self.async.assertEqual(3, Future().callback(3))
yield from self.async.assertEqual(3, Future().callback(3))
...
Expand All @@ -140,6 +140,7 @@ def __get__(self, instance, instance_type=None):

def __getattr__(self, name):

@asyncio.coroutine
def _(*args, **kwargs):
args = yield from as_gather(*args)
result = getattr(self.test, name)(*args, **kwargs)
Expand All @@ -149,6 +150,7 @@ def _(*args, **kwargs):

return _

@asyncio.coroutine
def assertRaises(self, error, callable, *args, **kwargs):
try:
yield from callable(*args, **kwargs)
Expand Down Expand Up @@ -182,13 +184,14 @@ def all_spawned(self):
self._spawned = []
return self._spawned

@asyncio.coroutine
def spawn_actor(self, concurrency=None, **kwargs):
'''Spawn a new actor and perform some tests
'''
concurrency = concurrency or self.concurrency
ad = pulsar.spawn(concurrency=concurrency, **kwargs)
self.assertTrue(ad.aid)
self.assertIsInstance(ad, Future)
self.assertIsInstance(ad, asyncio.Future)
proxy = yield from ad
self.all_spawned.append(proxy)
self.assertEqual(proxy.aid, ad.aid)
Expand Down
2 changes: 2 additions & 0 deletions pulsar/apps/wsgi/content.py
Expand Up @@ -142,6 +142,7 @@
.. _`HTML5 document`: http://www.w3schools.com/html/html5_intro.asp
'''
import re
import asyncio
from collections import Mapping
from functools import partial

Expand Down Expand Up @@ -1141,6 +1142,7 @@ def do_stream(self, request):
else:
yield self._template % (self.flatatt(), head, body)

@asyncio.coroutine
def _html(self, request, body, head=None):
'''Asynchronous rendering
'''
Expand Down
7 changes: 5 additions & 2 deletions pulsar/apps/wsgi/formdata.py
@@ -1,7 +1,7 @@
import email.parser
import asyncio
from http.client import HTTPMessage, _MAXLINE, _MAXHEADERS
from io import BytesIO
from asyncio import StreamReader
from urllib.parse import parse_qs
from base64 import b64encode
from functools import reduce
Expand Down Expand Up @@ -39,7 +39,7 @@ class HttpBodyReader():
def __init__(self, headers, parser, transport, **kw):
self.headers = headers
self.parser = parser
self.reader = StreamReader(**kw)
self.reader = asyncio.StreamReader(**kw)
self.reader.set_transport(transport)
self.feed_data = self.reader.feed_data
self.feed_eof = self.reader.feed_eof
Expand Down Expand Up @@ -157,6 +157,7 @@ def parse(self):
producer = BytesProducer(inp)
return producer(self._consume, boundary)

@asyncio.coroutine
def _consume(self, fp, boundary):
sep = b'--'
nextpart = ('--%s' % boundary).encode()
Expand Down Expand Up @@ -215,6 +216,7 @@ def parse(self, mem_limit=None, **kw):
else:
return self._ready(data)

@asyncio.coroutine
def _async(self, chunk):
chunk = yield from chunk
return self._ready(chunk)
Expand Down Expand Up @@ -350,6 +352,7 @@ def done(self):
self.parser.result[0][self.name] = self.string()


@asyncio.coroutine
def parse_headers(fp, _class=HTTPMessage):
"""Parses only RFC2822 headers from a file pointer.
email Parser wants to see strings rather than bytes.
Expand Down
7 changes: 5 additions & 2 deletions pulsar/apps/wsgi/handlers.py
@@ -1,4 +1,4 @@
'''
"""
This section describes the asynchronous WSGI specification used by pulsar.
It is a superset of the `WSGI 1.0.1`_ specification for synchronous
server/middleware.
Expand Down Expand Up @@ -97,7 +97,9 @@ def simple_async():
.. _WSGI: http://www.wsgi.org
.. _`WSGI 1.0.1`: http://www.python.org/dev/peps/pep-3333/
'''
"""
import asyncio

from pulsar import Http404, ensure_future, isfuture
from pulsar.utils.log import LocalMixin, local_method

Expand Down Expand Up @@ -160,6 +162,7 @@ def __call__(self, environ, start_response):
response.start(start_response)
return response

@asyncio.coroutine
def _async(self, environ, start_response):
response = None
try:
Expand Down
3 changes: 3 additions & 0 deletions pulsar/async/clients.py
Expand Up @@ -104,6 +104,7 @@ def close(self, async=True):
for connection in in_use:
connection.close()

@asyncio.coroutine
def _get(self):
queue = self._queue
# grab the connection without waiting, important!
Expand Down Expand Up @@ -223,6 +224,7 @@ def close(self):
return self.fire_event('finish')
abort = close

@asyncio.coroutine
def create_connection(self, address, protocol_factory=None, **kw):
'''Helper method for creating a connection to an ``address``.
'''
Expand Down Expand Up @@ -257,6 +259,7 @@ def close(self):
return self.fire_event('finish')
abort = close

@asyncio.coroutine
def create_datagram_endpoint(self, protocol_factory=None, **kw):
'''Helper method for creating a connection to an ``address``.
'''
Expand Down
1 change: 1 addition & 0 deletions pulsar/async/concurrency.py
Expand Up @@ -684,6 +684,7 @@ def _exit_arbiter(self, actor, done=False):
else:
self._exit_arbiter(actor, True)

@asyncio.coroutine
def _close_all(self, actor):
# Close al monitors at once
try:
Expand Down
5 changes: 4 additions & 1 deletion pulsar/async/futures.py
Expand Up @@ -2,7 +2,8 @@
from inspect import isgeneratorfunction
from functools import wraps, partial

from asyncio import Future, CancelledError, TimeoutError, sleep, gather
from asyncio import (Future, CancelledError, TimeoutError, sleep, gather,
coroutine)

from .consts import MAX_ASYNC_WHILE
from .access import get_event_loop, LOGGER, isfuture, is_async, ensure_future
Expand Down Expand Up @@ -144,6 +145,7 @@ def maybe_async(value, loop=None):
return value


@coroutine
def as_coroutine(value):
if is_async(value):
value = yield from value
Expand Down Expand Up @@ -208,6 +210,7 @@ def _():
return waiter


@coroutine
def async_while(timeout, while_clause, *args):
'''The asynchronous equivalent of ``while while_clause(*args):``
Expand Down
2 changes: 2 additions & 0 deletions pulsar/async/mailbox.py
Expand Up @@ -52,6 +52,7 @@ def example():
'''
import socket
import pickle
import asyncio
from collections import namedtuple

from pulsar import ProtocolError, CommandError
Expand All @@ -73,6 +74,7 @@ def create_aid():
return gen_unique_id()[:8]


@asyncio.coroutine
def command_in_context(command, caller, target, args, kwargs, connection=None):
cmnd = get_command(command)
if not cmnd:
Expand Down
1 change: 1 addition & 0 deletions pulsar/async/protocols.py
Expand Up @@ -172,6 +172,7 @@ def abort_request(self):
future.add_done_callback(self._abort_request)
raise AbortRequest

@asyncio.coroutine
def _start(self):
try:
yield from self.fire_event('pre_request')
Expand Down

1 comment on commit 709f9ff

@lsbardel
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

release-note

  • Make sure the asyncio.coroutine decorator is used by all pulsar coroutines. This allows to use pulsar to write python 3.5 compatible code with the async/await syntax

Please sign in to comment.