Skip to content

Commit

Permalink
ClusterServersConfig.readFromSlaves param added. #272
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Nov 3, 2015
1 parent 7d940c0 commit 68f6178
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 35 deletions.
18 changes: 16 additions & 2 deletions src/main/java/org/redisson/ClusterServersConfig.java
Expand Up @@ -16,7 +16,6 @@
package org.redisson;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -34,13 +33,16 @@ public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterSe
*/
private int scanInterval = 1000;

private boolean readFromSlaves = true;

public ClusterServersConfig() {
}

ClusterServersConfig(ClusterServersConfig config) {
super(config);
setNodeAddresses(config.getNodeAddresses());
setScanInterval(config.getScanInterval());
setReadFromSlaves(config.isReadFromSlaves());
}

/**
Expand Down Expand Up @@ -76,6 +78,18 @@ public ClusterServersConfig setScanInterval(int scanInterval) {
return this;
}


public boolean isReadFromSlaves() {
return readFromSlaves;
}
/**
* Use cluster slaves for read-operations
*
* @param readFromSlaves
* @return
*/
public ClusterServersConfig setReadFromSlaves(boolean readFromSlaves) {
this.readFromSlaves = readFromSlaves;
return this;
}

}
Expand Up @@ -50,6 +50,7 @@
public interface RedisCommands {

RedisStrictCommand<Void> ASKING = new RedisStrictCommand<Void>("ASKING", new VoidReplayConvertor());
RedisStrictCommand<Void> READONLY = new RedisStrictCommand<Void>("READONLY", new VoidReplayConvertor());

RedisCommand<Boolean> ZADD = new RedisCommand<Boolean>("ZADD", new BooleanAmountReplayConvertor(), 3);
RedisCommand<Boolean> ZREM = new RedisCommand<Boolean>("ZREM", new BooleanAmountReplayConvertor(), 2);
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/org/redisson/cluster/ClusterConnectionListener.java
@@ -0,0 +1,26 @@
package org.redisson.cluster;

import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.DefaultConnectionListener;
import org.redisson.connection.ConnectionEntry.Mode;

public class ClusterConnectionListener extends DefaultConnectionListener {

private final boolean readFromSlaves;

public ClusterConnectionListener(boolean readFromSlaves) {
this.readFromSlaves = readFromSlaves;
}

@Override
public void onConnect(MasterSlaveServersConfig config, RedisConnection conn, Mode serverMode) throws RedisException {
super.onConnect(config, conn, serverMode);
if (serverMode == Mode.SLAVE && readFromSlaves) {
conn.sync(RedisCommands.READONLY);
}
}

}
Expand Up @@ -36,7 +36,6 @@
import org.redisson.connection.CRC16;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.SingleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -54,6 +53,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private ScheduledFuture<?> monitorFuture;

public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
connectListener = new ClusterConnectionListener(cfg.isReadFromSlaves());
init(config);

this.config = create(cfg);
Expand Down Expand Up @@ -127,9 +127,10 @@ private void addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg
log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
config.setMasterAddress(partition.getMasterAddress());
config.setSlaveAddresses(partition.getSlaveAddresses());

log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());

SingleEntry entry = new SingleEntry(partition.getSlotRanges(), this, config);
MasterSlaveEntry entry = new MasterSlaveEntry(partition.getSlotRanges(), this, config, connectListener);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
for (ClusterSlotRange slotRange : partition.getSlotRanges()) {
addEntry(slotRange, entry);
Expand Down
19 changes: 13 additions & 6 deletions src/main/java/org/redisson/connection/ConnectionEntry.java
Expand Up @@ -38,12 +38,18 @@ public class ConnectionEntry {
private volatile boolean freezed;
final RedisClient client;

public enum Mode {SLAVE, MASTER}

private final Mode serverMode;
private final ConnectionListener connectListener;
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
private final AtomicInteger connectionsCounter = new AtomicInteger();

public ConnectionEntry(RedisClient client, int poolSize) {
public ConnectionEntry(RedisClient client, int poolSize, ConnectionListener connectListener, Mode serverMode) {
this.client = client;
this.connectionsCounter.set(poolSize);
this.connectListener = connectListener;
this.serverMode = serverMode;
}

public RedisClient getClient() {
Expand All @@ -67,7 +73,8 @@ public boolean tryAcquireConnection() {
if (connectionsCounter.get() == 0) {
return false;
}
if (connectionsCounter.compareAndSet(connectionsCounter.get(), connectionsCounter.get() - 1)) {
int value = connectionsCounter.get();
if (connectionsCounter.compareAndSet(value, value - 1)) {
return true;
}
}
Expand Down Expand Up @@ -96,11 +103,11 @@ public void operationComplete(Future<RedisConnection> future) throws Exception {
RedisConnection conn = future.getNow();
log.debug("new connection created: {}", conn);

prepareConnection(config, conn);
connectListener.onConnect(config, conn, serverMode);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn) {
prepareConnection(config, conn);
connectListener.onConnect(config, conn, serverMode);
}
});
}
Expand Down Expand Up @@ -131,11 +138,11 @@ public void operationComplete(Future<RedisPubSubConnection> future) throws Excep
RedisPubSubConnection conn = future.getNow();
log.debug("new pubsub connection created: {}", conn);

prepareConnection(config, conn);
connectListener.onConnect(config, conn, serverMode);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn) {
prepareConnection(config, conn);
connectListener.onConnect(config, conn, serverMode);
}
});
}
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/redisson/connection/ConnectionListener.java
@@ -0,0 +1,12 @@
package org.redisson.connection;

import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.connection.ConnectionEntry.Mode;

public interface ConnectionListener {

void onConnect(MasterSlaveServersConfig config, RedisConnection redisConnection, Mode serverMode) throws RedisException;

}
@@ -0,0 +1,25 @@
package org.redisson.connection;

