Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-42967: Don't treat semicolon as a separator in urllib.parse #24271

Closed
wants to merge 5 commits into the base branch from the contributor's fork
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 23 additions & 11 deletions Lib/cgi.py
Expand Up @@ -115,7 +115,8 @@ def closelog():
# 0 ==> unlimited input
maxlen = 0

def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0,
separators=('&', ';')):
"""Parse a query in the environment or from a file (default stdin)

Arguments, all optional:
Expand All @@ -134,6 +135,8 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
strict_parsing: flag indicating what to do with parsing errors.
If false (the default), errors are silently ignored.
If true, errors raise a ValueError exception.

separators: valid separators in the query string, e.g. ('&',)
"""
if fp is None:
fp = sys.stdin
Expand All @@ -154,7 +157,7 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
if environ['REQUEST_METHOD'] == 'POST':
ctype, pdict = parse_header(environ['CONTENT_TYPE'])
if ctype == 'multipart/form-data':
return parse_multipart(fp, pdict)
return parse_multipart(fp, pdict, separators=separators)
elif ctype == 'application/x-www-form-urlencoded':
clength = int(environ['CONTENT_LENGTH'])
if maxlen and clength > maxlen:
Expand All @@ -178,10 +181,11 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
qs = ""
environ['QUERY_STRING'] = qs # XXX Shouldn't, really
return urllib.parse.parse_qs(qs, keep_blank_values, strict_parsing,
encoding=encoding)
encoding=encoding, separators=separators)


