Skip to content
Permalink
Browse files Browse the repository at this point in the history
Improved error handling to prevent unhandled exceptions in calling code.
  • Loading branch information
mike-arnica committed Jun 22, 2022
1 parent 26e0a47 commit d5b53b4
Showing 1 changed file with 95 additions and 86 deletions.
181 changes: 95 additions & 86 deletions openssh_key/private_key_list.py
Expand Up @@ -221,118 +221,127 @@ def from_bytes(
Raises:
ValueError: The provided byte string is not an ``openssh-key-v1``
key list or the declared key count is negative.
key list, when the declared key count is negative, or when an
EOF is found while parsing the key.
UserWarning: The check numbers in the decrypted private byte string
do not match (likely due to an incorrect passphrase), the key
type or parameter values of a private key do not match that of
the corresponding public key in the list, or the padding bytes
at the end of the decrypted private byte string are not as
expected.
"""
byte_stream = PascalStyleByteStream(byte_string)
try:
byte_stream = PascalStyleByteStream(byte_string)

header = byte_stream.read_from_format_instructions_dict(
cls.HEADER_FORMAT_INSTRUCTIONS_DICT
)
header = byte_stream.read_from_format_instructions_dict(
cls.HEADER_FORMAT_INSTRUCTIONS_DICT
)

if header['auth_magic'] != b'openssh-key-v1\x00':
raise ValueError('Not an openssh-key-v1 key')

if header['auth_magic'] != b'openssh-key-v1\x00':
raise ValueError('Not an openssh-key-v1 key')
num_keys = header['num_keys']

num_keys = header['num_keys']
if num_keys < 0:
raise ValueError('Cannot parse negative number of keys')

if num_keys < 0:
raise ValueError('Cannot parse negative number of keys')
public_key_list = []
for i in range(num_keys):
public_key_bytes = byte_stream.read_from_format_instruction(
PascalStyleFormatInstruction.BYTES
)
public_key_list.append(
PublicKey.from_bytes(public_key_bytes)
)

public_key_list = []
for i in range(num_keys):
public_key_bytes = byte_stream.read_from_format_instruction(
cipher_bytes = byte_stream.read_from_format_instruction(
PascalStyleFormatInstruction.BYTES
)
public_key_list.append(
PublicKey.from_bytes(public_key_bytes)
)

cipher_bytes = byte_stream.read_from_format_instruction(
PascalStyleFormatInstruction.BYTES
)

kdf_class = get_kdf_options_class(header['kdf'])
kdf_options = kdf_class(
PascalStyleByteStream(
header['kdf_options']
).read_from_format_instructions_dict(
kdf_class.FORMAT_INSTRUCTIONS_DICT
kdf_class = get_kdf_options_class(header['kdf'])
kdf_options = kdf_class(
PascalStyleByteStream(
header['kdf_options']
).read_from_format_instructions_dict(
kdf_class.FORMAT_INSTRUCTIONS_DICT
)
)
)

cipher_class = get_cipher_class(header['cipher'])
cipher_class = get_cipher_class(header['cipher'])

if kdf_class == NoneKDFOptions:
passphrase = ''
elif passphrase is None:
passphrase = getpass.getpass('Key passphrase: ')
if kdf_class == NoneKDFOptions:
passphrase = ''
elif passphrase is None:
passphrase = getpass.getpass('Key passphrase: ')

if issubclass(cipher_class, ConfidentialityIntegrityCipher):
cipher_bytes += byte_stream.read_fixed_bytes(
cipher_class.TAG_LENGTH
)

decipher_bytes = cipher_class.decrypt(
kdf_class(kdf_options),
passphrase,
cipher_bytes
)

decipher_byte_stream = PascalStyleByteStream(decipher_bytes)
if issubclass(cipher_class, ConfidentialityIntegrityCipher):
cipher_bytes += byte_stream.read_fixed_bytes(
cipher_class.TAG_LENGTH
)

decipher_bytes_header = \
decipher_byte_stream.read_from_format_instructions_dict(
cls.DECIPHER_BYTES_HEADER_FORMAT_INSTRUCTIONS_DICT
decipher_bytes = cipher_class.decrypt(
kdf_class(kdf_options),
passphrase,
cipher_bytes
)

if decipher_bytes_header['check_int_1'] \
!= decipher_bytes_header['check_int_2']:
warnings.warn('Cipher header check numbers do not match')
decipher_byte_stream = PascalStyleByteStream(decipher_bytes)

initlist = []
for i in range(num_keys):
initlist.append(
PublicPrivateKeyPair(
public_key_list[i],
PrivateKey.from_byte_stream(decipher_byte_stream)
)
)
if initlist[i].public.header['key_type'] \
!= initlist[i].private.header['key_type']:
warnings.warn(
f'Inconsistency between private and public '
f'key types for key {i}'
decipher_bytes_header = \
decipher_byte_stream.read_from_format_instructions_dict(
cls.DECIPHER_BYTES_HEADER_FORMAT_INSTRUCTIONS_DICT
)
if not all(
(
initlist[i].public.params[k] ==
initlist[i].private.params[k]
) for k in (
initlist[i].public.params.keys() &
initlist[i].private.params.keys()

if decipher_bytes_header['check_int_1'] \
!= decipher_bytes_header['check_int_2']:
warnings.warn('Cipher header check numbers do not match')

initlist = []
for i in range(num_keys):
initlist.append(
PublicPrivateKeyPair(
public_key_list[i],
PrivateKey.from_byte_stream(decipher_byte_stream)
)
)
if initlist[i].public.header['key_type'] \
!= initlist[i].private.header['key_type']:
warnings.warn(
f'Inconsistency between private and public '
f'key types for key {i}'
)
if not all(
(
initlist[i].public.params[k] ==
initlist[i].private.params[k]
) for k in (
initlist[i].public.params.keys() &
initlist[i].private.params.keys()
)
):
warnings.warn(
f'Inconsistency between private and public '
f'values for key {i}'
)

decipher_padding = decipher_byte_stream.read()

if (
len(decipher_byte_stream.getvalue()) %
cipher_class.BLOCK_SIZE != 0
) or not (
bytes(
range(1, 1 + cipher_class.BLOCK_SIZE)
).startswith(decipher_padding)
):
warnings.warn(
f'Inconsistency between private and public '
f'values for key {i}'
)

decipher_padding = decipher_byte_stream.read()

if (
len(decipher_byte_stream.getvalue()) %
cipher_class.BLOCK_SIZE != 0
) or not (
bytes(
range(1, 1 + cipher_class.BLOCK_SIZE)
).startswith(decipher_padding)
):
warnings.warn('Incorrect padding at end of ciphertext')
warnings.warn('Incorrect padding at end of ciphertext')
except ValueError as e:
raise e
except EOFError as e:
raise ValueError('Premature EOF detected while parsing key.')
except e:
raise ValueError('Unexpected error condition reached.')

return cls(
initlist,
Expand Down

0 comments on commit d5b53b4

Please sign in to comment.