diff --git a/examples/Dockerfile b/examples/Dockerfile deleted file mode 100644 index 408113a..0000000 --- a/examples/Dockerfile +++ /dev/null @@ -1,13 +0,0 @@ -FROM python:3.11-slim - -WORKDIR /usr/src/app - -COPY requirements.txt ./ -RUN pip install --no-cache-dir -r requirements.txt - -COPY . . - -EXPOSE 9080 - -ENV PYTHONPATH="/usr/src/app/src" -CMD ["hypercorn", "example:app", "--config", "hypercorn-config.toml"] diff --git a/examples/client.py b/examples/client.py new file mode 100644 index 0000000..629295d --- /dev/null +++ b/examples/client.py @@ -0,0 +1,34 @@ +# +# Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH +# +# This file is part of the Restate SDK for Python, +# which is released under the MIT license. +# +# You can find a copy of the license in file LICENSE in the root +# directory of this repository or package, or at +# https://github.com/restatedev/sdk-typescript/blob/main/LICENSE +# +import restate + +from virtual_object import increment, count + +# +# uv run examples/client.py +# + + +async def main(): + async with restate.create_client("http://localhost:8080") as client: + await client.object_call(increment, key="a", arg=5) + + await client.object_send(increment, key="a", arg=5) + + current_count = await client.object_call(count, key="a", arg=None) + + print(f"Current count for 'a': {current_count}") + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) diff --git a/examples/example.py b/examples/example.py index 51224ee..75b086f 100644 --- a/examples/example.py +++ b/examples/example.py @@ -12,6 +12,11 @@ # pylint: disable=C0116 # pylint: disable=W0613 + +# +# uv run examples/example.py +# + import logging import restate diff --git a/examples/harness.py b/examples/harness.py new file mode 100644 index 0000000..664a2af --- /dev/null +++ b/examples/harness.py @@ -0,0 +1,38 @@ +# +# Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH +# +# This file is part of the Restate SDK for Python, +# which is released under the MIT license. +# +# You can find a copy of the license in file LICENSE in the root +# directory of this repository or package, or at +# https://github.com/restatedev/sdk-typescript/blob/main/LICENSE +# +import restate + +from virtual_object import increment, count, counter + +# +# uv run examples/harness.py +# + + +async def main(): + app = restate.app([counter]) + async with restate.create_test_harness(app) as harness: + await harness.client.object_call(increment, key="a", arg=5) + await harness.client.object_call(increment, key="a", arg=5) + + current_count = await harness.client.object_call(count, key="a", arg=None) + + print(f"Current count for 'a': {current_count}") + + send = await harness.client.object_send(increment, key="b", arg=-10, idempotency_key="op1") + + print(f"Sent increment to 'b', invocation ID: {send.invocation_id}") + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) diff --git a/examples/requirements.txt b/examples/requirements.txt deleted file mode 100644 index a1fe5a6..0000000 --- a/examples/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -hypercorn -restate_sdk -pydantic -dacite diff --git a/pyproject.toml b/pyproject.toml index 86140ea..b147d69 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ test = ["pytest", "hypercorn"] lint = ["mypy>=1.11.2", "pyright>=1.1.390", "ruff>=0.6.9"] harness = ["testcontainers", "hypercorn", "httpx"] serde = ["dacite", "pydantic"] +client = ["httpx[http2]"] [build-system] requires = ["maturin>=1.6,<2.0"] diff --git a/python/restate/__init__.py b/python/restate/__init__.py index 8850752..1135ad2 100644 --- a/python/restate/__init__.py +++ b/python/restate/__init__.py @@ -12,7 +12,7 @@ Restate SDK for Python """ -from contextlib import contextmanager +from contextlib import asynccontextmanager import typing from restate.server_types import RestateAppT @@ -26,6 +26,7 @@ from .context import Context, ObjectContext, ObjectSharedContext from .context import WorkflowContext, WorkflowSharedContext from .retry_policy import InvocationRetryPolicy +from .client_types import RestateClient, RestateClientSendHandle # pylint: disable=line-too-long from .context import ( @@ -50,15 +51,15 @@ # we don't have the appropriate dependencies installed # pylint: disable=unused-argument, redefined-outer-name - @contextmanager + @asynccontextmanager def create_test_harness( app: RestateAppT, follow_logs: bool = False, restate_image: str = "restatedev/restate:latest", always_replay: bool = False, disable_retries: bool = False, - ) -> typing.Generator[TestHarnessEnvironment, None, None]: - """a dummy harness constructor to raise ImportError""" + ) -> typing.AsyncGenerator[TestHarnessEnvironment, None]: + """a dummy harness constructor to raise ImportError. Install restate-sdk[harness] to use this feature""" raise ImportError("Install restate-sdk[harness] to use this feature") def test_harness( @@ -68,10 +69,24 @@ def test_harness( always_replay: bool = False, disable_retries: bool = False, ): - """a dummy harness constructor to raise ImportError""" + """a dummy harness constructor to raise ImportError. Install restate-sdk[harness] to use this feature""" raise ImportError("Install restate-sdk[harness] to use this feature") +try: + from .client import create_client +except ImportError: + # we don't have the appropriate dependencies installed + + @asynccontextmanager + async def create_client( + ingress: str, headers: typing.Optional[dict] = None + ) -> typing.AsyncGenerator[RestateClient, None]: + """a dummy client constructor to raise ImportError. Install restate-sdk[client] to use this feature""" + raise ImportError("Install restate-sdk[client] to use this feature") + yield # type: ignore + + __all__ = [ "Service", "VirtualObject", @@ -101,4 +116,7 @@ def test_harness( "SdkInternalBaseException", "is_internal_exception", "getLogger", + "RestateClient", + "RestateClientSendHandle", + "create_client", ] diff --git a/python/restate/client.py b/python/restate/client.py new file mode 100644 index 0000000..19db2bc --- /dev/null +++ b/python/restate/client.py @@ -0,0 +1,318 @@ +# +# Copyright (c) 2023-2025 - Restate Software, Inc., Restate GmbH +# +# This file is part of the Restate SDK for Python, +# which is released under the MIT license. +# +# You can find a copy of the license in file LICENSE in the root +# directory of this repository or package, or at +# https://github.com/restatedev/sdk-typescript/blob/main/LICENSE +# +""" +This is a basic remote client for the Restate service. +""" + +from datetime import timedelta +import httpx +import typing +from contextlib import asynccontextmanager + +from .client_types import RestateClient, RestateClientSendHandle + +from .context import HandlerType +from .serde import BytesSerde, JsonSerde, Serde +from .handler import handler_from_callable + +I = typing.TypeVar("I") +O = typing.TypeVar("O") + + +class Client(RestateClient): + """ + A basic client for connecting to the Restate service. + """ + + def __init__(self, client: httpx.AsyncClient, headers: typing.Optional[dict] = None): + self.headers = headers or {} + self.client = client + + async def do_call( + self, + tpe: HandlerType[I, O], + parameter: I, + key: typing.Optional[str] = None, + send_delay: typing.Optional[timedelta] = None, + send: bool = False, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + force_json_output: bool = False, + ) -> O: + """Make an RPC call to the given handler""" + target_handler = handler_from_callable(tpe) + input_serde = target_handler.handler_io.input_serde + input_type = target_handler.handler_io.input_type + + if input_type is not None and input_type.is_void: + content_type = None + else: + content_type = target_handler.handler_io.content_type + + if headers is None: + headers = {} + if headers.get("Content-Type") is None and content_type is not None: + headers["Content-Type"] = content_type + + service = target_handler.service_tag.name + handler = target_handler.name + output_serde = target_handler.handler_io.output_serde if force_json_output is False else JsonSerde() + + return await self.do_raw_call( + service=service, + handler=handler, + input_param=parameter, + input_serde=input_serde, + output_serde=output_serde, + key=key, + send_delay=send_delay, + send=send, + idempotency_key=idempotency_key, + headers=headers, + ) + + async def do_raw_call( + self, + service: str, + handler: str, + input_param: I, + input_serde: Serde[I], + output_serde: Serde[O], + key: typing.Optional[str] = None, + send_delay: typing.Optional[timedelta] = None, + send: bool = False, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> O: + """Make an RPC call to the given handler""" + parameter = input_serde.serialize(input_param) + if headers is not None: + headers_kvs = list(headers.items()) + else: + headers_kvs = [] + if send_delay is not None: + ms = int(send_delay.total_seconds() * 1000) + else: + ms = None + + res = await self.post( + service=service, + handler=handler, + send=send, + content=parameter, + headers=headers_kvs, + key=key, + delay=ms, + idempotency_key=idempotency_key, + ) + return output_serde.deserialize(res) # type: ignore + + async def post( + self, + /, + service: str, + handler: str, + send: bool, + content: bytes, + headers: typing.List[typing.Tuple[(str, str)]] | None = None, + key: str | None = None, + delay: int | None = None, + idempotency_key: str | None = None, + ) -> bytes: + """ + Send a POST request to the Restate service. + """ + endpoint = service + if key: + endpoint += f"/{key}" + endpoint += f"/{handler}" + if send: + endpoint += "/send" + if delay is not None: + endpoint = endpoint + f"?delay={delay}" + dict_headers = dict(headers) if headers is not None else {} + if idempotency_key is not None: + dict_headers["Idempotency-Key"] = idempotency_key + res = await self.client.post(endpoint, headers=dict_headers, content=content) + res.raise_for_status() + return res.content + + @typing.final + @typing.override + async def service_call( + self, + tpe: HandlerType[I, O], + arg: I, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> O: + coro = await self.do_call(tpe, arg, idempotency_key=idempotency_key, headers=headers) + return coro + + @typing.final + @typing.override + async def service_send( + self, + tpe: HandlerType[I, O], + arg: I, + send_delay: typing.Optional[timedelta] = None, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> RestateClientSendHandle: + send_handle = await self.do_call( + tpe, + parameter=arg, + send=True, + send_delay=send_delay, + idempotency_key=idempotency_key, + headers=headers, + force_json_output=True, + ) + + send = typing.cast(typing.Dict[str, str], send_handle) + + return RestateClientSendHandle(send.get("invocationId", ""), 200) # TODO: verify + + @typing.final + @typing.override + async def object_call( + self, + tpe: HandlerType[I, O], + key: str, + arg: I, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> O: + coro = await self.do_call(tpe, arg, key, idempotency_key=idempotency_key, headers=headers) + return coro + + @typing.final + @typing.override + async def object_send( + self, + tpe: HandlerType[I, O], + key: str, + arg: I, + send_delay: typing.Optional[timedelta] = None, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> RestateClientSendHandle: + send_handle = await self.do_call( + tpe, + parameter=arg, + key=key, + send=True, + send_delay=send_delay, + idempotency_key=idempotency_key, + headers=headers, + force_json_output=True, + ) + + send = typing.cast(typing.Dict[str, str], send_handle) + + return RestateClientSendHandle(send.get("invocationId", ""), 200) # TODO: verify + + @typing.final + @typing.override + async def workflow_call( + self, + tpe: HandlerType[I, O], + key: str, + arg: I, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> O: + return await self.object_call(tpe, key, arg, idempotency_key=idempotency_key, headers=headers) + + @typing.final + @typing.override + async def workflow_send( + self, + tpe: HandlerType[I, O], + key: str, + arg: I, + send_delay: typing.Optional[timedelta] = None, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> RestateClientSendHandle: + return await self.object_send( + tpe, + key, + arg, + send_delay=send_delay, + idempotency_key=idempotency_key, + headers=headers, + ) + + @typing.final + @typing.override + async def generic_call( + self, + service: str, + handler: str, + arg: bytes, + key: str | None = None, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> bytes: + serde = BytesSerde() + call_handle = await self.do_raw_call( + service=service, + handler=handler, + input_param=arg, + input_serde=serde, + output_serde=serde, + key=key, + idempotency_key=idempotency_key, + headers=headers, + ) + return call_handle + + @typing.final + @typing.override + async def generic_send( + self, + service: str, + handler: str, + arg: bytes, + key: str | None = None, + send_delay: timedelta | None = None, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> RestateClientSendHandle: + serde = BytesSerde() + output_serde: Serde[dict] = JsonSerde() + + send_handle_json = await self.do_raw_call( + service=service, + handler=handler, + input_param=arg, + input_serde=serde, + output_serde=output_serde, + key=key, + send_delay=send_delay, + send=True, + idempotency_key=idempotency_key, + headers=headers, + ) + + return RestateClientSendHandle(send_handle_json.get("invocationId", ""), 200) # TODO: verify + + +@asynccontextmanager +async def create_client( + ingress: str, headers: typing.Optional[dict] = None +) -> typing.AsyncGenerator[RestateClient, None]: + """ + Create a new Restate client. + """ + async with httpx.AsyncClient(base_url=ingress, headers=headers, http2=True) as http_client: + yield Client(http_client, headers) diff --git a/python/restate/client_types.py b/python/restate/client_types.py new file mode 100644 index 0000000..b8d5121 --- /dev/null +++ b/python/restate/client_types.py @@ -0,0 +1,141 @@ +# +# Copyright (c) 2023-2025 - Restate Software, Inc., Restate GmbH +# +# This file is part of the Restate SDK for Python, +# which is released under the MIT license. +# +# You can find a copy of the license in file LICENSE in the root +# directory of this repository or package, or at +# https://github.com/restatedev/sdk-typescript/blob/main/LICENSE +# +""" +Type definitions for the Restate client. +""" + +import abc +from datetime import timedelta +import typing + + +from .context import HandlerType + +I = typing.TypeVar("I") +O = typing.TypeVar("O") + + +class RestateClientSendHandle: + """ + A handle for a send operation. + This is used to track the status of a send operation. + """ + + def __init__(self, invocation_id: str, status_code: int): + self.invocation_id = invocation_id + self.status_code = status_code + + +class RestateClient(abc.ABC): + """ + An abstract base class for a Restate client. + This class defines the interface for a Restate client. + """ + + @abc.abstractmethod + async def service_call( + self, + tpe: HandlerType[I, O], + arg: I, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> O: + """Make an RPC call to the given handler""" + pass + + @abc.abstractmethod + async def service_send( + self, + tpe: HandlerType[I, O], + arg: I, + send_delay: typing.Optional[timedelta] = None, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> RestateClientSendHandle: + """Make a send operation to the given handler""" + pass + + @abc.abstractmethod + async def object_call( + self, + tpe: HandlerType[I, O], + key: str, + arg: I, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> O: + """Make an RPC call to the given object handler""" + pass + + @abc.abstractmethod + async def object_send( + self, + tpe: HandlerType[I, O], + key: str, + arg: I, + send_delay: typing.Optional[timedelta] = None, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> RestateClientSendHandle: + """Make a send operation to the given object handler""" + pass + + @abc.abstractmethod + async def workflow_call( + self, + tpe: HandlerType[I, O], + key: str, + arg: I, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> O: + """Make an RPC call to the given workflow handler""" + pass + + @abc.abstractmethod + async def workflow_send( + self, + tpe: HandlerType[I, O], + key: str, + arg: I, + send_delay: typing.Optional[timedelta] = None, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> RestateClientSendHandle: + """Make a send operation to the given workflow handler""" + pass + + @abc.abstractmethod + async def generic_call( + self, + service: str, + handler: str, + arg: bytes, + key: str | None = None, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> bytes: + """Make a generic RPC call to the given service and handler""" + pass + + @abc.abstractmethod + async def generic_send( + self, + service: str, + handler: str, + arg: bytes, + key: str | None = None, + send_delay: timedelta | None = None, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None, + ) -> RestateClientSendHandle: + """Make a generic send operation to the given service and handler""" + pass diff --git a/python/restate/harness.py b/python/restate/harness.py index 3ff1e59..20fc244 100644 --- a/python/restate/harness.py +++ b/python/restate/harness.py @@ -17,15 +17,17 @@ import typing from urllib.error import URLError import socket -from contextlib import contextmanager +from contextlib import contextmanager, asynccontextmanager from warnings import deprecated from hypercorn.config import Config from hypercorn.asyncio import serve +from restate.client import create_client from restate.server_types import RestateAppT from restate.types import TestHarnessEnvironment from testcontainers.core.container import DockerContainer # type: ignore from testcontainers.core.waiting_utils import wait_container_is_ready # type: ignore + import httpx @@ -332,14 +334,14 @@ def test_harness( return RestateTestHarness(app, config) -@contextmanager -def create_test_harness( +@asynccontextmanager +async def create_test_harness( app: RestateAppT, follow_logs: bool = False, restate_image: str = "restatedev/restate:latest", always_replay: bool = False, disable_retries: bool = False, -) -> typing.Generator[TestHarnessEnvironment, None, None]: +) -> typing.AsyncGenerator[TestHarnessEnvironment, None]: """ Creates a test harness for running Restate services together with restate-server. @@ -378,4 +380,7 @@ def create_test_harness( msg = f"unable to register the services at {bind_address} - {res.status_code} {res.text}" raise AssertionError(msg) - yield TestHarnessEnvironment(ingress_url=runtime.ingress_url(), admin_api_url=runtime.admin_url()) + async with create_client(runtime.ingress_url()) as client: + yield TestHarnessEnvironment( + ingress_url=runtime.ingress_url(), admin_api_url=runtime.admin_url(), client=client + ) diff --git a/python/restate/types.py b/python/restate/types.py index 53390ab..c721c80 100644 --- a/python/restate/types.py +++ b/python/restate/types.py @@ -17,6 +17,8 @@ from dataclasses import dataclass +from restate.client_types import RestateClient + @dataclass class TestHarnessEnvironment: @@ -27,3 +29,6 @@ class TestHarnessEnvironment: admin_api_url: str """The URL of the Restate admin API endpoint used in the test""" + + client: RestateClient + """The Restate client connected to the ingress URL""" diff --git a/test-services/Dockerfile b/test-services/Dockerfile index 31602ac..e4a2157 100644 --- a/test-services/Dockerfile +++ b/test-services/Dockerfile @@ -13,7 +13,6 @@ COPY python ./python/ COPY Cargo.lock . COPY Cargo.toml . COPY rust-toolchain.toml . -COPY requirements.txt . COPY pyproject.toml . COPY LICENSE . COPY README.md . diff --git a/uv.lock b/uv.lock index 9cf3398..f90bc08 100644 --- a/uv.lock +++ b/uv.lock @@ -207,6 +207,11 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" }, ] +[package.optional-dependencies] +http2 = [ + { name = "h2" }, +] + [[package]] name = "hypercorn" version = "0.17.3" @@ -545,6 +550,9 @@ name = "restate-sdk" source = { editable = "." } [package.optional-dependencies] +client = [ + { name = "httpx", extra = ["http2"] }, +] harness = [ { name = "httpx" }, { name = "hypercorn" }, @@ -568,6 +576,7 @@ test = [ requires-dist = [ { name = "dacite", marker = "extra == 'serde'" }, { name = "httpx", marker = "extra == 'harness'" }, + { name = "httpx", extras = ["http2"], marker = "extra == 'client'" }, { name = "hypercorn", marker = "extra == 'harness'" }, { name = "hypercorn", marker = "extra == 'test'" }, { name = "mypy", marker = "extra == 'lint'", specifier = ">=1.11.2" }, @@ -577,7 +586,7 @@ requires-dist = [ { name = "ruff", marker = "extra == 'lint'", specifier = ">=0.6.9" }, { name = "testcontainers", marker = "extra == 'harness'" }, ] -provides-extras = ["test", "lint", "harness", "serde"] +provides-extras = ["test", "lint", "harness", "serde", "client"] [[package]] name = "ruff"