From a578e0d7eb8cdab797c5b6ace71a65d42194f1a1 Mon Sep 17 00:00:00 2001 From: Severin Kistler Date: Thu, 9 Oct 2025 19:55:39 +0200 Subject: [PATCH 1/2] GH-10471: add the lockKey to the unlockChannelKey to control cluster slot hashing As a consequence, change the ChannelTopic to a PatternTopic to keep receiving subscription events for PubSub lock Signed-off-by: Severin Kistler --- .../integration/redis/util/RedisLockRegistry.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java index 894782c66c..1d137863a0 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java @@ -53,7 +53,7 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; -import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.Topic; import org.springframework.integration.support.locks.DistributedLock; @@ -97,6 +97,7 @@ * @author Alex Peelman * @author Youbin Wu * @author Michal Domagala + * @author Severin Kistler * * @since 4.0 * @@ -213,7 +214,7 @@ private void setupUnlockMessageListener(RedisConnectionFactory connectionFactory "'unlockNotifyMessageListener' must not have been re-initialized."); RedisLockRegistry.this.redisMessageListenerContainer = new RedisMessageListenerContainer(); RedisLockRegistry.this.unlockNotifyMessageListener = new RedisPubSubLock.RedisUnLockNotifyMessageListener(); - final Topic topic = new ChannelTopic(this.unLockChannelKey); + final Topic topic = new PatternTopic(this.unLockChannelKey + ":*"); RedisMessageListenerContainer container = RedisLockRegistry.this.redisMessageListenerContainer; RedisPubSubLock.RedisUnLockNotifyMessageListener listener = RedisLockRegistry.this.unlockNotifyMessageListener; container.setConnectionFactory(connectionFactory); @@ -690,7 +691,7 @@ protected boolean tryRedisLockInner(long time, long expireAfter) protected boolean removeLockKeyInnerUnlink() { return Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute( UNLINK_UNLOCK_REDIS_SCRIPT, Collections.singletonList(this.lockKey), - RedisLockRegistry.this.clientId, RedisLockRegistry.this.unLockChannelKey)); + RedisLockRegistry.this.clientId, RedisLockRegistry.this.unLockChannelKey + ":" + this.lockKey)); } private boolean subscribeLock(long time, long expireAfter) throws ExecutionException, InterruptedException { From 6804c03ce094e1cc62da124eefde0c21f3cf5adb Mon Sep 17 00:00:00 2001 From: Severin Kistler Date: Thu, 9 Oct 2025 21:57:02 +0200 Subject: [PATCH 2/2] GH-10471: pass unLockChannelKey as `KEYS[2]` instead of `ARGV[2]` as values used as keys should be passed as such Signed-off-by: Severin Kistler --- .../integration/redis/util/RedisLockRegistry.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java index 1d137863a0..6baa12b6f6 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java @@ -23,6 +23,7 @@ import java.util.ConcurrentModificationException; import java.util.Date; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.UUID; @@ -668,7 +669,7 @@ private final class RedisPubSubLock extends RedisLock { private static final String UNLINK_UNLOCK_SCRIPT = """ local lockClientId = redis.call('GET', KEYS[1]) if (lockClientId == ARGV[1] and redis.call('UNLINK', KEYS[1]) == 1) then - redis.call('PUBLISH', ARGV[2], KEYS[1]) + redis.call('PUBLISH', KEYS[2], KEYS[1]) return true end return false @@ -689,9 +690,10 @@ protected boolean tryRedisLockInner(long time, long expireAfter) @Override protected boolean removeLockKeyInnerUnlink() { + final String unLockChannelKey = RedisLockRegistry.this.unLockChannelKey + ":" + this.lockKey; return Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute( - UNLINK_UNLOCK_REDIS_SCRIPT, Collections.singletonList(this.lockKey), - RedisLockRegistry.this.clientId, RedisLockRegistry.this.unLockChannelKey + ":" + this.lockKey)); + UNLINK_UNLOCK_REDIS_SCRIPT, List.of(this.lockKey, unLockChannelKey), + RedisLockRegistry.this.clientId)); } private boolean subscribeLock(long time, long expireAfter) throws ExecutionException, InterruptedException {