Skip to content

Commit

Permalink
add full asyncio mode to bravado-asyncio
Browse files Browse the repository at this point in the history
  • Loading branch information
sjaensch committed Jan 24, 2018
1 parent c77a119 commit 22e681a
Show file tree
Hide file tree
Showing 9 changed files with 347 additions and 92 deletions.
18 changes: 18 additions & 0 deletions bravado_asyncio/definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from enum import Enum
from typing import NamedTuple
from typing import Optional

import aiohttp


class RunMode(Enum):
THREAD = 'thread'
FULL_ASYNCIO = 'full_asyncio'


AsyncioResponse = NamedTuple(
'AsyncioResponse', [
('response', aiohttp.ClientResponse),
('remaining_timeout', Optional[float]),
]
)
44 changes: 44 additions & 0 deletions bravado_asyncio/future_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import asyncio
import concurrent.futures
import time
from typing import Optional

from bravado.http_future import FutureAdapter as BaseFutureAdapter

from .definitions import AsyncioResponse


class FutureAdapter(BaseFutureAdapter):
"""FutureAdapter that will be used when run_mode is THREAD. The result method is
a normal Python function, and we expect future to be from the concurrent.futures module."""

timeout_errors = (concurrent.futures.TimeoutError,)

def __init__(self, future: concurrent.futures.Future) -> None:
self.future = future

def result(self, timeout: Optional[float]=None) -> AsyncioResponse:
start = time.monotonic()
response = self.future.result(timeout)
time_elapsed = time.monotonic() - start
remaining_timeout = timeout - time_elapsed if timeout else None

return AsyncioResponse(response=response, remaining_timeout=remaining_timeout)


class AsyncioFutureAdapter(BaseFutureAdapter):
"""FutureAdapter that will be used when run_mode is FULL_ASYNCIO. The result method is
a coroutine, and we expect future to be awaitable."""

timeout_errors = (asyncio.TimeoutError,)

def __init__(self, future: asyncio.Future) -> None:
self.future = future

async def result(self, timeout: Optional[float]=None) -> AsyncioResponse:
start = time.monotonic()
response = await asyncio.wait_for(self.future, timeout=timeout)
time_elapsed = time.monotonic() - start
remaining_timeout = timeout - time_elapsed if timeout else None

return AsyncioResponse(response=response, remaining_timeout=remaining_timeout)
136 changes: 44 additions & 92 deletions bravado_asyncio/http_client.py
Original file line number Diff line number Diff line change
@@ -1,102 +1,72 @@
import asyncio
import concurrent.futures
import logging
import threading
import time
from collections import Mapping
from typing import NamedTuple
from typing import Optional

import aiohttp
from aiohttp.formdata import FormData
from bravado.http_client import HttpClient
from bravado.http_future import FutureAdapter
from bravado.http_future import HttpFuture
from bravado_core.response import IncomingResponse
from bravado_core.schema import is_list_like
from multidict import MultiDict

log = logging.getLogger(__name__)


AsyncioResponse = NamedTuple(
'AsyncioResponse', [
('response', aiohttp.ClientResponse),
('remaining_timeout', Optional[float])
]
)

from .definitions import RunMode
from .future_adapter import AsyncioFutureAdapter
from .future_adapter import FutureAdapter
from .response_adapter import AioHTTPResponseAdapter
from .response_adapter import AsyncioHTTPResponseAdapter
from .thread_loop import get_thread_loop

loop = None
client_session = None


def run_event_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
log = logging.getLogger(__name__)


def get_loop():
global loop
if loop is None:
loop = asyncio.new_event_loop()
thread = threading.Thread(target=run_event_loop, args=(loop,), daemon=True)
thread.start()
return loop
# module variable holding the current ClientSession, so that we can share it between AsyncioClient instances
client_session: Optional[aiohttp.ClientSession] = None


def get_client_session():
def get_client_session(loop: asyncio.AbstractEventLoop) -> aiohttp.ClientSession:
global client_session
if client_session:
return client_session
client_session = aiohttp.ClientSession(loop=get_loop())
client_session = aiohttp.ClientSession(loop=loop)
return client_session


class AioHTTPResponseAdapter(IncomingResponse):
"""Wraps a aiohttp Response object to provide a bravado-like interface
to the response innards.
class AsyncioClient(HttpClient):
"""Asynchronous HTTP client using the asyncio event loop. Can either use an event loop
in a separate thread or operate fully asynchronous within the current thread, using
async / await.
"""

def __init__(self, response: AsyncioResponse):
self._delegate = response.response
self._remaining_timeout = response.remaining_timeout

@property
def status_code(self):
return self._delegate.status

@property
def text(self):
future = asyncio.run_coroutine_threadsafe(self._delegate.text(), get_loop())
return future.result(self._remaining_timeout)

@property
def raw_bytes(self):
future = asyncio.run_coroutine_threadsafe(self._delegate.read(), get_loop())
return future.result(self._remaining_timeout)

@property
def reason(self):
return self._delegate.reason

@property
def headers(self):
return self._delegate.headers

def json(self, **_):
future = asyncio.run_coroutine_threadsafe(self._delegate.json(), get_loop())
return future.result(self._remaining_timeout)

def __init__(self, run_mode: RunMode=RunMode.THREAD, loop: asyncio.AbstractEventLoop=None):
self.run_mode = run_mode
if self.run_mode == RunMode.THREAD:
self.loop = loop or get_thread_loop()
self.run_coroutine_func = asyncio.run_coroutine_threadsafe
self.response_adapter = AioHTTPResponseAdapter
self.bravado_future_class = HttpFuture
self.future_adapter = FutureAdapter
elif run_mode == RunMode.FULL_ASYNCIO:
from aiobravado.http_future import HttpFuture as AsyncioHttpFuture

self.loop = loop or asyncio.get_event_loop()
self.run_coroutine_func = asyncio.ensure_future
self.response_adapter = AsyncioHTTPResponseAdapter
self.bravado_future_class = AsyncioHttpFuture
self.future_adapter = AsyncioFutureAdapter
else:
raise ValueError('Don\'t know how to handle run mode {}'.format(str(run_mode)))

class AsyncioClient(HttpClient):
"""Asynchronous HTTP client using the asyncio event loop.
"""
# don't use the shared client_session if we've been passed an explicit loop argument
if loop:
self.client_session = aiohttp.ClientSession(loop=loop)
else:
self.client_session = get_client_session(self.loop)

def request(self, request_params, operation=None, response_callbacks=None,
also_return_response=False):
"""Sets up the request params as per Twisted Agent needs.
Sets up crochet and triggers the API request in background
"""Sets up the request params for aiohttp and executes the request in the background.
:param request_params: request parameters for the http request.
:type request_params: dict
Expand All @@ -112,8 +82,6 @@ def request(self, request_params, operation=None, response_callbacks=None,
:rtype: :class: `bravado_core.http_future.HttpFuture`
"""

