Browse files

Added support for host validation and added some improved security helpers for proxy scenarios.
  • Loading branch information...
mitsuhiko committed May 14, 2013
1 parent ad6a2fe commit cdf680222af293a2c118d8d52eecfd7b0c566e14
@@ -30,6 +30,10 @@ Release date to be decided, codename to be chosen.
- The description field of HTTP exceptions is now always escaped.
Use markup objects to disable that.
- Added a `num_proxies` argument to the proxy fix to make it more
secure out of the box on common proxy setups. By default it will
no longer trust the X-Forwarded-For header as much as it did before.
Version 0.8.4
@@ -65,6 +65,8 @@ The following error classes exist in Werkzeug:
.. autoexception:: ClientDisconnected
.. autoexception:: SecurityError
@@ -46,6 +46,8 @@ information or perform common manipulations:
.. autofunction:: extract_path_info
.. autofunction:: host_is_trusted
Convenience Helpers
@@ -83,26 +83,32 @@ class ProxyFix(object):
application that was not designed with HTTP proxies in mind. It
sets `REMOTE_ADDR`, `HTTP_HOST` from `X-Forwarded` headers.
If you have more than one proxy server in front of your app, set
`num_proxies` accordingly.
Do not use this middleware in non-proxy setups for security reasons.
The original values of `REMOTE_ADDR` and `HTTP_HOST` are stored in
the WSGI environment as `werkzeug.proxy_fix.orig_remote_addr` and
:param app: the WSGI application
:param num_proxies: the number of proxy servers in front of the app.
def __init__(self, app):
def __init__(self, app, num_proxies=1): = app
self.num_proxies = num_proxies
def get_remote_addr(self, forwarded_for):
"""Selects the new remote addr from the given list of ips in
X-Forwarded-For. By default the first one is picked.
X-Forwarded-For. By default it picks the one that the `num_proxies`
proxy server provides. Before 0.9 it would always pick the first.
.. versionadded:: 0.8
if forwarded_for:
return forwarded_for[0]
if len(forwarded_for) >= self.num_proxies:
return forwarded_for[-1 * self.num_proxies]
def __call__(self, environ, start_response):
getter = environ.get
@@ -194,6 +194,14 @@ class ClientDisconnected(BadRequest):
class SecurityError(BadRequest):
    """Raised if something triggers a security error.  This is otherwise
    exactly like a bad request error.

    .. versionadded:: 0.9
    """
class Unauthorized(HTTPException):
"""*401* `Unauthorized`
@@ -13,6 +13,7 @@
import posixpath
from itertools import izip
from random import SystemRandom
from werkzeug.exceptions import BadRequest
SALT_CHARS = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
@@ -48,14 +48,14 @@ def test_path_info_from_request_uri_fix(self):
assert == 'PATH_INFO: /foo%bar\nSCRIPT_NAME: /test'
def test_proxy_fix(self):
def app(request):
return Response('%s|%s' % (
# do not use as this fixes too :)
app = fixers.ProxyFix(app, num_proxies=2)
environ = dict(create_environ(),
@@ -16,6 +16,7 @@
from werkzeug.testsuite import WerkzeugTestCase
from werkzeug import wrappers
from werkzeug.exceptions import SecurityError
from werkzeug.datastructures import MultiDict, ImmutableOrderedMultiDict, \
ImmutableList, ImmutableTypeConversionDict, CharsetAccept, \
@@ -138,6 +139,30 @@ def test_url_request_descriptors(self):
req = wrappers.Request.from_values('/bar?foo=baz', '')
assert req.scheme == 'https'
def test_url_request_descriptors_hosts(self):
req = wrappers.Request.from_values('/bar?foo=baz', '')
req.trusted_hosts = ['']
assert req.path == u'/bar'
assert req.full_path == u'/bar?foo=baz'
assert req.script_root == u'/test'
assert req.url == ''
assert req.base_url == ''
assert req.url_root == ''
assert req.host_url == ''
assert == ''
assert req.scheme == 'http'
req = wrappers.Request.from_values('/bar?foo=baz', '')
assert req.scheme == 'https'
req = wrappers.Request.from_values('/bar?foo=baz', '')
req.trusted_hosts = ['']
self.assert_raises(SecurityError, lambda: req.url)
self.assert_raises(SecurityError, lambda: req.base_url)
self.assert_raises(SecurityError, lambda: req.url_root)
self.assert_raises(SecurityError, lambda: req.host_url)
self.assert_raises(SecurityError, lambda:
def test_authorization_mixin(self):
request = wrappers.Request.from_values(headers={
'Authorization': 'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='
@@ -59,6 +59,13 @@ def test_get_host(self):
assert wsgi.get_host(create_environ('/', '')) \
== ''
def test_get_host_validation(self):
'SERVER_NAME': 'bullshit', 'HOST_NAME': 'ignore me dammit'}
self.assert_equal(wsgi.get_host(env, trusted_hosts=['']),
self.assert_raises(BadRequest, wsgi.get_host, env, trusted_hosts=[''])
def test_responder(self):
def foo(environ, start_response):
return BaseResponse('Test')
@@ -171,6 +171,14 @@ class Request(BaseRequest, ETagRequestMixin):
#: the form data parsing.
form_data_parser_class = FormDataParser
#: Optionally a list of hosts that is trusted by this request. By default
#: all hosts are trusted which means that whatever the client sends as
#: the host will be accepted. This is the recommended setup as a webserver
#: should manually be set up to not route invalid hosts to the application.
#: .. versionadded:: 0.9
trusted_hosts = None
def __init__(self, environ, populate_request=True, shallow=False):
self.environ = environ
if populate_request and not shallow:
@@ -447,27 +455,31 @@ def script_root(self):
def url(self):
"""The reconstructed current URL"""
return get_current_url(self.environ)
return get_current_url(self.environ,
def base_url(self):
"""Like :attr:`url` but without the querystring"""
return get_current_url(self.environ, strip_querystring=True)
return get_current_url(self.environ, strip_querystring=True,
def url_root(self):
"""The full URL root (with hostname), this is the application root."""
return get_current_url(self.environ, True)
return get_current_url(self.environ, True,
def host_url(self):
"""Just the host with scheme."""
return get_current_url(self.environ, host_only=True)
return get_current_url(self.environ, host_only=True,
def host(self):
"""Just the host including the port if available."""
return get_host(self.environ)
return get_host(self.environ, trusted_hosts=self.trusted_hosts)
query_string = environ_property('QUERY_STRING', '', read_only=True, doc=
'''The URL parameters as raw bytestring.''')
@@ -38,7 +38,7 @@ def application(environ, start_response):
def get_current_url(environ, root_only=False, strip_querystring=False,
host_only=False, trusted_hosts=None):
"""A handy helper function that recreates the full URL for the current
request or parts of it. Here an example:
@@ -53,12 +53,18 @@ def get_current_url(environ, root_only=False, strip_querystring=False,
>>> get_current_url(env, strip_querystring=True)
Optionally it verifies that the host is in a list of trusted hosts.
If the host is not in there it will raise a
:exc:`~werkzeug.exceptions.SecurityError`.
:param environ: the WSGI environment to get the current URL from.
:param root_only: set `True` if you only want the root URL.
:param strip_querystring: set to `True` if you don't want the querystring.
:param host_only: set to `True` if the host URL should be returned.
:param trusted_hosts: a list of trusted hosts, see :func:`host_is_trusted`
for more information.
tmp = [environ['wsgi.url_scheme'], '://', get_host(environ)]
tmp = [environ['wsgi.url_scheme'], '://', get_host(environ, trusted_hosts)]
cat = tmp.append
if host_only:
return ''.join(tmp) + '/'
@@ -82,21 +88,69 @@ def get_current_url(environ, root_only=False, strip_querystring=False,
return ''.join(tmp)
def get_host(environ):
def host_is_trusted(hostname, trusted_list):
    """Checks if a host is trusted against a list.  This also takes care
    of port normalization.

    .. versionadded:: 0.9

    :param hostname: the hostname to check.  An empty or missing hostname
                     is never trusted.
    :param trusted_list: a list of hostnames to check against.  If a
                         hostname starts with a dot it will match against
                         all subdomains as well.  A single string is
                         treated as a list with one entry.
    :return: `True` if the hostname matches an entry of the list,
             `False` otherwise (including when the hostname cannot be
             IDNA encoded).
    """
    if not hostname:
        return False

    # Resolve the text/string types once so the check behaves the same on
    # interpreters with and without the `unicode`/`basestring` builtins.
    try:
        text_type = unicode
        string_types = basestring
    except NameError:
        text_type = str
        string_types = (str, bytes)

    if isinstance(trusted_list, string_types):
        trusted_list = [trusted_list]

    def _normalize(host):
        # Drop a trailing ``:port`` and bring text into its IDNA (ASCII)
        # byte form so text and byte hostnames compare equal.
        is_text = isinstance(host, text_type)
        colon = u':' if is_text else b':'
        if colon in host:
            host = host.rsplit(colon, 1)[0]
        if is_text:
            host = host.encode('idna')
        return host

    try:
        hostname = _normalize(hostname)
    except UnicodeError:
        # The hostname comes from the client; one that cannot be encoded
        # is simply not trusted instead of being an internal error.
        return False

    for ref in trusted_list:
        # A leading dot marks an entry that also covers all subdomains.
        suffix_match = ref[:1] in (u'.', b'.')
        if suffix_match:
            ref = ref[1:]
        try:
            ref = _normalize(ref)
        except UnicodeError:
            # An entry that cannot be encoded can never match.
            continue
        if ref == hostname:
            return True
        if suffix_match and hostname.endswith(b'.' + ref):
            return True
    return False
def get_host(environ, trusted_hosts=None):
"""Return the real host for the given WSGI environment. This takes care
of the `X-Forwarded-Host` header.
of the `X-Forwarded-Host` header. Optionally it verifies that the host
is in a list of trusted hosts. If the host is not in there it will raise
a :exc:`~werkzeug.exceptions.SecurityError`.
:param environ: the WSGI environment to get the host of.
:param trusted_hosts: a list of trusted hosts, see :func:`host_is_trusted`
for more information.
if 'HTTP_X_FORWARDED_HOST' in environ:
return environ['HTTP_X_FORWARDED_HOST']
rv = environ['HTTP_X_FORWARDED_HOST']
elif 'HTTP_HOST' in environ:
return environ['HTTP_HOST']
result = environ['SERVER_NAME']
if (environ['wsgi.url_scheme'], environ['SERVER_PORT']) not \
in (('https', '443'), ('http', '80')):
result += ':' + environ['SERVER_PORT']
return result
rv = environ['HTTP_HOST']
rv = environ['SERVER_NAME']
if (environ['wsgi.url_scheme'], environ['SERVER_PORT']) not \
in (('https', '443'), ('http', '80')):
rv += ':' + environ['SERVER_PORT']
if trusted_hosts is not None:
if not host_is_trusted(rv, trusted_hosts):
from werkzeug.exceptions import SecurityError
raise SecurityError('Host "%s" is not trusted' % rv)
return rv
def pop_path_info(environ):

0 comments on commit cdf6802

Please sign in to comment.