Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ ext {

subprojects {
group = "com.github.sonus21"
version = "4.0.0-RC9"
version = "4.0.0-RC10"

dependencies {
// https://mvnrepository.com/artifact/org.springframework/spring-messaging
Expand Down
13 changes: 13 additions & 0 deletions docs/configuration/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,19 @@ whenever Rqueue generates the message ID internally.

## Additional Configuration

- **`rqueue.serialization.property.order`**: Controls the JSON property ordering used
when serialising `RqueueMessage` to Redis. Accepted values:
- `ALPHABETICAL` *(default)* — alphabetical order, the Jackson 3.x native default.
This is the standard setting for RQueue 4.x deployments.
- `DECLARATION` — declaration order, matching the Jackson 2.x behaviour used by RQueue 3.x.
Use this when upgrading from 3.x with messages still present in Redis queues.

{: .warning}
Switching between values while messages are present in the processing queue will cause
those in-flight messages to be unexpectedly retried — the visibility-timeout rescue
path preserves raw bytes verbatim, so the serialisation mismatch persists across
re-deliveries. Drain the processing queue before changing this setting.

- **`rqueue.retry.per.poll`**: Determines how many times a polled message is retried
immediately if processing fails, before it is moved back to the queue for a
subsequent poll. The default value is `1`. If increased to `N`, the message will
Expand Down
25 changes: 25 additions & 0 deletions docs/migrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,28 @@ ZCARD rqueue-processing::<queueName>

If all commands return **0**, your queues are empty and you can proceed with the
migration without additional configuration.

---

## Upgrading from 3.x to 4.x

RQueue 4.x switched from Jackson 2.x (`com.fasterxml.jackson`) to Jackson 3.x
(`tools.jackson`). Jackson 3.x defaults to **alphabetical** JSON property ordering,
while Jackson 2.x used **declaration order**. Messages enqueued by 3.x and messages
enqueued by 4.x therefore have different byte representations in Redis.

The processing queue uses byte-exact lookups (ZSCORE/ZREM) to move or acknowledge
messages. If stored bytes do not match the re-serialised bytes, the lookup silently
fails and the message is repeatedly re-delivered via the visibility-timeout rescue path.

**If you are upgrading with messages still present in Redis**, set the following
property to keep using declaration order (matching what 3.x stored):

```properties
rqueue.serialization.property.order=DECLARATION
```

{: .warning}
Changing `rqueue.serialization.property.order` while messages are present in the
processing queue will cause those messages to be unexpectedly retried. Drain the processing
queue before switching values.
1 change: 1 addition & 0 deletions rqueue-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ dependencies {
api "org.apache.commons:commons-collections4:${apacheCommonCollectionVerion}"
// https://mvnrepository.com/artifact/io.micrometer/micrometer-core
api "io.micrometer:micrometer-core:${microMeterVersion}"
testImplementation "com.fasterxml.jackson.core:jackson-databind:2.21.2"
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please bump the version in root to RC10?

testImplementation "io.lettuce:lettuce-core:${lettuceVersion}"
testImplementation "io.projectreactor:reactor-test:${projectReactorReactorTestVersion}"
testImplementation project(":rqueue-test-util")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.github.sonus21.rqueue.config;

import com.github.sonus21.rqueue.converter.RqueueRedisSerializer;
import com.github.sonus21.rqueue.models.enums.RqueueMode;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.StringUtils;
Expand Down Expand Up @@ -166,6 +167,9 @@ private static String generateBrokerId() {
@Value("${rqueue.worker.registry.enabled:true}")
private boolean workerRegistryEnabled;

@Value("${rqueue.serialization.property.order:ALPHABETICAL}")
private RqueueRedisSerializer.PropertyOrder serializationPropertyOrder;

@Value("${rqueue.worker.registry.worker.ttl:300}")
private long workerRegistryWorkerTtlInSeconds;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
import com.github.sonus21.rqueue.converter.MessageConverterProvider;
import com.github.sonus21.rqueue.converter.RqueueRedisSerializer;
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
Expand All @@ -37,6 +38,9 @@
import org.springframework.context.annotation.Conditional;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
* This is a base configuration class for Rqueue, that is used in Spring and Spring boot Rqueue libs
Expand All @@ -60,6 +64,9 @@ public abstract class RqueueListenerBaseConfig {
@Value("${rqueue.reactive.enabled:false}")
protected boolean reactiveEnabled;

@Value("${rqueue.serialization.property.order:ALPHABETICAL}")
private RqueueRedisSerializer.PropertyOrder serializationPropertyOrder;

@Value(
"${rqueue.message.converter.provider.class:com.github.sonus21.rqueue.converter.DefaultMessageConverterProvider}")
private String messageConverterProviderClass;
Expand Down Expand Up @@ -109,6 +116,31 @@ public RqueueConfig rqueueConfig(
@Value("${rqueue.backend:REDIS}") Backend backend,
@Value("${rqueue.version.key:__rq::version}") String versionKey,
@Value("${rqueue.db.version:}") Integer dbVersion) {
if (serializationPropertyOrder == RqueueRedisSerializer.PropertyOrder.DECLARATION) {
RqueueRedisSerializer serializer =
new RqueueRedisSerializer(RqueueRedisSerializer.PropertyOrder.DECLARATION);
StringRedisSerializer keySerializer = new StringRedisSerializer();
RedisUtils.redisTemplateProvider = new RedisUtils.RedisTemplateProvider() {
@Override
public <V> RedisTemplate<String, V> getRedisTemplate(
RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, V> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
template.setKeySerializer(keySerializer);
template.setValueSerializer(serializer);
template.setHashKeySerializer(keySerializer);
template.setHashValueSerializer(serializer);
return template;
}
};
RedisUtils.redisSerializationContextProvider =
() -> RedisSerializationContext.<String, Object>newSerializationContext()
.key(keySerializer)
.value(serializer)
.hashKey(keySerializer)
.hashValue(serializer)
.build();
}
boolean sharedConnection = false;
RedisConnectionFactory connectionFactory =
simpleRqueueListenerContainerFactory.getRedisConnectionFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import tools.jackson.core.JacksonException;
import tools.jackson.core.JsonGenerator;
import tools.jackson.databind.DefaultTyping;
import tools.jackson.databind.MapperFeature;
import tools.jackson.databind.ObjectMapper;
import tools.jackson.databind.SerializationContext;
import tools.jackson.databind.jsontype.BasicPolymorphicTypeValidator;
Expand All @@ -34,14 +35,35 @@
@Slf4j
public class RqueueRedisSerializer implements RedisSerializer<Object> {

/**
* Controls JSON property ordering for {@link com.github.sonus21.rqueue.core.RqueueMessage}
* serialisation. Configure via {@code rqueue.serialization.property.order}.
*
* <ul>
* <li>{@link #ALPHABETICAL} — alphabetical order, Jackson 3.x native behaviour. This is the
* default for RQueue 4.x.
* <li>{@link #DECLARATION} — declaration order, matching Jackson 2.x / RQueue 3.x. Use when
* upgrading from 3.x with messages still present in Redis queues.
* </ul>
*/
public enum PropertyOrder {
ALPHABETICAL,
DECLARATION
}

private final RedisSerializer<Object> serializer;

public RqueueRedisSerializer(RedisSerializer<Object> redisSerializer) {
this.serializer = redisSerializer;
}

/** Creates a serialiser using {@link PropertyOrder#ALPHABETICAL} (Jackson 3.x default). */
public RqueueRedisSerializer() {
this(new RqueueRedisSerDes());
this(PropertyOrder.ALPHABETICAL);
}

public RqueueRedisSerializer(PropertyOrder order) {
this(new RqueueRedisSerDes(order));
}

@Override
Expand All @@ -66,17 +88,20 @@ public Object deserialize(byte[] bytes) throws SerializationException {
private static class RqueueRedisSerDes implements RedisSerializer<Object> {
private final ObjectMapper mapper;

RqueueRedisSerDes() {
this.mapper = SerializationUtils.getObjectMapper()
RqueueRedisSerDes(PropertyOrder order) {
var builder = SerializationUtils.getObjectMapper()
.rebuild()
.addModule(new SimpleModule().addSerializer(new NullValueSerializer()))
.activateDefaultTyping(
BasicPolymorphicTypeValidator.builder()
.allowIfSubType(Object.class)
.build(),
DefaultTyping.NON_FINAL,
As.PROPERTY)
.build();
As.PROPERTY);
if (order == PropertyOrder.DECLARATION) {
builder = builder.disable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY);
}
this.mapper = builder.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public RqueueInternalPubSubChannel(
this.rqueueConfig = rqueueConfig;
this.stringRqueueRedisTemplate = stringRqueueRedisTemplate;
this.rqueueBeanProvider = rqueueBeanProvider;
this.rqueueRedisSerializer = new RqueueRedisSerializer();
this.rqueueRedisSerializer =
new RqueueRedisSerializer(rqueueConfig.getSerializationPropertyOrder());
}

@Override
Expand Down
Loading
Loading