Permalink
Browse files

Removed tempfile from Patcher

- removed fake_tempfile
- removed tests only relevant for fake implementation
- changed remaining tests to work with real tempfile
- configure Patcher to work with tempfile, create temp dir in setUpPyfakefs()
- added hack to fix problem with cached os functions in Patcher (Posix only)
- added support for O_TEMPORARY flag to os.open (Windows only)
- added support for low level exclusive mode in Python 2
- added current pypy version to travis
- fixes #191
  • Loading branch information...
mrbean-bremen committed Jun 25, 2017
1 parent fd6fc05 commit 1adfccbd2ed64dc5abf10b43f8086c942b4341ba
View
@@ -22,9 +22,10 @@ python:
- "3.4"
- "3.5"
- "3.6"
- "nightly" # points to 3.7-dev
- "3.7-dev"
- "pypy"
- "pypy3"
- "pypy-5.3.1"
install:
- pip install -r requirements.txt
View
@@ -2571,6 +2571,14 @@ def testLowLevelOpenTruncate(self):
self.assertEqual(4, self.os.write(file_des, b'test'))
self.assertEqual(b'test', file_obj.byte_contents)
@unittest.skipIf(not TestCase.is_windows, 'O_TEMPORARY only present in Windows')
def testLowLevelTempFile(self):
file_name = 'foo'
fd = self.os.open(file_name, os.O_CREAT | os.O_RDWR | os.O_TEMPORARY)
self.assertTrue(self.filesystem.Exists(file_name))
self.os.close(fd)
self.assertFalse(self.filesystem.Exists(file_name))
def testLowLevelOpenAppend(self):
file_path = 'file1'
file_obj = self.filesystem.CreateFile(file_path, contents=b'contents',
@@ -5278,7 +5286,7 @@ def testAddExistingRealDirectoryReadOnly(self):
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_filesystem.py')))
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_pathlib.py')))
file_path = os.path.join(real_dir_path, 'fake_tempfile.py')
file_path = os.path.join(real_dir_path, 'fake_filesystem_shutil.py')
fake_file = self.filesystem.ResolveObject(file_path)
self.checkFakeFileStat(fake_file, file_path)
self.checkReadOnlyFile(fake_file, file_path)
@@ -23,7 +23,6 @@
import os
import glob
import shutil
import tempfile
import sys
from import_as_example import check_if_exists
@@ -115,20 +114,6 @@ def test_shutil(self):
shutil.rmtree('/test/dir1')
self.assertFalse(self.fs.Exists('/test/dir1'))
def test_tempfile(self):
"""Fake tempfile module is bound"""
with tempfile.NamedTemporaryFile() as tf:
tf.write(b'Temporary file contents\n')
name = tf.name
self.assertTrue(self.fs.Exists(tf.name))
@unittest.skipIf(sys.version_info < (3, 0), "TemporaryDirectory new in Python3")
def test_tempdirectory(self):
"""Fake TemporaryDirectory class is bound"""
with tempfile.TemporaryDirectory() as td:
with open('%s/fake_file.txt' % td, 'w') as f:
self.assertTrue(self.fs.Exists(td))
@unittest.skipIf(sys.version_info < (3, 4), "pathlib new in Python 3.4")
def test_fakepathlib(self):
with pathlib.Path('/fake_file.txt') as p:
@@ -174,7 +159,6 @@ def testAttributes(self):
self.assertEqual(module_with_attributes.path, 'path attribute value')
self.assertEqual(module_with_attributes.pathlib, 'pathlib attribute value')
self.assertEqual(module_with_attributes.shutil, 'shutil attribute value')
self.assertEqual(module_with_attributes.tempfile, 'tempfile attribute value')
self.assertEqual(module_with_attributes.io, 'io attribute value')
@@ -226,9 +210,9 @@ def testCopyRealFile(self):
real_file_path = __file__
fake_file = self.copyRealFile(real_file_path)
self.assertTrue('class TestCopyRealFile(TestPyfakefsUnittestBase)' in self.real_string_contents,
self.assertTrue('class TestCopyOrAddRealFile(TestPyfakefsUnittestBase)' in self.real_string_contents,
'Verify real file string contents')
self.assertTrue(b'class TestCopyRealFile(TestPyfakefsUnittestBase)' in self.real_byte_contents,
self.assertTrue(b'class TestCopyOrAddRealFile(TestPyfakefsUnittestBase)' in self.real_byte_contents,
'Verify real file byte contents')
# note that real_string_contents may differ to fake_file.contents due to newline conversions in open()
@@ -21,7 +21,6 @@
import os # @UnusedImport
import os.path
import shutil
import sys
import tempfile
import time
import sys
View
@@ -14,197 +14,89 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the fake_tempfile module."""
# pylint: disable-all
"""Tests that ensure that the `tempfile` module works with `fake_filesystem`
if using `Patcher` (via `fake_filesystem_unittest`).
"""
import os
import stat
import sys
import tempfile
if sys.version_info < (2, 7):
import unittest2 as unittest
else:
import unittest
try:
import StringIO as io # pylint: disable-msg=C6204
except ImportError:
import io # pylint: disable-msg=C6204
from pyfakefs import fake_filesystem
from pyfakefs import fake_tempfile
class FakeLogging(object):
"""Fake logging object for testGettempprefix."""
def __init__(self, test_case):
self._message = None
self._test_case = test_case
# pylint: disable-msg=C6409
def error(self, message):
if self._message is not None:
self.FailOnMessage(message)
self._message = message
from pyfakefs import fake_filesystem_unittest
def FailOnMessage(self, message):
self._test_case.fail('Unexpected message received: %s' % message)
warn = FailOnMessage
info = FailOnMessage
debug = FailOnMessage
fatal = FailOnMessage
def message(self):
return self._message
class FakeTempfileModuleTest(unittest.TestCase):
"""Test the 'tempfile' module mock."""
class FakeTempfileModuleTest(fake_filesystem_unittest.TestCase):
"""Test the 'tempfile' module with the fake file system."""
def setUp(self):
self.filesystem = fake_filesystem.FakeFilesystem(path_separator='/')
self.os = fake_filesystem.FakeOsModule(self.filesystem)
self.tempfile = fake_tempfile.FakeTempfileModule(self.filesystem)
self.orig_logging = fake_tempfile.logging
self.fake_logging = FakeLogging(self)
fake_tempfile.logging = self.fake_logging
def tearDown(self):
fake_tempfile.logging = self.orig_logging
def testTempFilename(self):
# pylint: disable-msg=C6002
# TODO: test that tempdir is init'ed
filename_a = self.tempfile._TempFilename()
# expect /tmp/tmp######
self.assertTrue(filename_a.startswith('/tmp/tmp'))
self.assertLess(len('/tmp/tmpA'), len(filename_a))
# see that random part changes
filename_b = self.tempfile._TempFilename()
self.assertTrue(filename_b.startswith('/tmp/tmp'))
self.assertLess(len('/tmp/tmpB'), len(filename_a))
self.assertNotEqual(filename_a, filename_b)
def testTempFilenameSuffix(self):
"""test tempfile._TempFilename(suffix=)."""
filename = self.tempfile._TempFilename(suffix='.suffix')
self.assertTrue(filename.startswith('/tmp/tmp'))
self.assertTrue(filename.endswith('.suffix'))
self.assertLess(len('/tmp/tmpX.suffix'), len(filename))
def testTempFilenamePrefix(self):
"""test tempfile._TempFilename(prefix=)."""
filename = self.tempfile._TempFilename(prefix='prefix.')
self.assertTrue(filename.startswith('/tmp/prefix.'))
self.assertLess(len('/tmp/prefix.X'), len(filename))
def testTempFilenameDir(self):
"""test tempfile._TempFilename(dir=)."""
filename = self.tempfile._TempFilename(directory='/dir')
self.assertTrue(filename.startswith('/dir/tmp'))
self.assertLess(len('/dir/tmpX'), len(filename))
def testTemporaryFile(self):
obj = self.tempfile.TemporaryFile()
self.assertEqual('<fdopen>', obj.name)
self.assertTrue(isinstance(obj, io.StringIO))
self.setUpPyfakefs()
def testNamedTemporaryFile(self):
obj = self.tempfile.NamedTemporaryFile()
created_filenames = self.tempfile.FakeReturnedMktempValues()
self.assertEqual(created_filenames[0], obj.name)
self.assertTrue(self.filesystem.GetObject(obj.name))
obj = tempfile.NamedTemporaryFile()
self.assertTrue(self.fs.GetObject(obj.name))
obj.close()
self.assertRaises(IOError, self.filesystem.GetObject, created_filenames[0])
self.assertRaises(IOError, self.fs.GetObject, obj.name)
def testNamedTemporaryFileNoDelete(self):
obj = self.tempfile.NamedTemporaryFile(delete=False)
obj = tempfile.NamedTemporaryFile(delete=False)
obj.write(b'foo')
obj.close()
file_obj = self.filesystem.GetObject(obj.name)
file_obj = self.fs.GetObject(obj.name)
contents = file_obj.contents
self.assertEqual('foo', contents)
obj = self.tempfile.NamedTemporaryFile(mode='w', delete=False)
obj = tempfile.NamedTemporaryFile(mode='w', delete=False)
obj.write('foo')
obj.close()
file_obj = self.filesystem.GetObject(obj.name)
file_obj = self.fs.GetObject(obj.name)
self.assertEqual('foo', file_obj.contents)
def testMkstemp(self):
next_fd = len(self.filesystem.open_files)
temporary = self.tempfile.mkstemp()
next_fd = len(self.fs.open_files)
temporary = tempfile.mkstemp()
self.assertEqual(2, len(temporary))
self.assertTrue(temporary[1].startswith('/tmp/tmp'))
created_filenames = self.tempfile.FakeReturnedMktempValues()
self.assertTrue(temporary[1].startswith(os.path.join(tempfile.gettempdir(), 'tmp')))
self.assertEqual(next_fd, temporary[0])
self.assertEqual(temporary[1], created_filenames[0])
self.assertTrue(self.filesystem.Exists(temporary[1]))
self.assertEqual(self.filesystem.GetObject(temporary[1]).st_mode,
self.assertTrue(self.fs.Exists(temporary[1]))
self.assertEqual(self.fs.GetObject(temporary[1]).st_mode,
stat.S_IFREG | 0o600)
fh = self.os.fdopen(temporary[0], 'w+b')
fh = os.fdopen(temporary[0], 'w+b')
self.assertEqual(temporary[0], fh.fileno())
def testMkstempDir(self):
"""test tempfile.mkstemp(dir=)."""
# expect fail: /dir does not exist
self.assertRaises(OSError, self.tempfile.mkstemp, dir='/dir')
self.assertRaises(OSError, tempfile.mkstemp, dir='/dir')
# expect pass: /dir exists
self.filesystem.CreateDirectory('/dir')
next_fd = len(self.filesystem.open_files)
temporary = self.tempfile.mkstemp(dir='/dir')
self.fs.CreateDirectory('/dir')
next_fd = len(self.fs.open_files)
temporary = tempfile.mkstemp(dir='/dir')
self.assertEqual(2, len(temporary))
self.assertEqual(next_fd, temporary[0])
self.assertTrue(temporary[1].startswith('/dir/tmp'))
created_filenames = self.tempfile.FakeReturnedMktempValues()
self.assertEqual(temporary[1], created_filenames[0])
self.assertTrue(self.filesystem.Exists(temporary[1]))
self.assertEqual(self.filesystem.GetObject(temporary[1]).st_mode,
self.assertTrue(temporary[1].startswith(os.path.join(os.sep, 'dir', 'tmp')))
self.assertTrue(self.fs.Exists(temporary[1]))
self.assertEqual(self.fs.GetObject(temporary[1]).st_mode,
stat.S_IFREG | 0o600)
# pylint: disable-msg=C6002
# TODO: add a test that /dir is actually writable.
def testMkdtemp(self):
dirname = self.tempfile.mkdtemp()
dirname = tempfile.mkdtemp()
self.assertTrue(dirname)
created_filenames = self.tempfile.FakeReturnedMktempValues()
self.assertEqual(dirname, created_filenames[0])
self.assertTrue(self.filesystem.Exists(dirname))
self.assertEqual(self.filesystem.GetObject(dirname).st_mode,
self.assertTrue(self.fs.Exists(dirname))
self.assertEqual(self.fs.GetObject(dirname).st_mode,
stat.S_IFDIR | 0o700)
def testGettempdir(self):
self.assertEqual(None, self.tempfile.tempdir)
self.assertEqual('/tmp', self.tempfile.gettempdir())
self.assertEqual('/tmp', self.tempfile.tempdir)
def testGettempprefix(self):
"""test tempfile.gettempprefix() and the tempfile.template setter."""
self.assertEqual('tmp', self.tempfile.gettempprefix())
# set and verify
self.tempfile.template = 'strung'
self.assertEqual('strung', self.tempfile.gettempprefix())
self.assertEqual('tempfile.template= is a NOP in python2.4',
self.fake_logging.message())
def testMktemp(self):
self.assertRaises(NotImplementedError, self.tempfile.mktemp)
def testTemplateGet(self):
"""verify tempfile.template still unimplemented."""
self.assertRaises(NotImplementedError, getattr,
self.tempfile, 'template')
@unittest.skipIf(sys.version_info < (3, 0), "TemporaryDirectory showed up in 3")
def testTemporaryDirectory(self):
with self.tempfile.TemporaryDirectory() as tmpdir:
with tempfile.TemporaryDirectory() as tmpdir:
self.assertTrue(tmpdir)
created_filenames = self.tempfile.FakeReturnedMktempValues()
self.assertEqual(tmpdir, created_filenames[0])
self.assertTrue(self.filesystem.Exists(tmpdir))
self.assertEqual(self.filesystem.GetObject(tmpdir).st_mode,
self.assertTrue(self.fs.Exists(tmpdir))
self.assertEqual(self.fs.GetObject(tmpdir).st_mode,
stat.S_IFDIR | 0o700)
@@ -27,5 +27,4 @@
path = 'path attribute value'
pathlib = 'pathlib attribute value'
shutil = 'shutil attribute value'
tempfile = 'tempfile attribute value'
io = 'io attribute value'
@@ -131,10 +131,9 @@
'a+': (False, True, True, False, True, False),
'w!': (False, False, True, False, False, False),
'w!+': (False, True, True, False, False, False),
'x': (False, False, True, False, False, True),
'x+': (False, True, True, False, False, True),
}
if sys.version_info >= (3, 3):
_OPEN_MODE_MAP['x'] = (False, False, True, False, False, True)
_OPEN_MODE_MAP['x+'] = (False, True, True, False, False, True)
if sys.platform.startswith('linux'):
# on newer Linux system, the default maximum recursion depth is 40
@@ -3190,7 +3189,12 @@ def open(self, file_path, flags, mode=None, dir_fd=None):
# low level open is always binary
str_flags += 'b'
fake_file = FakeFileOpen(self.filesystem, low_level=True)(file_path, str_flags)
delete_on_close = False
if hasattr(os, 'O_TEMPORARY'):
delete_on_close = flags & os.O_TEMPORARY == os.O_TEMPORARY
fake_file = FakeFileOpen(self.filesystem,
delete_on_close=delete_on_close,
low_level=True)(file_path, str_flags)
if mode:
self.chmod(file_path, mode)
return fake_file.fileno()
@@ -4026,7 +4030,7 @@ def __init__(self, file_object, file_path, update=False, read=False, append=Fals
if delete_on_close:
assert filesystem, 'delete_on_close=True requires filesystem'
self._filesystem = filesystem
self._delete_on_close = delete_on_close
self.delete_on_close = delete_on_close
# override, don't modify FakeFile.name, as FakeFilesystem expects
# it to be the file name only, no directories.
self.name = file_object.opened_as
@@ -4061,8 +4065,8 @@ def close(self):
self._file_object.SetContents(self._io.getvalue(), self._encoding)
if self._closefd:
self._filesystem.CloseOpenFile(self.filedes)
if self._delete_on_close:
self._filesystem.RemoveObject(self.name)
if self.delete_on_close:
self._filesystem.RemoveObject(self.GetObject().GetPath())
def flush(self):
"""Flush file contents to 'disk'."""
@@ -4323,6 +4327,8 @@ def Call(self, file_, mode='r', buffering=-1, encoding=None,
if mode not in _OPEN_MODE_MAP:
raise ValueError('Invalid mode: %r' % orig_modes)
if 'x' in mode and not self.low_level and sys.version_info < (3, 3):
raise ValueError('Exclusive mode not supported before Python 3.3')
must_exist, need_read, need_write, truncate, append, must_not_exist = _OPEN_MODE_MAP[mode]
@@ -4331,6 +4337,8 @@ def Call(self, file_, mode='r', buffering=-1, encoding=None,
# opening a file descriptor
if isinstance(file_, int):
filedes = file_
wrapper = self.filesystem.GetOpenFile(filedes)
self._delete_on_close = wrapper.delete_on_close
file_object = self.filesystem.GetOpenFile(filedes).GetObject()
file_path = file_object.name
else:
Oops, something went wrong.

0 comments on commit 1adfccb

Please sign in to comment.