From 87e90e61271e5db1f63e103ace18e5f65143ec13 Mon Sep 17 00:00:00 2001 From: Robin Date: Fri, 15 May 2026 16:23:25 +0200 Subject: [PATCH 1/3] =?UTF-8?q?fix:=20restore=20Jackson=202.x=20property?= =?UTF-8?q?=20order=20in=20RqueueRedisSerializer=20to=20prevent=20stale=20?= =?UTF-8?q?processing-queue=20entries=20after=203.x=20=E2=86=92=204.x=20up?= =?UTF-8?q?grade?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rqueue-core/build.gradle | 1 + .../converter/RqueueRedisSerializer.java | 2 + ...RedisMessageBrokerV3CompatibilityTest.java | 144 ++++++++++++++++++ 3 files changed, 147 insertions(+) create mode 100644 rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerV3CompatibilityTest.java diff --git a/rqueue-core/build.gradle b/rqueue-core/build.gradle index b941f9c1..e37f1b20 100644 --- a/rqueue-core/build.gradle +++ b/rqueue-core/build.gradle @@ -52,6 +52,7 @@ dependencies { api "org.apache.commons:commons-collections4:${apacheCommonCollectionVerion}" // https://mvnrepository.com/artifact/io.micrometer/micrometer-core api "io.micrometer:micrometer-core:${microMeterVersion}" + testImplementation "com.fasterxml.jackson.core:jackson-databind:2.21.2" testImplementation "io.lettuce:lettuce-core:${lettuceVersion}" testImplementation "io.projectreactor:reactor-test:${projectReactorReactorTestVersion}" testImplementation project(":rqueue-test-util") diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/RqueueRedisSerializer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/RqueueRedisSerializer.java index 245fe181..db116f2c 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/RqueueRedisSerializer.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/RqueueRedisSerializer.java @@ -25,6 +25,7 @@ import tools.jackson.core.JacksonException; import tools.jackson.core.JsonGenerator; import tools.jackson.databind.DefaultTyping; +import tools.jackson.databind.MapperFeature; import tools.jackson.databind.ObjectMapper; import tools.jackson.databind.SerializationContext; import tools.jackson.databind.jsontype.BasicPolymorphicTypeValidator; @@ -76,6 +77,7 @@ private static class RqueueRedisSerDes implements RedisSerializer { .build(), DefaultTyping.NON_FINAL, As.PROPERTY) + .disable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY) .build(); } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerV3CompatibilityTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerV3CompatibilityTest.java new file mode 100644 index 00000000..b2089458 --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerV3CompatibilityTest.java @@ -0,0 +1,144 @@ +package com.github.sonus21.rqueue.core.spi.redis; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.utils.TestUtils; +import java.io.IOException; +import java.net.ServerSocket; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; +import redis.embedded.RedisServer; + +@Tag("core") +class RedisMessageBrokerV3CompatibilityTest { + + private static RedisServer redisServer; + private static int redisPort; + + @BeforeAll + static void startRedis() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + redisPort = socket.getLocalPort(); + } + redisServer = new RedisServer(redisPort); + redisServer.start(); + } + + @AfterAll + static void stopRedis() throws IOException { + if (redisServer != null) { + redisServer.stop(); + } + } + + private LettuceConnectionFactory connectionFactory; + private RqueueMessageTemplateImpl messageTemplate; + private RedisMessageBroker broker; + // Passthrough serialiser: inserts raw bytes into the ZSET without re-serialising, mirroring + // how dequeue_message.lua byte-copies messages from q-queue into the processing queue. + private RedisTemplate rawTemplate; + + @BeforeEach + void setUp() { + connectionFactory = + new LettuceConnectionFactory(new RedisStandaloneConfiguration("localhost", redisPort)); + connectionFactory.afterPropertiesSet(); + messageTemplate = new RqueueMessageTemplateImpl(connectionFactory, null); + broker = new RedisMessageBroker(messageTemplate); + + rawTemplate = new RedisTemplate<>(); + rawTemplate.setConnectionFactory(connectionFactory); + rawTemplate.setKeySerializer(new StringRedisSerializer()); + rawTemplate.setValueSerializer(RedisSerializer.byteArray()); + rawTemplate.afterPropertiesSet(); + } + + @AfterEach + void tearDown() { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + messageTemplate.getRedisTemplate().delete(queueDetail.getProcessingQueueName()); + messageTemplate.getRedisTemplate().delete(queueDetail.getScheduledQueueName()); + connectionFactory.destroy(); + } + + @Test + void parkForRetry_v3SerializedMessage_isMovedToScheduledQueue() throws Exception { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-v3-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + rawTemplate + .opsForZSet() + .add(queueDetail.getProcessingQueueName(), v3Bytes(original), System.currentTimeMillis()); + + RqueueMessage updated = original.toBuilder().failureCount(1).build().updateReEnqueuedAt(); + + broker.parkForRetry(queueDetail, original, updated, 60_000L); + + assertEquals( + 0L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getProcessingQueueName()), + "Processing queue must be empty after parkForRetry"); + assertEquals( + 1L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getScheduledQueueName()), + "Scheduled queue must contain the rescheduled message"); + } + + @Test + void ack_v3SerializedMessage_isRemovedFromProcessingQueue() throws Exception { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-v3-002") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + rawTemplate + .opsForZSet() + .add(queueDetail.getProcessingQueueName(), v3Bytes(original), System.currentTimeMillis()); + + broker.ack(queueDetail, original); + + assertEquals( + 0L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getProcessingQueueName()), + "Processing queue must be empty after ack"); + } + + // Reproduce the bytes RQueue 3.x would have stored using the actual Jackson 2.x ObjectMapper + // so the serialised form stays in sync with RqueueMessage field changes automatically. + private static byte[] v3Bytes(RqueueMessage message) throws Exception { + com.fasterxml.jackson.databind.ObjectMapper jackson2 = + new com.fasterxml.jackson.databind.ObjectMapper() + .configure( + com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, + false); + //noinspection deprecation — mirrors the API RQueue 3.x RqueueRedisSerDes used + jackson2.enableDefaultTyping( + com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping.NON_FINAL, + com.fasterxml.jackson.annotation.JsonTypeInfo.As.PROPERTY); + return jackson2.writeValueAsBytes(message); + } +} From 15a25f82cc266ac4ded191ab64c8f64423b25f29 Mon Sep 17 00:00:00 2001 From: Robin Date: Sat, 16 May 2026 18:59:04 +0200 Subject: [PATCH 2/3] build: bump version to 4.0.0-RC10 --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 87093c40..4a0d8b21 100644 --- a/build.gradle +++ b/build.gradle @@ -84,7 +84,7 @@ ext { subprojects { group = "com.github.sonus21" - version = "4.0.0-RC9" + version = "4.0.0-RC10" dependencies { // https://mvnrepository.com/artifact/org.springframework/spring-messaging From becb549831c41d7b720bfab2bbb955c189949d79 Mon Sep 17 00:00:00 2001 From: Robin Date: Sat, 16 May 2026 21:29:13 +0200 Subject: [PATCH 3/3] feat: add rqueue.serialization.property.order property to control JSON field ordering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces RqueueRedisSerializer.PropertyOrder enum (ALPHABETICAL | DECLARATION) and wires it via rqueue.serialization.property.order (default: ALPHABETICAL). ALPHABETICAL uses Jackson 3.x alphabetical ordering, the native default for RQueue 4.x deployments. No configuration change required for new installs. DECLARATION uses declaration order, matching the Jackson 2.x behaviour of RQueue 3.x. Set this when upgrading from 3.x with messages still in Redis queues, as switching while messages are in-flight causes unexpected retries. The setting is applied in RqueueListenerBaseConfig before any Redis template is created (overriding RedisUtils providers when DECLARATION is requested), and flows through RqueueConfig to RqueueInternalPubSubChannel so all serialiser instances in the application use the same order. Docs: configuration.md and migrations.md updated with property description, accepted values, and the 3.x → 4.x migration warning. Assisted-By: Claude Sonnet 4.6 --- docs/configuration/configuration.md | 13 + docs/migrations.md | 25 ++ .../sonus21/rqueue/config/RqueueConfig.java | 4 + .../config/RqueueListenerBaseConfig.java | 32 ++ .../converter/RqueueRedisSerializer.java | 35 +- .../core/RqueueInternalPubSubChannel.java | 3 +- ...isMessageBrokerSerializationOrderTest.java | 390 ++++++++++++++++++ ...RedisMessageBrokerV3CompatibilityTest.java | 144 ------- 8 files changed, 495 insertions(+), 151 deletions(-) create mode 100644 rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerSerializationOrderTest.java delete mode 100644 rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerV3CompatibilityTest.java diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 49676387..c4bb83c3 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -274,6 +274,19 @@ whenever Rqueue generates the message ID internally. ## Additional Configuration +- **`rqueue.serialization.property.order`**: Controls the JSON property ordering used + when serialising `RqueueMessage` to Redis. Accepted values: + - `ALPHABETICAL` *(default)* — alphabetical order, the Jackson 3.x native default. + This is the standard setting for RQueue 4.x deployments. + - `DECLARATION` — declaration order, matching the Jackson 2.x behaviour used by RQueue 3.x. + Use this when upgrading from 3.x with messages still present in Redis queues. + + {: .warning} + Switching between values while messages are present in the processing queue will cause + those in-flight messages to be unexpectedly retried — the visibility-timeout rescue + path preserves raw bytes verbatim, so the serialisation mismatch persists across + re-deliveries. Drain the processing queue before changing this setting. + - **`rqueue.retry.per.poll`**: Determines how many times a polled message is retried immediately if processing fails, before it is moved back to the queue for a subsequent poll. The default value is `1`. If increased to `N`, the message will diff --git a/docs/migrations.md b/docs/migrations.md index 75aac352..b0924ace 100644 --- a/docs/migrations.md +++ b/docs/migrations.md @@ -73,3 +73,28 @@ ZCARD rqueue-processing:: If all commands return **0**, your queues are empty and you can proceed with the migration without additional configuration. + +--- + +## Upgrading from 3.x to 4.x + +RQueue 4.x switched from Jackson 2.x (`com.fasterxml.jackson`) to Jackson 3.x +(`tools.jackson`). Jackson 3.x defaults to **alphabetical** JSON property ordering, +while Jackson 2.x used **declaration order**. Messages enqueued by 3.x and messages +enqueued by 4.x therefore have different byte representations in Redis. + +The processing queue uses byte-exact lookups (ZSCORE/ZREM) to move or acknowledge +messages. If stored bytes do not match the re-serialised bytes, the lookup silently +fails and the message is repeatedly re-delivered via the visibility-timeout rescue path. + +**If you are upgrading with messages still present in Redis**, set the following +property to keep using declaration order (matching what 3.x stored): + +```properties +rqueue.serialization.property.order=DECLARATION +``` + +{: .warning} +Changing `rqueue.serialization.property.order` while messages are present in the +processing queue will cause those messages to be unexpectedly retried. Drain the processing +queue before switching values. diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java index 9131421c..891df1cf 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java @@ -16,6 +16,7 @@ package com.github.sonus21.rqueue.config; +import com.github.sonus21.rqueue.converter.RqueueRedisSerializer; import com.github.sonus21.rqueue.models.enums.RqueueMode; import com.github.sonus21.rqueue.utils.Constants; import com.github.sonus21.rqueue.utils.StringUtils; @@ -166,6 +167,9 @@ private static String generateBrokerId() { @Value("${rqueue.worker.registry.enabled:true}") private boolean workerRegistryEnabled; + @Value("${rqueue.serialization.property.order:ALPHABETICAL}") + private RqueueRedisSerializer.PropertyOrder serializationPropertyOrder; + @Value("${rqueue.worker.registry.worker.ttl:300}") private long workerRegistryWorkerTtlInSeconds; diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java index 2bd43887..30ebfc2c 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java @@ -18,6 +18,7 @@ import com.github.sonus21.rqueue.common.RqueueRedisTemplate; import com.github.sonus21.rqueue.converter.MessageConverterProvider; +import com.github.sonus21.rqueue.converter.RqueueRedisSerializer; import com.github.sonus21.rqueue.core.RqueueBeanProvider; import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; @@ -37,6 +38,9 @@ import org.springframework.context.annotation.Conditional; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.data.redis.serializer.StringRedisSerializer; /** * This is a base configuration class for Rqueue, that is used in Spring and Spring boot Rqueue libs @@ -60,6 +64,9 @@ public abstract class RqueueListenerBaseConfig { @Value("${rqueue.reactive.enabled:false}") protected boolean reactiveEnabled; + @Value("${rqueue.serialization.property.order:ALPHABETICAL}") + private RqueueRedisSerializer.PropertyOrder serializationPropertyOrder; + @Value( "${rqueue.message.converter.provider.class:com.github.sonus21.rqueue.converter.DefaultMessageConverterProvider}") private String messageConverterProviderClass; @@ -109,6 +116,31 @@ public RqueueConfig rqueueConfig( @Value("${rqueue.backend:REDIS}") Backend backend, @Value("${rqueue.version.key:__rq::version}") String versionKey, @Value("${rqueue.db.version:}") Integer dbVersion) { + if (serializationPropertyOrder == RqueueRedisSerializer.PropertyOrder.DECLARATION) { + RqueueRedisSerializer serializer = + new RqueueRedisSerializer(RqueueRedisSerializer.PropertyOrder.DECLARATION); + StringRedisSerializer keySerializer = new StringRedisSerializer(); + RedisUtils.redisTemplateProvider = new RedisUtils.RedisTemplateProvider() { + @Override + public RedisTemplate getRedisTemplate( + RedisConnectionFactory redisConnectionFactory) { + RedisTemplate template = new RedisTemplate<>(); + template.setConnectionFactory(redisConnectionFactory); + template.setKeySerializer(keySerializer); + template.setValueSerializer(serializer); + template.setHashKeySerializer(keySerializer); + template.setHashValueSerializer(serializer); + return template; + } + }; + RedisUtils.redisSerializationContextProvider = + () -> RedisSerializationContext.newSerializationContext() + .key(keySerializer) + .value(serializer) + .hashKey(keySerializer) + .hashValue(serializer) + .build(); + } boolean sharedConnection = false; RedisConnectionFactory connectionFactory = simpleRqueueListenerContainerFactory.getRedisConnectionFactory(); diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/RqueueRedisSerializer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/RqueueRedisSerializer.java index db116f2c..20796958 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/RqueueRedisSerializer.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/RqueueRedisSerializer.java @@ -35,14 +35,35 @@ @Slf4j public class RqueueRedisSerializer implements RedisSerializer { + /** + * Controls JSON property ordering for {@link com.github.sonus21.rqueue.core.RqueueMessage} + * serialisation. Configure via {@code rqueue.serialization.property.order}. + * + *
    + *
  • {@link #ALPHABETICAL} — alphabetical order, Jackson 3.x native behaviour. This is the + * default for RQueue 4.x. + *
  • {@link #DECLARATION} — declaration order, matching Jackson 2.x / RQueue 3.x. Use when + * upgrading from 3.x with messages still present in Redis queues. + *
+ */ + public enum PropertyOrder { + ALPHABETICAL, + DECLARATION + } + private final RedisSerializer serializer; public RqueueRedisSerializer(RedisSerializer redisSerializer) { this.serializer = redisSerializer; } + /** Creates a serialiser using {@link PropertyOrder#ALPHABETICAL} (Jackson 3.x default). */ public RqueueRedisSerializer() { - this(new RqueueRedisSerDes()); + this(PropertyOrder.ALPHABETICAL); + } + + public RqueueRedisSerializer(PropertyOrder order) { + this(new RqueueRedisSerDes(order)); } @Override @@ -67,8 +88,8 @@ public Object deserialize(byte[] bytes) throws SerializationException { private static class RqueueRedisSerDes implements RedisSerializer { private final ObjectMapper mapper; - RqueueRedisSerDes() { - this.mapper = SerializationUtils.getObjectMapper() + RqueueRedisSerDes(PropertyOrder order) { + var builder = SerializationUtils.getObjectMapper() .rebuild() .addModule(new SimpleModule().addSerializer(new NullValueSerializer())) .activateDefaultTyping( @@ -76,9 +97,11 @@ private static class RqueueRedisSerDes implements RedisSerializer { .allowIfSubType(Object.class) .build(), DefaultTyping.NON_FINAL, - As.PROPERTY) - .disable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY) - .build(); + As.PROPERTY); + if (order == PropertyOrder.DECLARATION) { + builder = builder.disable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY); + } + this.mapper = builder.build(); } @Override diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueInternalPubSubChannel.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueInternalPubSubChannel.java index ac2517c9..e8cae8d1 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueInternalPubSubChannel.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueInternalPubSubChannel.java @@ -57,7 +57,8 @@ public RqueueInternalPubSubChannel( this.rqueueConfig = rqueueConfig; this.stringRqueueRedisTemplate = stringRqueueRedisTemplate; this.rqueueBeanProvider = rqueueBeanProvider; - this.rqueueRedisSerializer = new RqueueRedisSerializer(); + this.rqueueRedisSerializer = + new RqueueRedisSerializer(rqueueConfig.getSerializationPropertyOrder()); } @Override diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerSerializationOrderTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerSerializationOrderTest.java new file mode 100644 index 00000000..f303ad7e --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerSerializationOrderTest.java @@ -0,0 +1,390 @@ +package com.github.sonus21.rqueue.core.spi.redis; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.github.sonus21.rqueue.converter.RqueueRedisSerializer; +import com.github.sonus21.rqueue.converter.RqueueRedisSerializer.PropertyOrder; +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.utils.RedisUtils; +import com.github.sonus21.rqueue.utils.TestUtils; +import java.io.IOException; +import java.net.ServerSocket; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; +import redis.embedded.RedisServer; + +@Tag("core") +class RedisMessageBrokerSerializationOrderTest { + + private static RedisServer redisServer; + private static int redisPort; + + @BeforeAll + static void startRedis() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + redisPort = socket.getLocalPort(); + } + redisServer = new RedisServer(redisPort); + redisServer.start(); + } + + @AfterAll + static void stopRedis() throws IOException { + if (redisServer != null) { + redisServer.stop(); + } + } + + private LettuceConnectionFactory connectionFactory; + private RqueueMessageTemplateImpl messageTemplate; + private RedisMessageBroker broker; + // Passthrough serialiser: inserts raw bytes into the ZSET without re-serialising, mirroring + // how dequeue_message.lua byte-copies messages from q-queue into the processing queue. + private RedisTemplate rawTemplate; + + @BeforeEach + void setUp() { + connectionFactory = + new LettuceConnectionFactory(new RedisStandaloneConfiguration("localhost", redisPort)); + connectionFactory.afterPropertiesSet(); + messageTemplate = new RqueueMessageTemplateImpl(connectionFactory, null); + broker = new RedisMessageBroker(messageTemplate); + + rawTemplate = new RedisTemplate<>(); + rawTemplate.setConnectionFactory(connectionFactory); + rawTemplate.setKeySerializer(new StringRedisSerializer()); + rawTemplate.setValueSerializer(RedisSerializer.byteArray()); + rawTemplate.afterPropertiesSet(); + } + + @AfterEach + void tearDown() { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + messageTemplate.getRedisTemplate().delete(queueDetail.getProcessingQueueName()); + messageTemplate.getRedisTemplate().delete(queueDetail.getScheduledQueueName()); + connectionFactory.destroy(); + } + + // --- property order: ALPHABETICAL (Jackson 3.x default, RQueue 4.x default) --- + + @Test + void parkForRetry_alphabeticalOrder_4xMessage_isMovedToScheduledQueue() { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-alpha-4x-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + messageTemplate.addToZset( + queueDetail.getProcessingQueueName(), original, System.currentTimeMillis()); + + RqueueMessage updated = original.toBuilder().failureCount(1).build().updateReEnqueuedAt(); + broker.parkForRetry(queueDetail, original, updated, 60_000L); + + assertEquals( + 0L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getProcessingQueueName()), + "Processing queue must be empty after parkForRetry"); + assertEquals( + 1L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getScheduledQueueName()), + "Scheduled queue must contain the rescheduled message"); + } + + @Test + void ack_alphabeticalOrder_4xMessage_isRemovedFromProcessingQueue() { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-alpha-4x-ack-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + messageTemplate.addToZset( + queueDetail.getProcessingQueueName(), original, System.currentTimeMillis()); + + broker.ack(queueDetail, original); + + assertEquals( + 0L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getProcessingQueueName()), + "Processing queue must be empty after ack"); + } + + @Test + void parkForRetry_alphabeticalOrder_v3Message_strandsMessage() throws Exception { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + + rawTemplate + .opsForZSet() + .add( + queueDetail.getProcessingQueueName(), + v3Bytes(RqueueMessage.builder() + .id("test-msg-alpha-v3-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build()), + System.currentTimeMillis()); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-alpha-v3-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + RqueueMessage updated = original.toBuilder().failureCount(1).build().updateReEnqueuedAt(); + broker.parkForRetry(queueDetail, original, updated, 60_000L); + + assertEquals( + 1L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getProcessingQueueName()), + "3.x message must remain stranded under ALPHABETICAL order"); + assertEquals( + 0L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getScheduledQueueName()), + "Scheduled queue must be empty: ZSCORE missed due to alphabetical vs declaration mismatch"); + } + + @Test + void ack_alphabeticalOrder_v3Message_strandsMessage() throws Exception { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-alpha-v3-ack-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + rawTemplate + .opsForZSet() + .add(queueDetail.getProcessingQueueName(), v3Bytes(original), System.currentTimeMillis()); + + broker.ack(queueDetail, original); + + assertEquals( + 1L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getProcessingQueueName()), + "3.x message must remain stranded under ALPHABETICAL order"); + } + + // --- property order: DECLARATION (declaration order, matching RQueue 3.x) --- + + @Test + void parkForRetry_declarationOrder_v3Message_isMovedToScheduledQueue() throws Exception { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + RedisUtils.RedisTemplateProvider saved = RedisUtils.redisTemplateProvider; + try { + RedisMessageBroker declarationBroker = brokerWithOrder(PropertyOrder.DECLARATION); + RqueueMessageTemplateImpl declarationTemplate = templateWithOrder(PropertyOrder.DECLARATION); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-v3-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + rawTemplate + .opsForZSet() + .add(queueDetail.getProcessingQueueName(), v3Bytes(original), System.currentTimeMillis()); + + RqueueMessage updated = original.toBuilder().failureCount(1).build().updateReEnqueuedAt(); + declarationBroker.parkForRetry(queueDetail, original, updated, 60_000L); + + assertEquals( + 0L, + declarationTemplate + .getRedisTemplate() + .opsForZSet() + .size(queueDetail.getProcessingQueueName()), + "Processing queue must be empty after parkForRetry"); + assertEquals( + 1L, + declarationTemplate + .getRedisTemplate() + .opsForZSet() + .size(queueDetail.getScheduledQueueName()), + "Scheduled queue must contain the rescheduled message"); + } finally { + RedisUtils.redisTemplateProvider = saved; + } + } + + @Test + void ack_declarationOrder_v3Message_isRemovedFromProcessingQueue() throws Exception { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + RedisUtils.RedisTemplateProvider saved = RedisUtils.redisTemplateProvider; + try { + RedisMessageBroker declarationBroker = brokerWithOrder(PropertyOrder.DECLARATION); + RqueueMessageTemplateImpl declarationTemplate = templateWithOrder(PropertyOrder.DECLARATION); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-v3-002") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + rawTemplate + .opsForZSet() + .add(queueDetail.getProcessingQueueName(), v3Bytes(original), System.currentTimeMillis()); + + declarationBroker.ack(queueDetail, original); + + assertEquals( + 0L, + declarationTemplate + .getRedisTemplate() + .opsForZSet() + .size(queueDetail.getProcessingQueueName()), + "Processing queue must be empty after ack"); + } finally { + RedisUtils.redisTemplateProvider = saved; + } + } + + @Test + void parkForRetry_declarationOrder_4xAlphabeticalMessage_strandsMessage() { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + RedisUtils.RedisTemplateProvider saved = RedisUtils.redisTemplateProvider; + try { + RedisMessageBroker declarationBroker = brokerWithOrder(PropertyOrder.DECLARATION); + RqueueMessageTemplateImpl declarationTemplate = templateWithOrder(PropertyOrder.DECLARATION); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-legacy-4x-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + byte[] v4AlphaBytes = + new RqueueRedisSerializer(PropertyOrder.ALPHABETICAL).serialize(original); + rawTemplate + .opsForZSet() + .add(queueDetail.getProcessingQueueName(), v4AlphaBytes, System.currentTimeMillis()); + + RqueueMessage updated = original.toBuilder().failureCount(1).build().updateReEnqueuedAt(); + declarationBroker.parkForRetry(queueDetail, original, updated, 60_000L); + + assertEquals( + 1L, + declarationTemplate + .getRedisTemplate() + .opsForZSet() + .size(queueDetail.getProcessingQueueName()), + "4.x alphabetical message must remain stranded under DECLARATION order"); + assertEquals( + 0L, + declarationTemplate + .getRedisTemplate() + .opsForZSet() + .size(queueDetail.getScheduledQueueName()), + "Scheduled queue must be empty: ZSCORE missed due to declaration vs alphabetical" + + " mismatch"); + } finally { + RedisUtils.redisTemplateProvider = saved; + } + } + + @Test + void ack_declarationOrder_4xAlphabeticalMessage_strandsMessage() { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + RedisUtils.RedisTemplateProvider saved = RedisUtils.redisTemplateProvider; + try { + RedisMessageBroker declarationBroker = brokerWithOrder(PropertyOrder.DECLARATION); + RqueueMessageTemplateImpl declarationTemplate = templateWithOrder(PropertyOrder.DECLARATION); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-legacy-4x-ack-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + byte[] v4AlphaBytes = + new RqueueRedisSerializer(PropertyOrder.ALPHABETICAL).serialize(original); + rawTemplate + .opsForZSet() + .add(queueDetail.getProcessingQueueName(), v4AlphaBytes, System.currentTimeMillis()); + + declarationBroker.ack(queueDetail, original); + + assertEquals( + 1L, + declarationTemplate + .getRedisTemplate() + .opsForZSet() + .size(queueDetail.getProcessingQueueName()), + "4.x alphabetical message must remain stranded under DECLARATION order"); + } finally { + RedisUtils.redisTemplateProvider = saved; + } + } + + private RqueueMessageTemplateImpl templateWithOrder(PropertyOrder order) { + RqueueRedisSerializer serializer = new RqueueRedisSerializer(order); + StringRedisSerializer key = new StringRedisSerializer(); + RedisUtils.redisTemplateProvider = new RedisUtils.RedisTemplateProvider() { + @Override + public org.springframework.data.redis.core.RedisTemplate getRedisTemplate( + org.springframework.data.redis.connection.RedisConnectionFactory factory) { + org.springframework.data.redis.core.RedisTemplate t = + new org.springframework.data.redis.core.RedisTemplate<>(); + t.setConnectionFactory(factory); + t.setKeySerializer(key); + t.setValueSerializer(serializer); + t.setHashKeySerializer(key); + t.setHashValueSerializer(serializer); + return t; + } + }; + return new RqueueMessageTemplateImpl(connectionFactory, null); + } + + private RedisMessageBroker brokerWithOrder(PropertyOrder order) { + return new RedisMessageBroker(templateWithOrder(order)); + } + + // Reproduce the bytes RQueue 3.x would have stored using the actual Jackson 2.x ObjectMapper + // so the serialised form stays in sync with RqueueMessage field changes automatically. + private static byte[] v3Bytes(RqueueMessage message) throws Exception { + com.fasterxml.jackson.databind.ObjectMapper jackson2 = + new com.fasterxml.jackson.databind.ObjectMapper() + .configure( + com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, + false); + //noinspection deprecation — mirrors the API RQueue 3.x RqueueRedisSerDes used + jackson2.enableDefaultTyping( + com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping.NON_FINAL, + com.fasterxml.jackson.annotation.JsonTypeInfo.As.PROPERTY); + return jackson2.writeValueAsBytes(message); + } +} diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerV3CompatibilityTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerV3CompatibilityTest.java deleted file mode 100644 index b2089458..00000000 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerV3CompatibilityTest.java +++ /dev/null @@ -1,144 +0,0 @@ -package com.github.sonus21.rqueue.core.spi.redis; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import com.github.sonus21.rqueue.core.RqueueMessage; -import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl; -import com.github.sonus21.rqueue.listener.QueueDetail; -import com.github.sonus21.rqueue.utils.TestUtils; -import java.io.IOException; -import java.net.ServerSocket; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.springframework.data.redis.connection.RedisStandaloneConfiguration; -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.serializer.RedisSerializer; -import org.springframework.data.redis.serializer.StringRedisSerializer; -import redis.embedded.RedisServer; - -@Tag("core") -class RedisMessageBrokerV3CompatibilityTest { - - private static RedisServer redisServer; - private static int redisPort; - - @BeforeAll - static void startRedis() throws IOException { - try (ServerSocket socket = new ServerSocket(0)) { - redisPort = socket.getLocalPort(); - } - redisServer = new RedisServer(redisPort); - redisServer.start(); - } - - @AfterAll - static void stopRedis() throws IOException { - if (redisServer != null) { - redisServer.stop(); - } - } - - private LettuceConnectionFactory connectionFactory; - private RqueueMessageTemplateImpl messageTemplate; - private RedisMessageBroker broker; - // Passthrough serialiser: inserts raw bytes into the ZSET without re-serialising, mirroring - // how dequeue_message.lua byte-copies messages from q-queue into the processing queue. - private RedisTemplate rawTemplate; - - @BeforeEach - void setUp() { - connectionFactory = - new LettuceConnectionFactory(new RedisStandaloneConfiguration("localhost", redisPort)); - connectionFactory.afterPropertiesSet(); - messageTemplate = new RqueueMessageTemplateImpl(connectionFactory, null); - broker = new RedisMessageBroker(messageTemplate); - - rawTemplate = new RedisTemplate<>(); - rawTemplate.setConnectionFactory(connectionFactory); - rawTemplate.setKeySerializer(new StringRedisSerializer()); - rawTemplate.setValueSerializer(RedisSerializer.byteArray()); - rawTemplate.afterPropertiesSet(); - } - - @AfterEach - void tearDown() { - QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); - messageTemplate.getRedisTemplate().delete(queueDetail.getProcessingQueueName()); - messageTemplate.getRedisTemplate().delete(queueDetail.getScheduledQueueName()); - connectionFactory.destroy(); - } - - @Test - void parkForRetry_v3SerializedMessage_isMovedToScheduledQueue() throws Exception { - QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); - - RqueueMessage original = RqueueMessage.builder() - .id("test-msg-v3-001") - .queueName("test-queue") - .message("{\"payload\":\"test\"}") - .processAt(1_000_000L) - .queuedTime(2_000_000L) - .build(); - - rawTemplate - .opsForZSet() - .add(queueDetail.getProcessingQueueName(), v3Bytes(original), System.currentTimeMillis()); - - RqueueMessage updated = original.toBuilder().failureCount(1).build().updateReEnqueuedAt(); - - broker.parkForRetry(queueDetail, original, updated, 60_000L); - - assertEquals( - 0L, - messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getProcessingQueueName()), - "Processing queue must be empty after parkForRetry"); - assertEquals( - 1L, - messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getScheduledQueueName()), - "Scheduled queue must contain the rescheduled message"); - } - - @Test - void ack_v3SerializedMessage_isRemovedFromProcessingQueue() throws Exception { - QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); - - RqueueMessage original = RqueueMessage.builder() - .id("test-msg-v3-002") - .queueName("test-queue") - .message("{\"payload\":\"test\"}") - .processAt(1_000_000L) - .queuedTime(2_000_000L) - .build(); - - rawTemplate - .opsForZSet() - .add(queueDetail.getProcessingQueueName(), v3Bytes(original), System.currentTimeMillis()); - - broker.ack(queueDetail, original); - - assertEquals( - 0L, - messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getProcessingQueueName()), - "Processing queue must be empty after ack"); - } - - // Reproduce the bytes RQueue 3.x would have stored using the actual Jackson 2.x ObjectMapper - // so the serialised form stays in sync with RqueueMessage field changes automatically. - private static byte[] v3Bytes(RqueueMessage message) throws Exception { - com.fasterxml.jackson.databind.ObjectMapper jackson2 = - new com.fasterxml.jackson.databind.ObjectMapper() - .configure( - com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, - false); - //noinspection deprecation — mirrors the API RQueue 3.x RqueueRedisSerDes used - jackson2.enableDefaultTyping( - com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping.NON_FINAL, - com.fasterxml.jackson.annotation.JsonTypeInfo.As.PROPERTY); - return jackson2.writeValueAsBytes(message); - } -}