Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions Lib/importlib/_bootstrap_external.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,8 @@ def _write_atomic(path, data, mode=0o666):
try:
# We first write data to a temporary file, and then use os.replace() to
# perform an atomic rename.
with _io.FileIO(fd, 'wb') as file:
bytes_written = file.write(data)
if bytes_written != len(data):
# Raise an OSError so the 'except' below cleans up the partially
# written file.
raise OSError("os.write() didn't write the full pyc file")
with _io.FileIO(fd, 'wb') as file, _io.BufferedWriter(file) as writer:
writer.write(data)
_os.replace(path_tmp, path)
except OSError:
try:
Expand Down
71 changes: 55 additions & 16 deletions Lib/test/test_importlib/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,31 +788,70 @@ def test_complete_multi_phase_init_module(self):
self.run_with_own_gil(script)


class MiscTests(unittest.TestCase):
def test_atomic_write_should_notice_incomplete_writes(self):
class PatchAtomicWrites():
def __init__(self, truncate_at_length=100, never_complete=False):
self.truncate_at_length = truncate_at_length
self.never_complete = never_complete
self.seen_write = False
self._children = []

def __enter__(self):
import _pyio

oldwrite = os.write
seen_write = False

truncate_at_length = 100

# Emulate an os.write that only writes partial data.
def write(fd, data):
nonlocal seen_write
seen_write = True
return oldwrite(fd, data[:truncate_at_length])
if self.seen_write and self.never_complete:
return None
self.seen_write = True
return oldwrite(fd, data[:self.truncate_at_length])

# Need to patch _io to be _pyio, so that io.FileIO is affected by the
# os.write patch.
with (support.swap_attr(_bootstrap_external, '_io', _pyio),
support.swap_attr(os, 'write', write)):
with self.assertRaises(OSError):
# Make sure we write something longer than the point where we
# truncate.
content = b'x' * (truncate_at_length * 2)
_bootstrap_external._write_atomic(os_helper.TESTFN, content)
assert seen_write
self.children = [
support.swap_attr(_bootstrap_external, '_io', _pyio),
support.swap_attr(os, 'write', write)
]
for child in self.children:
child.__enter__()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
for child in self.children:
child.__exit__(exc_type, exc_val, exc_tb)


class MiscTests(unittest.TestCase):

def test_atomic_write_retries_incomplete_writes(self):
truncate_at_length = 100
length = truncate_at_length * 2

with PatchAtomicWrites(truncate_at_length=truncate_at_length) as cm:
# Make sure we write something longer than the point where we
# truncate.
content = b'x' * length
_bootstrap_external._write_atomic(os_helper.TESTFN, content)
assert cm.seen_write

assert os.stat(support.os_helper.TESTFN).st_size == length
os.unlink(support.os_helper.TESTFN)

def test_atomic_write_errors_if_unable_to_complete(self):
truncate_at_length = 100

with (
PatchAtomicWrites(
truncate_at_length=truncate_at_length, never_complete=True,
) as cm,
self.assertRaises(OSError)
):
# Make sure we write something longer than the point where we
# truncate.
content = b'x' * (truncate_at_length * 2)
_bootstrap_external._write_atomic(os_helper.TESTFN, content)
assert cm.seen_write

with self.assertRaises(OSError):
os.stat(support.os_helper.TESTFN) # Check that the file did not get written.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use ``BufferedWriter`` to ensure that writes to ``.pyc`` files are retried, or an appropriate error is raised.
Loading