Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAREDIS-976 - Allow extension of Lettuce Connection and Subscription classes. #457

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.DATAREDIS-976-SNAPSHOT</version>

<name>Spring Data Redis</name>

Expand Down
Expand Up @@ -19,6 +19,7 @@
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;

Expand All @@ -33,6 +34,7 @@
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Bruce Cloud
* @since 2.0
*/
class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClientProvider {
Expand Down Expand Up @@ -93,7 +95,8 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien
}
}

if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
if (connectionType.equals(StatefulRedisPubSubConnection.class)
|| connectionType.equals(StatefulRedisClusterPubSubConnection.class)) {

return client.connectPubSubAsync(codec) //
.thenApply(connectionType::cast);
Expand Down
Expand Up @@ -22,7 +22,6 @@
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import io.lettuce.core.cluster.models.partitions.Partitions;
import lombok.RequiredArgsConstructor;

import java.time.Duration;
Expand All @@ -35,6 +34,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
Expand All @@ -54,6 +54,9 @@
import org.springframework.util.ObjectUtils;

/**
* {@code RedisClusterConnection} implementation on top of <a href="https://github.com/mp911de/lettuce">Lettuce</a>
* Redis client.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 1.7
Expand All @@ -64,7 +67,6 @@ public class LettuceClusterConnection extends LettuceConnection implements Defau
new LettuceExceptionConverter());

private final Log log = LogFactory.getLog(getClass());
private final RedisClusterClient clusterClient;

private ClusterCommandExecutor clusterCommandExecutor;
private ClusterTopologyProvider topologyProvider;
Expand Down Expand Up @@ -119,8 +121,7 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider) {
Assert.isTrue(connectionProvider instanceof ClusterConnectionProvider,
"LettuceConnectionProvider must be a ClusterConnectionProvider.");

this.clusterClient = getClient();
this.topologyProvider = new LettuceClusterTopologyProvider(this.clusterClient);
this.topologyProvider = new LettuceClusterTopologyProvider(getClient());
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
new LettuceClusterNodeResourceProvider(getConnectionProvider()), exceptionConverter);
this.disposeClusterCommandExecutorOnClose = true;
Expand Down Expand Up @@ -156,8 +157,7 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, Cl
Assert.isTrue(connectionProvider instanceof ClusterConnectionProvider,
"LettuceConnectionProvider must be a ClusterConnectionProvider.");

this.clusterClient = getClient();
this.topologyProvider = new LettuceClusterTopologyProvider(this.clusterClient);
this.topologyProvider = new LettuceClusterTopologyProvider(getClient());
this.clusterCommandExecutor = executor;
this.disposeClusterCommandExecutorOnClose = false;
}
Expand All @@ -168,22 +168,20 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, Cl
*
* @param sharedConnection may be {@literal null} if no shared connection used.
* @param connectionProvider must not be {@literal null}.
* @param clusterClient must not be {@literal null}.
* @param clusterTopologyProvider must not be {@literal null}.
* @param executor must not be {@literal null}.
* @param timeout must not be {@literal null}.
* @since 2.1
*/
LettuceClusterConnection(@Nullable StatefulRedisClusterConnection<byte[], byte[]> sharedConnection,
LettuceConnectionProvider connectionProvider, RedisClusterClient clusterClient, ClusterCommandExecutor executor,
Duration timeout) {
protected LettuceClusterConnection(@Nullable StatefulRedisClusterConnection<byte[], byte[]> sharedConnection,
LettuceConnectionProvider connectionProvider, ClusterTopologyProvider clusterTopologyProvider,
ClusterCommandExecutor executor, Duration timeout) {

super(sharedConnection, connectionProvider, timeout.toMillis(), 0);

Assert.notNull(executor, "ClusterCommandExecutor must not be null.");
Assert.notNull(clusterClient, "RedisClusterClient must not be null.");

this.clusterClient = clusterClient;
this.topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
this.topologyProvider = clusterTopologyProvider;
this.clusterCommandExecutor = executor;
this.disposeClusterCommandExecutorOnClose = false;
}
Expand All @@ -203,13 +201,6 @@ private RedisClusterClient getClient() {
connectionProvider.getClass().getName()));
}

