-
-
Notifications
You must be signed in to change notification settings - Fork 578
/
transports.py
147 lines (111 loc) · 4.71 KB
/
transports.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import logging
import os
from contextlib import contextmanager
from urllib.parse import urlparse
import requests
from zeep.utils import get_media_type, get_version
from zeep.wsdl.utils import etree_to_string
class Transport:
    """The transport object handles all communication to the SOAP server.

    :param cache: The cache object to be used to cache GET requests
    :param timeout: The timeout for loading wsdl and xsd documents.
    :param operation_timeout: The timeout for operations (POST/GET). By
                              default this is None (no timeout).
    :param session: A :py:class:`requests.Session` object (optional)

    """

    def __init__(self, cache=None, timeout=300, operation_timeout=None, session=None):
        self.cache = cache
        # Timeout used when fetching WSDL/XSD documents (see ``load``).
        self.load_timeout = timeout
        # Timeout used for the actual service calls (see ``get``/``post``).
        self.operation_timeout = operation_timeout
        self.logger = logging.getLogger(__name__)

        self.session = session or requests.Session()
        # The User-Agent is set on the session, including a caller-supplied one.
        self.session.headers["User-Agent"] = "Zeep/%s (www.python-zeep.org)" % (
            get_version()
        )

    def get(self, address, params, headers):
        """Proxy to requests.get()

        :param address: The URL for the request
        :param params: The query parameters
        :param headers: a dictionary with the HTTP headers.
        :return: the response object returned by the session

        """
        response = self.session.get(
            address, params=params, headers=headers, timeout=self.operation_timeout
        )
        return response

    def post(self, address, message, headers):
        """Proxy to requests.post()

        When DEBUG logging is enabled, both the outgoing message and the
        response body are logged.

        :param address: The URL for the request
        :param message: The content for the body
        :param headers: a dictionary with the HTTP headers.
        :return: the response object returned by the session

        """
        if self.logger.isEnabledFor(logging.DEBUG):
            log_message = message
            if isinstance(log_message, bytes):
                # Logging must never break the request: undecodable bytes are
                # replaced instead of raising UnicodeDecodeError.
                log_message = log_message.decode("utf-8", errors="replace")

            self.logger.debug("HTTP Post to %s:\n%s", address, log_message)

        response = self.session.post(
            address, data=message, headers=headers, timeout=self.operation_timeout
        )

        if self.logger.isEnabledFor(logging.DEBUG):
            media_type = get_media_type(
                response.headers.get("Content-Type", "text/xml")
            )

            # multipart/related bodies are logged as raw bytes; everything
            # else is decoded to text for readability.
            log_message = response.content
            if media_type != "multipart/related" and isinstance(log_message, bytes):
                log_message = log_message.decode(
                    response.encoding or "utf-8", errors="replace"
                )

            self.logger.debug(
                "HTTP Response from %s (status: %d):\n%s",
                address,
                response.status_code,
                log_message,
            )

        return response

    def post_xml(self, address, envelope, headers):
        """Post the envelope xml element to the given address with the headers.

        This method is intended to be overridden if you want to customize the
        serialization of the xml element. By default the body is formatted
        and encoded as utf-8. See ``zeep.wsdl.utils.etree_to_string``.

        """
        message = etree_to_string(envelope)
        return self.post(address, message, headers)

    def load(self, url):
        """Load the content from the given URL

        ``http``/``https`` URLs are fetched through the session (and the
        cache, when one is configured). Anything else is treated as a path
        on the local filesystem, with an optional ``file://`` prefix removed.

        :param url: The URL or local path to load
        :return: the raw content as bytes
        :raises ValueError: if ``url`` is empty

        """
        if not url:
            raise ValueError("No url given to load")

        scheme = urlparse(url).scheme
        if scheme in ("http", "https"):
            if self.cache:
                response = self.cache.get(url)
                if response:
                    return bytes(response)

            content = self._load_remote_data(url)

            if self.cache:
                self.cache.add(url, content)

            return content

        elif scheme == "file":
            if url.startswith("file://"):
                url = url[7:]

        # Reached for file:// URLs and for plain paths (no scheme, or e.g. a
        # Windows drive letter that urlparse reports as the scheme).
        with open(os.path.expanduser(url), "rb") as fh:
            return fh.read()

    def _load_remote_data(self, url):
        """Fetch ``url`` with the session and return the response body.

        Uses ``load_timeout`` and raises for HTTP error status codes via
        ``response.raise_for_status()``.
        """
        self.logger.debug("Loading remote data from: %s", url)
        response = self.session.get(url, timeout=self.load_timeout)
        response.raise_for_status()
        return response.content

    @contextmanager
    def settings(self, timeout=None):
        """Context manager to temporarily overrule options.

        Example::

            transport = zeep.Transport()
            with transport.settings(timeout=10):
                client.service.fast_call()

        :param timeout: Set the timeout for POST/GET operations (not used for
                        loading external WSDL or XSD documents)

        """
        old_timeout = self.operation_timeout
        self.operation_timeout = timeout
        try:
            yield
        finally:
            # Restore the previous timeout even when the with-body raises,
            # otherwise the temporary value would leak into later calls.
            self.operation_timeout = old_timeout