Skip to content

Commit

Permalink
[Metrics] Fix metrics and docs (#3233)
Browse files Browse the repository at this point in the history
* Fix metrics and docs

* Fix ray storage metrics
* Add valid check for metrics
* Fix mars usage demo

* Fix metrics check

* Remove unused log

* Update mars/metrics/api.py

Co-authored-by: Shawn <shawn.ck.yang@gmail.com>

* Update mars/metrics/api.py

Co-authored-by: Shawn <shawn.ck.yang@gmail.com>

* Support lazy initialize metrics

* Restore op_executed_number

* Fix log

Co-authored-by: buhe <zhongchun.yzc@antgroup.com>
Co-authored-by: Shawn <shawn.ck.yang@gmail.com>
  • Loading branch information
3 people committed Sep 9, 2022
1 parent a1a752f commit 6c6fc48
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 96 deletions.
4 changes: 2 additions & 2 deletions docs/source/development/oscar/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Actor Definition
def method_a(self, arg_1, arg_2, **kw_1): # user-defined function
pass
async def method_b(self, arg_1, arg_2, **kw_1) # user-defined async function
async def method_b(self, arg_1, arg_2, **kw_1): # user-defined async function
pass
Expand All @@ -38,7 +38,7 @@ Creating Actors
import mars.oscar as mo
actor_ref = await mo.create_actor(
MyActor, 1, 2, a=1, b=2
MyActor, 1, 2, a=1, b=2,
address='<ip>:<port>', uid='UniqueActorName')
Expand Down
6 changes: 5 additions & 1 deletion mars/core/operand/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,11 @@ def execute(results: Dict[str, Any], op: OperandType):
try:
result = executor(results, op)
succeeded = True
op_executed_number.record(1, {"op": op.__class__.__name__})
if op.stage is not None:
op_name = f"{op.__class__.__name__}:{op.stage.name}"
else:
op_name = op.__class__.__name__
op_executed_number.record(1, {"op": op_name})
return result
except UFuncTypeError as e: # pragma: no cover
raise TypeError(str(e)).with_traceback(sys.exc_info()[2]) from None
Expand Down
6 changes: 6 additions & 0 deletions mars/deploy/oscar/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ... import oscar as mo
from ...core.entrypoints import init_extension_entrypoints
from ...lib.aio import get_isolation, stop_isolation
from ...metrics import init_metrics
from ...oscar.backends.router import Router
from ...resource import cpu_count, cuda_count, mem_total
from ...services import NodeRole
Expand Down Expand Up @@ -218,6 +219,11 @@ async def start(self):
# start service
await self._start_service()

# init metrics to guarantee metrics use in driver
metric_configs = self._config.get("metrics", {})
metric_backend = metric_configs.get("backend")
init_metrics(metric_backend, config=metric_configs.get(metric_backend))

if self._web:
from ...services.web.supervisor import WebActor

Expand Down
97 changes: 96 additions & 1 deletion mars/metrics/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

import logging
import time
import weakref

from contextlib import contextmanager
from enum import Enum
from queue import PriorityQueue
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple

from .backends.console import console_metric
from .backends.metric import AbstractMetric
from .backends.prometheus import prometheus_metric
from .backends.ray import ray_metric

Expand All @@ -35,6 +37,9 @@
}


_metrics_to_be_initialized = weakref.WeakSet()


def init_metrics(backend="console", config: Dict[str, Any] = None):
global _init
if _init is True:
Expand All @@ -61,14 +66,76 @@ def init_metrics(backend="console", config: Dict[str, Any] = None):
"Failed to start prometheus http server because there is no prometheus_client"
)
_init = True
logger.info("Finished initialize the metrics with backend %s", _metric_backend)
for m in _metrics_to_be_initialized:
cls = getattr(_backends_cls[_metric_backend], m.type)
metric = cls(m.name, m.description, m.tag_keys)
m.set_metric(metric)
logger.info("Finished initialize the metrics of backend: %s.", _metric_backend)


def shutdown_metrics():
global _metric_backend
_metric_backend = "console"
global _init
_init = False
logger.info("Shutdown metrics of backend: %s.", _metric_backend)


class _MetricWrapper(AbstractMetric):
_metric: AbstractMetric

def __init__(
self,
name: str,
description: str = "",
tag_keys: Optional[Tuple[str]] = None,
metric_type: str = "Counter",
):
self._name = name
self._description = description
self._tag_keys = tag_keys or tuple()
self._type = metric_type
self._metric = None

