-
Notifications
You must be signed in to change notification settings - Fork 4
/
app.py
144 lines (117 loc) · 5 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""
Generate an SSH certificate using AAD information
"""
#pylint: disable=import-error,no-init,no-self-use,invalid-name,missing-docstring,too-few-public-methods
import base64
import json
from pprint import pformat as pf
import os
from msrest.exceptions import AuthenticationError
import web
from azure_ad import get_groups, get_user_name, get_graph_token
from azure_keyvault import get_vault_client, get_signing_pubkey, get_signing_privkey
import cert
from cert.request import SSHCSR, CSR_SCHEMA
TENANT_ID = os.environ['WEBSITE_AUTH_OPENID_ISSUER'].split('/', 4)[3]
CLIENT_ID = os.environ['WEBSITE_AUTH_CLIENT_ID']
CLIENT_SECRET = os.environ['WEBSITE_AUTH_CLIENT_SECRET']
AZURE_KEYVAULT_URL = os.environ['APPSETTING_AZURE_KEYVAULT_URL']
URLS = (
'/pubkey', 'CAKeyFile',
'/cert', 'SSHCertGenerator'
)
WSGI_APP = web.application(URLS, globals()).wsgifunc()
class CAKeyFile(object):
def GET(self):
""" Retrieve the signing pubkey out of the vault, and return an OpenSSH CA pubkey file """
try:
vault_client = get_vault_client(CLIENT_ID, CLIENT_SECRET, TENANT_ID)
pubkey_numbers = get_signing_pubkey(
vault_client,
"{}/secrets/signing-pubkey".format(AZURE_KEYVAULT_URL)
).public_numbers()
except AuthenticationError as err:
raise web.HTTPError(
"401 Unauthorized",
data="could not get vault client: {}".format(err)
)
except RuntimeError as err:
raise web.HTTPError(
'503 Service Unavailable',
data="could not get signing key: {}".format(err)
)
# Yes, this is redundant (since we just store the openssh-encoded public key as a vault
# secret directly), but it illustrates how to operate when we instead call get_signing_jwk
# to get values from a Key Vault key.
keyfile = cert.SSHPublicKeyFile("ssh-rsa")
try:
keyfile.e = pubkey_numbers.e
keyfile.n = pubkey_numbers.n
except TypeError:
raise web.HTTPError(
"500 Internal Error",
data="could not decode key from {}".format(pf(keyfile.__dict__))
)
encoded_pubkey = base64.b64encode(keyfile.build_keyfile())
web.header('Content-Type', 'text/plain; charset=UTF-8')
return "{} {} {}".format(
'ssh-rsa',
encoded_pubkey,
"ALTAR OpenSSH CA"
)
class SSHCertGenerator(object):
def GET(self):
web.header('Content-Type', 'application/json')
return CSR_SCHEMA
def POST(self):
"""
Accepts a cert.request.CSR_PROTOTYPE-alike JSON object, validates its signature,
constructs a new certificate, and signs it.
"""
try:
permitted_group = os.environ['APPSETTING_PERMITTED_GROUP']
except KeyError:
raise web.HTTPError('503 Service Unavailable', data="no groups permitted")
graph_bearer_token = get_graph_token(
CLIENT_ID,
TENANT_ID,
web.ctx.env.get('HTTP_X_MS_TOKEN_AAD_REFRESH_TOKEN')
)
try:
user_name = get_user_name(graph_bearer_token)
groups = get_groups(TENANT_ID, graph_bearer_token)
if permitted_group not in groups:
raise web.HTTPError('403 Forbidden')
csr = cert.SSHCSR.load(json.loads(web.data()))
except RuntimeError as err:
raise web.HTTPError('503 Service Unavailable', data=str(err))
except ValueError as err:
raise web.HTTPError('400 Bad Request', data=err)
if not csr.verify():
raise web.HTTPError('403 Forbidden', data="bad request signature")
vault_client = get_vault_client(CLIENT_ID, CLIENT_SECRET, TENANT_ID)
signing_key = get_signing_privkey(
vault_client,
"{}/secrets/signing-key".format(AZURE_KEYVAULT_URL)
)
pubkey_numbers = signing_key.public_key().public_numbers()
signing_pubkey = cert.SSHPublicKeyFile("ssh-rsa")
signing_pubkey.e = pubkey_numbers.e
signing_pubkey.n = pubkey_numbers.n
user_cert = cert.SSHCertificate(csr.certificate_format)
for number, value in csr.public_key.__dict__.items():
user_cert.__setattr__(number, value)
user_cert.type = SSHCSR.CSR_CERTIFICATE_TYPES[csr.certificate_type]
user_cert.key_id = "{}_{}".format(csr.certificate_type, csr.principal)
user_cert.valid_principals = [user_name]
user_cert.critical_options = csr.critical_options
user_cert.extensions = csr.extensions
user_cert.signature_key = signing_pubkey.build_keyfile()
user_cert.sign(signing_key)
return "{} {} {}".format(
user_cert.certificate_format,
base64.b64encode(user_cert.build_certificate()),
user_name
)
if __name__ == "__main__":
web.application(URLS, globals()).run() #pylint: disable=no-member