Skip to content

Commit

Permalink
gh-111942: Fix crashes in TextIOWrapper.reconfigure() (GH-111976)
Browse files Browse the repository at this point in the history
* Fix crash when encoding is not string or None.
* Fix crash when both line_buffering and write_through raise exception
  when converted ti int.
* Add a number of tests for constructor and reconfigure() method
  with invalid arguments.
  • Loading branch information
serhiy-storchaka committed Nov 14, 2023
1 parent a519b87 commit ee06fff
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 5 deletions.
86 changes: 84 additions & 2 deletions Lib/test/test_io.py
Expand Up @@ -77,6 +77,10 @@ def _default_chunk_size():
)


class BadIndex:
def __index__(self):
1/0

class MockRawIOWithoutRead:
"""A RawIO implementation without read(), so as to exercise the default
RawIO.read() which calls readinto()."""
Expand Down Expand Up @@ -2709,8 +2713,31 @@ def test_constructor(self):
self.assertEqual(t.encoding, "utf-8")
self.assertEqual(t.line_buffering, True)
self.assertEqual("\xe9\n", t.readline())
self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42)
self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
invalid_type = TypeError if self.is_C else ValueError
with self.assertRaises(invalid_type):
t.__init__(b, encoding=42)
with self.assertRaises(UnicodeEncodeError):
t.__init__(b, encoding='\udcfe')
with self.assertRaises(ValueError):
t.__init__(b, encoding='utf-8\0')
with self.assertRaises(invalid_type):
t.__init__(b, encoding="utf-8", errors=42)
if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
with self.assertRaises(UnicodeEncodeError):
t.__init__(b, encoding="utf-8", errors='\udcfe')
if support.Py_DEBUG or sys.flags.dev_mode:
# TODO: If encoded to UTF-8, should also be checked for
# embedded null characters.
with self.assertRaises(ValueError):
t.__init__(b, encoding="utf-8", errors='replace\0')
with self.assertRaises(TypeError):
t.__init__(b, encoding="utf-8", newline=42)
with self.assertRaises(ValueError):
t.__init__(b, encoding="utf-8", newline='\udcfe')
with self.assertRaises(ValueError):
t.__init__(b, encoding="utf-8", newline='\n\0')
with self.assertRaises(ValueError):
t.__init__(b, encoding="utf-8", newline='xyzzy')

def test_uninitialized(self):
t = self.TextIOWrapper.__new__(self.TextIOWrapper)
Expand Down Expand Up @@ -3756,6 +3783,59 @@ def test_reconfigure_defaults(self):

self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')

def test_reconfigure_errors(self):
txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\r')
with self.assertRaises(TypeError): # there was a crash
txt.reconfigure(encoding=42)
if self.is_C:
with self.assertRaises(UnicodeEncodeError):
txt.reconfigure(encoding='\udcfe')
with self.assertRaises(LookupError):
txt.reconfigure(encoding='locale\0')
# TODO: txt.reconfigure(encoding='utf-8\0')
# TODO: txt.reconfigure(encoding='nonexisting')
with self.assertRaises(TypeError):
txt.reconfigure(errors=42)
if self.is_C:
with self.assertRaises(UnicodeEncodeError):
txt.reconfigure(errors='\udcfe')
# TODO: txt.reconfigure(errors='ignore\0')
# TODO: txt.reconfigure(errors='nonexisting')
with self.assertRaises(TypeError):
txt.reconfigure(newline=42)
with self.assertRaises(ValueError):
txt.reconfigure(newline='\udcfe')
with self.assertRaises(ValueError):
txt.reconfigure(newline='xyz')
if not self.is_C:
# TODO: Should fail in C too.
with self.assertRaises(ValueError):
txt.reconfigure(newline='\n\0')
if self.is_C:
# TODO: Use __bool__(), not __index__().
with self.assertRaises(ZeroDivisionError):
txt.reconfigure(line_buffering=BadIndex())
with self.assertRaises(OverflowError):
txt.reconfigure(line_buffering=2**1000)
with self.assertRaises(ZeroDivisionError):
txt.reconfigure(write_through=BadIndex())
with self.assertRaises(OverflowError):
txt.reconfigure(write_through=2**1000)
with self.assertRaises(ZeroDivisionError): # there was a crash
txt.reconfigure(line_buffering=BadIndex(),
write_through=BadIndex())
self.assertEqual(txt.encoding, 'ascii')
self.assertEqual(txt.errors, 'replace')
self.assertIs(txt.line_buffering, False)
self.assertIs(txt.write_through, False)

