Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rewrite test client redirect handling #1402

Merged
merged 1 commit into from Nov 25, 2018
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -116,6 +116,21 @@ Unreleased
versions of Lighttpd. ``LighttpdCGIRootFix`` was renamed to
``CGIRootFix`` in 0.9. The old name emits a deprecation warning and
will be removed in the next version. (`#1141`_)
- The test :class:`~test.Client` redirect handling is rewritten.
(`#1402`_)

- The redirect environ is copied from the initial request environ.
- Script root and path are correctly distinguished when
redirecting to a path under the root.
- The HEAD method is not changed to GET.
- 307 and 308 codes preserve the method and body. All others
ignore the body and related headers.
- Headers are passed to the new request for all codes, following
what browsers do.
- :class:`~test.EnvironBuilder` sets the content type and length
headers in addition to the WSGI keys when detecting them from
the data.


.. _`#209`: https://github.com/pallets/werkzeug/pull/209
.. _`#609`: https://github.com/pallets/werkzeug/pull/609
@@ -164,6 +179,7 @@ Unreleased
.. _`#1393`: https://github.com/pallets/werkzeug/pull/1393
.. _`#1395`: https://github.com/pallets/werkzeug/pull/1395
.. _`#1401`: https://github.com/pallets/werkzeug/pull/1401
.. _`#1402`: https://github.com/pallets/werkzeug/pull/1402


Version 0.14.1
@@ -56,17 +56,6 @@ def redirect_with_get_app(environ, start_response):
return response(environ, start_response)


def redirect_with_post_app(environ, start_response):
req = Request(environ)
if req.url == 'http://localhost/some/redirect/':
assert req.method == 'GET', 'request should be GET'
assert not req.form, 'request should not have data'
response = Response('current url: %s' % req.url)
else:
response = redirect('http://localhost/some/redirect/')
return response(environ, start_response)


def external_redirect_demo_app(environ, start_response):
response = redirect('http://example.com/')
return response(environ, start_response)
@@ -333,6 +322,22 @@ def test_create_environ_query_string_error():
create_environ('/foo?bar=baz', query_string={'a': 'b'})


def test_builder_from_environ():
environ = create_environ(
"/foo",
base_url="https://example.com/base",
query_string={"name": "Werkzeug"},
data={"foo": "bar"},
headers={"X-Foo": "bar"}
)
builder = EnvironBuilder.from_environ(environ)
try:
new_environ = builder.get_environ()
finally:
builder.close()
assert new_environ == environ


def test_file_closing():
closed = []

@@ -390,21 +395,35 @@ def local_redirect_app(environ, start_response):
strict_eq(resp.data, b'current path: /to/location')


def test_follow_redirect_with_post_307():
def redirect_with_post_307_app(environ, start_response):
req = Request(environ)
if req.url == 'http://localhost/some/redirect/':
assert req.method == 'POST', 'request should be POST'
assert not req.form, 'request should not have data'
response = Response('current url: %s' % req.url)
else:
response = redirect('http://localhost/some/redirect/', code=307)
return response(environ, start_response)
@pytest.mark.parametrize(
("code", "keep"),
((302, False), (301, False), (307, True), (308, True)),
)
def test_follow_redirect_body(code, keep):
@Request.application
def app(request):
if request.url == "http://localhost/some/redirect/":
assert request.method == "POST" if keep else "GET"
assert request.headers["X-Foo"] == "bar"

if keep:
assert request.form["foo"] == "bar"
else:
assert not request.form

return Response("current url: %s" % request.url)

return redirect("http://localhost/some/redirect/", code=code)

c = Client(redirect_with_post_307_app, response_wrapper=BaseResponse)
resp = c.post('/', follow_redirects=True, data='foo=blub+hehe&blah=42')
assert resp.status_code == 200
assert resp.data == b'current url: http://localhost/some/redirect/'
c = Client(app, response_wrapper=BaseResponse)
response = c.post(
"/",
follow_redirects=True,
data={"foo": "bar"},
headers={"X-Foo": "bar"},
)
assert response.status_code == 200
assert response.data == b"current url: http://localhost/some/redirect/"


def test_follow_external_redirect():
@@ -436,11 +455,17 @@ def test_follow_redirect_loop():
c.get('/', follow_redirects=True)


