Skip to content

Conversation

garyrussell
Copy link
Contributor

@garyrussell garyrussell commented Aug 15, 2017

Resolves #916
Resolves #802

  • register a pubsub error channel.
  • register and subscribe a bridge handler to bridge it to the global error channel.
  • pass the error channel to the implementation so it can wire it into the outbound endpoint.
  • destroy the infrastructure when unbinding.

Resolves #916

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@viniciusccarvalho viniciusccarvalho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with the changes, I hope we can review this on 2.0. I have a few reservations, but for now I'll keep it to myself until we get a change to review the internal guts of AbstractBinder, AbstractMessageChannelBinder and Consumer/ProducerDestination classes

@garyrussell
Copy link
Contributor Author

Hold off merging; I will add some tests.

garyrussell added a commit to garyrussell/spring-cloud-stream-binder-rabbit that referenced this pull request Aug 16, 2017
Requires: spring-cloud/spring-cloud-stream#1039

Publish errors (returned and nack'd messages) to the error channel.
garyrussell added a commit to garyrussell/spring-cloud-stream-binder-rabbit that referenced this pull request Aug 16, 2017
Requires: spring-cloud/spring-cloud-stream#1039

Publish errors (returned and nack'd messages) to the error channel.
garyrussell added a commit to garyrussell/spring-cloud-stream-binder-kafka that referenced this pull request Aug 16, 2017
@sobychacko
Copy link
Contributor

@garyrussell how do these changes correlate to when someone set something like spring.cloud.stream.bindings.error.destination=error-dest. In that case, I think we already create a bridge handler and subscribe to the error channel and send any errors from the error channel to the destination. Referring to this recent commit: 22141a8.

It may not be related, but I am just curious.

@garyrussell
Copy link
Contributor Author

garyrussell commented Aug 16, 2017

@sobychacko Interesting; I wasn't aware of that. They're (sort of) related, but not really.

Currently, the binders don't send errors to the Spring Integration global errorChannel.

It looks to me like that code binds the global errorChannel to a destination (enabling errors to be sent to a binder destination). This would only kick in if the user app publishes to the error channel somehow.

This PR registers an error channel for (async) errors for producer destinations. (And this one, which is already merged, does the same for consumer destinations). On the consumer side, this publication is performed regardless. DLQ processing now consumes from that channel before sending to a DLQ. On the producer side, the error channel is disabled by default because this adds overhead (in the case of Rabbit, but not Kafka).

In both cases, those error channels are bridged to the global errorChannel.

This allows the user to consume producer and consumer errors, either individually or collectively (with a normal SI flow).

Since the global errorChannel is a pubsub channel, these errors will also go to that binding as well (spring.cloud.stream.bindings.error.destination=error-dest), if present.

@sobychacko
Copy link
Contributor

Thanks, @garyrussell for the explanation. That code actually binds a bridged channel from the error channel to the destination. The bridge channel goes through our normal channel configuration for content type conversion etc.

Initial Commit for spring-cloudGH-916.

- register a pubsub error channel.
- register and subscribe a bridge handler to bridge it to the global error channel.
- pass the error channel to the implementation so it can wire it into the outbound endpoint.
- destroy the infrastructure when unbinding.
garyrussell added a commit to garyrussell/spring-cloud-stream-binder-rabbit that referenced this pull request Aug 17, 2017
Requires: spring-cloud/spring-cloud-stream#1039

Publish errors (returned and nack'd messages) to the error channel.
@viniciusccarvalho
Copy link
Contributor

Merged via 0eab8a5

viniciusccarvalho pushed a commit to spring-attic/spring-cloud-stream-binder-rabbit that referenced this pull request Aug 22, 2017
Requires: spring-cloud/spring-cloud-stream#1039

Publish errors (returned and nack'd messages) to the error channel.

Add docs

Test Polishing
viniciusccarvalho pushed a commit to viniciusccarvalho/spring-cloud-stream-binder-kafka that referenced this pull request Aug 22, 2017
SCSt-spring-atticGH-916: Configure Producer Error Channel

Requires: spring-cloud/spring-cloud-stream#1039

Publish send failures to the error channel.

Add docs

Revert to Spring Kafka 1.1.6
sobychacko pushed a commit to sobychacko/spring-cloud-stream-binder-kafka that referenced this pull request Sep 20, 2017
SCSt-spring-atticGH-916: Configure Producer Error Channel

Requires: spring-cloud/spring-cloud-stream#1039

Publish send failures to the error channel.

Add docs

Revert to Spring Kafka 1.1.6
artembilan pushed a commit to spring-attic/spring-cloud-stream-binder-kafka that referenced this pull request Sep 29, 2017
fixes #193

Integration missed commits and provide some polishing, improvements and fixes

Remove `resetOffsets` option

Fix #170

Use parent version for spring-cloud-build-tools

Add update version script

Fixes for consumer and producer property propagation

Fix #142 #129 #156 #162

- Remove conditional configuration for Boot 1.4 support
- Filter properties before creating consumer and producer property sets
- Restore `configuration` as Map<String,String> for fixing Boot binding
- Remove 0.9 tests

SCSt-GH-913: Error Handling via ErrorChannel

Relates to spring-cloud/spring-cloud-stream#913

Fixes #162

- configure an ErrorMessageSendingRecoverer to send errors to an error channel, whether or not retry is enabled.

Change Test Binder to use a Fully Wired Integration Context

- logging handler subscribed to errorChannel

Rebase; revert s-k to 1.1.x, Kafka to 0.10.1.1

Remove dependency overrides.

POM structure corrections

- move all intra-project deps to dependency management
- remove redundant overrides of Spring Integration Kafka

Remove reference to deleted module

- `spring-cloud-stream-binder-kafka-test-support` was previously
   removed, but it was still added as an unused dependency to the
   project

Remove duplicate debug statement.

unless you really really want to make sure users see this :)

GH-144: Add Kafka Streams Binder

Fix #144

Addressing some PR reviews

Remove java 8 lambada expressions from KStreamBoundElementFactory

Initial - add support for serdes per binding

Fixing checkstyle issues
test source 1.8

Convert integration tests to use Java 7

Internal refactoring

Remove payload serde code in KStreamBoundElementFactory and reuse it from core

Addressing PR comments

cleanup around payload deserialization

Update to latest serialization logic

Extract common properites class for KStream producer/consumer

Addressing PR review comments

* Remove redundant dependencies for KStream Binder

Documentation for KStream binder

* Documentation for KStream binder

Fix #160

* Addressing PR review comments

* Addressing PR review comments

* Addressing PR review comments

Fixes #181

SCSt-GH-916: Configure Producer Error Channel

Requires: spring-cloud/spring-cloud-stream#1039

Publish send failures to the error channel.

Add docs

Revert to Spring Kafka 1.1.6

GH-62: Remove Tuple Kryo Registrar Wrapper

Resolves #62

No longer needed.

GH-169: Use the Actual Partition Count (Producer)

Fixes #169

If the configured `partitionCount` is less than the physical partition count on an existing
topic, the binder emits this message:

    The `partitionCount` of the producer for topic partJ.0 is 3, smaller than the actual partition count of 8 of the topic.
    The larger number will be used instead.

However, that is not true; the configured partition count is used.

Override the configured partition count with the actual partition count.

0.11 Binder

Initial Commit

- Transactional Binder

Version Updates

- Headers support

KStreams and 0.11

GH-188: KStream Binder Properties

KStream binder: support class for application level properties

Provide commonly used KStream application properties for convenient access at runtime

Fix #188

Since windowing operations are common in KStream applications, making the TimeWindows object
avaiable as a first class bean (using auto configuration). This bean is only created if the
relevant properties are provided by the user.

Kstream binder: producer default Serde changes

Change the way the default Serde classes are selected for key and value
in producer when only one of those is provided by the user.

Fix #190

KStream binder cleanup,
merge cleanup

re-update kafka version

2.0 related changes

Fix tests
Upgrade Kstream tests

converting anonymous classes to lambda expressions

Renaming Kafka-11 qualifier from test module
Refactoring test class names

cleanup
adding .jdk8 files

Fix KafkaBinderMetrics in 2.0

Fix #199

Addressing PR review comments

Addressing PR review comments
sobychacko added a commit to sobychacko/spring-cloud-stream-binder-kafka that referenced this pull request Oct 5, 2017
fixes spring-attic#193

Integration missed commits and provide some polishing, improvements and fixes

Remove `resetOffsets` option

Fix spring-attic#170

Use parent version for spring-cloud-build-tools

Add update version script

Fixes for consumer and producer property propagation

Fix spring-attic#142 spring-attic#129 spring-attic#156 spring-attic#162

- Remove conditional configuration for Boot 1.4 support
- Filter properties before creating consumer and producer property sets
- Restore `configuration` as Map<String,String> for fixing Boot binding
- Remove 0.9 tests

SCSt-spring-atticGH-913: Error Handling via ErrorChannel

Relates to spring-cloud/spring-cloud-stream#913

Fixes spring-attic#162

- configure an ErrorMessageSendingRecoverer to send errors to an error channel, whether or not retry is enabled.

Change Test Binder to use a Fully Wired Integration Context

- logging handler subscribed to errorChannel

Rebase; revert s-k to 1.1.x, Kafka to 0.10.1.1

Remove dependency overrides.

POM structure corrections

- move all intra-project deps to dependency management
- remove redundant overrides of Spring Integration Kafka

Remove reference to deleted module

- `spring-cloud-stream-binder-kafka-test-support` was previously
   removed, but it was still added as an unused dependency to the
   project

Remove duplicate debug statement.

unless you really really want to make sure users see this :)

spring-atticGH-144: Add Kafka Streams Binder

Fix spring-attic#144

Addressing some PR reviews

Remove java 8 lambada expressions from KStreamBoundElementFactory

Initial - add support for serdes per binding

Fixing checkstyle issues
test source 1.8

Convert integration tests to use Java 7

Internal refactoring

Remove payload serde code in KStreamBoundElementFactory and reuse it from core

Addressing PR comments

cleanup around payload deserialization

Update to latest serialization logic

Extract common properites class for KStream producer/consumer

Addressing PR review comments

* Remove redundant dependencies for KStream Binder

Documentation for KStream binder

* Documentation for KStream binder

Fix spring-attic#160

* Addressing PR review comments

* Addressing PR review comments

* Addressing PR review comments

Fixes spring-attic#181

SCSt-spring-atticGH-916: Configure Producer Error Channel

Requires: spring-cloud/spring-cloud-stream#1039

Publish send failures to the error channel.

Add docs

Revert to Spring Kafka 1.1.6

spring-atticGH-62: Remove Tuple Kryo Registrar Wrapper

Resolves spring-attic#62

No longer needed.

spring-atticGH-169: Use the Actual Partition Count (Producer)

Fixes spring-attic#169

If the configured `partitionCount` is less than the physical partition count on an existing
topic, the binder emits this message:

    The `partitionCount` of the producer for topic partJ.0 is 3, smaller than the actual partition count of 8 of the topic.
    The larger number will be used instead.

However, that is not true; the configured partition count is used.

Override the configured partition count with the actual partition count.

0.11 Binder

Initial Commit

- Transactional Binder

Version Updates

- Headers support

KStreams and 0.11

spring-atticGH-188: KStream Binder Properties

KStream binder: support class for application level properties

Provide commonly used KStream application properties for convenient access at runtime

Fix spring-attic#188

Since windowing operations are common in KStream applications, making the TimeWindows object
avaiable as a first class bean (using auto configuration). This bean is only created if the
relevant properties are provided by the user.

Kstream binder: producer default Serde changes

Change the way the default Serde classes are selected for key and value
in producer when only one of those is provided by the user.

Fix spring-attic#190

KStream binder cleanup,
merge cleanup

re-update kafka version

2.0 related changes

Fix tests
Upgrade Kstream tests

converting anonymous classes to lambda expressions

Renaming Kafka-11 qualifier from test module
Refactoring test class names

cleanup
adding .jdk8 files

Fix KafkaBinderMetrics in 2.0

Fix spring-attic#199

Addressing PR review comments

Addressing PR review comments
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants