Skip to content

Commit

Permalink
Adds store_secret_supports to secret_store
Browse files Browse the repository at this point in the history
This adds a new method, store_secret_supports, to the SecretStore interface.
Given a KeySpec, this method will allow the plugin manager to check if a plugin
supports storing the secret data.

For example, a plugin that only supports storing secrets if the secret
attributes are defined and valid will check the attributes are not None, but
some plugins may be able to store any secret as a blob whether or not the
attributes are defined and thus will define this method to always return True.

Implements: blueprint create-secret-store
(https://blueprints.launchpad.net/barbican/+spec/create-secret-store)
Change-Id: I5f1b196f36d02587cd35aac7cdfa1c152743aaa6
  • Loading branch information
kaitersgonnakait committed Aug 7, 2014
1 parent 248c7d6 commit ef79bc5
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 33 deletions.
8 changes: 8 additions & 0 deletions barbican/plugin/dogtag.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,14 @@ def generate_supports(self, key_spec):
"""
return self._map_algorithm(key_spec.alg) is not None

def store_secret_supports(self, key_spec):
"""Key storage supported?
Specifies whether the plugin supports storage of the secret given
the attributes included in the KeySpec
"""
return True

@staticmethod
def _map_algorithm(algorithm):
"""Map Barbican algorithms to Dogtag plugin algorithms.
Expand Down
38 changes: 34 additions & 4 deletions barbican/plugin/interface/secret_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,23 @@ class KeyAlgorithm(object):
RSA = "rsa"
"""Constant for the Elliptic Curve algorithm."""
EC = "ec"
"""List of asymmetric algorithms"""
ASYMMETRIC_ALGORITHMS = [DIFFIE_HELLMAN, DSA, RSA, EC]

"""Constant for the AES algorithm."""
AES = "aes"
"""Constant for the DES algorithm."""
DES = "des"
"""Constant for the DESede (triple-DES) algorithm."""
DESEDE = "desede"
"""List of symmetric algorithms"""
SYMMETRIC_ALGORITHMS = [AES, DES, DESEDE]

def get_secret_type(self, alg):
if alg in self.SYMMETRIC_ALGORITHMS:
return SecretType.SYMMETRIC
else: # TODO(kaitlin-farr) add asymmetric once it's supported
return None


class KeySpec(object):
Expand Down Expand Up @@ -345,6 +355,20 @@ def delete_secret(self, secret_metadata):
"""
raise NotImplementedError # pragma: no cover

@abc.abstractmethod
def store_secret_supports(self, key_spec):
"""Returns a boolean indicating if the secret can be stored.
Checks if the secret store can store the secret, give the attributes
of the secret in the KeySpec. For example, some plugins may need to
know the attributes in order to store the secret, but other plugins
may be able to store the secret as a blob if no attributes are given.
:param key_spec: KeySpec for the secret
:returns: a boolean indicating if the secret can be stored
"""
raise NotImplementedError # pragma: no cover

def get_transport_key(self):
"""Gets a transport key.
Expand Down Expand Up @@ -382,9 +406,11 @@ def __init__(self, conf=CONF, invoke_on_load=True,
invoke_kwds=invoke_kwargs
)

def get_plugin_store(self, plugin_name=None, transport_key_needed=False):
def get_plugin_store(self, key_spec, plugin_name=None,
transport_key_needed=False):
"""Gets a secret store plugin.
:param: plugin_name: set to plugin_name to get specific plugin
:param: key_spec: KeySpec of key that will be stored
:param: transport_key_needed: set to True if a transport
key is required.
:returns: SecretStoreBase plugin implementation
Expand All @@ -400,10 +426,14 @@ def get_plugin_store(self, plugin_name=None, transport_key_needed=False):
raise SecretStoreSupportedPluginNotFound()

if not transport_key_needed:
return self.extensions[0].obj
for ext in self.extensions:
if ext.obj.store_secret_supports(key_spec):
return ext.obj

for ext in self.extensions:
if ext.obj.get_transport_key() is not None:
else:
for ext in self.extensions:
if ext.obj.get_transport_key() is not None and\
ext.obj.store_secret_supports(key_spec):
return ext.obj

raise SecretStoreSupportedPluginNotFound()
Expand Down
21 changes: 13 additions & 8 deletions barbican/plugin/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
from barbican.plugin.util import translations as tr


def get_transport_key_model(repos, transport_key_needed):
def get_transport_key_model(key_spec, repos, transport_key_needed):
key_model = None
if transport_key_needed:
# get_plugin_store() will throw an exception if no suitable
# plugin with transport key is found
store_plugin = secret_store.SecretStorePluginManager(). \
get_plugin_store(transport_key_needed=True)
get_plugin_store(key_spec=key_spec, transport_key_needed=True)
plugin_name = utils.generate_fullname_for(store_plugin)

key_repo = repos.transport_key_repo
Expand Down Expand Up @@ -72,11 +72,18 @@ def store_secret(unencrypted_raw, content_type_raw, content_encoding,
elif _secret_already_has_stored_data(secret_model):
raise ValueError('Secret already has encrypted data stored for it.')

# Create a KeySpec to find a plugin that will support storing the secret
key_spec = secret_store.KeySpec(alg=spec.get('algorithm'),
bit_length=spec.get('bit_length'),
mode=spec.get('mode'))

# If there is no secret data to store, then just create Secret entity and
# leave. A subsequent call to this method should provide both the Secret
# entity created here *and* the secret data to store into it.
if not unencrypted_raw:
key_model = get_transport_key_model(repos, transport_key_needed)
key_model = get_transport_key_model(key_spec,
repos,
transport_key_needed)

_save_secret(secret_model, tenant_model, repos)
return secret_model, key_model
Expand All @@ -86,7 +93,7 @@ def store_secret(unencrypted_raw, content_type_raw, content_encoding,

# Locate a suitable plugin to store the secret.
store_plugin = secret_store.SecretStorePluginManager().\
get_plugin_store(plugin_name=plugin_name)
get_plugin_store(key_spec=key_spec, plugin_name=plugin_name)

# Normalize inputs prior to storage.
#TODO(john-wood-w) Normalize all secrets to base64, so we don't have to
Expand All @@ -101,10 +108,8 @@ def store_secret(unencrypted_raw, content_type_raw, content_encoding,
context = secret_store.SecretStoreContext(secret_model=secret_model,
tenant_model=tenant_model,
repos=repos)
key_spec = secret_store.KeySpec(alg=spec.get('algorithm'),
bit_length=spec.get('bit_length'),
mode=spec.get('mode'))
secret_dto = secret_store.SecretDTO(type=None,
secret_type = secret_store.KeyAlgorithm().get_secret_type(key_spec.alg)
secret_dto = secret_store.SecretDTO(type=secret_type,
secret=unencrypted,
key_spec=key_spec,
content_type=content_type,
Expand Down
8 changes: 8 additions & 0 deletions barbican/plugin/store_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ def generate_supports(self, key_spec):
"""
return key_spec and sstore.KeyAlgorithm.AES == key_spec.alg.lower()

def store_secret_supports(self, key_spec):
"""Key storage supported?
Specifies whether the plugin supports storage of the secret given
the attributes included in the KeySpec
"""
return True

def _find_or_create_kek_objects(self, plugin_inst, tenant_model, kek_repo):
# Find or create a key encryption key.
full_plugin_name = utils.generate_fullname_for(plugin_inst)
Expand Down
90 changes: 69 additions & 21 deletions barbican/tests/plugin/interface/test_secret_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
class TestSecretStore(str.SecretStoreBase):
"""Secret store plugin for testing support."""

def __init__(self, generate_supports_response):
def __init__(self, supported_alg_list):
super(TestSecretStore, self).__init__()
self.generate_supports_response = generate_supports_response
self.alg_list = supported_alg_list

def generate_symmetric_key(self, key_spec):
raise NotImplementedError # pragma: no cover
Expand All @@ -39,21 +39,24 @@ def get_secret(self, secret_metadata):
raise NotImplementedError # pragma: no cover

def generate_supports(self, key_spec):
return self.generate_supports_response
return key_spec.alg in self.alg_list

def delete_secret(self, secret_metadata):
raise NotImplementedError # pragma: no cover

def store_secret_supports(self, key_spec):
return key_spec.alg in self.alg_list


class TestSecretStoreWithTransportKey(str.SecretStoreBase):
"""Secret store plugin for testing support.
This plugin will override the relevant methods for key wrapping.
"""

def __init__(self, generate_supports_response):
def __init__(self, supported_alg_list):
super(TestSecretStoreWithTransportKey, self).__init__()
self.generate_supports_response = generate_supports_response
self.alg_list = supported_alg_list

def generate_symmetric_key(self, key_spec):
raise NotImplementedError # pragma: no cover
Expand All @@ -68,11 +71,14 @@ def get_secret(self, secret_metadata):
raise NotImplementedError # pragma: no cover

def generate_supports(self, key_spec):
return self.generate_supports_response
return key_spec.alg in self.alg_list

def delete_secret(self, secret_metadata):
raise NotImplementedError # pragma: no cover

def store_secret_supports(self, key_spec):
return key_spec.alg in self.alg_list

def get_transport_key(self):
return "transport key"

Expand All @@ -87,65 +93,107 @@ def setUp(self):
self.manager = str.SecretStorePluginManager()

def test_get_store_supported_plugin(self):
plugin = TestSecretStore(True)
plugin = TestSecretStore([str.KeyAlgorithm.AES])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)

