Skip to content

Commit

Permalink
tests: create ducktape class Cloudv2Cluster to help create clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewhsu committed May 23, 2023
1 parent 2db8930 commit 9feac11
Showing 1 changed file with 144 additions and 108 deletions.
252 changes: 144 additions & 108 deletions tests/rptest/services/redpanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -1131,116 +1131,101 @@ def set_cluster_config(self, values: dict, timeout: int = 300):
self._helm.upgrade_config_cluster(values)


class RedpandaServiceCloudv2(RedpandaServiceK8s):
GLOBAL_CLOUDV2_OAUTH_URL = 'cloudv2_oauth_url'
GLOBAL_CLOUDV2_OAUTH_CLIENT_ID = 'cloudv2_oauth_client_id'
GLOBAL_CLOUDV2_OAUTH_CLIENT_SECRET = 'cloudv2_oauth_client_secret'
GLOBAL_CLOUDV2_OAUTH_AUDIENCE = 'cloudv2_oauth_audience'
GLOBAL_CLOUDV2_CLUSTER_ID = 'cloudv2_cluster_id'
GLOBAL_TELEPORT_TARGET = 'teleport_target'

def __init__(self, context, num_brokers):
super(RedpandaServiceCloudv2,
self).__init__(context, None, cluster_spec=ClusterSpec.empty())
self.logger.info(
f'num_brokers is {num_brokers}, but setting to None for cloudv2')

self._trim_logs = False

self._cloudv2_cluster_name = f'rp-ducktape-{uuid.uuid1()}'

self._cloudv2_oauth_url = context.globals.get(
self.GLOBAL_CLOUDV2_OAUTH_URL, None)
self._cloudv2_oauth_client_id = context.globals.get(
self.GLOBAL_CLOUDV2_OAUTH_CLIENT_ID, None)
self._cloudv2_oauth_client_secret = context.globals.get(
self.GLOBAL_CLOUDV2_OAUTH_CLIENT_SECRET, None)
self._cloudv2_oauth_audience = context.globals.get(
self.GLOBAL_CLOUDV2_OAUTH_AUDIENCE, None)
self._cloudv2_cluster_id = context.globals.get(
self.GLOBAL_CLOUDV2_CLUSTER_ID, None)
if self._cloudv2_cluster_id is not None:
self.logger.debug(
f'using existing cluster_id: {self._cloudv2_cluster_id}')

target = context.globals.get(self.GLOBAL_TELEPORT_TARGET, None)
self._kubectl = KubectlTool(self,
cmd_prefix=['tsh', 'ssh', target],
cluster_id=self._cloudv2_cluster_id)
class Cloudv2Cluster():
def __init__(self,
logger,
oauth_url,
oauth_client_id,
oauth_client_secret,
oauth_audience,
api_url,
namespace_uuid=None):
self._logger = logger
self._oauth_url = oauth_url
self._oauth_client_id = oauth_client_id
self._oauth_client_secret = oauth_client_secret
self._oauth_audience = oauth_audience
self._api_url = api_url
self._namespace_uuid = namespace_uuid

def _get_token(self):
headers = {'Content-Type': "application/x-www-form-urlencoded"}
data = {
'grant_type': 'client_credentials',
'client_id': f'{self._oauth_client_id}',
'client_secret': f'{self._oauth_client_secret}',
'audience': f'{self._oauth_audience}'
}
resp = requests.post(f'{self._oauth_url}', headers=headers, data=data)
resp.raise_for_status()
j = resp.json()
return j['access_token']

def start_node(self, node, **kwargs):
pass
def _http_get(self, url, **kwargs):
token = self._get_token()
headers = {
'Authorization': f'Bearer {token}',
'Accept': 'application/json'
}
resp = requests.get(url, headers=headers, **kwargs)
resp.raise_for_status()
return resp.json()

