Skip to content

Commit

Permalink
Sub pycrypto with cryptography in simple_crypto
Browse files Browse the repository at this point in the history
pycrypto is no longer maintained [1]. This patch rewrites functions
using pycrypto and replaces them with the cryptography equivalent for
the simple_crypto plugin

[1] http://lists.openstack.org/pipermail/openstack-dev/2017-March/113568

Change-Id: I72b7148d9d863468dc71353c50f854557b6c87e4
  • Loading branch information
Rohan Arora committed May 1, 2017
1 parent 42a2ec1 commit 64d3da6
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 85 deletions.
98 changes: 54 additions & 44 deletions barbican/plugin/crypto/simple_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
# limitations under the License.
import os

from Crypto.PublicKey import DSA
from Crypto.PublicKey import RSA
from Crypto.Util import asn1
from cryptography import fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from oslo_config import cfg
from oslo_utils import encodeutils
import six

from barbican.common import config
Expand Down Expand Up @@ -121,36 +123,50 @@ def generate_asymmetric(self, generate_dto, kek_meta_dto, project_id):
- RSA, with passphrase (supported)
- RSA, without passphrase (supported)
- DSA, without passphrase (supported)
- DSA, with passphrase (not supported)
Note: PyCrypto is not capable of serializing DSA
keys and DER formatted keys. Such keys will be
serialized to Base64 PEM to store in DB.
TODO (atiwari/reaperhulk): PyCrypto is not capable to serialize
DSA keys and DER formatted keys, later we need to pick better
crypto lib.
- DSA, with passphrase (supported)
"""
if(generate_dto.algorithm is None or generate_dto
.algorithm.lower() == 'rsa'):
private_key = RSA.generate(
generate_dto.bit_length, None, None, 65537)
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=generate_dto.bit_length,
backend=default_backend()
)
elif generate_dto.algorithm.lower() == 'dsa':
private_key = DSA.generate(generate_dto.bit_length, None, None)
private_key = dsa.generate_private_key(
key_size=generate_dto.bit_length,
backend=default_backend()
)
else:
raise c.CryptoPrivateKeyFailureException()

public_key = private_key.publickey()
public_key = private_key.public_key()

# Note (atiwari): key wrapping format PEM only supported
if generate_dto.algorithm.lower() == 'rsa':
public_key, private_key = self._wrap_key(public_key, private_key,
generate_dto.passphrase)
private_key = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=self._get_encryption_algorithm(
generate_dto.passphrase)
)

public_key = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)

if generate_dto.algorithm.lower() == 'dsa':
if generate_dto.passphrase:
raise ValueError(u._('Passphrase not supported for DSA key'))
public_key, private_key = self._serialize_dsa_key(public_key,
private_key)
private_key = private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=self._get_encryption_algorithm(
generate_dto.passphrase)
)
public_key = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)

private_dto = self.encrypt(c.EncryptDTO(private_key),
kek_meta_dto,
project_id)
Expand Down Expand Up @@ -186,29 +202,23 @@ def supports(self, type_enum, algorithm=None, bit_length=None,
else:
return False

def _wrap_key(self, public_key, private_key,
passphrase):
pkcs = 8
key_wrap_format = 'PEM'

private_key = private_key.exportKey(key_wrap_format, passphrase, pkcs)
public_key = public_key.exportKey(key_wrap_format)

return public_key, private_key

def _serialize_dsa_key(self, public_key, private_key):
def _get_encryption_algorithm(self, passphrase):
"""Choose whether to use encryption or not based on passphrase
pub_seq = asn1.DerSequence()
pub_seq[:] = [0, public_key.p, public_key.q,
public_key.g, public_key.y]
public_key = pub_seq.encode()