self.assertEqual(plugin,
self.manager.get_plugin_store())
self.manager.get_plugin_store(keySpec))

def test_get_generate_supported_plugin(self):
plugin = TestSecretStore(True)
plugin = TestSecretStore([str.KeyAlgorithm.AES])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec('AES', 128)
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)

self.assertEqual(plugin,
self.manager.get_plugin_generate(keySpec))

def test_get_store_no_plugin_found(self):
self.manager.extensions = []
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStorePluginNotFound,
self.manager.get_plugin_store,
keySpec,
)

def test_get_generate_no_plugin_found(self):
self.manager.extensions = []
keySpec = str.KeySpec('AES', 128)
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStorePluginNotFound,
self.manager.get_plugin_generate,
keySpec,
)

def test_get_store_no_supported_plugin(self):
plugin = TestSecretStore([])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStoreSupportedPluginNotFound,
self.manager.get_plugin_store,
keySpec,
)

def test_get_generate_no_supported_plugin(self):
plugin = TestSecretStore(False)
plugin = TestSecretStore([])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec('AES', 128)
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStoreSupportedPluginNotFound,
self.manager.get_plugin_generate,
keySpec,
)

def test_get_store_no_plugin_with_tkey(self):
plugin = TestSecretStore(False)
def test_get_store_no_plugin_with_tkey_and_no_supports_storage(self):
plugin = TestSecretStore([])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStoreSupportedPluginNotFound,
self.manager.get_plugin_store,
key_spec=keySpec,
transport_key_needed=True,
)

