Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package org.minbox.framework.message.pipe.server.processing;

import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.data.redis.listener.KeyspaceEventMessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.minbox.framework.message.pipe.core.PipeConstants.PIPE_NAME_PATTERN;

/**
* The {@link KeyspaceEventMessageListener} subclass
* <p>
* Encapsulate {@link KeyspaceEventMessageListener} ,
* provide a method for publishing Spring {@link org.springframework.context.ApplicationEvent}
*
* @author 恒宇少年
*/
public abstract class EventPublisherKeyspaceMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;

public abstract PatternTopic patternTopicUsed();

public EventPublisherKeyspaceMessageListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}

@Override
protected void doRegister(RedisMessageListenerContainer container) {
container.addMessageListener(this, this.patternTopicUsed());
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}

/**
* Publish given {@link ApplicationEvent} instance
*
* @param event The {@link ApplicationEvent} instance
*/
protected void publishEvent(ApplicationEvent event) {
this.applicationEventPublisher.publishEvent(event);
}

/**
* Extract the pipeline name based on the Key in redis
*
* @param redisQueueKey The redis queue key
* example:"test.queue"
* @return The name of message pipe,if the key does not match the expression, it returns null
*/
protected String extractPipeName(String redisQueueKey) {
Pattern pipeKeyPattern = Pattern.compile(PIPE_NAME_PATTERN);
Matcher matcher = pipeKeyPattern.matcher(redisQueueKey);
return matcher.find() ? matcher.group(1) : null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.minbox.framework.message.pipe.server.processing.pop;

import lombok.Getter;
import org.minbox.framework.message.pipe.server.MessagePipe;
import org.springframework.context.ApplicationEvent;

/**
* @author 恒宇少年
*/
@Getter
public class PopMessageEvent extends ApplicationEvent {
/**
* The name of {@link MessagePipe}
*/
private String pipeName;

public PopMessageEvent(Object source, String pipeName) {
super(source);
this.pipeName = pipeName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.minbox.framework.message.pipe.server.processing.pop;

import lombok.extern.slf4j.Slf4j;
import org.minbox.framework.message.pipe.server.processing.EventPublisherKeyspaceMessageListener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.util.ObjectUtils;

/**
* Monitor the list in Redis to get data from the left
*
* @author 恒宇少年
*/
@Slf4j
public class PopMessageFromPipeListener extends EventPublisherKeyspaceMessageListener {
/**
* The bean name of {@link PopMessageFromPipeListener}
*/
public static final String BEAN_NAME = "popMessageFromPipeListener";
private static final String LEFT_POP_PATTERN_TOPIC = "__keyevent@*:lpop";

public PopMessageFromPipeListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}

@Override
public PatternTopic patternTopicUsed() {
return new PatternTopic(LEFT_POP_PATTERN_TOPIC);
}

@Override
protected void doHandleMessage(Message message) {
String redisQueueKey = message.toString();
String pipeName = extractPipeName(redisQueueKey);
if (ObjectUtils.isEmpty(pipeName)) {
log.warn("The message pipe name was not extracted from Key: {}.", redisQueueKey);
return;
}
// Publish PopMessageEvent
PopMessageEvent popMessageEvent = new PopMessageEvent(this, pipeName);
publishEvent(popMessageEvent);
log.debug("Message Pipe:{},publish PopMessageEvent successfully.", pipeName);
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
package org.minbox.framework.message.pipe.server.processing.push;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.minbox.framework.message.pipe.server.processing.EventPublisherKeyspaceMessageListener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyspaceEventMessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.util.ObjectUtils;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.minbox.framework.message.pipe.core.PipeConstants.PIPE_NAME_PATTERN;

/**
* Waiting for the message to be pushed to the listener of the pipeline
* <p>
Expand All @@ -22,53 +15,33 @@
* @author 恒宇少年
*/
@Slf4j
public class PushMessageToPipeListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
public class PushMessageToPipeListener extends EventPublisherKeyspaceMessageListener {
/**
* The bean name of {@link PushMessageToPipeListener}
*/
public static final String BEAN_NAME = "pushMessageListener";
private static final String PUSH_PATTERN_TOPIC = "__keyevent@*__:rpush";
private ApplicationEventPublisher applicationEventPublisher;

public PushMessageToPipeListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}

@Override
protected void doRegister(RedisMessageListenerContainer container) {
container.addMessageListener(this, new PatternTopic(PUSH_PATTERN_TOPIC));
public PatternTopic patternTopicUsed() {
return new PatternTopic(PUSH_PATTERN_TOPIC);
}

@Override
protected void doHandleMessage(Message message) {
String redisQueueKey = message.toString();
String pipeName = this.extractPipeName(redisQueueKey);
String pipeName = extractPipeName(redisQueueKey);
if (ObjectUtils.isEmpty(pipeName)) {
log.warn("The message pipe name was not extracted from Key: {}.", redisQueueKey);
return;
}
// Publish PushMessageEvent
PushMessageEvent pushMessageEvent = new PushMessageEvent(this, pipeName);
applicationEventPublisher.publishEvent(pushMessageEvent);
publishEvent(pushMessageEvent);
log.debug("Message Pipe:{},publish PushMessageEvent successfully.", pipeName);
}


@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}

/**
* Extract the pipeline name based on the Key in redis
*
* @param redisQueueKey The redis queue key
* example:"test.queue"
* @return The name of message pipe,if the key does not match the expression, it returns null
*/
private String extractPipeName(String redisQueueKey) {
Pattern pipeKeyPattern = Pattern.compile(PIPE_NAME_PATTERN);
Matcher matcher = pipeKeyPattern.matcher(redisQueueKey);
return matcher.find() ? matcher.group(1) : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.minbox.framework.message.pipe.server.manager.MessagePipeFactoryBean;
import org.minbox.framework.message.pipe.server.manager.MessagePipeLoader;
import org.minbox.framework.message.pipe.server.manager.DefaultMessagePipeManager;
import org.minbox.framework.message.pipe.server.processing.pop.PopMessageFromPipeListener;
import org.minbox.framework.message.pipe.server.processing.push.PushMessageToPipeListener;
import org.minbox.framework.message.pipe.server.service.discovery.ClientServiceDiscovery;
import org.minbox.framework.util.BeanUtils;
Expand All @@ -28,6 +29,7 @@ public static void registerServerBeans(BeanDefinitionRegistry registry) {
registerMessagePipeLoader(registry);
registerClientServiceDiscovery(registry);
registerPushMessageListener(registry);
registerPopMessageFromPipeListener(registry);
}

/**
Expand Down Expand Up @@ -112,4 +114,13 @@ private static void registerClientServiceDiscovery(BeanDefinitionRegistry regist
private static void registerPushMessageListener(BeanDefinitionRegistry registry) {
BeanUtils.registerInfrastructureBeanIfAbsent(registry, PushMessageToPipeListener.BEAN_NAME, PushMessageToPipeListener.class);
}

/**
* Register {@link PopMessageFromPipeListener}
*
* @param registry The {@link BeanDefinitionRegistry} instance
*/
private static void registerPopMessageFromPipeListener(BeanDefinitionRegistry registry) {
BeanUtils.registerInfrastructureBeanIfAbsent(registry, PopMessageFromPipeListener.BEAN_NAME, PopMessageFromPipeListener.class);
}
}