diff --git a/changes/6809.feature.md b/changes/6809.feature.md new file mode 100644 index 00000000000..0e3c8f7c4e2 --- /dev/null +++ b/changes/6809.feature.md @@ -0,0 +1 @@ +Support for DGX Spark diff --git a/src/ai/backend/agent/agent.py b/src/ai/backend/agent/agent.py index 9b7cdc23d86..a2dde244c4d 100644 --- a/src/ai/backend/agent/agent.py +++ b/src/ai/backend/agent/agent.py @@ -377,6 +377,11 @@ async def get_extra_envs(self) -> Mapping[str, str]: async def prepare_resource_spec( self, ) -> tuple[KernelResourceSpec, Optional[Mapping[str, Any]]]: + """ + Generates the base resource spec, which lacks agent-backend-agnostic information + (e.g. unified device allocation). Do not call this method directly outside the + `generate_resource_spec` implementation. + """ raise NotImplementedError @abstractmethod @@ -659,37 +664,36 @@ def mount_static_binary( # Inject ComputeDevice-specific hooks already_injected_hooks: set[Path] = set() - for dev_type, device_alloc in resource_spec.allocations.items(): - computer_ctx = self.computers[dev_type] + for device_view in resource_spec.device_list: + computer_ctx = self.computers[device_view.device] await self.apply_accelerator_allocation( computer_ctx.instance, - device_alloc, + {device_view.slot: device_view.device_alloc}, ) accelerator_mounts = await self.generate_accelerator_mounts( computer_ctx.instance, - device_alloc, + {device_view.slot: device_view.device_alloc}, ) for mount_info in accelerator_mounts: _mount(mount_info.mode, mount_info.src_path, mount_info.dst_path.as_posix()) - alloc_sum = Decimal(0) - for dev_id, per_dev_alloc in device_alloc.items(): - alloc_sum += sum(per_dev_alloc.values()) - if alloc_sum > 0: - hook_paths = await computer_ctx.instance.get_hooks(distro, arch) - if hook_paths: - log.debug( - "accelerator {} provides hooks: {}", - type(computer_ctx.instance).__name__, - ", ".join(map(str, hook_paths)), - ) - for hook_path in map(lambda p: Path(p).absolute(), hook_paths): - if hook_path in 
already_injected_hooks: - continue - container_hook_path = f"/opt/kernel/{hook_path.name}" - _mount(MountTypes.BIND, hook_path, container_hook_path) - environ["LD_PRELOAD"] += ":" + container_hook_path - already_injected_hooks.add(hook_path) + + # `device_list` only yields devices (slots) that are + # actually attached to the container + hook_paths = await computer_ctx.instance.get_hooks(distro, arch) + if hook_paths: + log.debug( + "accelerator {} provides hooks: {}", + type(computer_ctx.instance).__name__, + ", ".join(map(str, hook_paths)), + ) + for hook_path in map(lambda p: Path(p).absolute(), hook_paths): + if hook_path in already_injected_hooks: + continue + container_hook_path = f"/opt/kernel/{hook_path.name}" + _mount(MountTypes.BIND, hook_path, container_hook_path) + environ["LD_PRELOAD"] += ":" + container_hook_path + already_injected_hooks.add(hook_path) async def inject_additional_device_env_vars( self, resource_spec: KernelResourceSpec, environ: MutableMapping[str, str] @@ -722,6 +726,27 @@ def get_supplementary_gids(self) -> set[int]: # TODO(BA-3073): This should be separated out to its own class/module. return self.supplementary_gids + async def generate_resource_spec(self) -> tuple[KernelResourceSpec, Mapping[str, Any] | None]: + """ + Creates the complete `KernelResourceSpec` based on the result of + `prepare_resource_spec()`. 
+ """ + base_resource_spec, resource_opts = await self.prepare_resource_spec() + unified_devices: list[tuple[DeviceName, SlotName]] = [] + + # implicitly attach unified accelerators to every created kernel + for dev_type, computer_ctx in self.computers.items(): + for slot_name, slot_type in computer_ctx.instance.slot_types: + if slot_type == SlotTypes.UNIFIED: + log.debug( + "mount_krunner(): Attaching Unified device {} to kernel {}", + slot_name, + self.kernel_id, + ) + unified_devices.append((dev_type, slot_name)) + base_resource_spec.unified_devices = unified_devices + return base_resource_spec, resource_opts + KernelCreationContextType = TypeVar( "KernelCreationContextType", bound=AbstractKernelCreationContext @@ -2628,7 +2653,8 @@ async def create_kernel( # Get the resource spec from existing kernel scratches # or create a new resource spec from ctx.kernel_config - resource_spec, resource_opts = await ctx.prepare_resource_spec() + resource_spec, resource_opts = await ctx.generate_resource_spec() + # When creating a new kernel, # we need to allocate agent resources, prepare the networks, # adn specify the container mounts. 
diff --git a/src/ai/backend/agent/resources.py b/src/ai/backend/agent/resources.py index 057a1df94a3..a4c08ff8c58 100644 --- a/src/ai/backend/agent/resources.py +++ b/src/ai/backend/agent/resources.py @@ -14,12 +14,14 @@ MutableMapping, Sequence, ) +from dataclasses import dataclass from decimal import Decimal from functools import cached_property from pathlib import Path from typing import ( TYPE_CHECKING, Any, + Iterable, Optional, TextIO, Type, @@ -100,6 +102,13 @@ class ComputerContext: alloc_map: AbstractAllocMap +@dataclass +class DeviceView: + device: DeviceName + slot: SlotName + device_alloc: Mapping[DeviceId, Decimal] + + @attrs.define(slots=True) class KernelResourceSpec: """ @@ -124,6 +133,11 @@ class KernelResourceSpec: mounts: list["Mount"] = attrs.Factory(list) """The mounted vfolder list.""" + unified_devices: Iterable[tuple[DeviceName, SlotName]] = attrs.Factory(list) + """ + Represents unified devices mounted to the kernel. + """ + def freeze(self) -> None: """Replace the attribute setter to make it immutable.""" # TODO: implement @@ -136,14 +150,36 @@ def freeze(self) -> None: # # TODO: wrap slots and allocations with frozendict? # setattr(self, '__setattr__', _frozen_setattr) # <-- __setattr__ is read-only... :( + @property + def device_list(self) -> Iterable[DeviceView]: + """ + View of the effective list of devices mounted to the kernel, aggregating both non-unified and unified devices. + A DeviceView representing a unified device always has an empty `device_alloc` map. + Unlike the `allocations` attribute, this view does not list slots with zero allocation - that is, slots whose per-device allocations do not sum to a positive amount. 
+ """ + devices = [] + for device, allocs in self.allocations.items(): + for slot, device_alloc in allocs.items(): + alloc_sum = Decimal(0) + for dev_id, per_dev_alloc in device_alloc.items(): + alloc_sum += per_dev_alloc + if alloc_sum > 0: + devices.append(DeviceView(device, slot, device_alloc)) + for device, slot in self.unified_devices: + devices.append(DeviceView(device, slot, {})) + + return devices + def write_to_string(self) -> str: mounts_str = ",".join(map(str, self.mounts)) slots_str = dump_json_str({k: str(v) for k, v in self.slots.items()}) + unified_devices_str = dump_json_str(self.unified_devices) resource_str = "" resource_str += f"SCRATCH_SIZE={BinarySize(self.scratch_disk_size):m}\n" resource_str += f"MOUNTS={mounts_str}\n" resource_str += f"SLOTS={slots_str}\n" + resource_str += f"UNIFIED_DEVICES={unified_devices_str}\n" for device_name, slots in self.allocations.items(): for slot_name, per_device_alloc in slots.items(): @@ -208,6 +244,7 @@ def read_from_string(cls, text: str) -> "KernelResourceSpec": return cls( scratch_disk_size=BinarySize.finite_from_str(kvpairs["SCRATCH_SIZE"]), allocations=dict(allocations), + unified_devices=load_json(kvpairs.get("UNIFIED_DEVICES") or "[]"), slots=ResourceSlot.from_json(load_json(kvpairs["SLOTS"])), mounts=mounts, ) diff --git a/src/ai/backend/agent/stage/kernel_lifecycle/docker/environ.py b/src/ai/backend/agent/stage/kernel_lifecycle/docker/environ.py index 0ab2d0bf476..8a367ae2169 100644 --- a/src/ai/backend/agent/stage/kernel_lifecycle/docker/environ.py +++ b/src/ai/backend/agent/stage/kernel_lifecycle/docker/environ.py @@ -1,6 +1,5 @@ from collections.abc import Collection, Mapping, Sequence from dataclasses import dataclass -from decimal import Decimal from typing import Any, Final, Optional, Self, override from ai.backend.agent.resources import ComputerContext, KernelResourceSpec @@ -163,14 +162,8 @@ def _get_core_count(self, spec: EnvironSpec) -> dict[str, str]: async def _get_container_hooks(self, 
spec: EnvironSpec) -> set[str]: container_hook_path_set: set[str] = set() - for dev_type, device_alloc in spec.kernel_info.resource_spec.allocations.items(): - alloc_sum = Decimal(0) - for per_dev_alloc in device_alloc.values(): - alloc_sum += sum(per_dev_alloc.values()) - do_hook_mount = alloc_sum > 0 - if not do_hook_mount: - continue - computer_ctx = spec.agent_info.computers[dev_type] + for device_view in spec.kernel_info.resource_spec.device_list: + computer_ctx = spec.agent_info.computers[device_view.device] hook_paths = await computer_ctx.instance.get_hooks( spec.agent_info.distro, spec.agent_info.architecture ) diff --git a/src/ai/backend/agent/stage/kernel_lifecycle/docker/mount/krunner.py b/src/ai/backend/agent/stage/kernel_lifecycle/docker/mount/krunner.py index b258a03dc7b..acfa7241667 100644 --- a/src/ai/backend/agent/stage/kernel_lifecycle/docker/mount/krunner.py +++ b/src/ai/backend/agent/stage/kernel_lifecycle/docker/mount/krunner.py @@ -2,7 +2,6 @@ import re from collections.abc import Mapping from dataclasses import dataclass -from decimal import Decimal from pathlib import Path from typing import Final, override @@ -231,29 +230,24 @@ async def _prepare_hook_mounts( ) -> list[Mount]: mounts: list[Mount] = [] already_injected_hooks: set[Path] = set() - for dev_type, device_alloc in spec.resource_spec.allocations.items(): - computer_ctx = spec.existing_computers[dev_type] - alloc_sum = Decimal(0) - for per_dev_alloc in device_alloc.values(): - alloc_sum += sum(per_dev_alloc.values()) - do_hook_mount = alloc_sum > 0 - if do_hook_mount: - hook_paths = await computer_ctx.instance.get_hooks( - runner_info.distro, runner_info.architecture - ) - for hook_path in hook_paths: - if hook_path in already_injected_hooks: - continue - container_hook_path = f"/opt/kernel/{hook_path.name}" - already_injected_hooks.add(hook_path) - mounts.append( - Mount( - MountTypes.BIND, - hook_path, - Path(container_hook_path), - MountPermission.READ_ONLY, - ) + for device_view 
in spec.resource_spec.device_list: + computer_ctx = spec.existing_computers[device_view.device] + hook_paths = await computer_ctx.instance.get_hooks( + runner_info.distro, runner_info.architecture + ) + for hook_path in hook_paths: + if hook_path in already_injected_hooks: + continue + container_hook_path = f"/opt/kernel/{hook_path.name}" + already_injected_hooks.add(hook_path) + mounts.append( + Mount( + MountTypes.BIND, + hook_path, + Path(container_hook_path), + MountPermission.READ_ONLY, ) + ) return mounts @cached( diff --git a/src/ai/backend/common/types.py b/src/ai/backend/common/types.py index e6e0d980db7..d82aaded195 100644 --- a/src/ai/backend/common/types.py +++ b/src/ai/backend/common/types.py @@ -461,6 +461,7 @@ class SlotTypes(enum.StrEnum): COUNT = "count" BYTES = "bytes" UNIQUE = "unique" + UNIFIED = "unified" class HardwareMetadata(TypedDict):