def test_get_store_plugin_with_tkey_and_no_supports_storage(self):
plugin = TestSecretStoreWithTransportKey([])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStoreSupportedPluginNotFound,
self.manager.get_plugin_store,
key_spec=keySpec,
transport_key_needed=True,
)

def test_get_store_plugin_with_no_tkey_and_supports_storage(self):
plugin = TestSecretStore([str.KeyAlgorithm.AES])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStoreSupportedPluginNotFound,
self.manager.get_plugin_store,
key_spec=keySpec,
transport_key_needed=True,
)

def test_get_store_with_tkey(self):
plugin1 = TestSecretStore(False)
def test_get_store_plugin_with_tkey_and_supports_storage(self):
plugin1 = TestSecretStore([str.KeyAlgorithm.AES])
plugin1_mock = mock.MagicMock(obj=plugin1)
plugin2 = TestSecretStoreWithTransportKey(False)
plugin2 = TestSecretStoreWithTransportKey([str.KeyAlgorithm.AES])
plugin2_mock = mock.MagicMock(obj=plugin2)
self.manager.extensions = [plugin1_mock, plugin2_mock]
self.assertEqual(
plugin2,
self.manager.get_plugin_store(transport_key_needed=True))
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertEqual(plugin2,
self.manager.get_plugin_store(
key_spec=keySpec,
transport_key_needed=True))

0 comments on commit ef79bc5

Please sign in to comment.