Skip to content

Commit

Permalink
Fixed - Config can not handle underscore in host #722
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Dec 13, 2016
1 parent 4260a3d commit 1319607
Show file tree
Hide file tree
Showing 20 changed files with 263 additions and 165 deletions.
10 changes: 5 additions & 5 deletions redisson/src/main/java/org/redisson/client/RedisClient.java
Expand Up @@ -16,7 +16,7 @@
package org.redisson.client; package org.redisson.client;


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URL;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
Expand All @@ -31,7 +31,7 @@
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URLBuilder;


import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
Expand Down Expand Up @@ -70,15 +70,15 @@ public class RedisClient {
private boolean hasOwnGroup; private boolean hasOwnGroup;


public RedisClient(String address) { public RedisClient(String address) {
this(URIBuilder.create(address)); this(URLBuilder.create(address));
} }


public RedisClient(URI address) { public RedisClient(URL address) {
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), address); this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), address);
hasOwnGroup = true; hasOwnGroup = true;
} }


public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, URI address) { public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, URL address) {
this(timer, executor, group, address.getHost(), address.getPort()); this(timer, executor, group, address.getHost(), address.getPort());
} }


Expand Down
Expand Up @@ -15,7 +15,7 @@
*/ */
package org.redisson.cluster; package org.redisson.cluster;


import java.net.URI; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
Expand Down Expand Up @@ -66,13 +66,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {


private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());


private final Map<URI, RedisConnection> nodeConnections = PlatformDependent.newConcurrentHashMap(); private final Map<URL, RedisConnection> nodeConnections = PlatformDependent.newConcurrentHashMap();


private final ConcurrentMap<Integer, ClusterPartition> lastPartitions = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap<Integer, ClusterPartition> lastPartitions = PlatformDependent.newConcurrentHashMap();


private ScheduledFuture<?> monitorFuture; private ScheduledFuture<?> monitorFuture;


private volatile URI lastClusterNode; private volatile URL lastClusterNode;


