Skip to content

Commit

Permalink
Merge branch '4.x' into OFS_avoid_direct_id_access#903
Browse files Browse the repository at this point in the history
  • Loading branch information
dataflake committed Oct 8, 2020
2 parents c7c356d + f874fe0 commit 0b57748
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
Expand Up @@ -13,6 +13,9 @@ https://zope.readthedocs.io/en/2.13/CHANGES.html
- Replace (in ``OFS``) the deprecated direct ``id`` access by
``getId`` calls.

- HTTP header encoding support
(`#905 <https://github.com/zopefoundation/Zope/pull/905>`_)

- Provide a more sensible ``OFS.SimpleItem.Item_w__name__.id``
to avoid bugs caused by deprecated direct ``id`` access
(see e.g. `#903 <https://github.com/zopefoundation/Zope/issues/903>`_).
Expand Down
142 changes: 139 additions & 3 deletions src/ZPublisher/HTTPResponse.py
Expand Up @@ -18,13 +18,17 @@
import sys
import time
import zlib
from email.header import Header
from email.message import _parseparam
from email.utils import encode_rfc2231
from io import BytesIO

from six import PY2
from six import PY3
from six import binary_type
from six import class_types
from six import reraise
from six import string_types
from six import text_type
from six.moves.urllib.parse import quote
from six.moves.urllib.parse import urlparse
Expand Down Expand Up @@ -109,7 +113,9 @@


def _scrubHeader(name, value):
    """Return ``(name, value)`` with all ``_CRLF`` matches removed.

    *name* is always converted with ``str``.  *value* is converted with
    ``str`` only if it is not already one of ``string_types``; string
    values are scrubbed as they are and keep their type.
    """
    if not isinstance(value, string_types):
        value = str(value)
    # splitting on the pattern and re-joining drops every match
    return ''.join(_CRLF.split(str(name))), ''.join(_CRLF.split(value))


_NOW = None # overwrite for testing
def listHeaders(self):
    """ Return a list of (key, value) pairs for our headers.

    o Do appropriate case normalization.

    o Encode header values via `header_encoding_registry`
    """
    result = [
        ('X-Powered-By', 'Zope (www.zope.org), Python (www.python.org)')
    ]

    encode = header_encoding_registry.encode
    for key, value in self.headers.items():
        if key.lower() == key:
            # only change non-literal header names
            key = '-'.join([x.capitalize() for x in key.split('-')])
        # each header is listed exactly once, with its encoded value
        result.append((key, encode(key, value)))

    result.extend(self._cookie_list())
    # accumulated headers keep their name as given; only the value is encoded
    for key, value in self.accumulated_headers:
        result.append((key, encode(key, value)))
    return result

def _unauthorized(self):
Expand Down Expand Up @@ -1111,3 +1121,129 @@ def __bytes__(self):

def __str__(self):
raise NotImplementedError


# HTTP header encoding
class HeaderEncodingRegistry(dict):
    """Registry guiding the encoding of HTTP header values.

    HTTP/1.1 uses `ISO-8859-1` as charset for its headers
    (the modern spec (RFC 7230-7235) has deprecated non ASCII characters
    but for the sake of older browsers we still use `ISO-8859-1`).
    A header value needs encoding if it contains characters
    which cannot be expressed in this charset.

    HTTP/1.1 is based on MIME
    ("Multipurpose Internet Mail Extensions", RFC 2045-2049).
    MIME knows about 2 header encodings:

     - one for parameter values (RFC 2231)

     - one for words as part of text, phrase or comment (RFC 2047)

    For use with HTTP/1.1, MIME's parameter value encoding (RFC 2231)
    was specialized and simplified via RFC 5987 and RFC 8187.

    For efficiency reasons and because HTTP is an extensible
    protocol (an application can use headers not specified
    by HTTP), an encoding registry guides the header encoding.
    An application can register an encoding for specific keys and/or
    a default encoding used for keys without specific registration.
    Without a specific and without a default encoding,
    a header value remains unencoded.

    Header values are encoded only if they contain
    non `ISO-8859-1` characters.

    The registry maps the lower-cased header name (or `None` for
    the default registration) to the pair ``(encoder, keywords)``.
    """

    def register(self, header, encoder, **kw):
        """Register *encoder* as encoder for header *header*.

        An *encoder* of `None` indicates that *header* should not
        get encoded.

        A *header* of `None` indicates that *encoder* is defined
        as the default encoder.

        When encoding is necessary, *encoder* is called with
        the header value and the keywords specified by *kw*.
        """
        key = header if header is None else header.lower()
        self[key] = (encoder, kw)

    def unregister(self, header):
        """Remove any registration for *header*.

        *header* can be either a header name or `None`.
        In the latter case, the default registration is removed.
        A missing registration is silently ignored.
        """
        key = header if header is None else header.lower()
        self.pop(key, None)

    def encode(self, header, value):
        """Encode *value* as specified for *header*.

        Encoding only takes place if *value* is text and
        contains non ISO-8859-1 chars.
        """
        if not isinstance(value, text_type):
            return value
        # the specific registration wins over the default one
        registration = self.get(header.lower()) or self.get(None)
        if registration is None:
            return value
        encoder = registration[0]
        if encoder is None or non_latin_1(value) is None:
            return value
        return encoder(value, **registration[1])


