"""Secrets Provider for HashiCorp Vault."""
from django import forms
from django.conf import settings
try:
import boto3
except ImportError:
boto3 = None
try:
import hvac
except ImportError:
hvac = None
from nautobot.core.forms import BootstrapMixin
from nautobot.extras.secrets import exceptions, SecretsProvider
from .choices import HashicorpKVVersionChoices
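__all__ = ("HashiCorpVaultSecretsProvider",)

# Location where Kubernetes projects a pod's service-account JWT; only used by the
# "kubernetes" auth method and overridable through the ``k8s_token_path`` setting.
K8S_TOKEN_DEFAULT_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"  # nosec B105
AUTH_METHOD_CHOICES = ["approle", "aws", "kubernetes", "token"]

# Defaults for the HVAC client (mount point and kv engine version), read once from the
# ``hashicorp_vault`` section of the plugin config. A missing section is tolerated
# (the form falls back to the built-in defaults); any other misconfiguration still
# surfaces at import time rather than being masked.
try:
    plugins_config = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
    _vault_defaults = plugins_config["hashicorp_vault"]
except KeyError:
    _vault_defaults = {}
DEFAULT_MOUNT_POINT = _vault_defaults.get("default_mount_point", "secret")
DEFAULT_KV_VERSION = _vault_defaults.get("default_kv_version", HashicorpKVVersionChoices.KV_VERSION_2)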
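class HashiCorpVaultSecretsProvider(SecretsProvider):
    """A secrets provider for HashiCorp Vault.

    Connection and authentication details are read from
    ``settings.PLUGINS_CONFIG["nautobot_secrets_providers"]["hashicorp_vault"]``
    (see :meth:`validate_vault_settings` for the accepted keys). Per-secret
    parameters -- path, key, mount point and kv engine version -- come from the
    Nautobot Secret object and are declared by :class:`ParametersForm`.
    """

    slug = "hashicorp-vault"
    name = "HashiCorp Vault"
    # The provider is only usable when the optional ``hvac`` dependency is installed.
    is_available = hvac is not None

    # TBD: Remove after pylint-nautobot bump
    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for HashiCorp Vault."""

        path = forms.CharField(
            required=True,
            help_text="The path to the HashiCorp Vault secret",
        )
        key = forms.CharField(
            required=True,
            help_text="The key of the HashiCorp Vault secret",
        )
        # Optional: an empty value is treated as "use DEFAULT_MOUNT_POINT" by get_value_for_secret().
        mount_point = forms.CharField(
            required=False,
            help_text=f"The path where the secret engine was mounted on (Default: <code>{DEFAULT_MOUNT_POINT}</code>)",
            initial=DEFAULT_MOUNT_POINT,
        )
        # Optional: an empty value is treated as "use DEFAULT_KV_VERSION" by get_value_for_secret().
        kv_version = forms.ChoiceField(
            required=False,
            choices=HashicorpKVVersionChoices,
            help_text=f"The version of the kv engine (either v1 or v2) (Default: <code>{DEFAULT_KV_VERSION}</code>)",
            initial=DEFAULT_KV_VERSION,
        )

    @classmethod
    def validate_vault_settings(cls, secret=None):
        """Validate the plugin-level Vault settings and return them.

        Args:
            secret: The Secret being resolved; only used to give raised errors context.

        Returns:
            dict: The ``hashicorp_vault`` section of the plugin config, unchanged.

        Raises:
            exceptions.SecretProviderError: If the section is missing, has no ``url``,
                names an ``auth_method`` outside ``AUTH_METHOD_CHOICES`` or an unknown
                ``kv_version``, or lacks what the chosen auth method needs: ``token``
                for token auth, ``role_name`` for kubernetes, ``role_id`` and
                ``secret_id`` for approle, and the ``boto3`` library for aws.
        """
        # This is only required for HashiCorp Vault therefore not defined in
        # `required_settings` for the plugin config.
        plugin_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
        if "hashicorp_vault" not in plugin_settings:
            raise exceptions.SecretProviderError(secret, cls, "HashiCorp Vault is not configured!")
        vault_settings = plugin_settings.get("hashicorp_vault", {})
        auth_method = vault_settings.get("auth_method", "token")
        # Plugin-level kv_version is validated here only; the engine actually used for a
        # read is selected by the per-secret ``kv_version`` parameter in get_value_for_secret().
        kv_version = vault_settings.get("kv_version", HashicorpKVVersionChoices.KV_VERSION_2)
        if "url" not in vault_settings:
            raise exceptions.SecretProviderError(secret, cls, "HashiCorp Vault configuration is missing a url")
        if auth_method not in AUTH_METHOD_CHOICES:
            raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault Auth Method {auth_method} is invalid!")
        if kv_version not in HashicorpKVVersionChoices.as_dict():
            raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault KV version {kv_version} is invalid!")

        # Per-method requirements are checked up front so get_client() can index the
        # settings directly instead of failing with a bare KeyError mid-login.
        if auth_method == "aws":
            if not boto3:
                raise exceptions.SecretProviderError(
                    secret, cls, "HashiCorp Vault AWS Authentication Method requires the boto3 library!"
                )
        elif auth_method == "token":
            if "token" not in vault_settings:
                raise exceptions.SecretProviderError(
                    secret, cls, "HashiCorp Vault configuration is missing a token for token authentication!"
                )
        elif auth_method == "kubernetes":
            if "role_name" not in vault_settings:
                raise exceptions.SecretProviderError(
                    secret, cls, "HashiCorp Vault configuration is missing a role name for kubernetes authentication!"
                )
        elif auth_method == "approle":
            if "role_id" not in vault_settings or "secret_id" not in vault_settings:
                raise exceptions.SecretProviderError(
                    secret, cls, "HashiCorp Vault configuration is missing a role_id and/or secret_id!"
                )
        return vault_settings

    @classmethod
    def get_client(cls, secret=None):
        """Authenticate against Vault and return a ready-to-use ``hvac.Client``.

        The auth method is taken from the validated settings (default ``token``).
        Non-token methods create an unauthenticated client first and then log in
        with approle, the Kubernetes service-account JWT, or the AWS IAM identity
        picked up by ``boto3``. Extra ``login_kwargs`` from the settings are passed
        through to the hvac login call.

        Args:
            secret: The Secret being resolved; only used to give raised errors context.

        Returns:
            hvac.Client: An authenticated client.

        Raises:
            exceptions.SecretProviderError: On invalid settings, a failed or forbidden
                login, an unreadable Kubernetes token file, or missing AWS credentials.
                Other hvac or transport errors are not translated and propagate as-is.
        """
        vault_settings = cls.validate_vault_settings(secret)
        auth_method = vault_settings.get("auth_method", "token")
        k8s_token_path = vault_settings.get("k8s_token_path", K8S_TOKEN_DEFAULT_PATH)
        login_kwargs = vault_settings.get("login_kwargs", {})
        # According to the docs (https://hvac.readthedocs.io/en/stable/source/hvac_v1.html?highlight=verify#hvac.v1.Client.__init__)
        # the client verify parameter is either a boolean or a path to a ca certificate file to verify. This is non-intuitive
        # so we use a parameter to specify the path to the ca_cert, if not provided we use the default of None.
        # NOTE(review): an unset ca_cert passes ``verify=None`` straight through to hvac; confirm with the installed
        # hvac version that this keeps TLS certificate verification enabled rather than disabling it.
        ca_cert = vault_settings.get("ca_cert", None)
        namespace = vault_settings.get("namespace", None)

        try:
            if auth_method == "token":
                client = hvac.Client(
                    url=vault_settings["url"], token=vault_settings["token"], verify=ca_cert, namespace=namespace
                )
            else:
                client = hvac.Client(url=vault_settings["url"], verify=ca_cert, namespace=namespace)
                if auth_method == "approle":
                    client.auth.approle.login(
                        role_id=vault_settings["role_id"],
                        secret_id=vault_settings["secret_id"],
                        **login_kwargs,
                    )
                elif auth_method == "kubernetes":
                    # A missing or unreadable token file would otherwise surface as a raw OSError
                    # with no indication that Vault authentication was the failing step.
                    try:
                        with open(k8s_token_path, "r", encoding="utf-8") as token_file:
                            jwt = token_file.read()
                    except OSError as err:
                        raise exceptions.SecretProviderError(
                            secret,
                            cls,
                            f"HashiCorp Vault Kubernetes Authentication Method could not read the service account "
                            f"token from {k8s_token_path}. Error: {err}",
                        ) from err
                    client.auth.kubernetes.login(role=vault_settings["role_name"], jwt=jwt, **login_kwargs)
                elif auth_method == "aws":
                    session = boto3.Session()
                    aws_creds = session.get_credentials()
                    # boto3 returns None when no credential source is configured; fail with a
                    # clear message instead of an AttributeError on ``aws_creds.access_key``.
                    if aws_creds is None:
                        raise exceptions.SecretProviderError(
                            secret, cls, "HashiCorp Vault AWS Authentication Method could not find any AWS credentials!"
                        )
                    # Vault's IAM login needs a signing region; fall back to us-east-1 when the
                    # session has none configured.
                    aws_region = session.region_name or "us-east-1"
                    client.auth.aws.iam_login(
                        access_key=aws_creds.access_key,
                        secret_key=aws_creds.secret_key,
                        session_token=aws_creds.token,
                        region=aws_region,
                        role=vault_settings.get("role_name", None),
                        **login_kwargs,
                    )
        except hvac.exceptions.InvalidRequest as err:
            raise exceptions.SecretProviderError(
                secret, cls, f"HashiCorp Vault Login failed (auth_method: {auth_method}). Error: {err}"
            ) from err
        except hvac.exceptions.Forbidden as err:
            raise exceptions.SecretProviderError(
                secret, cls, f"HashiCorp Vault Access Denied (auth_method: {auth_method}). Error: {err}"
            ) from err
        return client

    @classmethod
    def get_value_for_secret(cls, secret, obj=None, **kwargs):
        """Return the value stored under the secret's key in the secret's path.

        Args:
            secret: The Nautobot Secret whose rendered parameters supply ``path``,
                ``key`` and optionally ``mount_point`` / ``kv_version``.
            obj: Optional object used when rendering the secret's parameters.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            The value found under ``key`` in the kv secret at ``path``.

        Raises:
            exceptions.SecretParametersError: If ``path`` or ``key`` is missing.
            exceptions.SecretValueNotFoundError: If the path does not exist in Vault
                or the key is not present in the returned data.
            exceptions.SecretProviderError: Propagated from get_client() on
                configuration or login problems.
        """
        # Try to get parameters and error out early.
        parameters = secret.rendered_parameters(obj=obj)
        try:
            secret_path = parameters["path"]
            secret_key = parameters["key"]
            # The form fields are optional, so a stored parameter may be present but empty;
            # ``or`` makes both "missing" and "" fall back to the plugin defaults.
            secret_mount_point = parameters.get("mount_point") or DEFAULT_MOUNT_POINT
            secret_kv_version = parameters.get("kv_version") or DEFAULT_KV_VERSION
        except KeyError as err:
            msg = f"The secret parameter could not be retrieved for field {err}"
            raise exceptions.SecretParametersError(secret, cls, msg) from err

        client = cls.get_client(secret)
        # kv v1 and v2 are different engines with different response shapes; anything
        # that is not explicitly v1 is read as v2.
        use_kv_v1 = secret_kv_version == HashicorpKVVersionChoices.KV_VERSION_1
        try:
            if use_kv_v1:
                response = client.secrets.kv.v1.read_secret(path=secret_path, mount_point=secret_mount_point)
            else:
                response = client.secrets.kv.v2.read_secret(path=secret_path, mount_point=secret_mount_point)
        except hvac.exceptions.InvalidPath as err:
            raise exceptions.SecretValueNotFoundError(secret, cls, str(err)) from err

        # Retrieve the value using the key or complain loudly. v2 nests the user data one
        # level deeper (``data.data``) than v1 (``data``).
        try:
            if use_kv_v1:
                return response["data"][secret_key]
            return response["data"]["data"][secret_key]
        except KeyError as err:
            msg = f"The secret value could not be retrieved using key {err}"
            raise exceptions.SecretValueNotFoundError(secret, cls, msg) from err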