benchmark: add monitor command #593

Merged 1 commit on Nov 26, 2021
22 changes: 20 additions & 2 deletions reana/reana_benchmark/cli.py
@@ -19,6 +19,7 @@
from reana.reana_benchmark.start import start
from reana.reana_benchmark.submit import submit
from reana.reana_benchmark.utils import logger
from reana.reana_benchmark.monitor import monitor

urllib3.disable_warnings()

@@ -98,7 +99,7 @@ def _to_range(workflow_range: str) -> (int, int):
concurrency_option = click.option(
    "--concurrency",
    "-c",
-   help=f"Number of workers to submit workflows, default {WORKERS_DEFAULT_COUNT}",
+   help=f"Number of workers to submit workflows [default {WORKERS_DEFAULT_COUNT}]",
    type=int,
    default=WORKERS_DEFAULT_COUNT,
)
@@ -176,7 +177,7 @@ def launch(
@click.option(
    "--interval",
    "-i",
-   help="Execution progress plot interval in minutes.",
+   help="Execution progress plot interval in minutes [default=10]",
    type=int,
    default=10,
)
@@ -210,3 +211,20 @@ def collect_command(workflow: str, force: bool) -> NoReturn:
        collect(workflow, force)
    except Exception as e:
        logger.error(f"Something went wrong when collecting results: {e}")


@reana_benchmark.command(name="monitor")
@workflow_option
@click.option(
    "--sleep",
    "-s",
    help="Sleep in seconds between collecting metrics [default=30]",
    type=int,
    default=30,
)
def monitor_command(workflow: str, sleep: int) -> NoReturn:
    """Monitor various metrics and record results."""
    try:
        monitor(workflow, sleep)
    except Exception as e:
        logger.error(f"Something went wrong during monitoring: {e}")
192 changes: 192 additions & 0 deletions reana/reana_benchmark/monitor.py
@@ -0,0 +1,192 @@
# This file is part of REANA.
# Copyright (C) 2021 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Responsible for monitoring K8s cluster, DB connections."""

import json
import time
from pathlib import Path
from typing import Dict, Any, List, Generator
from collections import defaultdict
import subprocess
from abc import abstractmethod, ABC

from reana.reana_benchmark.utils import get_utc_now_timestamp, logger


class BaseMetric(ABC):
    """Base class for other metrics."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Name of the metric."""
        raise NotImplementedError

    @abstractmethod
    def _collect(self, parameters: Dict) -> Any:
        raise NotImplementedError

    def collect(self, parameters: Dict) -> Dict[str, Any]:  # noqa: D102
        result = self._collect(parameters)
        return {
            self.name: result,
        }


class WorkflowDBStatusesMetric(BaseMetric):
    """Count number of workflow statuses directly from DB."""

    name = "workflow_db_statuses"

    def _collect(self, parameters: Dict) -> Any:
        workflow_prefix = parameters.get("workflow")

        if not workflow_prefix:
            logger.warning(
                f"{self.name} metrics cannot find workflow parameter. Metric will not be collected."
            )
            return {}

        cmd = [
            "kubectl",
            "exec",
            "deployment/reana-db",
            "--",
            "psql",
            "-U",
            "reana",
            "-c",
            f"SELECT status,COUNT(*) FROM __reana.workflow WHERE name LIKE '{workflow_prefix}-%' GROUP BY status;",
        ]
        output = subprocess.check_output(cmd).decode("ascii")
        result = {}

        rows = output.splitlines()[2:-2]

        for row in rows:
            status, count = row.split("|")[0].strip(), int(row.split("|")[1].strip())
            result[status] = count

        return result


class NumberOfDBConnectionsMetric(BaseMetric):
    """Count number of server processes in REANA DB."""

    name = "db_connections_number"

    def _collect(self, parameters: Dict) -> Any:
        cmd = [
            "kubectl",
            "exec",
            "deployment/reana-db",
            "--",
            "psql",
            "-U",
            "reana",
            "-c",
            "SELECT COUNT(*) FROM pg_stat_activity;",
        ]
        output = subprocess.check_output(cmd).decode("ascii")
        result = int(output.splitlines()[2].strip())
        return result


class WorkflowPodsMetric(BaseMetric):
    """Count number of job and batch jobs in different phases."""

    name = "workflows_pods_status"

    @staticmethod
    def _filter(pods: List[Dict], name_contains: str) -> Generator[Dict, None, None]:
        for pod in pods:
            name = pod.get("metadata", {}).get("name", "")
            if name_contains in name:
                yield pod

    @staticmethod
    def _count(pods: List[Dict], name_contains: str) -> Dict[str, int]:
        statistics = defaultdict(lambda: 0)
        for pod in WorkflowPodsMetric._filter(pods, name_contains):
            phase = pod.get("status", {}).get("phase")

Author (@VMois):

question: The phase value does not represent exactly what kubectl get pods (STATUS column) displays. It has Pending, Running and Succeeded statuses. In addition, it has a Failed status which, for some reason, appears for batch pods at the end when workflows have finished successfully.

I guess this happens because batch pods enter the NotReady state when the workflow engine finishes successfully and exits. You cannot easily get the NotReady state from the kubectl JSON output; you need to deduce it from the container statuses.

What do you think about the situation? Any ideas on how to get a more accurate status?

Member:

The phase values are described here. I think it's a pretty accurate metric for us to use. But this case where successfully finished workflows show a Failed status could be confusing. I guess one way we could solve it is to include containerStatuses in the final output (the downside is that it will make it much more verbose). Here is an example:

kubectl get pod -o=jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.status.phase}{"\t"}{.status.containerStatuses[*].state}{"\n"}{end}'
...
reana-server-69c494847b-fxldq   Running {"running":{"startedAt":"2021-11-23T08:09:29Z"}} {"running":{"startedAt":"2021-11-23T08:09:29Z"}}
...

Then it would be visible that the Failed status happens because the containers terminated. But I am not sure if we need it; it could be confusing to have too much output.

Author (@VMois), Nov 23, 2021:

Example of the JSON output about pod status for a single pod, from the kubectl get pods -o json command:

"status": {
      "conditions": [
          {
              "lastProbeTime": null,
              "lastTransitionTime": "2021-11-11T16:15:07Z",
              "status": "True",
              "type": "Initialized"
          },
          {
              "lastProbeTime": null,
              "lastTransitionTime": "2021-11-11T16:15:12Z",
              "status": "True",
              "type": "Ready"
          },
          {
              "lastProbeTime": null,
              "lastTransitionTime": "2021-11-11T16:15:12Z",
              "status": "True",
              "type": "ContainersReady"
          },
          {
              "lastProbeTime": null,
              "lastTransitionTime": "2021-11-11T16:15:07Z",
              "status": "True",
              "type": "PodScheduled"
          }
      ],
      "containerStatuses": [
          {
              "containerID": "containerd://78d3916f2dfca74427f8b0cc38cc344a746a2594c8fd6f746133e7a10b0cfdbb",
              "image": "docker.io/reanahub/reana-workflow-controller:latest",
              "imageID": "sha256:75e1ea2066193703f73e1c71fce92969a7b9c47b8af3e307dc2d5ee812e2d40d",
              "lastState": {},
              "name": "job-status-consumer",
              "ready": true,
              "restartCount": 0,
              "started": true,
              "state": {
                  "running": {
                      "startedAt": "2021-11-11T16:15:10Z"
                  }
              }
          },
          {
              "containerID": "containerd://4b915673af6c64cbec168cc8512969507544365d8ba35b3900f836836d384a26",
              "image": "docker.io/reanahub/reana-workflow-controller:latest",
              "imageID": "sha256:75e1ea2066193703f73e1c71fce92969a7b9c47b8af3e307dc2d5ee812e2d40d",
              "lastState": {},
              "name": "rest-api",
              "ready": true,
              "restartCount": 0,
              "started": true,
              "state": {
                  "running": {
                      "startedAt": "2021-11-11T16:15:11Z"
                  }
              }
          }
      ],
      "hostIP": "172.18.0.2",
      "phase": "Running",
      "podIP": "10.244.0.117",
      "podIPs": [
          {
              "ip": "10.244.0.117"
          }
      ],
      "qosClass": "BestEffort",
      "startTime": "2021-11-11T16:15:07Z"
}

As described above, phase can show pods as Failed while they are terminating, and it is not straightforward to extract more precise info from the above JSON. Some checks can be done on containerStatuses or, maybe, on conditions.

Author (@VMois):

The current goal of the monitoring command is to collect data that can later be visualized along with the execution progress plot. With the additional containerStatuses output it is not clear how to visualize it (there is a phase, but also one or more containers in containerStatuses). I would prefer to reduce it to one field (Running: 5, Failed: 1, etc.), but the question is how :)
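
An illustrative sketch (not part of the PR) of one way such a reduction could look, assuming the kubectl JSON layout shown in the example above; the mapping rules here (NotReady, Terminating) and the helper names are assumptions for discussion, not an agreed-upon scheme:

# Illustrative only: collapse phase + containerStatuses into a single status string per pod.
from collections import Counter
from typing import Dict, List


def reduce_pod_status(pod: Dict) -> str:
    status = pod.get("status", {})
    phase = status.get("phase", "Unknown")
    container_statuses = status.get("containerStatuses", [])
    terminated = [
        c["state"]["terminated"]
        for c in container_statuses
        if "terminated" in c.get("state", {})
    ]
    if phase == "Failed" and terminated and all(
        t.get("exitCode") in (0, 137) for t in terminated
    ):
        # batch pods killed during graceful shutdown would land here (see the exit code 137 discussion below)
        return "Terminating"
    if phase == "Running" and any(not c.get("ready", False) for c in container_statuses):
        return "NotReady"
    return phase


def count_pod_statuses(pods: List[Dict]) -> Dict[str, int]:
    # one field per pod, so the counts stay as simple as {"Running": 5, "Terminating": 1, ...}
    return dict(Counter(reduce_pod_status(pod) for pod in pods))

This keeps the metric a single phase-like field per pod while making the NotReady and grace-period cases visible in the counts.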

Author (@VMois), Nov 24, 2021:

If I understood correctly, when the pod is terminated, SIGTERM is sent by k8s; the job controller apparently takes too much time to exit, so k8s sends SIGKILL (code: 137) and marks the pod as Failed.

Example of JSON:

"containerStatuses": [
      {
        "containerID": "containerd://400831aac7443eba9a41228abf264d1adfa63d8279b3a4b71f34f819af7df420",
        "image": "docker.io/reanahub/reana-job-controller:latest",
        "imageID": "sha256:268ad55769e69f1c41d448fb7b54a95dc074cba5e2e1cfa6a2642d8484756551",
        "lastState": {},
        "name": "job-controller",
        "ready": false,
        "restartCount": 0,
        "started": false,
        "state": {
          "terminated": {
            "containerID": "containerd://400831aac7443eba9a41228abf264d1adfa63d8279b3a4b71f34f819af7df420",
            "exitCode": 137,
            "finishedAt": "2021-11-24T10:28:32Z",
            "reason": "Error",
            "startedAt": "2021-11-24T10:27:44Z"
          }
        }
      },
      {
        "containerID": "containerd://d5e4a82bfe1acd86411058ebaa0302dd8a6d9632b1a07d6f849789071918b1b3",
        "image": "docker.io/reanahub/reana-workflow-engine-serial:latest",
        "imageID": "sha256:5756537cf4d2673d2725e0824f6f6883062c363229d6c81a500b19a1487b8641",
        "lastState": {},
        "name": "workflow-engine",
        "ready": false,
        "restartCount": 0,
        "started": false,
        "state": {
          "terminated": {
            "containerID": "containerd://d5e4a82bfe1acd86411058ebaa0302dd8a6d9632b1a07d6f849789071918b1b3",
            "exitCode": 0,
            "finishedAt": "2021-11-24T10:28:02Z",
            "reason": "Completed",
            "startedAt": "2021-11-24T10:27:44Z"
          }
        }
      }
    ],

Check the timestamps in containerStatuses. workflow-engine finished at 10:28:02, so around that time REANA asks k8s to delete the pod. Take a look at the job-controller finish time: 10:28:32, exactly 30 seconds after workflow-engine. 30 seconds is the default time in k8s for the graceful shutdown of a pod (details).

The same happened in the other batch pods: the job controller is not terminating within 30 seconds when k8s asks it to. @tiborsimko

suggestion: I think the above case deserves a separate issue. I can leave the phase value in the metric for this PR. I assume that once we figure out why the exit code is 137 instead of 0, the phase value will show the Succeeded status for batch pods.
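
An illustrative check (not from the PR) of the pattern described above: scan containerStatuses for containers that exited with code 137 roughly one grace period after the first completed container. The helper name and the 30-second default are assumptions made for the sketch.

# Illustrative only: detect containers that look like they were SIGKILLed after the grace period.
from datetime import datetime
from typing import Dict, List


def find_grace_period_kills(pod: Dict, grace_seconds: int = 30) -> List[str]:
    finished = {}
    for container in pod.get("status", {}).get("containerStatuses", []):
        terminated = container.get("state", {}).get("terminated")
        if terminated and terminated.get("finishedAt"):
            finished[container["name"]] = (
                terminated.get("exitCode"),
                datetime.strptime(terminated["finishedAt"], "%Y-%m-%dT%H:%M:%SZ"),
            )
    if not finished:
        return []
    earliest = min(timestamp for _, timestamp in finished.values())
    return [
        name
        for name, (exit_code, timestamp) in finished.items()
        if exit_code == 137 and (timestamp - earliest).total_seconds() >= grace_seconds
    ]

For the containerStatuses shown above this would report job-controller, matching the 30-second gap between the two finishedAt timestamps.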

Author (@VMois):

Opened the issue to fix the Failed phase for batch pods. We can proceed with this PR and leave phase as it is.

            statistics[phase] += 1
        return dict(statistics)

    def _collect(self, parameters: Dict) -> Any:
        kubectl_cmd = ("kubectl", "get", "pods", "-o", "json")
        output = subprocess.check_output(kubectl_cmd)
        pods = json.loads(output).get("items", [])

        result = {
            "batch_pods": self._count(pods, "run-batch"),
            "job_pods": self._count(pods, "run-job"),
        }

        return result


METRICS = [
    NumberOfDBConnectionsMetric(),
    WorkflowPodsMetric(),
    WorkflowDBStatusesMetric(),
]


def _build_monitored_results_path(workflow: str) -> Path:
    return Path(f"{workflow}_monitored_results.json")


def _save_metrics(workflow: str, results: Dict) -> None:
    with open(_build_monitored_results_path(workflow), "w") as f:
        json.dump(results, f)


def _collect_metrics(parameters: Dict) -> Dict[str, Any]:
    collected_metrics = {}
    for metric in METRICS:
        try:
            result = metric.collect(parameters)
            collected_metrics = dict(collected_metrics, **result)
        except Exception as error:
            logger.error(
                f"Error during collection of {metric.name} metric. Details: {error}"
            )
    return collected_metrics


def _print_metrics() -> None:
    logger.info("Following metrics will be collected:")
    for m in METRICS:
        logger.info(f"- {m.name}")


def monitor(workflow: str, sleep: int) -> None:
    """Periodically collect the defined metrics and save them to a JSON file.

    This function is blocking.
    """
    _print_metrics()
    logger.info("Starting monitoring...")

    all_metrics = {}
    metrics_parameters = {
        "workflow": workflow,
    }

    try:
        while True:
            # if metrics take, for example, a couple of seconds to collect, monitored_date will be less accurate
            monitored_date = get_utc_now_timestamp()
            collected_metrics = _collect_metrics(metrics_parameters)
            all_metrics[monitored_date] = collected_metrics
            _save_metrics(workflow, all_metrics)

            time.sleep(sleep)
    except KeyboardInterrupt:
        logger.info("Stopping monitoring...")
    finally:
        _save_metrics(workflow, all_metrics)
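
For reference, a rough sketch of what the saved <workflow>_monitored_results.json contains after a couple of iterations, based on the metrics defined above; the timestamp format, statuses and numbers are invented for illustration:

# Illustrative shape only; keys follow the metric names above, values are made up.
example_results = {
    "2021-11-26T10:00:00": {
        "db_connections_number": 12,
        "workflows_pods_status": {
            "batch_pods": {"Running": 5, "Pending": 2},
            "job_pods": {"Running": 8, "Succeeded": 3},
        },
        "workflow_db_statuses": {"created": 10, "running": 5, "finished": 20},
    },
    "2021-11-26T10:00:30": {
        "db_connections_number": 14,
        "workflows_pods_status": {
            "batch_pods": {"Running": 6, "Pending": 1},
            "job_pods": {"Running": 9, "Succeeded": 5},
        },
        "workflow_db_statuses": {"created": 8, "running": 7, "finished": 20},
    },
}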