@property
def type(self):
return self._type

@property
def value(self):
assert (
self._metric is not None
), "Metric is not initialized, please call `init_metrics()` before using metrics."
return self._metric.value

def set_metric(self, metric):
assert metric is not None, "Argument metric is None, please check it."
self._metric = metric

def record(self, value=1, tags: Optional[Dict[str, str]] = None):
if self._metric is not None:
self._metric.record(value, tags)
else:
logger.warning(
"Metric is not initialized, please call `init_metrics()` before using metrics."
)


def gen_metric(func):
def wrapper(name, descriptions: str = "", tag_keys: Optional[Tuple[str]] = None):
if _init is True:
return func(name, descriptions, tag_keys)
else:
logger.info(
"Metric %s will be initialized when invoking `init_metrics()`.", name
)
metric = _MetricWrapper(
name, descriptions, tag_keys, func.__name__.capitalize()
)
_metrics_to_be_initialized.add(metric)
return metric

return wrapper


class Metrics:
Expand Down Expand Up @@ -97,19 +164,47 @@ class Metrics:
"""

@staticmethod
@gen_metric
def counter(name, description: str = "", tag_keys: Optional[Tuple[str]] = None):
logger.info(
"Initializing a counter with name: %s, tag keys: %s, backend: %s",
name,
tag_keys,
_metric_backend,
)
return _backends_cls[_metric_backend].Counter(name, description, tag_keys)

@staticmethod
@gen_metric
def gauge(name, description: str = "", tag_keys: Optional[Tuple[str]] = None):
logger.info(
"Initializing a gauge whose name: %s, tag keys: %s, backend: %s",
name,
tag_keys,
_metric_backend,
)
return _backends_cls[_metric_backend].Gauge(name, description, tag_keys)

@staticmethod
@gen_metric
def meter(name, description: str = "", tag_keys: Optional[Tuple[str]] = None):
logger.info(
"Initializing a meter whose name: %s, tag keys: %s, backend: %s",
name,
tag_keys,
_metric_backend,
)
return _backends_cls[_metric_backend].Meter(name, description, tag_keys)

@staticmethod
@gen_metric
def histogram(name, description: str = "", tag_keys: Optional[Tuple[str]] = None):
logger.info(
"Initializing a histogram whose name: %s, tag keys: %s, backend: %s",
name,
tag_keys,
_metric_backend,
)
return _backends_cls[_metric_backend].Histogram(name, description, tag_keys)


Expand Down
8 changes: 4 additions & 4 deletions mars/metrics/backends/console/tests/test_console_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def test_counter():
assert c.name == "test_counter"
assert c.description == "A test counter"
assert c.tag_keys == ("service", "tenant")
assert c.type == "counter"
assert c.type == "Counter"
c.record(1, {"service": "mars", "tenant": "test"})
c.record(2, {"service": "mars", "tenant": "test"})
assert c.value == 3
Expand All @@ -31,7 +31,7 @@ def test_gauge():
assert g.name == "test_gauge"
assert g.description == "A test gauge"
assert g.tag_keys == ()
assert g.type == "gauge"
assert g.type == "Gauge"
g.record(1)
assert g.value == 1
g.record(2)
Expand All @@ -43,7 +43,7 @@ def test_meter():
assert m.name == "test_meter"
assert m.description == ""
assert m.tag_keys == ()
assert m.type == "meter"
assert m.type == "Meter"
m.record(1)
assert m.value == 0
m.record(2001)
Expand All @@ -55,7 +55,7 @@ def test_histogram():
assert h.name == "test_histogram"
assert h.description == ""
assert h.tag_keys == ()
assert h.type == "histogram"
assert h.type == "Histogram"
h.record(1)
assert h.value == 0
for i in range(2002):
Expand Down
12 changes: 5 additions & 7 deletions mars/metrics/backends/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import time

from abc import ABC, abstractmethod
from abc import ABC
from typing import Dict, Optional, Tuple

_THRESHOLD = 2000
Expand Down Expand Up @@ -56,7 +56,6 @@ def description(self):
def tag_keys(self):
return self._tag_keys

@abstractmethod
def _init(self):
"""Some initialization in subclass."""
pass
Expand All @@ -65,7 +64,6 @@ def record(self, value=1, tags: Optional[Dict[str, str]] = None):
"""A public method called by users."""
pass

@abstractmethod
def _record(self, value: float = 1.0, tags: Optional[Dict[str, str]] = None):
"""An internal method called by record() and should be
implemented by different metric backends.
Expand All @@ -76,7 +74,7 @@ def _record(self, value: float = 1.0, tags: Optional[Dict[str, str]] = None):
class AbstractCounter(AbstractMetric):
"""A counter records the counts of events."""

