Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.minbox.framework.api.boot.autoconfigure.message.pipe.client;

import org.minbox.framework.message.pipe.client.config.ClientConfiguration;
import org.minbox.framework.message.pipe.spring.annotation.client.EnableMessagePipeClient;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
Expand All @@ -13,7 +12,6 @@
*/
@ConditionalOnClass(ClientConfiguration.class)
@EnableConfigurationProperties(MessagePipeClientProperties.class)
@EnableMessagePipeClient
public class MessagePipeClientAutoConfiguration {
private MessagePipeClientProperties messagePipeClientProperties;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.minbox.framework.api.boot.autoconfigure.message.pipe.server;

import org.minbox.framework.message.pipe.server.config.MessagePipeConfiguration;

/**
* The {@link MessagePipeConfiguration} Custom configuration interface definition
* <p>
* If there is a sequence, you can use the {@link org.springframework.core.annotation.Order} annotation to configure
*
* @author 恒宇少年
*/
@FunctionalInterface
public interface MessagePipeConfigurationCustomizer {
/**
* To implement this method, it can be modified according to the parameter object
*
* @param configuration The {@link MessagePipeConfiguration} instance
*/
void customize(MessagePipeConfiguration configuration);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.minbox.framework.api.boot.autoconfigure.message.pipe.server;

import org.minbox.framework.message.pipe.server.config.MessagePipeConfiguration;
import org.springframework.boot.util.LambdaSafe;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* The wrapper class of {@link MessagePipeConfigurationCustomizer}
* <p>
* Execute {@link MessagePipeConfigurationCustomizer#customize} according to the order configured by {@link org.springframework.core.annotation.Order}
*
* @author 恒宇少年
*/
public class MessagePipeConfigurationCustomizers {
private List<MessagePipeConfigurationCustomizer> customizers;

public MessagePipeConfigurationCustomizers(List<MessagePipeConfigurationCustomizer> customizers) {
this.customizers = (customizers != null) ? new ArrayList<>(customizers) : Collections.emptyList();
}

public MessagePipeConfiguration customizer(MessagePipeConfiguration configuration) {
LambdaSafe.callbacks(MessagePipeConfigurationCustomizer.class, this.customizers, configuration)
.withLogger(MessagePipeConfigurationCustomizer.class).invoke((customizer) -> customizer.customize(configuration));
return configuration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@

import org.minbox.framework.message.pipe.server.config.MessagePipeConfiguration;
import org.minbox.framework.message.pipe.server.config.ServerConfiguration;
import org.minbox.framework.message.pipe.spring.annotation.server.EnableMessagePipeServer;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;

import java.util.List;
import java.util.stream.Collectors;

/**
* The Message Pipe Server configuration
*
* @author 恒宇少年
*/
@ConditionalOnClass(ServerConfiguration.class)
@EnableConfigurationProperties(MessagePipeServerProperties.class)
@EnableMessagePipeServer
public class MessagePipeServerAutoConfiguration {
private MessagePipeServerProperties messagePipeServerProperties;

Expand All @@ -33,6 +36,21 @@ public ServerConfiguration serverConfiguration() {
return messagePipeServerProperties.getConfiguration();
}

/**
* Instantiate the wrapper class of {@link MessagePipeConfigurationCustomizer}
*
* @param customizers The {@link MessagePipeConfigurationCustomizer} object provider
* @return The {@link MessagePipeConfigurationCustomizers} instance
*/
@Bean
@ConditionalOnMissingBean
public MessagePipeConfigurationCustomizers messagePipeConfigurationCustomizers(
ObjectProvider<MessagePipeConfigurationCustomizer> customizers) {
List<MessagePipeConfigurationCustomizer> sortedCustomizers =
customizers.orderedStream().collect(Collectors.toList());
return new MessagePipeConfigurationCustomizers(sortedCustomizers);
}

/**
* Create {@link MessagePipeConfiguration}
* <p>
Expand All @@ -41,14 +59,36 @@ public ServerConfiguration serverConfiguration() {
* @return The {@link MessagePipeConfiguration} instance
*/
@Bean
public MessagePipeConfiguration messagePipeConfiguration() {
public MessagePipeConfiguration messagePipeConfiguration(MessagePipeConfigurationCustomizers customizers) {
MessagePipeConfiguration configuration = MessagePipeConfiguration.defaultConfiguration();
MessagePipeConfiguration.LockTime lockTime =
new MessagePipeConfiguration.LockTime()
.setLeaseTime(messagePipeServerProperties.getLockLeaseTime())
.setTimeUnit(messagePipeServerProperties.getLockLeaseTimeUnit());
configuration.setLockTime(lockTime);
configuration.setDistributionMessagePoolSize(messagePipeServerProperties.getDistributionMessagePoolSize());
return configuration;
return customizers.customizer(configuration);
}

/**
* Configuration {@link MessagePipeConfiguration#setLockTime}
*
* @return The {@link MessagePipeConfigurationCustomizer} instance of {@link MessagePipeConfiguration.LockTime}
*/
@Bean
public MessagePipeConfigurationCustomizer customizerLockTime() {
return configuration -> {
MessagePipeConfiguration.LockTime lockTime =
new MessagePipeConfiguration.LockTime()
.setLeaseTime(messagePipeServerProperties.getLockLeaseTime())
.setTimeUnit(messagePipeServerProperties.getLockLeaseTimeUnit());
configuration.setLockTime(lockTime);
};
}

/**
* Configuration {@link MessagePipeConfiguration#setMessagePipeMonitorMillis}
*
* @return The {@link MessagePipeConfigurationCustomizer} instance of monitor millis
*/
@Bean
public MessagePipeConfigurationCustomizer customizerMonitorTime() {
return configuration ->
configuration.setMessagePipeMonitorMillis(messagePipeServerProperties.getMessagePipeMonitorMillis());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,11 @@ public class MessagePipeServerProperties {
/**
* The number of threads in the message thread pool
*/
@Deprecated
private int distributionMessagePoolSize = 10;
/**
* The interval time for each message pipeline to perform monitoring
* time unit: milliseconds
*/
private long messagePipeMonitorMillis = 10000L;
}
1 change: 1 addition & 0 deletions api-boot-samples/api-boot-sample-logging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<configuration>
<jvmArguments>-Dfile.encoding=UTF-8</jvmArguments>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.minbox.framework.api.boot.sample.message.pipe.server;

import org.minbox.framework.message.pipe.server.processing.push.PushMessageEvent;
import org.minbox.framework.message.pipe.spring.annotation.ServerServiceType;
import org.minbox.framework.message.pipe.spring.annotation.server.EnableMessagePipeServer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

/**
* 消息管道Server相关配置
*
* @author 恒宇少年
*/
@Configuration
@EnableMessagePipeServer(serverType = ServerServiceType.GRPC)
public class MessagePipeServerConfiguration {
/**
* 配置Redis监听容器
* <p>
* 新消息写入消息管道时,
* 会触发{@link PushMessageEvent}事件,而该事件的监听方式则是采用的Redis的KeyEvent的形式
*
* @param connectionFactory Redis连接工厂对象
* @return The {@link RedisMessageListenerContainer} instance
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.minbox.framework.api.boot.sample.message.pipe.server;

import org.minbox.framework.api.boot.autoconfigure.message.pipe.server.MessagePipeConfigurationCustomizer;
import org.minbox.framework.message.pipe.core.transport.RequestIdGenerator;
import org.minbox.framework.message.pipe.server.config.MessagePipeConfiguration;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
* 自定义配置{@link org.minbox.framework.message.pipe.core.transport.RequestIdGenerator}
*
* @author 恒宇少年
*/
@Component
public class RequestIdCustomizer implements MessagePipeConfigurationCustomizer {
@Override
public void customize(MessagePipeConfiguration configuration) {
RequestIdGenerator generator = new UuidRequestIdGenerator();
configuration.setRequestIdGenerator(generator);
}

/**
* 使用Uuid随机数生成请求ID
*/
class UuidRequestIdGenerator implements RequestIdGenerator {
@Override
public String generate() {
return UUID.randomUUID().toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<plugin>
<groupId>org.minbox.framework</groupId>
<artifactId>api-boot-mybatis-enhance-maven-codegen</artifactId>
<version>${api-boot.version}</version>
<version>2.3.1.RELEASE</version>
<dependencies>
<!--数据驱动依赖-->
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion api-boot-samples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<!--Samples所使用的ApiBoot版本-->
<api-boot.version>${project.version}</api-boot.version>
<!--Samples所使用的SpringBoot版本-->
<spring-boot.version>2.3.1.RELEASE</spring-boot.version>
<spring-boot.version>2.3.3.RELEASE</spring-boot.version>
</properties>
<artifactId>api-boot-samples</artifactId>
<packaging>pom</packaging>
Expand Down