def _get_cluster_id(token, namespaceUuid, name):
params = {'namespaceUuid': namespaceUuid}
def _http_post(self, url, **kwargs):
token = self._get_token()
headers = {
'Authorization': f'Bearer {token}',
'Accept': 'application/json'
}
response = requests.get(f'{self.__cloudv2_api_url}/api/v1/clusters',
params=params,
headers=headers)
response.raise_for_status()
clusters = response.json()
resp = requests.post(url, headers=headers, **kwargs)
resp.raise_for_status()
return resp.json()

def _create_namespace(self):
if self._namespace_uuid is None:
u = str(uuid.uuid1())
name = f'rp-ducktape-ns-{u[:8]}' # e.g. rp-ducktape-ns-3b36f516
self._logger.debug(f'creating cloudv2 namespace name {name}')
body = {'name': name}
r = self._http_post(f'{self._api_url}/api/v1/namespaces',
json=body)
self._namespace_uuid = r['id']
self._logger.debug(
f'created cloudv2 namespace id {self._namespace_uuid}')

def _get_cluster_id(name):
params = {'namespaceUuid': self._namespaceUuid}
clusters = self._http_get(f'{self._api_url}/api/v1/clusters',
params=params)
for c in clusters:
if c['name'] == name:
return c['id']
return None

@staticmethod
def _cluster_ready(token, namespaceUuid, name):
logger.debug(f'checking readiness of cloudv2 cluster {name}')
params = {'namespaceUuid': namespaceUuid}
headers = {
'Authorization': f'Bearer {token}',
'Accept': 'application/json'
}
response = requests.get(f'{self.__cloudv2_api_url}/api/v1/clusters',
params=params,
headers=headers)
response.raise_for_status()
clusters = response.json()
def _cluster_ready(self, name):
self._logger.debug(f'checking readiness of cloudv2 cluster {name}')
params = {'namespaceUuid': self._namespace_uuid}
clusters = self._http_get(f'{self._api_url}/api/v1/clusters',
params=params)
for c in clusters:
if c['name'] == name:
if c['state'] == 'ready':
return True
return False

def _get_oauth_token():
def create(self, name):
"""
Returns token from oauth server.
Create a cloudv2 cluster and block until cluster is finished creating.
Returns the clusterId.
"""

headers = {'Content-Type': "application/x-www-form-urlencoded"}
data = {
'grant_type': 'client_credentials',
'client_id': self._cloudv2_oauth_client_id,
'client_secret': self._cloudv2_oauth_client_secret,
'audience': self._cloudv2_oauth_audience
}
r = requests.post(f'{self._cloudv2_oauth_url}/oauth/token',
headers=headers,
data=data)
r.raise_for_status()
j = r.json()
return j['access_token']

def _create_cluster(token):
"""
Creates a cloudv2 cluster but does not wait for it to be ready,
which could take several minutes.
"""

headers = {
'Authorization': f'Bearer {token}',
'Accept': 'application/json'
}
self._create_namespace()

