-
Notifications
You must be signed in to change notification settings - Fork 1
/
cred_plan.py
83 lines (76 loc) · 4.08 KB
/
cred_plan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# -*- coding: utf-8 -*-
from __future__ import (unicode_literals, print_function)
from creds import constants
from creds.ssh import write_authorized_keys
from creds.user_utils import (generate_add_user_command, generate_modify_user_command, generate_delete_user_command,
compare_user, get_user_by_uid)
from creds.utils import execute_command
from external.six import six
def create_plan(existing_users=None, proposed_users=None, purge_undefined=None, protected_users=None,
allow_non_unique_id=None):
""" Determine what changes are required """
plan = list()
proposed_usernames = list()
if not purge_undefined:
purge_undefined = constants.PURGE_UNDEFINED
if not protected_users:
protected_users = constants.PROTECTED_USERS
if not allow_non_unique_id:
allow_non_unique_id = constants.ALLOW_NON_UNIQUE_ID
# Create list of modifications to make based on proposed users compared to existing users
for proposed_user in proposed_users.user_list:
proposed_usernames.append(proposed_user.name)
user_matching_name = existing_users.describe_users(users_filter=dict(name=proposed_user.name))
user_matching_id = get_user_by_uid(uid=proposed_user.uid, user_list=existing_users)
# If user does not exist
if not allow_non_unique_id and user_matching_id and not user_matching_name:
plan.append(
dict(action='fail', error='uid_clash', proposed_user=proposed_user, state='existing', result=None))
elif not user_matching_name:
plan.append(dict(action='add', proposed_user=proposed_user, state='missing', result=None))
# If they do, then compare
else:
user_comparison = compare_user(passed_user=proposed_user, user_list=existing_users)
plan.append(
dict(action='update', proposed_user=proposed_user, state='existing', user_comparison=user_comparison))
# Application of the proposed user list will not result in deletion of users that need to be removed
# If 'PURGE_UNDEFINED' then look for existing users that are not defined in proposed usernames and mark for removal
if purge_undefined:
for existing_user in existing_users.user_list:
if existing_user.name not in proposed_usernames:
if existing_user.name not in protected_users:
plan.append(dict(action='delete', username=existing_user.name, state='existing'))
return plan
# TODO: Add 'interactive' option
def execute_plan(plan=None):
""" Create, Modify or Delete, depending on plan item"""
execution_result = list()
for task in plan:
action = task['action']
if action == 'delete':
command = generate_delete_user_command(username=task.get('username'))
command_output = execute_command(command)
execution_result.append(dict(task=task, command_output=command_output))
elif action == 'add':
command = generate_add_user_command(task.get('proposed_user'))
command_output = execute_command(command)
if task['proposed_user'].public_keys:
write_authorized_keys(task['proposed_user'])
execution_result.append(dict(task=task, command_output=command_output))
elif action == 'update':
result = task['user_comparison'].get('result')
print(result)
# Don't modify user if only keys have changed
action_count = 0
for k, _ in six.iteritems(result):
if '_action' in k:
action_count += 1
command_output = None
if action_count == 1 and 'public_keys_action' in result:
write_authorized_keys(task['proposed_user'])
else:
command = generate_modify_user_command(task=task)
command_output = execute_command(command)
if result.get('public_keys_action'):
write_authorized_keys(task['proposed_user'])
execution_result.append(dict(task=task, command_output=command_output))