Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions scripts/awsbatchint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,13 @@ else
echo "expected 2 log lines"
exit 1
fi

torchx list -s aws_batch
LIST_LINES="$(torchx list -s aws_batch | grep -c "$APP_ID")"

if [ "$LIST_LINES" -ne 1 ]
then
echo "expected $APP_ID to be listed"
exit 1
fi
fi
11 changes: 9 additions & 2 deletions torchx/cli/cmd_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@
import argparse
import logging

from tabulate import tabulate

from torchx.cli.cmd_base import SubCommand
from torchx.runner import get_runner
from torchx.schedulers import get_default_scheduler_name, get_scheduler_factories

logger: logging.Logger = logging.getLogger(__name__)


HANDLE_HEADER = "APP HANDLE"
STATUS_HEADER = "APP STATUS"


class CmdList(SubCommand):
def add_arguments(self, subparser: argparse.ArgumentParser) -> None:
scheduler_names = get_scheduler_factories().keys()
Expand All @@ -30,5 +36,6 @@ def add_arguments(self, subparser: argparse.ArgumentParser) -> None:

def run(self, args: argparse.Namespace) -> None:
with get_runner() as runner:
jobs = runner.list(args.scheduler)
print(*jobs, sep="\n")
apps = runner.list(args.scheduler)
apps_data = [[app.app_handle, str(app.state)] for app in apps]
print(tabulate(apps_data, headers=[HANDLE_HEADER, STATUS_HEADER]))
16 changes: 8 additions & 8 deletions torchx/runner/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from pyre_extensions import none_throws
from torchx.runner.events import log_event
from torchx.schedulers import get_scheduler_factories, SchedulerFactory
from torchx.schedulers.api import Scheduler, Stream
from torchx.schedulers.api import ListAppResponse, Scheduler, Stream
from torchx.specs import (
AppDef,
AppDryRunInfo,
Expand Down Expand Up @@ -552,18 +552,18 @@ def log_lines(
def list(
self,
scheduler: str,
) -> List[str]:
) -> List[ListAppResponse]:
"""
Returns the list of app handles launched by the scheduler.
For apps launched on the scheduler, this API returns a list of ListAppResponse
objects each of which have app id, app handle and its status.
Note: This API is in prototype phase and is subject to change.
"""
with log_event("list", scheduler):
sched = self._scheduler(scheduler)
app_handles = [
make_app_handle(scheduler, self._name, app_id)
for app_id in sched.list()
]
return app_handles
apps = sched.list()
for app in apps:
app.app_handle = make_app_handle(scheduler, self._name, app.app_id)
return apps

# pyre-fixme: Scheduler opts
def _scheduler(self, scheduler: str) -> Scheduler:
Expand Down
29 changes: 20 additions & 9 deletions torchx/runner/test/api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from pyre_extensions import none_throws
from torchx.runner import get_runner, Runner
from torchx.schedulers.api import DescribeAppResponse, Scheduler
from torchx.schedulers.api import DescribeAppResponse, ListAppResponse, Scheduler
from torchx.schedulers.local_scheduler import (
LocalDirectoryImageProvider,
LocalScheduler,
Expand Down Expand Up @@ -184,7 +184,7 @@ def _submit_dryrun(
def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
pass

def list(self) -> List[str]:
def list(self) -> List[DescribeAppResponse]:
pass

def _cancel_existing(self, app_id: str) -> None:
Expand Down Expand Up @@ -380,18 +380,29 @@ def test_log_lines(self, _) -> None:

def test_list(self, _) -> None:
scheduler_mock = MagicMock()
app_ids_return = ["app_handle1", "app_handle2"]
scheduler_mock.list.return_value = app_ids_return
app_handles_expected = [
"kubernetes://test_session/app_handle1",
"kubernetes://test_session/app_handle2",
sched_list_return = [
ListAppResponse(app_id="app_id1", state=AppState.RUNNING),
ListAppResponse(app_id="app_id2", state=AppState.SUCCEEDED),
]
scheduler_mock.list.return_value = sched_list_return
apps_expected = [
ListAppResponse(
app_id="app_id1",
app_handle="kubernetes://test_session/app_id1",
state=AppState.RUNNING,
),
ListAppResponse(
app_id="app_id2",
app_handle="kubernetes://test_session/app_id2",
state=AppState.SUCCEEDED,
),
]
with Runner(
name=SESSION_NAME,
scheduler_factories={"kubernetes": lambda name: scheduler_mock},
) as runner:
app_handles = runner.list("kubernetes")
self.assertEqual(app_handles, app_handles_expected)
apps = runner.list("kubernetes")
self.assertEqual(apps, apps_expected)
scheduler_mock.list.assert_called_once()

@patch("json.dumps")
Expand Down
4 changes: 2 additions & 2 deletions torchx/runner/test/config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
load_sections,
)
from torchx.schedulers import get_scheduler_factories, Scheduler
from torchx.schedulers.api import DescribeAppResponse, Stream
from torchx.schedulers.api import DescribeAppResponse, ListAppResponse, Stream
from torchx.specs import AppDef, AppDryRunInfo, CfgVal, runopts


Expand Down Expand Up @@ -59,7 +59,7 @@ def log_iter(
) -> Iterable[str]:
raise NotImplementedError()

def list(self) -> List[str]:
def list(self) -> List[ListAppResponse]:
raise NotImplementedError()

def run_opts(self) -> runopts:
Expand Down
32 changes: 30 additions & 2 deletions torchx/schedulers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,33 @@ class DescribeAppResponse:
roles: List[Role] = field(default_factory=list)


@dataclass
class ListAppResponse:
"""
Response object returned by ``scheduler.list()`` and ``runner.list()`` APIs.
Contains the app_id, app_handle and status of the application.
App ID : The unique identifier that identifies apps submitted on the scheduler
App handle: Identifier for apps run with torchx in a url format like
{scheduler_backend}://{session_name}/{app_id}, which is created by the runner
when it submits a job on a scheduler. Handle info in ListAppResponse is filled
in by ``runner.list()``. This handle can be used to further describe the app
with torchx CLI or a torchx runner instance.

Since this class is a data class with some member variables we keep the usage
simple and chose to access the member vars directly rather than provide accessors.
"""

app_id: str
state: AppState
app_handle: str = "<NOT_SET>"

# Implementing __hash__() makes ListAppResponse hashable which makes
# it easier to check if a ListAppResponse object exists in a list of
# objects for testing purposes.
def __hash__(self) -> int:
return hash((self.app_id, self.app_handle, self.state))


T = TypeVar("T")


Expand Down Expand Up @@ -175,9 +202,10 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
raise NotImplementedError()

@abc.abstractmethod
def list(self) -> List[str]:
def list(self) -> List[ListAppResponse]:
"""
Lists the app ids launched on the scheduler.
For apps launched on the scheduler, this API returns a list of ListAppResponse
objects each of which have app id and its status.
Note: This API is in prototype phase and is subject to change.
"""
raise NotImplementedError()
Expand Down
21 changes: 13 additions & 8 deletions torchx/schedulers/aws_batch_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
AppDryRunInfo,
DescribeAppResponse,
filter_regex,
ListAppResponse,
Scheduler,
Stream,
)
Expand Down Expand Up @@ -576,22 +577,28 @@ def log_iter(
else:
return iterator

def list(self) -> List[str]:
def list(self) -> List[ListAppResponse]:
# TODO: get queue name input instead of iterating over all queues?
resp = self._client.describe_job_queues()
queue_names = [queue["jobQueueName"] for queue in resp["jobQueues"]]
all_app_ids = []
all_apps = []
for qn in queue_names:
all_app_ids += self._list_by_queue(qn)
return all_app_ids
apps_in_queue = self._list_by_queue(qn)
all_apps += [
ListAppResponse(
app_id=f"{qn}:{app['jobName']}", state=JOB_STATE[app["status"]]
)
for app in apps_in_queue
]
return all_apps

def _list_by_queue(self, queue_name: str) -> List[str]:
def _list_by_queue(self, queue_name: str) -> List[Dict[str, Any]]:
# By default only running jobs are listed by batch/boto client's list_jobs API
# When 'filters' parameter is specified, jobs with all statuses are listed
# So use AFTER_CREATED_AT filter to list jobs in all statuses
# milli_seconds_after_epoch can later be used to list jobs by timeframe
milli_seconds_after_epoch = "1"
jobs = self._client.list_jobs(
return self._client.list_jobs(
jobQueue=queue_name,
filters=[
{
Expand All @@ -602,8 +609,6 @@ def _list_by_queue(self, queue_name: str) -> List[str]:
},
],
)["jobSummaryList"]
app_ids = [f"{queue_name}:{job['jobName']}" for job in jobs]
return app_ids

def _stream_events(
self,
Expand Down
35 changes: 21 additions & 14 deletions torchx/schedulers/docker_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
AppDryRunInfo,
DescribeAppResponse,
filter_regex,
ListAppResponse,
Scheduler,
split_lines,
Stream,
Expand Down Expand Up @@ -354,6 +355,19 @@ def run_opts(self) -> runopts:
)
return opts

def _get_app_state(self, container: "Container") -> AppState:
if container.status == "exited":
# docker doesn't have success/failed states -- we have to call
# `wait()` to get the exit code to determine that
status = container.wait(timeout=10)
if status["StatusCode"] == 0:
state = AppState.SUCCEEDED
else:
state = AppState.FAILED
else:
state = CONTAINER_STATE[container.status]
return state

def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
roles = {}
roles_statuses = {}
Expand All @@ -374,16 +388,7 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
roles_statuses[role] = RoleStatus(role, [])
roles[role].num_replicas += 1

if container.status == "exited":
# docker doesn't have success/failed states -- we have to call
# `wait()` to get the exit code to determine that
status = container.wait(timeout=10)
if status["StatusCode"] == 0:
state = AppState.SUCCEEDED
else:
state = AppState.FAILED
else:
state = CONTAINER_STATE[container.status]
state = self._get_app_state(container)

roles_statuses[role].replicas.append(
ReplicaStatus(
Expand Down Expand Up @@ -447,14 +452,16 @@ def log_iter(
else:
return logs

def list(self) -> List[str]:
unique_app_ids = {
cntr.labels[LABEL_APP_ID]
def list(self) -> List[ListAppResponse]:
unique_apps = {
ListAppResponse(
app_id=cntr.labels[LABEL_APP_ID], state=self._get_app_state(cntr)
)
for cntr in self._docker_client.containers.list(
all=True, filters={"label": f"{LABEL_APP_ID}"}
)
}
return list(unique_app_ids)
return list(unique_apps)


def _to_str(a: Union[str, bytes]) -> str:
Expand Down
17 changes: 9 additions & 8 deletions torchx/schedulers/kubernetes_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
AppDryRunInfo,
DescribeAppResponse,
filter_regex,
ListAppResponse,
Scheduler,
split_lines,
Stream,
Expand Down Expand Up @@ -751,8 +752,7 @@ def log_iter(
else:
return iterator

def list(self) -> List[str]:
app_ids = []
def list(self) -> List[ListAppResponse]:
namespace = "default"
resp = self._custom_objects_api().list_namespaced_custom_object(
group="batch.volcano.sh",
Expand All @@ -761,12 +761,13 @@ def list(self) -> List[str]:
plural="jobs",
timeout_seconds=30,
)
items = resp["items"]
for item in items:
name = item["metadata"]["name"]
app_id = f"{namespace}:{name}"
app_ids.append(app_id)
return app_ids
return [
ListAppResponse(
app_id=f"{namespace}:{app['metadata']['name']}",
state=JOB_STATE[app["status"]["state"]["phase"]],
)
for app in resp["items"]
]


def create_scheduler(session_name: str, **kwargs: Any) -> KubernetesScheduler:
Expand Down
3 changes: 2 additions & 1 deletion torchx/schedulers/local_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
AppDryRunInfo,
DescribeAppResponse,
filter_regex,
ListAppResponse,
Scheduler,
split_lines_iterator,
Stream,
Expand Down Expand Up @@ -972,7 +973,7 @@ def log_iter(
iterator = filter_regex(regex, iterator)
return iterator

def list(self) -> List[str]:
def list(self) -> List[ListAppResponse]:
raise Exception(
"App handles cannot be listed for local scheduler as they are not persisted by torchx"
)
Expand Down
Loading