Skip to content

Commit

Permalink
Polish "Set virtual thread names for RabbitMQ and Pulsar"
Browse files Browse the repository at this point in the history
  • Loading branch information
mhalbritter committed Mar 18, 2024
1 parent ecda754 commit 09652cb
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,6 @@
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {

/**
* Default Name of the thread created for simple rabbit listener.
*/
public static final String THREADNAME_RABBIT_SIMPLE = "rabbit-simple-";

/**
* Default Name of the thread created for direct rabbit listener.
*/
public static final String THREADNAME_RABBIT_DIRECT = "rabbit-direct-";

private final ObjectProvider<MessageConverter> messageConverter;

private final ObjectProvider<MessageRecoverer> messageRecoverer;
Expand Down Expand Up @@ -86,7 +76,7 @@ SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFact
@ConditionalOnThreading(Threading.VIRTUAL)
SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurerVirtualThreads() {
SimpleRabbitListenerContainerFactoryConfigurer configurer = simpleListenerConfigurer();
configurer.setTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_RABBIT_SIMPLE));
configurer.setTaskExecutor(new VirtualThreadTaskExecutor("rabbit-simple-"));
return configurer;
}

Expand Down Expand Up @@ -115,7 +105,7 @@ DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFact
@ConditionalOnThreading(Threading.VIRTUAL)
DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurerVirtualThreads() {
DirectRabbitListenerContainerFactoryConfigurer configurer = directListenerConfigurer();
configurer.setTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_RABBIT_DIRECT));
configurer.setTaskExecutor(new VirtualThreadTaskExecutor("rabbit-direct-"));
return configurer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,9 @@
@Import(PulsarConfiguration.class)
public class PulsarAutoConfiguration {

/**
* Default Name of the thread created for pulsar consumer.
*/
public static final String THREADNAME_PULSAR_CONSUMER = "pulsar-consumer-";
private final PulsarProperties properties;

/**
* Default Name of the thread created for pulsar task executor.
*/
public static final String THREADNAME_PULSAR_TASKEXECUTOR = "pulsar-taskexecutor-";

private PulsarProperties properties;

private PulsarPropertiesMapper propertiesMapper;
private final PulsarPropertiesMapper propertiesMapper;

PulsarAutoConfiguration(PulsarProperties properties) {
this.properties = properties;
Expand Down Expand Up @@ -168,7 +158,7 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
if (Threading.VIRTUAL.isActive(environment)) {
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_PULSAR_CONSUMER));
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-"));
}
this.propertiesMapper.customizeContainerProperties(containerProperties);
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
Expand Down Expand Up @@ -199,8 +189,7 @@ DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReader
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
readerContainerProperties.setSchemaResolver(schemaResolver);
if (Threading.VIRTUAL.isActive(environment)) {
readerContainerProperties
.setReaderTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_PULSAR_TASKEXECUTOR));
readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor("pulsar-reader-"));
}
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,11 +552,10 @@ void shouldConfigureVirtualThreadsForSimpleListener() {
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(RabbitAnnotationDrivenConfiguration.THREADNAME_RABBIT_SIMPLE + "[0-9]*");
Object taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName()).containsPattern("rabbit-simple-[0-9]+");

});
}
Expand All @@ -570,11 +569,10 @@ void shouldConfigureVirtualThreadsForDirectListener() {
DirectRabbitListenerContainerFactoryConfigurer.class);
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(RabbitAnnotationDrivenConfiguration.THREADNAME_RABBIT_DIRECT + "[0-9]*");
Object taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName()).containsPattern("rabbit-direct-[0-9]+");

});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,11 +508,10 @@ void whenVirtualThreadsAreEnabledOnJava21AndLaterListenerContainerShouldUseVirtu
.getBean(ConcurrentPulsarListenerContainerFactory.class);
assertThat(factory.getContainerProperties().getConsumerTaskExecutor())
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = factory.getContainerProperties().getConsumerTaskExecutor();
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(PulsarAutoConfiguration.THREADNAME_PULSAR_CONSUMER + "[0-9]*");
Object taskExecutor = factory.getContainerProperties().getConsumerTaskExecutor();
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName()).containsPattern("pulsar-consumer-[0-9]+");
});
}

Expand Down Expand Up @@ -568,11 +567,10 @@ void whenVirtualThreadsAreEnabledOnJava21AndLaterReaderShouldUseVirtualThreads()
.getBean(DefaultPulsarReaderContainerFactory.class);
assertThat(factory.getContainerProperties().getReaderTaskExecutor())
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = factory.getContainerProperties().getReaderTaskExecutor();
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(PulsarAutoConfiguration.THREADNAME_PULSAR_TASKEXECUTOR + "[0-9]*");
Object taskExecutor = factory.getContainerProperties().getReaderTaskExecutor();
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName()).containsPattern("pulsar-reader-[0-9]+");
});
}

Expand Down

0 comments on commit 09652cb

Please sign in to comment.