def test_follow_redirect_with_post():
c = Client(redirect_with_post_app, response_wrapper=BaseResponse)
resp = c.post('/', follow_redirects=True, data='foo=blub+hehe&blah=42')
strict_eq(resp.status_code, 200)
strict_eq(resp.data, b'current url: http://localhost/some/redirect/')
def test_follow_redirect_non_root_base_url():
@Request.application
def app(request):
if request.path == "/redirect":
return redirect("done")

return Response(request.path)

c = Client(app, response_wrapper=Response)
response = c.get("/redirect", base_url="http://localhost/other", follow_redirects=True)
assert response.data == b"/done"


def test_path_info_script_name_unquoting():
@@ -121,6 +121,7 @@
304: 'Not Modified',
305: 'Use Proxy',
307: 'Temporary Redirect',
308: 'Permanent Redirect',
400: 'Bad Request',
401: 'Unauthorized',
402: 'Payment Required', # unused
@@ -32,10 +32,17 @@
from werkzeug.wrappers import BaseRequest
from werkzeug.urls import url_encode, url_fix, iri_to_uri, url_unquote, \
url_unparse, url_parse
from werkzeug.wsgi import get_host, get_current_url, ClosingIterator
from werkzeug.wsgi import get_current_url, ClosingIterator
from werkzeug.utils import dump_cookie, get_content_type
from werkzeug.datastructures import FileMultiDict, MultiDict, \
CombinedMultiDict, Headers, FileStorage, CallbackDict
from werkzeug.datastructures import (
FileMultiDict,
MultiDict,
CombinedMultiDict,
Headers,
FileStorage,
CallbackDict,
EnvironHeaders,
)
from werkzeug.http import dump_options_header, parse_options_header


@@ -355,6 +362,35 @@ def __init__(self, path='/', base_url=None, query_string=None,
if mimetype is not None:
self.mimetype = mimetype

@classmethod
def from_environ(cls, environ, **kwargs):
"""Turn an environ dict back into a builder. Any extra kwargs
override the args extracted from the environ.
.. versionadded:: 0.15
"""
headers = Headers(EnvironHeaders(environ))
out = {
"path": environ["PATH_INFO"],
"base_url": cls._make_base_url(
environ["wsgi.url_scheme"],
headers.pop("Host"),
environ["SCRIPT_NAME"],
),
"query_string": environ["QUERY_STRING"],
"method": environ["REQUEST_METHOD"],
"input_stream": environ["wsgi.input"],
"content_type": headers.pop("Content-Type", None),
"content_length": headers.pop("Content-Length", None),
"errors_stream": environ["wsgi.errors"],
"multithread": environ["wsgi.multithread"],
"multiprocess": environ["wsgi.multiprocess"],
"run_once": environ["wsgi.run_once"],
"headers": headers,
}
out.update(kwargs)
return cls(**out)

def _add_file_from_data(self, key, value):
"""Called in the EnvironBuilder to add files from the data dict."""
if isinstance(value, tuple):
@@ -372,11 +408,19 @@ def _add_file_from_data(self, key, value):
else:
self.files.add_file(key, value)

def _get_base_url(self):
return url_unparse((self.url_scheme, self.host,
self.script_root, '', '')).rstrip('/') + '/'
@staticmethod
def _make_base_url(scheme, host, script_root):
return url_unparse((scheme, host, script_root, "", "")).rstrip("/") + "/"

@property
def base_url(self):
"""The base URL is used to extract the URL scheme, host name,
port, and root path.
"""
return self._make_base_url(self.url_scheme, self.host, self.script_root)

def _set_base_url(self, value):
@base_url.setter
def base_url(self, value):
if value is None:
scheme = 'http'
netloc = 'localhost'
@@ -390,12 +434,6 @@ def _set_base_url(self, value):
self.host = netloc
self.url_scheme = scheme

base_url = property(_get_base_url, _set_base_url, doc='''
The base URL is a URL that is used to extract the WSGI
URL scheme, host (server name + server port) and the
script root (`SCRIPT_NAME`).''')
del _get_base_url, _set_base_url

def _get_content_type(self):
ct = self.headers.get('Content-Type')
if ct is None and not self._input_stream:
@@ -571,7 +609,13 @@ def close(self):
self.closed = True

def get_environ(self):
"""Return the built environ."""
"""Return the built environ.
.. versionchanged:: 0.15
The content type and length headers are set based on
input stream detection. Previously this only set the WSGI
keys.
"""
input_stream = self.input_stream
content_length = self.content_length

@@ -624,15 +668,23 @@ def _path_encode(x):
'wsgi.multiprocess': self.multiprocess,
'wsgi.run_once': self.run_once
})

