Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/6809.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support for DGX Spark
72 changes: 49 additions & 23 deletions src/ai/backend/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ async def get_extra_envs(self) -> Mapping[str, str]:
async def prepare_resource_spec(
self,
) -> tuple[KernelResourceSpec, Optional[Mapping[str, Any]]]:
"""
Generates base resource spec lacking non agent backend agnostic information
(e.g. unified device allocation). Do not call this method directly outside
`generate_resource_spec` implementation.
"""
raise NotImplementedError

@abstractmethod
Expand Down Expand Up @@ -659,37 +664,36 @@ def mount_static_binary(
# Inject ComputeDevice-specific hooks
already_injected_hooks: set[Path] = set()

for dev_type, device_alloc in resource_spec.allocations.items():
computer_ctx = self.computers[dev_type]
for device_view in resource_spec.device_list:
computer_ctx = self.computers[device_view.device]
await self.apply_accelerator_allocation(
computer_ctx.instance,
device_alloc,
{device_view.slot: device_view.device_alloc},
)
accelerator_mounts = await self.generate_accelerator_mounts(
computer_ctx.instance,
device_alloc,
{device_view.slot: device_view.device_alloc},
)

for mount_info in accelerator_mounts:
_mount(mount_info.mode, mount_info.src_path, mount_info.dst_path.as_posix())
alloc_sum = Decimal(0)
for dev_id, per_dev_alloc in device_alloc.items():
alloc_sum += sum(per_dev_alloc.values())
if alloc_sum > 0:
hook_paths = await computer_ctx.instance.get_hooks(distro, arch)
if hook_paths:
log.debug(
"accelerator {} provides hooks: {}",
type(computer_ctx.instance).__name__,
", ".join(map(str, hook_paths)),
)
for hook_path in map(lambda p: Path(p).absolute(), hook_paths):
if hook_path in already_injected_hooks:
continue
container_hook_path = f"/opt/kernel/{hook_path.name}"
_mount(MountTypes.BIND, hook_path, container_hook_path)
environ["LD_PRELOAD"] += ":" + container_hook_path
already_injected_hooks.add(hook_path)

# `.device_list()` guarantees every listed device (slot) would
# actually attach device to the container
hook_paths = await computer_ctx.instance.get_hooks(distro, arch)
if hook_paths:
log.debug(
"accelerator {} provides hooks: {}",
type(computer_ctx.instance).__name__,
", ".join(map(str, hook_paths)),
)
for hook_path in map(lambda p: Path(p).absolute(), hook_paths):
if hook_path in already_injected_hooks:
continue
container_hook_path = f"/opt/kernel/{hook_path.name}"
_mount(MountTypes.BIND, hook_path, container_hook_path)
environ["LD_PRELOAD"] += ":" + container_hook_path
already_injected_hooks.add(hook_path)

async def inject_additional_device_env_vars(
self, resource_spec: KernelResourceSpec, environ: MutableMapping[str, str]
Expand Down Expand Up @@ -722,6 +726,27 @@ def get_supplementary_gids(self) -> set[int]:
# TODO(BA-3073): This should be separated out to its own class/module.
return self.supplementary_gids

async def generate_resource_spec(self) -> tuple[KernelResourceSpec, Mapping[str, Any] | None]:
"""
Creates complete set of `KernelResourceSpec` based on the results from
`prepare_resource_spec()` result.
"""
base_resource_spec, resource_opts = await self.prepare_resource_spec()
unified_devices: list[tuple[DeviceName, SlotName]] = []

# implicitly attach unified accelerators to every created kernels
for dev_type, computer_ctx in self.computers.items():
for slot_name, slot_type in computer_ctx.instance.slot_types:
if slot_type == SlotTypes.UNIFIED:
log.debug(
"mount_krunner(): Attaching Unified device {} to kernel {}",
slot_name,
self.kernel_id,
)
unified_devices.append((dev_type, slot_name))
base_resource_spec.unified_devices = unified_devices
return base_resource_spec, resource_opts


KernelCreationContextType = TypeVar(
"KernelCreationContextType", bound=AbstractKernelCreationContext
Expand Down Expand Up @@ -2628,7 +2653,8 @@ async def create_kernel(

# Get the resource spec from existing kernel scratches
# or create a new resource spec from ctx.kernel_config
resource_spec, resource_opts = await ctx.prepare_resource_spec()
resource_spec, resource_opts = await ctx.generate_resource_spec()

# When creating a new kernel,
# we need to allocate agent resources, prepare the networks,
# adn specify the container mounts.
Expand Down
37 changes: 37 additions & 0 deletions src/ai/backend/agent/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
MutableMapping,
Sequence,
)
from dataclasses import dataclass
from decimal import Decimal
from functools import cached_property
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Iterable,
Optional,
TextIO,
Type,
Expand Down Expand Up @@ -100,6 +102,13 @@ class ComputerContext:
alloc_map: AbstractAllocMap


@dataclass
class DeviceView:
device: DeviceName
slot: SlotName
device_alloc: Mapping[DeviceId, Decimal]


@attrs.define(slots=True)
class KernelResourceSpec:
"""
Expand All @@ -124,6 +133,11 @@ class KernelResourceSpec:
mounts: list["Mount"] = attrs.Factory(list)
"""The mounted vfolder list."""

unified_devices: Iterable[tuple[DeviceName, SlotName]] = attrs.Factory(list)
"""
Represents unified devices mounted to the kernel.
"""

def freeze(self) -> None:
"""Replace the attribute setter to make it immutable."""
# TODO: implement
Expand All @@ -136,14 +150,36 @@ def freeze(self) -> None:
# # TODO: wrap slots and allocations with frozendict?
# setattr(self, '__setattr__', _frozen_setattr) # <-- __setattr__ is read-only... :(

@property
def device_list(self) -> Iterable[DeviceView]:
"""
View of effective list of devices mounted to kernel, aggregating both non-unified and unified devices.
DeviceView representing unified devices will always have empty `device_alloc` map.
Unlike the `allocations` property, this view will not list slots with zero allocation - that is, slots without any alloc map defined.
"""
devices = []
for device, allocs in self.allocations.items():
for slot, device_alloc in allocs.items():
alloc_sum = Decimal(0)
for dev_id, per_dev_alloc in device_alloc.items():
alloc_sum += per_dev_alloc
if alloc_sum > 0:
devices.append(DeviceView(device, slot, device_alloc))
for device, slot in self.unified_devices:
devices.append(DeviceView(device, slot, {}))

return devices

def write_to_string(self) -> str:
mounts_str = ",".join(map(str, self.mounts))
slots_str = dump_json_str({k: str(v) for k, v in self.slots.items()})
unified_devices_str = dump_json_str(self.unified_devices)

resource_str = ""
resource_str += f"SCRATCH_SIZE={BinarySize(self.scratch_disk_size):m}\n"
resource_str += f"MOUNTS={mounts_str}\n"
resource_str += f"SLOTS={slots_str}\n"
resource_str += f"UNIFIED_DEVICES={unified_devices_str}\n"

for device_name, slots in self.allocations.items():
for slot_name, per_device_alloc in slots.items():
Expand Down Expand Up @@ -208,6 +244,7 @@ def read_from_string(cls, text: str) -> "KernelResourceSpec":
return cls(
scratch_disk_size=BinarySize.finite_from_str(kvpairs["SCRATCH_SIZE"]),
allocations=dict(allocations),
unified_devices=load_json(kvpairs.get("UNIFIED_DEVICES") or "[]"),
slots=ResourceSlot.from_json(load_json(kvpairs["SLOTS"])),
mounts=mounts,
)
Expand Down
11 changes: 2 additions & 9 deletions src/ai/backend/agent/stage/kernel_lifecycle/docker/environ.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from collections.abc import Collection, Mapping, Sequence
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, Final, Optional, Self, override

from ai.backend.agent.resources import ComputerContext, KernelResourceSpec
Expand Down Expand Up @@ -163,14 +162,8 @@ def _get_core_count(self, spec: EnvironSpec) -> dict[str, str]:

async def _get_container_hooks(self, spec: EnvironSpec) -> set[str]:
container_hook_path_set: set[str] = set()
for dev_type, device_alloc in spec.kernel_info.resource_spec.allocations.items():
alloc_sum = Decimal(0)
for per_dev_alloc in device_alloc.values():
alloc_sum += sum(per_dev_alloc.values())
do_hook_mount = alloc_sum > 0
if not do_hook_mount:
continue
computer_ctx = spec.agent_info.computers[dev_type]
for device_view in spec.kernel_info.resource_spec.device_list:
computer_ctx = spec.agent_info.computers[device_view.device]
hook_paths = await computer_ctx.instance.get_hooks(
spec.agent_info.distro, spec.agent_info.architecture
)
Expand Down
40 changes: 17 additions & 23 deletions src/ai/backend/agent/stage/kernel_lifecycle/docker/mount/krunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import re
from collections.abc import Mapping
from dataclasses import dataclass
from decimal import Decimal
from pathlib import Path
from typing import Final, override

Expand Down Expand Up @@ -231,29 +230,24 @@ async def _prepare_hook_mounts(
) -> list[Mount]:
mounts: list[Mount] = []
already_injected_hooks: set[Path] = set()
for dev_type, device_alloc in spec.resource_spec.allocations.items():
computer_ctx = spec.existing_computers[dev_type]
alloc_sum = Decimal(0)
for per_dev_alloc in device_alloc.values():
alloc_sum += sum(per_dev_alloc.values())
do_hook_mount = alloc_sum > 0
if do_hook_mount:
hook_paths = await computer_ctx.instance.get_hooks(
runner_info.distro, runner_info.architecture
)
for hook_path in hook_paths:
if hook_path in already_injected_hooks:
continue
container_hook_path = f"/opt/kernel/{hook_path.name}"
already_injected_hooks.add(hook_path)
mounts.append(
Mount(
MountTypes.BIND,
hook_path,
Path(container_hook_path),
MountPermission.READ_ONLY,
)
for device_view in spec.resource_spec.device_list:
computer_ctx = spec.existing_computers[device_view.device]
hook_paths = await computer_ctx.instance.get_hooks(
runner_info.distro, runner_info.architecture
)
for hook_path in hook_paths:
if hook_path in already_injected_hooks:
continue
container_hook_path = f"/opt/kernel/{hook_path.name}"
already_injected_hooks.add(hook_path)
mounts.append(
Mount(
MountTypes.BIND,
hook_path,
Path(container_hook_path),
MountPermission.READ_ONLY,
)
)
return mounts

@cached(
Expand Down
1 change: 1 addition & 0 deletions src/ai/backend/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ class SlotTypes(enum.StrEnum):
COUNT = "count"
BYTES = "bytes"
UNIQUE = "unique"
UNIFIED = "unified"


class HardwareMetadata(TypedDict):
Expand Down
Loading