Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def __init__(
default_workflow_query_reject_condition=default_workflow_query_reject_condition,
header_codec_behavior=header_codec_behavior,
)
self._initial_config = config.copy()

for plugin in plugins:
config = plugin.configure_client(config)
Expand All @@ -261,12 +262,16 @@ def _init_from_config(self, config: ClientConfig):
for interceptor in reversed(list(self._config["interceptors"])):
self._impl = interceptor.intercept_client(self._impl)

def config(self) -> ClientConfig:
def config(self, active_config: bool = False) -> ClientConfig:
"""Config, as a dictionary, used to create this client.

Args:
active_config: If true, return the modified configuration in use rather than the initial one
provided to the client.

This makes a shallow copy of the config each call.
"""
config = self._config.copy()
config = self._config.copy() if active_config else self._initial_config.copy()
config["interceptors"] = list(config["interceptors"])
return config

Expand Down Expand Up @@ -4290,7 +4295,8 @@ async def _to_proto(
await _apply_headers(
self.headers,
action.start_workflow.header.fields,
client.config()["header_codec_behavior"] == HeaderCodecBehavior.CODEC
client.config(True)["header_codec_behavior"]
== HeaderCodecBehavior.CODEC
and not self._from_raw,
client.data_converter.payload_codec,
)
Expand Down Expand Up @@ -6917,7 +6923,8 @@ async def _apply_headers(
await _apply_headers(
source,
dest,
self._client.config()["header_codec_behavior"] == HeaderCodecBehavior.CODEC,
self._client.config(True)["header_codec_behavior"]
== HeaderCodecBehavior.CODEC,
self._client.data_converter.payload_codec,
)

Expand Down
9 changes: 7 additions & 2 deletions temporalio/worker/_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def __init__(
disable_safe_workflow_eviction=disable_safe_workflow_eviction,
header_codec_behavior=header_codec_behavior,
)
self._initial_config = self._config.copy()

# Apply plugin configuration
self.plugins = plugins
Expand All @@ -91,13 +92,17 @@ def __init__(
if not self._config["workflows"]:
raise ValueError("At least one workflow must be specified")

def config(self) -> ReplayerConfig:
def config(self, active_config: bool = False) -> ReplayerConfig:
"""Config, as a dictionary, used to create this replayer.

Args:
active_config: If true, return the modified configuration in use rather than the initial one
provided to the client.

Returns:
Configuration, shallow-copied.
"""
config = self._config.copy()
config = self._config.copy() if active_config else self._initial_config.copy()
config["workflows"] = list(config["workflows"])
return config

Expand Down
18 changes: 11 additions & 7 deletions temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,9 @@ def __init__(
f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior."
)
plugins = plugins_from_client + list(plugins)
self._initial_config = config.copy()

self.plugins = plugins
self._plugins = plugins
for plugin in plugins:
config = plugin.configure_worker(config)

Expand All @@ -388,7 +389,6 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf
Client is safe to take separately since it can't be modified by worker plugins.
"""
self._config = config

if not (
config["activities"]
or config["nexus_service_handlers"]
Expand All @@ -409,7 +409,7 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf
)

# Prepend applicable client interceptors to the given ones
client_config = config["client"].config()
client_config = config["client"].config(True)
interceptors_from_client = cast(
List[Interceptor],
[i for i in client_config["interceptors"] if isinstance(i, Interceptor)],
Expand Down Expand Up @@ -555,7 +555,7 @@ def check_activity(activity):
maximum=config["max_concurrent_activity_task_polls"]
)

deduped_plugin_names = list(set([plugin.name() for plugin in self.plugins]))
deduped_plugin_names = list(set([plugin.name() for plugin in self._plugins]))

# Create bridge worker last. We have empirically observed that if it is
# created before an error is raised from the activity worker
Expand Down Expand Up @@ -623,13 +623,17 @@ def check_activity(activity):
),
)

def config(self) -> WorkerConfig:
def config(self, active_config: bool = False) -> WorkerConfig:
"""Config, as a dictionary, used to create this worker.

Args:
active_config: If true, return the modified configuration in use rather than the initial one
provided to the worker.

Returns:
Configuration, shallow-copied.
"""
config = self._config.copy()
config = self._config.copy() if active_config else self._initial_config.copy()
config["activities"] = list(config.get("activities", []))
config["workflows"] = list(config.get("workflows", []))
return config
Expand Down Expand Up @@ -703,7 +707,7 @@ def make_lambda(plugin, next):
return lambda w: plugin.run_worker(w, next)

next_function = lambda w: w._run()
for plugin in reversed(self.plugins):
for plugin in reversed(self._plugins):
next_function = make_lambda(plugin, next_function)

await next_function(self)
Expand Down
24 changes: 12 additions & 12 deletions tests/test_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None:
activities=[never_run_activity],
plugins=[MyWorkerPlugin()],
)
task_queue = worker.config().get("task_queue")
task_queue = worker.config(True).get("task_queue")
assert task_queue is not None and task_queue.startswith("replaced_queue")

# Test client plugin propagation to worker plugins
Expand All @@ -160,7 +160,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None:
worker = Worker(
client, task_queue="queue" + str(uuid.uuid4()), activities=[never_run_activity]
)
task_queue = worker.config().get("task_queue")
task_queue = worker.config(True).get("task_queue")
assert task_queue is not None and task_queue.startswith("combined")

# Test both. Client propagated plugins are called first, so the worker plugin overrides in this case
Expand All @@ -170,7 +170,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None:
activities=[never_run_activity],
plugins=[MyWorkerPlugin()],
)
task_queue = worker.config().get("task_queue")
task_queue = worker.config(True).get("task_queue")
assert task_queue is not None and task_queue.startswith("replaced_queue")


Expand Down Expand Up @@ -202,7 +202,7 @@ async def test_worker_sandbox_restrictions(client: Client) -> None:
assert (
"my_module"
in cast(
SandboxedWorkflowRunner, worker.config().get("workflow_runner")
SandboxedWorkflowRunner, worker.config(True).get("workflow_runner")
).restrictions.passthrough_modules
)

Expand Down Expand Up @@ -276,8 +276,8 @@ async def test_replay(client: Client) -> None:
)
await handle.result()
replayer = Replayer(workflows=[], plugins=[plugin])
assert len(replayer.config().get("workflows") or []) == 1
assert replayer.config().get("data_converter") == pydantic_data_converter
assert len(replayer.config(True).get("workflows") or []) == 1
assert replayer.config(True).get("data_converter") == pydantic_data_converter

await replayer.replay_workflow(await handle.fetch_history())

Expand All @@ -303,19 +303,19 @@ async def test_simple_plugins(client: Client) -> None:
plugins=[plugin],
)
# On a sequence, a value is appended
assert worker.config().get("workflows") == [HelloWorkflow, HelloWorkflow2]
assert worker.config(True).get("workflows") == [HelloWorkflow, HelloWorkflow2]

# Test with plugin registered in client
worker = Worker(
new_client,
task_queue="queue" + str(uuid.uuid4()),
activities=[never_run_activity],
)
assert worker.config().get("workflows") == [HelloWorkflow2]
assert worker.config(True).get("workflows") == [HelloWorkflow2]

replayer = Replayer(workflows=[HelloWorkflow], plugins=[plugin])
assert replayer.config().get("data_converter") == pydantic_data_converter
assert replayer.config().get("workflows") == [HelloWorkflow, HelloWorkflow2]
assert replayer.config(True).get("data_converter") == pydantic_data_converter
assert replayer.config(True).get("workflows") == [HelloWorkflow, HelloWorkflow2]


async def test_simple_plugins_callables(client: Client) -> None:
Expand Down Expand Up @@ -350,7 +350,7 @@ def converter(old: Optional[DataConverter]):
activities=[never_run_activity],
plugins=[plugin],
)
assert worker.config().get("workflows") == []
assert worker.config(True).get("workflows") == []


class MediumPlugin(SimplePlugin):
Expand All @@ -371,5 +371,5 @@ async def test_medium_plugin(client: Client) -> None:
plugins=[plugin],
workflows=[HelloWorkflow],
)
task_queue = worker.config().get("task_queue")
task_queue = worker.config(True).get("task_queue")
assert task_queue is not None and task_queue.startswith("override")
Loading