/
wsgi.py
143 lines (113 loc) · 5.05 KB
/
wsgi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
##############################################################################
#
# Copyright (c) 2010 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Zope 3-specific testing code
"""
import cStringIO
import Cookie
import httplib
import socket
import sys
import mechanize
import zope.testbrowser.browser
import zope.testbrowser.testing
class WSGIConnection(object):
    """A ``mechanize`` compatible connection object.

    Requests are not sent over a socket.  They are passed to the test
    application given to the constructor (its ``get``/``post`` methods and
    ``cookies`` mapping are used), and the result is kept on
    ``self.response`` until ``getresponse`` is called.
    """

    # Lower-cased name of the pseudo header that carries the handle-errors
    # flag.  It is matched case-insensitively because the capitalisation
    # that arrives here differs between Python versions.
    _HANDLE_ERRORS_HEADER = 'x-zope-handle-errors'

    def __init__(self, test_app, host, timeout=None):
        """Remember the test application and the host.

        ``timeout`` is accepted for signature compatibility and is ignored.
        """
        self._test_app = test_app
        self.host = host

    def set_debuglevel(self, level):
        """Accept and ignore a debug level; nothing is ever logged."""
        pass

    def _quote(self, url):
        """Return ``url`` with every space replaced by ``%20``."""
        # the publisher expects to be able to split on whitespace, so we have
        # to make sure there is none in the URL
        return url.replace(' ', '%20')

    def _pop_handle_errors(self, headers):
        """Remove the handle-errors header(s) from ``headers``.

        Returns ``False`` only when such a header carried the string
        ``'False'``; in every other case (including when the header is
        absent) errors are handled, so ``True`` is returned.
        """
        handle_errors = True
        # Iterate over a snapshot of the keys because entries are removed.
        for name in list(headers):
            if name.lower() == self._HANDLE_ERRORS_HEADER:
                if headers.pop(name) == 'False':
                    handle_errors = False
        return handle_errors

    def _sync_cookies(self, app, headers):
        """Replace the app's cookies with those of the first Cookie header."""
        # Here we do a complicated dance to sync the webtest app's idea of
        # what cookies there are with the testbrowser's. It's not quite
        # perfect as they can still get un-synced if you don't execute a
        # request via the testbrowser. But that's a very rare edge case.
        app.cookies.clear()
        for name, value in headers.items():
            if name.lower() == 'cookie':
                cookies = Cookie.SimpleCookie()
                cookies.load(value)
                for key, morsel in cookies.items():
                    app.cookies[key] = morsel.value
                break

    def request(self, method, url, body=None, headers=None):
        """Send a request to the test application.

        ``method`` must be ``'GET'`` or ``'POST'``; any other value raises
        ``Exception``.  An empty ``url`` is treated as ``'/'`` and spaces in
        it are quoted.  ``headers`` may be ``None`` (treated as no headers);
        the mapping passed in is never modified.

        The response will be stored in ``self.response``.
        """
        if body is None:
            body = ''
        if url == '':
            url = '/'
        url = self._quote(url)

        # Work on a private copy: the handle-errors header is stripped below
        # and the caller's mapping must not change under its feet.
        headers = dict(headers) if headers is not None else {}

        # Extract the handle_error option header
        handle_errors = self._pop_handle_errors(headers)

        extra_environ = {}
        if not handle_errors:
            # There doesn't seem to be a "Right Way" to do this
            # zope.app.wsgi does this
            extra_environ['wsgi.handleErrors'] = False
            # the paste way of doing this
            extra_environ['paste.throw_errors'] = True

        app = self._test_app
        self._sync_cookies(app, headers)

        # pass the request to webtest
        if method == 'GET':
            assert not body, body
            response = app.get(url, headers=headers, expect_errors=True,
                               extra_environ=extra_environ)
        elif method == 'POST':
            response = app.post(url, body, headers=headers,
                                expect_errors=True,
                                extra_environ=extra_environ)
        else:
            raise Exception('Couldnt handle method %s' % method)
        self.response = response

    def getresponse(self):
        """Return a ``mechanize`` compatible response.

        The goal of this method is to convert the response stored by
        ``request`` into a ``PublisherResponse``, which is understood by
        mechanize.  The header block lists a synthetic ``Status`` header
        first, followed by the response headers in sorted order.
        """
        response = self.response
        # ``response.status`` looks like "200 OK": a three character code,
        # one separator character, then the reason phrase.
        status = int(response.status[:3])
        reason = response.status[4:]

        # ``sorted`` copes with ``items()`` returning either a list or a
        # view/iterator.
        headers = sorted(response.headers.items())
        headers.insert(0, ('Status', response.status))
        headers = '\r\n'.join('%s: %s' % h for h in headers)
        content = response.body
        return zope.testbrowser.testing.PublisherResponse(
            content, headers, status, reason)
class WSGIHTTPHandler(zope.testbrowser.testing.PublisherHTTPHandler):
    """HTTP handler whose connections are ``WSGIConnection`` objects."""

    def __init__(self, test_app, *args, **kw):
        """Store ``test_app`` and initialise the base handler.

        All remaining positional and keyword arguments are forwarded
        unchanged to ``PublisherHTTPHandler.__init__``.
        """
        base = zope.testbrowser.testing.PublisherHTTPHandler
        self._test_app = test_app
        base.__init__(self, *args, **kw)

    def _connect(self, *args, **kw):
        """Create a ``WSGIConnection`` bound to the stored test app."""
        connection = WSGIConnection(self._test_app, *args, **kw)
        return connection
class WSGIMechanizeBrowser(zope.testbrowser.testing.PublisherMechanizeBrowser):
    """Special ``mechanize`` browser whose HTTP handler is WSGI-backed."""

    def __init__(self, test_app, *args, **kw):
        """Store ``test_app`` and initialise the base browser.

        All remaining positional and keyword arguments are forwarded
        unchanged to ``PublisherMechanizeBrowser.__init__``.
        """
        base = zope.testbrowser.testing.PublisherMechanizeBrowser
        self._test_app = test_app
        base.__init__(self, *args, **kw)

    def _http_handler(self, *args, **kw):
        """Create a ``WSGIHTTPHandler`` bound to the stored test app."""
        handler = WSGIHTTPHandler(self._test_app, *args, **kw)
        return handler
class Browser(zope.testbrowser.browser.Browser):
    """A Zope `testbrowser` Browser that talks to a WSGI test application."""

    def __init__(self, test_app, url=None):
        """Build the browser around a ``WSGIMechanizeBrowser``.

        ``test_app`` is handed to the mechanize browser; ``url`` is passed
        on to the base ``Browser`` unchanged.
        """
        super(Browser, self).__init__(
            url=url,
            mech_browser=WSGIMechanizeBrowser(test_app),
        )