def parse_multipart(fp, pdict, encoding="utf-8", errors="replace"):
def parse_multipart(fp, pdict, encoding="utf-8", errors="replace",
separators=('&', ';')):
"""Parse multipart input.

Arguments:
Expand All @@ -205,7 +209,7 @@ def parse_multipart(fp, pdict, encoding="utf-8", errors="replace"):
except KeyError:
pass
fs = FieldStorage(fp, headers=headers, encoding=encoding, errors=errors,
environ={'REQUEST_METHOD': 'POST'})
environ={'REQUEST_METHOD': 'POST'}, separators=separators)
return {k: fs.getlist(k) for k in fs}

def _parseparam(s):
Expand Down Expand Up @@ -315,7 +319,7 @@ class FieldStorage:
def __init__(self, fp=None, headers=None, outerboundary=b'',
environ=os.environ, keep_blank_values=0, strict_parsing=0,
limit=None, encoding='utf-8', errors='replace',
max_num_fields=None):
max_num_fields=None, separators=('&', ';')):
"""Constructor. Read multipart/* until last part.

Arguments, all optional:
Expand Down Expand Up @@ -358,11 +362,15 @@ def __init__(self, fp=None, headers=None, outerboundary=b'',
max_num_fields: int. If set, then __init__ throws a ValueError
if there are more than n fields read by parse_qsl().

separators: valid separators in the query string, e.g. ('&',).
Passed directly to urllib.parse.parse_qsl internally.

"""
method = 'GET'
self.keep_blank_values = keep_blank_values
self.strict_parsing = strict_parsing
self.max_num_fields = max_num_fields
self.separators = separators
if 'REQUEST_METHOD' in environ:
method = environ['REQUEST_METHOD'].upper()
self.qs_on_post = None
Expand Down Expand Up @@ -472,7 +480,7 @@ def __init__(self, fp=None, headers=None, outerboundary=b'',
if ctype == 'application/x-www-form-urlencoded':
self.read_urlencoded()
elif ctype[:10] == 'multipart/':
self.read_multi(environ, keep_blank_values, strict_parsing)
self.read_multi(environ, keep_blank_values, strict_parsing, separators)
else:
self.read_single()

Expand Down Expand Up @@ -589,13 +597,15 @@ def read_urlencoded(self):
query = urllib.parse.parse_qsl(
qs, self.keep_blank_values, self.strict_parsing,
encoding=self.encoding, errors=self.errors,
max_num_fields=self.max_num_fields)
max_num_fields=self.max_num_fields,
separators=self.separators)
self.list = [MiniFieldStorage(key, value) for key, value in query]
self.skip_lines()

FieldStorageClass = None

def read_multi(self, environ, keep_blank_values, strict_parsing):
def read_multi(self, environ, keep_blank_values, strict_parsing,
separators=('&', ';')):
"""Internal: read a part that is itself multipart."""
ib = self.innerboundary
if not valid_boundary(ib):
Expand All @@ -605,7 +615,8 @@ def read_multi(self, environ, keep_blank_values, strict_parsing):
query = urllib.parse.parse_qsl(
self.qs_on_post, self.keep_blank_values, self.strict_parsing,
encoding=self.encoding, errors=self.errors,
max_num_fields=self.max_num_fields)
max_num_fields=self.max_num_fields,
separators=self.separators)
self.list.extend(MiniFieldStorage(key, value) for key, value in query)

klass = self.FieldStorageClass or self.__class__
Expand Down Expand Up @@ -649,7 +660,8 @@ def read_multi(self, environ, keep_blank_values, strict_parsing):
else self.limit - self.bytes_read
part = klass(self.fp, headers, ib, environ, keep_blank_values,
strict_parsing, limit,
self.encoding, self.errors, max_num_fields)
self.encoding, self.errors, max_num_fields,
separators)

if max_num_fields is not None:
max_num_fields -= 1
Expand Down
131 changes: 104 additions & 27 deletions Lib/test/test_urlparse.py
Expand Up @@ -11,7 +11,33 @@
# Each parse_qsl testcase is a two-tuple that contains
# a string with the query and a list with the expected result.

parse_qsl_test_cases = [
# parse_qsl cases for when ';' IS accepted as a separator: semicolons split
# name=value pairs exactly like '&' does (str and bytes variants).
parse_qsl_test_cases_allow_semicolon = [
    (";", []),
    (";;", []),
    (";a=b", [('a', 'b')]),
    ("a=a+b;b=b+c", [('a', 'a b'), ('b', 'b c')]),
    ("a=1;a=2", [('a', '1'), ('a', '2')]),
    (b";", []),
    (b";;", []),
    (b";a=b", [(b'a', b'b')]),
    (b"a=a+b;b=b+c", [(b'a', b'a b'), (b'b', b'b c')]),
    (b"a=1;a=2", [(b'a', b'1'), (b'a', b'2')]),
]

# parse_qsl cases for when ';' is NOT a separator: semicolons remain embedded
# in the parsed keys and values instead of splitting pairs.
parse_qsl_test_cases_deny_semicolon = [
    (";", [(';', '')]),
    (";;", [(';;', '')]),
    (";a=b", [(';a', 'b')]),
    ("a=a+b;b=b+c", [('a', 'a b;b=b c')]),
    ("a=1;a=2", [('a', '1;a=2')]),
    (b";", [(b';', b'')]),
    (b";;", [(b';;', b'')]),
    (b";a=b", [(b';a', b'b')]),
    (b"a=a+b;b=b+c", [(b'a', b'a b;b=b c')]),
    (b"a=1;a=2", [(b'a', b'1;a=2')]),
]

parse_qsl_test_cases_ampersand = [
("", []),
("&", []),
("&&", []),
Expand All @@ -32,22 +58,43 @@
(b"&a=b", [(b'a', b'b')]),
(b"a=a+b&b=b+c", [(b'a', b'a b'), (b'b', b'b c')]),
(b"a=1&a=2", [(b'a', b'1'), (b'a', b'2')]),
(";", []),
(";;", []),
(";a=b", [('a', 'b')]),
("a=a+b;b=b+c", [('a', 'a b'), ('b', 'b c')]),
("a=1;a=2", [('a', '1'), ('a', '2')]),
(b";", []),
(b";;", []),
(b";a=b", [(b'a', b'b')]),
(b"a=a+b;b=b+c", [(b'a', b'a b'), (b'b', b'b c')]),
(b"a=1;a=2", [(b'a', b'1'), (b'a', b'2')]),
]

# Expected results when both '&' and ';' are passed as separators.
parse_qsl_test_cases = (parse_qsl_test_cases_ampersand
                        + parse_qsl_test_cases_allow_semicolon)

# Expected results when only '&' is passed as a separator ('&' cases are
# unchanged; ';' must pass through as ordinary data).
parse_qsl_test_cases_semicolon = (parse_qsl_test_cases_ampersand
                                  + parse_qsl_test_cases_deny_semicolon)

# Each parse_qs testcase is a two-tuple that contains
# a string with the query and a dictionary with the expected result.

# parse_qs cases for when ';' IS accepted as a separator (mirrors the
# parse_qsl cases above, aggregated into a dict of value lists).
parse_qs_test_cases_allow_semicolon = [
    (";", {}),
    (";;", {}),
    (";a=b", {'a': ['b']}),
    ("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}),
    ("a=1;a=2", {'a': ['1', '2']}),
    (b";", {}),
    (b";;", {}),
    (b";a=b", {b'a': [b'b']}),
    (b"a=a+b;b=b+c", {b'a': [b'a b'], b'b': [b'b c']}),
    (b"a=1;a=2", {b'a': [b'1', b'2']}),
]

parse_qs_test_cases = [
# parse_qs cases for when ';' is NOT a separator: semicolons remain embedded
# in the resulting keys and values.
parse_qs_test_cases_deny_semicolon = [
    (";", {';': ['']}),
    (";;", {';;': ['']}),
    (";a=b", {';a': ['b']}),
    ("a=a+b;b=b+c", {'a': ['a b;b=b c']}),
    ("a=1;a=2", {'a': ['1;a=2']}),
    (b";", {b';': [b'']}),
    (b";;", {b';;': [b'']}),
    (b";a=b", {b';a': [b'b']}),
    (b"a=a+b;b=b+c", {b'a': [b'a b;b=b c']}),
    (b"a=1;a=2", {b'a': [b'1;a=2']}),
]

parse_qs_test_cases_ampersand = [
("", {}),
("&", {}),
("&&", {}),
Expand All @@ -68,18 +115,15 @@
(b"&a=b", {b'a': [b'b']}),
(b"a=a+b&b=b+c", {b'a': [b'a b'], b'b': [b'b c']}),
(b"a=1&a=2", {b'a': [b'1', b'2']}),
(";", {}),
(";;", {}),
(";a=b", {'a': ['b']}),
("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}),
("a=1;a=2", {'a': ['1', '2']}),
(b";", {}),
(b";;", {}),
(b";a=b", {b'a': [b'b']}),
(b"a=a+b;b=b+c", {b'a': [b'a b'], b'b': [b'b c']}),
(b"a=1;a=2", {b'a': [b'1', b'2']}),
]

# Expected results when both '&' and ';' are passed as separators.
parse_qs_test_cases = (parse_qs_test_cases_ampersand
                       + parse_qs_test_cases_allow_semicolon)

# Expected results when only '&' is passed as a separator.
parse_qs_test_cases_semicolon = (parse_qs_test_cases_ampersand
                                 + parse_qs_test_cases_deny_semicolon)


class UrlParseTestCase(unittest.TestCase):

def checkRoundtrips(self, url, parsed, split):
Expand Down Expand Up @@ -135,23 +179,55 @@ def checkRoundtrips(self, url, parsed, split):

def test_qsl(self):
    """parse_qsl with separators=('&', ';'): both characters split pairs."""
    for orig, expect in parse_qsl_test_cases:
        # Consistency fix: wrap each case in subTest, like the sibling
        # test_qsl_no_semicolon, so one failing case does not hide the rest.
        with self.subTest(f"Original: {orig!r}, Expected: {expect!r}"):
            result = urllib.parse.parse_qsl(orig, keep_blank_values=True,
                                            separators=('&', ';'))
            self.assertEqual(result, expect, "Error parsing %r" % orig)
            # keep_blank_values=False drops pairs whose value is empty.
            expect_without_blanks = [v for v in expect if len(v[1])]
            result = urllib.parse.parse_qsl(orig, keep_blank_values=False,
                                            separators=('&', ';'))
            self.assertEqual(result, expect_without_blanks,
                             "Error parsing %r" % orig)

def test_qs(self):
    """parse_qs with separators=('&', ';'): both characters split pairs."""
    for orig, expect in parse_qs_test_cases:
        # Consistency fix: wrap each case in subTest, like the sibling
        # test_qs_no_semicolon, so one failing case does not hide the rest.
        with self.subTest(f"Original: {orig!r}, Expected: {expect!r}"):
            result = urllib.parse.parse_qs(orig, keep_blank_values=True,
                                           separators=('&', ';'))
            self.assertEqual(result, expect, "Error parsing %r" % orig)
            # keep_blank_values=False drops keys whose (first) value is empty.
            expect_without_blanks = {v: expect[v]
                                     for v in expect if len(expect[v][0])}
            result = urllib.parse.parse_qs(orig, keep_blank_values=False,
                                           separators=('&', ';'))
            self.assertEqual(result, expect_without_blanks,
                             "Error parsing %r" % orig)

def test_qsl_no_semicolon(self):
    """parse_qsl with separators=('&',): ';' is ordinary data, not a separator."""
    # See bpo-42967 for more information.
    for orig, expect in parse_qsl_test_cases_semicolon:
        with self.subTest(f"Original: {orig!r}, Expected: {expect!r}"):
            result = urllib.parse.parse_qsl(orig, keep_blank_values=True,
                                            separators=('&',))
            self.assertEqual(result, expect,
                             "Error parsing %r" % orig)
            # keep_blank_values=False drops pairs whose value is empty.
            expect_without_blanks = [v for v in expect if len(v[1])]
            result = urllib.parse.parse_qsl(orig, keep_blank_values=False,
                                            separators=('&',))
            self.assertEqual(result, expect_without_blanks,
                             "Error parsing %r" % orig)

def test_qs_no_semicolon(self):
    """parse_qs with separators=('&',): ';' is ordinary data, not a separator."""
    # See bpo-42967 for more information.
    for orig, expect in parse_qs_test_cases_semicolon:
        with self.subTest(f"Original: {orig!r}, Expected: {expect!r}"):
            result = urllib.parse.parse_qs(orig, keep_blank_values=True,
                                           separators=('&',))
            self.assertEqual(result, expect, "Error parsing %r" % orig)
            # keep_blank_values=False drops keys whose (first) value is empty.
            expect_without_blanks = {v: expect[v]
                                     for v in expect if len(expect[v][0])}
            result = urllib.parse.parse_qs(orig, keep_blank_values=False,
                                           separators=('&',))
            self.assertEqual(result, expect_without_blanks,
                             "Error parsing %r" % orig)

def test_roundtrips(self):
str_cases = [
('file:///tmp/junk.txt',
Expand Down Expand Up @@ -887,7 +963,8 @@ def test_parse_qsl_max_num_fields(self):
with self.assertRaises(ValueError):
urllib.parse.parse_qs('&'.join(['a=a']*11), max_num_fields=10)
with self.assertRaises(ValueError):
urllib.parse.parse_qs(';'.join(['a=a']*11), max_num_fields=10)
urllib.parse.parse_qs(';'.join(['a=a']*11), max_num_fields=10,
separators=('&', ';'))
urllib.parse.parse_qs('&'.join(['a=a']*10), max_num_fields=10)

def test_urlencode_sequences(self):
Expand Down
22 changes: 17 additions & 5 deletions Lib/urllib/parse.py
Expand Up @@ -662,7 +662,8 @@ def unquote(string, encoding='utf-8', errors='replace'):


def parse_qs(qs, keep_blank_values=False, strict_parsing=False,
encoding='utf-8', errors='replace', max_num_fields=None):
encoding='utf-8', errors='replace', max_num_fields=None,
separators=('&', ';')):
"""Parse a query given as a string argument.

Arguments:
Expand All @@ -686,12 +687,15 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False,
max_num_fields: int. If set, then throws a ValueError if there
are more than n fields read by parse_qsl().

separators: valid separators in the query string, e.g. ('&',)

Returns a dictionary.
"""
parsed_result = {}
pairs = parse_qsl(qs, keep_blank_values, strict_parsing,
encoding=encoding, errors=errors,
max_num_fields=max_num_fields)
max_num_fields=max_num_fields,
separators=separators)
for name, value in pairs:
if name in parsed_result:
parsed_result[name].append(value)
Expand All @@ -701,7 +705,8 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False,


def parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
encoding='utf-8', errors='replace', max_num_fields=None):
encoding='utf-8', errors='replace', max_num_fields=None,
separators=('&', ';')):
"""Parse a query given as a string argument.

Arguments:
Expand All @@ -724,6 +729,8 @@ def parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
max_num_fields: int. If set, then throws a ValueError
if there are more than n fields read by parse_qsl().

separators: valid separators in the query string, e.g. ('&',)

Returns a list, as G-d intended.
"""
qs, _coerce_result = _coerce_args(qs)
Expand All @@ -732,11 +739,16 @@ def parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
# is less than max_num_fields. This prevents a memory exhaustion DOS
# attack via post bodies with many fields.
if max_num_fields is not None:
num_fields = 1 + qs.count('&') + qs.count(';')
num_fields = 1 + sum(qs.count(sep) for sep in separators)
if max_num_fields < num_fields:
raise ValueError('Max number of fields exceeded')

pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
if not separators:
raise ValueError('Separators must be a tuple or list of strings')
for sep in separators[1:]:
qs = qs.replace(sep, separators[0])
pairs = qs.split(separators[0])

r = []
for name_value in pairs:
if not name_value and not strict_parsing:
Expand Down
1 change: 1 addition & 0 deletions Misc/ACKS
Expand Up @@ -613,6 +613,7 @@ Karan Goel
Jeroen Van Goey
Christoph Gohlke
Tim Golden
Adam Goldschmidt
Yonatan Goldschmidt
Mark Gollahon
Mikhail Golubev
Expand Down