public ClusterConnectionManager(ClusterServersConfig cfg, Config config) { public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
super(config); super(config);
Expand All @@ -84,7 +84,7 @@ public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {


Throwable lastException = null; Throwable lastException = null;
List<String> failedMasters = new ArrayList<String>(); List<String> failedMasters = new ArrayList<String>();
for (URI addr : cfg.getNodeAddresses()) { for (URL addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr); RFuture<RedisConnection> connectionFuture = connect(cfg, addr);
try { try {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow(); RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
Expand Down Expand Up @@ -158,7 +158,7 @@ private void close(RedisConnection conn) {
} }
} }


private RFuture<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) { private RFuture<RedisConnection> connect(ClusterServersConfig cfg, final URL addr) {
RedisConnection connection = nodeConnections.get(addr); RedisConnection connection = nodeConnections.get(addr);
if (connection != null) { if (connection != null) {
return newSucceededFuture(connection); return newSucceededFuture(connection);
Expand Down Expand Up @@ -308,22 +308,22 @@ public void operationComplete(Future<Void> future) throws Exception {
return result; return result;
} }


private void scheduleClusterChangeCheck(final ClusterServersConfig cfg, final Iterator<URI> iterator) { private void scheduleClusterChangeCheck(final ClusterServersConfig cfg, final Iterator<URL> iterator) {
monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() { monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(); AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
Iterator<URI> nodesIterator = iterator; Iterator<URL> nodesIterator = iterator;
if (nodesIterator == null) { if (nodesIterator == null) {
List<URI> nodes = new ArrayList<URI>(); List<URL> nodes = new ArrayList<URL>();
List<URI> slaves = new ArrayList<URI>(); List<URL> slaves = new ArrayList<URL>();


for (ClusterPartition partition : getLastPartitions()) { for (ClusterPartition partition : getLastPartitions()) {
if (!partition.isMasterFail()) { if (!partition.isMasterFail()) {
nodes.add(partition.getMasterAddress()); nodes.add(partition.getMasterAddress());
} }


Set<URI> partitionSlaves = new HashSet<URI>(partition.getSlaveAddresses()); Set<URL> partitionSlaves = new HashSet<URL>(partition.getSlaveAddresses());
partitionSlaves.removeAll(partition.getFailedSlaveAddresses()); partitionSlaves.removeAll(partition.getFailedSlaveAddresses());
slaves.addAll(partitionSlaves); slaves.addAll(partitionSlaves);
} }
Expand All @@ -339,7 +339,7 @@ public void run() {
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS); }, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
} }


private void checkClusterState(final ClusterServersConfig cfg, final Iterator<URI> iterator, final AtomicReference<Throwable> lastException) { private void checkClusterState(final ClusterServersConfig cfg, final Iterator<URL> iterator, final AtomicReference<Throwable> lastException) {
if (!iterator.hasNext()) { if (!iterator.hasNext()) {
log.error("Can't update cluster state", lastException.get()); log.error("Can't update cluster state", lastException.get());
scheduleClusterChangeCheck(cfg, null); scheduleClusterChangeCheck(cfg, null);
Expand All @@ -348,7 +348,7 @@ private void checkClusterState(final ClusterServersConfig cfg, final Iterator<UR
if (!getShutdownLatch().acquire()) { if (!getShutdownLatch().acquire()) {
return; return;
} }
final URI uri = iterator.next(); final URL uri = iterator.next();
RFuture<RedisConnection> connectionFuture = connect(cfg, uri); RFuture<RedisConnection> connectionFuture = connect(cfg, uri);
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
Expand All @@ -366,7 +366,7 @@ public void operationComplete(Future<RedisConnection> future) throws Exception {
}); });
} }


private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URI> iterator, final URI uri) { private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URL> iterator, final URL uri) {
RFuture<List<ClusterNodeInfo>> future = connection.async(RedisCommands.CLUSTER_NODES); RFuture<List<ClusterNodeInfo>> future = connection.async(RedisCommands.CLUSTER_NODES);
future.addListener(new FutureListener<List<ClusterNodeInfo>>() { future.addListener(new FutureListener<List<ClusterNodeInfo>>() {
@Override @Override
Expand Down Expand Up @@ -415,7 +415,7 @@ private void checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) {


MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr()); MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr());
// should be invoked first in order to remove stale failedSlaveAddresses // should be invoked first in order to remove stale failedSlaveAddresses
Set<URI> addedSlaves = addRemoveSlaves(entry, currentPart, newPart); Set<URL> addedSlaves = addRemoveSlaves(entry, currentPart, newPart);
// Do some slaves have changed state from failed to alive? // Do some slaves have changed state from failed to alive?
upDownSlaves(entry, currentPart, newPart, addedSlaves); upDownSlaves(entry, currentPart, newPart, addedSlaves);


Expand All @@ -424,42 +424,42 @@ private void checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) {
} }
} }


private void upDownSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart, Set<URI> addedSlaves) { private void upDownSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart, Set<URL> addedSlaves) {
Set<URI> aliveSlaves = new HashSet<URI>(currentPart.getFailedSlaveAddresses()); Set<URL> aliveSlaves = new HashSet<URL>(currentPart.getFailedSlaveAddresses());
aliveSlaves.removeAll(addedSlaves); aliveSlaves.removeAll(addedSlaves);
aliveSlaves.removeAll(newPart.getFailedSlaveAddresses()); aliveSlaves.removeAll(newPart.getFailedSlaveAddresses());
for (URI uri : aliveSlaves) { for (URL uri : aliveSlaves) {
currentPart.removeFailedSlaveAddress(uri); currentPart.removeFailedSlaveAddress(uri);
if (entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { if (entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges()); log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges());
} }
} }


Set<URI> failedSlaves = new HashSet<URI>(newPart.getFailedSlaveAddresses()); Set<URL> failedSlaves = new HashSet<URL>(newPart.getFailedSlaveAddresses());
failedSlaves.removeAll(currentPart.getFailedSlaveAddresses()); failedSlaves.removeAll(currentPart.getFailedSlaveAddresses());
for (URI uri : failedSlaves) { for (URL uri : failedSlaves) {
currentPart.addFailedSlaveAddress(uri); currentPart.addFailedSlaveAddress(uri);
if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges()); log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges());
} }
} }
} }


private Set<URI> addRemoveSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart) { private Set<URL> addRemoveSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart) {
Set<URI> removedSlaves = new HashSet<URI>(currentPart.getSlaveAddresses()); Set<URL> removedSlaves = new HashSet<URL>(currentPart.getSlaveAddresses());
removedSlaves.removeAll(newPart.getSlaveAddresses()); removedSlaves.removeAll(newPart.getSlaveAddresses());


for (URI uri : removedSlaves) { for (URL uri : removedSlaves) {
currentPart.removeSlaveAddress(uri); currentPart.removeSlaveAddress(uri);


if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges()); log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges());
} }
} }