import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionEntry.Mode;

public class DefaultConnectionListener implements ConnectionListener {

@Override
public void onConnect(MasterSlaveServersConfig config, RedisConnection conn, Mode serverMode)
throws RedisException {
if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword());
}
if (config.getDatabase() != 0) {
conn.sync(RedisCommands.SELECT, config.getDatabase());
}
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
}

}
Expand Up @@ -74,6 +74,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {

protected EventLoopGroup group;

protected ConnectionListener connectListener = new DefaultConnectionListener();

protected Class<? extends SocketChannel> socketChannelClass;

Expand Down Expand Up @@ -135,7 +136,7 @@ protected void init(MasterSlaveServersConfig config) {
protected void initEntry(MasterSlaveServersConfig config) {
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config, connectListener);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
addEntry(singleSlotRange, entry);
}
Expand Down
34 changes: 17 additions & 17 deletions src/main/java/org/redisson/connection/MasterSlaveEntry.java
Expand Up @@ -17,9 +17,7 @@

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -28,6 +26,7 @@
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.misc.ConnectionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,6 +46,8 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
LoadBalancer slaveBalancer;
SubscribesConnectionEntry masterEntry;

final ConnectionListener connectListener;

final MasterSlaveServersConfig config;
final ConnectionManager connectionManager;

Expand All @@ -55,32 +56,27 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {

final AtomicBoolean active = new AtomicBoolean(true);

public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config, ConnectionListener connectListener) {
this.slotRanges = slotRanges;
this.connectionManager = connectionManager;
this.config = config;
this.connectListener = connectListener;

slaveBalancer = config.getLoadBalancer();
slaveBalancer.init(config, connectionManager);

List<URI> addresses = new ArrayList<URI>(config.getSlaveAddresses());
addresses.add(config.getMasterAddress());
for (URI address : addresses) {
RedisClient client = connectionManager.createClient(address.getHost(), address.getPort());
slaveBalancer.add(new SubscribesConnectionEntry(client,
this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionPoolSize()));
}
if (!config.getSlaveAddresses().isEmpty()) {
slaveDown(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty();
addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, Mode.MASTER);
for (URI address : config.getSlaveAddresses()) {
addSlave(address.getHost(), address.getPort(), false, Mode.SLAVE);
}

writeConnectionHolder = new ConnectionPool<RedisConnection>(config, null, connectionManager.getGroup());
}

protected void setupMasterEntry(String host, int port) {
public void setupMasterEntry(String host, int port) {
RedisClient client = connectionManager.createClient(host, port);
masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0);
masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0, connectListener, Mode.MASTER);
writeConnectionHolder.add(masterEntry);
}

Expand All @@ -95,11 +91,15 @@ public Collection<RedisPubSubConnection> slaveDown(String host, int port) {
}

public void addSlave(String host, int port) {
addSlave(host, port, true, Mode.SLAVE);
}

private void addSlave(String host, int port, boolean freezed, Mode mode) {
RedisClient client = connectionManager.createClient(host, port);
SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client,
this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionPoolSize());
entry.setFreezed(true);
this.config.getSlaveSubscriptionConnectionPoolSize(), connectListener, mode);
entry.setFreezed(freezed);
slaveBalancer.add(entry);
}

Expand Down
Expand Up @@ -74,7 +74,7 @@ public SingleConnectionManager(SingleServerConfig cfg, Config config) {
protected void initEntry(MasterSlaveServersConfig config) {
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
SingleEntry entry = new SingleEntry(slots, this, config);
SingleEntry entry = new SingleEntry(slots, this, config, connectListener);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
addEntry(singleSlotRange, entry);
}
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/org/redisson/connection/SingleEntry.java
Expand Up @@ -22,6 +22,7 @@
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.misc.ConnectionPool;
import org.redisson.misc.PubSubConnectionPoll;

Expand All @@ -31,15 +32,16 @@ public class SingleEntry extends MasterSlaveEntry<SubscribesConnectionEntry> {

final ConnectionPool<RedisPubSubConnection> pubSubConnectionHolder;

public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super(slotRanges, connectionManager, config);
public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config, ConnectionListener connectListener) {
super(slotRanges, connectionManager, config, connectListener);
pubSubConnectionHolder = new PubSubConnectionPoll(config, null, connectionManager.getGroup());
}

@Override
public void setupMasterEntry(String host, int port) {
RedisClient masterClient = connectionManager.createClient(host, port);
masterEntry = new SubscribesConnectionEntry(masterClient, config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize());
masterEntry = new SubscribesConnectionEntry(masterClient,
config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize(), connectListener, Mode.MASTER);
writeConnectionHolder.add(masterEntry);
pubSubConnectionHolder.add(masterEntry);
}
Expand Down
Expand Up @@ -32,8 +32,8 @@ public class SubscribesConnectionEntry extends ConnectionEntry {
private final Queue<RedisPubSubConnection> freeSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final AtomicInteger connectionsCounter = new AtomicInteger();

public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize) {
super(client, poolSize);
public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize, ConnectionListener connectListener, Mode serverMode) {
super(client, poolSize, connectListener, serverMode);
connectionsCounter.set(subscribePoolSize);
}

Expand All @@ -58,7 +58,8 @@ public boolean tryAcquireSubscribeConnection() {
if (connectionsCounter.get() == 0) {
return false;
}
if (connectionsCounter.compareAndSet(connectionsCounter.get(), connectionsCounter.get() - 1)) {
int value = connectionsCounter.get();
if (connectionsCounter.compareAndSet(value, value - 1)) {
return true;
}
}
Expand Down

0 comments on commit 68f6178

Please sign in to comment.