|
|
@@ -2,14 +2,13 @@ |
|
|
|
|
|
from mutagen._util import DictMixin, cdata, insert_bytes, delete_bytes, \ |
|
|
decode_terminated, dict_match, enum, get_size, BitReader, BitReaderError, \ |
|
|
resize_bytes, seek_end, mmap_move, verify_fileobj, fileobj_name, \ |
|
|
read_full, flags, resize_file, fallback_move, encode_endian, loadfile, \ |
|
|
resize_bytes, seek_end, verify_fileobj, fileobj_name, \ |
|
|
read_full, flags, resize_file, move_bytes, encode_endian, loadfile, \ |
|
|
intround, verify_filename |
|
|
from tests import TestCase, get_temp_empty |
|
|
import os |
|
|
import random |
|
|
import tempfile |
|
|
import mmap |
|
|
import errno |
|
|
import builtins |
|
|
from io import BytesIO |
|
|
@@ -332,9 +331,9 @@ def raise_no_space(*args): |
|
|
self.assertEqual(h.tell(), 3) |
|
|
|
|
|
|
|
|
class TMoveMixin(object): |
|
|
class TMoveMixin(TestCase): |
|
|
|
|
|
MOVE = None |
|
|
MOVE = staticmethod(move_bytes) |
|
|
|
|
|
def file(self, contents): |
|
|
temp = tempfile.TemporaryFile() |
|
|
@@ -373,32 +372,6 @@ def test_ok(self): |
|
|
self.MOVE(o, 0, 1, 2) |
|
|
self.MOVE(o, 1, 0, 2) |
|
|
|
|
|
def test_larger_than_page_size(self): |
|
|
off = mmap.ALLOCATIONGRANULARITY |
|
|
with self.file(b"f" * off * 2) as o: |
|
|
self.MOVE(o, off, off + 1, off - 1) |
|
|
self.MOVE(o, off + 1, off, off - 1) |
|
|
|
|
|
with self.file(b"f" * off * 2 + b"x") as o: |
|
|
self.MOVE(o, off * 2 - 1, off * 2, 1) |
|
|
self.assertEqual(self.read(o)[-3:], b"fxx") |
|
|
|
|
|
|
|
|
class Tfallback_move(TestCase, TMoveMixin): |
|
|
|
|
|
MOVE = staticmethod(fallback_move) |
|
|
|
|
|
|
|
|
class MmapMove(TestCase, TMoveMixin): |
|
|
|
|
|
MOVE = staticmethod(mmap_move) |
|
|
|
|
|
def test_stringio(self): |
|
|
self.assertRaises(mmap.error, mmap_move, BytesIO(), 0, 0, 0) |
|
|
|
|
|
def test_no_fileno(self): |
|
|
self.assertRaises(mmap.error, mmap_move, object(), 0, 0, 0) |
|
|
|
|
|
|
|
|
class FileHandling(TestCase): |
|
|
def file(self, contents): |
|
|
@@ -519,8 +492,7 @@ def test_delete_negative(self): |
|
|
|
|
|
def test_insert_6106_79_51760(self): |
|
|
# This appears to be due to ANSI C limitations in read/write on rb+ |
|
|
# files. The problematic behavior only showed up in our mmap fallback |
|
|
# code for transfers of this or similar sizes. |
|
|
# files. |
|
|
data = u''.join(map(str, range(12574))) # 51760 bytes |
|
|
data = data.encode("ascii") |
|
|
with self.file(data) as o: |
|
|
@@ -529,8 +501,7 @@ def test_insert_6106_79_51760(self): |
|
|
|
|
|
def test_delete_6106_79_51760(self): |
|
|
# This appears to be due to ANSI C limitations in read/write on rb+ |
|
|
# files. The problematic behavior only showed up in our mmap fallback |
|
|
# code for transfers of this or similar sizes. |
|
|
# files. |
|
|
data = u''.join(map(str, range(12574))) # 51760 bytes |
|
|
data = data.encode("ascii") |
|
|
with self.file(data[:6106 + 79] + data[79:]) as o: |
|
|
@@ -585,41 +556,6 @@ def test_many_changes(self, num_runs=5, num_changes=300, |
|
|
self.failUnless(fobj.read() == data) |
|
|
|
|
|
|
|
|
class FileHandlingMockedMMap(FileHandling): |
|
|
|
|
|
def setUp(self): |
|
|
def MockMMap2(*args, **kwargs): |
|
|
raise mmap.error |
|
|
|
|
|
self._orig_mmap = mmap.mmap |
|
|
mmap.mmap = MockMMap2 |
|
|
|
|
|
def tearDown(self): |
|
|
mmap.mmap = self._orig_mmap |
|
|
|
|
|
|
|
|
class FileHandlingNoMMap(FileHandling): |
|
|
"""Disables mmap and makes sure it raises if it still gets used somehow""" |
|
|
|
|
|
def setUp(self): |
|
|
from mutagen import _util |
|
|
|
|
|
def MockMMap2(*args, **kwargs): |
|
|
assert False |
|
|
|
|
|
self._orig_mmap = mmap.mmap |
|
|
mmap.mmap = MockMMap2 |
|
|
|
|
|
_util.mmap = None |
|
|
|
|
|
def tearDown(self): |
|
|
from mutagen import _util |
|
|
import mmap |
|
|
|
|
|
_util.mmap = mmap |
|
|
mmap.mmap = self._orig_mmap |
|
|
|
|
|
|
|
|
class Tdict_match(TestCase): |
|
|
|
|
|
def test_match(self): |
|
|
|