Skip to content

Commit

Permalink
tests: Rework pytest to use DynamicClient
Browse files Browse the repository at this point in the history
Since we no longer use python-kubernetes models in the product let's
also move to `DynamicClient` in the tests.

NOTE: Tests can likely be rewrited a bit more but this commit just move
to `DynamicClient`
  • Loading branch information
TeddyAndrieux committed Aug 27, 2021
1 parent 884b6fc commit 387924c
Show file tree
Hide file tree
Showing 19 changed files with 202 additions and 275 deletions.
74 changes: 24 additions & 50 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ def kubeconfig(kubeconfig_data, tmp_path):
@pytest.fixture
def control_plane_ingress_ip(k8s_client):
"""Return the Control Plane Ingress IP from Kubernetes service"""
ingress_svc = k8s_client.read_namespaced_service(
ingress_svc = k8s_client.resources.get(api_version="v1", kind="Service").get(
name="ingress-nginx-control-plane-controller",
namespace="metalk8s-ingress",
)
return ingress_svc.spec.load_balancer_ip or ingress_svc.spec.external_i_ps[0]
return ingress_svc.spec.loadBalancerIP or ingress_svc.spec.externalIPs[0]


@pytest.fixture
def control_plane_ingress_ep(k8s_client, control_plane_ingress_ip):
"""Return the Control Plane Ingress Endpoint from Kubernetes service"""
ingress_svc = k8s_client.read_namespaced_service(
ingress_svc = k8s_client.resources.get(api_version="v1", kind="Service").get(
name="ingress-nginx-control-plane-controller",
namespace="metalk8s-ingress",
)
Expand All @@ -104,49 +104,23 @@ def control_plane_ingress_ep(k8s_client, control_plane_ingress_ip):


@pytest.fixture
def k8s_apiclient(kubeconfig):
"""Return an ApiClient to use for interacting with all K8s APIs."""
return kubernetes.config.new_client_from_config(
def k8s_client(request, kubeconfig):
"""Return a DynamicClient to use for interaction with all K8s APIs."""
k8s_apiclient = kubernetes.config.new_client_from_config(
config_file=kubeconfig, persist_config=False
)
return kubernetes.dynamic.DynamicClient(k8s_apiclient)


@pytest.fixture
def k8s_client(request, k8s_apiclient):
"""Parametrized fixture to instantiate a client for a single K8s API.
By default, this will return a CoreV1Api client.
One can decorate a test function to use another API, like so:
```
@pytest.mark.parametrize(
'k8s_client', ['AppsV1Api'], indirect=True
)
def test_something(k8s_client):
assert k8s_client.list_namespaced_deployment(namespace="default")
```
FIXME: this is not working as of right now, since `pytest-bdd` manipulates
fixtures in its own way through the various scenario/when/then/given
decorators.
"""
api_name = getattr(request, "param", "CoreV1Api")
api_cls = getattr(kubernetes.client, api_name, None)

if api_cls is None:
pytest.fail(
"Unknown K8s API '{}' to use with `k8s_client` fixture.".format(api_name)
)

return api_cls(api_client=k8s_apiclient)


@pytest.fixture
def admin_sa(k8s_client, k8s_apiclient):
def admin_sa(k8s_client):
"""Fixture to create a ServiceAccount which is bind to `cluster-admin`
ClusterRole and return the ServiceAccount name
"""
rbac_k8s_client = kubernetes.client.RbacAuthorizationV1Api(api_client=k8s_apiclient)
sa_k8s_client = k8s_client.resources.get(api_version="v1", kind="ServiceAccount")
crb_k8s_client = k8s_client.resources.get(
api_version="rbac.authorization.k8s.io/v1", kind="ClusterRoleBinding"
)
sa_name = "test-admin"
sa_namespace = "default"
sa_manifest = {
Expand All @@ -168,24 +142,20 @@ def admin_sa(k8s_client, k8s_apiclient):
],
}

k8s_client.create_namespaced_service_account(
body=sa_manifest, namespace=sa_namespace
)
rbac_k8s_client.create_cluster_role_binding(body=crb_manifest)
sa_k8s_client.create(body=sa_manifest, namespace=sa_namespace)
crb_k8s_client.create(body=crb_manifest)

def _check_crb_exists():
try:
rbac_k8s_client.read_cluster_role_binding(name=sa_name)
crb_k8s_client.get(name=sa_name)
except kubernetes.client.rest.ApiException as err:
if err.status == 404:
raise AssertionError("ClusterRoleBinding not yet created")
raise

def _check_sa_exists():
try:
sa_obj = k8s_client.read_namespaced_service_account(
name=sa_name, namespace=sa_namespace
)
sa_obj = sa_k8s_client.get(name=sa_name, namespace=sa_namespace)
except kubernetes.client.rest.ApiException as err:
if err.status == 404:
raise AssertionError("ServiceAccount not yet created")
Expand All @@ -195,7 +165,7 @@ def _check_sa_exists():
assert sa_obj.secrets[0].name

try:
secret_obj = k8s_client.read_namespaced_secret(
secret_obj = k8s_client.resources.get(api_version="v1", kind="Secret").get(
sa_obj.secrets[0].name, sa_namespace
)
except kubernetes.client.rest.ApiException as err:
Expand All @@ -214,14 +184,14 @@ def _check_sa_exists():
yield (sa_name, sa_namespace)

try:
rbac_k8s_client.delete_cluster_role_binding(
crb_k8s_client.delete(
name=sa_name,
body=kubernetes.client.V1DeleteOptions(propagation_policy="Foreground"),
)
except kubernetes.client.rest.ApiException:
pass

k8s_client.delete_namespaced_service_account(
sa_k8s_client.delete(
name=sa_name,
namespace=sa_namespace,
body=kubernetes.client.V1DeleteOptions(propagation_policy="Foreground"),
Expand Down Expand Up @@ -477,6 +447,10 @@ def _verify_kubeapi_service(host):

def etcdctl(k8s_client, command, ssh_config):
"""Run an etcdctl command inside the etcd container."""
# NOTE: We use Kubernetes client instead of DynamicClient as it
# ease the execution of command in a Pod
client = kubernetes.client.CoreV1Api(api_client=k8s_client.client)

name = "etcd-{}".format(utils.get_node_name("bootstrap", ssh_config))

etcd_command = [
Expand All @@ -491,7 +465,7 @@ def etcdctl(k8s_client, command, ssh_config):
"/etc/kubernetes/pki/etcd/server.crt",
] + command
output = kubernetes.stream.stream(
k8s_client.connect_get_namespaced_pod_exec,
client.connect_get_namespaced_pod_exec,
name=name,
namespace="kube-system",
command=etcd_command,
Expand Down
12 changes: 9 additions & 3 deletions tests/install/steps/test_expansion.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ def declare_node(host, ssh_config, version, k8s_client, node_type, node_name):
"""Declare the given node in Kubernetes."""
node_ip = get_node_ip(host, node_name, ssh_config)
node_manifest = get_node_manifest(node_type, version, node_ip, node_name)
k8s_client.create_node(body=node_from_manifest(node_manifest))
k8s_client.resources.get(api_version="v1", kind="Node").create(
body=node_from_manifest(node_manifest)
)


@when(parsers.parse('we deploy the node "{node_name}"'))
Expand Down Expand Up @@ -72,7 +74,7 @@ def deploy_node(host, ssh_config, version, node_name):
def check_node_is_registered(k8s_client, node_name):
"""Check if the given node is registered in Kubernetes."""
try:
k8s_client.read_node(node_name)
k8s_client.resources.get(api_version="v1", kind="Node").get(name=node_name)
except k8s.client.rest.ApiException as exn:
pytest.fail(str(exn))

Expand All @@ -83,7 +85,11 @@ def check_node_status(k8s_client, node_name, expected_status):

def _check_node_status():
try:
status = k8s_client.read_node_status(node_name).status
status = (
k8s_client.resources.get(api_version="v1", kind="Node")
.get(name=node_name)
.status
)
except k8s.client.rest.ApiException as exn:
raise AssertionError(exn)
# If really not ready, status may not have been pushed yet.
Expand Down

0 comments on commit 387924c

Please sign in to comment.