/**
* @return access to {@link RedisClusterClient} for non-connection access.
*/
private Partitions getPartitions() {
return clusterClient.getPartitions();
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.lettuce.LettuceConnection#geoCommands()
Expand Down Expand Up @@ -328,7 +319,11 @@ public Integer clusterGetSlotForKey(byte[] key) {
@Override
public RedisClusterNode clusterGetNodeForSlot(int slot) {

return LettuceConverters.toRedisClusterNode(getPartitions().getPartitionBySlot(slot));
Set<RedisClusterNode> nodes = topologyProvider.getTopology().getSlotServingNodes(slot);
if (nodes.isEmpty()) {
return null;
}
return nodes.iterator().next();
}

/*
Expand Down Expand Up @@ -572,7 +567,7 @@ public void select(int dbIndex) {
*/
@Override
public List<RedisClusterNode> clusterGetNodes() {
return LettuceConverters.partitionsToClusterNodes(getPartitions());
return new ArrayList<>(topologyProvider.getTopology().getNodes());
}

/*
Expand Down
Expand Up @@ -879,7 +879,21 @@ protected StatefulRedisPubSubConnection<byte[], byte[]> switchToPubSub() {
}

private LettuceSubscription initSubscription(MessageListener listener) {
return new LettuceSubscription(listener, switchToPubSub(), connectionProvider);
return doCreateSubscription(listener, switchToPubSub(), connectionProvider);
}

/**
* Customization hook to create a {@link LettuceSubscription}.
*
* @param listener the {@link MessageListener} to notify.
* @param connection Pub/Sub connection.
* @param connectionProvider the {@link LettuceConnectionProvider} for connection release.
* @return a {@link LettuceSubscription}.
* @since 2.2
*/
protected LettuceSubscription doCreateSubscription(MessageListener listener,
StatefulRedisPubSubConnection<byte[], byte[]> connection, LettuceConnectionProvider connectionProvider) {
return new LettuceSubscription(listener, connection, connectionProvider);
}

void pipeline(LettuceResult result) {
Expand Down Expand Up @@ -1250,11 +1264,11 @@ public CommandOutput getTypeHint(CommandType type, CommandOutput defaultType) {
}

@RequiredArgsConstructor
private class LettucePoolConnectionProvider implements LettuceConnectionProvider {
static class LettucePoolConnectionProvider implements LettuceConnectionProvider {

private final LettucePool pool;

/*
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#getConnection(java.lang.Class)
*/
Expand All @@ -1263,7 +1277,7 @@ private class LettucePoolConnectionProvider implements LettuceConnectionProvider
return connectionType.cast(pool.getResource());
}

/*
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#getConnectionAsync(java.lang.Class)
*/
Expand All @@ -1272,7 +1286,7 @@ private class LettucePoolConnectionProvider implements LettuceConnectionProvider
throw new UnsupportedOperationException("Async operations not supported!");
}

/*
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#release(io.lettuce.core.api.StatefulConnection)
*/
Expand Down
Expand Up @@ -15,6 +15,8 @@
*/
package org.springframework.data.redis.connection.lettuce;

import static org.springframework.data.redis.connection.lettuce.LettuceConnection.*;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.ReadFrom;
Expand All @@ -40,6 +42,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
Expand Down Expand Up @@ -273,7 +276,7 @@ public void afterPropertiesSet() {

this.client = createClient();

this.connectionProvider = createConnectionProvider(client, LettuceConnection.CODEC);
this.connectionProvider = createConnectionProvider(client, CODEC);
this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC);

if (isClusterAware()) {
Expand Down Expand Up @@ -341,13 +344,7 @@ public RedisConnection getConnection() {
}

LettuceConnection connection;

if (pool != null) {
connection = new LettuceConnection(getSharedConnection(), getTimeout(), null, pool, getDatabase());
} else {
connection = new LettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase());
}

connection = doCreateLettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase());
connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
return connection;
}
Expand All @@ -365,12 +362,51 @@ public RedisClusterConnection getClusterConnection() {

RedisClusterClient clusterClient = (RedisClusterClient) client;

return getShareNativeConnection()
? new LettuceClusterConnection(
(StatefulRedisClusterConnection<byte[], byte[]>) getOrCreateSharedConnection().getConnection(),
connectionProvider, clusterClient, clusterCommandExecutor, clientConfiguration.getCommandTimeout())
: new LettuceClusterConnection(null, connectionProvider, clusterClient, clusterCommandExecutor,
clientConfiguration.getCommandTimeout());
StatefulRedisClusterConnection<byte[], byte[]> sharedConnection = getShareNativeConnection()
? (StatefulRedisClusterConnection<byte[], byte[]>) getOrCreateSharedConnection().getConnection()
: null;

LettuceClusterTopologyProvider topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
return doCreateLettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider,
clusterCommandExecutor, clientConfiguration.getCommandTimeout());
}

