Skip to content

Commit

Permalink
Config.keepPubSubOrder setting added. #919
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jun 13, 2017
1 parent 8cad23c commit e56e091
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 24 deletions.
Expand Up @@ -48,6 +48,7 @@ public class RedisClientConfig {
private int database;
private String clientName;
private boolean readOnly;
private boolean keepPubSubOrder = true;

private boolean sslEnableEndpointIdentification = true;
private SslProvider sslProvider = SslProvider.JDK;
Expand All @@ -56,6 +57,7 @@ public class RedisClientConfig {
private URI sslKeystore;
private String sslKeystorePassword;


public RedisClientConfig setAddress(String host, int port) {
this.address = URIBuilder.create("redis://" + host + ":" + port);
return this;
Expand Down Expand Up @@ -201,6 +203,11 @@ public RedisClientConfig setReadOnly(boolean readOnly) {
return this;
}



public boolean isKeepPubSubOrder() {
return keepPubSubOrder;
}
public void setKeepPubSubOrder(boolean keepPubSubOrder) {
this.keepPubSubOrder = keepPubSubOrder;
}

}
Expand Up @@ -44,18 +44,20 @@
public class CommandPubSubDecoder extends CommandDecoder {

// It is not needed to use concurrent map because responses are coming consecutive
private final Map<String, MultiDecoder<Object>> pubSubMessageDecoders = new HashMap<String, MultiDecoder<Object>>();
private final Map<PubSubKey, CommandData<Object, Object>> pubSubChannels = PlatformDependent.newConcurrentHashMap();
private final Map<String, PubSubEntry> entries = new HashMap<String, PubSubEntry>();
private final Map<PubSubKey, CommandData<Object, Object>> commands = PlatformDependent.newConcurrentHashMap();

private final ExecutorService executor;
private final boolean keepOrder;

public CommandPubSubDecoder(ExecutorService executor) {
public CommandPubSubDecoder(ExecutorService executor, boolean keepOrder) {
this.executor = executor;
this.keepOrder = keepOrder;
}

public void addPubSubCommand(String channel, CommandData<Object, Object> data) {
String operation = data.getCommand().getName().toLowerCase();
pubSubChannels.put(new PubSubKey(channel, operation), data);
commands.put(new PubSubKey(channel, operation), data);
}

@Override
Expand All @@ -70,27 +72,68 @@ protected void decodeResult(CommandData<Object, Object> data, List<Object> parts
String channelName = ((PubSubStatusMessage) result).getChannel();
String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase();
PubSubKey key = new PubSubKey(channelName, operation);
CommandData<Object, Object> d = pubSubChannels.get(key);
CommandData<Object, Object> d = commands.get(key);
if (Arrays.asList(RedisCommands.PSUBSCRIBE.getName(), RedisCommands.SUBSCRIBE.getName()).contains(d.getCommand().getName())) {
pubSubChannels.remove(key);
pubSubMessageDecoders.put(channelName, d.getMessageDecoder());
commands.remove(key);
entries.put(channelName, new PubSubEntry(d.getMessageDecoder()));
}
if (Arrays.asList(RedisCommands.PUNSUBSCRIBE.getName(), RedisCommands.UNSUBSCRIBE.getName()).contains(d.getCommand().getName())) {
pubSubChannels.remove(key);
pubSubMessageDecoders.remove(channelName);
commands.remove(key);
entries.remove(key);
}
}

final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel);

if (keepOrder) {
PubSubEntry item = entries.get(((Message) result).getChannel());
enqueueMessage(result, pubSubConnection, item);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
if (result instanceof PubSubStatusMessage) {
pubSubConnection.onMessage((PubSubStatusMessage) result);
} else if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else {
pubSubConnection.onMessage((PubSubPatternMessage) result);
}
}
});
}
}
}

