Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When virtual threads are enabled, auto-configure Kafka listener container factories to use a virtual thread based executor #36396

Closed
wilkinsona opened this issue Jul 13, 2023 · 9 comments
Assignees
Labels
type: enhancement A general enhancement
Milestone

Comments

@wilkinsona
Copy link
Member

Through containerFactory.getContainerProperties().setListenerTaskExecutor(...), the factory can be configured with an AsyncTaskExecutor. We should see if we can auto-configure this, particularly when virtual threads are enabled as it could then be configured with a VirtualThreadTaskExecutor.

@wilkinsona wilkinsona added the type: enhancement A general enhancement label Jul 13, 2023
@wilkinsona wilkinsona added this to the 3.2.x milestone Jul 13, 2023
@mhalbritter mhalbritter self-assigned this Aug 2, 2023
@mhalbritter mhalbritter changed the title Investigate auto-configuring Kafka listener container factories with an AsyncTaskExecutor When virtual threads are enabled, auto-configure Kafka listener container factories to use a virtual thread based executor Aug 2, 2023
@mhalbritter mhalbritter modified the milestones: 3.2.x, 3.2.0-M2 Aug 2, 2023
@mhalbritter
Copy link
Contributor

When virtual threads are enabled, we configure the container to use a SimpleAsyncTaskExecutor with virtual threads.

@ultramagnus94
Copy link

ultramagnus94 commented Feb 16, 2024

I'm using Spring Boot 3.2.2
When I log in the method annotated by @KafkaListener, this still produces false:

@KafkaListener(topics = "my-topic")
public void consume(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.OFFSET) int offset) {
    log.info("{}", Thread.currentThread().isVirtual()); // false
    consumerService.process(message);
}

@mhalbritter
Copy link
Contributor

Do you have a small sample application which I could use to reproduce that?

@ultramagnus94
Copy link

Due to my company policy, I cannot upload any source code here. I'll do it when I'm home.
But it's quite simple
Btw, I'm using spring-kafka version 3.1.1

@ultramagnus94
Copy link

ultramagnus94 commented Feb 19, 2024

After several tests, I think I found the problem.
If I use default consumer factory, it will use virtual thread but I had a custom ConsumerFactory bean:

@Bean
@Primary
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my bootstrap server address");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my group id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(SECURITY_PROTOCOL, "SASL_PLAINTEXT");
    props.put(SECURITY_MECHANISM, "PLAIN");
    props.put(SASL_JAAS_CONFIG, "my sasl configuration");
    return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
@Primary
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    return factory;
}

I think this causes the problem.
Btw, is there any configuration for this to use virtual thread ?

@mhalbritter
Copy link
Contributor

mhalbritter commented Feb 19, 2024

You can call ConcurrentKafkaListenerContainerFactory.setListenerTaskExecutor on your ConcurrentKafkaListenerContainerFactory to supply it with a SimpleAsyncTaskExecutor which has SimpleAsyncTaskExecutor.setVirtualThreads(true) enabled. That's what our auto-configuration does.

@Moscagus
Copy link

Moscagus commented Feb 26, 2024

I'm using Spring Boot 3.2.0.
I also set SimpleAsyncTaskExecutor.setVirtualThreads(true).
The problem I see is that validating with jcmd and jstack the listeners are tied to a platform thread forever. Therefore it does not make effective use of virtual threads.
Without messages in Kafka and with a concurrency of 10, I have 10 platform threads transporting the virtual threads.

[oracle@vtuerto kafka-2602]$ cat jstack.txt | grep -A 1 ForkJoinPool-
"ForkJoinPool-1-worker-1" #39 [99] daemon prio=5 os_prio=0 cpu=2217.82ms elapsed=1798.28s allocated=136M defined_classes=707 tid=0x00007f49ef305fc0  [0x00007f49c3bfd000]
   **Carrying virtual thread #59**