client_session = get_client_session()

orig_data = request_params.get('data', {})
if isinstance(orig_data, Mapping):
data = FormData()
Expand Down Expand Up @@ -144,7 +112,7 @@ def request(self, request_params, operation=None, response_callbacks=None,
)
timeout = request_params.get('timeout')

coroutine = client_session.request(
coroutine = self.client_session.request(
method=request_params.get('method') or 'GET',
url=request_params.get('url'),
params=params,
Expand All @@ -153,11 +121,11 @@ def request(self, request_params, operation=None, response_callbacks=None,
timeout=timeout,
)

future = asyncio.run_coroutine_threadsafe(coroutine, get_loop())
future = self.run_coroutine_func(coroutine, loop=self.loop)

return HttpFuture(
AsyncioFutureAdapter(future),
AioHTTPResponseAdapter,
return self.bravado_future_class(
self.future_adapter(future),
self.response_adapter(loop=self.loop),
operation,
response_callbacks,
also_return_response,
Expand All @@ -172,19 +140,3 @@ def prepare_params(self, params):
entries = [(key, str(value))] if not is_list_like(value) else [(key, str(v)) for v in value]
items.extend(entries)
return MultiDict(items)


class AsyncioFutureAdapter(FutureAdapter):

timeout_errors = (concurrent.futures.TimeoutError,)

def __init__(self, future: concurrent.futures.Future) -> None:
self.future = future

def result(self, timeout: Optional[float]=None) -> AsyncioResponse:
start = time.monotonic()
response = self.future.result(timeout)
time_elapsed = time.monotonic() - start
remaining_timeout = timeout - time_elapsed if timeout else None

return AsyncioResponse(response=response, remaining_timeout=remaining_timeout)
64 changes: 64 additions & 0 deletions bravado_asyncio/response_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import asyncio
from typing import TypeVar

from bravado_core.response import IncomingResponse

from .definitions import AsyncioResponse


T = TypeVar('T')


class AioHTTPResponseAdapter(IncomingResponse):
"""Wraps a aiohttp Response object to provide a bravado-like interface
to the response innards."""

def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
self._loop = loop

def __call__(self: T, response: AsyncioResponse) -> T:
self._delegate = response.response
self._remaining_timeout = response.remaining_timeout
return self

@property
def status_code(self):
return self._delegate.status

@property
def text(self):
future = asyncio.run_coroutine_threadsafe(self._delegate.text(), self._loop)
return future.result(self._remaining_timeout)

@property
def raw_bytes(self):
future = asyncio.run_coroutine_threadsafe(self._delegate.read(), self._loop)
return future.result(self._remaining_timeout)

@property
def reason(self):
return self._delegate.reason

@property
def headers(self):
return self._delegate.headers

def json(self, **_):
future = asyncio.run_coroutine_threadsafe(self._delegate.json(), self._loop)
return future.result(self._remaining_timeout)


class AsyncioHTTPResponseAdapter(AioHTTPResponseAdapter):
"""Wraps a aiohttp Response object to provide a bravado-like interface to the response innards.
Methods are coroutines if they call coroutines themselves and need to be awaited."""

@property
async def text(self):
return await asyncio.wait_for(self._delegate.text(), timeout=self._remaining_timeout, loop=self._loop)

@property
async def raw_bytes(self):
return await asyncio.wait_for(self._delegate.read(), timeout=self._remaining_timeout, loop=self._loop)

async def json(self, **_):
return await asyncio.wait_for(self._delegate.json(), timeout=self._remaining_timeout, loop=self._loop)
22 changes: 22 additions & 0 deletions bravado_asyncio/thread_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Module for creating a separate thread with an asyncio event loop running inside it."""
import asyncio
import threading
from typing import Optional


# module variable holding a reference to the event loop
event_loop: Optional[asyncio.AbstractEventLoop] = None


def run_event_loop(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()


def get_thread_loop() -> asyncio.AbstractEventLoop:
global event_loop
if event_loop is None:
event_loop = asyncio.new_event_loop()
thread = threading.Thread(target=run_event_loop, args=(event_loop,), daemon=True)
thread.start()
return event_loop
Empty file added docs/.gitignore
Empty file.
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
aiobravado
bravado-core>=4.11.0
bravado>=9.2.0
coverage
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@
extras_require={
# as recommended by aiohttp, see http://aiohttp.readthedocs.io/en/stable/#library-installation
'aiohttp_extras': ['aiodns', 'cchardet'],
'aiobravado': ['aiobravado'],
},
)

0 comments on commit 22e681a

Please sign in to comment.