private void enqueueMessage(Object result, final RedisPubSubConnection pubSubConnection, final PubSubEntry entry) {
if (result != null) {
entry.getQueue().add((Message)result);
}

if (entry.getSent().compareAndSet(false, true)) {
executor.execute(new Runnable() {
@Override
public void run() {
if (result instanceof PubSubStatusMessage) {
pubSubConnection.onMessage((PubSubStatusMessage) result);
} else if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else {
pubSubConnection.onMessage((PubSubPatternMessage) result);
try {
while (true) {
Message result = entry.getQueue().poll();
if (result != null) {
if (result instanceof PubSubStatusMessage) {
pubSubConnection.onMessage((PubSubStatusMessage) result);
} else if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else {
pubSubConnection.onMessage((PubSubPatternMessage) result);
}
} else {
break;
}
}
} finally {
entry.getSent().set(false);
if (!entry.getQueue().isEmpty()) {
enqueueMessage(null, pubSubConnection, entry);
}
}
}
});
Expand All @@ -107,17 +150,17 @@ protected MultiDecoder<Object> messageDecoder(CommandData<Object, Object> data,
if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(command)) {
String channelName = parts.get(1).toString();
PubSubKey key = new PubSubKey(channelName, command);
CommandData<Object, Object> commandData = pubSubChannels.get(key);
CommandData<Object, Object> commandData = commands.get(key);
if (commandData == null) {
return null;
}
return commandData.getCommand().getReplayMultiDecoder();
} else if (parts.get(0).equals("message")) {
String channelName = (String) parts.get(1);
return pubSubMessageDecoders.get(channelName);
return entries.get(channelName).getDecoder();
} else if (parts.get(0).equals("pmessage")) {
String patternName = (String) parts.get(1);
return pubSubMessageDecoders.get(patternName);
return entries.get(patternName).getDecoder();
}
}

Expand All @@ -129,11 +172,11 @@ protected Decoder<Object> selectDecoder(CommandData<Object, Object> data, List<O
if (data == null && parts != null) {
if (parts.size() == 2 && "message".equals(parts.get(0))) {
String channelName = (String) parts.get(1);
return pubSubMessageDecoders.get(channelName);
return entries.get(channelName).getDecoder();
}
if (parts.size() == 3 && "pmessage".equals(parts.get(0))) {
String patternName = (String) parts.get(1);
return pubSubMessageDecoders.get(patternName);
return entries.get(patternName).getDecoder();
}
}
return super.selectDecoder(data, parts);
Expand Down
@@ -0,0 +1,55 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.handler;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.pubsub.Message;

/**
*
* @author Nikita Koksharov
*
*/
public class PubSubEntry {

private final MultiDecoder<Object> decoder;

private final Queue<Message> queue = new ConcurrentLinkedQueue<Message>();

private final AtomicBoolean sent = new AtomicBoolean();

public PubSubEntry(MultiDecoder<Object> decoder) {
super();
this.decoder = decoder;
}

public MultiDecoder<Object> getDecoder() {
return decoder;
}

public Queue<Message> getQueue() {
return queue;
}

public AtomicBoolean getSent() {
return sent;
}

}
Expand Up @@ -89,7 +89,7 @@ protected void initChannel(Channel ch) throws Exception {
if (type == Type.PLAIN) {
ch.pipeline().addLast(new CommandDecoder());
} else {
ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor()));
ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor(), config.isKeepPubSubOrder()));
}
}

Expand Down
Expand Up @@ -15,6 +15,13 @@
*/
package org.redisson.client.protocol.pubsub;

/**
*
* @author Nikita Koksharov
*
*/
public interface Message {

String getChannel();

}
Expand Up @@ -15,6 +15,11 @@
*/
package org.redisson.client.protocol.pubsub;

/**
*
* @author Nikita Koksharov
*
*/
public class PubSubMessage implements Message {

private final String channel;
Expand Down
Expand Up @@ -15,6 +15,11 @@
*/
package org.redisson.client.protocol.pubsub;

/**
*
* @author Nikita Koksharov
*
*/
public class PubSubPatternMessage implements Message {

private final String pattern;
Expand Down
Expand Up @@ -15,6 +15,11 @@
*/
package org.redisson.client.protocol.pubsub;

/**
*
* @author Nikita Koksharov
*
*/
public class PubSubStatusMessage implements Message {

private final PubSubType type;
Expand Down
23 changes: 23 additions & 0 deletions redisson/src/main/java/org/redisson/config/Config.java
Expand Up @@ -91,6 +91,8 @@ public class Config {

private long lockWatchdogTimeout = 30 * 1000;

private boolean keepPubSubOrder = true;

public Config() {
}

Expand All @@ -103,6 +105,7 @@ public Config(Config oldConf) {
oldConf.setCodec(new JsonJacksonCodec());
}

setKeepPubSubOrder(oldConf.isKeepPubSubOrder());
setLockWatchdogTimeout(oldConf.getLockWatchdogTimeout());
setNettyThreads(oldConf.getNettyThreads());
setThreads(oldConf.getThreads());
Expand Down Expand Up @@ -580,6 +583,26 @@ public long getLockWatchdogTimeout() {
return lockWatchdogTimeout;
}

/**
* Define whether keep PubSub messages handling in arrival order
* or handle messages concurrently.
* <p>
* This setting applied only for PubSub messages published to single channel.
* <p>
* Default is <code>true</code>.
*
* @param keepPubSubOrder - <code>true</code> if order required, <code>false</code> otherwise.
* @return config
*/
public Config setKeepPubSubOrder(boolean keepPubSubOrder) {
this.keepPubSubOrder = keepPubSubOrder;
return this;
}
public boolean isKeepPubSubOrder() {
return keepPubSubOrder;
}


/**
* Read config object stored in JSON format from <code>String</code>
*
Expand Down
Expand Up @@ -375,7 +375,8 @@ protected RedisClientConfig createRedisConfig(NodeType type, URI address, int ti
.setSslKeystore(config.getSslKeystore())
.setSslKeystorePassword(config.getSslKeystorePassword())
.setDatabase(config.getDatabase())
.setClientName(config.getClientName());
.setClientName(config.getClientName())
.setKeepPubSubOrder(cfg.isKeepPubSubOrder());

if (type != NodeType.SENTINEL) {
redisConfig.setPassword(config.getPassword());
Expand Down

0 comments on commit e56e091

Please sign in to comment.