Set<URI> addedSlaves = new HashSet<URI>(newPart.getSlaveAddresses()); Set<URL> addedSlaves = new HashSet<URL>(newPart.getSlaveAddresses());
addedSlaves.removeAll(currentPart.getSlaveAddresses()); addedSlaves.removeAll(currentPart.getSlaveAddresses());
for (final URI uri : addedSlaves) { for (final URL uri : addedSlaves) {
RFuture<Void> future = entry.addSlave(uri.getHost(), uri.getPort()); RFuture<Void> future = entry.addSlave(uri.getHost(), uri.getPort());
future.addListener(new FutureListener<Void>() { future.addListener(new FutureListener<Void>() {
@Override @Override
Expand Down Expand Up @@ -516,8 +516,8 @@ private RFuture<Void> checkMasterNodesChange(ClusterServersConfig cfg, Collectio
if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) { if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
log.info("changing master from {} to {} for {}", log.info("changing master from {} to {} for {}",
currentPart.getMasterAddress(), newMasterPart.getMasterAddress(), slot); currentPart.getMasterAddress(), newMasterPart.getMasterAddress(), slot);
URI newUri = newMasterPart.getMasterAddress(); URL newUri = newMasterPart.getMasterAddress();
URI oldUri = currentPart.getMasterAddress(); URL oldUri = currentPart.getMasterAddress();


changeMaster(slot, newUri.getHost(), newUri.getPort()); changeMaster(slot, newUri.getHost(), newUri.getPort());


Expand Down Expand Up @@ -720,7 +720,7 @@ private HashSet<ClusterPartition> getLastPartitions() {
} }


@Override @Override
public URI getLastClusterNode() { public URL getLastClusterNode() {
return lastClusterNode; return lastClusterNode;
} }


Expand Down
Expand Up @@ -16,10 +16,11 @@
package org.redisson.cluster; package org.redisson.cluster;


import java.net.URI; import java.net.URI;
import java.net.URL;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;


import org.redisson.misc.URIBuilder; import org.redisson.misc.URLBuilder;


/** /**
* *
Expand All @@ -33,7 +34,7 @@ public enum Flag {NOFLAGS, SLAVE, MASTER, MYSELF, FAIL, HANDSHAKE, NOADDR};
private final String nodeInfo; private final String nodeInfo;


private String nodeId; private String nodeId;
private URI address; private URL address;
private final Set<Flag> flags = new HashSet<Flag>(); private final Set<Flag> flags = new HashSet<Flag>();
private String slaveOf; private String slaveOf;


Expand All @@ -50,11 +51,11 @@ public void setNodeId(String nodeId) {
this.nodeId = nodeId; this.nodeId = nodeId;
} }


public URI getAddress() { public URL getAddress() {
return address; return address;
} }
public void setAddress(String address) { public void setAddress(String address) {
this.address = URIBuilder.create(address); this.address = URLBuilder.create(address);
} }


public void addSlotRange(ClusterSlotRange range) { public void addSlotRange(ClusterSlotRange range) {
Expand Down
33 changes: 19 additions & 14 deletions redisson/src/main/java/org/redisson/cluster/ClusterPartition.java
Expand Up @@ -16,20 +16,25 @@
package org.redisson.cluster; package org.redisson.cluster;


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URL;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;


import org.redisson.misc.URIBuilder; import org.redisson.misc.URLBuilder;


/**
*
* @author Nikita Koksharov
*
*/
public class ClusterPartition { public class ClusterPartition {


private final String nodeId; private final String nodeId;
private boolean masterFail; private boolean masterFail;
private URI masterAddress; private URL masterAddress;
private final Set<URI> slaveAddresses = new HashSet<URI>(); private final Set<URL> slaveAddresses = new HashSet<URL>();
private final Set<URI> failedSlaves = new HashSet<URI>(); private final Set<URL> failedSlaves = new HashSet<URL>();


private final Set<Integer> slots = new HashSet<Integer>(); private final Set<Integer> slots = new HashSet<Integer>();
private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>(); private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>();
Expand Down Expand Up @@ -85,33 +90,33 @@ public InetSocketAddress getMasterAddr() {
return new InetSocketAddress(masterAddress.getHost(), masterAddress.getPort()); return new InetSocketAddress(masterAddress.getHost(), masterAddress.getPort());
} }


public URI getMasterAddress() { public URL getMasterAddress() {
return masterAddress; return masterAddress;
} }
public void setMasterAddress(String masterAddress) { public void setMasterAddress(String masterAddress) {
setMasterAddress(URIBuilder.create(masterAddress)); setMasterAddress(URLBuilder.create(masterAddress));
} }
public void setMasterAddress(URI masterAddress) { public void setMasterAddress(URL masterAddress) {
this.masterAddress = masterAddress; this.masterAddress = masterAddress;
} }


public void addFailedSlaveAddress(URI address) { public void addFailedSlaveAddress(URL address) {
failedSlaves.add(address); failedSlaves.add(address);
} }
public Set<URI> getFailedSlaveAddresses() { public Set<URL> getFailedSlaveAddresses() {
return Collections.unmodifiableSet(failedSlaves); return Collections.unmodifiableSet(failedSlaves);
} }
public void removeFailedSlaveAddress(URI uri) { public void removeFailedSlaveAddress(URL uri) {
failedSlaves.remove(uri); failedSlaves.remove(uri);
} }


public void addSlaveAddress(URI address) { public void addSlaveAddress(URL address) {
slaveAddresses.add(address); slaveAddresses.add(address);
} }
public Set<URI> getSlaveAddresses() { public Set<URL> getSlaveAddresses() {
return Collections.unmodifiableSet(slaveAddresses); return Collections.unmodifiableSet(slaveAddresses);
} }
public void removeSlaveAddress(URI uri) { public void removeSlaveAddress(URL uri) {
slaveAddresses.remove(uri); slaveAddresses.remove(uri);
failedSlaves.remove(uri); failedSlaves.remove(uri);
} }
Expand Down
Expand Up @@ -15,18 +15,23 @@
*/ */
package org.redisson.config; package org.redisson.config;


import java.net.URI; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;


import org.redisson.misc.URIBuilder; import org.redisson.misc.URLBuilder;


/**
*
* @author Nikita Koksharov
*
*/
public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterServersConfig> { public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterServersConfig> {


/** /**
* Redis cluster node urls list * Redis cluster node urls list
*/ */
private List<URI> nodeAddresses = new ArrayList<URI>(); private List<URL> nodeAddresses = new ArrayList<URL>();


/** /**
* Redis cluster scan interval in milliseconds * Redis cluster scan interval in milliseconds
Expand All @@ -50,14 +55,14 @@ public ClusterServersConfig() {
*/ */
public ClusterServersConfig addNodeAddress(String ... addresses) { public ClusterServersConfig addNodeAddress(String ... addresses) {
for (String address : addresses) { for (String address : addresses) {
nodeAddresses.add(URIBuilder.create(address)); nodeAddresses.add(URLBuilder.create(address));
} }
return this; return this;
} }
public List<URI> getNodeAddresses() { public List<URL> getNodeAddresses() {
return nodeAddresses; return nodeAddresses;
} }
void setNodeAddresses(List<URI> nodeAddresses) { void setNodeAddresses(List<URL> nodeAddresses) {
this.nodeAddresses = nodeAddresses; this.nodeAddresses = nodeAddresses;
} }


Expand Down
10 changes: 10 additions & 0 deletions redisson/src/main/java/org/redisson/config/ConfigSupport.java
Expand Up @@ -47,7 +47,13 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.redisson.codec.CodecProvider; import org.redisson.codec.CodecProvider;
import org.redisson.liveobject.provider.ResolverProvider; import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.misc.URLBuilder;


/**
*
* @author Nikita Koksharov
*
*/
public class ConfigSupport { public class ConfigSupport {


@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "class") @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "class")
Expand Down Expand Up @@ -111,6 +117,10 @@ public static class ConfigMixIn {
private final ObjectMapper jsonMapper = createMapper(null); private final ObjectMapper jsonMapper = createMapper(null);
private final ObjectMapper yamlMapper = createMapper(new YAMLFactory()); private final ObjectMapper yamlMapper = createMapper(new YAMLFactory());


public ConfigSupport() {
URLBuilder.init();
}

public <T> T fromJSON(String content, Class<T> configType) throws IOException { public <T> T fromJSON(String content, Class<T> configType) throws IOException {
return jsonMapper.readValue(content, configType); return jsonMapper.readValue(content, configType);
} }
Expand Down

0 comments on commit 1319607

Please sign in to comment.