Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 78 additions & 6 deletions securesystemslib/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,53 @@ def get_file_details(filepath, hash_algorithms=['sha256'],
if storage_backend is None:
storage_backend = securesystemslib.storage.FilesystemBackend()

# The returned file hashes of 'filepath'.
file_hashes = {}
file_length = get_file_length(filepath, storage_backend)
file_hashes = get_file_hashes(filepath, hash_algorithms, storage_backend)

return file_length, file_hashes


def get_file_hashes(filepath, hash_algorithms=['sha256'],
storage_backend=None):
"""
<Purpose>
Compute hash(es) of the file at filepath using each of the specified
hash algorithms. If no algorithms are specified, then the hash is
computed using the SHA-256 algorithm.

<Arguments>
filepath:
Absolute file path of a file.

hash_algorithms:
A list of hash algorithms with which the file's hash should be computed.
Defaults to ['sha256']

storage_backend:
An object which implements
securesystemslib.storage.StorageBackendInterface. When no object is
passed a FilesystemBackend will be instantiated and used.

<Exceptions>
securesystemslib.exceptions.FormatError: If hash of the file does not match
HASHDICT_SCHEMA.

securesystemslib.exceptions.Error: If 'filepath' does not exist.

<Returns>
A dictionary conforming to securesystemslib.formats.HASHDICT_SCHEMA
containing information about the hashes of the file at "filepath".
"""

# Making sure that the format of 'filepath' is a path string.
# 'securesystemslib.exceptions.FormatError' is raised on incorrect format.
securesystemslib.formats.PATH_SCHEMA.check_match(filepath)
securesystemslib.formats.HASHALGORITHMS_SCHEMA.check_match(hash_algorithms)

filepath = os.path.abspath(filepath)
if storage_backend is None:
storage_backend = securesystemslib.storage.FilesystemBackend()

# Obtaining length of the file.
file_length = storage_backend.getsize(filepath)
file_hashes = {}

with storage_backend.get(filepath) as fileobj:
# Obtaining hash of the file.
Expand All @@ -99,7 +139,39 @@ def get_file_details(filepath, hash_algorithms=['sha256'],
# Raise 'securesystemslib.exceptions.FormatError' if there is a mismatch.
securesystemslib.formats.HASHDICT_SCHEMA.check_match(file_hashes)

return file_length, file_hashes
return file_hashes



def get_file_length(filepath, storage_backend=None):
"""
<Purpose>
To get file's length information.

<Arguments>
filepath:
Absolute file path of a file.

storage_backend:
An object which implements
securesystemslib.storage.StorageBackendInterface. When no object is
passed a FilesystemBackend will be instantiated and used.

<Exceptions>
securesystemslib.exceptions.Error: If 'filepath' does not exist.

<Returns>
The length, in bytes, of the file at 'filepath'.
"""

# Making sure that the format of 'filepath' is a path string.
# 'securesystemslib.exceptions.FormatError' is raised on incorrect format.
securesystemslib.formats.PATH_SCHEMA.check_match(filepath)

if storage_backend is None:
storage_backend = securesystemslib.storage.FilesystemBackend()

return storage_backend.getsize(filepath)


def persist_temp_file(temp_file, persist_path, storage_backend=None,
Expand Down
67 changes: 61 additions & 6 deletions tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,62 @@ def test_B1_get_file_details(self):



def test_B2_ensure_parent_dir(self):
def test_B2_get_file_hashes(self):
# Goal: Verify proper output given certain expected/unexpected input.

# Making a temporary file.
filepath = self.make_temp_data_file()

# Computing the hash of the tempfile.
digest_object = securesystemslib.hash.digest_filename(filepath, algorithm='sha256')
file_hash = {'sha256' : digest_object.hexdigest()}

# Test: Expected input.
self.assertEqual(securesystemslib.util.get_file_hashes(filepath),
file_hash)

# Test: Incorrect input.
bogus_inputs = [self.random_string(), 1234, [self.random_string()],
{'a': 'b'}, None]

for bogus_input in bogus_inputs:
if isinstance(bogus_input, six.string_types):
self.assertRaises(securesystemslib.exceptions.Error,
securesystemslib.util.get_file_hashes, bogus_input)
else:
self.assertRaises(securesystemslib.exceptions.FormatError,
securesystemslib.util.get_file_hashes, bogus_input)



def test_B3_get_file_length(self):
# Goal: Verify proper output given certain expected/unexpected input.

# Making a temporary file.
filepath = self.make_temp_data_file()

# Computing the length of the tempfile.
digest_object = securesystemslib.hash.digest_filename(filepath, algorithm='sha256')
file_length = os.path.getsize(filepath)

# Test: Expected input.
self.assertEqual(securesystemslib.util.get_file_length(filepath), file_length)

# Test: Incorrect input.
bogus_inputs = [self.random_string(), 1234, [self.random_string()],
{'a': 'b'}, None]

for bogus_input in bogus_inputs:
if isinstance(bogus_input, six.string_types):
self.assertRaises(securesystemslib.exceptions.Error,
securesystemslib.util.get_file_length, bogus_input)
else:
self.assertRaises(securesystemslib.exceptions.FormatError,
securesystemslib.util.get_file_length, bogus_input)



def test_B4_ensure_parent_dir(self):
existing_parent_dir = self.make_temp_directory()
non_existing_parent_dir = os.path.join(existing_parent_dir, 'a', 'b')

Expand All @@ -100,7 +155,7 @@ def test_B2_ensure_parent_dir(self):



def test_B3_file_in_confined_directories(self):
def test_B5_file_in_confined_directories(self):
# Goal: Provide invalid input for 'filepath' and 'confined_directories'.
# Include inputs like: '[1, 2, "a"]' and such...
# Reference to 'file_in_confined_directories()' to improve readability.
Expand Down Expand Up @@ -131,7 +186,7 @@ def test_B3_file_in_confined_directories(self):
self.assertTrue(in_confined_directory('a/b/c/..', ['a/']))


def test_B4_import_json(self):
def test_B6_import_json(self):
self.assertTrue('json' in sys.modules)
json_module = securesystemslib.util.import_json()
self.assertTrue(json_module is not None)
Expand All @@ -142,7 +197,7 @@ def test_B4_import_json(self):



def test_B5_load_json_string(self):
def test_B7_load_json_string(self):
# Test normal case.
data = ['a', {'b': ['c', None, 30.3, 29]}]
json_string = securesystemslib.util.json.dumps(data)
Expand All @@ -157,7 +212,7 @@ def test_B5_load_json_string(self):



def test_B6_load_json_file(self):
def test_B8_load_json_file(self):
data = ['a', {'b': ['c', None, 30.3, 29]}]
filepath = self.make_temp_file()
fileobj = open(filepath, 'wt')
Expand Down Expand Up @@ -185,7 +240,7 @@ def test_B6_load_json_file(self):



def test_B7_persist_temp_file(self):
def test_B9_persist_temp_file(self):
# Destination directory to save the temporary file in.
dest_temp_dir = self.make_temp_directory()

Expand Down