body = {
"namespaceUuid": self._cloudv2_namespace_uuid,
"namespaceUuid": self._namespace_uuid,
"connectionType": "public",
"network": {
"displayName": f"public-network-{self._cloudv2_cluster_name}",
"displayName": f"public-network-{name}",
"spec": {
"deploymentType": "FMC",
"provider": "AWS",
Expand All @@ -1251,7 +1236,7 @@ def _create_cluster(token):
}
},
"cluster": {
"name": self._cloudv2_cluster_name,
"name": name,
"productId": "cgrdrd9jiflmsknn2nl0",
"spec": {
"clusterType": "FMC",
Expand All @@ -1268,30 +1253,81 @@ def _create_cluster(token):
}
}

r = requests.post(
f'{self._cloudv2_api_url}/api/v1/workflows/network-cluster',
json=body,
headers=headers)
r.raise_for_status()
r = self._http_post(
f'{self._api_url}/api/v1/workflows/network-cluster', json=body)

def start(self, **kwargs):
if self._cloudv2_cluster_id is None:
token = self._get_oauth_token()
self._create_cluster(token)
self._logger.info(
f'waiting for creation of cluster {name} namespaceUuid {r["namespaceUuid"]}'
)

# periodically check if cluster is ready to be used
wait_until(
lambda: RedpandaServiceCloudv2._cluster_ready(
token, self._cloudv2_namespace_uuid, self.
_cloudv2_cluster_name),
timeout_sec=3600,
backoff_sec=5.0,
err_msg=
f'Unable to deterimine readiness of cloudv2 cluster {name}')
token = self._get_token()
wait_until(
lambda: self._cluster_ready(name),
timeout_sec=3600,
backoff_sec=5.0,
err_msg=f'Unable to deterimine readiness of cloudv2 cluster {name}'
)
cluster_id = self._get_cluster_id(name)

return cluster_id

def delete(self, cluster_id):
"""
Deletes a cloudv2 cluster and the namespace it belongs to.
"""
pass


class RedpandaServiceCloudv2(RedpandaServiceK8s):
GLOBAL_CLOUDV2_OAUTH_URL = 'cloudv2_oauth_url'
GLOBAL_CLOUDV2_OAUTH_CLIENT_ID = 'cloudv2_oauth_client_id'
GLOBAL_CLOUDV2_OAUTH_CLIENT_SECRET = 'cloudv2_oauth_client_secret'
GLOBAL_CLOUDV2_OAUTH_AUDIENCE = 'cloudv2_oauth_audience'
GLOBAL_CLOUDV2_API_URL = 'cloudv2_api_url'
GLOBAL_CLOUDV2_CLUSTER_ID = 'cloudv2_cluster_id'
GLOBAL_TELEPORT_TARGET = 'teleport_target'

def __init__(self, context, num_brokers):
super(RedpandaServiceCloudv2,
self).__init__(context, None, cluster_spec=ClusterSpec.empty())
self.logger.info(
f'num_brokers is {num_brokers}, but setting to None for cloudv2')

self._trim_logs = False

self._cloudv2_cluster_id = self._get_cloudv2_cluster_id(
token, self._cloudv2_namespace_uuid,
self._cloudv2_cluster_name)
self._cloudv2_oauth_url = context.globals.get(
self.GLOBAL_CLOUDV2_OAUTH_URL, None)
self._cloudv2_oauth_client_id = context.globals.get(
self.GLOBAL_CLOUDV2_OAUTH_CLIENT_ID, None)
self._cloudv2_oauth_client_secret = context.globals.get(
self.GLOBAL_CLOUDV2_OAUTH_CLIENT_SECRET, None)
self._cloudv2_oauth_audience = context.globals.get(
self.GLOBAL_CLOUDV2_OAUTH_AUDIENCE, None)
self._cloudv2_api_url = context.globals.get(
self.GLOBAL_CLOUDV2_API_URL, None)
self._cloudv2_cluster_id = context.globals.get(
self.GLOBAL_CLOUDV2_CLUSTER_ID, None)
if self._cloudv2_cluster_id is not None:
self.logger.debug(
f'using existing cluster_id: {self._cloudv2_cluster_id}')

target = context.globals.get(self.GLOBAL_TELEPORT_TARGET, None)
self._cloudv2_cluster = Cloudv2Cluster(
self.logger, self._cloudv2_oauth_url,
self._cloudv2_oauth_client_id, self._cloudv2_oauth_client_secret,
self._cloudv2_oauth_audience, self._cloudv2_api_url)
self._kubectl = KubectlTool(self,
cmd_prefix=['tsh', 'ssh', target],
cluster_id=self._cloudv2_cluster_id)

def start_node(self, node, **kwargs):
pass

def start(self, **kwargs):
if self._cloudv2_cluster_id is None:
u = str(uuid.uuid1())
name = f'rp-ducktape-cluster-{u[:8]}' # e.g. rp-ducktape-cluster-9c1e57ca
self._cloudv2_cluster_id = self._cloudv2_cluster.create(name)

def stop_node(self, node, **kwargs):
"""
Expand Down

0 comments on commit 9feac11

Please sign in to comment.