Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class ChangeStreamOptions {
private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup;
private @Nullable Collation collation;
private @Nullable Object resumeTimestamp;
private @Nullable Boolean showExpandedEvents;
private Resume resume = Resume.UNDEFINED;

protected ChangeStreamOptions() {}
Expand Down Expand Up @@ -108,6 +109,13 @@ public Optional<BsonTimestamp> getResumeBsonTimestamp() {
return Optional.ofNullable(resumeTimestamp).map(timestamp -> asTimestampOfType(timestamp, BsonTimestamp.class));
}

/**
* @return {@link Optional#empty()} if not set.
*/
public Optional<Boolean> getShowExpandedEvents() {
return Optional.ofNullable(showExpandedEvents);
}

/**
* @return {@literal true} if the change stream should be started after the {@link #getResumeToken() token}.
* @since 2.2
Expand Down Expand Up @@ -191,6 +199,9 @@ public boolean equals(@Nullable Object o) {
if (!ObjectUtils.nullSafeEquals(this.resumeTimestamp, that.resumeTimestamp)) {
return false;
}
if (!ObjectUtils.nullSafeEquals(this.showExpandedEvents, that.showExpandedEvents)) {
return false;
}
return resume == that.resume;
}

Expand All @@ -202,6 +213,7 @@ public int hashCode() {
result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentBeforeChangeLookup);
result = 31 * result + ObjectUtils.nullSafeHashCode(collation);
result = 31 * result + ObjectUtils.nullSafeHashCode(resumeTimestamp);
result = 31 * result + ObjectUtils.nullSafeHashCode(showExpandedEvents);
result = 31 * result + ObjectUtils.nullSafeHashCode(resume);
return result;
}
Expand Down Expand Up @@ -239,6 +251,7 @@ public static class ChangeStreamOptionsBuilder {
private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup;
private @Nullable Collation collation;
private @Nullable Object resumeTimestamp;
private @Nullable Boolean showExpandedEvents;
private Resume resume = Resume.UNDEFINED;

private ChangeStreamOptionsBuilder() {}
Expand Down Expand Up @@ -432,6 +445,19 @@ public ChangeStreamOptionsBuilder startAfter(BsonValue resumeToken) {
return this;
}

/**
* Set whether expanded change events (e.g. createIndexes, shardCollection) should be emitted.
*
* @param showExpandedEvents {@code true} to include expanded events.
* @return this.
*/
@Contract("_ -> this")
public ChangeStreamOptionsBuilder showExpandedEvents(boolean showExpandedEvents) {

this.showExpandedEvents = showExpandedEvents;
return this;
}

/**
* @return the built {@link ChangeStreamOptions}
*/
Expand All @@ -446,6 +472,7 @@ public ChangeStreamOptions build() {
options.fullDocumentBeforeChangeLookup = this.fullDocumentBeforeChangeLookup;
options.collation = this.collation;
options.resumeTimestamp = this.resumeTimestamp;
options.showExpandedEvents = this.showExpandedEvents;
options.resume = this.resume;

return options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2148,6 +2148,7 @@ public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @N
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
.orElse(publisher);
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
publisher = options.getShowExpandedEvents().map(publisher::showExpandedEvents).orElse(publisher);

if (options.getFullDocumentBeforeChangeLookup().isPresent()) {
publisher = publisher.fullDocumentBeforeChange(options.getFullDocumentBeforeChangeLookup().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ public static class ChangeStreamRequestBuilder<T> {
private @Nullable String databaseName;
private @Nullable String collectionName;
private @Nullable Duration maxAwaitTime;
private @Nullable Boolean showExpandedEvents;
private @Nullable MessageListener<ChangeStreamDocument<Document>, ? super T> listener;
private final ChangeStreamOptionsBuilder delegate = ChangeStreamOptions.builder();

Expand Down Expand Up @@ -470,6 +471,20 @@ public ChangeStreamRequestBuilder<T> maxAwaitTime(Duration timeout) {
return this;
}

/**
* Set whether expanded change events (e.g. createIndexes, shardCollection) should be emitted.
*
* @param showExpandedEvents {@code true} to include expanded events.
* @return this.
*/
@Contract("_ -> this")
public ChangeStreamRequestBuilder<T> showExpandedEvents(boolean showExpandedEvents) {

this.showExpandedEvents = showExpandedEvents;
this.delegate.showExpandedEvents(showExpandedEvents);
return this;
}

/**
* @return the build {@link ChangeStreamRequest}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
FullDocumentBeforeChange fullDocumentBeforeChange = null;
BsonTimestamp startAt = null;
boolean resumeAfter = true;
boolean showExpandedEvents = false;

if (options instanceof ChangeStreamRequest.ChangeStreamRequestOptions requestOptions) {

Expand All @@ -105,6 +106,10 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
}
}

if (changeStreamOptions.getShowExpandedEvents().isPresent()) {
showExpandedEvents = changeStreamOptions.getShowExpandedEvents().get();
}

if (changeStreamOptions.getResumeToken().isPresent()) {

resumeToken = changeStreamOptions.getResumeToken().get().asDocument();
Expand Down Expand Up @@ -155,6 +160,10 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
iterable = iterable.collation(collation);
}

if (showExpandedEvents) {
iterable = iterable.showExpandedEvents(showExpandedEvents);
}

iterable = iterable.fullDocument(fullDocument);
if(fullDocumentBeforeChange != null) {
iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,12 @@ public void shouldNotReportResumeStartAfter() {
assertThat(options.isResumeAfter()).isFalse();
assertThat(options.isStartAfter()).isFalse();
}

@Test // GH-5069
void shouldStoreShowExpandedEvents() {

ChangeStreamOptions options = ChangeStreamOptions.builder().showExpandedEvents(true).build();

assertThat(options.getShowExpandedEvents()).isPresent().hasValue(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1668,6 +1668,21 @@ void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() {

}

@Test // GH-5069
void changeStreamOptionShowExpandedEventsShouldBeApplied() {

when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db));
when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher);
when(changeStreamPublisher.showExpandedEvents(anyBoolean())).thenReturn(changeStreamPublisher);
when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher);

ChangeStreamOptions options = ChangeStreamOptions.builder()
.showExpandedEvents(true).build();
template.changeStream("database", "collection", options, Object.class).subscribe();

verify(changeStreamPublisher).showExpandedEvents(true);
}

@Test // GH-4462
void replaceShouldUseCollationWhenPresent() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,22 @@ void shouldApplyFullDocumentBeforeChangeToChangeStream() {
verify(changeStreamIterable).fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
}

@Test // GH-5069
void shouldApplyShowExpandedEventsToChangeStream() {

when(changeStreamIterable.showExpandedEvents(true)).thenReturn(changeStreamIterable);

ChangeStreamRequest request = ChangeStreamRequest.builder() //
.collection("start-wars") //
.showExpandedEvents(true) //
.publishTo(message -> {}) //
.build();

initTask(request, Document.class);

verify(changeStreamIterable).showExpandedEvents(true);
}

private MongoCursor<ChangeStreamDocument<Document>> initTask(ChangeStreamRequest request, Class<?> targetType) {

ChangeStreamTask task = new ChangeStreamTask(template, request, targetType, er -> {});
Expand Down