txt.reconfigure(encoding='latin1', errors='ignore', newline='\r\n',
line_buffering=True, write_through=True)
self.assertEqual(txt.encoding, 'latin1')
self.assertEqual(txt.errors, 'ignore')
self.assertIs(txt.line_buffering, True)
self.assertIs(txt.write_through, True)

def test_reconfigure_newline(self):
raw = self.BytesIO(b'CR\rEOF')
txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
Expand Down Expand Up @@ -4781,9 +4861,11 @@ def load_tests(loader, tests, pattern):
if test.__name__.startswith("C"):
for name, obj in c_io_ns.items():
setattr(test, name, obj)
test.is_C = True
elif test.__name__.startswith("Py"):
for name, obj in py_io_ns.items():
setattr(test, name, obj)
test.is_C = False

suite = loader.suiteClass()
for test in tests:
Expand Down
@@ -0,0 +1,2 @@
Fix crashes in :meth:`io.TextIOWrapper.reconfigure` when pass invalid
arguments, e.g. non-string encoding.
39 changes: 36 additions & 3 deletions Modules/_io/textio.c
Expand Up @@ -1284,30 +1284,40 @@ textiowrapper_change_encoding(textio *self, PyObject *encoding,
errors = &_Py_ID(strict);
}
}
Py_INCREF(errors);

const char *c_encoding = PyUnicode_AsUTF8(encoding);
if (c_encoding == NULL) {
Py_DECREF(encoding);
Py_DECREF(errors);
return -1;
}
const char *c_errors = PyUnicode_AsUTF8(errors);
if (c_errors == NULL) {
Py_DECREF(encoding);
Py_DECREF(errors);
return -1;
}

// Create new encoder & decoder
PyObject *codec_info = _PyCodec_LookupTextEncoding(
PyUnicode_AsUTF8(encoding), "codecs.open()");
c_encoding, "codecs.open()");
if (codec_info == NULL) {
Py_DECREF(encoding);
Py_DECREF(errors);
return -1;
}
if (_textiowrapper_set_decoder(self, codec_info, c_errors) != 0 ||
_textiowrapper_set_encoder(self, codec_info, c_errors) != 0) {
Py_DECREF(codec_info);
Py_DECREF(encoding);
Py_DECREF(errors);
return -1;
}
Py_DECREF(codec_info);

Py_SETREF(self->encoding, encoding);
Py_SETREF(self->errors, Py_NewRef(errors));
Py_SETREF(self->errors, errors);

return _textiowrapper_fix_encoder_state(self);
}
Expand Down Expand Up @@ -1338,6 +1348,26 @@ _io_TextIOWrapper_reconfigure_impl(textio *self, PyObject *encoding,
int write_through;
const char *newline = NULL;

if (encoding != Py_None && !PyUnicode_Check(encoding)) {
PyErr_Format(PyExc_TypeError,
"reconfigure() argument 'encoding' must be str or None, not %s",
Py_TYPE(encoding)->tp_name);
return NULL;
}
if (errors != Py_None && !PyUnicode_Check(errors)) {
PyErr_Format(PyExc_TypeError,
"reconfigure() argument 'errors' must be str or None, not %s",
Py_TYPE(errors)->tp_name);
return NULL;
}
if (newline_obj != NULL && newline_obj != Py_None &&
!PyUnicode_Check(newline_obj))
{
PyErr_Format(PyExc_TypeError,
"reconfigure() argument 'newline' must be str or None, not %s",
Py_TYPE(newline_obj)->tp_name);
return NULL;
}
/* Check if something is in the read buffer */
if (self->decoded_chars != NULL) {
if (encoding != Py_None || errors != Py_None || newline_obj != NULL) {
Expand All @@ -1357,9 +1387,12 @@ _io_TextIOWrapper_reconfigure_impl(textio *self, PyObject *encoding,

line_buffering = convert_optional_bool(line_buffering_obj,
self->line_buffering);
if (line_buffering < 0) {
return NULL;
}
write_through = convert_optional_bool(write_through_obj,
self->write_through);
if (line_buffering < 0 || write_through < 0) {
if (write_through < 0) {
return NULL;
}

Expand Down

0 comments on commit ee06fff

Please sign in to comment.