Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReactiveMongoTemplate does not return ChangeStreamEvents with PreImages #4495

Closed
PhilippAllstadt opened this issue Sep 6, 2023 · 4 comments
Labels
type: bug A general bug

Comments

@PhilippAllstadt
Copy link

PhilippAllstadt commented Sep 6, 2023

Hi @ALL,

in our use case we have the requirement to consume changestream events with pre-images. We are using MongoDB version 6.0.7.
Further, our spring version is the newest and we are using the latest mongodb drivers.

Despite it, delete and update events haven´t filled fullDocumentBeforeChange in raw if event is consumed in our spring app.

In database. pre-images should work because we´ve tested consuming via mongosh shell.

This is our code to consume change stream in our spring app:

var collectionChangeStream = reactiveMongoTemplate .changeStream(TransactionEntity.class) .withOptions( (optionsBuilder) -> optionsBuilder .returnFullDocumentBeforeChange() .returnFullDocumentOnUpdate()) .watchCollection("transaction");

if (changeStreamToken.isPresent() && changeStreamToken.get().getToken() != null) {
  collectionChangeStream.resumeAfter(changeStreamToken.get().getToken());
} else {
  collectionChangeStream.resumeAt(Instant.now().minusSeconds(seconds));
}

return collectionChangeStream.listen();

Am I missing something?

Thank you very much.

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Sep 6, 2023
@christophstrobl
Copy link
Member

Thank you @PhilippAllstadt for bringing this to our attention.
At a quick glance it seems ReactiveMongoTemplate#changeStream(String, String, ChangeStreamOptions, Class) does not forward the FullDocumentBeforeChange option to the ChangeStreamPublisher.

@christophstrobl christophstrobl added type: bug A general bug and removed status: waiting-for-triage An issue we've not yet triaged labels Sep 6, 2023
@PhilippAllstadt
Copy link
Author

PhilippAllstadt commented Sep 6, 2023

Thank you for your response.

Yes, i think the problem is prepareFilter method:

`public Flux<ChangeStreamEvent> changeStream(@nullable String database, @nullable String collectionName,
ChangeStreamOptions options, Class targetType) {

	List<Document> filter = prepareFilter(options);
	FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
			: FullDocument.UPDATE_LOOKUP;

	return ReactiveMongoDatabaseUtils.getDatabase(database, mongoDatabaseFactory) //
			.map(db -> {
				ChangeStreamPublisher<Document> publisher;
				if (StringUtils.hasText(collectionName)) {
					publisher = filter.isEmpty() ? db.getCollection(collectionName).watch(Document.class)
							: db.getCollection(collectionName).watch(filter, Document.class);

				} else {
					publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
				}

				if (options.isResumeAfter()) {
					publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter)
							.orElse(publisher);
				} else if (options.isStartAfter()) {
					publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::startAfter)
							.orElse(publisher);
				}
				publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
						.orElse(publisher);
				publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
				return publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));
			}) //
			.flatMapMany(publisher -> Flux.from(publisher)
					.map(document -> new ChangeStreamEvent<>(document, targetType, getConverter())));
}`

prepareFilter:
`

	Object filter = options.getFilter().orElse(Collections.emptyList());

	if (filter instanceof Aggregation agg) {
		AggregationOperationContext context = agg instanceof TypedAggregation typedAggregation
				? new TypeBasedAggregationOperationContext(typedAggregation.getInputType(),
						getConverter().getMappingContext(), queryMapper)
				: new RelaxedTypeBasedAggregationOperationContext(Object.class, mappingContext, queryMapper);

		return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument",
				Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns")));
	}

	if (filter instanceof List) {
		return (List<Document>) filter;
	}

	throw new IllegalArgumentException(
			"ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");

`

Changestreamoptions aren´t type of agg and returned list is empty.

So, would you fix that by the next release?

Thank you very much

@Alfagun74
Copy link

Oh my god, I've been searching for the cause of this problem for a long time! I'm also having this issue!

@PhilippAllstadt
Copy link
Author

Hi there, is there any update to this issue ? This feature would be very important for our use case.

Thank you.

@mp911de mp911de changed the title ReactiveMongoTemplate does not return ChangeStreamEvents with PreImages ReactiveMongoTemplate does not return ChangeStreamEvents with PreImages Nov 2, 2023
@mp911de mp911de added this to the 4.0.12 (2022.0.12) milestone Nov 2, 2023
@mp911de mp911de closed this as completed in bf54054 Nov 2, 2023
mp911de added a commit that referenced this issue Nov 2, 2023
Reformat code. Suppress warnings in tests.

Original pull request: #4541
See #4495
mp911de pushed a commit that referenced this issue Nov 2, 2023
…lisher.

This commit ensures to pass on a potentially set FullDocumentBeforeChange option to the change stream iterable/publisher.
It also corrects false optional behavior within the change stream task which did some defaulting though the actual value is an optional one that must not be present.

Original pull request: #4541
Closes #4495
mp911de added a commit that referenced this issue Nov 2, 2023
Reformat code. Suppress warnings in tests.

Original pull request: #4541
See #4495
mp911de pushed a commit that referenced this issue Nov 2, 2023
…lisher.

This commit ensures to pass on a potentially set FullDocumentBeforeChange option to the change stream iterable/publisher.
It also corrects false optional behavior within the change stream task which did some defaulting though the actual value is an optional one that must not be present.

Original pull request: #4541
Closes #4495
mp911de added a commit that referenced this issue Nov 2, 2023
Reformat code. Suppress warnings in tests.

Original pull request: #4541
See #4495
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: bug A general bug
Projects
None yet
5 participants