_type = "counter"
_type = "Counter"

def __init__(
self, name: str, description: str = "", tag_keys: Optional[Tuple[str]] = None
Expand All @@ -94,7 +92,7 @@ class AbstractGauge(AbstractMetric):
arbitrarily set.
"""

_type = "gauge"
_type = "Gauge"

def record(self, value=1, tags: Optional[Dict[str, str]] = None):
self._record(value, tags)
Expand All @@ -103,7 +101,7 @@ def record(self, value=1, tags: Optional[Dict[str, str]] = None):
class AbstractMeter(AbstractMetric):
"""A meter measures the rate at which a set of events occur."""

_type = "meter"
_type = "Meter"

def __init__(
self, name: str, description: str = "", tag_keys: Optional[Tuple[str]] = None
Expand All @@ -126,7 +124,7 @@ def record(self, value=1, tags: Optional[Dict[str, str]] = None):
class AbstractHistogram(AbstractMetric):
"""A histogram measures the distribution of values in a stream of data."""

_type = "histogram"
_type = "Histogram"

def __init__(
self, name: str, description: str = "", tag_keys: Optional[Tuple[str]] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_counter(start_prometheus_http_server):
assert c.description == "A test counter"
assert set(["host", "pid"]).issubset(set(c.tag_keys))
assert set(["service", "tenant"]).issubset(set(c.tag_keys))
assert c.type == "counter"
assert c.type == "Counter"
c.record(1, {"service": "mars", "tenant": "test"})
verify_metric("test_counter", 1.0)
c.record(2, {"service": "mars", "tenant": "test"})
Expand All @@ -68,7 +68,7 @@ def test_gauge(start_prometheus_http_server):
assert g.name == "test_gauge"
assert g.description == "A test gauge"
assert set(["host", "pid"]).issubset(set(g.tag_keys))
assert g.type == "gauge"
assert g.type == "Gauge"
g.record(0.1)
verify_metric("test_gauge", 0.1)
g.record(1.1)
Expand All @@ -80,7 +80,7 @@ def test_meter(start_prometheus_http_server):
assert m.name == "test_meter"
assert m.description == ""
assert set(["host", "pid"]).issubset(set(m.tag_keys))
assert m.type == "meter"
assert m.type == "Meter"
num = 3
while num > 0:
m.record(1)
Expand All @@ -94,7 +94,7 @@ def test_histogram(start_prometheus_http_server):
assert h.name == "test_histogram"
assert h.description == ""
assert set(["host", "pid"]).issubset(set(h.tag_keys))
assert h.type == "histogram"
assert h.type == "Histogram"
num = 3
while num > 0:
h.record(1)
Expand Down
15 changes: 4 additions & 11 deletions mars/metrics/backends/ray/tests/test_ray_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,7 @@
@require_ray
def test_record():
c = Counter("test_counter")
from .. import ray_metric

original_value = ray_metric._ray_gauge_set_available
ray_metric._ray_gauge_set_available = True
assert c.record(1) is None
ray_metric._ray_gauge_set_available = False
assert c.record(1) is None
ray_metric._ray_gauge_set_available = original_value


@require_ray
Expand All @@ -35,7 +28,7 @@ def test_counter():
assert c.name == "test_counter"
assert c.description == "A test counter"
assert c.tag_keys == ("service", "tenant")
assert c.type == "counter"
assert c.type == "Counter"
assert c.record(1, {"service": "mars", "tenant": "test"}) is None


Expand All @@ -45,7 +38,7 @@ def test_gauge():
assert g.name == "test_gauge"
assert g.description == "A test gauge"
assert g.tag_keys == ()
assert g.type == "gauge"
assert g.type == "Gauge"
assert g.record(1) is None


Expand All @@ -55,7 +48,7 @@ def test_meter():
assert m.name == "test_meter"
assert m.description == ""
assert m.tag_keys == ()
assert m.type == "meter"
assert m.type == "Meter"
assert m.record(1) is None


Expand All @@ -65,5 +58,5 @@ def test_histogram():
assert h.name == "test_histogram"
assert h.description == ""
assert h.tag_keys == ()
assert h.type == "histogram"
assert h.type == "Histogram"
assert h.record(1) is None

0 comments on commit 6c6fc48

Please sign in to comment.