Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KMLC Don't Call emergencyStop on StopAfterFenceException #1633

Closed
garyrussell opened this issue Nov 20, 2020 · 0 comments · Fixed by #1634
Closed

KMLC Don't Call emergencyStop on StopAfterFenceException #1633

garyrussell opened this issue Nov 20, 2020 · 0 comments · Fixed by #1634

Comments

@garyrussell
Copy link
Contributor

garyrussell commented Nov 20, 2020

Only stop the child container.

@garyrussell garyrussell added this to the 2.6.4 milestone Nov 20, 2020
@garyrussell garyrussell self-assigned this Nov 20, 2020
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Nov 20, 2020
Resolves spring-projects#1633

Only the child container instance should be stopped when a producer is
fenced.

Tested with a Boot Application:

```java
@SpringBootApplication
public class Kgh1633Application {

	public static void main(String[] args) {
		SpringApplication.run(Kgh1633Application.class, args);
	}

	@Autowired
	KafkaTemplate<String, String> template;

	@KafkaListener(id = "kgh1633a", topics = "kgh1633")
	public void listen(String in) throws InterruptedException {
		System.out.println(in);
		this.template.send("kgh1633-1", "foo");
		Thread.sleep(10_000);
	}

	@bean
	public NewTopic topic1() {
		return TopicBuilder.name("kgh1633").partitions(1).replicas(1).build();
	}

	@bean
	public NewTopic topic2() {
		return TopicBuilder.name("kgh1633-1").partitions(1).replicas(1).build();
	}

//	@eventlistener
//	public void started(ConsumerStartedEvent event) {
//		// temporary until spring-projectsGH-1633 is fixed
//		KafkaMessageListenerContainer<?, ?> container = (KafkaMessageListenerContainer<?, ?>) event.getSource();
//		container.setEmergencyStop(() -> container.stop(() -> { }));
//	}

	@eventlistener
	public void stopped(ConsumerStoppedEvent event) {
		((KafkaMessageListenerContainer<?, ?>) event.getSource()).start();
	}

}

@component
class Customizer {

	Customizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory) {
		factory.getContainerProperties().setStopContainerWhenFenced(true);
	}

}
```
```properties
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.producer.acks=all
spring.kafka.producer.properties.transaction.timeout.ms=5000
```
artembilan pushed a commit that referenced this issue Nov 20, 2020
Resolves #1633

Only the child container instance should be stopped when a producer is
fenced.

Tested with a Boot Application:

```java
@SpringBootApplication
public class Kgh1633Application {

	public static void main(String[] args) {
		SpringApplication.run(Kgh1633Application.class, args);
	}

	@Autowired
	KafkaTemplate<String, String> template;

	@KafkaListener(id = "kgh1633a", topics = "kgh1633")
	public void listen(String in) throws InterruptedException {
		System.out.println(in);
		this.template.send("kgh1633-1", "foo");
		Thread.sleep(10_000);
	}

	@bean
	public NewTopic topic1() {
		return TopicBuilder.name("kgh1633").partitions(1).replicas(1).build();
	}

	@bean
	public NewTopic topic2() {
		return TopicBuilder.name("kgh1633-1").partitions(1).replicas(1).build();
	}

//	@eventlistener
//	public void started(ConsumerStartedEvent event) {
//		// temporary until GH-1633 is fixed
//		KafkaMessageListenerContainer<?, ?> container = (KafkaMessageListenerContainer<?, ?>) event.getSource();
//		container.setEmergencyStop(() -> container.stop(() -> { }));
//	}

	@eventlistener
	public void stopped(ConsumerStoppedEvent event) {
		((KafkaMessageListenerContainer<?, ?>) event.getSource()).start();
	}

}

@component
class Customizer {

	Customizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory) {
		factory.getContainerProperties().setStopContainerWhenFenced(true);
	}

}
```
```properties
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.producer.acks=all
spring.kafka.producer.properties.transaction.timeout.ms=5000
```
garyrussell added a commit that referenced this issue Nov 20, 2020
Resolves #1633

Only the child container instance should be stopped when a producer is
fenced.

Tested with a Boot Application:

```java
@SpringBootApplication
public class Kgh1633Application {

	public static void main(String[] args) {
		SpringApplication.run(Kgh1633Application.class, args);
	}

	@Autowired
	KafkaTemplate<String, String> template;

	@KafkaListener(id = "kgh1633a", topics = "kgh1633")
	public void listen(String in) throws InterruptedException {
		System.out.println(in);
		this.template.send("kgh1633-1", "foo");
		Thread.sleep(10_000);
	}

	@bean
	public NewTopic topic1() {
		return TopicBuilder.name("kgh1633").partitions(1).replicas(1).build();
	}

	@bean
	public NewTopic topic2() {
		return TopicBuilder.name("kgh1633-1").partitions(1).replicas(1).build();
	}

//	@eventlistener
//	public void started(ConsumerStartedEvent event) {
//		// temporary until GH-1633 is fixed
//		KafkaMessageListenerContainer<?, ?> container = (KafkaMessageListenerContainer<?, ?>) event.getSource();
//		container.setEmergencyStop(() -> container.stop(() -> { }));
//	}

	@eventlistener
	public void stopped(ConsumerStoppedEvent event) {
		((KafkaMessageListenerContainer<?, ?>) event.getSource()).start();
	}

}

@component
class Customizer {

	Customizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory) {
		factory.getContainerProperties().setStopContainerWhenFenced(true);
	}

}
```
```properties
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.producer.acks=all
spring.kafka.producer.properties.transaction.timeout.ms=5000
```
garyrussell added a commit that referenced this issue Nov 20, 2020
Resolves #1633

- don't block when self-stopping after producer is fenced
- pass the exception into wrapUp to properly set the stopped reason

**chery-pick to 2.5.x**
garyrussell added a commit that referenced this issue Nov 20, 2020
Resolves #1633

- don't block when self-stopping after producer is fenced
- pass the exception into wrapUp to properly set the stopped reason

**chery-pick to 2.5.x**
garyrussell added a commit that referenced this issue Nov 23, 2020
Cannot reduce complexity - catch clauses
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant