Skip to content

Commit

Permalink
Delete duplicated OPENSSH-format parsing in Ed25519Key
Browse files Browse the repository at this point in the history
The Ed25519Key class contained key parsing and decryption logic for
OPENSSH-format keys before that code became generic. Now that paramiko#1343 is
merged, this logic becomes redundant and can be removed.

This cleanup was extracted from
ploxiln#13 and the original author
@ploxiln is credited as commit author even though I (@intgr) am
submitting this pull request.

Signed-off-by: Marti Raudsepp <marti@juffo.org>
  • Loading branch information
ploxiln authored and intgr committed Dec 11, 2019
1 parent 8019623 commit acaebb4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 105 deletions.
119 changes: 19 additions & 100 deletions paramiko/ed25519key.py
Expand Up @@ -44,6 +44,8 @@ def __init__(
):
self.public_blob = None
verifying_key = signing_key = None
pkformat = None

if msg is None and data is not None:
msg = Message(data)
if msg is not None:
Expand All @@ -54,115 +56,32 @@ def __init__(
)
verifying_key = nacl.signing.VerifyKey(msg.get_binary())
elif filename is not None:
with open(filename, "r") as f:
pkformat, data = self._read_private_key("OPENSSH", f)
pkformat, data = self._read_private_key_file(
"OPENSSH", filename, password
)
elif file_obj is not None:
pkformat, data = self._read_private_key("OPENSSH", file_obj)

pkformat, data = self._read_private_key("-", file_obj, password)
if filename or file_obj:
signing_key = self._parse_signing_key_data(data, password)
if pkformat != self._PRIVATE_KEY_FORMAT_OPENSSH:
raise SSHException("Invalid key format")
signing_key = self._parse_signing_key_data(data)

if signing_key is None and verifying_key is None:
raise ValueError("need a key")

self._signing_key = signing_key
self._verifying_key = verifying_key

def _parse_signing_key_data(self, data, password):
from paramiko.transport import Transport

# We may eventually want this to be usable for other key types, as
# OpenSSH moves to it, but for now this is just for Ed25519 keys.
# This format is described here:
# https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.key
# The description isn't totally complete, and I had to refer to the
# source for a full implementation.
def _parse_signing_key_data(self, data):
message = Message(data)
if message.get_bytes(len(OPENSSH_AUTH_MAGIC)) != OPENSSH_AUTH_MAGIC:
raise SSHException("Invalid key")

ciphername = message.get_text()
kdfname = message.get_text()
kdfoptions = message.get_binary()
num_keys = message.get_int()

if kdfname == "none":
# kdfname of "none" must have an empty kdfoptions, the ciphername
# must be "none"
if kdfoptions or ciphername != "none":
raise SSHException("Invalid key")
elif kdfname == "bcrypt":
if not password:
raise PasswordRequiredException(
"Private key file is encrypted"
)
kdf = Message(kdfoptions)
bcrypt_salt = kdf.get_binary()
bcrypt_rounds = kdf.get_int()
else:
raise SSHException("Invalid key")

if ciphername != "none" and ciphername not in Transport._cipher_info:
raise SSHException("Invalid key")

public_keys = []
for _ in range(num_keys):
pubkey = Message(message.get_binary())
if pubkey.get_text() != "ssh-ed25519":
raise SSHException("Invalid key")
public_keys.append(pubkey.get_binary())

private_ciphertext = message.get_binary()
if ciphername == "none":
private_data = private_ciphertext
else:
cipher = Transport._cipher_info[ciphername]
key = bcrypt.kdf(
password=b(password),
salt=bcrypt_salt,
desired_key_bytes=cipher["key-size"] + cipher["block-size"],
rounds=bcrypt_rounds,
# We can't control how many rounds are on disk, so no sense
# warning about it.
ignore_few_rounds=True,
)
decryptor = Cipher(
cipher["class"](key[: cipher["key-size"]]),
cipher["mode"](key[cipher["key-size"] :]),
backend=default_backend(),
).decryptor()
private_data = (
decryptor.update(private_ciphertext) + decryptor.finalize()
)

message = Message(_unpad_openssh(private_data))
if message.get_int() != message.get_int():
raise SSHException("Invalid key")

signing_keys = []
for i in range(num_keys):
if message.get_text() != "ssh-ed25519":
raise SSHException("Invalid key")
# A copy of the public key, again, ignore.
public = message.get_binary()
key_data = message.get_binary()
# The second half of the key data is yet another copy of the public
# key...
signing_key = nacl.signing.SigningKey(key_data[:32])
# Verify that all the public keys are the same...
assert (
signing_key.verify_key.encode()
== public
== public_keys[i]
== key_data[32:]
)
signing_keys.append(signing_key)
# Comment, ignore.
message.get_binary()

if len(signing_keys) != 1:
raise SSHException("Invalid key")
return signing_keys[0]
public = message.get_binary()
key_data = message.get_binary()
# The second half of the key data is yet another copy of the public key...
signing_key = nacl.signing.SigningKey(key_data[:32])
# Verify that all the public keys are the same...
if not signing_key.verify_key.encode() == public == key_data[32:]:
raise SSHException("Invalid key public part mis-match")
comment = message.get_binary() # noqa: F841
return signing_key

def asbytes(self):
if self.can_sign():
Expand Down
10 changes: 5 additions & 5 deletions paramiko/pkey.py
Expand Up @@ -49,7 +49,7 @@ def _unpad_openssh(data):
# really ought to be made constant time (possibly by upstreaming this logic
# into pyca/cryptography).
padding_length = six.indexbytes(data, -1)
if 0x20 <= padding_length < 0x7f:
if 0x20 <= padding_length < 0x7F:
return data # no padding, last byte part comment (printable ascii)
if padding_length > 15:
raise SSHException("Invalid key")
Expand Down Expand Up @@ -330,12 +330,12 @@ def _read_private_key(self, tag, f, password=None):
end += 1
m = self.END_TAG.match(lines[end])

if keytype == tag:
data = self._read_private_key_pem(lines, end, password)
pkformat = self._PRIVATE_KEY_FORMAT_ORIGINAL
elif keytype == "OPENSSH":
if keytype == "OPENSSH":
data = self._read_private_key_openssh(lines[start:end], password)
pkformat = self._PRIVATE_KEY_FORMAT_OPENSSH
elif keytype == tag:
data = self._read_private_key_pem(lines, end, password)
pkformat = self._PRIVATE_KEY_FORMAT_ORIGINAL
else:
raise SSHException(
"encountered {} key, expected {} key".format(keytype, tag)
Expand Down

0 comments on commit acaebb4

Please sign in to comment.