Skip to content

Commit

Permalink
DATAREDIS-667 - Introduce LettuceConnectionProvider.
Browse files Browse the repository at this point in the history
We now create and release Lettuce connections using LettuceConnectionProvider. Connection providers encapsulate the underlying client and expose only creation and release methods. A connection provider can provide connections for Standalone or Cluster connections or wrap a LettucePool.

Previously we used RedisClient and RedisClusterClient directly which required context propagation (pooling/non-pooling) conditional code to obtain the appropriate connection type.

Original Pull Request: #262
  • Loading branch information
mp911de authored and christophstrobl committed Sep 7, 2017
1 parent 5c37948 commit 5ac7f8e
Show file tree
Hide file tree
Showing 11 changed files with 546 additions and 175 deletions.
@@ -0,0 +1,64 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;

/**
* Connection provider for Cluster connections.
*
* @author Mark Paluch
* @since 2.0
*/
class ClusterConnectionProvider implements LettuceConnectionProvider {

private final RedisClusterClient client;
private final RedisCodec<?, ?> codec;

ClusterConnectionProvider(RedisClusterClient client, RedisCodec<?, ?> codec) {

this.client = client;
this.codec = codec;
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#getConnection(java.lang.Class)
*/
@SuppressWarnings("rawtypes")
@Override
public StatefulConnection<?, ?> getConnection(Class<? extends StatefulConnection> connectionType) {

if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
return client.connectPubSub(codec);
}

if (StatefulRedisClusterConnection.class.isAssignableFrom(connectionType)
|| connectionType.equals(StatefulConnection.class)) {
return client.connect(codec);
}

throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");
}

public RedisClusterClient getClient() {
return client;
}
}
Expand Up @@ -17,15 +17,12 @@

import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.sync.BaseRedisCommands;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import lombok.RequiredArgsConstructor;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -60,11 +57,9 @@ public class LettuceClusterConnection extends LettuceConnection implements Defau

static final ExceptionTranslationStrategy exceptionConverter = new PassThroughExceptionTranslationStrategy(
new LettuceExceptionConverter());
static final RedisCodec<byte[], byte[]> CODEC = ByteArrayCodec.INSTANCE;

private final Log log = LogFactory.getLog(getClass());

private final RedisClusterClient clusterClient;
private ClusterCommandExecutor clusterCommandExecutor;
private ClusterTopologyProvider topologyProvider;
private boolean disposeClusterCommandExecutorOnClose;
Expand All @@ -77,12 +72,7 @@ public class LettuceClusterConnection extends LettuceConnection implements Defau
* @param clusterClient must not be {@literal null}.
*/
public LettuceClusterConnection(RedisClusterClient clusterClient) {

this(clusterClient, RedisURI.DEFAULT_TIMEOUT_DURATION,
new ClusterCommandExecutor(new LettuceClusterTopologyProvider(clusterClient),
new LettuceClusterNodeResourceProvider(clusterClient), exceptionConverter));

this.disposeClusterCommandExecutorOnClose = true;
this(new ClusterConnectionProvider(clusterClient, CODEC));
}

/**
Expand All @@ -93,7 +83,7 @@ public LettuceClusterConnection(RedisClusterClient clusterClient) {
* @param executor must not be {@literal null}.
*/
public LettuceClusterConnection(RedisClusterClient clusterClient, ClusterCommandExecutor executor) {
this(clusterClient, RedisURI.DEFAULT_TIMEOUT_DURATION, executor);
this(clusterClient, executor, RedisURI.DEFAULT_TIMEOUT_DURATION);
}

/**
Expand All @@ -105,16 +95,64 @@ public LettuceClusterConnection(RedisClusterClient clusterClient, ClusterCommand
* @param executor must not be {@literal null}.
* @since 2.0
*/
public LettuceClusterConnection(RedisClusterClient clusterClient, Duration timeout, ClusterCommandExecutor executor) {
public LettuceClusterConnection(RedisClusterClient clusterClient, ClusterCommandExecutor executor, Duration timeout) {
this(new ClusterConnectionProvider(clusterClient, CODEC), executor, timeout);
}

/**
* Creates new {@link LettuceClusterConnection} using {@link LettuceConnectionProvider} running commands across the
* cluster via given {@link ClusterCommandExecutor}.
*
* @param connectionProvider must not be {@literal null}.
* @since 2.0
*/
public LettuceClusterConnection(LettuceConnectionProvider connectionProvider) {

super(null, connectionProvider, RedisURI.DEFAULT_TIMEOUT_DURATION.toMillis(), 0);

super(null, timeout.toMillis(), clusterClient, null, 0);
topologyProvider = new LettuceClusterTopologyProvider(getClient());
clusterCommandExecutor = new ClusterCommandExecutor(topologyProvider,
new LettuceClusterNodeResourceProvider(getConnectionProvider()), exceptionConverter);
disposeClusterCommandExecutorOnClose = true;
}

/**
* Creates new {@link LettuceClusterConnection} using {@link LettuceConnectionProvider} running commands across the
* cluster via given {@link ClusterCommandExecutor}.
*
* @param connectionProvider must not be {@literal null}.
* @param executor must not be {@literal null}.
* @since 2.0
*/
public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, ClusterCommandExecutor executor) {
this(connectionProvider, executor, RedisURI.DEFAULT_TIMEOUT_DURATION);
}

