diff --git a/tests/interop/conftest.py b/tests/interop/conftest.py index 2631a087..fb301d57 100644 --- a/tests/interop/conftest.py +++ b/tests/interop/conftest.py @@ -1,51 +1,2 @@ -import os - -import pytest -from kubernetes import config -from kubernetes.client import Configuration -from openshift.dynamic import DynamicClient - -from . import __loggername__ -from .css_logger import CSS_Logger - - -def pytest_addoption(parser): - parser.addoption( - "--kubeconfig", - action="store", - default=None, - help="The full path to the kubeconfig file to be used", - ) - - -@pytest.fixture(scope="session") -def get_kubeconfig(request): - if request.config.getoption("--kubeconfig"): - k8config = request.config.getoption("--kubeconfig") - elif "KUBECONFIG" in os.environ.keys() and os.environ["KUBECONFIG"]: - k8config = os.environ["KUBECONFIG"] - else: - raise ValueError( - "A kubeconfig file was not provided. Please provide one either " - "via the --kubeconfig command option or by setting a KUBECONFIG " - "environment variable" - ) - return k8config - - -@pytest.fixture(scope="session") -def kube_config(get_kubeconfig): - kc = Configuration - config.load_kube_config(config_file=get_kubeconfig, client_configuration=kc) - return kc - - -@pytest.fixture(scope="session") -def openshift_dyn_client(get_kubeconfig): - return DynamicClient(client=config.new_client_from_config(get_kubeconfig)) - - -@pytest.fixture(scope="session", autouse=True) -def setup_logger(): - logger = CSS_Logger(__loggername__) - return logger +from validatedpatterns_tests.interop.conftest_logger import * # noqa: F401, F403 +from validatedpatterns_tests.interop.conftest_openshift import * # noqa: F401, F403 diff --git a/tests/interop/crd.py b/tests/interop/crd.py deleted file mode 100644 index 8a433c5c..00000000 --- a/tests/interop/crd.py +++ /dev/null @@ -1,55 +0,0 @@ -from ocp_resources.resource import NamespacedResource, Resource - - -class ArgoCD(NamespacedResource): - """ - OpenShift ArgoCD / GitOps object. - """ - - api_group = "argoproj.io" - api_version = NamespacedResource.ApiVersion.V1ALPHA1 - kind = "Application" - - @property - def health(self): - """ - Check the health of of the argocd application - :return: boolean - """ - - if ( - self.instance.status.operationState.phase == "Succeeded" - and self.instance.status.health.status == "Healthy" - ): - return True - return False - - -class ManagedCluster(Resource): - """ - OpenShift Managed Cluster object. 
- """ - - api_version = "cluster.open-cluster-management.io/v1" - - @property - def self_registered(self): - """ - Check if managed cluster is self registered in to ACM running on hub site - :param name: (str) name of managed cluster - :param namespace: namespace - :return: Tuple of boolean and dict on success - """ - is_joined = False - status = dict() - - for condition in self.instance.status.conditions: - if condition["type"] == "HubAcceptedManagedCluster": - status["HubAcceptedManagedCluster"] = condition["status"] - elif condition["type"] == "ManagedClusterConditionAvailable": - status["ManagedClusterConditionAvailable"] = condition["status"] - elif condition["type"] == "ManagedClusterJoined": - is_joined = True - status["ManagedClusterJoined"] = condition["status"] - - return is_joined, status diff --git a/tests/interop/css_logger.py b/tests/interop/css_logger.py deleted file mode 100644 index 37d54cb3..00000000 --- a/tests/interop/css_logger.py +++ /dev/null @@ -1,57 +0,0 @@ -import logging -import os -from datetime import datetime -from logging.handlers import RotatingFileHandler - -if os.getenv("EXTERNAL_TEST") == "true": - LOG_DIR = os.path.join(os.environ["WORKSPACE"], ".results/test_execution_logs") -else: - LOG_DIR = os.path.join( - os.environ["WORKSPACE"], ".teflo/.results/test_execution_logs" - ) -if not os.path.exists(LOG_DIR): - os.makedirs(LOG_DIR, exist_ok=True) - - -class CSS_Logger(object): - _logger = None - - def __new__(cls, *args, **kwargs): - if cls._logger is None: - cls._logger = super(CSS_Logger, cls).__new__(cls) - # Put any initialization here. - cls._logger = logging.getLogger(args[0]) - cls._logger.setLevel(logging.DEBUG) - - pytest_current_test = os.environ.get("PYTEST_CURRENT_TEST") - split_test_name = pytest_current_test.split("::")[1] - short_test_name = split_test_name.split(" ")[0] - - datestring = datetime.now().strftime("%Y_%m_%d_%H_%M_%S") - filename = "{}_{}.log".format(short_test_name, datestring) - filepath = os.path.join(LOG_DIR, filename) - - # Create a file handler for logging level above DEBUG - file_handler = RotatingFileHandler( - filepath, maxBytes=1024 * 1024 * 1024, backupCount=20 - ) - - # Create a logging format - log_formatter = logging.Formatter( - "%(asctime)s " - "[%(levelname)s] " - "%(module)s:%(lineno)d " - "%(message)s" - ) - file_handler.setFormatter(log_formatter) - - # Create a stream handler for logging level above INFO - stream_handler = logging.StreamHandler() - stream_handler.setLevel(logging.INFO) - stream_handler.setFormatter(log_formatter) - - # Add the handlers to the logger - cls._logger.addHandler(file_handler) - cls._logger.addHandler(stream_handler) - - return cls._logger diff --git a/tests/interop/edge_util.py b/tests/interop/edge_util.py deleted file mode 100644 index 45bcb8ff..00000000 --- a/tests/interop/edge_util.py +++ /dev/null @@ -1,147 +0,0 @@ -import base64 -import fileinput -import logging -import os -import subprocess - -import requests -import yaml -from ocp_resources.secret import Secret -from requests import HTTPError, RequestException -from urllib3.exceptions import InsecureRequestWarning, ProtocolError - -from . 
import __loggername__ - -logger = logging.getLogger(__loggername__) - - -def load_yaml_file(file_path): - """ - Load and parse the yaml file - :param file_path: (str) file path - :return: (dict) yaml_config_obj in the form of Python dict - """ - yaml_config_obj = None - with open(file_path, "r") as yfh: - try: - yaml_config_obj = yaml.load(yfh, Loader=yaml.FullLoader) - except Exception as ex: - raise yaml.YAMLError("YAML Syntax Error:\n %s" % ex) - logger.info("Yaml Config : %s", yaml_config_obj) - return yaml_config_obj - - -def find_number_of_edge_sites(dir_path): - """ - Find the number of edge (managed cluster) sites folder - :param dir_path: (dtr) dir path where edge site manifest resides - :return: (list) site_names - """ - site_names = list() - list_of_dirs = os.listdir(path=dir_path) - - for site_dir in list_of_dirs: - if "staging" in site_dir: - site_names.append(site_dir) - - return site_names - - -def get_long_live_bearer_token( - dyn_client, namespace="default", sub_string="default-token" -): - """ - Get bearer token from secrets to authorize openshift cluster - :param sub_string: (str) substring of secrets name to find actual secret name since openshift append random - 5 ascii digit at the end of every secret name - :param namespace: (string) name of namespace where secret exist - :return: (string) secret token for specified secret - """ - filtered_secrets = [] - try: - for secret in Secret.get(dyn_client=dyn_client, namespace=namespace): - if sub_string in secret.instance.metadata.name: - filtered_secrets.append(secret.instance.data.token) - except StopIteration as e: - logger.exception( - "Specified substring %s doesn't exist in namespace %s: %s", - sub_string, - namespace, - e, - ) - except ProtocolError as e: - # See https://github.com/kubernetes-client/python/issues/1225 - logger.info( - "Skip %s... because kubelet disconnect client after default 10m...", e - ) - - # All secret tokens in openshift are base64 encoded. - # Decode base64 string into byte and convert byte to str - if len(filtered_secrets) > 0: - bearer_token = base64.b64decode(filtered_secrets[-1]).decode() - return bearer_token - else: - return None - - -def get_site_response(site_url, bearer_token): - """ - - :param site_url: (str) Site API end point - :param bearer_token: (str) bearer token - :return: (dict) site_response - """ - site_response = None - headers = {"Authorization": "Bearer " + bearer_token} - - try: - # Suppress only the single warning from urllib3 needed. - requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) - site_response = requests.get(site_url, headers=headers, verify=False) - except (ConnectionError, HTTPError, RequestException) as e: - logger.exception( - "Failed to connect %s due to refused connection or unsuccessful status code %s", - site_url, - e, - ) - logger.debug("Site Response %s: ", site_response) - - return site_response - - -def execute_shell_command_local(cmd): - """ - Executes a shell command in a subprocess, wait until it has completed. - :param cmd: Command to execute. 
- """ - proc = subprocess.Popen( - cmd, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - ) - (out, error) = proc.communicate() - exit_code = proc.wait() - return exit_code, out, error - - -def modify_file_content(file_name): - with open(file_name, "r") as frb: - logger.debug(f"Current content : {frb.readlines()}") - - with fileinput.FileInput(file_name, inplace=True, backup=".bak") as file: - for line in file: - print( - line.replace( - 'SENSOR_TEMPERATURE_ENABLED: "false"', - 'SENSOR_TEMPERATURE_ENABLED: "true"', - ), - end="", - ) - - with open(file_name, "r") as fra: - contents = fra.readlines() - logger.debug(f"Modified content : {contents}") - - return contents diff --git a/tests/interop/test_subscription_status_edge.py b/tests/interop/test_subscription_status_edge.py deleted file mode 100644 index 8d2ef630..00000000 --- a/tests/interop/test_subscription_status_edge.py +++ /dev/null @@ -1,94 +0,0 @@ -import logging - -import pytest -from ocp_resources.cluster_version import ClusterVersion -from ocp_resources.subscription import Subscription - -from . import __loggername__ - -logger = logging.getLogger(__loggername__) - - -@pytest.mark.subscription_status_edge -def test_subscription_status_edge(openshift_dyn_client): - # These are the operator subscriptions and their associated namespaces - expected_subs = { - "openshift-gitops-operator": ["openshift-operators"], - } - - operator_versions = [] - missing_subs = [] - unhealthy_subs = [] - missing_installplans = [] - upgrades_pending = [] - - for key in expected_subs.keys(): - for val in expected_subs[key]: - try: - subs = Subscription.get( - dyn_client=openshift_dyn_client, name=key, namespace=val - ) - sub = next(subs) - except StopIteration: - missing_subs.append(f"{key} in {val} namespace") - continue - - logger.info( - f"State for {sub.instance.metadata.name}: {sub.instance.status.state}" - ) - if sub.instance.status.state == "UpgradePending": - upgrades_pending.append( - f"{sub.instance.metadata.name} in {sub.instance.metadata.namespace} namespace" - ) - - logger.info( - f"CatalogSourcesUnhealthy: {sub.instance.status.conditions[0].status}" - ) - if sub.instance.status.conditions[0].status != "False": - logger.info(f"Subscription {sub.instance.metadata.name} is unhealthy") - unhealthy_subs.append(sub.instance.metadata.name) - else: - operator_versions.append( - f"installedCSV: {sub.instance.status.installedCSV}" - ) - - logger.info(f"installPlanRef: {sub.instance.status.installPlanRef}") - if not sub.instance.status.installPlanRef: - logger.info( - f"No install plan found for subscription {sub.instance.metadata.name} " - f"in {sub.instance.metadata.namespace} namespace" - ) - missing_installplans.append( - f"{sub.instance.metadata.name} in {sub.instance.metadata.namespace} namespace" - ) - - logger.info("") - - if missing_subs: - logger.error(f"FAIL: The following subscriptions are missing: {missing_subs}") - if unhealthy_subs: - logger.error( - f"FAIL: The following subscriptions are unhealthy: {unhealthy_subs}" - ) - if missing_installplans: - logger.error( - f"FAIL: The install plan for the following subscriptions is missing: {missing_installplans}" - ) - if upgrades_pending: - logger.error( - f"FAIL: The following subscriptions are in UpgradePending state: {upgrades_pending}" - ) - - for line in operator_versions: - logger.info(line) - - versions = ClusterVersion.get(dyn_client=openshift_dyn_client) - version = next(versions) - logger.info(f"Openshift 
version:\n{version.instance.status.history}") - - if missing_subs or unhealthy_subs or missing_installplans or upgrades_pending: - err_msg = "Subscription status check failed" - logger.error(f"FAIL: {err_msg}") - assert False, err_msg - else: - logger.info("PASS: Subscription status check passed") diff --git a/tests/interop/test_subscription_status_hub.py b/tests/interop/test_subscription_status_hub.py index 1c53d0df..cc3306c9 100644 --- a/tests/interop/test_subscription_status_hub.py +++ b/tests/interop/test_subscription_status_hub.py @@ -1,13 +1,7 @@ -import difflib import logging -import os -import re -import subprocess import pytest -from ocp_resources.cluster_version import ClusterVersion -from ocp_resources.subscription import Subscription -from openshift.dynamic.exceptions import NotFoundError +from validatedpatterns_tests.interop import subscription from . import __loggername__ @@ -19,147 +13,17 @@ def test_subscription_status_hub(openshift_dyn_client): # These are the operator subscriptions and their associated namespaces expected_subs = { "openshift-gitops-operator": ["openshift-operators"], - "advanced-cluster-management": ["open-cluster-management"], - "multicluster-engine": ["multicluster-engine"], + "prometheus": ["llm-monitoring"], + "grafana-operator": ["llm-monitoring"], + "nfd": ["openshift-nfd"], + "gpu-operator-certified": ["nvidia-gpu-operator"], } - operator_versions = [] - missing_subs = [] - unhealthy_subs = [] - missing_installplans = [] - upgrades_pending = [] - - for key in expected_subs.keys(): - for val in expected_subs[key]: - try: - subs = Subscription.get( - dyn_client=openshift_dyn_client, name=key, namespace=val - ) - sub = next(subs) - except NotFoundError: - missing_subs.append(f"{key} in {val} namespace") - continue - - logger.info( - f"State for {sub.instance.metadata.name}: {sub.instance.status.state}" - ) - if sub.instance.status.state == "UpgradePending": - upgrades_pending.append( - f"{sub.instance.metadata.name} in {sub.instance.metadata.namespace} namespace" - ) - - logger.info( - f"CatalogSourcesUnhealthy: {sub.instance.status.conditions[0].status}" - ) - if sub.instance.status.conditions[0].status != "False": - logger.info(f"Subscription {sub.instance.metadata.name} is unhealthy") - unhealthy_subs.append( - f"{sub.instance.metadata.name} in {sub.instance.metadata.namespace} namespace" - ) - else: - operator_versions.append( - f"installedCSV: {sub.instance.status.installedCSV}" - ) - - logger.info(f"installPlanRef: {sub.instance.status.installPlanRef}") - if not sub.instance.status.installPlanRef: - logger.info( - f"No install plan found for subscription {sub.instance.metadata.name} " - f"in {sub.instance.metadata.namespace} namespace" - ) - missing_installplans.append( - f"{sub.instance.metadata.name} in {sub.instance.metadata.namespace} namespace" - ) - - logger.info("") - - if missing_subs: - logger.error(f"FAIL: The following subscriptions are missing: {missing_subs}") - if unhealthy_subs: - logger.error( - f"FAIL: The following subscriptions are unhealthy: {unhealthy_subs}" - ) - if missing_installplans: - logger.error( - f"FAIL: The install plan for the following subscriptions is missing: {missing_installplans}" - ) - if upgrades_pending: - logger.error( - f"FAIL: The following subscriptions are in UpgradePending state: {upgrades_pending}" - ) - - versions = ClusterVersion.get(dyn_client=openshift_dyn_client) - version = next(versions) - logger.info(f"Openshift version:\n{version.instance.status.history}") - - if os.getenv("EXTERNAL_TEST") 
!= "true": - shortversion = re.sub("(.[0-9]+$)", "", os.getenv("OPENSHIFT_VER")) - currentfile = os.getcwd() + "/operators_hub_current" - sourceFile = open(currentfile, "w") - for line in operator_versions: - logger.info(line) - print(line, file=sourceFile) - sourceFile.close() - - logger.info("Clone operator-versions repo") - try: - operator_versions_repo = ( - "git@gitlab.cee.redhat.com:mpqe/mps/vp/operator-versions.git" - ) - clone = subprocess.run( - ["git", "clone", operator_versions_repo], capture_output=True, text=True - ) - logger.info(clone.stdout) - logger.info(clone.stderr) - except Exception: - pass - - previouspath = os.getcwd() + f"/operator-versions/mcgitops_hub_{shortversion}" - previousfile = f"mcgitops_hub_{shortversion}" - - logger.info("Ensure previous file exists") - checkpath = os.path.exists(previouspath) - logger.info(checkpath) - - if checkpath is True: - logger.info("Diff current operator list with previous file") - diff = opdiff(open(previouspath).readlines(), open(currentfile).readlines()) - diffstring = "".join(diff) - logger.info(diffstring) - - logger.info("Write diff to file") - sourceFile = open("operator_diffs_hub.log", "w") - print(diffstring, file=sourceFile) - sourceFile.close() - else: - logger.info("Skipping operator diff - previous file not found") - - if missing_subs or unhealthy_subs or missing_installplans or upgrades_pending: - err_msg = "Subscription status check failed" + err_msg = subscription.subscription_status( + openshift_dyn_client, expected_subs, diff=False + ) + if err_msg: logger.error(f"FAIL: {err_msg}") assert False, err_msg else: - # Only push the new operarator list if the test passed - # and we are not testing a pre-release operator nor - # running externally - if os.getenv("EXTERNAL_TEST") != "true": - if checkpath is True and not os.environ["INDEX_IMAGE"]: - os.remove(previouspath) - os.rename(currentfile, previouspath) - - cwd = os.getcwd() + "/operator-versions" - logger.info(f"CWD: {cwd}") - - logger.info("Push new operator list") - subprocess.run(["git", "add", previousfile], cwd=cwd) - subprocess.run( - ["git", "commit", "-m", "Update operator versions list"], - cwd=cwd, - ) - subprocess.run(["git", "push"], cwd=cwd) - logger.info("PASS: Subscription status check passed") - - -def opdiff(*args): - return filter(lambda x: not x.startswith(" "), difflib.ndiff(*args)) diff --git a/tests/interop/test_validate_edge_site_components.py b/tests/interop/test_validate_edge_site_components.py deleted file mode 100644 index d8500917..00000000 --- a/tests/interop/test_validate_edge_site_components.py +++ /dev/null @@ -1,240 +0,0 @@ -import logging -import os -import subprocess - -import pytest -from ocp_resources.namespace import Namespace -from ocp_resources.pod import Pod -from ocp_resources.route import Route -from openshift.dynamic.exceptions import NotFoundError - -from . 
import __loggername__ -from .crd import ArgoCD -from .edge_util import get_long_live_bearer_token, get_site_response - -logger = logging.getLogger(__loggername__) - -oc = os.environ["HOME"] + "/oc_client/oc" - -""" -Validate following rag-llm-gitops components pods and endpoints on edge site (line server): - -1) argocd -2) ACM agents -3) applications health (Applications deployed through argocd) -""" - - -@pytest.mark.test_validate_edge_site_components -def test_validate_edge_site_components(): - logger.info("Checking Openshift version on edge site") - version_out = subprocess.run(["oc", "version"], capture_output=True) - version_out = version_out.stdout.decode("utf-8") - logger.info(f"Openshift version:\n{version_out}") - - -@pytest.mark.validate_edge_site_reachable -def test_validate_edge_site_reachable(kube_config, openshift_dyn_client): - logger.info("Check if edge site API end point is reachable") - edge_api_url = kube_config.host - if not edge_api_url: - err_msg = "Edge site url is missing in kubeconfig file" - logger.error(f"FAIL: {err_msg}") - assert False, err_msg - else: - logger.info(f"EDGE api url : {edge_api_url}") - - bearer_token = get_long_live_bearer_token(dyn_client=openshift_dyn_client) - if not bearer_token: - assert False, "Bearer token is missing for hub site" - - edge_api_response = get_site_response( - site_url=edge_api_url, bearer_token=bearer_token - ) - - if edge_api_response.status_code != 200: - err_msg = "Edge site is not reachable. Please check the deployment." - logger.error(f"FAIL: {err_msg}") - assert False, err_msg - else: - logger.info("PASS: Edge site is reachable") - - -@pytest.mark.check_pod_status_edge -def test_check_pod_status(openshift_dyn_client): - logger.info("Checking pod status") - - err_msg = [] - failed_pods = [] - missing_pods = [] - missing_projects = [] - projects = [ - "openshift-operators", - "open-cluster-management-agent", - "open-cluster-management-agent-addon", - "openshift-gitops", - ] - - for project in projects: - # Check for missing project - try: - namespaces = Namespace.get(dyn_client=openshift_dyn_client, name=project) - next(namespaces) - except NotFoundError: - missing_projects.append(project) - continue - # Check for absence of pods in project - try: - pods = Pod.get(dyn_client=openshift_dyn_client, namespace=project) - pod = next(pods) - except StopIteration: - missing_pods.append(project) - continue - - for project in projects: - pods = Pod.get(dyn_client=openshift_dyn_client, namespace=project) - logger.info(f"Checking pods in namespace '{project}'") - for pod in pods: - for container in pod.instance.status.containerStatuses: - logger.info( - f"{pod.instance.metadata.name} : {container.name} :" - f" {container.state}" - ) - if container.state.terminated: - if container.state.terminated.reason != "Completed": - logger.info( - f"Pod {pod.instance.metadata.name} in" - f" {pod.instance.metadata.namespace} namespace is" - " FAILED:" - ) - failed_pods.append(pod.instance.metadata.name) - logger.info(describe_pod(project, pod.instance.metadata.name)) - logger.info( - get_log_output( - project, - pod.instance.metadata.name, - container.name, - ) - ) - elif not container.state.running: - logger.info( - f"Pod {pod.instance.metadata.name} in" - f" {pod.instance.metadata.namespace} namespace is" - " FAILED:" - ) - failed_pods.append(pod.instance.metadata.name) - logger.info(describe_pod(project, pod.instance.metadata.name)) - logger.info( - get_log_output( - project, pod.instance.metadata.name, container.name - ) - ) - - if 
missing_projects: - err_msg.append(f"The following namespaces are missing: {missing_projects}") - - if missing_pods: - err_msg.append( - f"The following namespaces have no pods deployed: {missing_pods}" - ) - - if failed_pods: - err_msg.append(f"The following pods are failed: {failed_pods}") - - if err_msg: - logger.error(f"FAIL: {err_msg}") - assert False, err_msg - else: - logger.info("PASS: Pod status check succeeded.") - - -def describe_pod(project, pod): - cmd_out = subprocess.run( - [oc, "describe", "pod", "-n", project, pod], capture_output=True - ) - if cmd_out.stdout: - return cmd_out.stdout.decode("utf-8") - else: - assert False, cmd_out.stderr - - -def get_log_output(project, pod, container): - cmd_out = subprocess.run( - [oc, "logs", "-n", project, pod, "-c", container], capture_output=True - ) - if cmd_out.stdout: - return cmd_out.stdout.decode("utf-8") - else: - assert False, cmd_out.stderr - - -@pytest.mark.validate_argocd_reachable_edge_site -def test_validate_argocd_reachable_edge_site(openshift_dyn_client): - namespace = "openshift-gitops" - - try: - for route in Route.get( - dyn_client=openshift_dyn_client, - namespace=namespace, - name="openshift-gitops-server", - ): - argocd_route_url = route.instance.spec.host - except StopIteration: - err_msg = f"Argocd url/route is missing in {namespace} namespace" - logger.error(f"FAIL: {err_msg}") - assert False, err_msg - - logger.info("Check if argocd route/url on hub site is reachable") - if not argocd_route_url: - err_msg = f"Argocd url/route is missing in {namespace} namespace" - logger.error(f"FAIL: {err_msg}") - assert False, err_msg - else: - final_argocd_url = f"{'https://'}{argocd_route_url}" - logger.info(f"Argocd route/url : {final_argocd_url}") - - bearer_token = get_long_live_bearer_token( - dyn_client=openshift_dyn_client, - namespace=namespace, - sub_string="openshift-gitops-argocd-server-token", - ) - if not bearer_token: - err_msg = f"Bearer token is missing for argocd-server in {namespace} namespace" - logger.error(f"FAIL: {err_msg}") - assert False, err_msg - else: - logger.debug(f"Argocd bearer token : {bearer_token}") - - argocd_route_response = get_site_response( - site_url=final_argocd_url, bearer_token=bearer_token - ) - - logger.info(f"Argocd route response : {argocd_route_response}") - - if argocd_route_response.status_code != 200: - err_msg = "Argocd is not reachable. Please check the deployment." 
- logger.error(f"FAIL: {err_msg}") - assert False, err_msg - else: - logger.info("PASS: Argocd is reachable") - - -@pytest.mark.validate_argocd_applications_health_edge_site -def test_validate_argocd_applications_health_edge_site(openshift_dyn_client): - namespace = "oepnshift-gitops" - - argocd_apps_status = dict() - logger.info("Get all applications deployed by argocd on edge site") - - for app in ArgoCD.get(dyn_client=openshift_dyn_client, namespace=namespace): - app_name = app.instance.metadata.name - app_health = app.health - argocd_apps_status[app_name] = app_health - logger.info(f"Health status of {app_name} is: {app_health}") - - if False in (argocd_apps_status.values()): - err_msg = f"Some or all applications deployed on edge site are Degraded/Unhealthy: {argocd_apps_status}" - logger.error(f"FAIL: {err_msg}") - assert False, err_msg - else: - logger.info("PASS: All applications deployed on edge site are healthy.") diff --git a/tests/interop/test_validate_gpu_nodes.py b/tests/interop/test_validate_gpu_nodes.py new file mode 100644 index 00000000..6b591ba3 --- /dev/null +++ b/tests/interop/test_validate_gpu_nodes.py @@ -0,0 +1,174 @@ +import os +import re +import subprocess +import pytest +import logging +import yaml +from ocp_resources.machine_set import MachineSet +from ocp_resources.node import Node +from ocp_resources.pod import Pod +from . import __loggername__ +from openshift.dynamic.exceptions import NotFoundError + +logger = logging.getLogger(__loggername__) + +oc = os.environ["HOME"] + "/oc_client/oc" + + +@pytest.mark.validate_gpu_machineset +def test_validate_gpu_nodes(openshift_dyn_client): + """ + Check for the existence of the GPU machineset + """ + logger.info("Checking GPU machineset") + machinesets = MachineSet.get( + dyn_client=openshift_dyn_client, namespace="openshift-machine-api" + ) + + found = False + for machineset in machinesets: + logger.info(machineset.instance.metadata.name) + if re.search("gpu", machineset.instance.metadata.name): + gpu_machineset = machineset + found = True + break + + err_msg = "GPU machineset not found" + if found == True: + logger.info( + f"PASS: Found GPU machineset: {gpu_machineset.instance.metadata.name}" + ) + else: + logger.error(f"FAIL: {err_msg}") + assert False, err_msg + + """ + Check for the existence of the GPU machineset taint + """ + logger.info("Checking GPU machineset taint") + + err_msg = "No taints found for GPU machineset" + try: + logger.info(gpu_machineset.instance.spec.template.spec.taints) + except AttributeError: + logger.error(f"FAIL: {err_msg}") + assert False, err_msg + + if gpu_machineset.instance.spec.template.spec.taints == "None": + logger.error(f"FAIL: {err_msg}") + assert False, err_msg + + logger.info( + f"PASS: Found GPU machineset taint: {gpu_machineset.instance.spec.template.spec.taints}" + ) + + """ + Check for the existence of the GPU machineset label + """ + logger.info("Checking GPU machineset label") + + err_msg = "No label found for GPU machineset" + try: + logger.info(gpu_machineset.instance.spec.template.spec.metadata.labels) + except AttributeError: + logger.error(f"FAIL: {err_msg}") + assert False, err_msg + + labels = str(gpu_machineset.instance.spec.template.spec.metadata.labels) + if labels == "None": + logger.error(f"FAIL: {err_msg}") + assert False, err_msg + + logger.info(f"PASS: Found GPU machineset labels: {labels}") + + """ + Check for the existence of the GPU machineset instance type + """ + logger.info("Checking GPU machineset instance type") + + err_msg = "No instanceType 
+    try:
+        logger.info(
+            gpu_machineset.instance.spec.template.spec.providerSpec.value.instanceType
+        )
+    except AttributeError:
+        logger.error(f"FAIL: {err_msg}")
+        assert False, err_msg
+
+    instance_type = str(
+        gpu_machineset.instance.spec.template.spec.providerSpec.value.instanceType
+    )
+    if instance_type == "None":
+        logger.error(f"FAIL: {err_msg}")
+        assert False, err_msg
+
+    logger.info(f"PASS: Found GPU machineset instance type: {instance_type}")
+
+
+@pytest.mark.validate_gpu_node_role_labels_pods
+def test_validate_gpu_node_role_labels_pods(openshift_dyn_client):
+    """
+    Check for the expected node-role labels for GPU nodes
+    """
+    logger.info("Checking GPU node-role labels")
+
+    nodes = Node.get(dyn_client=openshift_dyn_client)
+    gpu_nodes = []
+    for node in nodes:
+        logger.info(node.instance.metadata.name)
+        labels = node.instance.metadata.labels
+        logger.info(labels)
+        label_str = str(labels)
+
+        odh_label = "'node-role.kubernetes.io/odh-notebook': ''"
+        worker_label = "'node-role.kubernetes.io/worker': ''"
+
+        if odh_label in label_str and worker_label in label_str:
+            gpu_nodes.append(node)
+
+    if len(gpu_nodes) == 3:
+        logger.info("PASS: Found 'worker' and 'odh-notebook' GPU node-role labels")
+    else:
+        err_msg = "Could not find 'worker' and 'odh-notebook' GPU node-role labels"
+        logger.error(f"FAIL: {err_msg}")
+        assert False, err_msg
+
+    # Check for the expected number of pods deployed on GPU nodes
+    logger.info("Checking pod count on GPU nodes")
+
+    expected_count = 20
+    failed_nodes = []
+    for gpu_node in gpu_nodes:
+        name = gpu_node.instance.metadata.name
+        field_select = "--field-selector=spec.host=" + name
+        pod_count = 0
+        cmd_out = subprocess.run(
+            [oc, "get", "pod", "-A", field_select, "--no-headers"], capture_output=True
+        )
+
+        if cmd_out.stdout:
+            out_decoded = cmd_out.stdout.decode("utf-8")
+            logger.info(name + "\n" + out_decoded)
+            out_split = out_decoded.splitlines()
+
+            for line in out_split:
+                if "Completed" in line:
+                    continue
+                else:
+                    pod_count += 1
+
+            if pod_count < expected_count:
+                failed_nodes.append(name)
+        else:
+            assert False, cmd_out.stderr
+
+    if failed_nodes:
+        err_msg = f"Did not find the expected pod count on: {failed_nodes}"
+        logger.error(f"FAIL: {err_msg}")
+        assert False, err_msg
+    else:
+        logger.info("PASS: Found the expected pod count for GPU nodes")
diff --git a/tests/interop/test_validate_hub_site_components.py b/tests/interop/test_validate_hub_site_components.py
index acf89e71..5699c632 100644
--- a/tests/interop/test_validate_hub_site_components.py
+++ b/tests/interop/test_validate_hub_site_components.py
@@ -3,40 +3,27 @@
 import subprocess
 
 import pytest
-from ocp_resources.namespace import Namespace
 from ocp_resources.pod import Pod
 from ocp_resources.route import Route
-from ocp_resources.storage_class import StorageClass
 from openshift.dynamic.exceptions import NotFoundError
+from ocp_resources.storage_class import StorageClass
+from validatedpatterns_tests.interop import application, components
 
 from . 
import __loggername__ -from .crd import ArgoCD, ManagedCluster -from .edge_util import get_long_live_bearer_token, get_site_response logger = logging.getLogger(__loggername__) oc = os.environ["HOME"] + "/oc_client/oc" -""" -Validate following rag-llm-gitops components pods and endpoints on hub site (central server): - -1) ACM (Advanced Cluster Manager) and self-registration -2) argocd -3) openshift operators -4) applications health (Applications deployed through argocd) -""" - @pytest.mark.test_validate_hub_site_components def test_validate_hub_site_components(openshift_dyn_client): logger.info("Checking Openshift version on hub site") - version_out = subprocess.run(["oc", "version"], capture_output=True) - version_out = version_out.stdout.decode("utf-8") + version_out = components.dump_openshift_version() logger.info(f"Openshift version:\n{version_out}") logger.info("Dump PVC and storageclass info") - pvcs_out = subprocess.run(["oc", "get", "pvc", "-A"], capture_output=True) - pvcs_out = pvcs_out.stdout.decode("utf-8") + pvcs_out = components.dump_pvc() logger.info(f"PVCs:\n{pvcs_out}") for sc in StorageClass.get(dyn_client=openshift_dyn_client): @@ -46,24 +33,8 @@ def test_validate_hub_site_components(openshift_dyn_client): @pytest.mark.validate_hub_site_reachable def test_validate_hub_site_reachable(kube_config, openshift_dyn_client): logger.info("Check if hub site API end point is reachable") - hub_api_url = kube_config.host - if not hub_api_url: - err_msg = "Hub site url is missing in kubeconfig file" - logger.error(f"FAIL: {err_msg}") - assert False, err_msg - else: - logger.info(f"HUB api url : {hub_api_url}") - - bearer_token = get_long_live_bearer_token(dyn_client=openshift_dyn_client) - if not bearer_token: - assert False, "Bearer token is missing for hub site" - - hub_api_response = get_site_response( - site_url=hub_api_url, bearer_token=bearer_token - ) - - if hub_api_response.status_code != 200: - err_msg = "Hub site is not reachable. Please check the deployment." 
+ err_msg = components.validate_site_reachable(kube_config, openshift_dyn_client) + if err_msg: logger.error(f"FAIL: {err_msg}") assert False, err_msg else: @@ -73,85 +44,11 @@ def test_validate_hub_site_reachable(kube_config, openshift_dyn_client): @pytest.mark.check_pod_status_hub def test_check_pod_status(openshift_dyn_client): logger.info("Checking pod status") - - err_msg = [] - failed_pods = [] - missing_pods = [] - missing_projects = [] projects = [ - "openshift-operators", - "open-cluster-management", - "open-cluster-management-hub", - "openshift-gitops", - "vault", + "nvidia-gpu-operator", + "rag-llm" ] - - for project in projects: - # Check for missing project - try: - namespaces = Namespace.get(dyn_client=openshift_dyn_client, name=project) - next(namespaces) - except NotFoundError: - missing_projects.append(project) - continue - # Check for absence of pods in project - try: - pods = Pod.get(dyn_client=openshift_dyn_client, namespace=project) - pod = next(pods) - except StopIteration: - missing_pods.append(project) - continue - - for project in projects: - pods = Pod.get(dyn_client=openshift_dyn_client, namespace=project) - logger.info(f"Checking pods in namespace '{project}'") - for pod in pods: - for container in pod.instance.status.containerStatuses: - logger.info( - f"{pod.instance.metadata.name} : {container.name} :" - f" {container.state}" - ) - if container.state.terminated: - if container.state.terminated.reason != "Completed": - logger.info( - f"Pod {pod.instance.metadata.name} in" - f" {pod.instance.metadata.namespace} namespace is" - " FAILED:" - ) - failed_pods.append(pod.instance.metadata.name) - logger.info(describe_pod(project, pod.instance.metadata.name)) - logger.info( - get_log_output( - project, - pod.instance.metadata.name, - container.name, - ) - ) - elif not container.state.running: - logger.info( - f"Pod {pod.instance.metadata.name} in" - f" {pod.instance.metadata.namespace} namespace is" - " FAILED:" - ) - failed_pods.append(pod.instance.metadata.name) - logger.info(describe_pod(project, pod.instance.metadata.name)) - logger.info( - get_log_output( - project, pod.instance.metadata.name, container.name - ) - ) - - if missing_projects: - err_msg.append(f"The following namespaces are missing: {missing_projects}") - - if missing_pods: - err_msg.append( - f"The following namespaces have no pods deployed: {missing_pods}" - ) - - if failed_pods: - err_msg.append(f"The following pods are failed: {failed_pods}") - + err_msg = components.check_pod_status(openshift_dyn_client, projects) if err_msg: logger.error(f"FAIL: {err_msg}") assert False, err_msg @@ -159,161 +56,109 @@ def test_check_pod_status(openshift_dyn_client): logger.info("PASS: Pod status check succeeded.") -def describe_pod(project, pod): - cmd_out = subprocess.run( - [oc, "describe", "pod", "-n", project, pod], capture_output=True - ) - if cmd_out.stdout: - return cmd_out.stdout.decode("utf-8") - else: - assert False, cmd_out.stderr - - -def get_log_output(project, pod, container): - cmd_out = subprocess.run( - [oc, "logs", "-n", project, pod, "-c", container], capture_output=True - ) - if cmd_out.stdout: - return cmd_out.stdout.decode("utf-8") - else: - assert False, cmd_out.stderr - - -# No longer needed for ACM 2.7 -# -# @pytest.mark.validate_acm_route_reachable -# def test_validate_acm_route_reachable(openshift_dyn_client): -# namespace = "open-cluster-management" - -# logger.info("Check if ACM route is reachable") -# try: -# for route in Route.get(dyn_client=openshift_dyn_client, 
namespace=namespace, name="multicloud-console"): -# acm_route_url = route.instance.spec.host -# except StopIteration: -# err_msg = "ACM url/route is missing in open-cluster-management namespace" -# logger.error(f"FAIL: {err_msg}") -# assert False, err_msg +@pytest.mark.check_pod_count_hub +def test_check_pod_count_hub(openshift_dyn_client): + logger.info("Checking pod count") + projects = { + "rag-llm": 4 + } -# final_acm_url = f"{'http://'}{acm_route_url}" -# logger.info(f"ACM route/url : {final_acm_url}") + failed = [] + for key in projects.keys(): + logger.info(f"Checking project: {key}") + pods = Pod.get(dyn_client=openshift_dyn_client, namespace=key) + count = 0 + for pod in pods: + logger.info(pod.instance.metadata.name) + count += 1 -# bearer_token = get_long_live_bearer_token(dyn_client=openshift_dyn_client, -# namespace=namespace, -# sub_string="multiclusterhub-operator-token") -# if not bearer_token: -# err_msg = "Bearer token is missing for ACM in open-cluster-management namespace" -# logger.error(f"FAIL: {err_msg}") -# assert False, err_msg -# else: -# logger.debug(f"ACM bearer token : {bearer_token}") - -# acm_route_response = get_site_response(site_url=final_acm_url, bearer_token=bearer_token) - -# logger.info(f"ACM route response : {acm_route_response}") - -# if acm_route_response.status_code != 200: -# err_msg = "ACM is not reachable. Please check the deployment" -# logger.error(f"FAIL: {err_msg}") -# assert False, err_msg -# else: -# logger.info("PASS: ACM is reachable.") - - -@pytest.mark.validate_acm_self_registration_managed_clusters -def test_validate_acm_self_registration_managed_clusters(openshift_dyn_client): - logger.info("Check ACM self registration for edge site") - site_name = ( - os.environ["EDGE_CLUSTER_PREFIX"] - + "-" - + os.environ["INFRA_PROVIDER"] - + "-" - + os.environ["MPTS_TEST_RUN_ID"] - ) - clusters = ManagedCluster.get(dyn_client=openshift_dyn_client, name=site_name) - cluster = next(clusters) - is_managed_cluster_joined, managed_cluster_status = cluster.self_registered - - logger.info(f"Cluster Managed : {is_managed_cluster_joined}") - logger.info(f"Managed Cluster Status : {managed_cluster_status}") + logger.info(f"Found {count} pods") + if count < projects[key]: failed.append(key) - if not is_managed_cluster_joined: - err_msg = f"{site_name} is not self registered" + if len(failed) > 0: + err_msg = f"Failed to find the expected pod count for: {failed}" logger.error(f"FAIL: {err_msg}") assert False, err_msg else: - logger.info(f"PASS: {site_name} is self registered") + logger.info("PASS: Found the expected pod count") @pytest.mark.validate_argocd_reachable_hub_site def test_validate_argocd_reachable_hub_site(openshift_dyn_client): - namespace = "openshift-gitops" logger.info("Check if argocd route/url on hub site is reachable") + err_msg = components.validate_argocd_reachable(openshift_dyn_client) + if err_msg: + logger.error(f"FAIL: {err_msg}") + assert False, err_msg + else: + logger.info("PASS: Argocd is reachable") + + +@pytest.mark.validate_llm_ui_route +def test_validate_llm_ui_route(openshift_dyn_client): + namespace = "rag-llm" + logger.info("Check for the existence of the llm-ui route") try: for route in Route.get( dyn_client=openshift_dyn_client, namespace=namespace, - name="openshift-gitops-server", + name="llm-ui", ): - argocd_route_url = route.instance.spec.host - except StopIteration: - err_msg = "Argocd url/route is missing in open-cluster-management namespace" + logger.info(route.instance.spec.host) + except NotFoundError: + 
err_msg = "llm-ui url/route is missing in rag-llm namespace" logger.error(f"FAIL: {err_msg}") assert False, err_msg - final_argocd_url = f"{'http://'}{argocd_route_url}" - logger.info(f"ACM route/url : {final_argocd_url}") + logger.info("PASS: Found llm-ui route") + + +@pytest.mark.validate_nodefeaturediscovery +def test_validate_nodefeaturediscovery(): + namespace = "openshift-nfd" + name = "nfd-instance" + logger.info("Check for nodefeaturediscovery instance") - bearer_token = get_long_live_bearer_token( - dyn_client=openshift_dyn_client, - namespace=namespace, - sub_string="openshift-gitops-argocd-server-token", + cmd_out = subprocess.run( + [oc, "get", "NodeFeatureDiscovery", "-n", namespace, name, "--no-headers"], capture_output=True ) - if not bearer_token: - err_msg = ( - "Bearer token is missing for argocd-server in openshift-gitops namespace" - ) - logger.error(f"FAIL: {err_msg}") - assert False, err_msg + if cmd_out.stdout: + logger.info(cmd_out.stdout.decode("utf-8")) + logger.info("PASS: Found nodefeaturediscovery instance") else: - logger.debug(f"Argocd bearer token : {bearer_token}") + assert False, cmd_out.stderr - argocd_route_response = get_site_response( - site_url=final_argocd_url, bearer_token=bearer_token - ) - logger.info(f"Argocd route response : {argocd_route_response}") +@pytest.mark.validate_gpu_clusterpolicy +def test_validate_gpu_clusterpolicy(): + name = "rag-llm-gpu-cluster-policy" + tolerations = '"tolerations":[{"effect":"NoSchedule","key":"odh-notebook","value":"true"}]' + logger.info("Check for GPU clusterpolicy") - if argocd_route_response.status_code != 200: - err_msg = "Argocd is not reachable. Please check the deployment" - logger.error(f"FAIL: {err_msg}") - assert False, err_msg + cmd_out = subprocess.run( + [oc, "get", "ClusterPolicy", "-o", "yaml", name], capture_output=True + ) + if cmd_out.stdout: + logger.info(cmd_out.stdout.decode("utf-8")) + + if tolerations in cmd_out.stdout.decode("utf-8"): + logger.info("PASS: Found GPU clusterpolicy and tolerations") + else: + err_msg =f"FAIL: Expected tolerations not found" + logger.error(f"FAIL: {err_msg}") + assert False, err_msg else: - logger.info("PASS: Argocd is reachable") + assert False, cmd_out.stderr @pytest.mark.validate_argocd_applications_health_hub_site def test_validate_argocd_applications_health_hub_site(openshift_dyn_client): - unhealthy_apps = [] logger.info("Get all applications deployed by argocd on hub site") projects = ["openshift-gitops", "rag-llm-gitops-hub"] - for project in projects: - for app in ArgoCD.get(dyn_client=openshift_dyn_client, namespace=project): - app_name = app.instance.metadata.name - app_health = app.instance.status.health.status - app_sync = app.instance.status.sync.status - - logger.info(f"Status for {app_name} : {app_health} : {app_sync}") - - if "Healthy" != app_health or "Synced" != app_sync: - logger.info(f"Dumping failed resources for app: {app_name}") - unhealthy_apps.append(app_name) - for res in app.instance.status.resources: - if ( - res.health and res.health.status != "Healthy" - ) or res.status != "Synced": - logger.info(f"\n{res}") - + unhealthy_apps = application.get_argocd_application_status( + openshift_dyn_client, projects + ) if unhealthy_apps: err_msg = "Some or all applications deployed on hub site are unhealthy" logger.error(f"FAIL: {err_msg}:\n{unhealthy_apps}")