Skip to content

Commit

Permalink
Extend RedisClusterClient and RedisClusterFactoryBean to take multipl…
Browse files Browse the repository at this point in the history
…e hosts #401

RedisClusterClient and RedisClusterFactoryBean can now be initialized by taking multiple hosts within one URI.

RedisClusterClient.create("redis+tls://password@host1:6379,host2:6380")
  • Loading branch information
mp911de committed Nov 6, 2016
1 parent b764373 commit be25731
Show file tree
Hide file tree
Showing 7 changed files with 353 additions and 74 deletions.
35 changes: 27 additions & 8 deletions src/main/java/com/lambdaworks/redis/RedisURI.java
Original file line number Diff line number Diff line change
Expand Up @@ -535,16 +535,20 @@ private static RedisURI buildRedisUriFromUri(URI uri) {
private String getAuthority(final String scheme) {

String authority = null;

if (host != null) {
authority = urlEncode(host) + getPortPart(port, scheme);
if (host.contains(",")) {
authority = host;
} else {
authority = urlEncode(host) + getPortPart(port, scheme);
}
}

if (sentinels.size() != 0) {
String joinedSentinels = sentinels.stream()

authority = sentinels.stream()
.map(redisURI -> urlEncode(redisURI.getHost()) + getPortPart(redisURI.getPort(), scheme))
.collect(Collectors.joining(","));

authority = joinedSentinels;
}

if (socket != null) {
Expand Down Expand Up @@ -772,7 +776,8 @@ private static void parseSentinelMasterId(Builder builder, String queryParam) {
}

private static Builder configureStandalone(URI uri) {
Builder builder;

Builder builder = null;
Set<String> allowedSchemes = LettuceSets.unmodifiableSet(URI_SCHEME_REDIS, URI_SCHEME_REDIS_SECURE,
URI_SCHEME_REDIS_SOCKET, URI_SCHEME_REDIS_SOCKET_ALT, URI_SCHEME_REDIS_SECURE_ALT, URI_SCHEME_REDIS_TLS_ALT);

Expand All @@ -783,10 +788,24 @@ private static Builder configureStandalone(URI uri) {
if (URI_SCHEME_REDIS_SOCKET.equals(uri.getScheme()) || URI_SCHEME_REDIS_SOCKET_ALT.equals(uri.getScheme())) {
builder = Builder.socket(uri.getPath());
} else {
if (uri.getPort() > 0) {
builder = Builder.redis(uri.getHost(), uri.getPort());

if (isNotEmpty(uri.getHost())) {

if (uri.getPort() > 0) {
builder = Builder.redis(uri.getHost(), uri.getPort());
} else {
builder = Builder.redis(uri.getHost());
}
} else {
builder = Builder.redis(uri.getHost());

if (isNotEmpty(uri.getAuthority())) {
String authority = uri.getAuthority();
if (authority.indexOf('@') > -1) {
authority = authority.substring(authority.indexOf('@') + 1);
}

builder = Builder.redis(authority);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.Closeable;
import java.net.SocketAddress;
import java.net.URI;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -219,7 +220,7 @@ public static RedisClusterClient create(RedisURI redisURI) {
* Create a new client that connects to the supplied {@link RedisURI uri} with default {@link ClientResources}. You can
* connect to different Redis servers but you must supply a {@link RedisURI} on connecting.
*
* @param redisURIs one or more Redis URI, must not be {@literal null} and not empty
* @param redisURIs one or more Redis URI, must not be {@literal null} and not empty.
* @return a new instance of {@link RedisClusterClient}
*/
public static RedisClusterClient create(Iterable<RedisURI> redisURIs) {
Expand All @@ -232,12 +233,12 @@ public static RedisClusterClient create(Iterable<RedisURI> redisURIs) {
* Create a new client that connects to the supplied uri with default {@link ClientResources}. You can connect to different
* Redis servers but you must supply a {@link RedisURI} on connecting.
*
* @param uri the Redis URI, must not be {@literal null}
* @param uri the Redis URI, must not be empty or {@literal null}.
* @return a new instance of {@link RedisClusterClient}
*/
public static RedisClusterClient create(String uri) {
LettuceAssert.notNull(uri, "URI must not be null");
return create(RedisURI.create(uri));
LettuceAssert.notEmpty(uri, "URI must not be empty");
return create(RedisClusterURIUtil.toRedisURIs(URI.create(uri)));
}

/**
Expand All @@ -261,13 +262,13 @@ public static RedisClusterClient create(ClientResources clientResources, RedisUR
* supply a {@link RedisURI} on connecting.
*
* @param clientResources the client resources, must not be {@literal null}
* @param uri the Redis URI, must not be {@literal null}
* @param uri the Redis URI, must not be empty or {@literal null}.
* @return a new instance of {@link RedisClusterClient}
*/
public static RedisClusterClient create(ClientResources clientResources, String uri) {
assertNotNull(clientResources);
LettuceAssert.notNull(uri, "URI must not be null");
return create(clientResources, RedisURI.create(uri));
LettuceAssert.notEmpty(uri, "URI must not be empty");
return create(clientResources, RedisClusterURIUtil.toRedisURIs(URI.create(uri)));
}

/**
Expand Down Expand Up @@ -678,7 +679,7 @@ protected Partitions loadPartitions() {
for (RedisClusterNode partition : loadedPartitions) {
if (viewedBy != null) {
RedisURI uri = partition.getUri();
applyUriConnectionSettings(viewedBy, uri);
RedisClusterURIUtil.applyUriConnectionSettings(viewedBy, uri);
}
}

Expand Down Expand Up @@ -841,19 +842,6 @@ boolean expireStaleConnections() {
return getClusterClientOptions() == null || getClusterClientOptions().isCloseStaleConnections();
}

static void applyUriConnectionSettings(RedisURI from, RedisURI to) {

if (from.getPassword() != null && from.getPassword().length != 0) {
to.setPassword(new String(from.getPassword()));
}

to.setTimeout(from.getTimeout());
to.setUnit(from.getUnit());
to.setSsl(from.isSsl());
to.setStartTls(from.isStartTls());
to.setVerifyPeer(from.isVerifyPeer());
}

private static <K, V> void assertNotNull(RedisCodec<K, V> codec) {
LettuceAssert.notNull(codec, "RedisCodec must not be null");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lambdaworks.redis.cluster;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.internal.HostAndPort;

/**
* {@link RedisClusterURIUtil} is a collection of {@link RedisURI}-based utility methods for {@link RedisClusterClient} use.
*
* @author Mark Paluch
* @since 4.4
*/
public class RedisClusterURIUtil {

/**
* Parse a Redis Cluster URI with potentially multiple hosts into a {@link List} of {@link RedisURI}.
*
* An URI follows the syntax: {@code redis://[password@]host[:port][,host2[:port2]]}
*
* @param uri must not be empty or {@literal null}.
* @return {@link List} of {@link RedisURI}.
*/
public static List<RedisURI> toRedisURIs(URI uri) {

RedisURI redisURI = RedisURI.create(uri);

String[] parts = redisURI.getHost().split("\\,");

List<RedisURI> redisURIs = new ArrayList<>(parts.length);

for (String part : parts) {
HostAndPort hostAndPort = HostAndPort.parse(part);

RedisURI nodeUri = RedisURI.create(hostAndPort.getHostText(),
hostAndPort.hasPort() ? hostAndPort.getPort() : redisURI.getPort());

applyUriConnectionSettings(redisURI, nodeUri);

redisURIs.add(nodeUri);
}

return redisURIs;
}

/**
* Apply {@link RedisURI} settings such as SSL/Timeout/password.
*
* @param from from {@link RedisURI}.
* @param to from {@link RedisURI}.
*/
static void applyUriConnectionSettings(RedisURI from, RedisURI to) {

if (from.getPassword() != null && from.getPassword().length != 0) {
to.setPassword(new String(from.getPassword()));
}

to.setTimeout(from.getTimeout());
to.setUnit(from.getUnit());
to.setSsl(from.isSsl());
to.setStartTls(from.isStartTls());
to.setVerifyPeer(from.isVerifyPeer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,23 @@
import static com.lambdaworks.redis.LettuceStrings.isNotEmpty;

import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.cluster.RedisClusterClient;
import com.lambdaworks.redis.cluster.RedisClusterURIUtil;
import com.lambdaworks.redis.internal.LettuceAssert;

/**
* Factory Bean for {@link RedisClusterClient} instances. Needs either a {@link URI} or a {@link RedisURI} as input and allows
* to reuse {@link com.lambdaworks.redis.resource.ClientResources}. URI Format: {@code
* redis://[password@]host[:port]
* redis://[password@]host[:port][,host2[:port2]]
* }
*
* {@code
* rediss://[password@]host[:port]
* rediss://[password@]host[:port][,host2[:port2]]
* }
*
* @see RedisURI
Expand All @@ -41,32 +45,52 @@
public class RedisClusterClientFactoryBean extends LettuceFactoryBeanSupport<RedisClusterClient> {

private boolean verifyPeer = false;
private Collection<RedisURI> redisURIs;

@Override
public void afterPropertiesSet() throws Exception {

if (getRedisURI() == null) {
URI uri = getUri();
if (redisURIs == null) {

LettuceAssert.isTrue(!uri.getScheme().equals(RedisURI.URI_SCHEME_REDIS_SENTINEL),
"Sentinel mode not supported when using RedisClusterClient");
if (getUri() != null) {
URI uri = getUri();

RedisURI redisURI = RedisURI.create(uri);
if (isNotEmpty(getPassword())) {
redisURI.setPassword(getPassword());
}
LettuceAssert.isTrue(!uri.getScheme().equals(RedisURI.URI_SCHEME_REDIS_SENTINEL),
"Sentinel mode not supported when using RedisClusterClient");

if (RedisURI.URI_SCHEME_REDIS_SECURE.equals(uri.getScheme())
|| RedisURI.URI_SCHEME_REDIS_SECURE_ALT.equals(uri.getScheme())
|| RedisURI.URI_SCHEME_REDIS_TLS_ALT.equals(uri.getScheme())) {
redisURI.setVerifyPeer(verifyPeer);
}
List<RedisURI> redisURIs = RedisClusterURIUtil.toRedisURIs(uri);

for (RedisURI redisURI : redisURIs) {
applyProperties(uri.getScheme(), redisURI);
}

this.redisURIs = redisURIs;
} else {

setRedisURI(redisURI);
URI uri = getRedisURI().toURI();
RedisURI redisURI = RedisURI.create(uri);
applyProperties(uri.getScheme(), redisURI);
this.redisURIs = Collections.singleton(redisURI);
}
}

super.afterPropertiesSet();
}

private void applyProperties(String scheme, RedisURI redisURI) {

if (isNotEmpty(getPassword())) {
redisURI.setPassword(getPassword());
}

if (RedisURI.URI_SCHEME_REDIS_SECURE.equals(scheme) || RedisURI.URI_SCHEME_REDIS_SECURE_ALT.equals(scheme)
|| RedisURI.URI_SCHEME_REDIS_TLS_ALT.equals(scheme)) {
redisURI.setVerifyPeer(verifyPeer);
}
}

protected Collection<RedisURI> getRedisURIs() {
return redisURIs;
}

@Override
Expand All @@ -83,9 +107,10 @@ public Class<?> getObjectType() {
protected RedisClusterClient createInstance() throws Exception {

if (getClientResources() != null) {
return RedisClusterClient.create(getClientResources(), getRedisURI());
return RedisClusterClient.create(getClientResources(), redisURIs);
}
return RedisClusterClient.create(getRedisURI());

return RedisClusterClient.create(redisURIs);
}

public boolean isVerifyPeer() {
Expand Down
Loading

0 comments on commit be25731

Please sign in to comment.