Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ mod pollers;
mod protosext;
mod workflow;

pub use pollers::{ServerGateway, ServerGatewayOptions};
pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions};
pub use url::Url;

use crate::{
machines::{InconvertibleCommandError, WFCommand},
pollers::ServerGatewayApis,
protos::{
coresdk::{
complete_task_req::Completion, wf_activation_completion::Status, CompleteTaskReq, Task,
Expand All @@ -38,6 +37,7 @@ use dashmap::DashMap;
use std::{convert::TryInto, sync::mpsc::SendError, sync::Arc};
use tokio::runtime::Runtime;
use tonic::codegen::http::uri::InvalidUri;
use tracing::Level;

/// A result alias having [CoreError] as the error type
pub type Result<T, E = CoreError> = std::result::Result<T, E>;
Expand Down Expand Up @@ -75,6 +75,7 @@ pub struct CoreInitOptions {
/// * Will panic if called from within an async context, as it will construct a runtime and you
/// cannot construct a runtime from within a runtime.
pub fn init(opts: CoreInitOptions) -> Result<impl Core> {
let _ = env_logger::try_init();
let runtime = Runtime::new().map_err(CoreError::TokioInitError)?;
// Initialize server client
let work_provider = runtime.block_on(opts.gateway_opts.connect())?;
Expand Down Expand Up @@ -131,6 +132,12 @@ where
return Err(CoreError::BadDataFromWorkProvider(work));
};

event!(
Level::DEBUG,
msg = "Received workflow task",
?work.task_token
);

// Correlate task token w/ run ID
self.workflow_task_tokens
.insert(work.task_token.clone(), run_id.clone());
Expand Down
9 changes: 6 additions & 3 deletions src/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl WorkflowMachines {
/// is the last event in the history.
///
/// TODO: Describe what actually happens in here
#[instrument(skip(self))]
#[instrument(level = "debug", skip(self))]
pub(crate) fn handle_event(
&mut self,
event: &HistoryEvent,
Expand Down Expand Up @@ -290,8 +290,11 @@ impl WorkflowMachines {
// We need to notify the lang sdk that it's time to kick off a workflow
self.outgoing_wf_activation_jobs.push_back(
StartWorkflowTaskAttributes {
// TODO: This needs to be set during init
workflow_type: "".to_string(),
workflow_type: attrs
.workflow_type
.as_ref()
.map(|wt| wt.name.clone())
.unwrap_or_default(),
workflow_id: self.workflow_id.clone(),
arguments: attrs.input.clone(),
}
Expand Down
1 change: 1 addition & 0 deletions src/pollers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub struct ServerGateway {
pub opts: ServerGatewayOptions,
}

/// This trait provides ways to call the temporal server itself
pub trait ServerGatewayApis:
PollWorkflowTaskQueueApi + RespondWorkflowTaskCompletedApi + StartWorkflowExecutionApi
{
Expand Down
16 changes: 11 additions & 5 deletions src/workflow/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
protos::temporal::api::history::v1::WorkflowExecutionStartedEventAttributes,
};
use std::sync::mpsc::{self, Receiver, Sender};
use tracing::Level;

/// The [DrivenWorkflow] trait expects to be called to make progress, but the [CoreSDKService]
/// expects to be polled by the lang sdk. This struct acts as the bridge between the two, buffering
Expand All @@ -31,17 +32,22 @@ impl WorkflowBridge {
}

impl DrivenWorkflow for WorkflowBridge {
#[instrument]
fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes) -> Vec<WFCommand> {
event!(Level::DEBUG, msg = "Workflow bridge start called", ?attribs);
self.started_attrs = Some(attribs);
vec![]
}

#[instrument]
fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
self.incoming_commands
.try_recv()
.unwrap_or_else(|_| vec![WFCommand::NoCommandsFromLang])
let in_cmds = self.incoming_commands.try_recv();

event!(
Level::DEBUG,
msg = "Workflow bridge iteration fetch",
?in_cmds
);

in_cmds.unwrap_or_else(|_| vec![WFCommand::NoCommandsFromLang])
}

fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) {
Expand Down
3 changes: 0 additions & 3 deletions tests/integ_tests/poller_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ use temporal_sdk_core::{
temporal::api::command::v1::{
CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
},
temporal::api::common::v1::WorkflowType,
temporal::api::taskqueue::v1::TaskQueue,
temporal::api::workflowservice::v1::StartWorkflowExecutionRequest,
},
Core, CoreInitOptions, ServerGatewayOptions, Url,
};
Expand Down