Skip to content

Commit

Permalink
Merge pull request #5 from mike-arnica/security-review
Browse files Browse the repository at this point in the history
Fix insecure/undeclared raised errors; initial commit of scripts for atheris fuzzer
  • Loading branch information
scottcwang committed Jun 25, 2022
2 parents 69fe5b7 + ae4d131 commit 274447f
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 87 deletions.
3 changes: 2 additions & 1 deletion openssh_key/pascal_style_byte_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ def read_fixed_bytes(self, num_bytes: int) -> bytes:
"""
read_bytes = self.read(num_bytes)
if len(read_bytes) < num_bytes:
raise EOFError(read_bytes)
raise EOFError("Fewer than 'num_bytes' bytes remaining in the "
"underlying bytestream")
return read_bytes

def read_pascal_bytes(self, string_length_size: int) -> bytes:
Expand Down
181 changes: 95 additions & 86 deletions openssh_key/private_key_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,118 +221,127 @@ def from_bytes(
Raises:
ValueError: The provided byte string is not an ``openssh-key-v1``
key list or the declared key count is negative.
key list, when the declared key count is negative, or when an
EOF is found while parsing the key.
UserWarning: The check numbers in the decrypted private byte string
do not match (likely due to an incorrect passphrase), the key
type or parameter values of a private key do not match that of
the corresponding public key in the list, or the padding bytes
at the end of the decrypted private byte string are not as
expected.
"""
byte_stream = PascalStyleByteStream(byte_string)
try:
byte_stream = PascalStyleByteStream(byte_string)

header = byte_stream.read_from_format_instructions_dict(
cls.HEADER_FORMAT_INSTRUCTIONS_DICT
)
header = byte_stream.read_from_format_instructions_dict(
cls.HEADER_FORMAT_INSTRUCTIONS_DICT
)

if header['auth_magic'] != b'openssh-key-v1\x00':
raise ValueError('Not an openssh-key-v1 key')

if header['auth_magic'] != b'openssh-key-v1\x00':
raise ValueError('Not an openssh-key-v1 key')
num_keys = header['num_keys']

num_keys = header['num_keys']
if num_keys < 0:
raise ValueError('Cannot parse negative number of keys')

if num_keys < 0:
raise ValueError('Cannot parse negative number of keys')
public_key_list = []
for i in range(num_keys):
public_key_bytes = byte_stream.read_from_format_instruction(
PascalStyleFormatInstruction.BYTES
)
public_key_list.append(
PublicKey.from_bytes(public_key_bytes)
)