prv_seq = asn1.DerSequence()
prv_seq[:] = [0, private_key.p, private_key.q,
private_key.g, private_key.y, private_key.x]
private_key = prv_seq.encode()
serialization.BestAvailableEncryption fails if passphrase is not
given or if less than one byte therefore we need to check if it is
valid or not
"""
if passphrase:
# encryption requires password in bytes format
algorithm = serialization.BestAvailableEncryption(
# default encoding is utf-8
encodeutils.safe_encode(passphrase)
)
else:
algorithm = serialization.NoEncryption()

return public_key, private_key
return algorithm

def _is_algorithm_supported(self, algorithm=None, bit_length=None):
"""check if algorithm and bit_length combination is supported."""
Expand Down
161 changes: 120 additions & 41 deletions barbican/tests/plugin/crypto/test_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@

import os

from Crypto.PublicKey import DSA
from Crypto.PublicKey import RSA
from Crypto.Util import asn1
from cryptography import fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import mock
import six

Expand Down Expand Up @@ -246,31 +245,6 @@ def test_supports_asymmetric_key_generation(self):
"RSA", 64)
)

def test_generate_512_bit_RSA_key(self):
generate_dto = plugin.GenerateDTO('rsa', 512, None, None)
kek_meta_dto = self._get_mocked_kek_meta_dto()
self.assertRaises(ValueError,
self.plugin.generate_asymmetric,
generate_dto,
kek_meta_dto,
mock.MagicMock())

def test_generate_2048_bit_DSA_key(self):
generate_dto = plugin.GenerateDTO('dsa', 2048, None, None)
kek_meta_dto = self._get_mocked_kek_meta_dto()
self.assertRaises(ValueError, self.plugin.generate_asymmetric,
generate_dto,
kek_meta_dto,
mock.MagicMock())

def test_generate_1024_bit_DSA_key_with_passphrase(self):
generate_dto = plugin.GenerateDTO('dsa', 1024, None, 'Passphrase')
kek_meta_dto = self._get_mocked_kek_meta_dto()
self.assertRaises(ValueError, self.plugin.generate_asymmetric,
generate_dto,
kek_meta_dto,
mock.MagicMock())

def test_generate_asymmetric_1024_bit_key(self):
generate_dto = plugin.GenerateDTO('rsa', 1024, None, None)
kek_meta_dto = self._get_mocked_kek_meta_dto()
Expand All @@ -290,13 +264,35 @@ def test_generate_asymmetric_1024_bit_key(self):
public_dto.kek_meta_extended,
mock.MagicMock())

public_dto = RSA.importKey(public_dto)
private_dto = RSA.importKey(private_dto)
self.assertEqual(1023, public_dto.size())
self.assertEqual(1023, private_dto.size())
self.assertTrue(private_dto.has_private)
# check we can reload the private and public keys
private_key = serialization.load_pem_private_key(
data=private_dto,
password=None,
backend=default_backend()
)

public_key = serialization.load_pem_public_key(
data=public_dto,
backend=default_backend()
)

self.assertEqual(1024, private_key.key_size)
self.assertEqual(1024, public_key.key_size)

public_key = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.PKCS1
)

def test_generate_1024_bit_RSA_key_in_pem(self):
# get the public key from the private key we recovered to compare
recovered_key = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.PKCS1
)

self.assertTrue(public_key == recovered_key)

def test_generate_1024_bit_RSA_key_with_passphrase(self):
generate_dto = plugin.GenerateDTO('rsa', 1024, None, 'changeme')
kek_meta_dto = self._get_mocked_kek_meta_dto()

Expand All @@ -305,14 +301,96 @@ def test_generate_1024_bit_RSA_key_in_pem(self):
kek_meta_dto,
mock.MagicMock()
)

decrypt_dto = plugin.DecryptDTO(private_dto.cypher_text)
private_dto = self.plugin.decrypt(decrypt_dto,
kek_meta_dto,
private_dto.kek_meta_extended,
mock.MagicMock())

private_dto = RSA.importKey(private_dto, 'changeme')
self.assertTrue(private_dto.has_private())
decrypt_dto = plugin.DecryptDTO(public_dto.cypher_text)
public_dto = self.plugin.decrypt(decrypt_dto,
kek_meta_dto,
public_dto.kek_meta_extended,
mock.MagicMock())

# check we can reload the private and public keys
private_key = serialization.load_pem_private_key(
data=private_dto,
password='changeme'.encode(),
backend=default_backend()
)

public_key = serialization.load_pem_public_key(
data=public_dto,
backend=default_backend()
)

self.assertEqual(1024, private_key.key_size)
self.assertEqual(1024, public_key.key_size)

public_key = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.PKCS1
)

# get the public key from the private key we recovered to compare
recovered_key = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.PKCS1
)

self.assertTrue(public_key == recovered_key)

def test_generate_1024_bit_DSA_key_with_passphrase(self):
generate_dto = plugin.GenerateDTO('dsa', 1024, None, 'changeme')
kek_meta_dto = self._get_mocked_kek_meta_dto()

private_dto, public_dto, passwd_dto = self.plugin.generate_asymmetric(
generate_dto,
kek_meta_dto,
mock.MagicMock()
)

decrypt_dto = plugin.DecryptDTO(private_dto.cypher_text)
private_dto = self.plugin.decrypt(decrypt_dto,
kek_meta_dto,
private_dto.kek_meta_extended,
mock.MagicMock())

decrypt_dto = plugin.DecryptDTO(public_dto.cypher_text)
public_dto = self.plugin.decrypt(decrypt_dto,
kek_meta_dto,
public_dto.kek_meta_extended,
mock.MagicMock())

# check we can reload the private and public keys
private_key = serialization.load_der_private_key(
data=private_dto,
password='changeme'.encode(),
backend=default_backend()
)

public_key = serialization.load_der_public_key(
data=public_dto,
backend=default_backend()
)

self.assertEqual(1024, private_key.key_size)
self.assertEqual(1024, public_key.key_size)

public_key = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)

# get the public key from the private key we recovered to compare
recovered_key = private_key.public_key().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)

self.assertTrue(public_key == recovered_key)

def test_generate_1024_DSA_key_in_pem_and_reconstruct_key_der(self):
generate_dto = plugin.GenerateDTO('dsa', 1024, None, None)
Expand All @@ -330,12 +408,13 @@ def test_generate_1024_DSA_key_in_pem_and_reconstruct_key_der(self):
private_dto.kek_meta_extended,
mock.MagicMock())

prv_seq = asn1.DerSequence()
prv_seq.decode(private_dto)
p, q, g, y, x = prv_seq[1:]
private_key = serialization.load_der_private_key(
data=private_dto,
password=None,
backend=default_backend()
)

private_dto = DSA.construct((y, g, p, q, x))
self.assertTrue(private_dto.has_private())
self.assertEqual(1024, private_key.key_size)

def test_generate_128_bit_hmac_key(self):
secret = models.Secret()
Expand Down

0 comments on commit 64d3da6

Please sign in to comment.