-
-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
test_io.py
178 lines (143 loc) · 5.51 KB
/
test_io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# encoding: utf-8
"""Tests for io.py"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import print_function
from __future__ import absolute_import
import io as stdlib_io
import os.path
import stat
import sys
from subprocess import Popen, PIPE
import unittest
import nose.tools as nt
from IPython.testing.decorators import skipif
from IPython.utils.io import (Tee, capture_output, unicode_std_stream,
atomic_writing,
)
from IPython.utils.py3compat import doctest_refactor_print, PY3
from IPython.utils.tempdir import TemporaryDirectory
if PY3:
from io import StringIO
else:
from StringIO import StringIO
#-----------------------------------------------------------------------------
# Tests
#-----------------------------------------------------------------------------
def test_tee_simple():
"Very simple check with stdout only"
chan = StringIO()
text = 'Hello'
tee = Tee(chan, channel='stdout')
print(text, file=chan)
nt.assert_equal(chan.getvalue(), text+"\n")
class TeeTestCase(unittest.TestCase):
def tchan(self, channel, check='close'):
trap = StringIO()
chan = StringIO()
text = 'Hello'
std_ori = getattr(sys, channel)
setattr(sys, channel, trap)
tee = Tee(chan, channel=channel)
print(text, end='', file=chan)
setattr(sys, channel, std_ori)
trap_val = trap.getvalue()
nt.assert_equal(chan.getvalue(), text)
if check=='close':
tee.close()
else:
del tee
def test(self):
for chan in ['stdout', 'stderr']:
for check in ['close', 'del']:
self.tchan(chan, check)
def test_io_init():
"""Test that io.stdin/out/err exist at startup"""
for name in ('stdin', 'stdout', 'stderr'):
cmd = doctest_refactor_print("from IPython.utils import io;print io.%s.__class__"%name)
p = Popen([sys.executable, '-c', cmd],
stdout=PIPE)
p.wait()
classname = p.stdout.read().strip().decode('ascii')
# __class__ is a reference to the class object in Python 3, so we can't
# just test for string equality.
assert 'IPython.utils.io.IOStream' in classname, classname
def test_capture_output():
"""capture_output() context works"""
with capture_output() as io:
print('hi, stdout')
print('hi, stderr', file=sys.stderr)
nt.assert_equal(io.stdout, 'hi, stdout\n')
nt.assert_equal(io.stderr, 'hi, stderr\n')
def test_UnicodeStdStream():
# Test wrapping a bytes-level stdout
if PY3:
stdoutb = stdlib_io.BytesIO()
stdout = stdlib_io.TextIOWrapper(stdoutb, encoding='ascii')
else:
stdout = stdoutb = stdlib_io.BytesIO()
orig_stdout = sys.stdout
sys.stdout = stdout
try:
sample = u"@łe¶ŧ←"
unicode_std_stream().write(sample)
output = stdoutb.getvalue().decode('utf-8')
nt.assert_equal(output, sample)
assert not stdout.closed
finally:
sys.stdout = orig_stdout
@skipif(not PY3, "Not applicable on Python 2")
def test_UnicodeStdStream_nowrap():
# If we replace stdout with a StringIO, it shouldn't get wrapped.
orig_stdout = sys.stdout
sys.stdout = StringIO()
try:
nt.assert_is(unicode_std_stream(), sys.stdout)
assert not sys.stdout.closed
finally:
sys.stdout = orig_stdout
def test_atomic_writing():
class CustomExc(Exception): pass
with TemporaryDirectory() as td:
f1 = os.path.join(td, 'penguin')
with stdlib_io.open(f1, 'w') as f:
f.write(u'Before')
if os.name != 'nt':
os.chmod(f1, 0o701)
orig_mode = stat.S_IMODE(os.stat(f1).st_mode)
f2 = os.path.join(td, 'flamingo')
try:
os.symlink(f1, f2)
have_symlink = True
except (AttributeError, NotImplementedError, OSError):
# AttributeError: Python doesn't support it
# NotImplementedError: The system doesn't support it
# OSError: The user lacks the privilege (Windows)
have_symlink = False
with nt.assert_raises(CustomExc):
with atomic_writing(f1) as f:
f.write(u'Failing write')
raise CustomExc
# Because of the exception, the file should not have been modified
with stdlib_io.open(f1, 'r') as f:
nt.assert_equal(f.read(), u'Before')
with atomic_writing(f1) as f:
f.write(u'Overwritten')
with stdlib_io.open(f1, 'r') as f:
nt.assert_equal(f.read(), u'Overwritten')
if os.name != 'nt':
mode = stat.S_IMODE(os.stat(f1).st_mode)
nt.assert_equal(mode, orig_mode)
if have_symlink:
# Check that writing over a file preserves a symlink
with atomic_writing(f2) as f:
f.write(u'written from symlink')
with stdlib_io.open(f1, 'r') as f:
nt.assert_equal(f.read(), u'written from symlink')