/
approve.py
executable file
·235 lines (205 loc) · 7.67 KB
/
approve.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#!/usr/bin/env python3
import sys
import time
from argparse import ArgumentParser
from configparser import ConfigParser
from textwrap import dedent
import yaml
from celery import Celery
from Crypto.PublicKey import RSA
from ocflib.account.creation import encrypt_password
from ocflib.account.creation import NewAccountRequest
from ocflib.account.submission import get_tasks
from ocflib.account.submission import NewAccountResponse
from ocflib.account.validators import validate_password
from ocflib.constants import CREATE_PUBLIC_KEY
from ocflib.misc.shell import bold
from ocflib.misc.shell import edit_file
from ocflib.misc.shell import green
from ocflib.misc.shell import prompt_for_new_password
from ocflib.misc.shell import red
from ocflib.misc.shell import yellow
from ocflib.ucb.groups import group_by_oid
TEMPLATE = dedent(
# "\n\" is to hack around linters complaining about trailing whitespace
"""\
user_name: \n\
group_name: {group_name}
callink_oid: {callink_oid}
signatory: \n\
email: {email}
# Please ensure that:
# * Person requesting account is signatory of group
# - Look up the signatory's CalNet UID on directory.berkeley.edu
# - Use `signat <uid>` to list groups they are a signatory for
# * Group does not have existing account (use checkacct)
# * Requested account name is based on group name
#
# vim: ft=yaml
"""
)
def wait_for_task(celery, task):
"""Wait for a validate_then_create_account task."""
print('Waiting...', end='')
task.wait() # this should be almost instant
if isinstance(task.result, NewAccountResponse):
print()
return task.result
task = celery.AsyncResult(task.result)
last_status_len = 0
while not task.ready():
time.sleep(0.25)
meta = task.info
if isinstance(meta, dict) and 'status' in meta:
status = meta['status']
if len(status) > last_status_len:
for line in status[last_status_len:]:
print()
print(line, end='')
last_status_len = len(status)
print('.', end='')
sys.stdout.flush()
print()
if isinstance(task.result, Exception):
raise task.result
else:
return task.result
def main():
def_group_name = ''
def_callink_oid = ''
def_email = ''
parser = ArgumentParser(description='Create new OCF group accounts.')
parser.add_argument('oid', type=int, nargs='?', help='CalLink OID for the group.')
args = parser.parse_args()
if args.oid:
group = group_by_oid(args.oid)
if not group:
print(red('No group with OID {}').format(args.oid))
return
if group['accounts']:
print(yellow(
'Warning: there is an existing group account with OID {}: {}'.format(
args.oid,
', '.join(group['accounts']),
),
))
input('Press any key to continue...')
def_group_name = group['name']
def_callink_oid = args.oid
def_email = group['email']
content = TEMPLATE.format(
group_name=def_group_name,
callink_oid=def_callink_oid,
email=def_email
)
while True:
content = edit_file(content)
try:
account = yaml.safe_load(content)
except yaml.YAMLError as ex:
print('Error parsing your YAML:')
print(ex)
input('Press enter to continue...')
continue
missing_key = False
for key in ['user_name', 'group_name', 'callink_oid', 'email']:
if account.get(key) is None:
print('Missing value for key: ' + key)
missing_key = True
if missing_key:
input('Press enter to continue...')
continue
try:
password = prompt_for_new_password(
validator=lambda pwd: validate_password(
account['user_name'], pwd),
)
except KeyboardInterrupt:
# we want to allow cancelling during the "enter password" stage
# without completely exiting approve
print()
input('Press enter to start over (or ^C again to cancel)...')
continue
request = NewAccountRequest(
user_name=account['user_name'],
real_name=account['group_name'],
is_group=True,
calnet_uid=None,
callink_oid=account['callink_oid'],
email=account['email'],
encrypted_password=encrypt_password(
password,
RSA.importKey(CREATE_PUBLIC_KEY),
),
handle_warnings=NewAccountRequest.WARNINGS_WARN,
)
print()
print(bold('Pending account request:'))
print(dedent(
"""\
User Name: {request.user_name}
Group Name: {request.real_name}
CalLink OID: {request.callink_oid}
Email: {request.email}
"""
).format(request=request))
if input('Submit request? [yN] ') != 'y':
input('Press enter to continue.')
continue
conf = ConfigParser()
conf.read('/etc/ocf-create/ocf-create.conf')
celery = Celery(
broker=conf.get('celery', 'broker'),
backend=conf.get('celery', 'backend'),
)
tasks = get_tasks(celery)
task = tasks.validate_then_create_account.delay(request)
response = wait_for_task(celery, task)
new_request = None
if response.status == NewAccountResponse.REJECTED:
print(bold(red(
'Account requested was rejected for the following reasons:'
)))
for error in response.errors:
print(red(' - {}'.format(error)))
input('Press enter to start over (or ^C to cancel)...')
continue
elif response.status == NewAccountResponse.FLAGGED:
print(bold(yellow(
'Account requested was flagged for the following reasons:'
)))
for error in response.errors:
print(yellow(' - {}'.format(error)))
print(bold(
'You can either create the account anyway, or go back and '
'modify the request.'
))
choice = input('Create the account anyway? [yN] ')
if choice in ('y', 'Y'):
new_request = request._replace(
handle_warnings=NewAccountRequest.WARNINGS_CREATE,
)
task = tasks.validate_then_create_account.delay(new_request)
response = wait_for_task(celery, task)
else:
input('Starting over, press enter to continue...')
continue
if response.status == NewAccountResponse.CREATED:
print(bold(green('Account created!')))
print('Your account was created successfully.')
print('You\'ve been sent an email with more information.')
return
else:
# this shouldn't be possible; we must have entered some weird state
# TODO: report via ocflib
print(bold(red('Error: Entered unexpected state.')))
print(red('The request we submitted was:'))
print(red(request))
print(red('The new request we submitted (if any) was:'))
print(red(new_request))
print(red('The response we received was:'))
print(red(response))
print(bold(red('Not really sure what to do here, sorry.')))
input('Press enter to start over...')
if __name__ == '__main__':
main()