Skip to content

Commit

Permalink
GH-3336: Change MongoDb Store sequence to long (#3385)
Browse files Browse the repository at this point in the history
* GH-3336: Change MongoDb Store sequence to long

Fixes #3336

Turns out there are some scenarios where too many messages
are transferred through the message store, so `int` for
sequence is not enough as a type

* Change sequence to `long` to widen a sequence lifespan

* * Change MongoDb store to deal with `Number.longValue()`
instead of casting which doesn't work from `Integer` to `Long`.
This way we can keep an old sequence document with an `int`
type for value
* Documents with new `long` type for their sequence field are OK.
The `NumberToNumberConverter` has an effect converting `int` to `long`
properly.
  • Loading branch information
artembilan committed Sep 15, 2020
1 parent 7f33085 commit 7a97eb6
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,15 @@ public void afterPropertiesSet() {

indexOperations.ensureIndex(new Index(MessageDocumentFields.MESSAGE_ID, Sort.Direction.ASC));

indexOperations.ensureIndex(new Index(MessageDocumentFields.GROUP_ID, Sort.Direction.ASC)
.on(MessageDocumentFields.MESSAGE_ID, Sort.Direction.ASC)
.unique());

indexOperations.ensureIndex(new Index(MessageDocumentFields.GROUP_ID, Sort.Direction.ASC)
.on(MessageDocumentFields.LAST_MODIFIED_TIME, Sort.Direction.DESC)
.on(MessageDocumentFields.SEQUENCE, Sort.Direction.DESC));
indexOperations.ensureIndex(
new Index(MessageDocumentFields.GROUP_ID, Sort.Direction.ASC)
.on(MessageDocumentFields.MESSAGE_ID, Sort.Direction.ASC)
.unique());

indexOperations.ensureIndex(
new Index(MessageDocumentFields.GROUP_ID, Sort.Direction.ASC)
.on(MessageDocumentFields.LAST_MODIFIED_TIME, Sort.Direction.DESC)
.on(MessageDocumentFields.SEQUENCE, Sort.Direction.DESC));
}

public Message<?> getMessage(UUID id) {
Expand Down Expand Up @@ -198,14 +200,15 @@ public int messageGroupSize(Object groupId) {
* The {@link #SEQUENCE_NAME} document is created on demand.
* @return the next sequence value.
*/
protected int getNextId() {
protected long getNextId() {
Query query = Query.query(Criteria.where("_id").is(SEQUENCE_NAME));
query.fields().include(MessageDocumentFields.SEQUENCE);
return (Integer) this.mongoTemplate.findAndModify(query,
new Update().inc(MessageDocumentFields.SEQUENCE, 1),
return ((Number) this.mongoTemplate.findAndModify(query,
new Update().inc(MessageDocumentFields.SEQUENCE, 1L),
FindAndModifyOptions.options().returnNew(true).upsert(true),
Map.class, this.collectionName)
.get(MessageDocumentFields.SEQUENCE); // NOSONAR - never returns null
.get(MessageDocumentFields.SEQUENCE)) // NOSONAR - never returns null
.longValue();
}

protected void addMessageDocument(final MessageDocument document) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,7 +61,7 @@ public class MessageDocument {

private Integer lastReleasedSequence = 0;

private int sequence;
private long sequence;

public MessageDocument(Message<?> message) {
this(message, message.getHeaders().getId());
Expand Down Expand Up @@ -139,7 +139,7 @@ public void setLastReleasedSequence(int lastReleasedSequence) {
this.lastReleasedSequence = lastReleasedSequence;
}

public void setSequence(int sequence) {
public void setSequence(long sequence) {
this.sequence = sequence;
}

Expand All @@ -151,7 +151,7 @@ public Object getGroupId() {
return this.groupId;
}

public int getSequence() {
public long getSequence() {
return this.sequence;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
public class MongoDbChannelMessageStore extends AbstractConfigurableMongoDbMessageStore
implements PriorityCapableChannelMessageStore {

/**
* The default conventional collection name.
*/
public static final String DEFAULT_COLLECTION_NAME = "channelMessages";

private boolean priorityEnabled;
Expand All @@ -65,7 +68,9 @@ public MongoDbChannelMessageStore(MongoDatabaseFactory mongoDbFactory) {
this(mongoDbFactory, null, DEFAULT_COLLECTION_NAME);
}

public MongoDbChannelMessageStore(MongoDatabaseFactory mongoDbFactory, MappingMongoConverter mappingMongoConverter) {
public MongoDbChannelMessageStore(MongoDatabaseFactory mongoDbFactory,
MappingMongoConverter mappingMongoConverter) {

this(mongoDbFactory, mappingMongoConverter, DEFAULT_COLLECTION_NAME);
}

Expand All @@ -75,6 +80,7 @@ public MongoDbChannelMessageStore(MongoDatabaseFactory mongoDbFactory, String co

public MongoDbChannelMessageStore(MongoDatabaseFactory mongoDbFactory, MappingMongoConverter mappingMongoConverter,
String collectionName) {

super(mongoDbFactory, mappingMongoConverter, collectionName);
}

Expand All @@ -90,7 +96,8 @@ public boolean isPriorityEnabled() {
@Override
public void afterPropertiesSet() {
super.afterPropertiesSet();
getMongoTemplate().indexOps(this.collectionName)
getMongoTemplate()
.indexOps(this.collectionName)
.ensureIndex(new Index(MessageDocumentFields.GROUP_ID, Sort.Direction.ASC)
.on(MessageDocumentFields.PRIORITY, Sort.Direction.DESC)
.on(MessageDocumentFields.LAST_MODIFIED_TIME, Sort.Direction.ASC)
Expand All @@ -109,10 +116,10 @@ public MessageGroup addMessageToGroup(Object groupId, Message<?> message) {
if (this.priorityEnabled) {
document.setPriority(message.getHeaders().get(IntegrationMessageHeaderAccessor.PRIORITY, Integer.class));
}
document.setSequence(this.getNextId());
document.setSequence(getNextId());

this.addMessageDocument(document);
return this.getMessageGroup(groupId);
addMessageDocument(document);
return getMessageGroup(groupId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,14 +470,15 @@ private void updateGroup(Object groupId, Update update) {
this.template.findAndModify(query, update, FindAndModifyOptions.none(), Map.class, this.collectionName);
}

private int getNextId() {
private long getNextId() {
Query query = Query.query(Criteria.where("_id").is(SEQUENCE_NAME));
query.fields().include(SEQUENCE);
return (Integer) this.template.findAndModify(query,
new Update().inc(SEQUENCE, 1),
return ((Number) this.template.findAndModify(query,
new Update().inc(SEQUENCE, 1L),
FindAndModifyOptions.options().returnNew(true).upsert(true),
Map.class,
this.collectionName).get(SEQUENCE); // NOSONAR - never returns null
Map.class, this.collectionName)
.get(SEQUENCE)) // NOSONAR - never returns null
.longValue();
}

@SuppressWarnings(UNCHECKED)
Expand Down Expand Up @@ -848,7 +849,7 @@ private static final class MessageWrapper {
private volatile boolean _group_complete; // NOSONAR name

@SuppressWarnings(UNUSED)
private int sequence;
private long sequence;

MessageWrapper(Message<?> message) {
Assert.notNull(message, "'message' must not be null");
Expand Down Expand Up @@ -917,7 +918,7 @@ public void set_Group_complete(boolean completedGroup) { // NOSONAR name
this._group_complete = completedGroup;
}

public void set_Sequence(int sequence) { // NOSONAR name
public void set_Sequence(long sequence) { // NOSONAR name
this.sequence = sequence;
}

Expand Down

0 comments on commit 7a97eb6

Please sign in to comment.