# Bound ``search`` of a pattern matching any character outside the
# range ``\x00-\xff``: called with a string, it returns a match object
# for the first such character or `None` if there is none.
non_latin_1 = re.compile(r"[^\x00-\xff]").search


def encode_words(value):
    """Encode *value* with the RFC 2047 word encoding using `utf-8`.

    Note: *value* is treated as unstructured data. Therefore, this
    function must not be applied to headers with a structured value
    (unless the structure is guaranteed to contain only
    ISO-8859-1 chars).
    """
    # the maximal line length is chosen huge -- presumably to prevent
    # the header from being split over several lines
    header = Header(value, charset='utf-8', maxlinelen=1000000)
    return header.encode()


def encode_params(value):
    """RFC 5987(8187) (specialized from RFC 2231) parameter encoding.

    This encodes parameters as specified by RFC 5987 using
    fixed `UTF-8` encoding (as required by RFC 8187).
    However, all parameters with non latin-1 values are
    automatically transformed and a `*` suffixed parameter is added
    (RFC 8187 allows this only for parameters explicitly specified
    to have this behavior).

    A value completely enclosed in double quotes is unquoted
    (quotes removed, backslash escapes undone) before it is put
    into the `*` suffixed parameter. The original parameter is kept
    with its non latin-1 characters replaced by `?`.
    No `*` suffixed parameter is added if *value* already contains one
    for the respective parameter.

    Many HTTP headers use `,` separated lists. For simplicity,
    such headers are not supported (we would need to recognize
    `,` inside quoted strings as special).
    """
    params = []
    for p in _parseparam(";" + value):
        p = p.strip()
        if not p:
            continue
        params.append([s.strip() for s in p.split("=", 1)])
    known_params = {p[0] for p in params}
    extended = []  # `*` suffixed parameters, appended after the originals
    for p in params:
        if len(p) != 2 or not non_latin_1(p[1]):
            continue  # no value or no encoding necessary
        pnc = p[0] + "*"
        if pnc not in known_params:
            pv = p[1]
            if len(pv) >= 2 and pv.startswith('"') and pv.endswith('"'):
                # a complete quoted string: remove the quotes and undo
                # the backslash escapes; an unterminated quoted value
                # is left alone to avoid dropping its last character
                pv = re.sub(r"\\(.)", r"\1", pv[1:-1])
            if PY2:
                # we know `pv` is unicode
                pv = pv.encode("utf-8")
            extended.append((pnc, encode_rfc2231(pv, "utf-8", None)))
        # backward compatibility for clients not understanding RFC 5987
        p[1] = p[1].encode("iso-8859-1", "replace").decode("iso-8859-1")
    params.extend(extended)
    return "; ".join("=".join(p) for p in params)


# Module level registry used to encode header values.
# By default, only the parameters of the `content-type` and
# `content-disposition` headers are encoded (via `encode_params`);
# no default encoder is registered.
header_encoding_registry = HeaderEncodingRegistry()
header_encoding_registry.register("content-type", encode_params)
header_encoding_registry.register("content-disposition", encode_params)
76 changes: 76 additions & 0 deletions src/ZPublisher/tests/testHTTPResponse.py
Expand Up @@ -11,6 +11,10 @@
from zExceptions import ResourceLockedError
from zExceptions import Unauthorized

