Skip to content

Commit

Permalink
Issue #28217: Adds _testconsole module to test console input. Fixes s…
Browse files Browse the repository at this point in the history
…ome issues found by the tests.
  • Loading branch information
zooba committed Oct 3, 2016
1 parent 7fe091d commit 312cef7
Show file tree
Hide file tree
Showing 11 changed files with 460 additions and 21 deletions.
70 changes: 62 additions & 8 deletions Lib/test/test_winconsoleio.py
@@ -1,12 +1,4 @@
'''Tests for WindowsConsoleIO
Unfortunately, most testing requires interactive use, since we have no
API to read back from a real console, and this class is only for use
with real consoles.
Instead, we validate that basic functionality such as opening, closing
and in particular fileno() work, but are forced to leave real testing
to real people with real keyborads.
'''

import io
Expand All @@ -16,6 +8,8 @@
if sys.platform != 'win32':
raise unittest.SkipTest("test only relevant on win32")

from _testconsole import write_input

ConIO = io._WindowsConsoleIO

class WindowsConsoleIOTests(unittest.TestCase):
Expand Down Expand Up @@ -83,5 +77,65 @@ def test_open_name(self):
f.close()
f.close()

def assertStdinRoundTrip(self, text):
stdin = open('CONIN$', 'r')
old_stdin = sys.stdin
try:
sys.stdin = stdin
write_input(
stdin.buffer.raw,
(text + '\r\n').encode('utf-16-le', 'surrogatepass')
)
actual = input()
finally:
sys.stdin = old_stdin
self.assertEqual(actual, text)

def test_input(self):
# ASCII
self.assertStdinRoundTrip('abc123')
# Non-ASCII
self.assertStdinRoundTrip('ϼўТλФЙ')
# Combining characters
self.assertStdinRoundTrip('A͏B ﬖ̳AA̝')
# Non-BMP
self.assertStdinRoundTrip('\U00100000\U0010ffff\U0010fffd')

def test_partial_reads(self):
# Test that reading less than 1 full character works when stdin
# contains multibyte UTF-8 sequences
source = 'ϼўТλФЙ\r\n'.encode('utf-16-le')
expected = 'ϼўТλФЙ\r\n'.encode('utf-8')
for read_count in range(1, 16):
stdin = open('CONIN$', 'rb', buffering=0)
write_input(stdin, source)

actual = b''
while not actual.endswith(b'\n'):
b = stdin.read(read_count)
actual += b

self.assertEqual(actual, expected, 'stdin.read({})'.format(read_count))
stdin.close()

def test_partial_surrogate_reads(self):
# Test that reading less than 1 full character works when stdin
# contains surrogate pairs that cannot be decoded to UTF-8 without
# reading an extra character.
source = '\U00101FFF\U00101001\r\n'.encode('utf-16-le')
expected = '\U00101FFF\U00101001\r\n'.encode('utf-8')
for read_count in range(1, 16):
stdin = open('CONIN$', 'rb', buffering=0)
write_input(stdin, source)

actual = b''
while not actual.endswith(b'\n'):
b = stdin.read(read_count)
actual += b

self.assertEqual(actual, expected, 'stdin.read({})'.format(read_count))
stdin.close()


if __name__ == "__main__":
unittest.main()
4 changes: 4 additions & 0 deletions Misc/NEWS
Expand Up @@ -186,6 +186,10 @@ Build
- Issue #15819: Remove redundant include search directory option for building
outside the source tree.

Tests
-----

- Issue #28217: Adds _testconsole module to test console input.

What's New in Python 3.6.0 beta 1
=================================
Expand Down
3 changes: 2 additions & 1 deletion Modules/_io/_iomodule.h
Expand Up @@ -22,7 +22,8 @@ extern PyTypeObject PyIncrementalNewlineDecoder_Type;
#ifndef Py_LIMITED_API
#ifdef MS_WINDOWS
extern PyTypeObject PyWindowsConsoleIO_Type;
#define PyWindowsConsoleIO_Check(op) (PyObject_TypeCheck((op), &PyWindowsConsoleIO_Type))
PyAPI_DATA(PyObject *) _PyWindowsConsoleIO_Type;
#define PyWindowsConsoleIO_Check(op) (PyObject_TypeCheck((op), (PyTypeObject*)_PyWindowsConsoleIO_Type))
#endif /* MS_WINDOWS */
#endif /* Py_LIMITED_API */

