Permalink
Browse files

bpo-15216: io: TextIOWrapper.reconfigure() accepts encoding, errors a…

…nd newline (GH-2343)
  • Loading branch information...
methane committed Dec 21, 2017
1 parent 31e9908 commit 507434fd504f3ebc1da72aa77544edc0d73f136e
View
@@ -904,7 +904,7 @@ Text I/O
locale encoding using :func:`locale.setlocale`, use the current locale
encoding instead of the user preferred encoding.
:class:`TextIOWrapper` provides one attribute in addition to those of
:class:`TextIOWrapper` provides these members in addition to those of
:class:`TextIOBase` and its parents:
.. attribute:: line_buffering
@@ -918,11 +918,19 @@ Text I/O
.. versionadded:: 3.7
.. method:: reconfigure(*, line_buffering=None, write_through=None)
.. method:: reconfigure(*[, encoding][, errors][, newline][, \
line_buffering][, write_through])
Reconfigure this text stream using new settings for *line_buffering*
and *write_through*. Passing ``None`` as an argument will retain
the current setting for that parameter.
Reconfigure this text stream using new settings for *encoding*,
*errors*, *newline*, *line_buffering* and *write_through*.
Parameters not specified keep current settings, except
``errors='strict`` is used when *encoding* is specified but
*errors* is not specified.
It is not possible to change the encoding or newline if some data
has already been read from the stream. On the other hand, changing
encoding after write is possible.
This method does an implicit stream flush before setting the
new parameters.
View
@@ -1938,10 +1938,7 @@ class TextIOWrapper(TextIOBase):
# so that the signature can match the signature of the C version.
def __init__(self, buffer, encoding=None, errors=None, newline=None,
line_buffering=False, write_through=False):
if newline is not None and not isinstance(newline, str):
raise TypeError("illegal newline type: %r" % (type(newline),))
if newline not in (None, "", "\n", "\r", "\r\n"):
raise ValueError("illegal newline value: %r" % (newline,))
self._check_newline(newline)
if encoding is None:
try:
encoding = os.device_encoding(buffer.fileno())
@@ -1971,22 +1968,38 @@ def __init__(self, buffer, encoding=None, errors=None, newline=None,
raise ValueError("invalid errors: %r" % errors)
self._buffer = buffer
self._decoded_chars = '' # buffer for text returned from decoder
self._decoded_chars_used = 0 # offset into _decoded_chars for read()
self._snapshot = None # info for reconstructing decoder state
self._seekable = self._telling = self.buffer.seekable()
self._has_read1 = hasattr(self.buffer, 'read1')
self._configure(encoding, errors, newline,
line_buffering, write_through)
def _check_newline(self, newline):
if newline is not None and not isinstance(newline, str):
raise TypeError("illegal newline type: %r" % (type(newline),))
if newline not in (None, "", "\n", "\r", "\r\n"):
raise ValueError("illegal newline value: %r" % (newline,))
def _configure(self, encoding=None, errors=None, newline=None,
line_buffering=False, write_through=False):
self._encoding = encoding
self._errors = errors
self._encoder = None
self._decoder = None
self._b2cratio = 0.0
self._readuniversal = not newline
self._readtranslate = newline is None
self._readnl = newline
self._writetranslate = newline != ''
self._writenl = newline or os.linesep
self._encoder = None
self._decoder = None
self._decoded_chars = '' # buffer for text returned from decoder
self._decoded_chars_used = 0 # offset into _decoded_chars for read()
self._snapshot = None # info for reconstructing decoder state
self._seekable = self._telling = self.buffer.seekable()
self._has_read1 = hasattr(self.buffer, 'read1')
self._b2cratio = 0.0
self._line_buffering = line_buffering
self._write_through = write_through
# don't write a BOM in the middle of a file
if self._seekable and self.writable():
position = self.buffer.tell()
if position != 0:
@@ -1996,12 +2009,6 @@ def __init__(self, buffer, encoding=None, errors=None, newline=None,
# Sometimes the encoder doesn't exist
pass
self._configure(line_buffering, write_through)
def _configure(self, line_buffering=False, write_through=False):
self._line_buffering = line_buffering
self._write_through = write_through
# self._snapshot is either None, or a tuple (dec_flags, next_input)
# where dec_flags is the second (integer) item of the decoder state
# and next_input is the chunk of input bytes that comes next after the
@@ -2048,17 +2055,46 @@ def write_through(self):
def buffer(self):
return self._buffer
def reconfigure(self, *, line_buffering=None, write_through=None):
def reconfigure(self, *,
encoding=None, errors=None, newline=Ellipsis,
line_buffering=None, write_through=None):
"""Reconfigure the text stream with new parameters.
This also flushes the stream.
"""
if (self._decoder is not None
and (encoding is not None or errors is not None
or newline is not Ellipsis)):
raise UnsupportedOperation(
"It is not possible to set the encoding or newline of stream "
"after the first read")
if errors is None:
if encoding is None:
errors = self._errors
else:
errors = 'strict'
elif not isinstance(errors, str):
raise TypeError("invalid errors: %r" % errors)
if encoding is None:
encoding = self._encoding
else:
if not isinstance(encoding, str):
raise TypeError("invalid encoding: %r" % encoding)
if newline is Ellipsis:
newline = self._readnl
self._check_newline(newline)
if line_buffering is None:
line_buffering = self.line_buffering
if write_through is None:
write_through = self.write_through
self.flush()
self._configure(line_buffering, write_through)
self._configure(encoding, errors, newline,
line_buffering, write_through)
def seekable(self):
if self.closed:
View
@@ -3408,6 +3408,123 @@ def seekable(self): return True
F.tell = lambda x: 0
t = self.TextIOWrapper(F(), encoding='utf-8')
def test_reconfigure_encoding_read(self):
# latin1 -> utf8
# (latin1 can decode utf-8 encoded string)
data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
raw = self.BytesIO(data)
txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
self.assertEqual(txt.readline(), 'abc\xe9\n')
with self.assertRaises(self.UnsupportedOperation):
txt.reconfigure(encoding='utf-8')
with self.assertRaises(self.UnsupportedOperation):
txt.reconfigure(newline=None)
def test_reconfigure_write_fromascii(self):
# ascii has a specific encodefunc in the C implementation,
# but utf-8-sig has not. Make sure that we get rid of the
# cached encodefunc when we switch encoders.
raw = self.BytesIO()
txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
txt.write('foo\n')
txt.reconfigure(encoding='utf-8-sig')
txt.write('\xe9\n')
txt.flush()
self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
def test_reconfigure_write(self):
# latin -> utf8
raw = self.BytesIO()
txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
txt.write('abc\xe9\n')
txt.reconfigure(encoding='utf-8')
self.assertEqual(raw.getvalue(), b'abc\xe9\n')
txt.write('d\xe9f\n')
txt.flush()
self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
# ascii -> utf-8-sig: ensure that no BOM is written in the middle of
# the file
raw = self.BytesIO()
txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
txt.write('abc\n')
txt.reconfigure(encoding='utf-8-sig')
txt.write('d\xe9f\n')
txt.flush()
self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
def test_reconfigure_write_non_seekable(self):
raw = self.BytesIO()
raw.seekable = lambda: False
raw.seek = None
txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
txt.write('abc\n')
txt.reconfigure(encoding='utf-8-sig')
txt.write('d\xe9f\n')
txt.flush()
# If the raw stream is not seekable, there'll be a BOM
self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
def test_reconfigure_defaults(self):
txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
txt.reconfigure(encoding=None)
self.assertEqual(txt.encoding, 'ascii')
self.assertEqual(txt.errors, 'replace')
txt.write('LF\n')
txt.reconfigure(newline='\r\n')
self.assertEqual(txt.encoding, 'ascii')
self.assertEqual(txt.errors, 'replace')
txt.reconfigure(errors='ignore')
self.assertEqual(txt.encoding, 'ascii')
self.assertEqual(txt.errors, 'ignore')
txt.write('CRLF\n')
txt.reconfigure(encoding='utf-8', newline=None)
self.assertEqual(txt.errors, 'strict')
txt.seek(0)
self.assertEqual(txt.read(), 'LF\nCRLF\n')
self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
def test_reconfigure_newline(self):
raw = self.BytesIO(b'CR\rEOF')
txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
txt.reconfigure(newline=None)
self.assertEqual(txt.readline(), 'CR\n')
raw = self.BytesIO(b'CR\rEOF')
txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
txt.reconfigure(newline='')
self.assertEqual(txt.readline(), 'CR\r')
raw = self.BytesIO(b'CR\rLF\nEOF')
txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
txt.reconfigure(newline='\n')
self.assertEqual(txt.readline(), 'CR\rLF\n')
raw = self.BytesIO(b'LF\nCR\rEOF')
txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
txt.reconfigure(newline='\r')
self.assertEqual(txt.readline(), 'LF\nCR\r')
raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
txt.reconfigure(newline='\r\n')
self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
txt.reconfigure(newline=None)
txt.write('linesep\n')
txt.reconfigure(newline='')
txt.write('LF\n')
txt.reconfigure(newline='\n')
txt.write('LF\n')
txt.reconfigure(newline='\r')
txt.write('CR\n')
txt.reconfigure(newline='\r\n')
txt.write('CRLF\n')
expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
class MemviewBytesIO(io.BytesIO):
'''A BytesIO object whose read method returns memoryviews
@@ -0,0 +1,2 @@
``TextIOWrapper.reconfigure()`` supports changing *encoding*, *errors*, and
*newline*.
@@ -149,7 +149,7 @@ PyDoc_STRVAR(_io_TextIOWrapper___init____doc__,
static int
_io_TextIOWrapper___init___impl(textio *self, PyObject *buffer,
const char *encoding, const char *errors,
const char *encoding, PyObject *errors,
const char *newline, int line_buffering,
int write_through);
@@ -158,10 +158,10 @@ _io_TextIOWrapper___init__(PyObject *self, PyObject *args, PyObject *kwargs)
{
int return_value = -1;
static const char * const _keywords[] = {"buffer", "encoding", "errors", "newline", "line_buffering", "write_through", NULL};
static _PyArg_Parser _parser = {"O|zzzii:TextIOWrapper", _keywords, 0};
static _PyArg_Parser _parser = {"O|zOzii:TextIOWrapper", _keywords, 0};
PyObject *buffer;
const char *encoding = NULL;
const char *errors = NULL;
PyObject *errors = Py_None;
const char *newline = NULL;
int line_buffering = 0;
int write_through = 0;
@@ -177,7 +177,8 @@ _io_TextIOWrapper___init__(PyObject *self, PyObject *args, PyObject *kwargs)
}
PyDoc_STRVAR(_io_TextIOWrapper_reconfigure__doc__,
"reconfigure($self, /, *, line_buffering=None, write_through=None)\n"
"reconfigure($self, /, *, encoding=None, errors=None, newline=None,\n"
" line_buffering=None, write_through=None)\n"
"--\n"
"\n"
"Reconfigure the text stream with new parameters.\n"
@@ -188,24 +189,28 @@ PyDoc_STRVAR(_io_TextIOWrapper_reconfigure__doc__,
{"reconfigure", (PyCFunction)_io_TextIOWrapper_reconfigure, METH_FASTCALL|METH_KEYWORDS, _io_TextIOWrapper_reconfigure__doc__},
static PyObject *
_io_TextIOWrapper_reconfigure_impl(textio *self,
_io_TextIOWrapper_reconfigure_impl(textio *self, PyObject *encoding,
PyObject *errors, PyObject *newline_obj,
PyObject *line_buffering_obj,
PyObject *write_through_obj);
static PyObject *
_io_TextIOWrapper_reconfigure(textio *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
PyObject *return_value = NULL;
static const char * const _keywords[] = {"line_buffering", "write_through", NULL};
static _PyArg_Parser _parser = {"|$OO:reconfigure", _keywords, 0};
static const char * const _keywords[] = {"encoding", "errors", "newline", "line_buffering", "write_through", NULL};
static _PyArg_Parser _parser = {"|$OOOOO:reconfigure", _keywords, 0};
PyObject *encoding = Py_None;
PyObject *errors = Py_None;
PyObject *newline_obj = NULL;
PyObject *line_buffering_obj = Py_None;
PyObject *write_through_obj = Py_None;
if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
&line_buffering_obj, &write_through_obj)) {
&encoding, &errors, &newline_obj, &line_buffering_obj, &write_through_obj)) {
goto exit;
}
return_value = _io_TextIOWrapper_reconfigure_impl(self, line_buffering_obj, write_through_obj);
return_value = _io_TextIOWrapper_reconfigure_impl(self, encoding, errors, newline_obj, line_buffering_obj, write_through_obj);
exit:
return return_value;
@@ -499,4 +504,4 @@ _io_TextIOWrapper_close(textio *self, PyObject *Py_UNUSED(ignored))
{
return _io_TextIOWrapper_close_impl(self);
}
/*[clinic end generated code: output=679b3ac5284df4e0 input=a9049054013a1b77]*/
/*[clinic end generated code: output=b5be870b0039d577 input=a9049054013a1b77]*/
Oops, something went wrong.

0 comments on commit 507434f

Please sign in to comment.