
Bug: consumption of connector source will hang forever if it encounters an error #7192

Closed
StrikeW opened this issue Jan 4, 2023 · 6 comments
Labels: priority/high, type/bug (Something isn't working)

@StrikeW
Contributor

StrikeW commented Jan 4, 2023

Describe the bug

If the connector source propagates an error up to the source executor, consumption of the source stops forever:

error!("hang up stream reader due to polling error: {}", err);
futures::future::pending().stack_trace("source_error").await

2023-01-04T11:56:46.373988Z INFO risingwave_connector::source::kafka::source::reader:135: kafka message error: Err(KafkaError (Partition EOF: 11))
2023-01-04T11:56:46.375555Z INFO risingwave_connector::source::kafka::source::reader:135: kafka message error: Err(KafkaError (Partition EOF: 11))
2023-01-04T11:56:46.376565Z ERROR risingwave_stream::executor::source::reader:61: hang up stream reader due to polling error: internal error: Partition EOF: 11
2023-01-04T11:56:46.377123Z ERROR risingwave_stream::executor::source::reader:61: hang up stream reader due to polling error: internal error: Partition EOF: 11

But the barrier messages can still pass to downstream executors:

select_with_strategy(
    barrier_receiver_arm,
    source_stream_arm,
    // We prefer barrier on the left hand side over source chunks.
    |_: &mut ()| PollNext::Left,
)
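
To see why barriers keep flowing while the source arm hangs, here is a minimal, self-contained sketch using only the futures crate (this is not RisingWave code; the test streams below merely stand in for barrier_receiver_arm and source_stream_arm):

use futures::executor::block_on;
use futures::stream::{self, PollNext, StreamExt};

fn main() {
    block_on(async {
        // Stands in for the barrier channel: it keeps producing items.
        let barriers = stream::iter(vec!["barrier-1", "barrier-2", "barrier-3"]);
        // Stands in for the hung source reader: `futures::future::pending()` never resolves.
        let hung_source = stream::once(futures::future::pending::<&'static str>());

        let mut merged = stream::select_with_strategy(
            barriers,
            hung_source,
            // Prefer the left (barrier) arm, as the source executor does.
            |_: &mut ()| PollNext::Left,
        );

        // All three barriers are still yielded even though the source arm never wakes up;
        // a fourth `next().await` would hang forever, which mirrors the reported behaviour.
        for _ in 0..3 {
            println!("{:?}", merged.next().await);
        }
    });
}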

I am not sure whether this behavior is by design.

To Reproduce

Manually construct an error in the KafkaSplitReader and propagate it to the upper layer.

diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index ab993dfde..e24706f76 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -21,6 +21,7 @@ use futures::StreamExt;
 use futures_async_stream::try_stream;
 use rdkafka::config::RDKafkaLogLevel;
 use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
+use rdkafka::error::KafkaError;
 use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};

 use crate::source::base::{SourceMessage, SplitReader, MAX_CHUNK_SIZE};
@@ -129,6 +130,8 @@ impl KafkaSplitReader {
         #[for_await]
         'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(MAX_CHUNK_SIZE) {
             for msg in msgs {
+                let fake_msg = Err(KafkaError::PartitionEOF(11));
+                let my_msg = fake_msg?;
                 let msg = msg?;
                 let cur_offset = msg.offset();
                 bytes_current_second += match &msg.payload() {

Then start the cluster with the full config and run the tpch-bench script to ingest data.

Expected behavior

IIUC, when executors encounter an error in other scenarios, the error is reported to Meta via the barrier collection mechanism (#6319). Meta then enters the recovery process to recover the whole streaming graph.

But in the case of a connector source failure, the upstream system may become available again after a while, or the failure may indeed be unrecoverable. So it may be wasteful to let Meta blindly recover the cluster:

if let Err(err) = result {
    // FIXME: If it is a connector source error occurred in the init barrier, we should pass
    // back to frontend
    fail_point!("inject_barrier_err_success");
    let fail_node = checkpoint_control.barrier_failed();
    tracing::warn!("Failed to complete epoch {}: {:?}", prev_epoch, err);
    self.do_recovery(err, fail_node, state, tracker, checkpoint_control)
        .await;
    return;
}

Candidate solutions:

  1. Employ a bounded retry strategy for connector sources
    When a connector source encounters an error, we will try our best to recover the consumer client. For example, we can drop the current consumer and create a new one to try to resume consumption. If we fail to recover the consumer, we can choose to hang up the connector source stream and prompt users to drop the source and troubleshoot the upstream system.

  2. Hang up the connector source stream forever, as the current implementation does, and prompt users to drop the source and troubleshoot the upstream system. For example, we can surface an error to users when they query the downstream MVs of the broken source.

Additional context

No response

@StrikeW StrikeW added the type/bug Something isn't working label Jan 4, 2023
@github-actions github-actions bot added this to the release-0.1.16 milestone Jan 4, 2023
@tabVersion tabVersion self-assigned this Jan 30, 2023
@tabVersion tabVersion removed this from the release-0.1.17 milestone Feb 27, 2023
@BugenZhao
Member

+1. The upper-layer streaming logic has no way to handle the connector error, so we expect the connector itself to retry as much as possible in the case of temporary service unavailability. For unrecoverable errors, we catch the error with tracing::error here and hang the stream up to avoid interfering with other streaming jobs. The most trivial way to handle this is to drop and re-create the streaming job.

So we may also need to report the error as a user error? cc @jon-chuang @fuyufjh

@jon-chuang
Contributor

jon-chuang commented Feb 28, 2023

internal error: Partition EOF: 11

I believe that in this case, it's not an unrecoverable error. Is this accurate? @tabVersion

Perhaps this particular case should actually not be handled as an error. It is unclear whether we should even report it as a user error; temporarily reaching the end of a Kafka partition should be expected in normal operation.

EDIT: I guess this is an artificial error and we should not focus too much on it.

@jon-chuang
Contributor

jon-chuang commented Feb 28, 2023

It also seems that we need to classify errors into recoverable and unrecoverable errors.

Question: should this be defined in the connector, or in the stream layer?
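
One possible shape, sketched here as a hypothetical enum (none of these names exist in the codebase; anyhow is assumed for the error payload), would be to let the connector layer tag each error and let the stream layer act on the tag:

/// Hypothetical classification of connector errors; not existing RisingWave code.
pub enum ConnectorError {
    /// Transient failures (broker temporarily unreachable, rebalance in progress, ...):
    /// the connector-side retry logic keeps retrying these.
    Recoverable(anyhow::Error),
    /// Permanent failures (topic deleted, credentials revoked, ...):
    /// stop retrying and surface these to the stream layer / the user.
    Unrecoverable(anyhow::Error),
}

impl ConnectorError {
    pub fn is_recoverable(&self) -> bool {
        matches!(self, ConnectorError::Recoverable(_))
    }
}

Either layer could own such a type; defining it in the connector would keep protocol-specific knowledge (e.g. which Kafka error codes are transient) out of the stream executor.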

@jon-chuang
Contributor

jon-chuang commented Feb 28, 2023

Employ a bounded retry strategy for connector sources
When a connector source encounters an error, we will try our best to recover the consumer client. For example, we can drop the current consumer and create a new one to try to resume consumption. If we fail to recover the consumer, we can choose to hang up the connector source stream and prompt users to drop the source and troubleshoot the upstream system.

+1 for this. The connector itself should not propagate the error to the stream layer; rather, it should retry on its own and, once it gives up retrying, emit an error that the stream actor will decide what to do with.

I think we should use bounded exponential retry (default: up to a few hours, to give the user/SRE time to respond, perhaps by restarting the source) and report the user error in the meantime.

@jon-chuang
Contributor

jon-chuang commented Feb 28, 2023

But the barrier messages can still pass to downstream executors:

I don't think this should be the behaviour now.

~~Currently, due to `try_stream`, every executor should first propagate its msg errors into the stream. This will eventually be intercepted by `impl StreamConsumer for DispatchExecutor`, the error from `StreamExecutorResult` will be propagated into `BarrierStream`, and `Actor::run_consumer` will terminate, triggering `context.lock_barrier_manager().notify_failure(actor_id, err);`~~

I believe that select_with_strategy will still propagate errors if they are available while a barrier has not yet arrived.

Edit: yes, futures::pending will cause this infinite pause. Hanging silently like this is definitely undesirable. The behaviour should always be to trigger either an explicit MV suspension or MV retry if the connector propagates an error.

@jon-chuang
Contributor

jon-chuang commented Feb 28, 2023

After discussion with @tabVersion , we are in agreement on the following:

  1. For the connector side, we expect both recoverable and unrecoverable errors. We will not make any effort to categorize errors into these two classes. Instead, we will attempt to handle both scenarios with exponential backoff for up to 1 hour (by default), implemented as a wrapper around the connectors (a rough sketch of such a wrapper follows this list). Once the max timeout/max retries are reached, we will emit the error to the stream side.
  2. The stream side will currently restart from the checkpoint (the current behaviour is to retry every 10 seconds, indefinitely; see chore(recovery): remove max retry attempts for recovery #6531), but we should eventually implement suspending the MV for this type of error (an irrecoverable one, since the connector side has implemented its own retry strategy). This means that the recoverable/irrecoverable distinction is made on the stream side.
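
A rough sketch of what such a connector-side wrapper could look like (all names here are illustrative, not the actual RisingWave API; tokio and tracing are assumed as dependencies):

use std::time::Duration;

/// Hypothetical stand-in for "create a split reader / consumer client".
trait BuildReader {
    type Reader;
    type Error: std::fmt::Display;
    fn build(&self) -> Result<Self::Reader, Self::Error>;
}

/// Retry `builder.build()` with exponential backoff until `max_elapsed` (e.g. 1 hour by
/// default) is exceeded; only then hand the error over to the stream side.
async fn build_with_backoff<B: BuildReader>(
    builder: &B,
    max_elapsed: Duration,
) -> Result<B::Reader, B::Error> {
    let mut delay = Duration::from_secs(1);
    let mut elapsed = Duration::ZERO;
    loop {
        match builder.build() {
            Ok(reader) => return Ok(reader),
            Err(err) if elapsed < max_elapsed => {
                tracing::warn!("connector error, retrying in {:?}: {}", delay, err);
                tokio::time::sleep(delay).await;
                elapsed += delay;
                // Exponential backoff, capped at 5 minutes per attempt.
                delay = (delay * 2).min(Duration::from_secs(300));
            }
            // Retry budget exhausted: emit the error to the stream side, which decides
            // whether to restart from a checkpoint or suspend the MV.
            Err(err) => return Err(err),
        }
    }
}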
