From 0c8e0e7f36e7afe74fed69ccdf2dbaedcef47a6c Mon Sep 17 00:00:00 2001 From: hengboy Date: Fri, 11 Sep 2020 11:46:34 +0800 Subject: [PATCH] =?UTF-8?q?:rocket:=20=E6=96=B0=E5=A2=9Elpop=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E7=AE=A1=E9=81=93=E9=98=BB=E5=A1=9E=E9=98=9F=E5=88=97?= =?UTF-8?q?=E5=86=85=E6=B6=88=E6=81=AF=E4=BA=8B=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...EventPublisherKeyspaceMessageListener.java | 63 +++++++++++++++++++ .../processing/pop/PopMessageEvent.java | 21 +++++++ .../pop/PopMessageFromPipeListener.java | 45 +++++++++++++ .../push/PushMessageToPipeListener.java | 39 ++---------- .../spring/utils/MessagePipeBeanUtils.java | 11 ++++ 5 files changed, 146 insertions(+), 33 deletions(-) create mode 100644 message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/EventPublisherKeyspaceMessageListener.java create mode 100644 message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/pop/PopMessageEvent.java create mode 100644 message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/pop/PopMessageFromPipeListener.java diff --git a/message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/EventPublisherKeyspaceMessageListener.java b/message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/EventPublisherKeyspaceMessageListener.java new file mode 100644 index 0000000..ae123c8 --- /dev/null +++ b/message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/EventPublisherKeyspaceMessageListener.java @@ -0,0 +1,63 @@ +package org.minbox.framework.message.pipe.server.processing; + +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.data.redis.listener.KeyspaceEventMessageListener; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.minbox.framework.message.pipe.core.PipeConstants.PIPE_NAME_PATTERN; + +/** + * The {@link KeyspaceEventMessageListener} subclass + *

+ * Encapsulate {@link KeyspaceEventMessageListener} , + * provide a method for publishing Spring {@link org.springframework.context.ApplicationEvent} + * + * @author 恒宇少年 + */ +public abstract class EventPublisherKeyspaceMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware { + private ApplicationEventPublisher applicationEventPublisher; + + public abstract PatternTopic patternTopicUsed(); + + public EventPublisherKeyspaceMessageListener(RedisMessageListenerContainer listenerContainer) { + super(listenerContainer); + } + + @Override + protected void doRegister(RedisMessageListenerContainer container) { + container.addMessageListener(this, this.patternTopicUsed()); + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + + /** + * Publish given {@link ApplicationEvent} instance + * + * @param event The {@link ApplicationEvent} instance + */ + protected void publishEvent(ApplicationEvent event) { + this.applicationEventPublisher.publishEvent(event); + } + + /** + * Extract the pipeline name based on the Key in redis + * + * @param redisQueueKey The redis queue key + * example:"test.queue" + * @return The name of message pipe,if the key does not match the expression, it returns null + */ + protected String extractPipeName(String redisQueueKey) { + Pattern pipeKeyPattern = Pattern.compile(PIPE_NAME_PATTERN); + Matcher matcher = pipeKeyPattern.matcher(redisQueueKey); + return matcher.find() ? matcher.group(1) : null; + } +} diff --git a/message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/pop/PopMessageEvent.java b/message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/pop/PopMessageEvent.java new file mode 100644 index 0000000..a522c0a --- /dev/null +++ b/message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/pop/PopMessageEvent.java @@ -0,0 +1,21 @@ +package org.minbox.framework.message.pipe.server.processing.pop; + +import lombok.Getter; +import org.minbox.framework.message.pipe.server.MessagePipe; +import org.springframework.context.ApplicationEvent; + +/** + * @author 恒宇少年 + */ +@Getter +public class PopMessageEvent extends ApplicationEvent { + /** + * The name of {@link MessagePipe} + */ + private String pipeName; + + public PopMessageEvent(Object source, String pipeName) { + super(source); + this.pipeName = pipeName; + } +} diff --git a/message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/pop/PopMessageFromPipeListener.java b/message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/pop/PopMessageFromPipeListener.java new file mode 100644 index 0000000..3f3919b --- /dev/null +++ b/message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/pop/PopMessageFromPipeListener.java @@ -0,0 +1,45 @@ +package org.minbox.framework.message.pipe.server.processing.pop; + +import lombok.extern.slf4j.Slf4j; +import org.minbox.framework.message.pipe.server.processing.EventPublisherKeyspaceMessageListener; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.util.ObjectUtils; + +/** + * Monitor the list in Redis to get data from the left + * + * @author 恒宇少年 + */ +@Slf4j +public class PopMessageFromPipeListener extends EventPublisherKeyspaceMessageListener { + /** + * The bean name of {@link PopMessageFromPipeListener} + */ + public static final String BEAN_NAME = "popMessageFromPipeListener"; + private static final String LEFT_POP_PATTERN_TOPIC = "__keyevent@*:lpop"; + + public PopMessageFromPipeListener(RedisMessageListenerContainer listenerContainer) { + super(listenerContainer); + } + + @Override + public PatternTopic patternTopicUsed() { + return new PatternTopic(LEFT_POP_PATTERN_TOPIC); + } + + @Override + protected void doHandleMessage(Message message) { + String redisQueueKey = message.toString(); + String pipeName = extractPipeName(redisQueueKey); + if (ObjectUtils.isEmpty(pipeName)) { + log.warn("The message pipe name was not extracted from Key: {}.", redisQueueKey); + return; + } + // Publish PopMessageEvent + PopMessageEvent popMessageEvent = new PopMessageEvent(this, pipeName); + publishEvent(popMessageEvent); + log.debug("Message Pipe:{},publish PopMessageEvent successfully.", pipeName); + } +} diff --git a/message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/push/PushMessageToPipeListener.java b/message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/push/PushMessageToPipeListener.java index 8881a7e..833893d 100644 --- a/message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/push/PushMessageToPipeListener.java +++ b/message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/processing/push/PushMessageToPipeListener.java @@ -1,19 +1,12 @@ package org.minbox.framework.message.pipe.server.processing.push; import lombok.extern.slf4j.Slf4j; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.ApplicationEventPublisherAware; +import org.minbox.framework.message.pipe.server.processing.EventPublisherKeyspaceMessageListener; import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.listener.KeyspaceEventMessageListener; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.util.ObjectUtils; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.minbox.framework.message.pipe.core.PipeConstants.PIPE_NAME_PATTERN; - /** * Waiting for the message to be pushed to the listener of the pipeline *

@@ -22,53 +15,33 @@ * @author 恒宇少年 */ @Slf4j -public class PushMessageToPipeListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware { +public class PushMessageToPipeListener extends EventPublisherKeyspaceMessageListener { /** * The bean name of {@link PushMessageToPipeListener} */ public static final String BEAN_NAME = "pushMessageListener"; private static final String PUSH_PATTERN_TOPIC = "__keyevent@*__:rpush"; - private ApplicationEventPublisher applicationEventPublisher; public PushMessageToPipeListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @Override - protected void doRegister(RedisMessageListenerContainer container) { - container.addMessageListener(this, new PatternTopic(PUSH_PATTERN_TOPIC)); + public PatternTopic patternTopicUsed() { + return new PatternTopic(PUSH_PATTERN_TOPIC); } @Override protected void doHandleMessage(Message message) { String redisQueueKey = message.toString(); - String pipeName = this.extractPipeName(redisQueueKey); + String pipeName = extractPipeName(redisQueueKey); if (ObjectUtils.isEmpty(pipeName)) { log.warn("The message pipe name was not extracted from Key: {}.", redisQueueKey); return; } // Publish PushMessageEvent PushMessageEvent pushMessageEvent = new PushMessageEvent(this, pipeName); - applicationEventPublisher.publishEvent(pushMessageEvent); + publishEvent(pushMessageEvent); log.debug("Message Pipe:{},publish PushMessageEvent successfully.", pipeName); } - - - @Override - public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { - this.applicationEventPublisher = applicationEventPublisher; - } - - /** - * Extract the pipeline name based on the Key in redis - * - * @param redisQueueKey The redis queue key - * example:"test.queue" - * @return The name of message pipe,if the key does not match the expression, it returns null - */ - private String extractPipeName(String redisQueueKey) { - Pattern pipeKeyPattern = Pattern.compile(PIPE_NAME_PATTERN); - Matcher matcher = pipeKeyPattern.matcher(redisQueueKey); - return matcher.find() ? matcher.group(1) : null; - } } diff --git a/message-pipe-spring-context/src/main/java/org/minbox/framework/message/pipe/spring/utils/MessagePipeBeanUtils.java b/message-pipe-spring-context/src/main/java/org/minbox/framework/message/pipe/spring/utils/MessagePipeBeanUtils.java index 0c73ceb..106ed7f 100644 --- a/message-pipe-spring-context/src/main/java/org/minbox/framework/message/pipe/spring/utils/MessagePipeBeanUtils.java +++ b/message-pipe-spring-context/src/main/java/org/minbox/framework/message/pipe/spring/utils/MessagePipeBeanUtils.java @@ -6,6 +6,7 @@ import org.minbox.framework.message.pipe.server.manager.MessagePipeFactoryBean; import org.minbox.framework.message.pipe.server.manager.MessagePipeLoader; import org.minbox.framework.message.pipe.server.manager.DefaultMessagePipeManager; +import org.minbox.framework.message.pipe.server.processing.pop.PopMessageFromPipeListener; import org.minbox.framework.message.pipe.server.processing.push.PushMessageToPipeListener; import org.minbox.framework.message.pipe.server.service.discovery.ClientServiceDiscovery; import org.minbox.framework.util.BeanUtils; @@ -28,6 +29,7 @@ public static void registerServerBeans(BeanDefinitionRegistry registry) { registerMessagePipeLoader(registry); registerClientServiceDiscovery(registry); registerPushMessageListener(registry); + registerPopMessageFromPipeListener(registry); } /** @@ -112,4 +114,13 @@ private static void registerClientServiceDiscovery(BeanDefinitionRegistry regist private static void registerPushMessageListener(BeanDefinitionRegistry registry) { BeanUtils.registerInfrastructureBeanIfAbsent(registry, PushMessageToPipeListener.BEAN_NAME, PushMessageToPipeListener.class); } + + /** + * Register {@link PopMessageFromPipeListener} + * + * @param registry The {@link BeanDefinitionRegistry} instance + */ + private static void registerPopMessageFromPipeListener(BeanDefinitionRegistry registry) { + BeanUtils.registerInfrastructureBeanIfAbsent(registry, PopMessageFromPipeListener.BEAN_NAME, PopMessageFromPipeListener.class); + } }