From 035047a717a3a134e988c24e1819370c44af10c4 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 13 Oct 2025 13:30:07 -0700 Subject: [PATCH 01/12] Plumb plugins to core --- temporalio/bridge/src/worker.rs | 15 +++++++++++++-- temporalio/bridge/worker.py | 1 + temporalio/worker/_replayer.py | 3 +++ temporalio/worker/_worker.py | 4 ++++ tests/test_plugins.py | 7 +++++++ 5 files changed, 28 insertions(+), 2 deletions(-) diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index 92b43f356..c5cf46453 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -14,8 +14,9 @@ use temporal_sdk_core::api::errors::PollError; use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput}; use temporal_sdk_core_api::errors::WorkflowErrorType; use temporal_sdk_core_api::worker::{ - SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext, - SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit, + PluginInfo as CorePluginInfo, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, + SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, + SlotSupplier as SlotSupplierTrait, SlotSupplierPermit, }; use temporal_sdk_core_api::Worker; use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; @@ -63,6 +64,7 @@ pub struct WorkerConfig { nondeterminism_as_workflow_fail: bool, nondeterminism_as_workflow_fail_for_types: HashSet, nexus_task_poller_behavior: PollerBehavior, + plugins: Vec, } #[derive(FromPyObject)] @@ -722,6 +724,15 @@ fn convert_worker_config( .collect::>>(), ) .nexus_task_poller_behavior(conf.nexus_task_poller_behavior) + .plugins( + conf.plugins + .into_iter() + .map(|name| CorePluginInfo { + name, + version: None, + }) + .collect(), + ) .build() .map_err(|err| PyValueError::new_err(format!("Invalid worker config: {err}"))) } diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index 8e20b670a..fbcfc9c3c 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -64,6 +64,7 @@ class WorkerConfig: nondeterminism_as_workflow_fail: bool nondeterminism_as_workflow_fail_for_types: Set[str] nexus_task_poller_behavior: PollerBehavior + plugins: Sequence[str] @dataclass diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 72ffa4b56..0cebb8fa6 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -80,6 +80,7 @@ def __init__( runtime=runtime, disable_safe_workflow_eviction=disable_safe_workflow_eviction, header_codec_behavior=header_codec_behavior, + plugins=[plugin.name() for plugin in plugins], ) # Apply plugin configuration @@ -293,6 +294,7 @@ def on_eviction_hook( nexus_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum( 1 ), + plugins=self._config.get("plugins", []), ), ) # Start worker @@ -368,6 +370,7 @@ class ReplayerConfig(TypedDict, total=False): runtime: Optional[temporalio.runtime.Runtime] disable_safe_workflow_eviction: bool header_codec_behavior: HeaderCodecBehavior + plugins: Sequence[str] @dataclass(frozen=True) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index ad238507c..42cfbf355 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -376,6 +376,7 @@ def __init__( f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior." ) plugins = plugins_from_client + list(plugins) + config["plugins"] = [plugin.name() for plugin in plugins] self.plugins = plugins for plugin in plugins: @@ -609,6 +610,7 @@ def check_activity(activity): nexus_task_poller_behavior=config[ "nexus_task_poller_behavior" ]._to_bridge(), + plugins=config.get("plugins", []), ), ) @@ -621,6 +623,7 @@ def config(self) -> WorkerConfig: config = self._config.copy() config["activities"] = list(config.get("activities", [])) config["workflows"] = list(config.get("workflows", [])) + config["plugins"] = list(config.get("plugins", [])) return config @property @@ -902,6 +905,7 @@ class WorkerConfig(TypedDict, total=False): workflow_task_poller_behavior: PollerBehavior activity_task_poller_behavior: PollerBehavior nexus_task_poller_behavior: PollerBehavior + plugins: Sequence[str] def _warn_if_activity_executor_max_workers_is_inconsistent( diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 5571841b4..adde010da 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -149,6 +149,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" + assert worker.config().get("plugins") == [MyWorkerPlugin().name()] # Test client plugin propagation to worker plugins new_config = client.config() @@ -156,6 +157,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: client = Client(**new_config) worker = Worker(client, task_queue="queue", activities=[never_run_activity]) assert worker.config().get("task_queue") == "combined" + assert worker.config().get("plugins") == [MyCombinedPlugin().name()] # Test both. Client propagated plugins are called first, so the worker plugin overrides in this case worker = Worker( @@ -165,6 +167,10 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" + assert worker.config().get("plugins") == [ + MyCombinedPlugin().name(), + MyWorkerPlugin().name(), + ] async def test_worker_duplicated_plugin(client: Client) -> None: @@ -271,6 +277,7 @@ async def test_replay(client: Client) -> None: replayer = Replayer(workflows=[], plugins=[plugin]) assert len(replayer.config().get("workflows") or []) == 1 assert replayer.config().get("data_converter") == pydantic_data_converter + assert replayer.config().get("plugins") == [plugin.name()] await replayer.replay_workflow(await handle.fetch_history()) From 8f3ea32f97c3bfa130e875e1eefd06b7a36c1bca Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 14 Oct 2025 09:20:52 -0700 Subject: [PATCH 02/12] Plumb skip_client_worker_set_check --- temporalio/bridge/Cargo.lock | 223 ++++++++++++++++++++++++++++++- temporalio/bridge/runtime.py | 12 +- temporalio/bridge/sdk-core | 2 +- temporalio/bridge/src/lib.rs | 4 +- temporalio/bridge/src/runtime.rs | 48 +++++-- temporalio/bridge/src/worker.rs | 14 +- temporalio/bridge/worker.py | 1 + temporalio/runtime.py | 58 +++++++- temporalio/worker/_replayer.py | 1 + temporalio/worker/_worker.py | 9 ++ tests/conftest.py | 14 ++ tests/test_runtime.py | 17 +++ tests/worker/test_activity.py | 2 + 13 files changed, 377 insertions(+), 28 deletions(-) diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index b40d5fc72..764d8c792 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -17,6 +17,17 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -159,6 +170,15 @@ version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a65b545ab31d687cff52899d4890855fec459eb6afe0da6417b8a18da87aa29" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -213,6 +233,22 @@ dependencies = [ "serde", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + +[[package]] +name = "constant_time_eq" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" + [[package]] name = "core-foundation" version = "0.10.1" @@ -229,6 +265,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -262,6 +307,16 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "darling" version = "0.20.11" @@ -311,6 +366,21 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "deflate64" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26bf8fc351c5ed29b5c2f0cbbac1b209b74f60ecd62e675a998df72c49af5204" + +[[package]] +name = "deranged" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a41953f86f8a05768a6cda24def994fd2f424b04ec5c719cf89989779f199071" +dependencies = [ + "powerfmt", +] + [[package]] name = "derive_arbitrary" version = "1.4.2" @@ -374,6 +444,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", + "subtle", +] + [[package]] name = "dirs" version = "6.0.0" @@ -644,6 +725,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "gethostname" version = "1.0.2" @@ -752,6 +843,15 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "http" version = "1.3.1" @@ -1004,6 +1104,15 @@ version = "2.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd" +[[package]] +name = "inout" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" +dependencies = [ + "generic-array", +] + [[package]] name = "instant" version = "0.1.13" @@ -1102,6 +1211,26 @@ version = "0.2.175" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" +[[package]] +name = "liblzma" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73c36d08cad03a3fbe2c4e7bb3a9e84c57e4ee4135ed0b065cade3d98480c648" +dependencies = [ + "liblzma-sys", +] + +[[package]] +name = "liblzma-sys" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01b9596486f6d60c3bbe644c0e1be1aa6ccc472ad630fe8927b456973d7cb736" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "libredox" version = "0.1.9" @@ -1277,6 +1406,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-traits" version = "0.2.19" @@ -1431,6 +1566,16 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "pbkdf2" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" +dependencies = [ + "digest", + "hmac", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1518,6 +1663,18 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + +[[package]] +name = "ppmd-rust" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c834641d8ad1b348c9ee86dec3b9840d805acd5f24daa5f90c788951a52ff59b" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -2260,6 +2417,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -2569,6 +2737,7 @@ dependencies = [ "tracing", "tracing-core", "url", + "uuid", ] [[package]] @@ -2647,6 +2816,25 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "time" +version = "0.3.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" +dependencies = [ + "deranged", + "num-conv", + "powerfmt", + "serde", + "time-core", +] + +[[package]] +name = "time-core" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" + [[package]] name = "tinystr" version = "0.8.1" @@ -2931,6 +3119,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" +[[package]] +name = "typenum" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" + [[package]] name = "typetag" version = "0.2.20" @@ -2998,9 +3192,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "uuid" -version = "1.18.0" +version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" +checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" dependencies = [ "getrandom 0.3.3", "js-sys", @@ -3537,6 +3731,20 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "zerotrie" @@ -3577,12 +3785,23 @@ version = "4.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "caa8cd6af31c3b31c6631b8f483848b91589021b28fffe50adada48d4f4d2ed1" dependencies = [ + "aes", "arbitrary", "bzip2", + "constant_time_eq", "crc32fast", + "deflate64", "flate2", + "getrandom 0.3.3", + "hmac", "indexmap", + "liblzma", "memchr", + "pbkdf2", + "ppmd-rust", + "sha1", + "time", + "zeroize", "zopfli", "zstd", ] diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index afc79f0f5..8f00d3c30 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -23,9 +23,9 @@ def _raise_in_thread(thread_id: int, exc_type: Type[BaseException]) -> bool: thread_id, exc_type ) - def __init__(self, *, telemetry: TelemetryConfig) -> None: + def __init__(self, *, options: RuntimeOptions) -> None: """Create SDK Core runtime.""" - self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry) + self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(options) def retrieve_buffered_metrics(self, durations_as_seconds: bool) -> Sequence[Any]: """Get buffered metrics.""" @@ -91,6 +91,14 @@ class TelemetryConfig: metrics: Optional[MetricsConfig] +@dataclass(frozen=True) +class RuntimeOptions: + """Python representation of the Rust struct for runtime options.""" + + telemetry: TelemetryConfig + worker_heartbeat_interval_millis: Optional[int] = None + + # WARNING: This must match Rust runtime::BufferedLogEntry class BufferedLogEntry(Protocol): """A buffered log entry.""" diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 682d441dd..eb74962b5 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 682d441dd3b830e1477af3edb7c2330b403c4c33 +Subproject commit eb74962b5e4ed71b046c81f1cba35b2b9080a127 diff --git a/temporalio/bridge/src/lib.rs b/temporalio/bridge/src/lib.rs index cbd5be10e..ee157fb18 100644 --- a/temporalio/bridge/src/lib.rs +++ b/temporalio/bridge/src/lib.rs @@ -82,8 +82,8 @@ fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option PyResult { - runtime::init_runtime(telemetry_config) +fn init_runtime(options: runtime::RuntimeOptions) -> PyResult { + runtime::init_runtime(options) } #[pyfunction] diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index 72cc905ae..a33d91946 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -13,7 +13,7 @@ use temporal_sdk_core::telemetry::{ build_otlp_metric_exporter, start_prometheus_metric_exporter, CoreLogStreamConsumer, MetricsCallBuffer, }; -use temporal_sdk_core::{CoreRuntime, TokioRuntimeBuilder}; +use temporal_sdk_core::{CoreRuntime, RuntimeOptionsBuilder, TokioRuntimeBuilder}; use temporal_sdk_core_api::telemetry::metrics::{CoreMeter, MetricCallBufferer}; use temporal_sdk_core_api::telemetry::{ CoreLog, Logger, MetricTemporality, OtelCollectorOptionsBuilder, OtlpProtocol, @@ -86,16 +86,27 @@ pub struct PrometheusConfig { histogram_bucket_overrides: Option>>, } +#[derive(FromPyObject)] +pub struct RuntimeOptions { + telemetry: TelemetryConfig, + worker_heartbeat_interval_millis: Option, +} + const FORWARD_LOG_BUFFER_SIZE: usize = 2048; const FORWARD_LOG_MAX_FREQ_MS: u64 = 10; -pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult { +pub fn init_runtime(options: RuntimeOptions) -> PyResult { + let RuntimeOptions { + telemetry: TelemetryConfig { logging, metrics }, + worker_heartbeat_interval_millis, + } = options; + // Have to build/start telemetry config pieces let mut telemetry_build = TelemetryOptionsBuilder::default(); // Build logging config, capturing forwarding info to start later let mut log_forwarding: Option<(Receiver, PyObject)> = None; - if let Some(logging_conf) = telemetry_config.logging { + if let Some(logging_conf) = logging { telemetry_build.logging(if let Some(forward_to) = logging_conf.forward_to { // Note, actual log forwarding is started later let (consumer, stream) = CoreLogStreamConsumer::new(FORWARD_LOG_BUFFER_SIZE); @@ -113,26 +124,39 @@ pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult { // Build metric config, but actual metrics instance is late-bound after // CoreRuntime is created since it needs Tokio runtime - if let Some(metrics_conf) = telemetry_config.metrics.as_ref() { - telemetry_build.attach_service_name(metrics_conf.attach_service_name); - if let Some(prefix) = &metrics_conf.metric_prefix { + let mut metrics_conf = metrics; + if let Some(metrics_conf_ref) = metrics_conf.as_ref() { + telemetry_build.attach_service_name(metrics_conf_ref.attach_service_name); + if let Some(prefix) = &metrics_conf_ref.metric_prefix { telemetry_build.metric_prefix(prefix.to_string()); } } - // Create core runtime which starts tokio multi-thread runtime - let mut core = CoreRuntime::new( + let mut runtime_options_build = RuntimeOptionsBuilder::default(); + runtime_options_build.telemetry_options( telemetry_build .build() .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {err}")))?, - TokioRuntimeBuilder::default(), - ) - .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {err}")))?; + ); + + if let Some(ms) = worker_heartbeat_interval_millis { + runtime_options_build.heartbeat_interval(Some(Duration::from_millis(ms))); + } else { + runtime_options_build.heartbeat_interval(None); + } + + let runtime_options = runtime_options_build + .build() + .map_err(|err| PyValueError::new_err(format!("Invalid runtime options: {err}")))?; + + // Create core runtime which starts tokio multi-thread runtime + let mut core = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default()) + .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {err}")))?; // We late-bind the metrics after core runtime is created since it needs // the Tokio handle let mut metrics_call_buffer: Option>> = None; - if let Some(metrics_conf) = telemetry_config.metrics { + if let Some(metrics_conf) = metrics_conf.take() { let _guard = core.tokio_handle().enter(); // If they want buffered, cannot have Prom/OTel and we make buffered if metrics_conf.buffered_with_size > 0 { diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index c5cf46453..191149d0d 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -14,9 +14,8 @@ use temporal_sdk_core::api::errors::PollError; use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput}; use temporal_sdk_core_api::errors::WorkflowErrorType; use temporal_sdk_core_api::worker::{ - PluginInfo as CorePluginInfo, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, - SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, - SlotSupplier as SlotSupplierTrait, SlotSupplierPermit, + SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext, + SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit, }; use temporal_sdk_core_api::Worker; use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; @@ -24,6 +23,7 @@ use temporal_sdk_core_protos::coresdk::{ nexus::NexusTaskCompletion, ActivityHeartbeat, ActivityTaskCompletion, }; use temporal_sdk_core_protos::temporal::api::history::v1::History; +use temporal_sdk_core_protos::temporal::api::worker::v1::PluginInfo; use tokio::sync::mpsc::{channel, Sender}; use tokio_stream::wrappers::ReceiverStream; use tracing::error; @@ -65,6 +65,7 @@ pub struct WorkerConfig { nondeterminism_as_workflow_fail_for_types: HashSet, nexus_task_poller_behavior: PollerBehavior, plugins: Vec, + skip_client_worker_set_check: bool, } #[derive(FromPyObject)] @@ -727,12 +728,13 @@ fn convert_worker_config( .plugins( conf.plugins .into_iter() - .map(|name| CorePluginInfo { + .map(|name| PluginInfo { name, - version: None, + version: String::new(), }) - .collect(), + .collect::>(), ) + .skip_client_worker_set_check(conf.skip_client_worker_set_check) .build() .map_err(|err| PyValueError::new_err(format!("Invalid worker config: {err}"))) } diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index fbcfc9c3c..d82f07b14 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -65,6 +65,7 @@ class WorkerConfig: nondeterminism_as_workflow_fail_for_types: Set[str] nexus_task_poller_behavior: PollerBehavior plugins: Sequence[str] + skip_client_worker_set_check: bool @dataclass diff --git a/temporalio/runtime.py b/temporalio/runtime.py index 84b683941..8a9a57da4 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -70,13 +70,37 @@ def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None: raise RuntimeError("Runtime default already set") _default_runtime = runtime - def __init__(self, *, telemetry: TelemetryConfig) -> None: - """Create a default runtime with the given telemetry config. + def __init__( + self, + *, + telemetry: Optional[TelemetryConfig] = None, + runtime_options: Optional["RuntimeOptions"] = None, + ) -> None: + """Create a runtime with the provided configuration. Each new runtime creates a new internal thread pool, so use sparingly. + + Args: + telemetry: Telemetry configuration when not supplying + ``runtime_options``. + runtime_options: Full runtime configuration including telemetry and + worker heartbeating options. + + Raises: + ValueError: If both ``telemetry`` and ``runtime_options`` are + provided. """ + if runtime_options and telemetry: + raise ValueError("Cannot supply both telemetry and runtime_options") + + if runtime_options is None: + telemetry = telemetry or TelemetryConfig() + runtime_options = RuntimeOptions(telemetry=telemetry) + else: + telemetry = runtime_options.telemetry + self._core_runtime = temporalio.bridge.runtime.Runtime( - telemetry=telemetry._to_bridge_config() + options=runtime_options._to_bridge_config() ) if isinstance(telemetry.metrics, MetricBuffer): telemetry.metrics._runtime = self @@ -391,6 +415,34 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.TelemetryConfig: ) +@dataclass(frozen=True) +class RuntimeOptions: + """Configuration for runtime initialization.""" + + telemetry: TelemetryConfig = field(default_factory=TelemetryConfig) + """Telemetry configuration applied to the runtime.""" + + worker_heartbeat_interval: Optional[timedelta] = None + """Interval for worker heartbeats. ``None`` disables heartbeating.""" + + def _to_bridge_config(self) -> temporalio.bridge.runtime.RuntimeOptions: + heartbeat_millis: Optional[int] + if self.worker_heartbeat_interval is None: + heartbeat_millis = None + else: + if self.worker_heartbeat_interval <= timedelta(0): + raise ValueError("worker_heartbeat_interval must be positive") + heartbeat_millis = int( + self.worker_heartbeat_interval.total_seconds() * 1000 + ) + if heartbeat_millis == 0: + heartbeat_millis = 1 + return temporalio.bridge.runtime.RuntimeOptions( + telemetry=self.telemetry._to_bridge_config(), + worker_heartbeat_interval_millis=heartbeat_millis, + ) + + BufferedMetricKind = NewType("BufferedMetricKind", int) """Representation of a buffered metric kind.""" diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 0cebb8fa6..aa44c6396 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -295,6 +295,7 @@ def on_eviction_hook( 1 ), plugins=self._config.get("plugins", []), + skip_client_worker_set_check=True, ), ) # Start worker diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 42cfbf355..1bf73d9fe 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -148,6 +148,7 @@ def __init__( nexus_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum( maximum=5 ), + skip_client_worker_set_check: bool = False, ) -> None: """Create a worker to process workflows and/or activities. @@ -322,6 +323,9 @@ def __init__( Defaults to a 5-poller maximum. nexus_task_poller_behavior: Specify the behavior of Nexus task polling. Defaults to a 5-poller maximum. + skip_client_worker_set_check: Skip the runtime validation that ensures + the client is registered with the worker set. This should only be + used in tests. """ config = WorkerConfig( client=client, @@ -364,6 +368,7 @@ def __init__( workflow_task_poller_behavior=workflow_task_poller_behavior, activity_task_poller_behavior=activity_task_poller_behavior, nexus_task_poller_behavior=nexus_task_poller_behavior, + skip_client_worker_set_check=skip_client_worker_set_check, ) plugins_from_client = cast( @@ -390,6 +395,8 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf """ self._config = config + config.setdefault("skip_client_worker_set_check", False) + if not ( config["activities"] or config["nexus_service_handlers"] @@ -611,6 +618,7 @@ def check_activity(activity): "nexus_task_poller_behavior" ]._to_bridge(), plugins=config.get("plugins", []), + skip_client_worker_set_check=config["skip_client_worker_set_check"], ), ) @@ -906,6 +914,7 @@ class WorkerConfig(TypedDict, total=False): activity_task_poller_behavior: PollerBehavior nexus_task_poller_behavior: PollerBehavior plugins: Sequence[str] + skip_client_worker_set_check: bool def _warn_if_activity_executor_max_workers_is_inconsistent( diff --git a/tests/conftest.py b/tests/conftest.py index 0ee203fb6..737dac7c0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,6 +6,8 @@ import pytest import pytest_asyncio +import temporalio.worker + from . import DEV_SERVER_DOWNLOAD_VERSION # If there is an integration test environment variable set, we must remove the @@ -57,6 +59,18 @@ def pytest_addoption(parser): ) +@pytest.fixture(autouse=True) +def _force_worker_skip_client_set(monkeypatch): + original_init = temporalio.worker.Worker.__init__ + + def patched_init(self, *args, **kwargs): + kwargs.setdefault("skip_client_worker_set_check", True) + return original_init(self, *args, **kwargs) + + monkeypatch.setattr(temporalio.worker.Worker, "__init__", patched_init) + yield + + @pytest.fixture(scope="session") def event_loop(): loop = asyncio.get_event_loop_policy().new_event_loop() diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 4505ebfcf..330b45881 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -7,6 +7,8 @@ from typing import List, cast from urllib.request import urlopen +import pytest + from temporalio import workflow from temporalio.client import Client from temporalio.runtime import ( @@ -14,6 +16,7 @@ LoggingConfig, PrometheusConfig, Runtime, + RuntimeOptions, TelemetryConfig, TelemetryFilter, ) @@ -254,3 +257,17 @@ async def check_metrics() -> None: # Wait for metrics to appear and match the expected buckets await assert_eventually(check_metrics) + + +def test_runtime_options_to_bridge_config() -> None: + assert RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis is None + options = RuntimeOptions(worker_heartbeat_interval=timedelta(seconds=30)) + bridge_config = options._to_bridge_config() + assert bridge_config.worker_heartbeat_interval_millis == 30_000 + + +def test_runtime_options_invalid_heartbeat() -> None: + with pytest.raises(ValueError): + RuntimeOptions( + worker_heartbeat_interval=timedelta(seconds=-5) + )._to_bridge_config() diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index eba7df21f..2fdb0e7c9 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -1374,6 +1374,8 @@ async def _execute_workflow_with_activity( worker_config["task_queue"] = str(uuid.uuid4()) worker_config["activities"] = [fn] + additional_activities worker_config["shared_state_manager"] = _default_shared_state_manager + worker_config["skip_client_worker_set_check"] = True + print("worker_config[skip_client_worker_set_check] = True\n") if not worker_config.get("max_concurrent_activities"): worker_config["max_concurrent_activities"] = default_max_concurrent_activities async with Worker(**worker_config): From 76ec5484fb9530375ae58aa043d1c6c0c2c30e2a Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 14 Oct 2025 10:26:21 -0700 Subject: [PATCH 03/12] New test passing plugin names to core --- temporalio/bridge/sdk-core | 2 +- temporalio/bridge/src/runtime.rs | 9 ++++----- temporalio/bridge/src/worker.rs | 5 +++-- temporalio/worker/_worker.py | 2 -- tests/test_plugins.py | 30 ++++++++++++++++++++++++++++++ tests/worker/test_activity.py | 1 - 6 files changed, 38 insertions(+), 11 deletions(-) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index eb74962b5..401a38506 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit eb74962b5e4ed71b046c81f1cba35b2b9080a127 +Subproject commit 401a38506b8b5780c8c6dd87016c825b05c2a70e diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index a33d91946..4d6ec90fb 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -124,10 +124,9 @@ pub fn init_runtime(options: RuntimeOptions) -> PyResult { // Build metric config, but actual metrics instance is late-bound after // CoreRuntime is created since it needs Tokio runtime - let mut metrics_conf = metrics; - if let Some(metrics_conf_ref) = metrics_conf.as_ref() { - telemetry_build.attach_service_name(metrics_conf_ref.attach_service_name); - if let Some(prefix) = &metrics_conf_ref.metric_prefix { + if let Some(metrics_conf) = metrics.as_ref() { + telemetry_build.attach_service_name(metrics_conf.attach_service_name); + if let Some(prefix) = &metrics_conf.metric_prefix { telemetry_build.metric_prefix(prefix.to_string()); } } @@ -156,7 +155,7 @@ pub fn init_runtime(options: RuntimeOptions) -> PyResult { // We late-bind the metrics after core runtime is created since it needs // the Tokio handle let mut metrics_call_buffer: Option>> = None; - if let Some(metrics_conf) = metrics_conf.take() { + if let Some(metrics_conf) = metrics { let _guard = core.tokio_handle().enter(); // If they want buffered, cannot have Prom/OTel and we make buffered if metrics_conf.buffered_with_size > 0 { diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index 191149d0d..0a5deb928 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -647,11 +647,12 @@ impl WorkerRef { Ok(()) } - fn replace_client(&self, client: &client::ClientRef) { + fn replace_client(&self, client: &client::ClientRef) -> PyResult<()> { self.worker .as_ref() .expect("missing worker") - .replace_client(client.retry_client.clone().into_inner()); + .replace_client(client.retry_client.clone().into_inner()) + .map_err(|err| PyValueError::new_err(format!("Failed replacing client: {err}"))) } fn initiate_shutdown(&self) -> PyResult<()> { diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 1bf73d9fe..54b1ad37e 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -395,8 +395,6 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf """ self._config = config - config.setdefault("skip_client_worker_set_check", False) - if not ( config["activities"] or config["nexus_service_handlers"] diff --git a/tests/test_plugins.py b/tests/test_plugins.py index adde010da..ab2077064 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -9,6 +9,7 @@ import temporalio.client import temporalio.converter import temporalio.worker +import temporalio.bridge.temporal_sdk_bridge from temporalio import workflow from temporalio.client import Client, ClientConfig, OutboundInterceptor, WorkflowHistory from temporalio.contrib.pydantic import pydantic_data_converter @@ -173,6 +174,35 @@ async def test_worker_plugin_basic_config(client: Client) -> None: ] +async def test_worker_plugin_names_forwarded_to_core( + client: Client, monkeypatch: pytest.MonkeyPatch +) -> None: + captured_plugins: list[str] = [] + + original_new_worker = temporalio.bridge.temporal_sdk_bridge.new_worker + + def new_worker_wrapper(runtime_ref, client_ref, config): + nonlocal captured_plugins + captured_plugins = list(config.plugins) + return original_new_worker(runtime_ref, client_ref, config) + + monkeypatch.setattr( + temporalio.bridge.temporal_sdk_bridge, + "new_worker", + new_worker_wrapper, + ) + + plugin1 = SimplePlugin("test-worker-plugin1") + plugin2 = SimplePlugin("test-worker-plugin2") + worker = Worker( + client, + task_queue="queue", + activities=[never_run_activity], + plugins=[plugin1, plugin2], + ) + assert captured_plugins == [plugin1.name(), plugin2.name()] + + async def test_worker_duplicated_plugin(client: Client) -> None: new_config = client.config() new_config["plugins"] = [MyCombinedPlugin()] diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index 2fdb0e7c9..d91975f61 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -1375,7 +1375,6 @@ async def _execute_workflow_with_activity( worker_config["activities"] = [fn] + additional_activities worker_config["shared_state_manager"] = _default_shared_state_manager worker_config["skip_client_worker_set_check"] = True - print("worker_config[skip_client_worker_set_check] = True\n") if not worker_config.get("max_concurrent_activities"): worker_config["max_concurrent_activities"] = default_max_concurrent_activities async with Worker(**worker_config): From f684a05df4c6445a58ad072b9a54cd1218eec282 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Fri, 17 Oct 2025 13:26:02 -0700 Subject: [PATCH 04/12] lint --- temporalio/bridge/runtime.py | 2 +- temporalio/runtime.py | 2 +- tests/test_plugins.py | 2 +- tests/worker/test_worker.py | 8 +++++++- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index 8f00d3c30..a06f9686c 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -96,7 +96,7 @@ class RuntimeOptions: """Python representation of the Rust struct for runtime options.""" telemetry: TelemetryConfig - worker_heartbeat_interval_millis: Optional[int] = None + worker_heartbeat_interval_millis: Optional[int] = 30000 # 30s # WARNING: This must match Rust runtime::BufferedLogEntry diff --git a/temporalio/runtime.py b/temporalio/runtime.py index 8a9a57da4..87c93720d 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -422,7 +422,7 @@ class RuntimeOptions: telemetry: TelemetryConfig = field(default_factory=TelemetryConfig) """Telemetry configuration applied to the runtime.""" - worker_heartbeat_interval: Optional[timedelta] = None + worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=30) """Interval for worker heartbeats. ``None`` disables heartbeating.""" def _to_bridge_config(self) -> temporalio.bridge.runtime.RuntimeOptions: diff --git a/tests/test_plugins.py b/tests/test_plugins.py index ab2077064..651f197aa 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -6,10 +6,10 @@ import pytest +import temporalio.bridge.temporal_sdk_bridge import temporalio.client import temporalio.converter import temporalio.worker -import temporalio.bridge.temporal_sdk_bridge from temporalio import workflow from temporalio.client import Client, ClientConfig, OutboundInterceptor, WorkflowHistory from temporalio.contrib.pydantic import pydantic_data_converter diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 3bb16af8e..5ec55ea5d 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -16,6 +16,7 @@ from temporalio.api.workflowservice.v1 import ( DescribeWorkerDeploymentRequest, DescribeWorkerDeploymentResponse, + ListWorkersRequest, SetWorkerDeploymentCurrentVersionRequest, SetWorkerDeploymentCurrentVersionResponse, SetWorkerDeploymentRampingVersionRequest, @@ -27,7 +28,12 @@ TaskReachabilityType, ) from temporalio.common import PinnedVersioningOverride, RawValue, VersioningBehavior -from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig +from temporalio.runtime import ( + PrometheusConfig, + Runtime, + RuntimeOptions, + TelemetryConfig, +) from temporalio.service import RPCError from temporalio.testing import WorkflowEnvironment from temporalio.worker import ( From 2484ef7b5477a76718a0dbb04e570c834a52ff36 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 20 Oct 2025 15:42:03 -0700 Subject: [PATCH 05/12] Fix and simplify plumbing plugins from WorkerConfig to core, impose runtime client identity requirement --- temporalio/bridge/Cargo.lock | 345 +++++++++----------------------- temporalio/bridge/Cargo.toml | 4 +- temporalio/bridge/runtime.py | 2 +- temporalio/bridge/sdk-core | 2 +- temporalio/bridge/src/client.rs | 9 +- temporalio/worker/_replayer.py | 4 +- temporalio/worker/_worker.py | 7 +- tests/test_plugins.py | 7 +- tests/test_runtime.py | 12 +- tests/worker/test_worker.py | 2 +- tests/worker/test_workflow.py | 64 ++---- 11 files changed, 130 insertions(+), 328 deletions(-) diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index 764d8c792..b9046647e 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -17,17 +17,6 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" -[[package]] -name = "aes" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" -dependencies = [ - "cfg-if", - "cipher", - "cpufeatures", -] - [[package]] name = "aho-corasick" version = "1.1.3" @@ -170,15 +159,6 @@ version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a65b545ab31d687cff52899d4890855fec459eb6afe0da6417b8a18da87aa29" -[[package]] -name = "block-buffer" -version = "0.10.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" -dependencies = [ - "generic-array", -] - [[package]] name = "bumpalo" version = "3.19.0" @@ -233,22 +213,6 @@ dependencies = [ "serde", ] -[[package]] -name = "cipher" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" -dependencies = [ - "crypto-common", - "inout", -] - -[[package]] -name = "constant_time_eq" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" - [[package]] name = "core-foundation" version = "0.10.1" @@ -265,15 +229,6 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" -[[package]] -name = "cpufeatures" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" -dependencies = [ - "libc", -] - [[package]] name = "crc32fast" version = "1.5.0" @@ -307,16 +262,6 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" -[[package]] -name = "crypto-common" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" -dependencies = [ - "generic-array", - "typenum", -] - [[package]] name = "darling" version = "0.20.11" @@ -366,21 +311,6 @@ dependencies = [ "parking_lot_core", ] -[[package]] -name = "deflate64" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26bf8fc351c5ed29b5c2f0cbbac1b209b74f60ecd62e675a998df72c49af5204" - -[[package]] -name = "deranged" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a41953f86f8a05768a6cda24def994fd2f424b04ec5c719cf89989779f199071" -dependencies = [ - "powerfmt", -] - [[package]] name = "derive_arbitrary" version = "1.4.2" @@ -444,17 +374,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "digest" -version = "0.10.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" -dependencies = [ - "block-buffer", - "crypto-common", - "subtle", -] - [[package]] name = "dirs" version = "6.0.0" @@ -493,6 +412,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "either" version = "1.15.0" @@ -725,16 +650,6 @@ dependencies = [ "slab", ] -[[package]] -name = "generic-array" -version = "0.14.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2" -dependencies = [ - "typenum", - "version_check", -] - [[package]] name = "gethostname" version = "1.0.2" @@ -843,15 +758,6 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "hmac" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" -dependencies = [ - "digest", -] - [[package]] name = "http" version = "1.3.1" @@ -1104,15 +1010,6 @@ version = "2.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd" -[[package]] -name = "inout" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" -dependencies = [ - "generic-array", -] - [[package]] name = "instant" version = "0.1.13" @@ -1211,26 +1108,6 @@ version = "0.2.175" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" -[[package]] -name = "liblzma" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73c36d08cad03a3fbe2c4e7bb3a9e84c57e4ee4135ed0b065cade3d98480c648" -dependencies = [ - "liblzma-sys", -] - -[[package]] -name = "liblzma-sys" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01b9596486f6d60c3bbe644c0e1be1aa6ccc472ad630fe8927b456973d7cb736" -dependencies = [ - "cc", - "libc", - "pkg-config", -] - [[package]] name = "libredox" version = "0.1.9" @@ -1406,12 +1283,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "num-conv" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" - [[package]] name = "num-traits" version = "0.2.19" @@ -1463,9 +1334,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "opentelemetry" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" dependencies = [ "futures-core", "futures-sink", @@ -1477,9 +1348,9 @@ dependencies = [ [[package]] name = "opentelemetry-http" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" dependencies = [ "async-trait", "bytes", @@ -1490,9 +1361,9 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" dependencies = [ "http", "opentelemetry", @@ -1509,21 +1380,22 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" dependencies = [ "opentelemetry", "opentelemetry_sdk", "prost", "tonic", + "tonic-prost", ] [[package]] name = "opentelemetry_sdk" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" dependencies = [ "futures-channel", "futures-executor", @@ -1531,7 +1403,6 @@ dependencies = [ "opentelemetry", "percent-encoding", "rand 0.9.2", - "serde_json", "thiserror 2.0.15", "tokio", "tokio-stream", @@ -1566,16 +1437,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "pbkdf2" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" -dependencies = [ - "digest", - "hmac", -] - [[package]] name = "percent-encoding" version = "2.3.1" @@ -1663,18 +1524,6 @@ dependencies = [ "zerovec", ] -[[package]] -name = "powerfmt" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" - -[[package]] -name = "ppmd-rust" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c834641d8ad1b348c9ee86dec3b9840d805acd5f24daa5f90c788951a52ff59b" - [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1746,9 +1595,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" dependencies = [ "bytes", "prost-derive", @@ -1756,9 +1605,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ "heck", "itertools", @@ -1769,6 +1618,8 @@ dependencies = [ "prettyplease", "prost", "prost-types", + "pulldown-cmark", + "pulldown-cmark-to-cmark", "regex", "syn", "tempfile", @@ -1776,9 +1627,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" dependencies = [ "anyhow", "itertools", @@ -1789,18 +1640,18 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" dependencies = [ "prost", ] [[package]] name = "prost-wkt" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497e1e938f0c09ef9cabe1d49437b4016e03e8f82fbbe5d1c62a9b61b9decae1" +checksum = "655944d0ce015e71b3ec21279437e6a09e58433e50c7b0677901f3d5235e74f5" dependencies = [ "chrono", "inventory", @@ -1813,9 +1664,9 @@ dependencies = [ [[package]] name = "prost-wkt-build" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07b8bf115b70a7aa5af1fd5d6e9418492e9ccb6e4785e858c938e28d132a884b" +checksum = "f869f1443fee474b785e935d92e1007f57443e485f51668ed41943fc01a321a2" dependencies = [ "heck", "prost", @@ -1826,9 +1677,9 @@ dependencies = [ [[package]] name = "prost-wkt-types" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8cdde6df0a98311c839392ca2f2f0bcecd545f86a62b4e3c6a49c336e970fe5" +checksum = "eeeffd6b9becd4600dd461399f3f71aeda2ff0848802a9ed526cf12e8f42902a" dependencies = [ "chrono", "prost", @@ -1862,6 +1713,26 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "pulldown-cmark" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e8bbe1a966bd2f362681a44f6edce3c2310ac21e4d5067a6e7ec396297a6ea0" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + +[[package]] +name = "pulldown-cmark-to-cmark" +version = "21.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5b6a0769a491a08b31ea5c62494a8f144ee0987d86d670a8af4df1e1b7cde75" +dependencies = [ + "pulldown-cmark", +] + [[package]] name = "pyo3" version = "0.25.1" @@ -2417,17 +2288,6 @@ dependencies = [ "serde", ] -[[package]] -name = "sha1" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest", -] - [[package]] name = "sharded-slab" version = "0.1.7" @@ -2618,6 +2478,7 @@ dependencies = [ "bytes", "derive_builder", "derive_more", + "dyn-clone", "futures-retry", "futures-util", "http", @@ -2748,16 +2609,15 @@ dependencies = [ "base64", "derive_more", "prost", - "prost-build", "prost-wkt", - "prost-wkt-build", "prost-wkt-types", "rand 0.9.2", "serde", "serde_json", "thiserror 2.0.15", "tonic", - "tonic-build", + "tonic-prost", + "tonic-prost-build", "uuid", ] @@ -2816,25 +2676,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "time" -version = "0.3.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" -dependencies = [ - "deranged", - "num-conv", - "powerfmt", - "serde", - "time-core", -] - -[[package]] -name = "time-core" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" - [[package]] name = "tinystr" version = "0.8.1" @@ -2966,9 +2807,9 @@ checksum = "fcc842091f2def52017664b53082ecbbeb5c7731092bad69d2c63050401dfd64" [[package]] name = "tonic" -version = "0.13.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" dependencies = [ "async-trait", "axum", @@ -2983,9 +2824,9 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", "rustls-native-certs", - "socket2 0.5.10", + "socket2 0.6.0", + "sync_wrapper", "tokio", "tokio-rustls", "tokio-stream", @@ -2997,9 +2838,32 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.13.1" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c40aaccc9f9eccf2cd82ebc111adc13030d23e887244bc9cfa5d1d636049de3" +dependencies = [ + "prettyplease", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost", + "tonic", +] + +[[package]] +name = "tonic-prost-build" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847" +checksum = "b4a16cba4043dc3ff43fcb3f96b4c5c154c64cbd18ca8dce2ab2c6a451d058a2" dependencies = [ "prettyplease", "proc-macro2", @@ -3007,6 +2871,8 @@ dependencies = [ "prost-types", "quote", "syn", + "tempfile", + "tonic-build", ] [[package]] @@ -3119,12 +2985,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" -[[package]] -name = "typenum" -version = "1.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" - [[package]] name = "typetag" version = "0.2.20" @@ -3149,6 +3009,12 @@ dependencies = [ "syn", ] +[[package]] +name = "unicase" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" + [[package]] name = "unicode-ident" version = "1.0.18" @@ -3731,20 +3597,6 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" -dependencies = [ - "zeroize_derive", -] - -[[package]] -name = "zeroize_derive" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] [[package]] name = "zerotrie" @@ -3785,23 +3637,12 @@ version = "4.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "caa8cd6af31c3b31c6631b8f483848b91589021b28fffe50adada48d4f4d2ed1" dependencies = [ - "aes", "arbitrary", "bzip2", - "constant_time_eq", "crc32fast", - "deflate64", "flate2", - "getrandom 0.3.3", - "hmac", "indexmap", - "liblzma", "memchr", - "pbkdf2", - "ppmd-rust", - "sha1", - "time", - "zeroize", "zopfli", "zstd", ] diff --git a/temporalio/bridge/Cargo.toml b/temporalio/bridge/Cargo.toml index d1aaabb53..8328931b1 100644 --- a/temporalio/bridge/Cargo.toml +++ b/temporalio/bridge/Cargo.toml @@ -19,7 +19,7 @@ crate-type = ["cdylib"] anyhow = "1.0" async-trait = "0.1" futures = "0.3" -prost = "0.13" +prost = "0.14" pyo3 = { version = "0.25", features = [ "extension-module", "abi3-py39", @@ -38,7 +38,7 @@ temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api", featu temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" } tokio = "1.26" tokio-stream = "0.1" -tonic = "0.13" +tonic = "0.14" tracing = "0.1" url = "2.2" diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index a06f9686c..b08b97584 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -96,7 +96,7 @@ class RuntimeOptions: """Python representation of the Rust struct for runtime options.""" telemetry: TelemetryConfig - worker_heartbeat_interval_millis: Optional[int] = 30000 # 30s + worker_heartbeat_interval_millis: Optional[int] = 30_000 # 30s # WARNING: This must match Rust runtime::BufferedLogEntry diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 401a38506..9e9a46191 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 401a38506b8b5780c8c6dd87016c825b05c2a70e +Subproject commit 9e9a46191656fc9ccd95589dac3552410561d620 diff --git a/temporalio/bridge/src/client.rs b/temporalio/bridge/src/client.rs index dfbd432a1..68ebb1cb4 100644 --- a/temporalio/bridge/src/client.rs +++ b/temporalio/bridge/src/client.rs @@ -6,10 +6,11 @@ use std::time::Duration; use temporal_client::{ ClientKeepAliveConfig as CoreClientKeepAliveConfig, ClientOptions, ClientOptionsBuilder, ConfiguredClient, HttpConnectProxyOptions, RetryClient, RetryConfig, - TemporalServiceClientWithMetrics, TlsConfig, + TlsConfig, TemporalServiceClient, }; -use tonic::metadata::{ - AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue, +use temporal_client::tonic::{ + self, + metadata::{AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue}, }; use url::Url; @@ -17,7 +18,7 @@ use crate::runtime; pyo3::create_exception!(temporal_sdk_bridge, RPCError, PyException); -type Client = RetryClient>; +type Client = RetryClient>; #[pyclass] pub struct ClientRef { diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index aa44c6396..d3e0f2b9d 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -80,7 +80,6 @@ def __init__( runtime=runtime, disable_safe_workflow_eviction=disable_safe_workflow_eviction, header_codec_behavior=header_codec_behavior, - plugins=[plugin.name() for plugin in plugins], ) # Apply plugin configuration @@ -294,7 +293,7 @@ def on_eviction_hook( nexus_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum( 1 ), - plugins=self._config.get("plugins", []), + plugins=[plugin.name() for plugin in self.plugins], skip_client_worker_set_check=True, ), ) @@ -371,7 +370,6 @@ class ReplayerConfig(TypedDict, total=False): runtime: Optional[temporalio.runtime.Runtime] disable_safe_workflow_eviction: bool header_codec_behavior: HeaderCodecBehavior - plugins: Sequence[str] @dataclass(frozen=True) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 54b1ad37e..0a14f6314 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -381,7 +381,7 @@ def __init__( f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior." ) plugins = plugins_from_client + list(plugins) - config["plugins"] = [plugin.name() for plugin in plugins] + config["plugins"] = plugins self.plugins = plugins for plugin in plugins: @@ -615,7 +615,7 @@ def check_activity(activity): nexus_task_poller_behavior=config[ "nexus_task_poller_behavior" ]._to_bridge(), - plugins=config.get("plugins", []), + plugins=[plugin.name() for plugin in config.get("plugins", [])], skip_client_worker_set_check=config["skip_client_worker_set_check"], ), ) @@ -629,7 +629,6 @@ def config(self) -> WorkerConfig: config = self._config.copy() config["activities"] = list(config.get("activities", [])) config["workflows"] = list(config.get("workflows", [])) - config["plugins"] = list(config.get("plugins", [])) return config @property @@ -911,7 +910,7 @@ class WorkerConfig(TypedDict, total=False): workflow_task_poller_behavior: PollerBehavior activity_task_poller_behavior: PollerBehavior nexus_task_poller_behavior: PollerBehavior - plugins: Sequence[str] + plugins: Sequence[Plugin] skip_client_worker_set_check: bool diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 651f197aa..23a4ddd9e 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -150,7 +150,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" - assert worker.config().get("plugins") == [MyWorkerPlugin().name()] + assert [p.name() for p in worker.config().get("plugins")] == [MyWorkerPlugin().name()] # Test client plugin propagation to worker plugins new_config = client.config() @@ -158,7 +158,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: client = Client(**new_config) worker = Worker(client, task_queue="queue", activities=[never_run_activity]) assert worker.config().get("task_queue") == "combined" - assert worker.config().get("plugins") == [MyCombinedPlugin().name()] + assert [p.name() for p in worker.config().get("plugins")] == [MyCombinedPlugin().name()] # Test both. Client propagated plugins are called first, so the worker plugin overrides in this case worker = Worker( @@ -168,7 +168,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" - assert worker.config().get("plugins") == [ + assert [p.name() for p in worker.config().get("plugins")] == [ MyCombinedPlugin().name(), MyWorkerPlugin().name(), ] @@ -307,7 +307,6 @@ async def test_replay(client: Client) -> None: replayer = Replayer(workflows=[], plugins=[plugin]) assert len(replayer.config().get("workflows") or []) == 1 assert replayer.config().get("data_converter") == pydantic_data_converter - assert replayer.config().get("plugins") == [plugin.name()] await replayer.replay_workflow(await handle.fetch_history()) diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 330b45881..a11b17b51 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -21,7 +21,7 @@ TelemetryFilter, ) from temporalio.worker import Worker -from tests.helpers import assert_eq_eventually, assert_eventually, find_free_port +from tests.helpers import assert_eq_eventually, assert_eventually, find_free_port, worker_versioning_enabled @workflow.defn @@ -260,10 +260,12 @@ async def check_metrics() -> None: def test_runtime_options_to_bridge_config() -> None: - assert RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis is None - options = RuntimeOptions(worker_heartbeat_interval=timedelta(seconds=30)) - bridge_config = options._to_bridge_config() - assert bridge_config.worker_heartbeat_interval_millis == 30_000 + assert RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis == 30_000 + bridge_config = RuntimeOptions(worker_heartbeat_interval=timedelta(seconds=60))._to_bridge_config() + assert bridge_config.worker_heartbeat_interval_millis == 60_000 + + bridge_config1 = RuntimeOptions(worker_heartbeat_interval=None)._to_bridge_config() + assert bridge_config1.worker_heartbeat_interval_millis is None def test_runtime_options_invalid_heartbeat() -> None: diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 5ec55ea5d..eeea1f559 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -1143,7 +1143,7 @@ async def test_can_run_autoscaling_polling_worker( activity_pollers = [l for l in matches if "activity_task" in l] assert len(activity_pollers) == 1 assert activity_pollers[0].endswith("2") - workflow_pollers = [l for l in matches if "workflow_task" in l] + workflow_pollers = [l for l in matches if "workflow_task" in l and w.task_queue in l] assert len(workflow_pollers) == 2 # There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on # initialization timing. diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index a987d1b34..32cc7e8ae 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5302,57 +5302,19 @@ async def run(self) -> None: await asyncio.sleep(0.1) -async def test_workflow_replace_worker_client(client: Client, env: WorkflowEnvironment): - if env.supports_time_skipping: - pytest.skip("Only testing against two real servers") - # We are going to start a second ephemeral server and then replace the - # client. So we will start a no-cache ticking workflow with the current - # client and confirm it has accomplished at least one task. Then we will - # start another on the other client, and confirm it gets started too. Then - # we will terminate both. We have to use a ticking workflow with only one - # poller to force a quick re-poll to recognize our client change quickly (as - # opposed to just waiting the minute for poll timeout). - async with await WorkflowEnvironment.start_local( - dev_server_download_version=DEV_SERVER_DOWNLOAD_VERSION - ) as other_env: - # Start both workflows on different servers - task_queue = f"tq-{uuid.uuid4()}" - handle1 = await client.start_workflow( - TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue - ) - handle2 = await other_env.client.start_workflow( - TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue - ) - - async def any_task_completed(handle: WorkflowHandle) -> bool: - async for e in handle.fetch_history_events(): - if e.HasField("workflow_task_completed_event_attributes"): - return True - return False - - # Now start the worker on the first env - async with Worker( - client, - task_queue=task_queue, - workflows=[TickingWorkflow], - max_cached_workflows=0, - max_concurrent_workflow_task_polls=1, - ) as worker: - # Confirm the first ticking workflow has completed a task but not - # the second - await assert_eq_eventually(True, lambda: any_task_completed(handle1)) - assert not await any_task_completed(handle2) - - # Now replace the client, which should be used fairly quickly - # because we should have timer-done poll completions every 100ms - worker.client = other_env.client - - # Now confirm the other workflow has started - await assert_eq_eventually(True, lambda: any_task_completed(handle2)) - - # Terminate both - await handle1.terminate() - await handle2.terminate() +async def test_workflow_replace_worker_client(client: Client): + other_runtime = Runtime() + other_client = await Client.connect( + client.service_client.config.target_host, + namespace=client.namespace, + runtime=other_runtime, + ) + async with new_worker(client, HelloWorkflow) as worker: + with pytest.raises( + ValueError, + match="New client is not on the same runtime as the existing client", + ): + worker.client = other_client @activity.defn(dynamic=True) From efb91e41aad3f7395d6a63b8cbb90811eb52ab03 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 20 Oct 2025 18:03:46 -0700 Subject: [PATCH 06/12] poe lint --- tests/test_plugins.py | 10 +++++++--- tests/test_runtime.py | 15 ++++++++++++--- tests/worker/test_worker.py | 4 +++- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 23a4ddd9e..b438d6808 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -150,7 +150,9 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" - assert [p.name() for p in worker.config().get("plugins")] == [MyWorkerPlugin().name()] + assert [p.name() for p in worker.config().get("plugins", [])] == [ + MyWorkerPlugin().name() + ] # Test client plugin propagation to worker plugins new_config = client.config() @@ -158,7 +160,9 @@ async def test_worker_plugin_basic_config(client: Client) -> None: client = Client(**new_config) worker = Worker(client, task_queue="queue", activities=[never_run_activity]) assert worker.config().get("task_queue") == "combined" - assert [p.name() for p in worker.config().get("plugins")] == [MyCombinedPlugin().name()] + assert [p.name() for p in worker.config().get("plugins", [])] == [ + MyCombinedPlugin().name() + ] # Test both. Client propagated plugins are called first, so the worker plugin overrides in this case worker = Worker( @@ -168,7 +172,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" - assert [p.name() for p in worker.config().get("plugins")] == [ + assert [p.name() for p in worker.config().get("plugins", [])] == [ MyCombinedPlugin().name(), MyWorkerPlugin().name(), ] diff --git a/tests/test_runtime.py b/tests/test_runtime.py index a11b17b51..32219951d 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -21,7 +21,12 @@ TelemetryFilter, ) from temporalio.worker import Worker -from tests.helpers import assert_eq_eventually, assert_eventually, find_free_port, worker_versioning_enabled +from tests.helpers import ( + assert_eq_eventually, + assert_eventually, + find_free_port, + worker_versioning_enabled, +) @workflow.defn @@ -260,8 +265,12 @@ async def check_metrics() -> None: def test_runtime_options_to_bridge_config() -> None: - assert RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis == 30_000 - bridge_config = RuntimeOptions(worker_heartbeat_interval=timedelta(seconds=60))._to_bridge_config() + assert ( + RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis == 30_000 + ) + bridge_config = RuntimeOptions( + worker_heartbeat_interval=timedelta(seconds=60) + )._to_bridge_config() assert bridge_config.worker_heartbeat_interval_millis == 60_000 bridge_config1 = RuntimeOptions(worker_heartbeat_interval=None)._to_bridge_config() diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index daf1f0f47..0bc706091 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -1139,7 +1139,9 @@ async def test_can_run_autoscaling_polling_worker( activity_pollers = [l for l in matches if "activity_task" in l] assert len(activity_pollers) == 1 assert activity_pollers[0].endswith("2") - workflow_pollers = [l for l in matches if "workflow_task" in l and w.task_queue in l] + workflow_pollers = [ + l for l in matches if "workflow_task" in l and w.task_queue in l + ] assert len(workflow_pollers) == 2 # There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on # initialization timing. From 504fc05a2e07b05469f6fd3ea7be44efe6eeded5 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 21 Oct 2025 10:36:41 -0700 Subject: [PATCH 07/12] More rusty, clarify naming --- temporalio/bridge/src/runtime.rs | 21 +++++++-------------- tests/conftest.py | 2 +- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index 4d6ec90fb..be77a1676 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -131,20 +131,13 @@ pub fn init_runtime(options: RuntimeOptions) -> PyResult { } } - let mut runtime_options_build = RuntimeOptionsBuilder::default(); - runtime_options_build.telemetry_options( - telemetry_build - .build() - .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {err}")))?, - ); - - if let Some(ms) = worker_heartbeat_interval_millis { - runtime_options_build.heartbeat_interval(Some(Duration::from_millis(ms))); - } else { - runtime_options_build.heartbeat_interval(None); - } - - let runtime_options = runtime_options_build + let runtime_options = RuntimeOptionsBuilder::default() + .telemetry_options( + telemetry_build + .build() + .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {err}")))?, + ) + .heartbeat_interval(worker_heartbeat_interval_millis.map(Duration::from_millis)) .build() .map_err(|err| PyValueError::new_err(format!("Invalid runtime options: {err}")))?; diff --git a/tests/conftest.py b/tests/conftest.py index edefef2cf..0e2156f86 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -60,7 +60,7 @@ def pytest_addoption(parser): @pytest.fixture(autouse=True) -def _force_worker_skip_client_set(monkeypatch): +def _skip_client_worker_set_check(monkeypatch): original_init = temporalio.worker.Worker.__init__ def patched_init(self, *args, **kwargs): From 85b978d11e42c002880b5ecc5661d260d5a8d9ff Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 21 Oct 2025 10:53:01 -0700 Subject: [PATCH 08/12] More rusty, clarify naming, simplify Runtime args, add Client plugins to list --- temporalio/client.py | 5 +++ temporalio/runtime.py | 62 +++++++++++------------------------ temporalio/worker/_worker.py | 6 +++- tests/test_runtime.py | 27 ++++++++------- tests/worker/test_worker.py | 1 - tests/worker/test_workflow.py | 2 +- 6 files changed, 45 insertions(+), 58 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 71f43d6d9..540b80de6 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -349,6 +349,11 @@ def api_key(self, value: Optional[str]) -> None: self.service_client.config.api_key = value self.service_client.update_api_key(value) + @property + def plugins(self) -> Sequence[Plugin]: + """Plugins used by this client.""" + return self._config["plugins"] + # Overload for no-param workflow @overload async def start_workflow( diff --git a/temporalio/runtime.py b/temporalio/runtime.py index 87c93720d..04e83ed50 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -73,8 +73,8 @@ def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None: def __init__( self, *, - telemetry: Optional[TelemetryConfig] = None, - runtime_options: Optional["RuntimeOptions"] = None, + telemetry: TelemetryConfig, + worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=30), ) -> None: """Create a runtime with the provided configuration. @@ -83,25 +83,29 @@ def __init__( Args: telemetry: Telemetry configuration when not supplying ``runtime_options``. - runtime_options: Full runtime configuration including telemetry and - worker heartbeating options. + worker_heartbeat_interval: Interval for worker heartbeats. ``None`` + disables heartbeating. Raises: - ValueError: If both ``telemetry`` and ``runtime_options`` are - provided. + ValueError: If both ```runtime_options`` is a negative value. """ - if runtime_options and telemetry: - raise ValueError("Cannot supply both telemetry and runtime_options") - - if runtime_options is None: - telemetry = telemetry or TelemetryConfig() - runtime_options = RuntimeOptions(telemetry=telemetry) + if worker_heartbeat_interval is None: + heartbeat_millis = None else: - telemetry = runtime_options.telemetry + if worker_heartbeat_interval <= timedelta(0): + raise ValueError("worker_heartbeat_interval must be positive") + heartbeat_millis = int(worker_heartbeat_interval.total_seconds() * 1000) + if heartbeat_millis == 0: + heartbeat_millis = 1 + + self._heartbeat_millis = heartbeat_millis - self._core_runtime = temporalio.bridge.runtime.Runtime( - options=runtime_options._to_bridge_config() + runtime_options = temporalio.bridge.runtime.RuntimeOptions( + telemetry=telemetry._to_bridge_config(), + worker_heartbeat_interval_millis=heartbeat_millis, ) + + self._core_runtime = temporalio.bridge.runtime.Runtime(options=runtime_options) if isinstance(telemetry.metrics, MetricBuffer): telemetry.metrics._runtime = self core_meter = temporalio.bridge.metric.MetricMeter.create(self._core_runtime) @@ -415,34 +419,6 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.TelemetryConfig: ) -@dataclass(frozen=True) -class RuntimeOptions: - """Configuration for runtime initialization.""" - - telemetry: TelemetryConfig = field(default_factory=TelemetryConfig) - """Telemetry configuration applied to the runtime.""" - - worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=30) - """Interval for worker heartbeats. ``None`` disables heartbeating.""" - - def _to_bridge_config(self) -> temporalio.bridge.runtime.RuntimeOptions: - heartbeat_millis: Optional[int] - if self.worker_heartbeat_interval is None: - heartbeat_millis = None - else: - if self.worker_heartbeat_interval <= timedelta(0): - raise ValueError("worker_heartbeat_interval must be positive") - heartbeat_millis = int( - self.worker_heartbeat_interval.total_seconds() * 1000 - ) - if heartbeat_millis == 0: - heartbeat_millis = 1 - return temporalio.bridge.runtime.RuntimeOptions( - telemetry=self.telemetry._to_bridge_config(), - worker_heartbeat_interval_millis=heartbeat_millis, - ) - - BufferedMetricKind = NewType("BufferedMetricKind", int) """Representation of a buffered metric kind.""" diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index d0009e150..a968398a6 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -559,6 +559,10 @@ def check_activity(activity): maximum=config["max_concurrent_activity_task_polls"] ) + worker_plugins = [plugin.name() for plugin in config.get("plugins", [])] + client_plugins = [plugin.name() for plugin in config["client"].plugins] + plugins = list(set(worker_plugins + client_plugins)) + # Create bridge worker last. We have empirically observed that if it is # created before an error is raised from the activity worker # constructor, a deadlock/hang will occur presumably while trying to @@ -613,7 +617,7 @@ def check_activity(activity): nexus_task_poller_behavior=config[ "nexus_task_poller_behavior" ]._to_bridge(), - plugins=[plugin.name() for plugin in config.get("plugins", [])], + plugins=plugins, skip_client_worker_set_check=config["skip_client_worker_set_check"], ), ) diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 32219951d..5677eb040 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -16,7 +16,6 @@ LoggingConfig, PrometheusConfig, Runtime, - RuntimeOptions, TelemetryConfig, TelemetryFilter, ) @@ -265,20 +264,24 @@ async def check_metrics() -> None: def test_runtime_options_to_bridge_config() -> None: - assert ( - RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis == 30_000 + runtime = Runtime(telemetry=TelemetryConfig()) + assert runtime._heartbeat_millis == 30_000 + + runtime = Runtime( + telemetry=TelemetryConfig(), + worker_heartbeat_interval=timedelta(seconds=60), ) - bridge_config = RuntimeOptions( - worker_heartbeat_interval=timedelta(seconds=60) - )._to_bridge_config() - assert bridge_config.worker_heartbeat_interval_millis == 60_000 + assert runtime._heartbeat_millis == 60_000 - bridge_config1 = RuntimeOptions(worker_heartbeat_interval=None)._to_bridge_config() - assert bridge_config1.worker_heartbeat_interval_millis is None + runtime = Runtime( + telemetry=TelemetryConfig(), + worker_heartbeat_interval=None, + ) + assert runtime._heartbeat_millis is None def test_runtime_options_invalid_heartbeat() -> None: with pytest.raises(ValueError): - RuntimeOptions( - worker_heartbeat_interval=timedelta(seconds=-5) - )._to_bridge_config() + Runtime( + telemetry=TelemetryConfig(), worker_heartbeat_interval=timedelta(seconds=-5) + ) diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 0bc706091..116fe4336 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -32,7 +32,6 @@ from temporalio.runtime import ( PrometheusConfig, Runtime, - RuntimeOptions, TelemetryConfig, ) from temporalio.service import RPCError diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 3d2aac885..2d8f3a605 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5302,7 +5302,7 @@ async def run(self) -> None: async def test_workflow_replace_worker_client(client: Client): - other_runtime = Runtime() + other_runtime = Runtime(telemetry=TelemetryConfig()) other_client = await Client.connect( client.service_client.config.target_host, namespace=client.namespace, From 012cdbe2f39a66d31cb0855080a7162e1a94ae09 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 21 Oct 2025 14:02:55 -0700 Subject: [PATCH 09/12] Move new plugin test to bridge --- tests/bridge/test_plugins.py | 39 ++++++++++++++++++++++++++++++++++++ tests/test_plugins.py | 30 --------------------------- 2 files changed, 39 insertions(+), 30 deletions(-) create mode 100644 tests/bridge/test_plugins.py diff --git a/tests/bridge/test_plugins.py b/tests/bridge/test_plugins.py new file mode 100644 index 000000000..28f7af43f --- /dev/null +++ b/tests/bridge/test_plugins.py @@ -0,0 +1,39 @@ +from collections import Counter + +import pytest + +import temporalio.bridge.temporal_sdk_bridge +from temporalio.client import Client +from temporalio.plugin import SimplePlugin +from temporalio.worker import Worker +from tests.worker.test_worker import never_run_activity + + +async def test_worker_plugin_names_forwarded_to_core( + client: Client, monkeypatch: pytest.MonkeyPatch +) -> None: + captured_plugins: list[str] = [] + + original_new_worker = temporalio.bridge.temporal_sdk_bridge.new_worker + + def new_worker_wrapper(runtime_ref, client_ref, config): + nonlocal captured_plugins + captured_plugins = list(config.plugins) + return original_new_worker(runtime_ref, client_ref, config) + + monkeypatch.setattr( + temporalio.bridge.temporal_sdk_bridge, + "new_worker", + new_worker_wrapper, + ) + + plugin1 = SimplePlugin("test-worker-plugin1") + plugin2 = SimplePlugin("test-worker-plugin2") + Worker( + client, + task_queue="queue", + activities=[never_run_activity], + plugins=[plugin1, plugin2], + ) + # Use counter to compare unordered lists + assert Counter(captured_plugins) == Counter([plugin1.name(), plugin2.name()]) diff --git a/tests/test_plugins.py b/tests/test_plugins.py index b438d6808..d51c4781c 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -6,7 +6,6 @@ import pytest -import temporalio.bridge.temporal_sdk_bridge import temporalio.client import temporalio.converter import temporalio.worker @@ -178,35 +177,6 @@ async def test_worker_plugin_basic_config(client: Client) -> None: ] -async def test_worker_plugin_names_forwarded_to_core( - client: Client, monkeypatch: pytest.MonkeyPatch -) -> None: - captured_plugins: list[str] = [] - - original_new_worker = temporalio.bridge.temporal_sdk_bridge.new_worker - - def new_worker_wrapper(runtime_ref, client_ref, config): - nonlocal captured_plugins - captured_plugins = list(config.plugins) - return original_new_worker(runtime_ref, client_ref, config) - - monkeypatch.setattr( - temporalio.bridge.temporal_sdk_bridge, - "new_worker", - new_worker_wrapper, - ) - - plugin1 = SimplePlugin("test-worker-plugin1") - plugin2 = SimplePlugin("test-worker-plugin2") - worker = Worker( - client, - task_queue="queue", - activities=[never_run_activity], - plugins=[plugin1, plugin2], - ) - assert captured_plugins == [plugin1.name(), plugin2.name()] - - async def test_worker_duplicated_plugin(client: Client) -> None: new_config = client.config() new_config["plugins"] = [MyCombinedPlugin()] From 1155d6fb20ede5504b6daeafd249931b90f1b6fb Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 21 Oct 2025 14:30:56 -0700 Subject: [PATCH 10/12] switch poe lint and poe build-develop order in CI, revert test move --- .github/workflows/ci.yml | 2 +- tests/bridge/test_plugins.py | 39 ------------------------------------ tests/test_plugins.py | 30 +++++++++++++++++++++++++++ 3 files changed, 31 insertions(+), 40 deletions(-) delete mode 100644 tests/bridge/test_plugins.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 71d804e5a..06b0aedc4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -58,8 +58,8 @@ jobs: - run: uv sync --all-extras - run: poe bridge-lint if: ${{ matrix.clippyLinter }} - - run: poe lint - run: poe build-develop + - run: poe lint - run: mkdir junit-xml - run: poe test ${{matrix.pytestExtraArgs}} -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml timeout-minutes: 15 diff --git a/tests/bridge/test_plugins.py b/tests/bridge/test_plugins.py deleted file mode 100644 index 28f7af43f..000000000 --- a/tests/bridge/test_plugins.py +++ /dev/null @@ -1,39 +0,0 @@ -from collections import Counter - -import pytest - -import temporalio.bridge.temporal_sdk_bridge -from temporalio.client import Client -from temporalio.plugin import SimplePlugin -from temporalio.worker import Worker -from tests.worker.test_worker import never_run_activity - - -async def test_worker_plugin_names_forwarded_to_core( - client: Client, monkeypatch: pytest.MonkeyPatch -) -> None: - captured_plugins: list[str] = [] - - original_new_worker = temporalio.bridge.temporal_sdk_bridge.new_worker - - def new_worker_wrapper(runtime_ref, client_ref, config): - nonlocal captured_plugins - captured_plugins = list(config.plugins) - return original_new_worker(runtime_ref, client_ref, config) - - monkeypatch.setattr( - temporalio.bridge.temporal_sdk_bridge, - "new_worker", - new_worker_wrapper, - ) - - plugin1 = SimplePlugin("test-worker-plugin1") - plugin2 = SimplePlugin("test-worker-plugin2") - Worker( - client, - task_queue="queue", - activities=[never_run_activity], - plugins=[plugin1, plugin2], - ) - # Use counter to compare unordered lists - assert Counter(captured_plugins) == Counter([plugin1.name(), plugin2.name()]) diff --git a/tests/test_plugins.py b/tests/test_plugins.py index d51c4781c..b438d6808 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -6,6 +6,7 @@ import pytest +import temporalio.bridge.temporal_sdk_bridge import temporalio.client import temporalio.converter import temporalio.worker @@ -177,6 +178,35 @@ async def test_worker_plugin_basic_config(client: Client) -> None: ] +async def test_worker_plugin_names_forwarded_to_core( + client: Client, monkeypatch: pytest.MonkeyPatch +) -> None: + captured_plugins: list[str] = [] + + original_new_worker = temporalio.bridge.temporal_sdk_bridge.new_worker + + def new_worker_wrapper(runtime_ref, client_ref, config): + nonlocal captured_plugins + captured_plugins = list(config.plugins) + return original_new_worker(runtime_ref, client_ref, config) + + monkeypatch.setattr( + temporalio.bridge.temporal_sdk_bridge, + "new_worker", + new_worker_wrapper, + ) + + plugin1 = SimplePlugin("test-worker-plugin1") + plugin2 = SimplePlugin("test-worker-plugin2") + worker = Worker( + client, + task_queue="queue", + activities=[never_run_activity], + plugins=[plugin1, plugin2], + ) + assert captured_plugins == [plugin1.name(), plugin2.name()] + + async def test_worker_duplicated_plugin(client: Client) -> None: new_config = client.config() new_config["plugins"] = [MyCombinedPlugin()] From ebc6c0d90906bd2ca362d0da958eec6e3d089284 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 21 Oct 2025 14:34:10 -0700 Subject: [PATCH 11/12] missed a spot for poe lint in CI --- .github/workflows/ci.yml | 2 +- tests/test_plugins.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 06b0aedc4..bc8895419 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -149,8 +149,8 @@ jobs: - run: uv tool install poethepoet - run: uv lock --upgrade - run: uv sync --all-extras - - run: poe lint - run: poe build-develop + - run: poe lint - run: mkdir junit-xml - run: poe test -s --junit-xml=junit-xml/latest-deps.xml timeout-minutes: 10 diff --git a/tests/test_plugins.py b/tests/test_plugins.py index b438d6808..f4b98397a 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -1,6 +1,7 @@ import dataclasses import uuid import warnings +from collections import Counter from contextlib import AbstractAsyncContextManager, asynccontextmanager from typing import AsyncIterator, Awaitable, Callable, Optional, cast @@ -204,7 +205,8 @@ def new_worker_wrapper(runtime_ref, client_ref, config): activities=[never_run_activity], plugins=[plugin1, plugin2], ) - assert captured_plugins == [plugin1.name(), plugin2.name()] + # Use counter to compare unordered lists + assert Counter(captured_plugins) == Counter([plugin1.name(), plugin2.name()]) async def test_worker_duplicated_plugin(client: Client) -> None: From 7c88957fc3f39c102ed93e503afe803a6c50b0d1 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 22 Oct 2025 11:11:02 -0700 Subject: [PATCH 12/12] Default is 60, not 30 --- temporalio/bridge/runtime.py | 2 +- temporalio/runtime.py | 2 +- tests/test_runtime.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index b08b97584..3583c1d46 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -96,7 +96,7 @@ class RuntimeOptions: """Python representation of the Rust struct for runtime options.""" telemetry: TelemetryConfig - worker_heartbeat_interval_millis: Optional[int] = 30_000 # 30s + worker_heartbeat_interval_millis: Optional[int] = 60_000 # 60s # WARNING: This must match Rust runtime::BufferedLogEntry diff --git a/temporalio/runtime.py b/temporalio/runtime.py index 04e83ed50..0df31dca4 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -74,7 +74,7 @@ def __init__( self, *, telemetry: TelemetryConfig, - worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=30), + worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=60), ) -> None: """Create a runtime with the provided configuration. diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 5677eb040..98b0f884a 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -265,13 +265,13 @@ async def check_metrics() -> None: def test_runtime_options_to_bridge_config() -> None: runtime = Runtime(telemetry=TelemetryConfig()) - assert runtime._heartbeat_millis == 30_000 + assert runtime._heartbeat_millis == 60_000 runtime = Runtime( telemetry=TelemetryConfig(), - worker_heartbeat_interval=timedelta(seconds=60), + worker_heartbeat_interval=timedelta(seconds=10), ) - assert runtime._heartbeat_millis == 60_000 + assert runtime._heartbeat_millis == 10_000 runtime = Runtime( telemetry=TelemetryConfig(),