Skip to content

Commit

Permalink
Implement support for asyncio.
Browse files Browse the repository at this point in the history
This adds an optional AsyncTransport class which only works for
Python 3.5. Code is heavily inspired by the pull-request from
chrisimcevoy (see #207)
  • Loading branch information
mvantellingen committed Nov 13, 2016
1 parent ec62168 commit 1066e96
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 6 deletions.
62 changes: 62 additions & 0 deletions examples/async_client.py
@@ -0,0 +1,62 @@
import asyncio
import time

import zeep

from zeep.asyncio import AsyncTransport


def run_async():
print("async example")
print("=============")

result = []

def handle_future(future):
result.extend(future.result())

loop = asyncio.get_event_loop()

transport = AsyncTransport(loop, cache=None)
client = zeep.Client('http://localhost:8000/?wsdl', transport=transport)

tasks = [
client.service.slow_request('request-1'), # takes 1 sec
client.service.slow_request('request-2'), # takes 1 sec
]
future = asyncio.gather(*tasks, return_exceptions=True)

result = []
future.add_done_callback(handle_future)

st = time.time()
loop.run_until_complete(future)
loop.run_until_complete(transport.session.close())
print("time: %.2f" % (time.time() - st))
print("result: %s", result)
print("")
return result


def run_sync():
print("sync example")
print("============")
transport = zeep.Transport(cache=None)
client = zeep.Client('http://localhost:8000/?wsdl', transport=transport)

st = time.time()
result = [
client.service.slow_request('request-1'), # takes 1 sec
client.service.slow_request('request-2'), # takes 1 sec
]
print("Time: %.2f" % (time.time() - st))
print("result: %s", result)
print("\n")

return result


if __name__ == '__main__':
print("")
run_async()
run_sync()
46 changes: 46 additions & 0 deletions examples/soap_server.py
@@ -0,0 +1,46 @@
"""
Example soap server using spyne.
Run with
uwsgi --http :8000 \
--wsgi-file soap_server.py \
--virtualenv ~/.pyenv/versions/3.5.2/envs/zeep \
-p 10
"""
import time

from spyne import Application, ServiceBase, Unicode, rpc
from spyne.protocol.soap import Soap11
from spyne.server.wsgi import WsgiApplication


class ExampleService(ServiceBase):

@rpc(Unicode, _returns=Unicode)
def slow_request(ctx, request_id):
time.sleep(1)
return u'Request: %s' % request_id

application = Application(
services=[ExampleService],
tns='http://tests.python-zeep.org/',
in_protocol=Soap11(validator='lxml'),
out_protocol=Soap11())

application = WsgiApplication(application)

if __name__ == '__main__':
import logging

from wsgiref.simple_server import make_server

logging.basicConfig(level=logging.DEBUG)
logging.getLogger('spyne.protocol.xml').setLevel(logging.DEBUG)

logging.info("listening to http://127.0.0.1:8000")
logging.info("wsdl is at: http://localhost:8000/?wsdl")

server = make_server('127.0.0.1', 8000, application)
server.serve_forever()
90 changes: 90 additions & 0 deletions src/zeep/asyncio.py
@@ -0,0 +1,90 @@
"""
Adds asyncio support to Zeep. Contains Python 3.5+ only syntax!
"""
import asyncio

import aiohttp

from zeep.transports import Transport
from zeep.wsdl import bindings
from zeep.wsdl.utils import etree_to_string


class AsyncTransport(Transport):
supports_async = True

def __init__(self, loop, *args, **kwargs):
self.loop = loop if loop else asyncio.get_event_loop()
super().__init__(*args, **kwargs)

# Create a separate regular requests session for non async
self._load_session = Transport.create_session(self)

def create_session(self):
connector = aiohttp.TCPConnector(verify_ssl=self.http_verify)

return aiohttp.ClientSession(
connector=connector,
loop=self.loop,
headers=self.http_headers,
auth=self.http_auth)

def _load_remote_data(self, url):
response = self._load_session.get(url, timeout=self.load_timeout)
response.raise_for_status()
return response.content

async def post(self, address, message, headers):
self.logger.debug("HTTP Post to %s:\n%s", address, message)
with aiohttp.Timeout(self.operation_timeout):
response = await self.session.post(
address, data=message, headers=headers)
self.logger.debug(
"HTTP Response from %s (status: %d):\n%s",
address, response.status, await response.read())
return response

async def post_xml(self, address, envelope, headers):
message = etree_to_string(envelope)
response = await self.post(address, message, headers)

from pretend import stub
return stub(
content=await response.read(),
status_code=response.status,
headers=response.headers)

async def get(self, address, params, headers):
with aiohttp.Timeout(self.operation_timeout):
response = await self.session.get(
address, params=params, headers=headers)

from pretend import stub
return await stub(
content=await response.read(),
status_code=response.status,
headers=response.headers)


class AsyncSoapBinding(object):

async def send(self, client, options, operation, args, kwargs):
envelope, http_headers = self._create(
operation, args, kwargs,
client=client,
options=options)

response = await client.transport.post_xml(
options['address'], envelope, http_headers)

operation_obj = self.get(operation)
return self.process_reply(client, operation_obj, response)


class AsyncSoap11Binding(AsyncSoapBinding, bindings.Soap11Binding):
pass


class AsyncSoap12Binding(AsyncSoapBinding, bindings.Soap12Binding):
pass
1 change: 1 addition & 0 deletions src/zeep/transports.py
Expand Up @@ -11,6 +11,7 @@


class Transport(object):
supports_async = False

def __init__(self, cache=NotSet, timeout=300, operation_timeout=None,
verify=True, http_auth=None):
Expand Down
19 changes: 13 additions & 6 deletions src/zeep/wsdl/wsdl.py
Expand Up @@ -338,12 +338,19 @@ def parse_binding(self, doc):
"""
result = {}
binding_classes = [
bindings.Soap11Binding,
bindings.Soap12Binding,
bindings.HttpGetBinding,
bindings.HttpPostBinding,
]
if not getattr(self.wsdl.transport, 'supports_async', False):
binding_classes = [
bindings.Soap11Binding,
bindings.Soap12Binding,
bindings.HttpGetBinding,
bindings.HttpPostBinding,
]
else:
from zeep.asyncio import AsyncSoap11Binding, AsyncSoap12Binding
binding_classes = [
AsyncSoap11Binding,
AsyncSoap12Binding,
]

for binding_node in doc.findall('wsdl:binding', namespaces=NSMAP):
# Detect the binding type
Expand Down

0 comments on commit 1066e96

Please sign in to comment.