Skip to content

Commit

Permalink
bpo-34866: Adding max_num_fields to cgi.FieldStorage (GH-9660)
Browse files Browse the repository at this point in the history
Adding `max_num_fields` to `cgi.FieldStorage` to make DOS attacks harder by
limiting the number of `MiniFieldStorage` objects created by `FieldStorage`.
(cherry picked from commit 2091448)

Co-authored-by: matthewbelisle-wf <matthew.belisle@workiva.com>
  • Loading branch information
miss-islington and matthewbelisle-wf committed Oct 19, 2018
1 parent d85c272 commit 322a914
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 12 deletions.
34 changes: 25 additions & 9 deletions Lib/cgi.py
Expand Up @@ -404,7 +404,8 @@ class FieldStorage:
"""
def __init__(self, fp=None, headers=None, outerboundary=b'',
environ=os.environ, keep_blank_values=0, strict_parsing=0,
limit=None, encoding='utf-8', errors='replace'):
limit=None, encoding='utf-8', errors='replace',
max_num_fields=None):
"""Constructor. Read multipart/* until last part.
Arguments, all optional:
Expand Down Expand Up @@ -444,10 +445,14 @@ def __init__(self, fp=None, headers=None, outerboundary=b'',
for the page sending the form (content-type : meta http-equiv or
header)
max_num_fields: int. If set, then __init__ throws a ValueError
if there are more than n fields read by parse_qsl().
"""
method = 'GET'
self.keep_blank_values = keep_blank_values
self.strict_parsing = strict_parsing
self.max_num_fields = max_num_fields
if 'REQUEST_METHOD' in environ:
method = environ['REQUEST_METHOD'].upper()
self.qs_on_post = None
Expand Down Expand Up @@ -670,12 +675,11 @@ def read_urlencoded(self):
qs = qs.decode(self.encoding, self.errors)
if self.qs_on_post:
qs += '&' + self.qs_on_post
self.list = []
query = urllib.parse.parse_qsl(
qs, self.keep_blank_values, self.strict_parsing,
encoding=self.encoding, errors=self.errors)
for key, value in query:
self.list.append(MiniFieldStorage(key, value))
encoding=self.encoding, errors=self.errors,
max_num_fields=self.max_num_fields)
self.list = [MiniFieldStorage(key, value) for key, value in query]
self.skip_lines()

FieldStorageClass = None
Expand All @@ -689,9 +693,9 @@ def read_multi(self, environ, keep_blank_values, strict_parsing):
if self.qs_on_post:
query = urllib.parse.parse_qsl(
self.qs_on_post, self.keep_blank_values, self.strict_parsing,
encoding=self.encoding, errors=self.errors)
for key, value in query:
self.list.append(MiniFieldStorage(key, value))
encoding=self.encoding, errors=self.errors,
max_num_fields=self.max_num_fields)
self.list.extend(MiniFieldStorage(key, value) for key, value in query)

klass = self.FieldStorageClass or self.__class__
first_line = self.fp.readline() # bytes
Expand Down Expand Up @@ -725,11 +729,23 @@ def read_multi(self, environ, keep_blank_values, strict_parsing):
if 'content-length' in headers:
del headers['content-length']

# Propagate max_num_fields into the sub class appropriately
sub_max_num_fields = self.max_num_fields
if sub_max_num_fields is not None:
sub_max_num_fields -= len(self.list)

part = klass(self.fp, headers, ib, environ, keep_blank_values,
strict_parsing,self.limit-self.bytes_read,
self.encoding, self.errors)
self.encoding, self.errors, sub_max_num_fields)

max_num_fields = self.max_num_fields
if max_num_fields is not None and part.list:
max_num_fields -= len(part.list)

self.bytes_read += part.bytes_read
self.list.append(part)
if max_num_fields is not None and max_num_fields < len(self.list):
raise ValueError('Max number of fields exceeded')
if part.done or self.bytes_read >= self.length > 0:
break
self.skip_lines()
Expand Down
49 changes: 49 additions & 0 deletions Lib/test/test_cgi.py
Expand Up @@ -373,6 +373,55 @@ def testQSAndUrlEncode(self):
v = gen_result(data, environ)
self.assertEqual(self._qs_result, v)

def test_max_num_fields(self):
# For application/x-www-form-urlencoded
data = '&'.join(['a=a']*11)
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'REQUEST_METHOD': 'POST',
}

with self.assertRaises(ValueError):
cgi.FieldStorage(
fp=BytesIO(data.encode()),
environ=environ,
max_num_fields=10,
)

# For multipart/form-data
data = """---123
Content-Disposition: form-data; name="a"
a
---123
Content-Type: application/x-www-form-urlencoded
a=a&a=a
---123--
"""
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
'QUERY_STRING': 'a=a&a=a',
'REQUEST_METHOD': 'POST',
}

# 2 GET entities
# 2 top level POST entities
# 2 entities within the second POST entity
with self.assertRaises(ValueError):
cgi.FieldStorage(
fp=BytesIO(data.encode()),
environ=environ,
max_num_fields=5,
)
cgi.FieldStorage(
fp=BytesIO(data.encode()),
environ=environ,
max_num_fields=6,
)

def testQSAndFormData(self):
data = """---123
Content-Disposition: form-data; name="key2"
Expand Down
7 changes: 7 additions & 0 deletions Lib/test/test_urlparse.py
Expand Up @@ -879,6 +879,13 @@ def test_parse_qsl_encoding(self):
errors="ignore")
self.assertEqual(result, [('key', '\u0141-')])

def test_parse_qsl_max_num_fields(self):
with self.assertRaises(ValueError):
urllib.parse.parse_qs('&'.join(['a=a']*11), max_num_fields=10)
with self.assertRaises(ValueError):
urllib.parse.parse_qs(';'.join(['a=a']*11), max_num_fields=10)
urllib.parse.parse_qs('&'.join(['a=a']*10), max_num_fields=10)

def test_urlencode_sequences(self):
# Other tests incidentally urlencode things; test non-covered cases:
# Sequence and object values.
Expand Down
22 changes: 19 additions & 3 deletions Lib/urllib/parse.py
Expand Up @@ -624,7 +624,7 @@ def unquote(string, encoding='utf-8', errors='replace'):


def parse_qs(qs, keep_blank_values=False, strict_parsing=False,
encoding='utf-8', errors='replace'):
encoding='utf-8', errors='replace', max_num_fields=None):
"""Parse a query given as a string argument.
Arguments:
Expand All @@ -645,11 +645,15 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False,
encoding and errors: specify how to decode percent-encoded sequences
into Unicode characters, as accepted by the bytes.decode() method.
max_num_fields: int. If set, then throws a ValueError if there
are more than n fields read by parse_qsl().
Returns a dictionary.
"""
parsed_result = {}
pairs = parse_qsl(qs, keep_blank_values, strict_parsing,
encoding=encoding, errors=errors)
encoding=encoding, errors=errors,
max_num_fields=max_num_fields)
for name, value in pairs:
if name in parsed_result:
parsed_result[name].append(value)
Expand All @@ -659,7 +663,7 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False,


def parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
encoding='utf-8', errors='replace'):
encoding='utf-8', errors='replace', max_num_fields=None):
"""Parse a query given as a string argument.
Arguments:
Expand All @@ -679,9 +683,21 @@ def parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
encoding and errors: specify how to decode percent-encoded sequences
into Unicode characters, as accepted by the bytes.decode() method.
max_num_fields: int. If set, then throws a ValueError
if there are more than n fields read by parse_qsl().
Returns a list, as G-d intended.
"""
qs, _coerce_result = _coerce_args(qs)

# If max_num_fields is defined then check that the number of fields
# is less than max_num_fields. This prevents a memory exhaustion DOS
# attack via post bodies with many fields.
if max_num_fields is not None:
num_fields = 1 + qs.count('&') + qs.count(';')
if max_num_fields < num_fields:
raise ValueError('Max number of fields exceeded')

pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
r = []
for name_value in pairs:
Expand Down
@@ -0,0 +1,2 @@
Adding ``max_num_fields`` to ``cgi.FieldStorage`` to make DOS attacks harder by
limiting the number of ``MiniFieldStorage`` objects created by ``FieldStorage``.

0 comments on commit 322a914

Please sign in to comment.