From 28b62e31151240ce39ce4ab3c063c6a1b8deb03e Mon Sep 17 00:00:00 2001 From: Michael Waskom Date: Fri, 26 Apr 2024 21:08:28 +0000 Subject: [PATCH 1/6] Add modal queue CLI --- modal/cli/entry_point.py | 2 + modal/cli/queues.py | 124 +++++++++++++++++++++++++++++++++++++++ modal/queue.py | 17 +++++- test/cli_test.py | 40 +++++++++++++ test/conftest.py | 52 ++++++++++++---- 5 files changed, 222 insertions(+), 13 deletions(-) create mode 100644 modal/cli/queues.py diff --git a/modal/cli/entry_point.py b/modal/cli/entry_point.py index b5e0477ff..98515ad91 100644 --- a/modal/cli/entry_point.py +++ b/modal/cli/entry_point.py @@ -17,6 +17,7 @@ from .launch import launch_cli from .network_file_system import nfs_cli from .profile import profile_cli +from .queues import queue_cli from .secret import secret_cli from .token import _new_token, token_cli from .volume import volume_cli @@ -92,6 +93,7 @@ async def setup(profile: Optional[str] = None): entrypoint_cli_typer.add_typer(profile_cli) entrypoint_cli_typer.add_typer(secret_cli) entrypoint_cli_typer.add_typer(token_cli) +entrypoint_cli_typer.add_typer(queue_cli) entrypoint_cli_typer.add_typer(volume_cli) entrypoint_cli_typer.command("deploy", help="Deploy a Modal stub as an application.", no_args_is_help=True)(run.deploy) diff --git a/modal/cli/queues.py b/modal/cli/queues.py new file mode 100644 index 000000000..8cb273406 --- /dev/null +++ b/modal/cli/queues.py @@ -0,0 +1,124 @@ +# Copyright Modal Labs 2024 +from typing import Optional + +import typer +from rich.console import Console +from typer import Argument, Option, Typer + +from modal._resolver import Resolver +from modal._utils.async_utils import synchronizer +from modal._utils.grpc_utils import retry_transient_errors +from modal.cli.utils import ENV_OPTION, YES_OPTION, display_table, timestamp_to_local +from modal.client import _Client +from modal.environments import ensure_env +from modal.queue import _Queue +from modal_proto import api_pb2 + +queue_cli = Typer( + name="queue", + no_args_is_help=True, + help="Manage `modal.Queue` objects and inspect their contents.", +) + +PARTITION_OPTION = Option( + None, + "-p", + "--partition", + help="Name of the partition to use, otherwise use the default (anonymous) partition.", +) + + +@queue_cli.command(name="create") +@synchronizer.create_blocking +async def create(name: str, *, env: Optional[str] = ENV_OPTION): + """Create a named Queue object. + + Note: This is a no-op when the Queue already exists. + """ + q = _Queue.from_name(name, environment_name=env, create_if_missing=True) + client = await _Client.from_env() + resolver = Resolver(client=client) + await resolver.load(q) + + +@queue_cli.command(name="delete") +@synchronizer.create_blocking +async def delete(name: str, *, yes: bool = YES_OPTION, env: Optional[str] = ENV_OPTION): + """Delete a named Dict object and all of its data.""" + # Lookup first to validate the name, even though delete is a staticmethod + await _Queue.lookup(name, environment_name=env) + if not yes: + typer.confirm( + f"Are you sure you want to irrevocably delete the modal.Queue '{name}'?", + default=False, + abort=True, + ) + await _Queue.delete(name, environment_name=env) + + +@queue_cli.command(name="list") +@synchronizer.create_blocking +async def list(*, json: bool = False, env: Optional[str] = ENV_OPTION): + """List all named Queue objects.""" + env = ensure_env(env) + + max_total_size = 100_000 + client = await _Client.from_env() + request = api_pb2.QueueListRequest(environment_name=env, total_size_limit=max_total_size + 1) + response = await retry_transient_errors(client.stub.QueueList, request) + + rows = [ + ( + q.name, + timestamp_to_local(q.created_at, json), + str(q.num_partitions), + str(q.total_size) if q.total_size <= max_total_size else f">{max_total_size}", + ) + for q in response.queues + ] + display_table(["Name", "Created at", "Partitions", "Total size"], rows, json) + + +@queue_cli.command(name="clear") +@synchronizer.create_blocking +async def clear( + name: str, + partition: Optional[str] = PARTITION_OPTION, + all: bool = Option(False, "-a", "--all", help="Clear the contents of all partitions."), + *, + env: Optional[str] = ENV_OPTION, +): + """Clear the contents of a queue by removing all of its data.""" + q = await _Queue.lookup(name, environment_name=env) + await q.clear(partition=partition, all=all) + + +@queue_cli.command(name="next") +@synchronizer.create_blocking +async def next( + name: str, n: int = Argument(1), partition: Optional[str] = PARTITION_OPTION, *, env: Optional[str] = ENV_OPTION +): + """Print the next N items in the queue or queue partition (without removal).""" + q = await _Queue.lookup(name, environment_name=env) + console = Console() + i = 0 + async for item in q.iterate(partition=partition): + console.print(item) + i += 1 + if i >= n: + break + + +@queue_cli.command(name="len") +@synchronizer.create_blocking +async def len( + name: str, + partition: Optional[str] = PARTITION_OPTION, + total: bool = Option(False, "-t", "--total", help="Compute the sum of the queue lengths across all partitions"), + *, + env: Optional[str] = ENV_OPTION, +): + """Print the length of a queue partition or the total length of all partitions.""" + q = await _Queue.lookup(name, environment_name=env) + console = Console() + console.print(await q.len(partition=partition, total=total)) diff --git a/modal/queue.py b/modal/queue.py index f287e8127..0e6d14dc1 100644 --- a/modal/queue.py +++ b/modal/queue.py @@ -259,6 +259,18 @@ async def _get_blocking(self, partition: Optional[str], timeout: Optional[float] raise queue.Empty() + @live_method + async def clear(self, *, partition: Optional[str] = None, all: bool = False) -> None: + """Clear the contents of a single partition or all partitions.""" + if partition and all: + raise InvalidError("Partition must be null when requesting to clear all.") + request = api_pb2.QueueClearRequest( + queue_id=self.object_id, + partition_key=self.validate_partition_key(partition), + all_partitions=all, + ) + await retry_transient_errors(self._client.stub.QueueClear, request) + @live_method async def get( self, block: bool = True, timeout: Optional[float] = None, *, partition: Optional[str] = None @@ -391,11 +403,14 @@ async def _put_many_nonblocking(self, partition: Optional[str], partition_ttl: i raise queue.Full(exc.message) if exc.status == Status.RESOURCE_EXHAUSTED else exc @live_method - async def len(self, *, partition: Optional[str] = None) -> int: + async def len(self, *, partition: Optional[str] = None, total: bool = False) -> int: """Return the number of objects in the queue partition.""" + if partition and total: + raise InvalidError("Partition must be null when requesting total length.") request = api_pb2.QueueLenRequest( queue_id=self.object_id, partition_key=self.validate_partition_key(partition), + total=total, ) response = await retry_transient_errors(self._client.stub.QueueLen, request) return response.len diff --git a/test/cli_test.py b/test/cli_test.py index e3becd30a..c70692f1e 100644 --- a/test/cli_test.py +++ b/test/cli_test.py @@ -725,3 +725,43 @@ def test_dict_show_get_clear(servicer, server_url_env, set_env_client): res = _run(["dict", "clear", "baz-dict", "--yes"]) assert servicer.dicts[dict_id] == {} + + +def test_queue_create_list_delete(servicer, server_url_env, set_env_client): + _run(["queue", "create", "foo-queue"]) + _run(["queue", "create", "bar-queue"]) + res = _run(["queue", "list"]) + assert "foo-queue" in res.stdout + assert "bar-queue" in res.stdout + + _run(["queue", "delete", "bar-queue", "--yes"]) + + res = _run(["queue", "list"]) + assert "foo-queue" in res.stdout + assert "bar-queue" not in res.stdout + + +def test_queue_next_len_clear(servicer, server_url_env, set_env_client): + # Kind of hacky to be modifying the attributes on the servicer like this + name = "queue-who" + key = (name, api_pb2.DEPLOYMENT_NAMESPACE_WORKSPACE, os.environ.get("MODAL_ENVIRONMENT", "main")) + queue_id = "qu-abc123" + servicer.deployed_queues[key] = queue_id + servicer.queue = {b"": [dumps("a"), dumps("b"), dumps("c")], b"alt": [dumps("x"), dumps("y")]} + + assert _run(["queue", "next", name]).stdout == "a\n" + assert _run(["queue", "next", name, "-p", "alt"]).stdout == "x\n" + assert _run(["queue", "next", name, "3"]).stdout == "a\nb\nc\n" + assert _run(["queue", "next", name, "3", "--partition", "alt"]).stdout == "x\ny\n" + + assert _run(["queue", "len", name]).stdout == "3\n" + assert _run(["queue", "len", name, "--partition", "alt"]).stdout == "2\n" + assert _run(["queue", "len", name, "--total"]).stdout == "5\n" + + _run(["queue", "clear", name]) + assert _run(["queue", "len", name]).stdout == "0\n" + assert _run(["queue", "next", name, "--partition", "alt"]).stdout == "x\n" + + _run(["queue", "clear", name, "--all"]) + assert _run(["queue", "len", name, "--total"]).stdout == "0\n" + assert _run(["queue", "next", name, "--partition", "alt"]).stdout == "" diff --git a/test/conftest.py b/test/conftest.py index 5b1b3ff79..93e3cf32b 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -16,7 +16,7 @@ import traceback from collections import defaultdict from pathlib import Path -from typing import Dict, Iterator, Optional, get_args +from typing import Dict, Iterator, List, Optional, get_args import aiohttp.web import aiohttp.web_runner @@ -86,7 +86,7 @@ def __init__(self, blob_host, blobs): self.container_outputs = [] self.fc_data_in = defaultdict(lambda: asyncio.Queue()) # unbounded self.fc_data_out = defaultdict(lambda: asyncio.Queue()) # unbounded - self.queue = [] + self.queue: Dict[bytes, List[bytes]] = {b"": []} self.deployed_apps = { client_mount_name(): "ap-x", } @@ -172,7 +172,7 @@ def __init__(self, blob_host, blobs): self.sandbox_result: Optional[api_pb2.GenericResult] = None self.token_flow_localhost_port = None - self.queue_max_len = 1_00 + self.queue_max_len = 100 self.container_heartbeat_response = None self.container_heartbeat_abort = threading.Event() @@ -816,6 +816,15 @@ async def ProxyGetOrCreate(self, stream): ### Queue + async def QueueClear(self, stream): + request: api_pb2.QueueClearRequest = await stream.recv_message() + if request.all_partitions: + self.queue = {b"": []} + else: + if request.partition_key in self.queue: + self.queue[request.partition_key] = [] + await stream.send_message(Empty()) + async def QueueCreate(self, stream): request: api_pb2.QueueCreateRequest = await stream.recv_message() if request.existing_queue_id: @@ -834,6 +843,7 @@ async def QueueGetOrCreate(self, stream): self.n_queues += 1 queue_id = f"qu-{self.n_queues}" self.deployed_queues[k] = queue_id + self.deployed_apps[request.deployment_name] = f"ap-{queue_id}" elif request.object_creation_type == api_pb2.OBJECT_CREATION_TYPE_EPHEMERAL: self.n_queues += 1 queue_id = f"qu-{self.n_queues}" @@ -853,28 +863,46 @@ async def QueueHeartbeat(self, stream): async def QueuePut(self, stream): request: api_pb2.QueuePutRequest = await stream.recv_message() - if len(self.queue) >= self.queue_max_len: + if sum(map(len, self.queue.values())) >= self.queue_max_len: raise GRPCError(Status.RESOURCE_EXHAUSTED, f"Hit servicer's max len for Queues: {self.queue_max_len}") - self.queue += request.values + q = self.queue.setdefault(request.partition_key, []) + q += request.values await stream.send_message(Empty()) async def QueueGet(self, stream): - await stream.recv_message() - if len(self.queue) > 0: - values = [self.queue.pop(0)] + request: api_pb2.QueueGetRequest = await stream.recv_message() + q = self.queue.get(request.partition_key, []) + if len(q) > 0: + values = [q.pop(0)] else: values = [] await stream.send_message(api_pb2.QueueGetResponse(values=values)) async def QueueLen(self, stream): - await stream.recv_message() - await stream.send_message(api_pb2.QueueLenResponse(len=len(self.queue))) + request = await stream.recv_message() + if request.total: + value = sum(map(len, self.queue.values())) + else: + q = self.queue.get(request.partition_key, []) + value = len(q) + await stream.send_message(api_pb2.QueueLenResponse(len=value)) + + async def QueueList(self, stream): + # TODO Note that the actual self.queue holding the data assumes we have a single queue + # So there is a mismatch and I am not implementing a mock for the num_partitions / total_size + queues = [ + api_pb2.QueueListResponse.QueueInfo(name=name, created_at=1) + for name, _, _ in self.deployed_queues + if name in self.deployed_apps + ] + await stream.send_message(api_pb2.QueueListResponse(queues=queues)) async def QueueNextItems(self, stream): request: api_pb2.QueueNextItemsRequest = await stream.recv_message() next_item_idx = int(request.last_entry_id) + 1 if request.last_entry_id else 0 - if next_item_idx < len(self.queue): - item = api_pb2.QueueItem(value=self.queue[next_item_idx], entry_id=f"{next_item_idx}") + q = self.queue.get(request.partition_key, []) + if next_item_idx < len(q): + item = api_pb2.QueueItem(value=q[next_item_idx], entry_id=f"{next_item_idx}") await stream.send_message(api_pb2.QueueNextItemsResponse(items=[item])) else: if request.item_poll_timeout > 0: From 025e30750e99c6abc4f1e857102eb2a13ea5d357 Mon Sep 17 00:00:00 2001 From: Michael Waskom Date: Mon, 29 Apr 2024 21:42:39 +0000 Subject: [PATCH 2/6] Fix formatting in modal dict changelog entry --- CHANGELOG.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ae4b951b0..7011ad45d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,18 +12,18 @@ We appreciate your patience while we speedily work towards a stable release of t ### 0.62.116 (2024-04-26) -* Added a command-line interface for interacting with modal.Dict objects. Run modal dict --help in your terminal to see what is available. +* Added a command-line interface for interacting with `modal.Dict` objects. Run `modal dict --help` in your terminal to see what is available. ### 0.62.114 (2024-04-25) -* `Secret.from_dotenv` now accepts an optional filename keyword argument: - - ```python - @app.function(secrets=[modal.Secret.from_dotenv(filename=".env-dev")]) - def run(): - ... +* `Secret.from_dotenv` now accepts an optional filename keyword argument: + + ```python + @app.function(secrets=[modal.Secret.from_dotenv(filename=".env-dev")]) + def run(): + ... ``` From fad047e9168b5253f680e97d612bc5d890b17aa2 Mon Sep 17 00:00:00 2001 From: Michael Waskom Date: Mon, 29 Apr 2024 22:03:20 +0000 Subject: [PATCH 3/6] Fix Dict->Queue copy pasta --- modal/cli/queues.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modal/cli/queues.py b/modal/cli/queues.py index 8cb273406..c4371e003 100644 --- a/modal/cli/queues.py +++ b/modal/cli/queues.py @@ -44,7 +44,7 @@ async def create(name: str, *, env: Optional[str] = ENV_OPTION): @queue_cli.command(name="delete") @synchronizer.create_blocking async def delete(name: str, *, yes: bool = YES_OPTION, env: Optional[str] = ENV_OPTION): - """Delete a named Dict object and all of its data.""" + """Delete a named Queue object and all of its data.""" # Lookup first to validate the name, even though delete is a staticmethod await _Queue.lookup(name, environment_name=env) if not yes: From bbc8ac398a5076e66997b95890f1e7186b522c59 Mon Sep 17 00:00:00 2001 From: Michael Waskom Date: Tue, 30 Apr 2024 17:37:46 +0000 Subject: [PATCH 4/6] Add confirmation to modal queue clear --- modal/cli/queues.py | 7 +++++++ test/cli_test.py | 18 +++++++++--------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/modal/cli/queues.py b/modal/cli/queues.py index c4371e003..c28af08b4 100644 --- a/modal/cli/queues.py +++ b/modal/cli/queues.py @@ -85,11 +85,18 @@ async def clear( name: str, partition: Optional[str] = PARTITION_OPTION, all: bool = Option(False, "-a", "--all", help="Clear the contents of all partitions."), + yes: bool = YES_OPTION, *, env: Optional[str] = ENV_OPTION, ): """Clear the contents of a queue by removing all of its data.""" q = await _Queue.lookup(name, environment_name=env) + if not yes: + typer.confirm( + f"Are you sure you want to irrevocably delete the contents of modal.Queue '{name}'?", + default=False, + abort=True, + ) await q.clear(partition=partition, all=all) diff --git a/test/cli_test.py b/test/cli_test.py index c70692f1e..6111c1149 100644 --- a/test/cli_test.py +++ b/test/cli_test.py @@ -741,7 +741,7 @@ def test_queue_create_list_delete(servicer, server_url_env, set_env_client): assert "bar-queue" not in res.stdout -def test_queue_next_len_clear(servicer, server_url_env, set_env_client): +def test_queue_peek_len_clear(servicer, server_url_env, set_env_client): # Kind of hacky to be modifying the attributes on the servicer like this name = "queue-who" key = (name, api_pb2.DEPLOYMENT_NAMESPACE_WORKSPACE, os.environ.get("MODAL_ENVIRONMENT", "main")) @@ -749,19 +749,19 @@ def test_queue_next_len_clear(servicer, server_url_env, set_env_client): servicer.deployed_queues[key] = queue_id servicer.queue = {b"": [dumps("a"), dumps("b"), dumps("c")], b"alt": [dumps("x"), dumps("y")]} - assert _run(["queue", "next", name]).stdout == "a\n" - assert _run(["queue", "next", name, "-p", "alt"]).stdout == "x\n" - assert _run(["queue", "next", name, "3"]).stdout == "a\nb\nc\n" - assert _run(["queue", "next", name, "3", "--partition", "alt"]).stdout == "x\ny\n" + assert _run(["queue", "peek", name]).stdout == "a\n" + assert _run(["queue", "peek", name, "-p", "alt"]).stdout == "x\n" + assert _run(["queue", "peek", name, "3"]).stdout == "a\nb\nc\n" + assert _run(["queue", "peek", name, "3", "--partition", "alt"]).stdout == "x\ny\n" assert _run(["queue", "len", name]).stdout == "3\n" assert _run(["queue", "len", name, "--partition", "alt"]).stdout == "2\n" assert _run(["queue", "len", name, "--total"]).stdout == "5\n" - _run(["queue", "clear", name]) + _run(["queue", "clear", name, "--yes"]) assert _run(["queue", "len", name]).stdout == "0\n" - assert _run(["queue", "next", name, "--partition", "alt"]).stdout == "x\n" + assert _run(["queue", "peek", name, "--partition", "alt"]).stdout == "x\n" - _run(["queue", "clear", name, "--all"]) + _run(["queue", "clear", name, "--all", "--yes"]) assert _run(["queue", "len", name, "--total"]).stdout == "0\n" - assert _run(["queue", "next", name, "--partition", "alt"]).stdout == "" + assert _run(["queue", "peek", name, "--partition", "alt"]).stdout == "" From 726eb4eca6b7bbffcd9a2238bd3cd4ba22432a9b Mon Sep 17 00:00:00 2001 From: Michael Waskom Date: Tue, 30 Apr 2024 17:38:01 +0000 Subject: [PATCH 5/6] Rename next to peek --- modal/cli/queues.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modal/cli/queues.py b/modal/cli/queues.py index c28af08b4..5722e6957 100644 --- a/modal/cli/queues.py +++ b/modal/cli/queues.py @@ -100,9 +100,9 @@ async def clear( await q.clear(partition=partition, all=all) -@queue_cli.command(name="next") +@queue_cli.command(name="peek") @synchronizer.create_blocking -async def next( +async def peek( name: str, n: int = Argument(1), partition: Optional[str] = PARTITION_OPTION, *, env: Optional[str] = ENV_OPTION ): """Print the next N items in the queue or queue partition (without removal).""" From f88cbd83f49fcf74e77bf54ff9791d4802b05a19 Mon Sep 17 00:00:00 2001 From: Michael Waskom Date: Tue, 30 Apr 2024 17:38:14 +0000 Subject: [PATCH 6/6] Remove 'object' from modal dict and queue command descriptions --- modal/cli/dict.py | 4 ++-- modal/cli/queues.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/modal/cli/dict.py b/modal/cli/dict.py index fe3e594ed..4f6361e9d 100644 --- a/modal/cli/dict.py +++ b/modal/cli/dict.py @@ -37,7 +37,7 @@ async def create(name: str, *, env: Optional[str] = ENV_OPTION): @dict_cli.command(name="list") @synchronizer.create_blocking async def list(*, json: bool = False, env: Optional[str] = ENV_OPTION): - """List all named Dict objects.""" + """List all named Dicts.""" env = ensure_env(env) client = await _Client.from_env() request = api_pb2.DictListRequest(environment_name=env) @@ -64,7 +64,7 @@ async def clear(name: str, *, yes: bool = YES_OPTION, env: Optional[str] = ENV_O @dict_cli.command(name="delete") @synchronizer.create_blocking async def delete(name: str, *, yes: bool = YES_OPTION, env: Optional[str] = ENV_OPTION): - """Delete a named Dict object and all of its data.""" + """Delete a named Dict and all of its data.""" # Lookup first to validate the name, even though delete is a staticmethod await _Dict.lookup(name, environment_name=env) if not yes: diff --git a/modal/cli/queues.py b/modal/cli/queues.py index 5722e6957..cfaac82f7 100644 --- a/modal/cli/queues.py +++ b/modal/cli/queues.py @@ -31,7 +31,7 @@ @queue_cli.command(name="create") @synchronizer.create_blocking async def create(name: str, *, env: Optional[str] = ENV_OPTION): - """Create a named Queue object. + """Create a named Queue. Note: This is a no-op when the Queue already exists. """ @@ -44,7 +44,7 @@ async def create(name: str, *, env: Optional[str] = ENV_OPTION): @queue_cli.command(name="delete") @synchronizer.create_blocking async def delete(name: str, *, yes: bool = YES_OPTION, env: Optional[str] = ENV_OPTION): - """Delete a named Queue object and all of its data.""" + """Delete a named Queue and all of its data.""" # Lookup first to validate the name, even though delete is a staticmethod await _Queue.lookup(name, environment_name=env) if not yes: @@ -59,7 +59,7 @@ async def delete(name: str, *, yes: bool = YES_OPTION, env: Optional[str] = ENV_ @queue_cli.command(name="list") @synchronizer.create_blocking async def list(*, json: bool = False, env: Optional[str] = ENV_OPTION): - """List all named Queue objects.""" + """List all named Queues.""" env = ensure_env(env) max_total_size = 100_000