Expand Down
54 changes: 43 additions & 11 deletions Modules/_io/winconsoleio.c
Expand Up @@ -39,6 +39,11 @@
/* BUFMAX determines how many bytes can be read in one go. */
#define BUFMAX (32*1024*1024)

/* SMALLBUF determines how many utf-8 characters will be
buffered within the stream, in order to support reads
of less than one character */
#define SMALLBUF 4

char _get_console_type(HANDLE handle) {
DWORD mode, peek_count;

Expand Down Expand Up @@ -125,7 +130,8 @@ typedef struct {
unsigned int blksize;
PyObject *weakreflist;
PyObject *dict;
char buf[4];
char buf[SMALLBUF];
wchar_t wbuf;
} winconsoleio;

PyTypeObject PyWindowsConsoleIO_Type;
Expand Down Expand Up @@ -500,11 +506,11 @@ _io__WindowsConsoleIO_writable_impl(winconsoleio *self)
static DWORD
_buflen(winconsoleio *self)
{
for (DWORD i = 0; i < 4; ++i) {
for (DWORD i = 0; i < SMALLBUF; ++i) {
if (!self->buf[i])
return i;
}
return 4;
return SMALLBUF;
}

static DWORD
Expand All @@ -513,12 +519,10 @@ _copyfrombuf(winconsoleio *self, char *buf, DWORD len)
DWORD n = 0;

while (self->buf[0] && len--) {
n += 1;
buf[0] = self->buf[0];
self->buf[0] = self->buf[1];
self->buf[1] = self->buf[2];
self->buf[2] = self->buf[3];
self->buf[3] = 0;
buf[n++] = self->buf[0];
for (int i = 1; i < SMALLBUF; ++i)
self->buf[i - 1] = self->buf[i];
self->buf[SMALLBUF - 1] = 0;
}

return n;
Expand All @@ -531,10 +535,13 @@ read_console_w(HANDLE handle, DWORD maxlen, DWORD *readlen) {
wchar_t *buf = (wchar_t*)PyMem_Malloc(maxlen * sizeof(wchar_t));
if (!buf)
goto error;

*readlen = 0;

//DebugBreak();
Py_BEGIN_ALLOW_THREADS
for (DWORD off = 0; off < maxlen; off += BUFSIZ) {
DWORD off = 0;
while (off < maxlen) {
DWORD n, len = min(maxlen - off, BUFSIZ);
SetLastError(0);
BOOL res = ReadConsoleW(handle, &buf[off], len, &n, NULL);
Expand All @@ -550,7 +557,7 @@ read_console_w(HANDLE handle, DWORD maxlen, DWORD *readlen) {
err = 0;
HANDLE hInterruptEvent = _PyOS_SigintEvent();
if (WaitForSingleObjectEx(hInterruptEvent, 100, FALSE)
== WAIT_OBJECT_0) {
== WAIT_OBJECT_0) {
ResetEvent(hInterruptEvent);
Py_BLOCK_THREADS
sig = PyErr_CheckSignals();
Expand All @@ -568,7 +575,30 @@ read_console_w(HANDLE handle, DWORD maxlen, DWORD *readlen) {
/* If the buffer ended with a newline, break out */
if (buf[*readlen - 1] == '\n')
break;
/* If the buffer ends with a high surrogate, expand the
buffer and read an extra character. */
WORD char_type;
if (off + BUFSIZ >= maxlen &&
GetStringTypeW(CT_CTYPE3, &buf[*readlen - 1], 1, &char_type) &&
char_type == C3_HIGHSURROGATE) {
wchar_t *newbuf;
maxlen += 1;
Py_BLOCK_THREADS
newbuf = (wchar_t*)PyMem_Realloc(buf, maxlen * sizeof(wchar_t));
Py_UNBLOCK_THREADS
if (!newbuf) {
sig = -1;
break;
}
buf = newbuf;
/* Only advance by n and not BUFSIZ in this case */
off += n;
continue;
}

off += BUFSIZ;
}

Py_END_ALLOW_THREADS

if (sig)
Expand Down Expand Up @@ -1110,4 +1140,6 @@ PyTypeObject PyWindowsConsoleIO_Type = {
0, /* tp_finalize */
};

PyAPI_DATA(PyObject *) _PyWindowsConsoleIO_Type = (PyObject*)&PyWindowsConsoleIO_Type;

#endif /* MS_WINDOWS */
131 changes: 131 additions & 0 deletions PC/_testconsole.c
@@ -0,0 +1,131 @@

/* Testing module for multi-phase initialization of extension modules (PEP 489)
*/

#include "Python.h"

#ifdef MS_WINDOWS

#include "..\modules\_io\_iomodule.h"

#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <fcntl.h>

/* The full definition is in iomodule. We reproduce
enough here to get the handle, which is all we want. */
typedef struct {
PyObject_HEAD
HANDLE handle;
} winconsoleio;


static int execfunc(PyObject *m)
{
return 0;
}

PyModuleDef_Slot testconsole_slots[] = {
{Py_mod_exec, execfunc},
{0, NULL},
};

/*[clinic input]
module _testconsole
_testconsole.write_input
file: object
s: PyBytesObject
Writes UTF-16-LE encoded bytes to the console as if typed by a user.
[clinic start generated code]*/

static PyObject *
_testconsole_write_input_impl(PyObject *module, PyObject *file,
PyBytesObject *s)
/*[clinic end generated code: output=48f9563db34aedb3 input=4c774f2d05770bc6]*/
{
INPUT_RECORD *rec = NULL;

if (!PyWindowsConsoleIO_Check(file)) {
PyErr_SetString(PyExc_TypeError, "expected raw console object");
return NULL;
}

const wchar_t *p = (const wchar_t *)PyBytes_AS_STRING(s);
DWORD size = (DWORD)PyBytes_GET_SIZE(s) / sizeof(wchar_t);

rec = (INPUT_RECORD*)PyMem_Malloc(sizeof(INPUT_RECORD) * size);
if (!rec)
goto error;
memset(rec, 0, sizeof(INPUT_RECORD) * size);

INPUT_RECORD *prec = rec;
for (DWORD i = 0; i < size; ++i, ++p, ++prec) {
prec->EventType = KEY_EVENT;
prec->Event.KeyEvent.bKeyDown = TRUE;
prec->Event.KeyEvent.wRepeatCount = 10;
prec->Event.KeyEvent.uChar.UnicodeChar = *p;
}

HANDLE hInput = ((winconsoleio*)file)->handle;
DWORD total = 0;
while (total < size) {
DWORD wrote;
if (!WriteConsoleInputW(hInput, &rec[total], (size - total), &wrote)) {
PyErr_SetFromWindowsErr(0);
goto error;
}
total += wrote;
}

PyMem_Free((void*)rec);

Py_RETURN_NONE;
error:
if (rec)
PyMem_Free((void*)rec);
return NULL;
}

/*[clinic input]
_testconsole.read_output
file: object
Reads a str from the console as written to stdout.
[clinic start generated code]*/

static PyObject *
_testconsole_read_output_impl(PyObject *module, PyObject *file)
/*[clinic end generated code: output=876310d81a73e6d2 input=b3521f64b1b558e3]*/
{
Py_RETURN_NONE;
}

#include "clinic\_testconsole.c.h"

PyMethodDef testconsole_methods[] = {
_TESTCONSOLE_WRITE_INPUT_METHODDEF
_TESTCONSOLE_READ_OUTPUT_METHODDEF
{NULL, NULL}
};

static PyModuleDef testconsole_def = {
PyModuleDef_HEAD_INIT, /* m_base */
"_testconsole", /* m_name */
PyDoc_STR("Test module for the Windows console"), /* m_doc */
0, /* m_size */
testconsole_methods, /* m_methods */
testconsole_slots, /* m_slots */
NULL, /* m_traverse */
NULL, /* m_clear */
NULL, /* m_free */
};

PyMODINIT_FUNC
PyInit__testconsole(PyObject *spec)
{
return PyModuleDef_Init(&testconsole_def);
}

#endif /* MS_WINDOWS */

0 comments on commit 312cef7

Please sign in to comment.