/**
* Creates new {@link LettuceClusterConnection} using {@link LettuceConnectionProvider} running commands across the
* cluster via given {@link ClusterCommandExecutor}.
*
* @param connectionProvider must not be {@literal null}.
* @param executor must not be {@literal null}.
* @since 2.0
*/
public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, ClusterCommandExecutor executor,
Duration timeout) {

super(null, connectionProvider, timeout.toMillis(), 0);

Assert.notNull(clusterClient, "RedisClusterClient must not be null.");
Assert.notNull(executor, "ClusterCommandExecutor must not be null.");

this.clusterClient = clusterClient;
this.topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
this.topologyProvider = new LettuceClusterTopologyProvider(getClient());
this.clusterCommandExecutor = executor;
this.disposeClusterCommandExecutorOnClose = false;
}

/**
* @return access to {@link RedisClusterClient} for non-connection access.
*/
private RedisClusterClient getClient() {
return ((ClusterConnectionProvider) getConnectionProvider()).getClient();
}

/*
Expand Down Expand Up @@ -235,7 +273,7 @@ public Integer clusterGetSlotForKey(byte[] key) {
@Override
public RedisClusterNode clusterGetNodeForSlot(int slot) {

return LettuceConverters.toRedisClusterNode(clusterClient.getPartitions().getPartitionBySlot(slot));
return LettuceConverters.toRedisClusterNode(getClient().getPartitions().getPartitionBySlot(slot));
}

/*
Expand Down Expand Up @@ -459,15 +497,6 @@ public void select(int dbIndex) {
}
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.lettuce.LettuceConnection#getAsyncDedicatedConnection()
*/
@Override
protected StatefulConnection<byte[], byte[]> doGetAsyncDedicatedConnection() {
return clusterClient.connect(CODEC);
}

// --> cluster node stuff

/*
Expand All @@ -476,7 +505,7 @@ protected StatefulConnection<byte[], byte[]> doGetAsyncDedicatedConnection() {
*/
@Override
public List<RedisClusterNode> clusterGetNodes() {
return LettuceConverters.partitionsToClusterNodes(clusterClient.getPartitions());
return LettuceConverters.partitionsToClusterNodes(getClient().getPartitions());
}

/*
Expand Down Expand Up @@ -527,19 +556,6 @@ public Map<RedisClusterNode, Collection<RedisClusterNode>> clusterGetMasterSlave
return result;
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.lettuce.LettuceConnection#switchToPubSub()
*/
@Override
protected StatefulRedisPubSubConnection<byte[], byte[]> switchToPubSub() {

close();

// open a pubsub one
return clusterClient.connectPubSub(CODEC);
}

public ClusterCommandExecutor getClusterCommandExecutor() {
return clusterCommandExecutor;
}
Expand Down Expand Up @@ -588,16 +604,12 @@ protected interface LettuceMultiKeyClusterCommandCallback<T>
* @author Christoph Strobl
* @since 1.7
*/
@RequiredArgsConstructor
static class LettuceClusterNodeResourceProvider implements ClusterNodeResourceProvider, DisposableBean {

private final RedisClusterClient client;
private final LettuceConnectionProvider connectionProvider;
private volatile StatefulRedisClusterConnection<byte[], byte[]> connection;

public LettuceClusterNodeResourceProvider(RedisClusterClient client) {

this.client = client;
}

@Override
@SuppressWarnings("unchecked")
public RedisClusterCommands<byte[], byte[]> getResourceForSpecificNode(RedisClusterNode node) {
Expand All @@ -607,7 +619,7 @@ public RedisClusterCommands<byte[], byte[]> getResourceForSpecificNode(RedisClus
if (connection == null) {
synchronized (this) {
if (connection == null) {
this.connection = client.connect(CODEC);
this.connection = connectionProvider.getConnection();
}
}
}
Expand All @@ -631,7 +643,7 @@ public void returnResourceForSpecificNode(RedisClusterNode node, Object resource
@Override
public void destroy() throws Exception {
if (connection != null) {
connection.close();
connectionProvider.release(connection);
}
}
}
Expand Down

0 comments on commit 5ac7f8e

Please sign in to comment.