/
clusters.py
114 lines (94 loc) · 3.69 KB
/
clusters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from typing import Any, cast, Dict, Optional, Union
from gitlab import exceptions as exc
from gitlab.base import RequiredOptional, RESTManager, RESTObject
from gitlab.mixins import CreateMixin, CRUDMixin, ObjectDeleteMixin, SaveMixin
# Names this module exports via ``from ... import *``: the two cluster
# object classes and their managers, all defined below.
__all__ = [
"GroupCluster",
"GroupClusterManager",
"ProjectCluster",
"ProjectClusterManager",
]
class GroupCluster(SaveMixin, ObjectDeleteMixin, RESTObject):
    """REST object for a single cluster belonging to a group.

    Adds no behaviour of its own; everything comes from ``SaveMixin``,
    ``ObjectDeleteMixin`` and ``RESTObject``.
    """
class GroupClusterManager(CRUDMixin, RESTManager):
    """Manager for the clusters found under ``/groups/{group_id}/clusters``.

    Results are typed as :class:`GroupCluster`. ``create`` and ``get`` are
    thin overrides: ``create`` targets a ``/user`` sub-path, and both narrow
    the return type for static type checkers.
    """

    _path = "/groups/{group_id}/clusters"
    _obj_cls = GroupCluster
    # Maps the ``group_id`` placeholder in ``_path`` to the parent's ``id``.
    _from_parent_attrs = {"group_id": "id"}
    _create_attrs = RequiredOptional(
        required=("name", "platform_kubernetes_attributes"),
        optional=("domain", "enabled", "managed", "environment_scope"),
    )
    _update_attrs = RequiredOptional(
        optional=(
            "name",
            "domain",
            "management_project_id",
            "platform_kubernetes_attributes",
            "environment_scope",
        ),
    )

    # NOTE(review): this decorator names GitlabStopError, while the docstring
    # below (kept from the original) documents GitlabCreateError. Confirm
    # which exception callers are expected to see.
    @exc.on_http_error(exc.GitlabStopError)
    def create(
        self, data: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> GroupCluster:
        """Create a new group cluster.

        Delegates to ``CreateMixin.create`` with the request path overridden
        to ``<self.path>/user``.

        Args:
            data: Parameters to send to the server to create the resource
            **kwargs: Extra options forwarded unchanged to
                ``CreateMixin.create`` (e.g. sudo)

        Raises:
            GitlabAuthenticationError: If authentication is not correct
            GitlabCreateError: If the server cannot perform the request

        Returns:
            A new instance of the managed object class, built with the data
            sent by the server
        """
        created = CreateMixin.create(
            self, data, path=f"{self.path}/user", **kwargs
        )
        # cast() is a no-op at runtime; it only narrows the static type.
        return cast(GroupCluster, created)

    def get(
        self, id: Union[str, int], lazy: bool = False, **kwargs: Any
    ) -> GroupCluster:
        """Return the parent class's ``get`` result typed as ``GroupCluster``.

        ``id``, ``lazy`` and ``**kwargs`` are forwarded unchanged.
        """
        fetched = super().get(id=id, lazy=lazy, **kwargs)
        return cast(GroupCluster, fetched)
class ProjectCluster(SaveMixin, ObjectDeleteMixin, RESTObject):
    """REST object for a single cluster belonging to a project.

    Adds no behaviour of its own; everything comes from ``SaveMixin``,
    ``ObjectDeleteMixin`` and ``RESTObject``.
    """
class ProjectClusterManager(CRUDMixin, RESTManager):
    """Manager for the clusters found under ``/projects/{project_id}/clusters``.

    Results are typed as :class:`ProjectCluster`. ``create`` and ``get`` are
    thin overrides: ``create`` targets a ``/user`` sub-path, and both narrow
    the return type for static type checkers.
    """

    _path = "/projects/{project_id}/clusters"
    _obj_cls = ProjectCluster
    # Maps the ``project_id`` placeholder in ``_path`` to the parent's ``id``.
    _from_parent_attrs = {"project_id": "id"}
    _create_attrs = RequiredOptional(
        required=("name", "platform_kubernetes_attributes"),
        optional=("domain", "enabled", "managed", "environment_scope"),
    )
    _update_attrs = RequiredOptional(
        optional=(
            "name",
            "domain",
            "management_project_id",
            "platform_kubernetes_attributes",
            "environment_scope",
        ),
    )

    # NOTE(review): this decorator names GitlabStopError, while the docstring
    # below (kept from the original) documents GitlabCreateError. Confirm
    # which exception callers are expected to see.
    @exc.on_http_error(exc.GitlabStopError)
    def create(
        self, data: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> ProjectCluster:
        """Create a new project cluster.

        Delegates to ``CreateMixin.create`` with the request path overridden
        to ``<self.path>/user``.

        Args:
            data: Parameters to send to the server to create the resource
            **kwargs: Extra options forwarded unchanged to
                ``CreateMixin.create`` (e.g. sudo)

        Raises:
            GitlabAuthenticationError: If authentication is not correct
            GitlabCreateError: If the server cannot perform the request

        Returns:
            A new instance of the managed object class, built with the data
            sent by the server
        """
        created = CreateMixin.create(
            self, data, path=f"{self.path}/user", **kwargs
        )
        # cast() is a no-op at runtime; it only narrows the static type.
        return cast(ProjectCluster, created)

    def get(
        self, id: Union[str, int], lazy: bool = False, **kwargs: Any
    ) -> ProjectCluster:
        """Return the parent class's ``get`` result typed as ``ProjectCluster``.

        ``id``, ``lazy`` and ``**kwargs`` are forwarded unchanged.
        """
        fetched = super().get(id=id, lazy=lazy, **kwargs)
        return cast(ProjectCluster, fetched)