public_key_list = []
for i in range(num_keys):
public_key_bytes = byte_stream.read_from_format_instruction(
cipher_bytes = byte_stream.read_from_format_instruction(
PascalStyleFormatInstruction.BYTES
)
public_key_list.append(
PublicKey.from_bytes(public_key_bytes)
)

cipher_bytes = byte_stream.read_from_format_instruction(
PascalStyleFormatInstruction.BYTES
)

kdf_class = get_kdf_options_class(header['kdf'])
kdf_options = kdf_class(
PascalStyleByteStream(
header['kdf_options']
).read_from_format_instructions_dict(
kdf_class.FORMAT_INSTRUCTIONS_DICT
kdf_class = get_kdf_options_class(header['kdf'])
kdf_options = kdf_class(
PascalStyleByteStream(
header['kdf_options']
).read_from_format_instructions_dict(
kdf_class.FORMAT_INSTRUCTIONS_DICT
)
)
)

cipher_class = get_cipher_class(header['cipher'])
cipher_class = get_cipher_class(header['cipher'])

if kdf_class == NoneKDFOptions:
passphrase = ''
elif passphrase is None:
passphrase = getpass.getpass('Key passphrase: ')
if kdf_class == NoneKDFOptions:
passphrase = ''
elif passphrase is None:
passphrase = getpass.getpass('Key passphrase: ')

if issubclass(cipher_class, ConfidentialityIntegrityCipher):
cipher_bytes += byte_stream.read_fixed_bytes(
cipher_class.TAG_LENGTH
)

decipher_bytes = cipher_class.decrypt(
kdf_class(kdf_options),
passphrase,
cipher_bytes
)

decipher_byte_stream = PascalStyleByteStream(decipher_bytes)
if issubclass(cipher_class, ConfidentialityIntegrityCipher):
cipher_bytes += byte_stream.read_fixed_bytes(
cipher_class.TAG_LENGTH
)

decipher_bytes_header = \
decipher_byte_stream.read_from_format_instructions_dict(
cls.DECIPHER_BYTES_HEADER_FORMAT_INSTRUCTIONS_DICT
decipher_bytes = cipher_class.decrypt(
kdf_class(kdf_options),
passphrase,
cipher_bytes
)

if decipher_bytes_header['check_int_1'] \
!= decipher_bytes_header['check_int_2']:
warnings.warn('Cipher header check numbers do not match')
decipher_byte_stream = PascalStyleByteStream(decipher_bytes)

initlist = []
for i in range(num_keys):
initlist.append(
PublicPrivateKeyPair(
public_key_list[i],
PrivateKey.from_byte_stream(decipher_byte_stream)
)
)
if initlist[i].public.header['key_type'] \
!= initlist[i].private.header['key_type']:
warnings.warn(
f'Inconsistency between private and public '
f'key types for key {i}'
decipher_bytes_header = \
decipher_byte_stream.read_from_format_instructions_dict(
cls.DECIPHER_BYTES_HEADER_FORMAT_INSTRUCTIONS_DICT
)
if not all(
(
initlist[i].public.params[k] ==
initlist[i].private.params[k]
) for k in (
initlist[i].public.params.keys() &
initlist[i].private.params.keys()

if decipher_bytes_header['check_int_1'] \
!= decipher_bytes_header['check_int_2']:
warnings.warn('Cipher header check numbers do not match')

initlist = []
for i in range(num_keys):
initlist.append(
PublicPrivateKeyPair(
public_key_list[i],
PrivateKey.from_byte_stream(decipher_byte_stream)
)
)
if initlist[i].public.header['key_type'] \
!= initlist[i].private.header['key_type']:
warnings.warn(
f'Inconsistency between private and public '
f'key types for key {i}'
)
if not all(
(
initlist[i].public.params[k] ==
initlist[i].private.params[k]
) for k in (
initlist[i].public.params.keys() &
initlist[i].private.params.keys()
)
):
warnings.warn(
f'Inconsistency between private and public '
f'values for key {i}'
)

decipher_padding = decipher_byte_stream.read()

if (
len(decipher_byte_stream.getvalue()) %
cipher_class.BLOCK_SIZE != 0
) or not (
bytes(
range(1, 1 + cipher_class.BLOCK_SIZE)
).startswith(decipher_padding)
):
warnings.warn(
f'Inconsistency between private and public '
f'values for key {i}'
)

decipher_padding = decipher_byte_stream.read()

if (
len(decipher_byte_stream.getvalue()) %
cipher_class.BLOCK_SIZE != 0
) or not (
bytes(
range(1, 1 + cipher_class.BLOCK_SIZE)
).startswith(decipher_padding)
):
warnings.warn('Incorrect padding at end of ciphertext')
warnings.warn('Incorrect padding at end of ciphertext')
except ValueError as e:
raise e
except EOFError as e:
raise ValueError('Premature EOF detected while parsing key.')
except e:
raise ValueError('Unexpected error condition reached.')

return cls(
initlist,
Expand Down
23 changes: 23 additions & 0 deletions tests/fuzzer/fuzz_b64.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/env python3.10

import atheris,sys
import sys
with atheris.instrument_imports():
#import openssh_key
from openssh_key.private_key_list import PrivateKeyList

@atheris.instrument_func
def TestOneInput(data):
key = bytes("-----BEGIN OPENSSH PRIVATE KEY-----\nopenssh-key-v1", 'utf-8') \
+ data \
+ bytes("\n-----END OPENSSH PRIVATE KEY-----\n", 'utf-8')
try:
parsed = PrivateKeyList.from_string(key, None)
except ValueError as e:
if e.args[0] == "Not an openssh private key":
pass


if __name__ == "__main__":
atheris.Setup(sys.argv, TestOneInput)
atheris.Fuzz()
27 changes: 27 additions & 0 deletions tests/fuzzer/fuzz_valid_magic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env python3.10

import atheris
import sys, re
from base64 import b64encode

with atheris.instrument_imports():
#import openssh_key
from openssh_key.private_key_list import PrivateKeyList

@atheris.instrument_func
def TestOneInput(data):
headered_data = bytes("openssh-key-v1", 'utf-8')
b = bytes("-----BEGIN OPENSSH PRIVATE KEY-----\n", 'utf-8') \
+ re.sub(b"(.{70})", b"\\1\n", b64encode(headered_data), 0, re.DOTALL) \
+ bytes("\n-----END OPENSSH PRIVATE KEY-----\n", 'utf-8')
key = b.decode("utf-8")
try:
parsed = PrivateKeyList.from_string(key, None)
except ValueError as e:
if not e.args[0] == "Unexpected error condition reached.":
pass


if __name__ == "__main__":
atheris.Setup(sys.argv, TestOneInput)
atheris.Fuzz()
4 changes: 4 additions & 0 deletions tests/fuzzer/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
bcrypt@3.2.2
cffi@1.15.0
pycparser@2.21
atheris@2.0.11

0 comments on commit 274447f

Please sign in to comment.