--
"ForkJoinPool-1-worker-2" #40 [100] daemon prio=5 os_prio=0 cpu=1901.63ms elapsed=1798.27s allocated=94034K defined_classes=125 tid=0x00005607582c69e0  [0x00007f49c3b7d000]
   **Carrying virtual thread #43**
--
"ForkJoinPool-1-worker-3" #41 [101] daemon prio=5 os_prio=0 cpu=1723.87ms elapsed=1798.27s allocated=54037K defined_classes=275 tid=0x0000560758173da0 nid=101 waiting on condition  [0x00007f49c3afe000]
   java.lang.Thread.State: TIMED_WAITING (parking)
--
"ForkJoinPool-1-worker-4" #48 [105] daemon prio=5 os_prio=0 cpu=1803.04ms elapsed=1797.83s allocated=91141K defined_classes=60 tid=0x00007f49ec4692d0  [0x00007f49c3831000]
   **Carrying virtual thread #45**
--
"ForkJoinPool-1-worker-5" #55 [111] daemon prio=5 os_prio=0 cpu=1706.78ms elapsed=1797.62s allocated=87998K defined_classes=0 tid=0x00005607577a4cd0  [0x00007f49c3531000]
   **Carrying virtual thread #75**
--
"ForkJoinPool-1-worker-6" #56 [112] daemon prio=5 os_prio=0 cpu=4311.24ms elapsed=1797.61s allocated=263M defined_classes=2987 tid=0x00007f49ef397e90 nid=112 waiting on condition  [0x00007f49c34b2000]
   java.lang.Thread.State: WAITING (parking)
--
"ForkJoinPool-1-worker-7" #63 [117] daemon prio=5 os_prio=0 cpu=1725.83ms elapsed=1797.46s allocated=87727K defined_classes=4 tid=0x0000560757b28c60  [0x00007f49c3231000]
   **Carrying virtual thread #50**
--
"ForkJoinPool-1-worker-8" #64 [118] daemon prio=5 os_prio=0 cpu=1913.82ms elapsed=1797.43s allocated=92419K defined_classes=0 tid=0x00007f49ef3d1c40  [0x00007f49c31b1000]
   **Carrying virtual thread #47**
--
"ForkJoinPool-1-worker-9" #68 [121] daemon prio=5 os_prio=0 cpu=1762.17ms elapsed=1797.40s allocated=85613K defined_classes=0 tid=0x00005607577a22e0  [0x00007f49beffd000]
   **Carrying virtual thread #67**
--
"ForkJoinPool-1-worker-10" #72 [124] daemon prio=5 os_prio=0 cpu=1732.98ms elapsed=1797.35s allocated=90651K defined_classes=0 tid=0x00007f49ef3e71a0  [0x00007f49bee7d000]
   **Carrying virtual thread #62**
--
"ForkJoinPool-1-worker-11" #76 [127] daemon prio=5 os_prio=0 cpu=1917.60ms elapsed=1797.31s allocated=83567K defined_classes=103 tid=0x000056075837ffd0  [0x00007f49becfd000]
   **Carrying virtual thread #71**
--
"ForkJoinPool-1-worker-12" #82 [130] daemon prio=5 os_prio=0 cpu=1524.40ms elapsed=1789.37s allocated=52890K defined_classes=3 tid=0x0000560757f3f7d0  [0x00007f49beb7d000]
   **Carrying virtual thread #38**
--
"ForkJoinPool-1-worker-13" #92 [137] daemon prio=5 os_prio=0 cpu=2644.57ms elapsed=1769.39s allocated=106M defined_classes=934 tid=0x00007f49ec1a6a60 nid=137 waiting on condition  [0x00007f49be7fe000]
   java.lang.Thread.State: WAITING (parking)

@wilkinsona
Copy link
Member Author

@Moscagus If you have a question about configuring Spring Kafka, please ask on Stack Overflow. If you believe you've found a bug in how Spring Kafka manages its threads, please open a Spring Kafka issue.

@Moscagus
Copy link

Ok thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: enhancement A general enhancement
Projects
None yet
Development

No branches or pull requests

4 participants