Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Nov 30, 2016
1 parent 4c960b7 commit c45ab70
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 194 deletions.
Expand Up @@ -37,7 +37,7 @@
import org.redisson.config.ReadMode; import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.balancer.LoadBalancerManagerImpl; import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.pool.MasterConnectionPool; import org.redisson.connection.pool.MasterConnectionPool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,7 +76,7 @@ public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager conn
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.config = config; this.config = config;


slaveBalancer = new LoadBalancerManagerImpl(config, connectionManager, this); slaveBalancer = new LoadBalancerManager(config, connectionManager, this);
writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this); writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this);
} }


Expand Down
Expand Up @@ -16,37 +16,164 @@
package org.redisson.connection.balancer; package org.redisson.connection.balancer;


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;


import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.connection.pool.SlaveConnectionPool;
import org.redisson.misc.RPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public interface LoadBalancerManager { import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;


RFuture<RedisConnection> getConnection(InetSocketAddress addr); public class LoadBalancerManager {


int getAvailableClients(); private final Logger log = LoggerFactory.getLogger(getClass());


void shutdownAsync(); private final ConnectionManager connectionManager;
private final Map<InetSocketAddress, ClientConnectionsEntry> addr2Entry = PlatformDependent.newConcurrentHashMap();
private final PubSubConnectionPool pubSubConnectionPool;
private final SlaveConnectionPool slaveConnectionPool;


void shutdown(); public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
this.connectionManager = connectionManager;
slaveConnectionPool = new SlaveConnectionPool(config, connectionManager, entry);
pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry);
}


boolean unfreeze(String host, int port, FreezeReason freezeReason); public RFuture<Void> add(final ClientConnectionsEntry entry) {
final RPromise<Void> result = connectionManager.newPromise();
FutureListener<Void> listener = new FutureListener<Void>() {
AtomicInteger counter = new AtomicInteger(2);
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
if (counter.decrementAndGet() == 0) {
addr2Entry.put(entry.getClient().getAddr(), entry);
result.trySuccess(null);
}
}
};


ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason); RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
slaveFuture.addListener(listener);
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);
pubSubFuture.addListener(listener);
return result;
}

public int getAvailableClients() {
int count = 0;
for (ClientConnectionsEntry connectionEntry : addr2Entry.values()) {
if (!connectionEntry.isFreezed()) {
count++;
}
}
return count;
}

public boolean unfreeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port);
ClientConnectionsEntry entry = addr2Entry.get(addr);
if (entry == null) {
throw new IllegalStateException("Can't find " + addr + " in slaves!");
}

synchronized (entry) {
if (!entry.isFreezed()) {
return false;
}
if ((freezeReason == FreezeReason.RECONNECT
&& entry.getFreezeReason() == FreezeReason.RECONNECT)
|| freezeReason != FreezeReason.RECONNECT) {
entry.resetFailedAttempts();
entry.setFreezed(false);
entry.setFreezeReason(null);
return true;
}
}
return false;
}


ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason); public ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port);
ClientConnectionsEntry connectionEntry = addr2Entry.get(addr);
return freeze(connectionEntry, freezeReason);
}

public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
if (connectionEntry == null) {
return null;
}

synchronized (connectionEntry) {
// only RECONNECT freeze reason could be replaced
if (connectionEntry.getFreezeReason() == null
|| connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) {
connectionEntry.setFreezed(true);
connectionEntry.setFreezeReason(freezeReason);
return connectionEntry;
}
if (connectionEntry.isFreezed()) {
return null;
}
}

return connectionEntry;
}

public RFuture<RedisPubSubConnection> nextPubSubConnection() {
return pubSubConnectionPool.get();
}

public RFuture<RedisConnection> getConnection(InetSocketAddress addr) {
ClientConnectionsEntry entry = addr2Entry.get(addr);
if (entry != null) {
return slaveConnectionPool.get(entry);
}
RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr);
return connectionManager.newFailedFuture(exception);
}


RFuture<Void> add(ClientConnectionsEntry entry); public RFuture<RedisConnection> nextConnection() {
return slaveConnectionPool.get();
}


RFuture<RedisConnection> nextConnection(); public void returnPubSubConnection(RedisPubSubConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr());
pubSubConnectionPool.returnConnection(entry, connection);
}


RFuture<RedisPubSubConnection> nextPubSubConnection(); public void returnConnection(RedisConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr());
slaveConnectionPool.returnConnection(entry, connection);
}


void returnConnection(RedisConnection connection); public void shutdown() {
for (ClientConnectionsEntry entry : addr2Entry.values()) {
entry.getClient().shutdown();
}
}


void returnPubSubConnection(RedisPubSubConnection connection); public void shutdownAsync() {
for (ClientConnectionsEntry entry : addr2Entry.values()) {
connectionManager.shutdownAsync(entry.getClient());
}
}


} }

This file was deleted.

0 comments on commit c45ab70

Please sign in to comment.