diff --git a/securesystemslib/interface.py b/securesystemslib/interface.py index db7cd614..96eb0d21 100755 --- a/securesystemslib/interface.py +++ b/securesystemslib/interface.py @@ -219,17 +219,17 @@ def generate_and_write_rsa_keypair(filepath=None, bits=DEFAULT_RSA_KEY_BITS, # Write the public key (i.e., 'public', which is in PEM format) to # '.pub'. (1) Create a temporary file, (2) write the contents of # the public key, and (3) move to final destination. - file_object = securesystemslib.util.TempFile() + file_object = tempfile.TemporaryFile() file_object.write(public.encode('utf-8')) # The temporary file is closed after the final move. - file_object.move(filepath + '.pub') + securesystemslib.util.persist_temp_file(file_object, filepath + '.pub') # Write the private key in encrypted PEM format to ''. # Unlike the public key file, the private key does not have a file # extension. - file_object = securesystemslib.util.TempFile() + file_object = tempfile.TemporaryFile() file_object.write(private.encode('utf-8')) - file_object.move(filepath) + securesystemslib.util.persist_temp_file(file_object, filepath) return filepath @@ -490,7 +490,7 @@ def generate_and_write_ed25519_keypair(filepath=None, password=None): # Create a temporary file, write the contents of the public key, and move # to final destination. - file_object = securesystemslib.util.TempFile() + file_object = tempfile.TemporaryFile() # Generate the ed25519 public key file contents in metadata format (i.e., # does not include the keyid portion). @@ -506,11 +506,11 @@ def generate_and_write_ed25519_keypair(filepath=None, password=None): # '.pub'. (1) Create a temporary file, (2) write the contents of # the public key, and (3) move to final destination. # The temporary file is closed after the final move. - file_object.move(filepath + '.pub') + securesystemslib.util.persist_temp_file(file_object, filepath + '.pub') # Write the encrypted key string, conformant to # 'securesystemslib.formats.ENCRYPTEDKEY_SCHEMA', to ''. - file_object = securesystemslib.util.TempFile() + file_object = tempfile.TemporaryFile() # Encrypt the private key if 'password' is set. if len(password): @@ -524,7 +524,7 @@ def generate_and_write_ed25519_keypair(filepath=None, password=None): # Raise 'securesystemslib.exceptions.CryptoError' if 'ed25519_key' cannot be # encrypted. file_object.write(ed25519_key.encode('utf-8')) - file_object.move(filepath) + securesystemslib.util.persist_temp_file(file_object, filepath) return filepath @@ -745,7 +745,7 @@ def generate_and_write_ecdsa_keypair(filepath=None, password=None): # Create a temporary file, write the contents of the public key, and move # to final destination. - file_object = securesystemslib.util.TempFile() + file_object = tempfile.TemporaryFile() # Generate the ECDSA public key file contents in metadata format (i.e., does # not include the keyid portion). @@ -760,16 +760,16 @@ def generate_and_write_ecdsa_keypair(filepath=None, password=None): # Write the public key (i.e., 'public', which is in PEM format) to # '.pub'. (1) Create a temporary file, (2) write the contents of # the public key, and (3) move to final destination. - file_object.move(filepath + '.pub') + securesystemslib.util.persist_temp_file(file_object, filepath + '.pub') # Write the encrypted key string, conformant to # 'securesystemslib.formats.ENCRYPTEDKEY_SCHEMA', to ''. - file_object = securesystemslib.util.TempFile() + file_object = tempfile.TemporaryFile() # Raise 'securesystemslib.exceptions.CryptoError' if 'ecdsa_key' cannot be # encrypted. encrypted_key = securesystemslib.keys.encrypt_key(ecdsa_key, password) file_object.write(encrypted_key.encode('utf-8')) - file_object.move(filepath) + securesystemslib.util.persist_temp_file(file_object, filepath) return filepath diff --git a/securesystemslib/util.py b/securesystemslib/util.py index fd7ce0c4..09f1ba41 100755 --- a/securesystemslib/util.py +++ b/securesystemslib/util.py @@ -14,8 +14,7 @@ Provides utility services. This module supplies utility functions such as: get_file_details() that computes the length and hash of a file, import_json - that tries to import a working json module, load_json_* functions, and a - TempFile class that generates a file-like object for temporary storage, etc. + that tries to import a working json module, load_json_* functions, etc. """ # Help with Python 3 compatibility, where the print statement is a function, an @@ -31,6 +30,7 @@ import shutil import logging import tempfile +import warnings import securesystemslib.exceptions import securesystemslib.settings @@ -42,306 +42,6 @@ logger = logging.getLogger('securesystemslib_util') -class TempFile(object): - """ - - A high-level temporary file that cleans itself up or can be manually - cleaned up. This isn't a complete file-like object. The file functions - that are supported make additional common-case safe assumptions. There - are additional functions that aren't part of file-like objects. TempFile - is used in the download.py module to temporarily store downloaded data while - all security checks (file hashes/length) are performed. - """ - - def _default_temporary_directory(self, prefix): - """__init__ helper.""" - try: - self.temporary_file = tempfile.NamedTemporaryFile(prefix=prefix) - - except OSError as err: # pragma: no cover - logger.critical('Cannot create a system temporary directory: '+repr(err)) - raise securesystemslib.exceptions.Error(err) - - - # TODO: Is it safe to de-TUF the prefix? TUF heavily uses `TempFile` without - # ever overriding `prefix`, thus people might expect the default prefix. - def __init__(self, prefix='tuf_temp_'): - """ - - Initializes TempFile. - - - prefix: - A string argument to be used with tempfile.NamedTemporaryFile function. - - - securesystemslib.exceptions.Error on failure to load temp dir. - - - None. - """ - - self._compression = None - - # If compression is set then the original file is saved in 'self._orig_file'. - self._orig_file = None - temp_dir = securesystemslib.settings.temporary_directory - if temp_dir is not None and securesystemslib.formats.PATH_SCHEMA.matches(temp_dir): - try: - self.temporary_file = tempfile.NamedTemporaryFile(prefix=prefix, - dir=temp_dir) - except OSError as err: - logger.error('Temp file in ' + temp_dir + ' failed: ' +repr(err)) - logger.error('Will attempt to use system default temp dir.') - self._default_temporary_directory(prefix) - - else: - self._default_temporary_directory(prefix) - - - # TODO: No code across ssl, tuf, and in-toto uses this function. Remove? - def get_compressed_length(self): - """ - - Get the compressed length of the file. This will be correct information - even when the file is read as an uncompressed one. - - - None. - - - OSError. - - - Nonnegative integer representing compressed file size. - """ - - # Even if we read a compressed file with the gzip standard library module, - # the original file will remain compressed. - return os.stat(self.temporary_file.name).st_size - - - def flush(self): - """ - - Flushes buffered output for the file. - - - None. - - - None. - - - None. - """ - - self.temporary_file.flush() - - - def read(self, size=None): - """ - - Read specified number of bytes. If size is not specified then the whole - file is read and the file pointer is placed at the beginning of the file. - - - size: - Number of bytes to be read. - - - securesystemslib.exceptions.FormatError: if 'size' is invalid. - - - String of data. - """ - - if size is None: - self.temporary_file.seek(0) - data = self.temporary_file.read() - self.temporary_file.seek(0) - - return data - - else: - if not (isinstance(size, int) and size > 0): - raise securesystemslib.exceptions.FormatError - - return self.temporary_file.read(size) - - - def write(self, data, auto_flush=True): - """ - - Writes a data string to the file. - - - data: - A string containing some data. - - auto_flush: - Boolean argument, if set to 'True', all data will be flushed from - internal buffer. - - - None. - - - None. - """ - - self.temporary_file.write(data) - if auto_flush: - self.flush() - - - def move(self, destination_path): - """ - - Copies 'self.temporary_file' to a non-temp file at 'destination_path' and - closes 'self.temporary_file' so that it is removed. - - - destination_path: - Path to store the file in. - - - None. - - - None. - """ - - self.flush() - self.seek(0) - destination_file = open(destination_path, 'wb') - shutil.copyfileobj(self.temporary_file, destination_file) - # Force the destination file to be written to disk from Python's internal - # and the operation system's buffers. os.fsync() should follow flush(). - destination_file.flush() - os.fsync(destination_file.fileno()) - destination_file.close() - - # 'self.close()' closes temporary file which destroys itself. - self.close_temp_file() - - - def seek(self, *args): - """ - - Set file's current position. - - - *args: - (*-operator): unpacking argument list is used because seek method - accepts two args: offset and whence. If whence is not specified, its - default is 0. Indicate offset to set the file's current position. - Refer to the python manual for more info. - - - None. - - - None. - """ - - self.temporary_file.seek(*args) - - - # TODO: No code across ssl, tuf, and in-toto uses this function. Remove? - def decompress_temp_file_object(self, compression): - """ - - To decompress a compressed temp file object. Decompression is performed - on a temp file object that is compressed, this occurs after downloading - a compressed file. For instance if a compressed version of some meta - file in the repository is downloaded, the temp file containing the - compressed meta file will be decompressed using this function. - Note that after calling this method, write() can no longer be called. - - meta.json.gz - |...[download] - temporary_file (containing meta.json.gz) - / \ - temporary_file _orig_file - containing meta.json containing meta.json.gz - (decompressed data) - - - compression: - A string indicating the type of compression that was used to compress - a file. Only gzip is allowed. - - - securesystemslib.exceptions.FormatError: If 'compression' is improperly formatted. - - securesystemslib.exceptions.Error: If an invalid compression is given. - - securesystemslib.exceptions.DecompressionError: If the compression failed for any reason. - - - 'self._orig_file' is used to store the original data of 'temporary_file'. - - - None. - """ - - # Does 'compression' have the correct format? - # Raise 'securesystemslib.exceptions.FormatError' if there is a mismatch. - securesystemslib.formats.NAME_SCHEMA.check_match(compression) - - if self._orig_file is not None: - raise securesystemslib.exceptions.Error('Can only set compression on a' - ' TempFile once.') - - if compression != 'gzip': - raise securesystemslib.exceptions.Error('Only gzip compression is' - ' supported.') - - self.seek(0) - self._compression = compression - self._orig_file = self.temporary_file - - try: - gzip_file_object = gzip.GzipFile(fileobj=self.temporary_file, mode='rb') - uncompressed_content = gzip_file_object.read() - self.temporary_file = tempfile.NamedTemporaryFile() - self.temporary_file.write(uncompressed_content) - self.flush() - - except Exception as exception: - raise securesystemslib.exceptions.DecompressionError(exception) - - - def close_temp_file(self): - """ - - Closes the temporary file object. 'close_temp_file' mimics usual - file.close(), however temporary file destroys itself when - 'close_temp_file' is called. Further if compression is set, second - temporary file instance 'self._orig_file' is also closed so that no open - temporary files are left open. - - - None. - - - None. - - - Closes 'self._orig_file'. - - - None. - """ - - self.temporary_file.close() - # If compression has been set, we need to explicitly close the original - # file object. - if self._orig_file is not None: - self._orig_file.close() - - def get_file_details(filepath, hash_algorithms=['sha256']): """ @@ -395,6 +95,39 @@ def get_file_details(filepath, hash_algorithms=['sha256']): return file_length, file_hashes +def persist_temp_file(temp_file, persist_path): + """ + + Copies 'temp_file' (a file like object) to a newly created non-temp file at + 'persist_path' and closes 'temp_file' so that it is removed. + + + temp_file: + File object to persist, typically a file object returned by one of the + interfaces in the tempfile module of the standard library. + + persist_path: + File path to create the persistent file in. + + + None. + + + None. + """ + + temp_file.flush() + temp_file.seek(0) + + with open(persist_path, 'wb') as destination_file: + shutil.copyfileobj(temp_file, destination_file) + # Force the destination file to be written to disk from Python's internal + # and the operation system's buffers. os.fsync() should follow flush(). + destination_file.flush() + os.fsync(destination_file.fileno()) + + temp_file.close() + def ensure_parent_dir(filename): """ diff --git a/tests/test_util.py b/tests/test_util.py index c3bdd555..28077b8e 100755 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -48,122 +48,12 @@ class TestUtil(unittest_toolbox.Modified_TestCase): def setUp(self): unittest_toolbox.Modified_TestCase.setUp(self) - self.temp_fileobj = securesystemslib.util.TempFile() + self.temp_fileobj = tempfile.TemporaryFile() - def tearDown(self): unittest_toolbox.Modified_TestCase.tearDown(self) - self.temp_fileobj.close_temp_file() - - - - def test_A1_tempfile_close_temp_file(self): - # Was the temporary file closed? - self.temp_fileobj.close_temp_file() - self.assertTrue(self.temp_fileobj.temporary_file.closed) - - - - def _extract_tempfile_directory(self, config_temp_dir=None): - """ - Takes a directory (essentially specified in the settings.py as - 'temporary_directory') and substitutes tempfile.TemporaryFile() with - tempfile.mkstemp() in order to extract actual directory of the stored - tempfile. Returns the config's temporary directory (or default temp - directory) and actual directory. - """ - - # Patching 'settings.temporary_directory'. - securesystemslib.settings.temporary_directory = config_temp_dir - - if config_temp_dir is None: - # 'config_temp_dir' needs to be set to default. - config_temp_dir = tempfile.gettempdir() - - # Patching 'tempfile.TemporaryFile()' (by substituting - # temfile.TemporaryFile() with tempfile.mkstemp()) in order to get the - # directory of the stored tempfile object. - saved_tempfile_TemporaryFile = securesystemslib.util.tempfile.NamedTemporaryFile - securesystemslib.util.tempfile.NamedTemporaryFile = tempfile.mkstemp - _temp_fileobj = securesystemslib.util.TempFile() - securesystemslib.util.tempfile.NamedTemporaryFile = saved_tempfile_TemporaryFile - junk, _tempfilepath = _temp_fileobj.temporary_file - _tempfile_dir = os.path.dirname(_tempfilepath) - - # In the case when 'config_temp_dir' is None or some other discrepancy, - # '_temp_fileobj' needs to be closed manually since tempfile.mkstemp() - # was used. - if os.path.exists(_tempfilepath): - os.remove(_tempfilepath) - - return config_temp_dir, _tempfile_dir - - - - def test_A2_tempfile_init(self): - # Goal: Verify that temporary files are stored in the appropriate temp - # directory. The location of the temporary files is set in 'settings.py'. - - # Test: Expected input verification. - # Assumed 'settings.temporary_directory' is 'None' initially. - temp_file = securesystemslib.util.TempFile() - temp_file_directory = os.path.dirname(temp_file.temporary_file.name) - self.assertEqual(tempfile.gettempdir(), temp_file_directory) - - saved_temporary_directory = securesystemslib.settings.temporary_directory - temp_directory = self.make_temp_directory() - securesystemslib.settings.temporary_directory = temp_directory - temp_file = securesystemslib.util.TempFile() - temp_file_directory = os.path.dirname(temp_file.temporary_file.name) - self.assertEqual(temp_directory, temp_file_directory) - - securesystemslib.settings.temporary_directory = saved_temporary_directory - - # Test: Unexpected input handling. - config_temp_dirs = [self.random_string(), 123, ['a'], {'a':1}] - for config_temp_dir in config_temp_dirs: - config_temp_dir, actual_dir = \ - self._extract_tempfile_directory(config_temp_dir) - self.assertEqual(tempfile.gettempdir(), actual_dir) - - - - def test_A3_tempfile_read(self): - filepath = self.make_temp_data_file(data = '1234567890') - fileobj = open(filepath, 'rb') - - # Patching 'temp_fileobj.temporary_file'. - self.temp_fileobj.temporary_file = fileobj - - # Test: Expected input. - self.assertEqual(self.temp_fileobj.read().decode('utf-8'), '1234567890') - self.assertEqual(self.temp_fileobj.read(4).decode('utf-8'), '1234') - - # Test: Unexpected input. - for bogus_arg in ['abcd', ['abcd'], {'a':'a'}, -100]: - self.assertRaises(securesystemslib.exceptions.FormatError, - self.temp_fileobj.read, bogus_arg) - - - - def test_A4_tempfile_write(self): - data = self.random_string() - self.temp_fileobj.write(data.encode('utf-8')) - self.assertEqual(data, self.temp_fileobj.read().decode('utf-8')) - - self.temp_fileobj.write(data.encode('utf-8'), auto_flush=False) - self.assertEqual(data, self.temp_fileobj.read().decode('utf-8')) - - - - def test_A5_tempfile_move(self): - # Destination directory to save the temporary file in. - dest_temp_dir = self.make_temp_directory() - dest_path = os.path.join(dest_temp_dir, self.random_string()) - self.temp_fileobj.write(self.random_string().encode('utf-8')) - self.temp_fileobj.move(dest_path) - self.assertTrue(dest_path) + self.temp_fileobj.close() @@ -191,66 +81,6 @@ def _compress_existing_file(self, filepath): - def _decompress_file(self, compressed_filepath): - """[Helper]""" - if os.path.exists(compressed_filepath): - f = gzip.open(compressed_filepath, 'rb') - file_content = f.read() - f.close() - return file_content - - else: - logger.error('Decompression of ' + repr(compressed_filepath) + ' failed.' - ' Path does not exist.') - sys.exit(1) - - - - def test_A6_tempfile_decompress_temp_file_object(self): - # Setup: generate a temp file (self.make_temp_data_file()), - # compress it. Write it to self.temp_fileobj(). - filepath = self.make_temp_data_file() - fileobj = open(filepath, 'rb') - compressed_filepath = self._compress_existing_file(filepath) - compressed_fileobj = open(compressed_filepath, 'rb') - self.temp_fileobj.write(compressed_fileobj.read()) - os.remove(compressed_filepath) - - # Try decompression using incorrect compression type i.e. compressions - # other than 'gzip'. In short feeding incorrect input. - bogus_args = ['zip', 1234, self.random_string()] - for arg in bogus_args: - self.assertRaises(securesystemslib.exceptions.Error, - self.temp_fileobj.decompress_temp_file_object, arg) - - # Test for a valid util.decompress_temp_file_object() call. - self.temp_fileobj.decompress_temp_file_object('gzip') - self.assertEqual(self.temp_fileobj.read(), fileobj.read()) - - # Checking the content of the TempFile's '_orig_file' instance. - check_compressed_original = self.make_temp_file() - with open(check_compressed_original, 'wb') as file_object: - self.temp_fileobj._orig_file.seek(0) - original_content = self.temp_fileobj._orig_file.read() - file_object.write(original_content) - - data_in_orig_file = self._decompress_file(check_compressed_original) - fileobj.seek(0) - self.assertEqual(data_in_orig_file, fileobj.read()) - - # Try decompressing once more. - self.assertRaises(securesystemslib.exceptions.Error, - self.temp_fileobj.decompress_temp_file_object, 'gzip') - - # Test decompression of invalid gzip file. - temp_file = securesystemslib.util.TempFile() - temp_file.write(b'bad zip') - contents = temp_file.read() - self.assertRaises(securesystemslib.exceptions.DecompressionError, - temp_file.decompress_temp_file_object, 'gzip') - - - def test_B1_get_file_details(self): # Goal: Verify proper output given certain expected/unexpected input. @@ -385,6 +215,17 @@ def test_B6_load_json_file(self): + def test_B7_persist_temp_file(self): + # Destination directory to save the temporary file in. + dest_temp_dir = self.make_temp_directory() + dest_path = os.path.join(dest_temp_dir, self.random_string()) + tmpfile = tempfile.TemporaryFile() + tmpfile.write(self.random_string().encode('utf-8')) + securesystemslib.util.persist_temp_file(tmpfile, dest_path) + self.assertTrue(dest_path) + + + def test_C5_unittest_toolbox_make_temp_directory(self): # Verify that the tearDown function does not fail when # unittest_toolbox.make_temp_directory deletes the generated temp directory @@ -401,13 +242,6 @@ def test_c5_unittest_toolbox_random_path(self): self.assertTrue(10, len(random_path)) - def test_c6_get_compressed_length(self): - self.temp_fileobj.write(b'hello world') - self.assertTrue(self.temp_fileobj.get_compressed_length() == 11) - - temp_file = securesystemslib.util.TempFile() - - def test_digests_are_equal(self): digest = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'