from ..HTTPResponse import encode_params
from ..HTTPResponse import encode_words
from ..HTTPResponse import header_encoding_registry


class HTTPResponseTests(unittest.TestCase):

Expand Down Expand Up @@ -1377,3 +1381,75 @@ def test_exception_500_text(self):
def test_isHTML_not_decodable_bytes(self):
    # latin-1 encoded non ASCII bytes must not be reported as HTML
    response = self._makeOne()
    payload = u'bïñårÿ'.encode('latin1')
    is_html = response.isHTML(payload)
    self.assertFalse(is_html)

def test_header_encoding(self):
    # `listHeaders` encodes the values of registered headers only
    response = self._makeOne()
    response.setHeader("unencoded1", u"€")
    response.setHeader("content-disposition", u"a; p=€")
    response.addHeader("unencoded2", u"€")
    response.addHeader("content-disposition", u"a2; p2=€")
    # drop `X-Powered...`
    listed = response.listHeaders()[1:]
    set_headers = dict(listed[:2])
    added_headers = dict(listed[2:])
    # for some reasons, `set` headers change their name
    # while `add` headers do not
    self.assertEqual(set_headers["Unencoded1"], u"€")
    self.assertEqual(added_headers["unencoded2"], u"€")
    self.assertEqual(
        set_headers["Content-Disposition"],
        u"a; p=?; p*=utf-8''%E2%82%AC")
    self.assertEqual(
        added_headers["content-disposition"],
        u"a2; p2=?; p2*=utf-8''%E2%82%AC")


class TestHeaderEncodingRegistry(unittest.TestCase):
    """Tests for `header_encoding_registry` and the provided encoders."""

    def setUp(self):
        # remember the registrations: the tests modify the global registry
        self._copy = header_encoding_registry.copy()

    def tearDown(self):
        # restore the registrations remembered by `setUp`
        registry = header_encoding_registry
        registry.clear()
        registry.update(self._copy)

    def test_default_registrations(self):
        registry = header_encoding_registry
        self.assertIn('content-type', registry)
        self.assertEqual(
            registry["content-disposition"], (encode_params, {}))

    def test_encode(self):
        def return_param(value, param):
            return param

        registry = header_encoding_registry
        encode = registry.encode
        registry.register("my-header", return_param, param=1)
        for header, value, expected in (
            ("my-header", u"€", 1),       # non-ISO-8859-1 encoded
            ("my-header", u"ä", u"ä"),    # ISO-8859-1 not encoded
            ("my-header2", u"€", u"€"),   # unregistered not encoded
            ("My-Header", u"€", 1),       # header name not case sensitive
        ):
            self.assertEqual(encode(header, value), expected)
        # default
        registry.register(None, return_param, param=2)
        for header, expected in (("my-header2", 2), ("my-header", 1)):
            self.assertEqual(encode(header, u"€"), expected)

    def test_encode_words(self):
        encoded = encode_words(u"ä")
        self.assertEqual(encoded, "=?utf-8?b?w6Q=?=")

    def test_encode_params(self):
        source = (u'abc; p1=1; p2="2"; p3="€"; p4=€; '
                  u'p5="€"; p5*=5')
        expected = (u'abc; p1=1; p2="2"; p3="?"; p4=?; p5="?"; p5*=5; '
                    u'p3*=utf-8\'\'%E2%82%AC; p4*=utf-8\'\'%E2%82%AC')
        self.assertEqual(encode_params(source), expected)

    def test_case_insensitivity(self):
        registry = header_encoding_registry
        registry.register("HdR", lambda value: 0)
        # Note: case insensitivity not implemented for `dict` methods
        self.assertIn("hdr", registry)
        self.assertEqual(registry.encode("HDR", u"€"), 0)
        for _ in range(2):
            # the second call verifies: no exception
            registry.unregister("hDr")
        self.assertNotIn("hdr", registry)
1 change: 1 addition & 0 deletions versions-prod.cfg
Expand Up @@ -4,6 +4,7 @@
[versions]
Zope =
Zope2 = 4.0
# AccessControl 5+ no longer supports Zope 4.
AccessControl = 4.2
Acquisition = 4.6
# AuthEncoding 5+ requires Python 3
Expand Down

0 comments on commit 0b57748

Please sign in to comment.