
Error Handling Clarity and Intended Workflows #2779

Closed
StevenPG opened this issue Jul 18, 2023 · 20 comments

Comments

@StevenPG
Contributor

Describe the issue
Missing samples or documentation on intended workflows when it comes to error handling and best practices deeper in streams workflows.

Practically all of the documentation around error handling refers to serialization or deserialization errors and handling exceptions that occur at these integration points.

However, any exceptions that are thrown from Transformer, Processor, KeyValueMapper, etc. methods result in the Kafka consumers being destroyed.

There are some Stack Overflow questions around error handling, but the recommendations seem far from ideal.

For example, given the following workflow:

@Bean
public Consumer<KStream<String, SomeObject>> someWorkflow() {
    return input -> input.filter(someInitialFilter)
            .transform(someExceptionThrowingTransformer)
            .process(someExceptionThrowingProcessor);
}

The recommendation seems to be to add a try-catch inside the transformer and processor, but returning a value becomes difficult to do cleanly.

One can choose to return null and then add a filter for null values. Or return a specially configured object, which is then checked and filtered out. Or the processor needs to account for a success or error case from the transformer. We've also seen a branching approach with a global error object. Nearly every one of our transform and process methods has a risk of throwing an exception, so these strategies are all very intrusive.
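For illustration, the first (null-and-filter) variant might look roughly like this; someService, log, and the other identifiers are placeholders, in the spirit of the pseudo-names above:

@Bean
public Consumer<KStream<String, SomeObject>> nullFilterStream() {
    return input -> input
            .filter(someInitialFilter)
            .mapValues(value -> {
                try {
                    // the risky transformation, e.g. a REST call
                    return someService.transform(value);
                }
                catch (Exception e) {
                    log.error("Transformation failed; dropping record", e);
                    return null; // marker value meaning "drop this record"
                }
            })
            .filter((key, value) -> value != null) // strip out the failed records
            .process(someExceptionThrowingProcessor);
}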

Each of these approaches is clunky in my team's opinion, and in my company's usage, we generally want to send the result to a DLQ or throw it away entirely (along with logging).

We've seen recommendations to perform actions that could result in exceptions inside filters. But then we are unable to return values when making a "transformation" from a REST call, for example.

Are we using these components incorrectly?

The request and use-case for this issue is the following:

An escape-hatch handler that can be overridden for any given stream component: Filter/Transformer/Processor/KeyValueMapper/etc. If the exception is swallowed and not re-thrown, the message is removed from the stream the same as if it had been filtered out.

The idea is to enable developers to encapsulate a transformation, for example, and its error handling outside the stream configuration.

Here is an existing repository that demonstrates some of the newer documented capabilities, but it does not address mid-stream errors:

https://github.com/StevenPG/scs-experimentation/blob/main/scs4-error-handling/error-handling/src/main/java/com/stevenpg/errorhandling/ErrorHandlingApplication.java

@sobychacko
Contributor

@StevenPG The way I understand your issue, you want to send the record to a DLT if your code throws an exception, as shown here and here? One general caveat to keep in mind when using Kafka Streams is that, during the execution of your business logic, Spring is out of the picture. The binder only comes into play at the binding bootstrap phase. Once the bindings are established (i.e., the proper Kafka topics are bound as KStream, KTable, etc.), the binder stays away from the actual runtime path. This is why the other general error-handling mechanisms available in Spring Cloud Stream do not work here.

As explained in this SO answer, you can get around this if you are willing to add a try/catch block around your critical code sections. You can use something like StreamBridge in your catch block to send the data to a DLT. You must also use the branching API feature to properly branch out the records in error, as seen in that SO thread. All this may sound intrusive, but with Kafka Streams, the application must do it, as neither the binder nor the framework knows what is happening at runtime. If you have other ideas, we are all ears, though.

@sobychacko
Contributor

sobychacko commented Jul 19, 2023

Here is a rough take on how the following method can be converted to handle errors. I haven't tested this code, but it should work for application-level error handling with the Kafka Streams binder.

@Bean
public Consumer<KStream<String, String>> errorStream() {
    return input -> {
        KStream<String, Integer> newInput = input
                     // Remove odd numbers so we throw an exception on every other message
                     .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
                     .filter((key, value) -> (value & 1) == 0);

        final Map<String, KStream<String, Integer>> branches =
                newInput.split(Named.as("foo-")).branch((key, value) -> {
                            try {
                                // code from the map call in the original method that causes the exception goes here;
                                // you have access to both key and value here
                                return true;
                            }
                            catch (Exception e) {
                                Message<?> message = MessageBuilder.withPayload(value).build();
                                streamBridge.send("to-my-dlt", message);
                                return false;
                            }
                        }, Branched.as("bar"))
                        .defaultBranch();
    };
}

The default branch contains the records in error, which we ignore since we programmatically handle the exceptions in the first branch. The branch "foo-bar" contains the good records if you want to operate on them further.

We are splitting the stream at the critical section of your code into two branches: one where the data is good, and another where it is not, in which case we handle it in the catch block.

The above code is functionally equivalent to your original method, which I copied below.

@Bean
public Consumer<KStream<String, String>> errorStream() {
    return input -> input
            // Remove odd numbers so we throw an exception on every other message
            .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
            .filter((key, value) -> (value & 1) == 0)
            .map((key, value) -> {
                throw new RuntimeException("Pushing uncaught error to kill stream!");
            });
}

@StevenPG
Contributor Author

StevenPG commented Jul 19, 2023

The description you provided is correct, and looking over the code in your example, I conceptually understand the limitation and the resolution.

We use our stream definitions as the main drivers of our message processors' functionality, so I wonder if we're abusing the framework a little bit (e.g., ~10 transforms/maps/processors/filters for a standard stream workflow).

We actually have services configured to run at each of the abstractions that have been mentioned.

You mentioned Kafka Streams vs. Spring Cloud Stream and the actual Kafka binder. Is the functionality I'm asking about available at those abstraction levels? It seems that each piece of error-handling documentation acts at the same level.

I'm mostly trying to avoid the situation where we have 10 stream components, and each component must be branched off to handle errors, and then only one branch continues.

@sobychacko
Contributor

sobychacko commented Jul 19, 2023

Let us look into this general use case and see if there are some ways to handle errors through a custom StreamsUncaughtExceptionHandler.

@StevenPG
Contributor Author

Much appreciated! I was unable to get the StreamsUncaughtExceptionHandler working on my end, but I'll redouble my efforts when I can to make sure I have the correct understanding of what it does right now.

@sobychacko
Contributor

@StevenPG I want to explore your use case a bit further.

@Bean
public Consumer<KStream<String, String>> errorStream() {
    return input -> input
            // Remove odd numbers so we throw an exception on every other message
            .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
            .filter((key, value) -> (value & 1) == 0)
            .map((key, value) -> {
                throw new RuntimeException("Pushing uncaught error to kill stream!");
            });
}

In the code above, where you throw the RuntimeException, is that where you call the service that could throw an exception? Also, I assume you don't want to wrap each service invocation within try/catch blocks, as that could pollute your business logic with multiple error-handling concerns. Am I right? In other words, is avoiding the try/catch block around the critical section the main thing you want to accomplish by letting the exception bubble up to the framework, which then "theoretically" handles it?

@StevenPG
Contributor Author

StevenPG commented Jul 19, 2023

Partially yes.

An example workflow might be:

@Bean
public Consumer<KStream<String, String>> errorStream() {
    return input -> input
            // Remove odd numbers so we throw an exception on every other message
            .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
            .filter((key, value) -> (value & 1) == 0)
            .map((key, value) -> {
                // Call an external service and get a data object that maps to the incoming value
                return new KeyValue<>(key, newObject);
            })
            .map((key, value) -> {
                // Attempt to publish the value (newObject) to a service; we receive a 400 Bad Request, so we throw an exception,
                // otherwise we would pass the result of the API call to the next .map method
                throw new RuntimeException("Pushing uncaught error to kill stream!");
                // Ideally, allow the message to be acked on error if an exception propagates from here, and "filter" it out without
                // needing to publish a "dummy object" or null that the next component needs to be aware of to handle
            })
            .map((key, value) -> {
                // Downstream operations if the above is successful; ideally this KeyValueMapper would not need
                // to be aware of an errorState object from above, but would instead only receive the message if no errors occurred
                return new KeyValue<>(key, value);
            });
}

The try-catch logic is one major portion of it, but the fundamental issue is having the ability to gracefully end the workflow for a given message at any point during the stream, not just in .filters.

Hopefully that makes sense. The exception handling is the main way I can think of handling it cleanly.

We still want exceptions to be an exception case, and not used as control-flow. That's where we really like utilizing Branches, when we know the result from an API might be good or bad and want to handle it appropriately.
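For illustration, a rough sketch of that result-based branching, where ApiResult, apiClient, downstreamProcessor, and the topic names are all hypothetical placeholders:

@Bean
public Consumer<KStream<String, String>> apiStream(StreamBridge streamBridge) {
    return input -> {
        Map<String, KStream<String, ApiResult>> branches = input
                .mapValues(value -> apiClient.call(value)) // ApiResult carries a success/failure status
                .split(Named.as("api-"))
                .branch((key, result) -> result.isSuccess(), Branched.as("ok"))
                .defaultBranch(Branched.as("failed"));

        // good results continue down the stream; bad results go to a DLT
        branches.get("api-ok").process(downstreamProcessor);
        branches.get("api-failed").foreach((key, result) -> streamBridge.send("api-dlt", result));
    };
}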

@sobychacko
Contributor

but the fundamental issue is having the ability to gracefully end the workflow for a given message at any point during the stream, not just in .filters.

IMO, branching in Kafka Streams gives you a graceful way of doing this. The only thing is that you need to use a try/catch block, and in the catch block, do all the error handling (such as sending to a DLT, logging, etc.). See my earlier suggestion ^^ on this issue.

I can think of three different ways of error handling, and they all have advantages and disadvantages depending on your use case.

  1. Branching API in Kafka Streams, as already discussed. In the try/catch block, you deal with any errors programmatically. The downside is that the code has branching calls, and it may not be intuitive for a reader at first look why we are introducing branches. On the other hand, we are using an API native to Kafka Streams to solve this use case. Another consideration is the explicit error handling in the catch block. Since you have three map calls above, each of which could potentially throw exceptions, you need three separate branch operations in your code to address errors. This could pollute your business logic quite a bit. Another problem is that the original record is mapped to something new by the time you come to the 3rd map call (the 3rd branching call, according to this suggestion). To send the original record to a DLT, you must carry it through an aggregate object over the map calls, as sketched below.
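A minimal sketch of such an aggregate object, assuming String keys and values; the Envelope type and its fields are illustrative, not part of any framework API:

public record Envelope<T>(String originalKey, String originalValue, T payload) {
}

Each map call would then return a new Envelope with an updated payload while keeping originalKey and originalValue intact, so any catch block along the way can send the untouched record to the DLT.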
  2. Another option is to use a StreamsUncaughtExceptionHandler. You can provide this handler via the KafkaStreamsCustomizer. Below is an example of doing so.
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler(new StreamsUncaughtExceptionHandler() {
                    @Override
                    public StreamThreadExceptionResponse handle(Throwable exception) {
                        return StreamThreadExceptionResponse.REPLACE_THREAD;
                    }
                });
            }
        });
    };
}

The main thing to remember with this approach is that you are replacing the stream thread with a new one if there is an error. Another big drawback is that there is no way to access the original record in the handle method above, so you cannot send it to a DLT or do even basic logging. However, you can create a custom Throwable, include the original record as part of it, and throw that from your application code (e.g., inside your map calls); a sketch follows below. There is another big problem with this approach: it will only work for transient errors. If you get that 400 due to a permanent issue, you are stuck at that offset, and the record will be retried repeatedly without ever moving past it. Therefore, only use this approach for transient errors, and resort to other means for non-transient errors.
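A hedged sketch of that custom Throwable idea; the class name and fields are illustrative:

public class RecordProcessingException extends RuntimeException {

    private final String failedKey;
    private final String failedValue;

    public RecordProcessingException(String failedKey, String failedValue, Throwable cause) {
        super("Failed to process record with key: " + failedKey, cause);
        this.failedKey = failedKey;
        this.failedValue = failedValue;
    }

    public String getFailedKey() { return failedKey; }

    public String getFailedValue() { return failedValue; }
}

Inside handle(Throwable exception), you would walk the cause chain (Kafka Streams usually wraps application exceptions in a StreamsException) and, if you find a RecordProcessingException, log the failed key/value before returning a response such as REPLACE_THREAD.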

  3. Another option is a new way of tackling this use case. This solution is not available in the framework at the moment, but it is another potential option. The idea is to introduce a DltAwareProcessor. Once hypothetically available in the framework, you could set your business logic as a delegate in the form of a BiFunction. Here is a blueprint for that as pseudo-code.
return input -> input
            // Remove odd numbers so we throw an exception on every other message
            .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
            .filter((key, value) -> (value & 1) == 0)
            .process(() -> new DltAwareProcessor<>(biFunction1)) // biFunctionN: BiFunction<String, String, KeyValue<String, NewObject>>
            .process(() -> new DltAwareProcessor<>(biFunction2))
            .process(() -> new DltAwareProcessor<>(biFunction3));

Your three map calls above are converted into BiFunctions and passed along to the process API call. Behind the scenes, we call the BiFunction provided, and if an exception is thrown, we send the record to a DLT. If not, we forward the result to the downstream processor.

Please remember that option 3 above is only a concept at this point. If you think this solution works for you, then we can certainly look at that.

Please let us know what you think about these ideas.

@StevenPG
Contributor Author

StevenPG commented Jul 20, 2023

Thank you for this!

I agree with everything you mentioned for method 1. Regarding carrying an aggregate object over to the later map calls, I imagine that is much more commonplace.

But I can say that, with how we use Streams, we would be happy with each map call sending to its own DLT. In many cases, when we have a bad message or a non-transient failure, we need to enact a workflow that undoes the previous action. This seems to align with method 3 really nicely.

With method 2, I absolutely think this is a necessary final fallback component, especially to handle errors that slip through all the cracks. Everything you described aligns with my understanding. Method 2 fits the bill for transient errors for sure, with REPLACE_THREAD.

Method 3 I think is perfect and is a really clean way to implement "filtering" (or the more useful, DLQ/DLTing) mid-stream.

It would allow a safe way to perform an encapsulated action with an escape hatch. In my experience, a lot of organizations treat DLQs as a manual review operation, where a message entering an error queue/topic results in a notification and a review. A dedicated DLT for each stream component would complement this kind of strategy nicely, while also allowing the same aggregate pass-along strategy that needs to be used today.

Method 3 feels REALLY clean to me; a Spring property for each DLT, or a way to override the DltAwareProcessor and provide DLT configurations, would be really nice. I thought .process was a terminating method for a stream; I'll take a closer look at that to make sure I understand the pseudo-code fully (if that's relevant).

Method 3 definitely stands out to me as the ideal solution, and as a resolution for a piece of functionality that (looking over Stack Overflow threads and a few GitHub issues) seems to be commonly needed.

Please let me know if there's anything I can do to help support this effort if it moves past the discussion phase (or during the discussion phase).

Our team takes advantage of Streams every opportunity we get, and we'd be able to build more robust and longer streams with more isolated components using functionality like what you described with Method 3.

@sobychacko
Contributor

In that case, we will look at adding option 3 as a feature of the framework (i.e., the Kafka Streams binder). Options 1 & 2 do not require framework-level changes since you can do them on the application side today. Up to Kafka Streams 3.2, the process API calls were terminating operations; in order to continue, you needed to use the transform methods to produce output. But starting from 3.3, they deprecated all the transform methods on KStream and made the process calls optionally non-terminating. It works by calling context#forward to propagate the records; if we don't call forward, no downstream operations will receive the data. This is the feature I was planning to leverage in the DltAwareProcessor.
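For reference, a minimal sketch of such a non-terminating processor against the plain Kafka Streams 3.3+ Processor API; the class name and the pass-through logic are illustrative:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class ForwardingProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        // Only records passed to forward() reach downstream operators;
        // returning without forwarding drops the record, like a filter.
        context.forward(record);
    }
}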

@StevenPG
Contributor Author

That's phenomenal! Keep me looped in and let me know what I can contribute. I'm not super familiar with the code base but plan on diving in this weekend to start to get a better understanding of the internals.

We're going to be almost entirely Spring Boot 3 soon, so if it all plays out, we'll be able to take advantage of the feature in short order.

Good to know about the process calls; I have a bunch of Transformers that will need updating due to the deprecation, and it's good to know that in advance.

@sobychacko
Contributor

Sounds good. We will let you know how it goes. Since it's a new component that we are adding to the binder, it will probably only be part of the latest SCSt (the 4.1.0 release based on Boot 3.2, compatible with 3.1/3.0, etc.). However, if the changes are not very intrusive and are backward compatible, we could certainly back-port them to 4.0.x, which is based on Boot 3.x.

@sobychacko
Contributor

sobychacko commented Jul 21, 2023

@StevenPG To kick-start things towards a stable implementation for this feature, I started with some initial ideas via this commit. Please keep in mind that this is added on an experimental basis only, with the bare minimum needed to support your use case. However, we would appreciate your feedback on it.

You need to use binder version 4.1.0-SNAPSHOT for this. You will automatically get this if you use Spring Cloud version 2023.0.0-SNAPSHOT.

Following is how you would use this feature in the application:

@Bean
public Consumer<KStream<String, String>> errorStream(DltSenderContext dltSenderContext) {
    return input -> input
            // Remove odd numbers so we throw an exception on every other message
            .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
            .filter((key, value) -> (value & 1) == 0)
            .process(() -> new DltAwareProcessor<>((k, v) -> {
                throw new RuntimeException("fail");
            }, "hello-dlt-1", dltSenderContext))
            .map((key, value) -> {
               ...
            });
}

The DltAwareProcessor constructor takes three arguments:

public DltAwareProcessor(BiFunction<KIn, VIn, KeyValue<KOut, VOut>> businessLogic,
                         String dltDestination, DltSenderContext dltSenderContext)
  1. The first argument is your business logic, a BiFunction that takes the key and value as arguments and produces a new KeyValue as output.
  2. The second argument is the name of the DLT (currently, only a single-partition topic is supported).
  3. The third argument is a DltSenderContext, which you need to provide as a bean in the application:
@Bean
DltSenderContext dltSenderContext() {
    return new DltSenderContext();
}

This DltSenderContext bean is necessary because that is how the processor retrieves the Spring application context and the components necessary for sending to the DLT.

Once we agree on them, we will add these components - DltAwareProcessor and DltSenderContext - as part of the Kafka Streams binder API.

If you can, please take it for a spin and give us your feedback. We will take an iterative approach and improve based on the feedback.

@StevenPG
Contributor Author

This is awesome! This fits our use-case perfectly.

I added an example in the repository I referenced earlier in a branch: https://github.com/StevenPG/scs-experimentation/blob/experimental/dltawareprocessor-test-drive/scs4-error-handling/error-handling/src/main/java/com/stevenpg/errorhandling/ErrorHandlingApplication.java

I tried to design the stream in the same way we do with our current implementations, using an external supplier to enforce a new instance and passing the class in before calling ::get for "good" readability (per my team's common patterns).
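For reference, a rough sketch of that supplier pattern, assuming the experimental DltAwareProcessor implements the Processor interface (as its use with process() above suggests); the bean name and business logic are illustrative:

@Bean
public Consumer<KStream<String, String>> suppliedStream(DltSenderContext dltSenderContext) {
    Supplier<DltAwareProcessor<String, String, String, String>> processorSupplier =
            () -> new DltAwareProcessor<>((k, v) -> new KeyValue<>(k, v.toUpperCase()),
                    "hello-dlt-1", dltSenderContext);
    // Kafka Streams needs a fresh Processor instance from each get() call, which the supplier guarantees
    return input -> input.process(processorSupplier::get);
}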

Regarding the timestamp comment, I'm not sure what a best practice would be, though getting the current time should, I think, suffice for a use case like ours. We would just want to know roughly when the DLQ message was created. If we need to know exactly when the failure occurred, we should grab the time during our error handling within the BiFunction and record that accurately.

The output is exactly as expected and handles expected internal exceptions just as you described:

If 1 < 5, throw error
If 2 < 5, throw error
If 3 < 5, throw error
If 4 < 5, throw error
5 > 5, no more errors, continuing
6 > 5, no more errors, continuing
7 > 5, no more errors, continuing
8 > 5, no more errors, continuing
9 > 5, no more errors, continuing
10 > 5, no more errors, continuing
11 > 5, no more errors, continuing
12 > 5, no more errors, continuing
13 > 5, no more errors, continuing
14 > 5, no more errors, continuing
15 > 5, no more errors, continuing
16 > 5, no more errors, continuing
17 > 5, no more errors, continuing
18 > 5, no more errors, continuing
Received dead letter message with val::196084492
Received dead letter message with val::2082210122
Received dead letter message with val::-1828379440
Received dead letter message with val::-190670826
Received dead letter message with val::-1024757230
19 > 5, no more errors, continuing
20 > 5, no more errors, continuing

The only things I think I could call out would be the following:

  • The ability to exclude DLQ information, so that a message is just dropped and not sent anywhere, for environments where things are more fast and loose, or where the application isn't permitted to create topics. It may be useful to avoid the context.forward call but also not push to a DLQ. This might go against the framework or good patterns; just throwing it out there (from prior experience in less open environments).

  • An autoconfigured DltSenderContext, just for simplicity

These are really just what I can come up with that feel productive. The solution as-is is so succinct and does the job really well.

Let me know if there's anything else I can do or if there's additional test paths that would be valuable.

I'll be broaching the topic with the team on Monday as well, to try to get further feedback and discussion.

And again, thank you!!

@sobychacko
Contributor

@StevenPG Thanks for the feedback. Having an external supplier looks like the right design choice. I added a commit with the following changes.

  1. An optional Supplier<Long> for the timestamp to attach to the downstream forwarded records.
  2. DltSenderContext is now auto-configured and conditional. You only need to inject it if you want the framework to handle the DLT sending. There is no more need to define this bean in the application.
  3. If you want to handle errors independently of the framework, you can provide a BiConsumer<Record<KIn, VIn>, Exception> that gives you access to the record and the exception. The application will then be responsible for all error handling.

If you go with option 2 and provide a DltSenderContext, then you must also provide a DLT destination topic. With option 3, you don't need to do that.

Here is how the DltAwareProcessor constructor looks for these scenarios:

For option 2 above:

.process(() -> new DltAwareProcessor<>((k, v) -> {
                throw new RuntimeException("fail");
            }, "hello-dlt-1", dltSenderContext))

With this, the downstream forwarded record will have a default timestamp of the current time; however, you can add a Supplier<Long> that overrides that.

For option 3 above:

.process(() -> new DltAwareProcessor<>((k, v) -> {
                throw new RuntimeException("fail");
            }, (record, exception) -> {
                // handle the error; you have access to both the record and the exception
            } /*, optional Supplier<Long> for the timestamp on downstream records */))

Please let us know your thoughts on these changes. Thank you!

@StevenPG
Contributor Author

StevenPG commented Jul 24, 2023

I looked through the commit to get a clearer picture, and I think it makes perfect sense to require an exceptionHandling consumer when a DLT configuration is not provided. It's an excellent way to enforce good practice and also to let the developer choose when to DLT and when to safely "ignore" an error.

The Supplier being optional is ideal; I feel like most users (ourselves included) will likely naively pass System::currentTimeMillis, so having that default is slick. Users that need a specific time configuration will have easy access to it, and it seems really easy to follow from a documentation standpoint.

This is honestly phenomenal and resolves pretty much every presently unsupported use case I and my team have run into.

I was going to update the example code, but it looks like there hasn't been a snapshot update: https://repo.spring.io/ui/native/snapshot/org/springframework/cloud/spring-cloud-stream-binder-kafka-streams/4.1.0-SNAPSHOT/ (I'm not sure what the build/publish cadence is)

All good on my end, I think this is perfect.

I broached the thread here with my team, and none of us could find any missing use-cases; this really handles in-flow error handling perfectly. It'll also provide great motivation to move to 4.x+!

I'd love to contribute a handful of examples to the spring-cloud-stream-samples repository if/when this goes live in a release!

@sobychacko
Contributor

Thanks for the quick feedback. I will clean up the implementation and add tests, documentation, etc. The 4.1.0-SNAPSHOT should be ready by now.

@StevenPG
Contributor Author

Just to clarify my understanding of the release process: it looks like this is targeted for a full Spring Cloud release on November 28th, right? Just following up to give my team a rough target date!

@sobychacko
Contributor

@StevenPG Because of the Spring One conference and a few other commitments, we had to put this work on the back burner. But, we are ready to get back to it. I will get back to you soon with more updates.

@sobychacko
Contributor

@StevenPG I cleaned up the code and added tests, docs, etc. I will close this issue, but I would appreciate it if you could look at the state of this support and let us know any additional feedback. In particular, I wonder if we should call this DltAwareProcessor since we allow the users to override the entire DLT mechanism and bring their own recovery mechanism. When and if they do that, this will not be a DLT-based recovery but a generic recovery. If you want to brainstorm this idea further or have additional feedback, I suggest creating a new issue. Thank you!
