forked from influxdata/influxdb-client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathauthorizations_api.py
134 lines (102 loc) · 4.59 KB
/
authorizations_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""Authorization is about managing the security of your InfluxDB instance."""
from influxdb_client import Authorization, AuthorizationsService, User, Organization
class AuthorizationsApi(object):
"""Implementation for '/api/v2/authorizations' endpoint."""
def __init__(self, influxdb_client):
"""Initialize defaults."""
self._influxdb_client = influxdb_client
self._authorizations_service = AuthorizationsService(influxdb_client.api_client)
def create_authorization(self, org_id=None, permissions: list = None,
authorization: Authorization = None) -> Authorization:
"""
Create an authorization.
:type permissions: list of Permission
:param org_id: organization id
:param permissions: list of permissions
:type authorization: authorization object
"""
if authorization is not None:
return self._authorizations_service.post_authorizations(authorization_post_request=authorization)
# if org_id is not None and permissions is not None:
authorization = Authorization(org_id=org_id, permissions=permissions)
return self._authorizations_service.post_authorizations(authorization_post_request=authorization)
def find_authorization_by_id(self, auth_id: str) -> Authorization:
"""
Find authorization by id.
:param auth_id: authorization id
:return: Authorization
"""
return self._authorizations_service.get_authorizations_id(auth_id=auth_id)
def find_authorizations(self, **kwargs):
"""
Get a list of all authorizations.
:key str user_id: filter authorizations belonging to a user id
:key str user: filter authorizations belonging to a user name
:key str org_id: filter authorizations belonging to a org id
:key str org: filter authorizations belonging to a org name
:return: Authorizations
"""
authorizations = self._authorizations_service.get_authorizations(**kwargs)
return authorizations.authorizations
def find_authorizations_by_user(self, user: User):
"""
Find authorization by User.
:return: Authorization list
"""
return self.find_authorizations(user_id=user.id)
def find_authorizations_by_user_id(self, user_id: str):
"""
Find authorization by user id.
:return: Authorization list
"""
return self.find_authorizations(user_id=user_id)
def find_authorizations_by_user_name(self, user_name: str):
"""
Find authorization by user name.
:return: Authorization list
"""
return self.find_authorizations(user=user_name)
def find_authorizations_by_org(self, org: Organization):
"""
Find authorization by user name.
:return: Authorization list
"""
if isinstance(org, Organization):
return self.find_authorizations(org_id=org.id)
def find_authorizations_by_org_name(self, org_name: str):
"""
Find authorization by org name.
:return: Authorization list
"""
return self.find_authorizations(org=org_name)
def find_authorizations_by_org_id(self, org_id: str):
"""
Find authorization by org id.
:return: Authorization list
"""
return self.find_authorizations(org_id=org_id)
def update_authorization(self, auth):
"""
Update authorization object.
:param auth:
:return:
"""
return self._authorizations_service.patch_authorizations_id(auth_id=auth.id, authorization_update_request=auth)
def clone_authorization(self, auth) -> Authorization:
"""Clone an authorization."""
if isinstance(auth, Authorization):
cloned = Authorization(org_id=auth.org_id, permissions=auth.permissions)
# cloned.description = auth.description
# cloned.status = auth.status
return self.create_authorization(authorization=cloned)
if isinstance(auth, str):
authorization = self.find_authorization_by_id(auth)
return self.clone_authorization(auth=authorization)
raise ValueError("Invalid argument")
def delete_authorization(self, auth):
"""Delete a authorization."""
if isinstance(auth, Authorization):
return self._authorizations_service.delete_authorizations_id(auth_id=auth.id)
if isinstance(auth, str):
return self._authorizations_service.delete_authorizations_id(auth_id=auth)
raise ValueError("Invalid argument")