/**
* Customization hook for {@link LettuceConnection} creation.
*
* @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
* {@literal true}; {@literal null} otherwise.
* @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
* @param timeout command timeout in {@link TimeUnit#MILLISECONDS}.
* @param database database index to operate on.
* @return the {@link LettuceConnection}.
* @since 2.2
*/
protected LettuceConnection doCreateLettuceConnection(StatefulRedisConnection<byte[], byte[]> sharedConnection,
LettuceConnectionProvider connectionProvider, long timeout, int database) {

return new LettuceConnection(sharedConnection, connectionProvider, timeout, database);
}

/**
* Customization hook for {@link LettuceClusterConnection} creation.
*
* @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
* {@literal true}; {@literal null} otherwise.
* @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
* @param topologyProvider the {@link ClusterTopologyProvider}.
* @param clusterCommandExecutor the {@link ClusterCommandExecutor} to release connections.
* @param commandTimeout command timeout {@link Duration}.
* @return the {@link LettuceConnection}.
* @since 2.2
*/
protected LettuceClusterConnection doCreateLettuceClusterConnection(
StatefulRedisClusterConnection<byte[], byte[]> sharedConnection, LettuceConnectionProvider connectionProvider,
ClusterTopologyProvider topologyProvider, ClusterCommandExecutor clusterCommandExecutor,
Duration commandTimeout) {

return new LettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider, clusterCommandExecutor,
commandTimeout);
}

/*
Expand Down Expand Up @@ -909,6 +945,10 @@ protected StatefulConnection<ByteBuffer, ByteBuffer> getSharedReactiveConnection

private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {

if (this.pool != null) {
return new LettucePoolConnectionProvider(this.pool);
}

LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client, codec);

if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) {
Expand Down
Expand Up @@ -35,41 +35,41 @@ class LettuceMessageListener implements RedisPubSubListener<byte[], byte[]> {
this.listener = listener;
}

/*
/*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object)
*/
public void message(byte[] channel, byte[] message) {
listener.onMessage(new DefaultMessage(channel, message), null);
}

/*
/*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object, java.lang.Object)
*/
public void message(byte[] pattern, byte[] channel, byte[] message) {
listener.onMessage(new DefaultMessage(channel, message), pattern);
}

/*
/*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#subscribed(java.lang.Object, long)
*/
public void subscribed(byte[] channel, long count) {}

/*
/*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#psubscribed(java.lang.Object, long)
*/
public void psubscribed(byte[] pattern, long count) {}

/*
/*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#unsubscribed(java.lang.Object, long)
*/
public void unsubscribed(byte[] channel, long count) {}

/*
/*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#punsubscribed(java.lang.Object, long)
*/
Expand Down