From e1b4717fac282b2fcf5ce947b8023b24a00bd54a Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 25 Nov 2025 10:13:12 -0800 Subject: [PATCH 1/3] Store initial configuration and provide it in config() by default --- temporalio/client.py | 9 +++++++-- temporalio/worker/_worker.py | 14 +++++++++----- tests/test_plugins.py | 16 ++++++++-------- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 770a51392..27d95cde1 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -247,6 +247,7 @@ def __init__( header_codec_behavior=header_codec_behavior, plugins=plugins, ) + self._initial_config = config.copy() for plugin in plugins: config = plugin.configure_client(config) @@ -261,12 +262,16 @@ def _init_from_config(self, config: ClientConfig): for interceptor in reversed(list(self._config["interceptors"])): self._impl = interceptor.intercept_client(self._impl) - def config(self) -> ClientConfig: + def config(self, active_config: bool = False) -> ClientConfig: """Config, as a dictionary, used to create this client. + Args: + active_config: If true, return the modified configuration in use rather than the initial one + provided to the client. + This makes a shallow copy of the config each call. """ - config = self._config.copy() + config = self._config.copy() if active_config else self._initial_config.copy() config["interceptors"] = list(config["interceptors"]) return config diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index fc2c2241d..a3867e532 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -376,8 +376,9 @@ def __init__( f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior." ) plugins = plugins_from_client + list(plugins) + self._initial_config = config.copy() - self.plugins = plugins + self._plugins = plugins for plugin in plugins: config = plugin.configure_worker(config) @@ -388,7 +389,6 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf Client is safe to take separately since it can't be modified by worker plugins. """ self._config = config - if not ( config["activities"] or config["nexus_service_handlers"] @@ -612,13 +612,17 @@ def check_activity(activity): ), ) - def config(self) -> WorkerConfig: + def config(self, active_config: bool = False) -> WorkerConfig: """Config, as a dictionary, used to create this worker. + Args: + active_config: If true, return the modified configuration in use rather than the initial one + provided to the worker. + Returns: Configuration, shallow-copied. """ - config = self._config.copy() + config = self._config.copy() if active_config else self._initial_config.copy() config["activities"] = list(config.get("activities", [])) config["workflows"] = list(config.get("workflows", [])) return config @@ -692,7 +696,7 @@ def make_lambda(plugin, next): return lambda w: plugin.run_worker(w, next) next_function = lambda w: w._run() - for plugin in reversed(self.plugins): + for plugin in reversed(self._plugins): next_function = make_lambda(plugin, next_function) await next_function(self) diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 5571841b4..b151c695b 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -148,14 +148,14 @@ async def test_worker_plugin_basic_config(client: Client) -> None: activities=[never_run_activity], plugins=[MyWorkerPlugin()], ) - assert worker.config().get("task_queue") == "replaced_queue" + assert worker.config(True).get("task_queue") == "replaced_queue" # Test client plugin propagation to worker plugins new_config = client.config() new_config["plugins"] = [MyCombinedPlugin()] client = Client(**new_config) worker = Worker(client, task_queue="queue", activities=[never_run_activity]) - assert worker.config().get("task_queue") == "combined" + assert worker.config(True).get("task_queue") == "combined" # Test both. Client propagated plugins are called first, so the worker plugin overrides in this case worker = Worker( @@ -164,7 +164,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: activities=[never_run_activity], plugins=[MyWorkerPlugin()], ) - assert worker.config().get("task_queue") == "replaced_queue" + assert worker.config(True).get("task_queue") == "replaced_queue" async def test_worker_duplicated_plugin(client: Client) -> None: @@ -195,7 +195,7 @@ async def test_worker_sandbox_restrictions(client: Client) -> None: assert ( "my_module" in cast( - SandboxedWorkflowRunner, worker.config().get("workflow_runner") + SandboxedWorkflowRunner, worker.config(True).get("workflow_runner") ).restrictions.passthrough_modules ) @@ -296,7 +296,7 @@ async def test_simple_plugins(client: Client) -> None: plugins=[plugin], ) # On a sequence, a value is appended - assert worker.config().get("workflows") == [HelloWorkflow, HelloWorkflow2] + assert worker.config(True).get("workflows") == [HelloWorkflow, HelloWorkflow2] # Test with plugin registered in client worker = Worker( @@ -304,7 +304,7 @@ async def test_simple_plugins(client: Client) -> None: task_queue="queue", activities=[never_run_activity], ) - assert worker.config().get("workflows") == [HelloWorkflow2] + assert worker.config(True).get("workflows") == [HelloWorkflow2] replayer = Replayer(workflows=[HelloWorkflow], plugins=[plugin]) assert replayer.config().get("data_converter") == pydantic_data_converter @@ -343,7 +343,7 @@ def converter(old: Optional[DataConverter]): activities=[never_run_activity], plugins=[plugin], ) - assert worker.config().get("workflows") == [] + assert worker.config(True).get("workflows") == [] class MediumPlugin(SimplePlugin): @@ -361,4 +361,4 @@ async def test_medium_plugin(client: Client) -> None: worker = Worker( client, task_queue="queue", plugins=[plugin], workflows=[HelloWorkflow] ) - assert worker.config().get("task_queue") == "override" + assert worker.config(True).get("task_queue") == "override" From c156ac23be27cd11f309c1911470174526d02c31 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 25 Nov 2025 10:24:56 -0800 Subject: [PATCH 2/3] Include replayer --- temporalio/worker/_replayer.py | 9 +++++++-- tests/test_plugins.py | 8 ++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 04a32be17..731fca339 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -81,6 +81,7 @@ def __init__( disable_safe_workflow_eviction=disable_safe_workflow_eviction, header_codec_behavior=header_codec_behavior, ) + self._initial_config = self._config.copy() # Apply plugin configuration self.plugins = plugins @@ -91,13 +92,17 @@ def __init__( if not self._config["workflows"]: raise ValueError("At least one workflow must be specified") - def config(self) -> ReplayerConfig: + def config(self, active_config: bool = False) -> ReplayerConfig: """Config, as a dictionary, used to create this replayer. + Args: + active_config: If true, return the modified configuration in use rather than the initial one + provided to the client. + Returns: Configuration, shallow-copied. """ - config = self._config.copy() + config = self._config.copy() if active_config else self._initial_config.copy() config["workflows"] = list(config["workflows"]) return config diff --git a/tests/test_plugins.py b/tests/test_plugins.py index b151c695b..bacce7654 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -269,8 +269,8 @@ async def test_replay(client: Client) -> None: ) await handle.result() replayer = Replayer(workflows=[], plugins=[plugin]) - assert len(replayer.config().get("workflows") or []) == 1 - assert replayer.config().get("data_converter") == pydantic_data_converter + assert len(replayer.config(True).get("workflows") or []) == 1 + assert replayer.config(True).get("data_converter") == pydantic_data_converter await replayer.replay_workflow(await handle.fetch_history()) @@ -307,8 +307,8 @@ async def test_simple_plugins(client: Client) -> None: assert worker.config(True).get("workflows") == [HelloWorkflow2] replayer = Replayer(workflows=[HelloWorkflow], plugins=[plugin]) - assert replayer.config().get("data_converter") == pydantic_data_converter - assert replayer.config().get("workflows") == [HelloWorkflow, HelloWorkflow2] + assert replayer.config(True).get("data_converter") == pydantic_data_converter + assert replayer.config(True).get("workflows") == [HelloWorkflow, HelloWorkflow2] async def test_simple_plugins_callables(client: Client) -> None: From 08d5aff6f74ae84dd790d2048385bda715c007d7 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 25 Nov 2025 11:18:28 -0800 Subject: [PATCH 3/3] Use active config in a few places --- temporalio/client.py | 6 ++++-- temporalio/worker/_worker.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 7001cfa74..48a72f1d2 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -4295,7 +4295,8 @@ async def _to_proto( await _apply_headers( self.headers, action.start_workflow.header.fields, - client.config()["header_codec_behavior"] == HeaderCodecBehavior.CODEC + client.config(True)["header_codec_behavior"] + == HeaderCodecBehavior.CODEC and not self._from_raw, client.data_converter.payload_codec, ) @@ -6922,7 +6923,8 @@ async def _apply_headers( await _apply_headers( source, dest, - self._client.config()["header_codec_behavior"] == HeaderCodecBehavior.CODEC, + self._client.config(True)["header_codec_behavior"] + == HeaderCodecBehavior.CODEC, self._client.data_converter.payload_codec, ) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 1fb805a2b..c8bba7350 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -409,7 +409,7 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf ) # Prepend applicable client interceptors to the given ones - client_config = config["client"].config() + client_config = config["client"].config(True) interceptors_from_client = cast( List[Interceptor], [i for i in client_config["interceptors"] if isinstance(i, Interceptor)],