Skip to content

Commit

Permalink
Redirect exceptions refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Nov 9, 2015
1 parent 74283ca commit 14262bd
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 55 deletions.
19 changes: 3 additions & 16 deletions src/main/java/org/redisson/client/RedisAskException.java
Expand Up @@ -15,25 +15,12 @@
*/
package org.redisson.client;

import java.net.InetSocketAddress;
import java.net.URI;

public class RedisAskException extends RedisException {
public class RedisAskException extends RedisRedirectException {

private static final long serialVersionUID = -6969734163155547631L;

private URI url;

public RedisAskException(String url) {
this.url = URI.create("//" + url);
}

public URI getUrl() {
return url;
}

public InetSocketAddress getAddr() {
return new InetSocketAddress(url.getHost(), url.getPort());
public RedisAskException(int slot, String url) {
super(slot, url);
}

}
12 changes: 3 additions & 9 deletions src/main/java/org/redisson/client/RedisMovedException.java
Expand Up @@ -15,18 +15,12 @@
*/
package org.redisson.client;

public class RedisMovedException extends RedisException {
public class RedisMovedException extends RedisRedirectException {

private static final long serialVersionUID = -6969734163155547631L;

private int slot;

public RedisMovedException(int slot) {
this.slot = slot;
}

public int getSlot() {
return slot;
public RedisMovedException(int slot, String url) {
super(slot, url);
}

}
30 changes: 30 additions & 0 deletions src/main/java/org/redisson/client/RedisRedirectException.java
@@ -0,0 +1,30 @@
package org.redisson.client;

import java.net.InetSocketAddress;
import java.net.URI;

class RedisRedirectException extends RedisException {

private static final long serialVersionUID = 181505625075250011L;

private int slot;
private URI url;

public RedisRedirectException(int slot, String url) {
this.slot = slot;
this.url = URI.create("//" + url);
}

public int getSlot() {
return slot;
}

public URI getUrl() {
return url;
}

public InetSocketAddress getAddr() {
return new InetSocketAddress(url.getHost(), url.getPort());
}

}
7 changes: 5 additions & 2 deletions src/main/java/org/redisson/client/handler/CommandDecoder.java
Expand Up @@ -165,10 +165,13 @@ private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> p
if (error.startsWith("MOVED")) {
String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[1]);
data.getPromise().setFailure(new RedisMovedException(slot));
String addr = errorParts[2];
data.getPromise().setFailure(new RedisMovedException(slot, addr));
} else if (error.startsWith("ASK")) {
String[] errorParts = error.split(" ");
data.getPromise().setFailure(new RedisAskException(errorParts[2]));
int slot = Integer.valueOf(errorParts[1]);
String addr = errorParts[2];
data.getPromise().setFailure(new RedisAskException(slot, addr));
} else {
data.getPromise().setFailure(new RedisException(error + ". channel: " + channel + " command: " + data));
}
Expand Down
Expand Up @@ -18,7 +18,7 @@
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.ConnectionEntry.NodeType;
import org.redisson.connection.DefaultConnectionListener;
import org.redisson.connection.FutureConnectionListener;

Expand All @@ -31,9 +31,9 @@ public ClusterConnectionListener(boolean readFromSlaves) {
}

@Override
public void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) throws RedisException {
public void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) throws RedisException {
super.onConnect(config, serverMode, connectionListener);
if (serverMode == Mode.SLAVE && readFromSlaves) {
if (serverMode == NodeType.SLAVE && readFromSlaves) {
connectionListener.addCommand(RedisCommands.READONLY);
}
}
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/org/redisson/connection/ConnectionEntry.java
Expand Up @@ -41,23 +41,23 @@ public enum FreezeReason {MANAGER, RECONNECT}
private FreezeReason freezeReason;
final RedisClient client;

public enum Mode {SLAVE, MASTER}
public enum NodeType {SLAVE, MASTER}

private final Mode serverMode;
private final NodeType nodeType;
private final ConnectionListener connectListener;
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
private final AtomicInteger connectionsCounter = new AtomicInteger();
private AtomicInteger failedAttempts = new AtomicInteger();

public ConnectionEntry(RedisClient client, int poolSize, ConnectionListener connectListener, Mode serverMode) {
public ConnectionEntry(RedisClient client, int poolSize, ConnectionListener connectListener, NodeType nodeType) {
this.client = client;
this.connectionsCounter.set(poolSize);
this.connectListener = connectListener;
this.serverMode = serverMode;
this.nodeType = nodeType;
}

public Mode getServerMode() {
return serverMode;
public NodeType getNodeType() {
return nodeType;
}

public void resetFailedAttempts() {
Expand Down Expand Up @@ -129,7 +129,7 @@ public void operationComplete(Future<RedisConnection> future) throws Exception {
log.debug("new connection created: {}", conn);

FutureConnectionListener<RedisConnection> listener = new FutureConnectionListener<RedisConnection>(connectionFuture, conn);
connectListener.onConnect(config, serverMode, listener);
connectListener.onConnect(config, nodeType, listener);
listener.executeCommands();

addReconnectListener(config, conn);
Expand All @@ -144,7 +144,7 @@ private void addReconnectListener(final MasterSlaveServersConfig config, RedisCo
@Override
public void onReconnect(RedisConnection conn, Promise<RedisConnection> connectionFuture) {
FutureConnectionListener<RedisConnection> listener = new FutureConnectionListener<RedisConnection>(connectionFuture, conn);
connectListener.onConnect(config, serverMode, listener);
connectListener.onConnect(config, nodeType, listener);
listener.executeCommands();
}
});
Expand All @@ -163,7 +163,7 @@ public void operationComplete(Future<RedisPubSubConnection> future) throws Excep
log.debug("new pubsub connection created: {}", conn);

FutureConnectionListener<RedisPubSubConnection> listener = new FutureConnectionListener<RedisPubSubConnection>(connectionFuture, conn);
connectListener.onConnect(config, serverMode, listener);
connectListener.onConnect(config, nodeType, listener);
listener.executeCommands();

addReconnectListener(config, conn);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/redisson/connection/ConnectionListener.java
Expand Up @@ -17,10 +17,10 @@

import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisException;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.ConnectionEntry.NodeType;

public interface ConnectionListener {

void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) throws RedisException;
void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener) throws RedisException;

}
Expand Up @@ -18,12 +18,12 @@
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.ConnectionEntry.NodeType;

public class DefaultConnectionListener implements ConnectionListener {

@Override
public void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener)
public void onConnect(MasterSlaveServersConfig config, NodeType serverMode, FutureConnectionListener connectionListener)
throws RedisException {
if (config.getPassword() != null) {
connectionListener.addCommand(RedisCommands.AUTH, config.getPassword());
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/org/redisson/connection/MasterSlaveEntry.java
Expand Up @@ -27,7 +27,7 @@
import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionEntry.FreezeReason;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.ConnectionEntry.NodeType;
import org.redisson.misc.ConnectionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -67,17 +67,17 @@ public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager conn
slaveBalancer.init(config, connectionManager, this);

boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty();
addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, Mode.MASTER);
addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER);
for (URI address : config.getSlaveAddresses()) {
addSlave(address.getHost(), address.getPort(), false, Mode.SLAVE);
addSlave(address.getHost(), address.getPort(), false, NodeType.SLAVE);
}

writeConnectionHolder = new ConnectionPool<RedisConnection>(config, null, connectionManager, this);
}

public void setupMasterEntry(String host, int port) {
RedisClient client = connectionManager.createClient(host, port);
masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0, connectListener, Mode.MASTER);
masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0, connectListener, NodeType.MASTER);
writeConnectionHolder.add(masterEntry);
}

Expand All @@ -93,10 +93,10 @@ public Collection<RedisPubSubConnection> slaveDown(String host, int port, Freeze
}

public void addSlave(String host, int port) {
addSlave(host, port, true, Mode.SLAVE);
addSlave(host, port, true, NodeType.SLAVE);
}

private void addSlave(String host, int port, boolean freezed, Mode mode) {
private void addSlave(String host, int port, boolean freezed, NodeType mode) {
RedisClient client = connectionManager.createClient(host, port);
SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client,
this.config.getSlaveConnectionPoolSize(),
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/redisson/connection/SingleEntry.java
Expand Up @@ -22,7 +22,7 @@
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.ConnectionEntry.NodeType;
import org.redisson.misc.ConnectionPool;
import org.redisson.misc.PubSubConnectionPoll;

Expand All @@ -41,7 +41,7 @@ public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectio
public void setupMasterEntry(String host, int port) {
RedisClient masterClient = connectionManager.createClient(host, port);
masterEntry = new SubscribesConnectionEntry(masterClient,
config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize(), connectListener, Mode.MASTER);
config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize(), connectListener, NodeType.MASTER);
writeConnectionHolder.add(masterEntry);
pubSubConnectionHolder.add(masterEntry);
}
Expand Down
Expand Up @@ -32,7 +32,7 @@ public class SubscribesConnectionEntry extends ConnectionEntry {
private final Queue<RedisPubSubConnection> freeSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final AtomicInteger connectionsCounter = new AtomicInteger();

public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize, ConnectionListener connectListener, Mode serverMode) {
public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize, ConnectionListener connectListener, NodeType serverMode) {
super(client, poolSize, connectListener, serverMode);
connectionsCounter.set(subscribePoolSize);
}
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/org/redisson/misc/ConnectionPool.java
Expand Up @@ -25,7 +25,7 @@
import org.redisson.client.RedisConnectionException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionEntry.FreezeReason;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.ConnectionEntry.NodeType;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.LoadBalancer;
import org.redisson.connection.MasterSlaveEntry;
Expand Down Expand Up @@ -89,7 +89,8 @@ public Future<T> get() {
}

public Future<T> get(SubscribesConnectionEntry entry) {
if (!entry.isFreezed() && tryAcquireConnection(entry)) {
if ((entry.getNodeType() == NodeType.MASTER || !entry.isFreezed())
&& tryAcquireConnection(entry)) {
Promise<T> promise = connectionManager.newPromise();
connect(entry, promise);
return promise;
Expand Down Expand Up @@ -164,7 +165,7 @@ private void promiseSuccessful(final SubscribesConnectionEntry entry, final Prom

private void promiseFailure(SubscribesConnectionEntry entry, Promise<T> promise, Throwable cause) {
if (entry.incFailedAttempts() == config.getSlaveFailedAttempts()
&& entry.getServerMode() == Mode.SLAVE) {
&& entry.getNodeType() == NodeType.SLAVE) {
connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(),
entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
scheduleCheck(entry);
Expand Down

0 comments on commit 14262bd

Please sign in to comment.