diff --git a/temporalio/client.py b/temporalio/client.py index 745f35731..48a72f1d2 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -247,6 +247,7 @@ def __init__( default_workflow_query_reject_condition=default_workflow_query_reject_condition, header_codec_behavior=header_codec_behavior, ) + self._initial_config = config.copy() for plugin in plugins: config = plugin.configure_client(config) @@ -261,12 +262,16 @@ def _init_from_config(self, config: ClientConfig): for interceptor in reversed(list(self._config["interceptors"])): self._impl = interceptor.intercept_client(self._impl) - def config(self) -> ClientConfig: + def config(self, active_config: bool = False) -> ClientConfig: """Config, as a dictionary, used to create this client. + Args: + active_config: If true, return the modified configuration in use rather than the initial one + provided to the client. + This makes a shallow copy of the config each call. """ - config = self._config.copy() + config = self._config.copy() if active_config else self._initial_config.copy() config["interceptors"] = list(config["interceptors"]) return config @@ -4290,7 +4295,8 @@ async def _to_proto( await _apply_headers( self.headers, action.start_workflow.header.fields, - client.config()["header_codec_behavior"] == HeaderCodecBehavior.CODEC + client.config(True)["header_codec_behavior"] + == HeaderCodecBehavior.CODEC and not self._from_raw, client.data_converter.payload_codec, ) @@ -6917,7 +6923,8 @@ async def _apply_headers( await _apply_headers( source, dest, - self._client.config()["header_codec_behavior"] == HeaderCodecBehavior.CODEC, + self._client.config(True)["header_codec_behavior"] + == HeaderCodecBehavior.CODEC, self._client.data_converter.payload_codec, ) diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 2d953d284..5a8a8a7ff 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -81,6 +81,7 @@ def __init__( disable_safe_workflow_eviction=disable_safe_workflow_eviction, header_codec_behavior=header_codec_behavior, ) + self._initial_config = self._config.copy() # Apply plugin configuration self.plugins = plugins @@ -91,13 +92,17 @@ def __init__( if not self._config["workflows"]: raise ValueError("At least one workflow must be specified") - def config(self) -> ReplayerConfig: + def config(self, active_config: bool = False) -> ReplayerConfig: """Config, as a dictionary, used to create this replayer. + Args: + active_config: If true, return the modified configuration in use rather than the initial one + provided to the client. + Returns: Configuration, shallow-copied. """ - config = self._config.copy() + config = self._config.copy() if active_config else self._initial_config.copy() config["workflows"] = list(config["workflows"]) return config diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index daa02fbec..c8bba7350 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -376,8 +376,9 @@ def __init__( f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior." ) plugins = plugins_from_client + list(plugins) + self._initial_config = config.copy() - self.plugins = plugins + self._plugins = plugins for plugin in plugins: config = plugin.configure_worker(config) @@ -388,7 +389,6 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf Client is safe to take separately since it can't be modified by worker plugins. """ self._config = config - if not ( config["activities"] or config["nexus_service_handlers"] @@ -409,7 +409,7 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf ) # Prepend applicable client interceptors to the given ones - client_config = config["client"].config() + client_config = config["client"].config(True) interceptors_from_client = cast( List[Interceptor], [i for i in client_config["interceptors"] if isinstance(i, Interceptor)], @@ -555,7 +555,7 @@ def check_activity(activity): maximum=config["max_concurrent_activity_task_polls"] ) - deduped_plugin_names = list(set([plugin.name() for plugin in self.plugins])) + deduped_plugin_names = list(set([plugin.name() for plugin in self._plugins])) # Create bridge worker last. We have empirically observed that if it is # created before an error is raised from the activity worker @@ -623,13 +623,17 @@ def check_activity(activity): ), ) - def config(self) -> WorkerConfig: + def config(self, active_config: bool = False) -> WorkerConfig: """Config, as a dictionary, used to create this worker. + Args: + active_config: If true, return the modified configuration in use rather than the initial one + provided to the worker. + Returns: Configuration, shallow-copied. """ - config = self._config.copy() + config = self._config.copy() if active_config else self._initial_config.copy() config["activities"] = list(config.get("activities", [])) config["workflows"] = list(config.get("workflows", [])) return config @@ -703,7 +707,7 @@ def make_lambda(plugin, next): return lambda w: plugin.run_worker(w, next) next_function = lambda w: w._run() - for plugin in reversed(self.plugins): + for plugin in reversed(self._plugins): next_function = make_lambda(plugin, next_function) await next_function(self) diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 0034bd943..45ae76bd9 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -150,7 +150,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: activities=[never_run_activity], plugins=[MyWorkerPlugin()], ) - task_queue = worker.config().get("task_queue") + task_queue = worker.config(True).get("task_queue") assert task_queue is not None and task_queue.startswith("replaced_queue") # Test client plugin propagation to worker plugins @@ -160,7 +160,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: worker = Worker( client, task_queue="queue" + str(uuid.uuid4()), activities=[never_run_activity] ) - task_queue = worker.config().get("task_queue") + task_queue = worker.config(True).get("task_queue") assert task_queue is not None and task_queue.startswith("combined") # Test both. Client propagated plugins are called first, so the worker plugin overrides in this case @@ -170,7 +170,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: activities=[never_run_activity], plugins=[MyWorkerPlugin()], ) - task_queue = worker.config().get("task_queue") + task_queue = worker.config(True).get("task_queue") assert task_queue is not None and task_queue.startswith("replaced_queue") @@ -202,7 +202,7 @@ async def test_worker_sandbox_restrictions(client: Client) -> None: assert ( "my_module" in cast( - SandboxedWorkflowRunner, worker.config().get("workflow_runner") + SandboxedWorkflowRunner, worker.config(True).get("workflow_runner") ).restrictions.passthrough_modules ) @@ -276,8 +276,8 @@ async def test_replay(client: Client) -> None: ) await handle.result() replayer = Replayer(workflows=[], plugins=[plugin]) - assert len(replayer.config().get("workflows") or []) == 1 - assert replayer.config().get("data_converter") == pydantic_data_converter + assert len(replayer.config(True).get("workflows") or []) == 1 + assert replayer.config(True).get("data_converter") == pydantic_data_converter await replayer.replay_workflow(await handle.fetch_history()) @@ -303,7 +303,7 @@ async def test_simple_plugins(client: Client) -> None: plugins=[plugin], ) # On a sequence, a value is appended - assert worker.config().get("workflows") == [HelloWorkflow, HelloWorkflow2] + assert worker.config(True).get("workflows") == [HelloWorkflow, HelloWorkflow2] # Test with plugin registered in client worker = Worker( @@ -311,11 +311,11 @@ async def test_simple_plugins(client: Client) -> None: task_queue="queue" + str(uuid.uuid4()), activities=[never_run_activity], ) - assert worker.config().get("workflows") == [HelloWorkflow2] + assert worker.config(True).get("workflows") == [HelloWorkflow2] replayer = Replayer(workflows=[HelloWorkflow], plugins=[plugin]) - assert replayer.config().get("data_converter") == pydantic_data_converter - assert replayer.config().get("workflows") == [HelloWorkflow, HelloWorkflow2] + assert replayer.config(True).get("data_converter") == pydantic_data_converter + assert replayer.config(True).get("workflows") == [HelloWorkflow, HelloWorkflow2] async def test_simple_plugins_callables(client: Client) -> None: @@ -350,7 +350,7 @@ def converter(old: Optional[DataConverter]): activities=[never_run_activity], plugins=[plugin], ) - assert worker.config().get("workflows") == [] + assert worker.config(True).get("workflows") == [] class MediumPlugin(SimplePlugin): @@ -371,5 +371,5 @@ async def test_medium_plugin(client: Client) -> None: plugins=[plugin], workflows=[HelloWorkflow], ) - task_queue = worker.config().get("task_queue") + task_queue = worker.config(True).get("task_queue") assert task_queue is not None and task_queue.startswith("override")