fix: CLI agent info related bugs (#1934)
Co-authored-by: Joongi Kim <joongi@lablup.com>
fregataa and achimnol committed Feb 27, 2024
1 parent d7be593 commit 8fd3142
Showing 9 changed files with 108 additions and 75 deletions.
1 change: 1 addition & 0 deletions changes/1934.fix.md
@@ -0,0 +1 @@
Fix CLI `agent info` related issues by replacing `HardwareMetadata` with `dict` in the runtime class check and adding a parameter to the default metric value formatter.
4 changes: 2 additions & 2 deletions src/ai/backend/accelerator/cuda_open/plugin.py
@@ -248,15 +248,15 @@ async def gather_node_measures(
return [
NodeMeasurement(
MetricKey("cuda_mem"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="bytes",
stats_filter=frozenset({"max"}),
per_node=Measurement(Decimal(mem_used_total), Decimal(mem_avail_total)),
per_device=mem_stats,
),
NodeMeasurement(
MetricKey("cuda_util"),
MetricTypes.USAGE,
MetricTypes.UTILIZATION,
unit_hint="percent",
stats_filter=frozenset({"avg", "max"}),
per_node=Measurement(Decimal(util_total), Decimal(dev_count * 100)),
12 changes: 6 additions & 6 deletions src/ai/backend/accelerator/mock/plugin.py
@@ -412,31 +412,31 @@ async def gather_node_measures(self, ctx: StatContext) -> Sequence[NodeMeasureme
return [
NodeMeasurement(
MetricKey(f"{self.key}_mem"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="bytes",
stats_filter=frozenset({"max"}),
per_node=Measurement(Decimal(mem_used_total), Decimal(mem_avail_total)),
per_device=mem_stats,
),
NodeMeasurement(
MetricKey(f"{self.key}_util"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="percent",
stats_filter=frozenset({"avg", "max"}),
per_node=Measurement(Decimal(util_total), Decimal(dev_count * 100)),
per_device=util_stats,
),
NodeMeasurement(
MetricKey(f"{self.key}_power"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="Watts",
stats_filter=frozenset({"avg", "max"}),
per_node=Measurement(Decimal(power_usage_total), Decimal(power_max_total)),
per_device=power_usage,
),
NodeMeasurement(
MetricKey(f"{self.key}_temperature"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="Celsius",
stats_filter=frozenset({"avg", "max"}),
per_node=Measurement(
@@ -508,7 +508,7 @@ async def gather_container_measures(
return [
ContainerMeasurement(
MetricKey(f"{self.key}_mem"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="bytes",
stats_filter=frozenset({"max"}),
per_container={
@@ -521,7 +521,7 @@ async def gather_container_measures(
),
ContainerMeasurement(
MetricKey(f"{self.key}_util"),
MetricTypes.USAGE,
MetricTypes.UTILIZATION,
unit_hint="percent",
stats_filter=frozenset({"avg", "max"}),
per_container={
2 changes: 1 addition & 1 deletion src/ai/backend/agent/agent.py
@@ -1395,7 +1395,7 @@ async def _get(
"status_info": str(result),
"metadata": {},
}
case HardwareMetadata():
case dict(): # HardwareMetadata
hwinfo[device_name] = result
return hwinfo

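The `agent.py` hunk above swaps the `HardwareMetadata` class pattern for `dict()`. The likely reason (an inference, not stated in the diff) is that `HardwareMetadata` is a `TypedDict`, and `match` class patterns are evaluated via `isinstance()`, which `TypedDict` types reject at runtime. Below is a minimal sketch of that failure mode using a simplified stand-in type, not the real definition:

```python
from typing import Any, TypedDict


class HardwareMetadata(TypedDict):
    # Simplified stand-in for the real TypedDict used by Backend.AI.
    status: str
    status_info: str
    metadata: dict[str, str]


def classify(result: Any) -> str:
    match result:
        case Exception():
            return "error"
        # `case HardwareMetadata():` would raise
        # "TypeError: TypedDict does not support instance and class checks"
        # as soon as the pattern is evaluated, so the structural check is
        # done against plain `dict` instead, as in the hunk above.
        case dict():  # HardwareMetadata
            return "hardware-metadata"
        case _:
            return "unknown"


assert classify({"status": "healthy", "status_info": "", "metadata": {}}) == "hardware-metadata"
assert classify(RuntimeError("nvml error")) == "error"
```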
22 changes: 11 additions & 11 deletions src/ai/backend/agent/docker/intrinsic.py
@@ -270,7 +270,7 @@ async def api_impl(container_id):
),
ContainerMeasurement(
MetricKey("cpu_used"),
MetricTypes.USAGE,
MetricTypes.ACCUMULATION,
unit_hint="msec",
per_container=per_container_cpu_used,
),
@@ -336,7 +336,7 @@ async def api_impl(cid: str, pids: List[int]) -> List[Optional[Decimal]]:
),
ProcessMeasurement(
MetricKey("cpu_used"),
MetricTypes.USAGE,
MetricTypes.ACCUMULATION,
unit_hint="msec",
per_process=per_process_cpu_used,
),
@@ -512,7 +512,7 @@ def get_disk_stat():
return [
NodeMeasurement(
MetricKey("mem"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="bytes",
stats_filter=frozenset({"max"}),
per_node=Measurement(total_mem_used_bytes, total_mem_capacity_bytes),
@@ -522,7 +522,7 @@ def get_disk_stat():
),
NodeMeasurement(
MetricKey("disk"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="bytes",
per_node=Measurement(total_disk_usage, total_disk_capacity),
per_device=per_disk_stat,
@@ -714,21 +714,21 @@ async def api_impl(container_id):
return [
ContainerMeasurement(
MetricKey("mem"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="bytes",
stats_filter=frozenset({"max"}),
per_container=per_container_mem_used_bytes,
),
ContainerMeasurement(
MetricKey("io_read"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="bytes",
stats_filter=frozenset({"rate"}),
per_container=per_container_io_read_bytes,
),
ContainerMeasurement(
MetricKey("io_write"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="bytes",
stats_filter=frozenset({"rate"}),
per_container=per_container_io_write_bytes,
@@ -749,7 +749,7 @@ async def api_impl(container_id):
),
ContainerMeasurement(
MetricKey("io_scratch_size"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="bytes",
stats_filter=frozenset({"max"}),
per_container=per_container_io_scratch_size,
@@ -817,21 +817,21 @@ async def api_impl(
return [
ProcessMeasurement(
MetricKey("mem"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="bytes",
stats_filter=frozenset({"max"}),
per_process=per_process_mem_used_bytes,
),
ProcessMeasurement(
MetricKey("io_read"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="bytes",
stats_filter=frozenset({"rate"}),
per_process=per_process_io_read_bytes,
),
ProcessMeasurement(
MetricKey("io_write"),
MetricTypes.USAGE,
MetricTypes.GAUGE,
unit_hint="bytes",
stats_filter=frozenset({"rate"}),
per_process=per_process_io_write_bytes,
77 changes: 50 additions & 27 deletions src/ai/backend/agent/stats.py
@@ -82,12 +82,39 @@ def get_preferred_mode():


class MetricTypes(enum.Enum):
USAGE = 0 # for instant snapshot (e.g., used memory bytes, used cpu msec)
RATE = 1 # for rate of increase (e.g., I/O bps)
UTILIZATION = (
2 # for ratio of resource occupation time per measurement interval (e.g., CPU util)
)
ACCUMULATED = 3 # for accumulated value (e.g., total number of events)
"""
Specifies the type of a metric value.
Currently this DOES NOT affect calculation and processing of the metric,
but serves as metadata for code readers.
The actual calculation and formatting are controlled by :meth:`Metric.current_hook()`,
:attr:`Metric.unit_hint` and :attr:`Metric.stats_filter`.
"""

GAUGE = 0
"""
Represents an instantly measured occupancy value.
(e.g., used space in bytes, the number of occupied items, or an occupied bandwidth)
"""
USAGE = 0
"""
This is the same as GAUGE; it is kept only for backward compatibility with existing compute plugins.
"""
RATE = 1
"""
Represents a rate of change calculated from underlying gauge/accumulation values
(e.g., I/O bps calculated from RX/TX accum.bytes)
"""
UTILIZATION = 2
"""
Represents the ratio of resource occupation time per measurement interval
(e.g., CPU utilization)
"""
ACCUMULATION = 3
"""
Represents an accumulated value
(e.g., total number of events, total period of occupation)
"""


@attrs.define(auto_attribs=True, slots=True)
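Given the renamed members, the plugin hunks earlier in this commit tag instantaneous readings as `GAUGE`, time-ratio readings as `UTILIZATION`, and monotonically growing counters as `ACCUMULATION`. Below is a minimal sketch of a compute plugin constructing node measurements under the new enum; the import paths, metric names, and numbers are illustrative assumptions, not code from the repository:

```python
from decimal import Decimal

# Import paths are assumed; in this commit the measurement classes themselves
# live in src/ai/backend/agent/stats.py.
from ai.backend.agent.stats import Measurement, MetricTypes, NodeMeasurement
from ai.backend.common.types import DeviceId, MetricKey


def example_node_measures() -> list[NodeMeasurement]:
    per_device_mem = {DeviceId("dev0"): Measurement(Decimal(256), Decimal(1024))}
    return [
        NodeMeasurement(
            MetricKey("example_mem"),
            MetricTypes.GAUGE,  # instantaneous occupancy: bytes in use right now
            unit_hint="bytes",
            stats_filter=frozenset({"max"}),
            per_node=Measurement(Decimal(256), Decimal(1024)),
            per_device=per_device_mem,
        ),
        NodeMeasurement(
            MetricKey("example_util"),
            MetricTypes.UTILIZATION,  # occupation-time ratio per measurement interval
            unit_hint="percent",
            stats_filter=frozenset({"avg", "max"}),
            per_node=Measurement(Decimal(42), Decimal(100)),
        ),
    ]
```

Note that `unit_hint` now defaults to `"count"` in the `NodeMeasurement`, `ContainerMeasurement`, and `ProcessMeasurement` hunks below, so plugins that omit it still hand a concrete unit string to consumers, which presumably feeds the default metric value formatter mentioned in the changelog.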
@@ -108,9 +135,9 @@ class NodeMeasurement:
type: MetricTypes
per_node: Measurement
per_device: Mapping[DeviceId, Measurement] = attrs.Factory(dict)
unit_hint: Optional[str] = None
stats_filter: FrozenSet[str] = attrs.Factory(frozenset)
current_hook: Optional[Callable[["Metric"], Decimal]] = None
unit_hint: str = "count"


@attrs.define(auto_attribs=True, slots=True)
Expand All @@ -122,9 +149,9 @@ class ContainerMeasurement:
key: MetricKey
type: MetricTypes
per_container: Mapping[str, Measurement] = attrs.Factory(dict)
unit_hint: Optional[str] = None
stats_filter: FrozenSet[str] = attrs.Factory(frozenset)
current_hook: Optional[Callable[["Metric"], Decimal]] = None
unit_hint: str = "count"


@attrs.define(auto_attribs=True, slots=True)
Expand All @@ -136,9 +163,9 @@ class ProcessMeasurement:
key: MetricKey
type: MetricTypes
per_process: Mapping[int, Measurement] = attrs.Factory(dict)
unit_hint: Optional[str] = None
stats_filter: FrozenSet[str] = attrs.Factory(frozenset)
current_hook: Optional[Callable[["Metric"], Decimal]] = None
unit_hint: str = "count"


class MovingStatistics:
Expand Down Expand Up @@ -228,11 +255,11 @@ def to_serializable_dict(self) -> MovingStatValue:
class Metric:
key: str
type: MetricTypes
unit_hint: str
stats: MovingStatistics
stats_filter: FrozenSet[str]
current: Decimal
capacity: Optional[Decimal] = None
unit_hint: Optional[str] = None
current_hook: Optional[Callable[["Metric"], Decimal]] = None

def update(self, value: Measurement):
@@ -274,12 +301,10 @@ def to_serializable_dict(self) -> MetricValue:
class StatContext:
agent: "AbstractAgent"
mode: StatModes
node_metrics: Mapping[MetricKey, Metric]
device_metrics: Mapping[MetricKey, MutableMapping[DeviceId, Metric]]
kernel_metrics: MutableMapping[KernelId, MutableMapping[MetricKey, Metric]]
process_metrics: MutableMapping[
ContainerId, MutableMapping[PID, MutableMapping[MetricKey, Metric]]
]
node_metrics: dict[MetricKey, Metric]
device_metrics: dict[MetricKey, dict[DeviceId, Metric]]
kernel_metrics: dict[KernelId, dict[MetricKey, Metric]]
process_metrics: dict[ContainerId, dict[PID, dict[MetricKey, Metric]]]

def __init__(
self, agent: "AbstractAgent", mode: StatModes = None, *, cache_lifespan: int = 120
@@ -322,12 +347,12 @@ async def collect_node_stat(self):
# Here we use asyncio.gather() instead of aiotools.TaskGroup
# to keep methods of other plugins running when a plugin raises an error
# instead of cancelling them.
_tasks = []
_tasks: list[asyncio.Task[Sequence[NodeMeasurement]]] = []
for computer in self.agent.computers.values():
_tasks.append(computer.instance.gather_node_measures(self))
_tasks.append(asyncio.create_task(computer.instance.gather_node_measures(self)))
results = await asyncio.gather(*_tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
if isinstance(result, BaseException):
log.error("collect_node_stat(): gather_node_measures() error", exc_info=result)
continue
for node_measure in result:
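This hunk, and the analogous `collect_container_stat` and `collect_per_container_process_stat` hunks below, follow the same pattern: wrap each plugin call in `asyncio.create_task()`, await the tasks with `asyncio.gather(..., return_exceptions=True)`, and filter the results with `isinstance(result, BaseException)` so that one misbehaving plugin cannot cancel or discard the others' measurements. A standalone sketch of that error-isolation pattern, using a simplified stand-in for the plugin interface rather than the real one:

```python
import asyncio
import logging
from typing import Protocol, Sequence

log = logging.getLogger(__name__)


class MeasureSource(Protocol):
    # Simplified stand-in for the actual compute-plugin interface.
    async def gather_node_measures(self) -> Sequence[str]: ...


async def collect_node_stat(sources: Sequence[MeasureSource]) -> list[str]:
    # gather() with return_exceptions=True returns exceptions in place of
    # results instead of cancelling the sibling tasks.
    tasks = [asyncio.create_task(s.gather_node_measures()) for s in sources]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    measures: list[str] = []
    for result in results:
        if isinstance(result, BaseException):
            # Log and skip the failed plugin; keep the rest of the measurements.
            log.error("gather_node_measures() error", exc_info=result)
            continue
        measures.extend(result)
    return measures


class FakeSource:
    def __init__(self, values: Sequence[str], fail: bool = False) -> None:
        self.values, self.fail = values, fail

    async def gather_node_measures(self) -> Sequence[str]:
        if self.fail:
            raise RuntimeError("plugin failure")
        return self.values


print(asyncio.run(collect_node_stat([FakeSource(["mem", "util"]), FakeSource([], fail=True)])))
# -> ['mem', 'util']  (the failing source is logged and skipped)
```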
@@ -347,9 +372,7 @@ async def collect_node_stat(self):
else:
self.node_metrics[metric_key].update(node_measure.per_node)
# update per-device metric
# NOTE: device IDs are defined by each metric keys.
for dev_id, measure in node_measure.per_device.items():
dev_id = str(dev_id)
if metric_key not in self.device_metrics:
self.device_metrics[metric_key] = {}
if dev_id not in self.device_metrics[metric_key]:
@@ -371,7 +394,7 @@ async def collect_node_stat(self):
"node": {key: obj.to_serializable_dict() for key, obj in self.node_metrics.items()},
"devices": {
metric_key: {
dev_id: obj.to_serializable_dict() for dev_id, obj in per_device.items()
str(dev_id): obj.to_serializable_dict() for dev_id, obj in per_device.items()
}
for metric_key, per_device in self.device_metrics.items()
},
@@ -418,7 +441,7 @@ async def collect_container_stat(
# Here we use asyncio.gather() instead of aiotools.TaskGroup
# to keep methods of other plugins running when a plugin raises an error
# instead of cancelling them.
_tasks = []
_tasks: list[asyncio.Task[Sequence[ContainerMeasurement]]] = []
kernel_id = None
for computer in self.agent.computers.values():
_tasks.append(
@@ -466,7 +489,7 @@ async def _pipe_builder(r: Redis) -> Pipeline:
for kernel_id in updated_kernel_ids:
metrics = self.kernel_metrics[kernel_id]
serializable_metrics = {
key: obj.to_serializable_dict() for key, obj in metrics.items()
str(key): obj.to_serializable_dict() for key, obj in metrics.items()
}
if self.agent.local_config["debug"]["log-stats"]:
log.debug("kernel_updates: {0}: {1}", kernel_id, serializable_metrics)
@@ -513,7 +536,7 @@ async def collect_per_container_process_stat(
# Here we use asyncio.gather() instead of aiotools.TaskGroup
# to keep methods of other plugins running when a plugin raises an error
# instead of cancelling them.
_tasks = []
_tasks: list[asyncio.Task[Sequence[ProcessMeasurement]]] = []
for computer in self.agent.computers.values():
_tasks.append(
asyncio.create_task(
@@ -525,7 +548,7 @@ async def collect_per_container_process_stat(
results = await asyncio.gather(*_tasks, return_exceptions=True)
updated_cids: Set[ContainerId] = set()
for result in results:
if isinstance(result, Exception):
if isinstance(result, BaseException):
log.error(
"collect_per_container_process_stat(): gather_process_measures() error",
exc_info=result,
@@ -563,7 +586,7 @@ async def _pipe_builder(r: Redis) -> Pipeline:
for pid in self.process_metrics[cid].keys():
metrics = self.process_metrics[cid][pid]
serializable_metrics = {
key: obj.to_serializable_dict() for key, obj in metrics.items()
str(key): obj.to_serializable_dict() for key, obj in metrics.items()
}
serializable_table[pid] = serializable_metrics
if self.agent.local_config["debug"]["log-stats"]:
