Skip to content
This repository has been archived by the owner on Sep 22, 2023. It is now read-only.

Commit

Permalink
feat: Add "--assign-agent" option when starting/running sessions (#178)
Browse files Browse the repository at this point in the history
* This is the client-side support for lablup/backend.ai-manager#469.

Co-authored-by: Joongi Kim <joongi@lablup.com>
  • Loading branch information
kmkwon94 and achimnol committed Nov 6, 2021
1 parent 1aa6fde commit 3a18973
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 50 deletions.
2 changes: 2 additions & 0 deletions changes/178.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add `--assign-agent` option to `run` and `create` commands so that superadmins can manually assign the agent(s) for new sessions to debug and diagnose the cluster and scheduler.

53 changes: 53 additions & 0 deletions src/ai/backend/client/cli/params.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import re
from decimal import Decimal
from typing import (
Any,
Optional,
Expand Down Expand Up @@ -85,3 +86,55 @@ def convert(
except json.JSONDecodeError:
self.fail(f"cannot parse {value!r} as JSON", param, ctx)
return value


def drange(start: Decimal, stop: Decimal, num: int):
    """
    A simplified version of numpy.linspace with default options.

    Yields ``num`` evenly spaced ``Decimal`` values beginning at ``start``
    and stepping towards ``stop`` (both endpoints are included, up to
    ``Decimal`` rounding of the computed step).

    :param start: The first value of the sequence.
    :param stop: The value the sequence ends at when ``num`` is 2 or more.
    :param num: The number of values to generate.  Nothing is yielded when
        it is zero or negative; only ``start`` is yielded when it is 1.
    """
    if num <= 0:
        return
    if num == 1:
        # A single sample has no step: ``delta / (num - 1)`` would divide by
        # zero.  numpy.linspace returns just the start value in this case.
        yield start
        return
    step = (stop - start) / (num - 1)
    yield from (start + step * Decimal(tick) for tick in range(num))


class RangeExprOptionType(click.ParamType):
    """
    Accepts a range expression which generates a range of values for a variable.

    The argument has the form "KEY=EXPR" where KEY is an identifier-like
    slug and EXPR is one of:

    Linear space range: "linspace:1,2,10" (start, stop, num) as in numpy.linspace
    Pythonic range: "range:1,10,2" (start, stop[, step]) as in Python's range
    Case range: "case:a,b,c" (comma-separated strings)
    """

    _rx_range_key = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
    name = 'Range Expression'

    def convert(self, arg, param, ctx):
        """
        Parse *arg* into a ``(key, values)`` pair.

        ``values`` is a list of strings for "case:", a tuple of ``Decimal``
        for "linspace:" and a tuple of ``int`` for "range:".  Any malformed
        input is reported through ``self.fail()`` instead of leaking a raw
        exception.
        """
        key, sep, value = arg.partition('=')
        if not sep:
            self.fail('The range expression must be in the form of KEY=EXPR.', param, ctx)
        # Validate explicitly rather than with ``assert``, which is stripped
        # when Python runs with -O.
        if not self._rx_range_key.match(key):
            self.fail('The key must be a valid slug string.', param, ctx)
        try:
            if value.startswith('case:'):
                return key, value[5:].split(',')
            elif value.startswith('linspace:'):
                start, stop, num = value[9:].split(',')
                return key, tuple(drange(Decimal(start), Decimal(stop), int(num)))
            elif value.startswith('range:'):
                range_args = map(int, value[6:].split(','))
                return key, tuple(range(*range_args))
            else:
                self.fail('Unrecognized range expression type', param, ctx)
        except (ValueError, TypeError, ArithmeticError) as e:
            # ArithmeticError covers decimal.InvalidOperation (unparsable
            # Decimal literal) and division by zero; TypeError covers a wrong
            # number of arguments passed to range().
            self.fail(str(e), param, ctx)


class CommaSeparatedListType(click.ParamType):
    """
    Accepts a comma-separated string and splits it into a list of strings.
    """

    name = 'List Expression'

    def convert(self, arg, param, ctx):
        """
        Convert *arg* into a list.

        * ``str`` values are split on commas; the items are kept as strings.
        * ``int`` values are returned unchanged.
        * ``list``/``tuple`` values (e.g. an already-converted value) are
          returned as a list.

        Any other type is rejected via ``self.fail()`` instead of silently
        producing ``None``.
        """
        if isinstance(arg, int):
            return arg
        if isinstance(arg, str):
            # NOTE(review): items are not converted to int here; confirm that
            # consumers expecting integer values handle strings.
            return arg.split(',')
        if isinstance(arg, (list, tuple)):
            return list(arg)
        self.fail(f'cannot parse {arg!r} as a comma-separated list', param, ctx)
68 changes: 20 additions & 48 deletions src/ai/backend/client/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import getpass
import itertools
import json
import re
import secrets
import string
import sys
Expand All @@ -31,51 +30,11 @@
print_info, print_wait, print_done, print_error, print_fail, print_warn,
format_info,
)

_rx_range_key = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
from .params import RangeExprOptionType, CommaSeparatedListType

tabulate_mod.PRESERVE_WHITESPACE = True


def drange(start: Decimal, stop: Decimal, num: int):
    """
    A simplified version of numpy.linspace with default options.

    Yields ``num`` evenly spaced ``Decimal`` values beginning at ``start``
    and stepping towards ``stop`` (both endpoints are included, up to
    ``Decimal`` rounding of the computed step).

    :param start: The first value of the sequence.
    :param stop: The value the sequence ends at when ``num`` is 2 or more.
    :param num: The number of values to generate.  Nothing is yielded when
        it is zero or negative; only ``start`` is yielded when it is 1.
    """
    if num <= 0:
        return
    if num == 1:
        # A single sample has no step: ``delta / (num - 1)`` would divide by
        # zero.  numpy.linspace returns just the start value in this case.
        yield start
        return
    step = (stop - start) / (num - 1)
    yield from (start + step * Decimal(tick) for tick in range(num))


class RangeExprOptionType(click.ParamType):
    """
    Accepts a range expression which generates a range of values for a variable.

    The argument has the form "KEY=EXPR" where KEY is an identifier-like
    slug and EXPR is one of:

    Linear space range: "linspace:1,2,10" (start, stop, num) as in numpy.linspace
    Pythonic range: "range:1,10,2" (start, stop[, step]) as in Python's range
    Case range: "case:a,b,c" (comma-separated strings)
    """

    name = 'Range Expression'

    def convert(self, arg, param, ctx):
        """
        Parse *arg* into a ``(key, values)`` pair.

        ``values`` is a list of strings for "case:", a tuple of ``Decimal``
        for "linspace:" and a tuple of ``int`` for "range:".  Any malformed
        input is reported through ``self.fail()`` instead of leaking a raw
        exception.
        """
        key, sep, value = arg.partition('=')
        if not sep:
            self.fail('The range expression must be in the form of KEY=EXPR.', param, ctx)
        # Validate explicitly rather than with ``assert``, which is stripped
        # when Python runs with -O.
        if not _rx_range_key.match(key):
            self.fail('The key must be a valid slug string.', param, ctx)
        try:
            if value.startswith('case:'):
                return key, value[5:].split(',')
            elif value.startswith('linspace:'):
                start, stop, num = value[9:].split(',')
                return key, tuple(drange(Decimal(start), Decimal(stop), int(num)))
            elif value.startswith('range:'):
                range_args = map(int, value[6:].split(','))
                return key, tuple(range(*range_args))
            else:
                self.fail('Unrecognized range expression type', param, ctx)
        except (ValueError, TypeError, ArithmeticError) as e:
            # ArithmeticError covers decimal.InvalidOperation (unparsable
            # Decimal literal) and division by zero; TypeError covers a wrong
            # number of arguments passed to range().
            self.fail(str(e), param, ctx)


# Module-level instances of the custom click parameter types.
# ``list_expr`` is passed as ``type=`` to the comma-separated options
# (``--preopen``, ``--assign-agent``) declared on the ``run`` command.
range_expr = RangeExprOptionType()
list_expr = CommaSeparatedListType()


async def exec_loop(stdout, stderr, compute_session, mode, code, *, opts=None,
Expand Down Expand Up @@ -382,8 +341,12 @@ def prepare_mount_arg(
@click.option('-g', '--group', metavar='GROUP_NAME', default=None,
help='Group name where the session is spawned. '
'User should be a member of the group to execute the code.')
@click.option('--preopen', default=None,
@click.option('--preopen', default=None, type=list_expr,
help='Pre-open service ports')
@click.option('--assign-agent', default=None, type=list_expr,
help='Show mapping list of tuple which mapped containers with agent. '
'When user role is Super Admin. '
'(e.g., --assign-agent agent_id_1,agent_id_2,...)')
def run(image, files, name, # base args
type, starts_at, enqueue_only, max_wait, no_reuse, # job scheduling options
code, terminal, # query-mode options
Expand All @@ -394,7 +357,8 @@ def run(image, files, name, # base args
mount, scaling_group, resources, # resource spec
cluster_size, cluster_mode,
resource_opts,
domain, group, preopen): # resource grouping
domain, group, preopen, assign_agent, # resource grouping
):
"""
Run the given code snippet or files in a session.
Depending on the session ID you give (default is random),
Expand Down Expand Up @@ -444,7 +408,12 @@ def run(image, files, name, # base args
build_template = string.Template(build)
exec_template = string.Template(exec)
env_templates = {k: string.Template(v) for k, v in envs.items()}
preopen_ports = [] if preopen is None else list(map(int, preopen.split(',')))

if preopen is None: preopen = [] # noqa
if assign_agent is None: assign_agent = [] # noqa

preopen_ports = preopen
assigned_agent_list = assign_agent
for env_vmap, build_vmap, exec_vmap in vmaps_product:
interpolated_envs = tuple((k, vt.substitute(env_vmap))
for k, vt in env_templates.items())
Expand Down Expand Up @@ -494,7 +463,8 @@ def _run_legacy(session, idx, name, envs,
domain_name=domain,
group_name=group,
scaling_group=scaling_group,
tag=tag)
tag=tag,
)
except Exception as e:
print_error(e)
sys.exit(1)
Expand Down Expand Up @@ -595,7 +565,9 @@ async def _run(session, idx, name, envs,
scaling_group=scaling_group,
bootstrap_script=bootstrap_script.read() if bootstrap_script is not None else None,
tag=tag,
preopen_ports=preopen_ports)
preopen_ports=preopen_ports,
assign_agent=assigned_agent_list,
)
except Exception as e:
print_fail('[{0}] {1}'.format(idx, e))
return
Expand Down
15 changes: 13 additions & 2 deletions src/ai/backend/client/cli/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
from ..exceptions import BackendAPIError
from ..session import Session, AsyncSession
from ..types import Undefined, undefined
from .params import CommaSeparatedListType

list_expr = CommaSeparatedListType()


@main.group()
Expand Down Expand Up @@ -81,7 +84,7 @@ def _create_cmd(docs: str = None):
@click.option('--resource-opts', metavar='KEY=VAL', type=str, multiple=True,
help='Resource options for creating compute session '
'(e.g: shmem=64m)')
@click.option('--preopen', default=None,
@click.option('--preopen', default=None, type=list_expr,
help='Pre-open service ports')
# resource grouping
@click.option('-d', '--domain', metavar='DOMAIN_NAME', default=None,
Expand All @@ -90,6 +93,10 @@ def _create_cmd(docs: str = None):
@click.option('-g', '--group', metavar='GROUP_NAME', default=None,
help='Group name where the session is spawned. '
'User should be a member of the group to execute the code.')
@click.option('--assign-agent', default=None, type=list_expr,
help='Show mapping list of tuple which mapped containers with agent. '
'When user role is Super Admin. '
'(e.g., --assign-agent agent_id_1,agent_id_2,...)')
def create(
# base args
image: str,
Expand All @@ -115,6 +122,7 @@ def create(
cluster_mode: Literal['single-node', 'multi-node'],
resource_opts: Sequence[str],
preopen: str | None,
assign_agent: str | None,
# resource grouping
domain: str | None,
group: str | None,
Expand All @@ -140,7 +148,9 @@ def create(
resources = prepare_resource_arg(resources)
resource_opts = prepare_resource_arg(resource_opts)
mount, mount_map = prepare_mount_arg(mount)
preopen_ports = [] if preopen is None else list(map(int, preopen.split(',')))

preopen_ports = preopen
assigned_agent_list = assign_agent
with Session() as session:
try:
compute_session = session.ComputeSession.get_or_create(
Expand All @@ -166,6 +176,7 @@ def create(
bootstrap_script=bootstrap_script.read() if bootstrap_script is not None else None,
tag=tag,
preopen_ports=preopen_ports,
assign_agent=assigned_agent_list,
)
except Exception as e:
print_error(e)
Expand Down
2 changes: 2 additions & 0 deletions src/ai/backend/client/func/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ async def get_or_create(
scaling_group: str = None,
owner_access_key: str = None,
preopen_ports: List[int] = None,
assign_agent: List[str] = None,
) -> ComputeSession:
"""
Get-or-creates a compute session.
Expand Down Expand Up @@ -278,6 +279,7 @@ async def get_or_create(
params['config'].update({
'mount_map': mount_map,
'preopen_ports': preopen_ports,
'agentList': assign_agent,
})
params.update({
'starts_at': starts_at,
Expand Down

0 comments on commit 3a18973

Please sign in to comment.