Skip to content

Commit

Permalink
feat: Per-user container image creation from running sessions (#1973)
Browse files Browse the repository at this point in the history
Co-authored-by: Joongi Kim <joongi@lablup.com>
  • Loading branch information
kyujin-cho and achimnol committed Apr 4, 2024
1 parent 652c1aa commit dbb3c19
Show file tree
Hide file tree
Showing 32 changed files with 1,224 additions and 312 deletions.
1 change: 1 addition & 0 deletions changes/1973.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add new API to create new image from live session
26 changes: 24 additions & 2 deletions src/ai/backend/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from trafaret import DataError

from ai.backend.common import msgpack, redis_helper
from ai.backend.common.bgtask import BackgroundTaskManager
from ai.backend.common.config import model_definition_iv
from ai.backend.common.defs import REDIS_STAT_DB, REDIS_STREAM_DB
from ai.backend.common.docker import MAX_KERNELSPEC, MIN_KERNELSPEC, ImageRef
Expand Down Expand Up @@ -563,6 +564,8 @@ class AbstractAgent(
stats_monitor: StatsPluginContext # unused currently
error_monitor: ErrorPluginContext # unused in favor of produce_error_event()

background_task_manager: BackgroundTaskManager

_pending_creation_tasks: Dict[KernelId, Set[asyncio.Task]]
_ongoing_exec_batch_tasks: weakref.WeakSet[asyncio.Task]
_ongoing_destruction_tasks: weakref.WeakValueDictionary[KernelId, asyncio.Task]
Expand Down Expand Up @@ -638,6 +641,8 @@ async def __ainit__(self) -> None:
db=REDIS_STAT_DB,
)

self.background_task_manager = BackgroundTaskManager(self.event_producer)

alloc_map_mod.log_alloc_map = self.local_config["debug"]["log-alloc-map"]
computers = await self.load_resources()

Expand Down Expand Up @@ -1494,6 +1499,12 @@ async def scan_images(self) -> Mapping[str, str]:
async def _scan_images_wrapper(self, interval: float) -> None:
self.images = await self.scan_images()

@abstractmethod
async def push_image(self, image_ref: ImageRef, registry_conf: ImageRegistry) -> None:
"""
Push the given image to the given registry.
"""

@abstractmethod
async def pull_image(self, image_ref: ImageRef, registry_conf: ImageRegistry) -> None:
"""
Expand Down Expand Up @@ -2363,8 +2374,19 @@ async def shutdown_service(self, kernel_id: KernelId, service: str):
except Exception:
log.exception("unhandled exception while shutting down service app ${}", service)

async def commit(self, reporter, kernel_id: KernelId, subdir: str, filename: str):
return await self.kernel_registry[kernel_id].commit(kernel_id, subdir, filename)
async def commit(
self,
reporter,
kernel_id: KernelId,
subdir: str,
*,
canonical: str | None = None,
filename: str | None = None,
extra_labels: dict[str, str] = {},
):
return await self.kernel_registry[kernel_id].commit(
kernel_id, subdir, canonical=canonical, filename=filename, extra_labels=extra_labels
)

async def get_commit_status(self, kernel_id: KernelId, subdir: str) -> CommitStatus:
return await self.kernel_registry[kernel_id].check_duplicate_commit(kernel_id, subdir)
Expand Down
18 changes: 18 additions & 0 deletions src/ai/backend/agent/docker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,24 @@ async def handle_agent_socket(self):
else:
zmq_ctx.destroy()

async def push_image(self, image_ref: ImageRef, registry_conf: ImageRegistry) -> None:
if image_ref.is_local:
return
auth_config = None
reg_user = registry_conf.get("username")
reg_passwd = registry_conf.get("password")
log.info("pushing image {} to registry", image_ref.canonical)
if reg_user and reg_passwd:
encoded_creds = base64.b64encode(f"{reg_user}:{reg_passwd}".encode("utf-8")).decode(
"ascii"
)
auth_config = {
"auth": encoded_creds,
}

async with closing_async(Docker()) as docker:
await docker.images.push(image_ref.canonical, auth=auth_config)

async def pull_image(self, image_ref: ImageRef, registry_conf: ImageRegistry) -> None:
auth_config = None
reg_user = registry_conf.get("username")
Expand Down
82 changes: 53 additions & 29 deletions src/ai/backend/agent/docker/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,20 @@ async def check_duplicate_commit(self, kernel_id: KernelId, subdir: str) -> Comm
return CommitStatus.ONGOING
return CommitStatus.READY

async def commit(self, kernel_id: KernelId, subdir: str, filename: str):
async def commit(
self,
kernel_id,
subdir,
*,
canonical: str | None = None,
filename: str | None = None,
extra_labels: dict[str, str] = {},
) -> None:
assert self.runner is not None

loop = asyncio.get_running_loop()
path, lock_path = self._get_commit_path(kernel_id, subdir)
container_id: str = str(self.data["container_id"])
filepath = path / filename
try:
Path(path).mkdir(exist_ok=True, parents=True)
Path(lock_path).parent.mkdir(exist_ok=True, parents=True)
Expand All @@ -184,37 +191,54 @@ def _write_chunks(

try:
async with FileLock(path=lock_path, timeout=0.1, remove_when_unlock=True):
log.info("Container is being committed to {}", filepath)
log.info("Container (k: {}) is being committed", kernel_id)
docker = Docker()
container = docker.containers.container(container_id)
changes: list[str] = []
for label_name, label_value in extra_labels.items():
changes.append(f"LABEL {label_name}={label_value}")
try:
response: Mapping[str, Any] = await container.commit()
if canonical:
if ":" in canonical:
repo, tag = canonical.split(":", maxsplit=2)
else:
repo, tag = canonical, "latest"
log.debug("tagging image as {}:{}", repo, tag)
else:
repo, tag = None, None
response: Mapping[str, Any] = await container.commit(
changes=changes,
repository=repo,
tag=tag,
)
image_id = response["Id"]
try:
q: janus.Queue[bytes | Sentinel] = janus.Queue(
maxsize=DEFAULT_INFLIGHT_CHUNKS
)
async with docker._query(f"images/{image_id}/get") as tb_resp:
with gzip.open(filepath, "wb") as fileobj:
write_task = loop.run_in_executor(
None,
functools.partial(
_write_chunks,
fileobj,
q.sync_q,
),
)
try:
await asyncio.sleep(0) # let write_task get started
async for chunk in tb_resp.content.iter_chunked(
DEFAULT_CHUNK_SIZE
):
await q.async_q.put(chunk)
finally:
await q.async_q.put(Sentinel.TOKEN)
await write_task
finally:
await docker.images.delete(image_id)
if filename:
filepath = path / filename
try:
q: janus.Queue[bytes | Sentinel] = janus.Queue(
maxsize=DEFAULT_INFLIGHT_CHUNKS
)
async with docker._query(f"images/{image_id}/get") as tb_resp:
with gzip.open(filepath, "wb") as fileobj:
write_task = loop.run_in_executor(
None,
functools.partial(
_write_chunks,
fileobj,
q.sync_q,
),
)
try:
await asyncio.sleep(0) # let write_task get started
async for chunk in tb_resp.content.iter_chunked(
DEFAULT_CHUNK_SIZE
):
await q.async_q.put(chunk)
finally:
await q.async_q.put(Sentinel.TOKEN)
await write_task
finally:
await docker.images.delete(image_id)
finally:
await docker.close()
except asyncio.TimeoutError:
Expand Down
10 changes: 9 additions & 1 deletion src/ai/backend/agent/dummy/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,15 @@ async def check_duplicate_commit(self, kernel_id, subdir) -> CommitStatus:
return CommitStatus.ONGOING
return CommitStatus.READY

async def commit(self, kernel_id, subdir, filename):
async def commit(
self,
kernel_id,
subdir,
*,
canonical: str | None = None,
filename: str | None = None,
extra_labels: dict[str, str] = {},
) -> None:
self.is_commiting = True
delay = self.dummy_kernel_cfg["delay"]["commit"]
await asyncio.sleep(delay)
Expand Down
4 changes: 4 additions & 0 deletions src/ai/backend/agent/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class ResourceError(ValueError):
pass


class InvalidArgumentError(RuntimeError):
pass


class UnsupportedResource(ResourceError):
pass

Expand Down
10 changes: 9 additions & 1 deletion src/ai/backend/agent/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,15 @@ async def check_duplicate_commit(self, kernel_id, subdir) -> CommitStatus:
raise NotImplementedError

@abstractmethod
async def commit(self, kernel_id, subdir, filename):
async def commit(
self,
kernel_id,
subdir,
*,
canonical: str | None = None,
filename: str | None = None,
extra_labels: dict[str, str] = {},
):
raise NotImplementedError

@abstractmethod
Expand Down
10 changes: 9 additions & 1 deletion src/ai/backend/agent/kubernetes/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,15 @@ async def check_duplicate_commit(self, kernel_id, subdir):
log.error("Committing in Kubernetes is not supported yet.")
raise NotImplementedError

async def commit(self, kernel_id, subdir, filename):
async def commit(
self,
kernel_id,
subdir,
*,
canonical: str | None = None,
filename: str | None = None,
extra_labels: dict[str, str] = {},
) -> None:
# TODO: Implement container commit on Kubernetes kernel.
log.error("Committing in Kubernetes is not supported yet.")
raise NotImplementedError
Expand Down
72 changes: 51 additions & 21 deletions src/ai/backend/agent/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,20 @@

from ai.backend.common import config, identity, msgpack, utils
from ai.backend.common.auth import AgentAuthHandler, PublicKey, SecretKey
from ai.backend.common.bgtask import BackgroundTaskManager
from ai.backend.common.defs import REDIS_STREAM_DB
from ai.backend.common.bgtask import ProgressReporter
from ai.backend.common.docker import ImageRef
from ai.backend.common.etcd import AsyncEtcd, ConfigScopes
from ai.backend.common.events import (
EventProducer,
KernelLifecycleEventReason,
KernelTerminatedEvent,
)
from ai.backend.common.logging import BraceStyleAdapter, Logger
from ai.backend.common.types import (
ClusterInfo,
CommitStatus,
EtcdRedisConfig,
HardwareMetadata,
HostPortPair,
ImageRegistry,
KernelCreationConfig,
KernelId,
LogSeverity,
Expand Down Expand Up @@ -217,7 +216,6 @@ async def __ainit__(self) -> None:

await self.read_agent_config()
await self.read_agent_config_container()
await self.init_background_task_manager()

self.stats_monitor = AgentStatsPluginContext(self.etcd, self.local_config)
self.error_monitor = AgentErrorPluginContext(self.etcd, self.local_config)
Expand Down Expand Up @@ -338,13 +336,6 @@ async def read_agent_config_container(self):
self.local_config["container"][k] = v
log.info("etcd: container-config: {}={}".format(k, v))

async def init_background_task_manager(self):
event_producer = await EventProducer.new(
cast(EtcdRedisConfig, self.local_config["redis"]),
db=REDIS_STREAM_DB,
)
self.local_config["background_task_manager"] = BackgroundTaskManager(event_producer)

async def __aenter__(self) -> None:
await self.rpc_server.__aenter__()

Expand Down Expand Up @@ -608,20 +599,59 @@ async def commit(
self,
kernel_id: str,
subdir: str,
filename: str,
*,
canonical: str | None = None,
filename: str | None = None,
extra_labels: dict[str, str] = {},
) -> dict[str, Any]:
log.info("rpc::commit(k:{})", kernel_id)
bgtask_mgr = self.local_config["background_task_manager"]
task_id = await bgtask_mgr.start(
self.agent.commit,
kernel_id=KernelId(UUID(kernel_id)),
subdir=subdir,
filename=filename,
)
bgtask_mgr = self.agent.background_task_manager

async def _commit(reporter: ProgressReporter) -> None:
await self.agent.commit(
reporter,
KernelId(UUID(kernel_id)),
subdir,
canonical=canonical,
filename=filename,
extra_labels=extra_labels,
)

task_id = await bgtask_mgr.start(_commit)
return {
"bgtask_id": str(task_id),
"kernel": kernel_id,
"path": str(Path(subdir, filename)),
"path": str(Path(subdir, filename)) if filename else None,
}

@rpc_function
@collect_error
async def push_image(
self,
canonical: str,
architecture: str,
registry_conf: ImageRegistry,
*,
is_local: bool = False,
) -> dict[str, Any]:
log.info("rpc::push_image(c:{})", canonical)
bgtask_mgr = self.agent.background_task_manager

async def _push_image(reporter: ProgressReporter) -> None:
await self.agent.push_image(
ImageRef(
canonical,
known_registries=["*"],
is_local=is_local,
architecture=architecture,
),
registry_conf,
)

task_id = await bgtask_mgr.start(_push_image)
return {
"bgtask_id": str(task_id),
"canonical": canonical,
}

@rpc_function
Expand Down
1 change: 1 addition & 0 deletions src/ai/backend/client/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from . import config # noqa # type: ignore
from . import dotfile # noqa # type: ignore
from . import extensions # noqa # type: ignore
from . import image # noqa # type: ignore
from . import model # noqa # type: ignore
from . import server_log # noqa # type: ignore
from . import service # noqa # type: ignore
Expand Down

0 comments on commit dbb3c19

Please sign in to comment.