Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ src/protos/*.rs
/bindings/
/core/machine_coverage/
/.cloud_certs/
.aider*
2 changes: 2 additions & 0 deletions core/src/core_tests/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool)
mock.expect_workers()
.returning(|| DEFAULT_WORKERS_REGISTRY.clone());
mock.expect_is_mock().returning(|| true);
mock.expect_sdk_name_and_version()
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
if use_cache {
if api_success {
mock.expect_shutdown_worker()
Expand Down
115 changes: 103 additions & 12 deletions core/src/internal_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,44 @@ pub(crate) enum InternalFlags {
lang: BTreeSet<u32>,
core_since_last_complete: HashSet<CoreInternalFlags>,
lang_since_last_complete: HashSet<u32>,
last_sdk_name: String,
last_sdk_version: String,
sdk_name: String,
sdk_version: String,
},
Disabled,
}

impl InternalFlags {
pub(crate) fn new(server_capabilities: &get_system_info_response::Capabilities) -> Self {
pub(crate) fn new(
server_capabilities: &get_system_info_response::Capabilities,
sdk_name: String,
sdk_version: String,
) -> Self {
match server_capabilities.sdk_metadata {
true => Self::Enabled {
core: Default::default(),
lang: Default::default(),
core_since_last_complete: Default::default(),
lang_since_last_complete: Default::default(),
last_sdk_name: "".to_string(),
last_sdk_version: "".to_string(),
sdk_name,
sdk_version,
},
false => Self::Disabled,
}
}

pub(crate) fn add_from_complete(&mut self, e: &WorkflowTaskCompletedEventAttributes) {
if let Self::Enabled { core, lang, .. } = self {
if let Self::Enabled {
core,
lang,
last_sdk_name,
last_sdk_version,
..
} = self
{
if let Some(metadata) = e.sdk_metadata.as_ref() {
core.extend(
metadata
Expand All @@ -74,6 +93,12 @@ impl InternalFlags {
.map(|u| CoreInternalFlags::from_u32(*u)),
);
lang.extend(metadata.lang_used_flags.iter());
if !metadata.sdk_name.is_empty() {
*last_sdk_name = metadata.sdk_name.clone();
}
if !metadata.sdk_version.is_empty() {
*last_sdk_version = metadata.sdk_version.clone();
}
}
}
}
Expand Down Expand Up @@ -133,6 +158,10 @@ impl InternalFlags {
lang_since_last_complete,
core,
lang,
last_sdk_name,
last_sdk_version,
sdk_name,
sdk_version,
} => {
let core_newly_used: Vec<_> = core_since_last_complete
.iter()
Expand All @@ -146,11 +175,21 @@ impl InternalFlags {
.collect();
core.extend(core_since_last_complete.iter());
lang.extend(lang_since_last_complete.iter());
let sdk_name = if last_sdk_name != sdk_name {
sdk_name.clone()
} else {
"".to_string()
};
let sdk_version = if last_sdk_version != sdk_version {
sdk_version.clone()
} else {
"".to_string()
};
WorkflowTaskCompletedMetadata {
core_used_flags: core_newly_used,
lang_used_flags: lang_newly_used,
sdk_name: "".to_string(),
sdk_version: "".to_string(),
sdk_name,
sdk_version,
}
}
Self::Disabled => WorkflowTaskCompletedMetadata::default(),
Expand Down Expand Up @@ -186,9 +225,19 @@ mod tests {
use super::*;
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::get_system_info_response::Capabilities;

impl Default for InternalFlags {
fn default() -> Self {
Self::Disabled
}
}

#[test]
fn disabled_in_capabilities_disables() {
let mut f = InternalFlags::new(&Capabilities::default());
let mut f = InternalFlags::new(
&Capabilities::default(),
"name".to_string(),
"ver".to_string(),
);
f.add_lang_used([1]);
f.add_from_complete(&WorkflowTaskCompletedEventAttributes {
sdk_metadata: Some(WorkflowTaskCompletedMetadata {
Expand All @@ -214,23 +263,29 @@ mod tests {
}

#[test]
fn only_writes_new_flags() {
let mut f = InternalFlags::new(&Capabilities {
sdk_metadata: true,
..Default::default()
});
fn only_writes_new_flags_and_sdk_info() {
let mut f = InternalFlags::new(
&Capabilities {
sdk_metadata: true,
..Default::default()
},
"name".to_string(),
"ver".to_string(),
);
f.add_lang_used([1]);
f.try_use(CoreInternalFlags::IdAndTypeDeterminismChecks, true);
let gathered = f.gather_for_wft_complete();
assert_matches!(gathered.core_used_flags.as_slice(), &[1]);
assert_matches!(gathered.lang_used_flags.as_slice(), &[1]);
assert_matches!(gathered.sdk_name.as_str(), "name");
assert_matches!(gathered.sdk_version.as_str(), "ver");

f.add_from_complete(&WorkflowTaskCompletedEventAttributes {
sdk_metadata: Some(WorkflowTaskCompletedMetadata {
core_used_flags: vec![2],
lang_used_flags: vec![2],
sdk_name: "".to_string(),
sdk_version: "".to_string(),
sdk_name: "name".to_string(),
sdk_version: "ver".to_string(),
}),
..Default::default()
});
Expand All @@ -239,5 +294,41 @@ mod tests {
let gathered = f.gather_for_wft_complete();
assert_matches!(gathered.core_used_flags.as_slice(), &[]);
assert_matches!(gathered.lang_used_flags.as_slice(), &[]);
assert!(gathered.sdk_name.is_empty());
assert!(gathered.sdk_version.is_empty());

f.add_from_complete(&WorkflowTaskCompletedEventAttributes {
sdk_metadata: Some(WorkflowTaskCompletedMetadata::default()),
..Default::default()
});
let gathered = f.gather_for_wft_complete();
assert_matches!(gathered.core_used_flags.as_slice(), &[]);
assert_matches!(gathered.lang_used_flags.as_slice(), &[]);
assert!(gathered.sdk_name.is_empty());
assert!(gathered.sdk_version.is_empty());

f.add_from_complete(&WorkflowTaskCompletedEventAttributes {
sdk_metadata: Some(WorkflowTaskCompletedMetadata {
sdk_name: "other sdk".to_string(),
sdk_version: "other ver".to_string(),
..Default::default()
}),
..Default::default()
});
let gathered = f.gather_for_wft_complete();
assert_matches!(gathered.sdk_name.as_str(), "name");
assert_matches!(gathered.sdk_version.as_str(), "ver");

f.add_from_complete(&WorkflowTaskCompletedEventAttributes {
sdk_metadata: Some(WorkflowTaskCompletedMetadata {
sdk_name: "name".to_string(),
sdk_version: "ver2".to_string(),
..Default::default()
}),
..Default::default()
});
let gathered = f.gather_for_wft_complete();
assert!(gathered.sdk_name.is_empty());
assert_matches!(gathered.sdk_version.as_str(), "ver");
}
}
8 changes: 8 additions & 0 deletions core/src/worker/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ pub(crate) trait WorkerClient: Sync + Send {
fn capabilities(&self) -> Option<Capabilities>;
fn workers(&self) -> Arc<SlotManager>;
fn is_mock(&self) -> bool;
/// Return (sdk_name, sdk_version) from the underlying client configuration
fn sdk_name_and_version(&self) -> (String, String);
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -486,6 +488,12 @@ impl WorkerClient for WorkerClientBag {
fn is_mock(&self) -> bool {
false
}

fn sdk_name_and_version(&self) -> (String, String) {
let lock = self.replaceable_client.read();
let opts = lock.get_client().inner().options();
(opts.client_name.clone(), opts.client_version.clone())
}
}

/// A version of [RespondWorkflowTaskCompletedRequest] that will finish being filled out by the
Expand Down
5 changes: 5 additions & 0 deletions core/src/worker/client/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub(crate) fn mock_workflow_client() -> MockWorkerClient {
r.expect_is_mock().returning(|| true);
r.expect_shutdown_worker()
.returning(|_| Ok(ShutdownWorkerResponse {}));
r.expect_sdk_name_and_version()
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
r
}

Expand All @@ -42,6 +44,8 @@ pub(crate) fn mock_manual_workflow_client() -> MockManualWorkerClient {
r.expect_workers()
.returning(|| DEFAULT_WORKERS_REGISTRY.clone());
r.expect_is_mock().returning(|| true);
r.expect_sdk_name_and_version()
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
r
}

Expand Down Expand Up @@ -146,5 +150,6 @@ mockall::mock! {
fn capabilities(&self) -> Option<Capabilities>;
fn workers(&self) -> Arc<SlotManager>;
fn is_mock(&self) -> bool;
fn sdk_name_and_version(&self) -> (String, String);
}
}
28 changes: 8 additions & 20 deletions core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ use temporal_sdk_core_protos::{
temporal::api::{
enums::v1::TaskQueueKind,
taskqueue::v1::{StickyExecutionAttributes, TaskQueue},
workflowservice::v1::get_system_info_response,
},
TaskToken,
};
Expand Down Expand Up @@ -508,16 +507,19 @@ impl Worker {
external_wft_tx,
);
let worker_key = Mutex::new(client.workers().register(Box::new(provider)));
let sdk_name_and_ver = client.sdk_name_and_version();
Self {
worker_key,
client: client.clone(),
workflows: Workflows::new(
build_wf_basics(
config.clone(),
WorkflowBasics {
worker_config: Arc::new(config.clone()),
shutdown_token: shutdown_token.child_token(),
metrics,
shutdown_token.child_token(),
client.capabilities().unwrap_or_default(),
),
server_capabilities: client.capabilities().unwrap_or_default(),
sdk_name: sdk_name_and_ver.0,
sdk_version: sdk_name_and_ver.1,
},
sticky_queue_name.map(|sq| StickyExecutionAttributes {
worker_task_queue: Some(TaskQueue {
name: sq,
Expand Down Expand Up @@ -864,20 +866,6 @@ pub(crate) struct PostActivateHookData<'a> {
pub(crate) replaying: bool,
}

fn build_wf_basics(
config: WorkerConfig,
metrics: MetricsContext,
shutdown_token: CancellationToken,
server_capabilities: get_system_info_response::Capabilities,
) -> WorkflowBasics {
WorkflowBasics {
worker_config: Arc::new(config),
shutdown_token,
metrics,
server_capabilities,
}
}

pub(crate) enum TaskPollers {
Real,
#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions core/src/worker/workflow/machines/activity_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ mod test {
attrs: Default::default(),
cancellation_type: Default::default(),
cancelled_before_sent: false,
internal_flags: Rc::new(RefCell::new(InternalFlags::new(&Default::default()))),
internal_flags: Rc::new(RefCell::new(InternalFlags::default())),
},
);
let cmds = s.cancel().unwrap();
Expand All @@ -894,7 +894,7 @@ mod test {
cancellation_type: ActivityCancellationType::Abandon.into(),
..Default::default()
},
Rc::new(RefCell::new(InternalFlags::new(&Default::default()))),
Rc::new(RefCell::new(InternalFlags::default())),
true,
);
let mut s = if let Machines::ActivityMachine(am) = s.machine {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ mod test {
workflow_type: "".to_string(),
cancelled_before_sent: false,
cancel_type: Default::default(),
internal_flags: Rc::new(RefCell::new(InternalFlags::new(&Default::default()))),
internal_flags: Rc::new(RefCell::new(InternalFlags::default())),
},
);
let cmds = s.cancel().unwrap();
Expand All @@ -1006,7 +1006,7 @@ mod test {
workflow_type: "".to_string(),
cancelled_before_sent: false,
cancel_type: ChildWorkflowCancellationType::Abandon,
internal_flags: Rc::new(RefCell::new(InternalFlags::new(&Default::default()))),
internal_flags: Rc::new(RefCell::new(InternalFlags::default())),
};
let state = Cancelled {
seen_cancelled_event: true,
Expand Down
6 changes: 5 additions & 1 deletion core/src/worker/workflow/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,11 @@ where
impl WorkflowMachines {
pub(crate) fn new(basics: RunBasics, driven_wf: DrivenWorkflow) -> Self {
let replaying = basics.history.previous_wft_started_id > 0;
let mut observed_internal_flags = InternalFlags::new(basics.capabilities);
let mut observed_internal_flags = InternalFlags::new(
basics.capabilities,
basics.sdk_name.to_owned(),
basics.sdk_version.to_owned(),
);
// Peek ahead to determine used flags in the first WFT.
if let Some(attrs) = basics.history.peek_next_wft_completed(0) {
observed_internal_flags.add_from_complete(attrs);
Expand Down
4 changes: 4 additions & 0 deletions core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ pub(crate) struct WorkflowBasics {
pub(crate) shutdown_token: CancellationToken,
pub(crate) metrics: MetricsContext,
pub(crate) server_capabilities: get_system_info_response::Capabilities,
pub(crate) sdk_name: String,
pub(crate) sdk_version: String,
}

pub(crate) struct RunBasics<'a> {
Expand All @@ -141,6 +143,8 @@ pub(crate) struct RunBasics<'a> {
pub(crate) history: HistoryUpdate,
pub(crate) metrics: MetricsContext,
pub(crate) capabilities: &'a get_system_info_response::Capabilities,
pub(crate) sdk_name: &'a str,
pub(crate) sdk_version: &'a str,
}

impl Workflows {
Expand Down
Loading
Loading