headers = self.headers.copy()

if content_type is not None:
result['CONTENT_TYPE'] = content_type
headers.set("Content-Type", content_type)

if content_length is not None:
result['CONTENT_LENGTH'] = str(content_length)
headers.set("Content-Length", content_length)

for key, value in self.headers.to_wsgi_list():
for key, value in headers.to_wsgi_list():
result['HTTP_%s' % key.upper().replace('-', '_')] = value

if self.environ_overrides:
result.update(self.environ_overrides)

return result

def get_request(self, cls=None):
@@ -724,43 +776,65 @@ def run_wsgi_app(self, environ, buffered=False):
return rv

def resolve_redirect(self, response, new_location, environ, buffered=False):
"""Resolves a single redirect and triggers the request again
directly on this redirect client.
"""Perform a new request to the location given by the redirect
response to the previous request.
"""
scheme, netloc, script_root, qs, anchor = url_parse(new_location)
base_url = url_unparse((scheme, netloc, '', '', '')).rstrip('/') + '/'

cur_server_name = netloc.split(':', 1)[0].split('.')
real_server_name = get_host(environ).rsplit(':', 1)[0].split('.')
if cur_server_name == ['']:
# this is a local redirect having autocorrect_location_header=False
cur_server_name = real_server_name
base_url = EnvironBuilder(environ).base_url

if self.allow_subdomain_redirects:
allowed = cur_server_name[-len(real_server_name):] == real_server_name
scheme, netloc, path, qs, anchor = url_parse(new_location)
builder = EnvironBuilder.from_environ(environ, query_string=qs)

to_name_parts = netloc.split(':', 1)[0].split(".")
from_name_parts = builder.server_name.split(".")

if to_name_parts != [""]:
# The new location has a host, use it for the base URL.
builder.url_scheme = scheme
builder.host = netloc
else:
allowed = cur_server_name == real_server_name
# A local redirect with autocorrect_location_header=False
# doesn't have a host, so use the request's host.
to_name_parts = from_name_parts

# Explain why a redirect to a different server name won't be followed.
if to_name_parts != from_name_parts:
if to_name_parts[-len(from_name_parts):] == from_name_parts:
if not self.allow_subdomain_redirects:
raise RuntimeError("Following subdomain redirects is not enabled.")
else:
raise RuntimeError("Following external redirects is not supported.")

if not allowed:
raise RuntimeError('%r does not support redirect to '
'external targets' % self.__class__)
path_parts = path.split("/")
root_parts = builder.script_root.split("/")

status_code = int(response[1].split(None, 1)[0])
if status_code == 307:
method = environ['REQUEST_METHOD']
if path_parts[:len(root_parts)] == root_parts:
# Strip the script root from the path.
builder.path = path[len(builder.script_root):]
else:
method = 'GET'
# The new location is not under the script root, so use the
# whole path and clear the previous root.
builder.path = path
builder.script_root = ""

# For redirect handling we temporarily disable the response
# wrapper. This is not threadsafe but not a real concern
# since the test client must not be shared anyways.
status_code = int(response[1].split(None, 1)[0])

# Only 307 and 308 preserve all of the original request.
if status_code not in {307, 308}:
# HEAD is preserved, everything else becomes GET.
if builder.method != "HEAD":
builder.method = "GET"

# Clear the body and the headers that describe it.
builder.input_stream = None
builder.content_type = None
builder.content_length = None
builder.headers.pop("Transfer-Encoding", None)

# Disable the response wrapper while handling redirects. Not
# thread safe, but the client should not be shared anyway.
old_response_wrapper = self.response_wrapper
self.response_wrapper = None

try:
return self.open(path=script_root, base_url=base_url,
query_string=qs, as_tuple=True,
buffered=buffered, method=method)
return self.open(builder, as_tuple=True, buffered=buffered)
finally:
self.response_wrapper = old_response_wrapper

@@ -811,8 +885,10 @@ def open(self, *args, **kwargs):
redirect_chain = []
while 1:
status_code = int(response[1].split(None, 1)[0])
if status_code not in (301, 302, 303, 305, 307) \
or not follow_redirects:
if (
status_code not in {301, 302, 303, 305, 307, 308}
or not follow_redirects
):
break
new_location = response[2]['location']
new_redirect_entry = (new_location, status_code)
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.