diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java index 9c99b0e01f..40f5c018d0 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java @@ -53,6 +53,7 @@ public class ChangeStreamOptions { private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup; private @Nullable Collation collation; private @Nullable Object resumeTimestamp; + private @Nullable Boolean showExpandedEvents; private Resume resume = Resume.UNDEFINED; protected ChangeStreamOptions() {} @@ -108,6 +109,13 @@ public Optional getResumeBsonTimestamp() { return Optional.ofNullable(resumeTimestamp).map(timestamp -> asTimestampOfType(timestamp, BsonTimestamp.class)); } + /** + * @return {@link Optional#empty()} if not set. + */ + public Optional getShowExpandedEvents() { + return Optional.ofNullable(showExpandedEvents); + } + /** * @return {@literal true} if the change stream should be started after the {@link #getResumeToken() token}. * @since 2.2 @@ -191,6 +199,9 @@ public boolean equals(@Nullable Object o) { if (!ObjectUtils.nullSafeEquals(this.resumeTimestamp, that.resumeTimestamp)) { return false; } + if (!ObjectUtils.nullSafeEquals(this.showExpandedEvents, that.showExpandedEvents)) { + return false; + } return resume == that.resume; } @@ -202,6 +213,7 @@ public int hashCode() { result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentBeforeChangeLookup); result = 31 * result + ObjectUtils.nullSafeHashCode(collation); result = 31 * result + ObjectUtils.nullSafeHashCode(resumeTimestamp); + result = 31 * result + ObjectUtils.nullSafeHashCode(showExpandedEvents); result = 31 * result + ObjectUtils.nullSafeHashCode(resume); return result; } @@ -239,6 +251,7 @@ public static class ChangeStreamOptionsBuilder { private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup; private @Nullable Collation collation; private @Nullable Object resumeTimestamp; + private @Nullable Boolean showExpandedEvents; private Resume resume = Resume.UNDEFINED; private ChangeStreamOptionsBuilder() {} @@ -432,6 +445,19 @@ public ChangeStreamOptionsBuilder startAfter(BsonValue resumeToken) { return this; } + /** + * Set whether expanded change events (e.g. createIndexes, shardCollection) should be emitted. + * + * @param showExpandedEvents {@code true} to include expanded events. + * @return this. + */ + @Contract("_ -> this") + public ChangeStreamOptionsBuilder showExpandedEvents(boolean showExpandedEvents) { + + this.showExpandedEvents = showExpandedEvents; + return this; + } + /** * @return the built {@link ChangeStreamOptions} */ @@ -446,6 +472,7 @@ public ChangeStreamOptions build() { options.fullDocumentBeforeChangeLookup = this.fullDocumentBeforeChangeLookup; options.collation = this.collation; options.resumeTimestamp = this.resumeTimestamp; + options.showExpandedEvents = this.showExpandedEvents; options.resume = this.resume; return options; diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 3fbe0d7fd9..c4ac9bf693 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -2148,6 +2148,7 @@ public Flux> changeStream(@Nullable String database, @N publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation) .orElse(publisher); publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher); + publisher = options.getShowExpandedEvents().map(publisher::showExpandedEvents).orElse(publisher); if (options.getFullDocumentBeforeChangeLookup().isPresent()) { publisher = publisher.fullDocumentBeforeChange(options.getFullDocumentBeforeChangeLookup().get()); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java index e1da0b33ce..bbabdee5b9 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java @@ -243,6 +243,7 @@ public static class ChangeStreamRequestBuilder { private @Nullable String databaseName; private @Nullable String collectionName; private @Nullable Duration maxAwaitTime; + private @Nullable Boolean showExpandedEvents; private @Nullable MessageListener, ? super T> listener; private final ChangeStreamOptionsBuilder delegate = ChangeStreamOptions.builder(); @@ -470,6 +471,20 @@ public ChangeStreamRequestBuilder maxAwaitTime(Duration timeout) { return this; } + /** + * Set whether expanded change events (e.g. createIndexes, shardCollection) should be emitted. + * + * @param showExpandedEvents {@code true} to include expanded events. + * @return this. + */ + @Contract("_ -> this") + public ChangeStreamRequestBuilder showExpandedEvents(boolean showExpandedEvents) { + + this.showExpandedEvents = showExpandedEvents; + this.delegate.showExpandedEvents(showExpandedEvents); + return this; + } + /** * @return the build {@link ChangeStreamRequest}. */ diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java index cc4d3f0bdb..07c4710788 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java @@ -90,6 +90,7 @@ protected MongoCursor> initCursor(MongoTemplate t FullDocumentBeforeChange fullDocumentBeforeChange = null; BsonTimestamp startAt = null; boolean resumeAfter = true; + boolean showExpandedEvents = false; if (options instanceof ChangeStreamRequest.ChangeStreamRequestOptions requestOptions) { @@ -105,6 +106,10 @@ protected MongoCursor> initCursor(MongoTemplate t } } + if (changeStreamOptions.getShowExpandedEvents().isPresent()) { + showExpandedEvents = changeStreamOptions.getShowExpandedEvents().get(); + } + if (changeStreamOptions.getResumeToken().isPresent()) { resumeToken = changeStreamOptions.getResumeToken().get().asDocument(); @@ -155,6 +160,10 @@ protected MongoCursor> initCursor(MongoTemplate t iterable = iterable.collation(collation); } + if (showExpandedEvents) { + iterable = iterable.showExpandedEvents(showExpandedEvents); + } + iterable = iterable.fullDocument(fullDocument); if(fullDocumentBeforeChange != null) { iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ChangeStreamOptionsUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ChangeStreamOptionsUnitTests.java index a4192df40a..22b4024f38 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ChangeStreamOptionsUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ChangeStreamOptionsUnitTests.java @@ -53,4 +53,12 @@ public void shouldNotReportResumeStartAfter() { assertThat(options.isResumeAfter()).isFalse(); assertThat(options.isStartAfter()).isFalse(); } + + @Test // GH-5069 + void shouldStoreShowExpandedEvents() { + + ChangeStreamOptions options = ChangeStreamOptions.builder().showExpandedEvents(true).build(); + + assertThat(options.getShowExpandedEvents()).isPresent().hasValue(true); + } } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java index 97f22378dd..866204a2dd 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java @@ -1668,6 +1668,21 @@ void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() { } + @Test // GH-5069 + void changeStreamOptionShowExpandedEventsShouldBeApplied() { + + when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db)); + when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.showExpandedEvents(anyBoolean())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher); + + ChangeStreamOptions options = ChangeStreamOptions.builder() + .showExpandedEvents(true).build(); + template.changeStream("database", "collection", options, Object.class).subscribe(); + + verify(changeStreamPublisher).showExpandedEvents(true); + } + @Test // GH-4462 void replaceShouldUseCollationWhenPresent() { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java index 12b57ca47d..b7c078b8e0 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java @@ -140,6 +140,22 @@ void shouldApplyFullDocumentBeforeChangeToChangeStream() { verify(changeStreamIterable).fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); } + @Test // GH-5069 + void shouldApplyShowExpandedEventsToChangeStream() { + + when(changeStreamIterable.showExpandedEvents(true)).thenReturn(changeStreamIterable); + + ChangeStreamRequest request = ChangeStreamRequest.builder() // + .collection("start-wars") // + .showExpandedEvents(true) // + .publishTo(message -> {}) // + .build(); + + initTask(request, Document.class); + + verify(changeStreamIterable).showExpandedEvents(true); + } + private MongoCursor> initTask(ChangeStreamRequest request, Class targetType) { ChangeStreamTask task = new ChangeStreamTask(template, request, targetType, er -> {});