# utils.py
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import random
import string
import time
from kubernetes import client as k8s_config
from kubernetes.client import api_client
from kubernetes.client.apis import core_v1_api
from kubernetes.client.rest import ApiException
from rally.common import cfg
from rally.common import utils as common_utils
from rally import exceptions
from rally.task import atomic
from rally.task import utils
from rally_openstack import scenario
# Configuration object exposed by rally.common.cfg. The CONF.openstack.*
# options read in this module supply the Magnum cluster and Kubernetes
# pod/rc timeouts, pre-poll delay and poll intervals.
CONF = cfg.CONF
class MagnumScenario(scenario.OpenStackScenario):
    """Base class for Magnum scenarios with basic atomic actions."""

    @staticmethod
    def _random_name_suffix(length=5):
        """Return a "-" followed by ``length`` random lowercase letters."""
        letters = (random.choice(string.ascii_lowercase)
                   for _ in range(length))
        return "-" + "".join(letters)

    @atomic.action_timer("magnum.list_cluster_templates")
    def _list_cluster_templates(self, **kwargs):
        """Return list of cluster_templates.

        :param limit: (Optional) The maximum number of results to return
                      per request, if:

            1) limit > 0, the maximum number of cluster_templates to return.
            2) limit param is NOT specified (None), the number of items
               returned respect the maximum imposed by the Magnum API
               (see Magnum's api.max_limit option).
        :param kwargs: Optional additional arguments for cluster_templates
                       listing

        :returns: cluster_templates list
        """
        return self.clients("magnum").cluster_templates.list(**kwargs)

    @atomic.action_timer("magnum.create_cluster_template")
    def _create_cluster_template(self, **kwargs):
        """Create a cluster_template.

        The "name" entry of ``kwargs`` is always replaced by a generated
        random name.

        :param kwargs: optional additional arguments for cluster_template
                       creation

        :returns: magnum cluster_template
        """
        kwargs["name"] = self.generate_random_name()
        return self.clients("magnum").cluster_templates.create(**kwargs)

    @atomic.action_timer("magnum.get_cluster_template")
    def _get_cluster_template(self, cluster_template):
        """Return details of the specified cluster template.

        :param cluster_template: ID or name of the cluster template to show

        :returns: clustertemplate detail
        """
        return self.clients("magnum").cluster_templates.get(cluster_template)

    @atomic.action_timer("magnum.list_clusters")
    def _list_clusters(self, limit=None, **kwargs):
        """Return list of clusters.

        :param limit: Optional, the maximum number of results to return
                      per request, if:

            1) limit > 0, the maximum number of clusters to return.
            2) limit param is NOT specified (None), the number of items
               returned respect the maximum imposed by the Magnum API
               (see Magnum's api.max_limit option).
        :param kwargs: Optional additional arguments for clusters listing

        :returns: clusters list
        """
        return self.clients("magnum").clusters.list(limit=limit, **kwargs)

    @atomic.action_timer("magnum.create_cluster")
    def _create_cluster(self, cluster_template, node_count, **kwargs):
        """Create a cluster and wait until it reaches CREATE_COMPLETE.

        :param cluster_template: cluster_template for the cluster
        :param node_count: the cluster node count
        :param kwargs: optional additional arguments for cluster creation

        :returns: magnum cluster
        """
        name = self.generate_random_name()
        cluster = self.clients("magnum").clusters.create(
            name=name, cluster_template_id=cluster_template,
            node_count=node_count, **kwargs)

        # Give Magnum a configurable head start before the first status poll.
        common_utils.interruptable_sleep(
            CONF.openstack.magnum_cluster_create_prepoll_delay)
        cluster = utils.wait_for_status(
            cluster,
            ready_statuses=["CREATE_COMPLETE"],
            failure_statuses=["CREATE_FAILED", "ERROR"],
            update_resource=utils.get_from_manager(),
            timeout=CONF.openstack.magnum_cluster_create_timeout,
            check_interval=CONF.openstack.magnum_cluster_create_poll_interval,
            id_attr="uuid"
        )
        return cluster

    @atomic.action_timer("magnum.get_cluster")
    def _get_cluster(self, cluster):
        """Return details of the specified cluster.

        :param cluster: ID or name of the cluster to show

        :returns: cluster detail
        """
        return self.clients("magnum").clusters.get(cluster)

    @atomic.action_timer("magnum.get_ca_certificate")
    def _get_ca_certificate(self, cluster_uuid):
        """Get CA certificate for this cluster.

        :param cluster_uuid: uuid of the cluster
        """
        return self.clients("magnum").certificates.get(cluster_uuid)

    @atomic.action_timer("magnum.create_ca_certificate")
    def _create_ca_certificate(self, csr_req):
        """Send csr to Magnum to have it signed.

        :param csr_req: {"cluster_uuid": <uuid>, "csr": <csr file content>}
        """
        return self.clients("magnum").certificates.create(**csr_req)

    def _get_k8s_api_client(self):
        """Build a Kubernetes CoreV1Api client for the tenant's cluster.

        The cluster uuid is read from ``self.context["tenant"]["cluster"]``.
        Unless the cluster template has ``tls_disabled`` set, the client is
        configured with ``<uuid>.key``, ``<uuid>.crt`` and ``<uuid>_ca.crt``
        located in ``self.context["ca_certs_directory"]``.

        :returns: core_v1_api.CoreV1Api bound to the cluster's api_address
        """
        cluster_uuid = self.context["tenant"]["cluster"]
        cluster = self._get_cluster(cluster_uuid)
        cluster_template = self._get_cluster_template(
            cluster.cluster_template_id)

        key_file = None
        cert_file = None
        ca_certs = None
        if not cluster_template.tls_disabled:
            certs_dir = self.context["ca_certs_directory"]
            key_file = os.path.join(certs_dir, cluster_uuid + ".key")
            cert_file = os.path.join(certs_dir, cluster_uuid + ".crt")
            ca_certs = os.path.join(certs_dir, cluster_uuid + "_ca.crt")

        # k8sclient < 4.0.0 exposes ConfigurationObject and takes the
        # configuration as the ``config`` keyword; newer releases use
        # Configuration passed positionally.
        legacy_client = hasattr(k8s_config, "ConfigurationObject")
        if legacy_client:
            config = k8s_config.ConfigurationObject()
        else:
            config = k8s_config.Configuration()

        config.host = cluster.api_address
        config.ssl_ca_cert = ca_certs
        config.cert_file = cert_file
        config.key_file = key_file

        if legacy_client:
            client = api_client.ApiClient(config=config)
        else:
            client = api_client.ApiClient(config)
        return core_v1_api.CoreV1Api(client)

    @atomic.action_timer("magnum.k8s_list_v1pods")
    def _list_v1pods(self):
        """List all pods in the "default" namespace."""
        k8s_api = self._get_k8s_api_client()
        # list_node() enumerates cluster nodes and is not a namespaced call;
        # pods are listed with list_namespaced_pod().
        return k8s_api.list_namespaced_pod(namespace="default")

    @atomic.action_timer("magnum.k8s_create_v1pod")
    def _create_v1pod(self, manifest):
        """Create a pod on the specified cluster and wait until it is Ready.

        A random "-xxxxx" suffix is appended to the pod name and written
        back into ``manifest["metadata"]["name"]``.

        :param manifest: manifest used to create the pod

        :returns: the pod as read back once its "Ready" condition is "True"
        :raises ApiException: if creation fails with a status other than
            403, or is still rejected with 403 after all retries
        :raises rally.exceptions.TimeoutException: if the pod is not Ready
            within CONF.openstack.k8s_pod_create_timeout seconds
        """
        k8s_api = self._get_k8s_api_client()
        podname = manifest["metadata"]["name"] + self._random_name_suffix()
        manifest["metadata"]["name"] = podname

        # 403 responses are retried (up to 150 attempts, 2 s apart).
        # NOTE(review): presumably the API server rejects pod creation with
        # 403 until the cluster is fully set up -- confirm.
        last_error = None
        for _ in range(150):
            try:
                k8s_api.create_namespaced_pod(body=manifest,
                                              namespace="default")
                break
            except ApiException as e:
                if e.status != 403:
                    raise
                last_error = e
            time.sleep(2)
        else:
            # Every attempt was rejected: report that instead of polling
            # for a pod that was never created.
            raise last_error

        timeout = CONF.openstack.k8s_pod_create_timeout
        start = time.time()
        while True:
            resp = k8s_api.read_namespaced_pod(
                name=podname, namespace="default")

            # conditions may be empty/None while the pod is being scheduled.
            for condition in resp.status.conditions or ():
                if (condition.type.lower() == "ready"
                        and condition.status.lower() == "true"):
                    return resp

            if time.time() - start > timeout:
                raise exceptions.TimeoutException(
                    desired_status="Ready",
                    resource_name=podname,
                    resource_type="Pod",
                    resource_id=resp.metadata.uid,
                    resource_status=resp.status,
                    timeout=timeout)
            common_utils.interruptable_sleep(
                CONF.openstack.k8s_pod_create_poll_interval)

    @atomic.action_timer("magnum.k8s_list_v1rcs")
    def _list_v1rcs(self):
        """List all replication controllers in the "default" namespace."""
        k8s_api = self._get_k8s_api_client()
        return k8s_api.list_namespaced_replication_controller(
            namespace="default")

    @atomic.action_timer("magnum.k8s_create_v1rc")
    def _create_v1rc(self, manifest):
        """Create a replication controller on the specified cluster.

        A random "-xxxxx" suffix is appended to the controller name and
        written back into ``manifest["metadata"]["name"]``. The call then
        polls until ``status.replicas`` equals the ``spec.replicas`` value
        returned by the create request.

        :param manifest: manifest used to create the replication controller

        :returns: the replication controller as read back once the replica
            counts match
        :raises rally.exceptions.TimeoutException: if the counts do not
            match within CONF.openstack.k8s_rc_create_timeout seconds
        """
        k8s_api = self._get_k8s_api_client()
        rcname = manifest["metadata"]["name"] + self._random_name_suffix()
        manifest["metadata"]["name"] = rcname

        resp = k8s_api.create_namespaced_replication_controller(
            body=manifest,
            namespace="default")
        expected_replicas = resp.spec.replicas

        timeout = CONF.openstack.k8s_rc_create_timeout
        start = time.time()
        while True:
            resp = k8s_api.read_namespaced_replication_controller(
                name=rcname,
                namespace="default")
            current_replicas = resp.status.replicas
            if current_replicas == expected_replicas:
                return resp

            if time.time() - start > timeout:
                raise exceptions.TimeoutException(
                    desired_status=expected_replicas,
                    resource_name=rcname,
                    resource_type="ReplicationController",
                    resource_id=resp.metadata.uid,
                    resource_status=current_replicas,
                    timeout=timeout)
            common_utils.interruptable_sleep(
                CONF.openstack.k8s_rc_create_poll_interval)