Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop returning tonic errors from polls; instead, log warnings and retry. #202

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .buildkite/docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
# - '9042:9042'

temporal:
image: temporalio/auto-setup:1.12.1
image: temporalio/auto-setup:1.12.3
ports:
- "7233:7233"
- "7234:7234"
Expand All @@ -26,7 +26,7 @@ services:
- cassandra

temporal-web:
image: temporalio/web:1.11.0
image: temporalio/web:1.12.0
logging:
driver: none
ports:
Expand Down
36 changes: 17 additions & 19 deletions src/core_tests/activity_tasks.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::{
errors::PollActivityError,
job_assert,
pollers::{MockManualGateway, MockServerGatewayApis},
test_help::{
Expand Down Expand Up @@ -115,8 +114,8 @@ async fn activity_not_found_returns_ok() {

let core = mock_core(MocksHolder::from_gateway_with_responses(
mock_gateway,
vec![].into(),
vec![].into(),
[],
[],
));

core.complete_activity_task(ActivityTaskCompletion {
Expand All @@ -143,8 +142,8 @@ async fn heartbeats_report_cancels_only_once() {

let core = mock_core(MocksHolder::from_gateway_with_responses(
mock_gateway,
vec![].into(),
vec![
[],
[
PollActivityTaskQueueResponse {
task_token: vec![1],
activity_id: "act1".to_string(),
Expand All @@ -157,8 +156,7 @@ async fn heartbeats_report_cancels_only_once() {
heartbeat_timeout: Some(Duration::from_millis(1).into()),
..Default::default()
},
]
.into(),
],
));

let act = core.poll_activity_task(TEST_Q).await.unwrap();
Expand Down Expand Up @@ -277,16 +275,19 @@ async fn activity_poll_timeout_retries() {
if calls <= 2 {
Some(Ok(PollActivityTaskQueueResponse::default()))
} else {
Some(Err(tonic::Status::unknown("Test done")))
Some(Ok(PollActivityTaskQueueResponse {
task_token: b"hello!".to_vec(),
..Default::default()
}))
}
});
let mock_worker = MockWorker {
act_poller: Some(Box::from(mock_act_poller)),
..Default::default()
};
let core = mock_core(MocksHolder::from_mock_workers(mock_gateway, [mock_worker]));
let r = core.poll_activity_task(TEST_Q).await;
assert_matches!(r.unwrap_err(), PollActivityError::TonicError(_));
let r = core.poll_activity_task(TEST_Q).await.unwrap();
assert_matches!(r.task_token.as_slice(), b"hello!");
}

#[tokio::test]
Expand Down Expand Up @@ -479,15 +480,12 @@ async fn only_returns_cancels_for_desired_queue() {
});

let mut w1 = MockWorker::for_queue("q1");
w1.act_poller = Some(mock_poller_from_resps(
vec![PollActivityTaskQueueResponse {
task_token: vec![1],
activity_id: "act1".to_string(),
heartbeat_timeout: Some(Duration::from_millis(1).into()),
..Default::default()
}]
.into(),
));
w1.act_poller = Some(mock_poller_from_resps([PollActivityTaskQueueResponse {
task_token: vec![1],
activity_id: "act1".to_string(),
heartbeat_timeout: Some(Duration::from_millis(1).into()),
..Default::default()
}]));
let mut mock_act_poller = mock_poller();
mock_act_poller
.expect_poll()
Expand Down
9 changes: 4 additions & 5 deletions src/core_tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn legacy_query(#[case] include_history: bool) {
.times(1)
.returning(move |_, _| Ok(RespondQueryTaskCompletedResponse::default()));

let mut mock = MocksHolder::from_gateway_with_responses(mock_gateway, tasks, vec![].into());
let mut mock = MocksHolder::from_gateway_with_responses(mock_gateway, tasks, vec![]);
if !include_history {
mock.worker_cfg(TEST_Q, |wc| wc.max_cached_workflows = 10);
}
Expand Down Expand Up @@ -167,7 +167,7 @@ async fn new_queries(#[case] num_queries: usize) {
.returning(|_| Ok(RespondWorkflowTaskCompletedResponse::default()));
mock_gateway.expect_respond_legacy_query().times(0);

let mut mock = MocksHolder::from_gateway_with_responses(mock_gateway, tasks, vec![].into());
let mut mock = MocksHolder::from_gateway_with_responses(mock_gateway, tasks, vec![]);
mock.worker_cfg(TEST_Q, |wc| wc.max_cached_workflows = 10);
let core = mock_core(mock);

Expand Down Expand Up @@ -250,7 +250,7 @@ async fn legacy_query_failure_on_wft_failure() {
.times(1)
.returning(move |_, _| Ok(RespondQueryTaskCompletedResponse::default()));

let mut mock = MocksHolder::from_gateway_with_responses(mock_gateway, tasks, vec![].into());
let mut mock = MocksHolder::from_gateway_with_responses(mock_gateway, tasks, vec![]);
mock.worker_cfg(TEST_Q, |wc| wc.max_cached_workflows = 10);
let core = mock_core(mock);

Expand Down Expand Up @@ -326,7 +326,7 @@ async fn legacy_query_with_full_history_after_complete() {
.times(2)
.returning(move |_, _| Ok(RespondQueryTaskCompletedResponse::default()));

let mut mock = MocksHolder::from_gateway_with_responses(mock_gateway, tasks, vec![].into());
let mut mock = MocksHolder::from_gateway_with_responses(mock_gateway, tasks, vec![]);
mock.worker_cfg(TEST_Q, |wc| wc.max_cached_workflows = 10);
let core = mock_core(mock);

Expand All @@ -343,7 +343,6 @@ async fn legacy_query_with_full_history_after_complete() {
.await
.unwrap();
let task = core.poll_workflow_activation(TEST_Q).await.unwrap();
dbg!(&task);
core.complete_workflow_activation(WfActivationCompletion::from_cmds(
TEST_Q,
task.run_id,
Expand Down
12 changes: 4 additions & 8 deletions src/core_tests/workers.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{
pollers::{MockManualGateway, MockManualPoller, MockServerGatewayApis},
pollers::{MockManualGateway, MockServerGatewayApis},
test_help::{
build_fake_core, build_multihist_mock_sg, canned_histories, hist_to_poll_resp, mock_core,
mock_core_with_opts_no_workers, register_mock_workers, single_hist_mock_sg,
FakeWfResponses, MockWorker, MocksHolder, ResponseType, TEST_Q,
mock_core_with_opts_no_workers, mock_manual_poller, register_mock_workers,
single_hist_mock_sg, FakeWfResponses, MockWorker, MocksHolder, ResponseType, TEST_Q,
},
Core, CoreInitOptionsBuilder, CoreSDK, PollWfError, WorkerConfigBuilder,
};
Expand Down Expand Up @@ -260,11 +260,7 @@ fn worker_shutdown() -> (CoreSDK, watch::Sender<bool>) {
let mut mock_pollers = vec![];
for i in 1..=2 {
let tq = format!("q{}", i);
let mut mock_poller = MockManualPoller::new();
mock_poller.expect_notify_shutdown().return_const(());
mock_poller
.expect_shutdown_box()
.returning(|| async {}.boxed());
let mut mock_poller = mock_manual_poller();
let rx = rx.clone();
let tqc = tq.clone();
mock_poller.expect_poll().returning(move || {
Expand Down
5 changes: 2 additions & 3 deletions src/core_tests/workflow_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,6 @@ async fn lots_of_workflows() {
task_q: TEST_Q.to_owned(),
}
});

let mock = build_multihist_mock_sg(hists, false, None);
let core = &mock_core(mock);

Expand Down Expand Up @@ -1322,7 +1321,7 @@ async fn buffered_work_drained_on_shutdown() {
let mut mock = MockServerGatewayApis::new();
mock.expect_complete_workflow_task()
.returning(|_| Ok(RespondWorkflowTaskCompletedResponse::default()));
let mut mock = MocksHolder::from_gateway_with_responses(mock, tasks, vec![].into());
let mut mock = MocksHolder::from_gateway_with_responses(mock, tasks, []);
// Cache on to avoid being super repetitive
mock.worker_cfg(TEST_Q, |wc| wc.max_cached_workflows = 10);
let core = &mock_core(mock);
Expand Down Expand Up @@ -1377,7 +1376,7 @@ async fn buffering_tasks_doesnt_count_toward_outstanding_max() {
))
.take(20),
);
let mut mock = MocksHolder::from_gateway_with_responses(mock, tasks, vec![].into());
let mut mock = MocksHolder::from_gateway_with_responses(mock, tasks, []);
mock.worker_cfg(TEST_Q, |wc| {
wc.max_cached_workflows = 10;
wc.max_outstanding_workflow_tasks = 5;
Expand Down
9 changes: 2 additions & 7 deletions src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
//! Error types exposed by public APIs

use crate::{machines::WFMachinesError, task_token::TaskToken, WorkerLookupErr};
use temporal_sdk_core_protos::{
coresdk::{activity_result::ActivityResult, workflow_completion::WfActivationCompletion},
temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse,
use temporal_sdk_core_protos::coresdk::{
activity_result::ActivityResult, workflow_completion::WfActivationCompletion,
};
use tonic::codegen::http::uri::InvalidUri;

Expand Down Expand Up @@ -43,10 +42,6 @@ pub enum PollWfError {
/// The run id of the erring workflow
run_id: String,
},
/// The server returned a malformed polling response. Either we aren't handling a valid form,
/// or the server is bugging out. Likely fatal.
#[error("Poll workflow response from server was malformed: {0:?}")]
BadPollResponseFromServer(Box<PollWorkflowTaskQueueResponse>),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't there ever bad poll responses that should be sent to lang and considered fatal?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will you do with that information? Nothing interesting to be done. I'll send you an eviction if it matters

/// [crate::Core::shutdown] was called, and there are no more replay tasks to be handled. Lang
/// must call [crate::Core::complete_workflow_activation] for any remaining tasks, and then may
/// exit.
Expand Down
4 changes: 2 additions & 2 deletions src/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,8 @@ impl WorkflowMachines {
Ok(())
}

/// Called when we want to run the event loop because a workflow task started event has
/// triggered
/// Called when a workflow task started event has triggered. Ensures we are tracking the ID
/// of the current started event as well as workflow time properly.
fn task_started(&mut self, task_started_event_id: i64, time: SystemTime) -> Result<()> {
let s = span!(Level::DEBUG, "Task started trigger");
let _enter = s.enter();
Expand Down
56 changes: 43 additions & 13 deletions src/pollers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,29 @@ impl<SG> RetryGateway<SG> {

impl<SG: ServerGatewayApis + Send + Sync + 'static> RetryGateway<SG> {
async fn call_with_retry<R, F, Fut>(&self, factory: F) -> Result<R>
where
F: Fn() -> Fut + Unpin,
Fut: Future<Output = Result<R>>,
{
self.call_type_with_retry(factory, CallType::Normal).await
}

async fn long_poll_call_with_retry<R, F, Fut>(&self, factory: F) -> Result<R>
where
F: Fn() -> Fut + Unpin,
Fut: Future<Output = Result<R>>,
{
self.call_type_with_retry(factory, CallType::LongPoll).await
}

async fn call_type_with_retry<R, F, Fut>(&self, factory: F, ct: CallType) -> Result<R>
where
F: Fn() -> Fut + Unpin,
Fut: Future<Output = Result<R>>,
{
Ok(FutureRetry::new(
factory,
TonicErrorHandler::new(
self.retry_config.clone().into(),
self.retry_config.max_retries,
),
TonicErrorHandler::new(self.retry_config.clone(), ct),
)
.await
.map_err(|(e, _attempt)| e)?
Expand All @@ -54,27 +67,44 @@ impl<SG: ServerGatewayApis + Send + Sync + 'static> RetryGateway<SG> {
}

#[derive(Debug)]
pub struct TonicErrorHandler {
struct TonicErrorHandler {
backoff: ExponentialBackoff,
max_attempts: usize,
call_type: CallType,
}

impl TonicErrorHandler {
pub fn new(backoff: ExponentialBackoff, max_attempts: usize) -> Self {
TonicErrorHandler {
backoff,
max_attempts,
fn new(mut cfg: RetryConfig, call_type: CallType) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe instead of introducing this CallType enum just pass a different config?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reason about needing to log a warning

if call_type == CallType::LongPoll {
// Long polls can retry forever
cfg.max_elapsed_time = None;
}
Self {
max_attempts: cfg.max_retries,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_attempts is not the same as max_retries.
E.g., 3 attempts = 2 retries.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bit of logic is unchanged. The attempt count is 0-indexed, so it works out.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is misleading though, I'd change it (doesn't have to be in this PR).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The long poll handler shouldn't have max attempts set.
Maybe change the type to Option<usize>?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need some way to decide when to start logging warnings if we've been retrying forever. Normal max retries provides a decent signal.

backoff: cfg.into(),
call_type,
}
}
}
/// Distinguishes how the retry error handler treats a gateway call once the configured
/// retry limit has been reached.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum CallType {
    /// An ordinary call: once the attempt limit is reached the error is forwarded to the caller.
    Normal,
    /// A long-poll call: the elapsed-time limit is removed so it may retry forever, and once the
    /// normal attempt limit is exceeded a warning is logged rather than the error being forwarded.
    LongPoll,
}

impl ErrorHandler<tonic::Status> for TonicErrorHandler {
type OutError = tonic::Status;

fn handle(&mut self, current_attempt: usize, e: tonic::Status) -> RetryPolicy<tonic::Status> {
// Long poll calls get unlimited retries
if current_attempt >= self.max_attempts {
return RetryPolicy::ForwardError(e);
if self.call_type == CallType::Normal {
return RetryPolicy::ForwardError(e);
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pedantic/subjective: no else needed

// But once they exceed the normal max attempts, start logging warnings
warn!(error=?e, "Polling encountered repeated error")
}
}

if RETRYABLE_ERROR_CODES.contains(&e.code()) {
match self.backoff.next_backoff() {
None => RetryPolicy::ForwardError(e), // None is returned when we've ran out of time.
Expand Down Expand Up @@ -114,15 +144,15 @@ impl<SG: ServerGatewayApis + Send + Sync + 'static> ServerGatewayApis for RetryG
task_queue: String,
) -> Result<PollWorkflowTaskQueueResponse> {
let factory = move || self.gateway.poll_workflow_task(task_queue.clone());
self.call_with_retry(factory).await
self.long_poll_call_with_retry(factory).await
}

async fn poll_activity_task(
&self,
task_queue: String,
) -> Result<PollActivityTaskQueueResponse> {
let factory = move || self.gateway.poll_activity_task(task_queue.clone());
self.call_with_retry(factory).await
self.long_poll_call_with_retry(factory).await
}

async fn reset_sticky_task_queue(
Expand Down
21 changes: 15 additions & 6 deletions src/test_help/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,17 @@ where
}

/// Uses the provided list of tasks to create a mock poller for the `TEST_Q`
pub fn from_gateway_with_responses(
pub fn from_gateway_with_responses<WFT, ACT>(
sg: SG,
wf_tasks: VecDeque<PollWorkflowTaskQueueResponse>,
act_tasks: VecDeque<PollActivityTaskQueueResponse>,
) -> MocksHolder<SG> {
wf_tasks: WFT,
act_tasks: ACT,
) -> MocksHolder<SG>
where
WFT: IntoIterator<Item = PollWorkflowTaskQueueResponse>,
ACT: IntoIterator<Item = PollActivityTaskQueueResponse>,
<WFT as IntoIterator>::IntoIter: std::marker::Send + 'static,
<ACT as IntoIterator>::IntoIter: std::marker::Send + 'static,
{
let mut mock_pollers = HashMap::new();
let mock_poller = mock_poller_from_resps(wf_tasks);
let mock_act_poller = mock_poller_from_resps(act_tasks);
Expand All @@ -236,13 +242,16 @@ where
}
}

pub fn mock_poller_from_resps<T>(mut tasks: VecDeque<T>) -> BoxedPoller<T>
pub fn mock_poller_from_resps<T, I>(tasks: I) -> BoxedPoller<T>
where
T: Send + Sync + 'static,
I: IntoIterator<Item = T>,
<I as IntoIterator>::IntoIter: std::marker::Send + 'static,
{
let mut mock_poller = mock_poller();
let mut tasks = tasks.into_iter();
mock_poller.expect_poll().returning(move || {
if let Some(t) = tasks.pop_front() {
if let Some(t) = tasks.next() {
Some(Ok(t))
} else {
Some(Err(tonic::Status::out_of_range(
Expand Down
Loading