|
29 | 29 | import org.apache.pulsar.common.schema.SchemaType; |
30 | 30 | import org.junit.jupiter.api.Nested; |
31 | 31 | import org.junit.jupiter.api.Test; |
| 32 | +import org.junit.jupiter.api.condition.EnabledForJreRange; |
| 33 | +import org.junit.jupiter.api.condition.JRE; |
32 | 34 | import org.junit.jupiter.params.ParameterizedTest; |
33 | 35 | import org.junit.jupiter.params.provider.ValueSource; |
34 | 36 |
|
|
39 | 41 | import org.springframework.context.annotation.Bean; |
40 | 42 | import org.springframework.context.annotation.Configuration; |
41 | 43 | import org.springframework.core.annotation.Order; |
| 44 | +import org.springframework.core.task.VirtualThreadTaskExecutor; |
42 | 45 | import org.springframework.pulsar.annotation.PulsarBootstrapConfiguration; |
43 | 46 | import org.springframework.pulsar.annotation.PulsarListenerAnnotationBeanPostProcessor; |
44 | 47 | import org.springframework.pulsar.annotation.PulsarReaderAnnotationBeanPostProcessor; |
@@ -464,6 +467,27 @@ void whenObservationsDisabledDoesNotEnableObservation() { |
464 | 467 | .hasFieldOrPropertyWithValue("containerProperties.observationEnabled", false)); |
465 | 468 | } |
466 | 469 |
|
| 470 | + @Test |
| 471 | + @EnabledForJreRange(min = JRE.JAVA_21) |
| 472 | + void whenVirtualThreadsAreEnabledOnJava21AndLaterListenerContainerShouldUseVirtualThreads() { |
| 473 | + this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> { |
| 474 | + ConcurrentPulsarListenerContainerFactory<?> factory = context |
| 475 | + .getBean(ConcurrentPulsarListenerContainerFactory.class); |
| 476 | + assertThat(factory.getContainerProperties().getConsumerTaskExecutor()) |
| 477 | + .isInstanceOf(VirtualThreadTaskExecutor.class); |
| 478 | + }); |
| 479 | + } |
| 480 | + |
| 481 | + @Test |
| 482 | + @EnabledForJreRange(max = JRE.JAVA_20) |
| 483 | + void whenVirtualThreadsAreEnabledOnJava20AndEarlierListenerContainerShouldNotUseVirtualThreads() { |
| 484 | + this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> { |
| 485 | + ConcurrentPulsarListenerContainerFactory<?> factory = context |
| 486 | + .getBean(ConcurrentPulsarListenerContainerFactory.class); |
| 487 | + assertThat(factory.getContainerProperties().getConsumerTaskExecutor()).isNull(); |
| 488 | + }); |
| 489 | + } |
| 490 | + |
467 | 491 | } |
468 | 492 |
|
469 | 493 | @Nested |
@@ -498,6 +522,27 @@ <T> void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { |
498 | 522 | }); |
499 | 523 | } |
500 | 524 |
|
| 525 | + @Test |
| 526 | + @EnabledForJreRange(min = JRE.JAVA_21) |
| 527 | + void whenVirtualThreadsAreEnabledOnJava21AndLaterReaderShouldUseVirtualThreads() { |
| 528 | + this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> { |
| 529 | + DefaultPulsarReaderContainerFactory<?> factory = context |
| 530 | + .getBean(DefaultPulsarReaderContainerFactory.class); |
| 531 | + assertThat(factory.getContainerProperties().getReaderTaskExecutor()) |
| 532 | + .isInstanceOf(VirtualThreadTaskExecutor.class); |
| 533 | + }); |
| 534 | + } |
| 535 | + |
| 536 | + @Test |
| 537 | + @EnabledForJreRange(max = JRE.JAVA_20) |
| 538 | + void whenVirtualThreadsAreEnabledOnJava20AndEarlierReaderShouldNotUseVirtualThreads() { |
| 539 | + this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> { |
| 540 | + DefaultPulsarReaderContainerFactory<?> factory = context |
| 541 | + .getBean(DefaultPulsarReaderContainerFactory.class); |
| 542 | + assertThat(factory.getContainerProperties().getReaderTaskExecutor()).isNull(); |
| 543 | + }); |
| 544 | + } |
| 545 | + |
501 | 546 | @TestConfiguration(proxyBeanMethods = false) |
502 | 547 | static class ReaderBuilderCustomizersConfig { |
503 | 548 |
|
|
0 commit comments