Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce AggregationStage API #4309

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.x-GH-4306-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Spring Data MongoDB</name>
2 changes: 1 addition & 1 deletion spring-data-mongodb-benchmarks/pom.xml
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.x-GH-4306-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion spring-data-mongodb-distribution/pom.xml
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.x-GH-4306-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion spring-data-mongodb/pom.xml
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.x-GH-4306-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
@@ -296,6 +297,19 @@ default MongoCollection<Document> createView(String name, Class<?> source, Aggre
return createView(name, source, AggregationPipeline.of(stages));
}

/**
* Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
* on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
*
* @param name the name of the view to create.
* @param source the type defining the views source collection.
* @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
* @since 4.1
*/
default MongoCollection<Document> createView(String name, Class<?> source, AggregationStage... stages) {
return createView(name, source, AggregationPipeline.of(stages));
}

/**
* Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
* another collection or view identified by the given {@link #getCollectionName(Class) source type}.
Original file line number Diff line number Diff line change
@@ -648,7 +648,7 @@ public MongoCollection<Document> createView(String name, Class<?> source, Aggreg
@Nullable ViewOptions options) {

return createView(name, getCollectionName(source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
options);
}

@@ -657,7 +657,7 @@ public MongoCollection<Document> createView(String name, String source, Aggregat
@Nullable ViewOptions options) {

return createView(name, source,
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
options);
}

Original file line number Diff line number Diff line change
@@ -25,13 +25,13 @@
import org.bson.Document;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import org.springframework.data.geo.GeoResult;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
@@ -256,6 +256,19 @@ default Mono<MongoCollection<Document>> createView(String name, Class<?> source,
return createView(name, source, AggregationPipeline.of(stages));
}

/**
* Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
* on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
*
* @param name the name of the view to create.
* @param source the type defining the views source collection.
* @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
* @since 4.1
*/
default Mono<MongoCollection<Document>> createView(String name, Class<?> source, AggregationStage... stages) {
return createView(name, source, AggregationPipeline.of(stages));
}

/**
* Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
* another collection or view identified by the given {@link #getCollectionName(Class) source type}.
Original file line number Diff line number Diff line change
@@ -676,7 +676,7 @@ public Mono<MongoCollection<Document>> createView(String name, Class<?> source,
@Nullable ViewOptions options) {

return createView(name, getCollectionName(source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
options);
}

@@ -685,7 +685,7 @@ public Mono<MongoCollection<Document>> createView(String name, String source, Ag
@Nullable ViewOptions options) {

return createView(name, source,
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
options);
}

Original file line number Diff line number Diff line change
@@ -102,12 +102,12 @@ public class Aggregation {
private final AggregationOptions options;

/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
* Creates a new {@link Aggregation} from the given {@link AggregationStage}s.
*
* @param operations must not be {@literal null} or empty.
*/
public static Aggregation newAggregation(List<? extends AggregationOperation> operations) {
return newAggregation(operations.toArray(new AggregationOperation[operations.size()]));
public static Aggregation newAggregation(List<? extends AggregationStage> operations) {
return newAggregation(operations.toArray(AggregationStage[]::new));
}

/**
@@ -119,6 +119,16 @@ public static Aggregation newAggregation(AggregationOperation... operations) {
return new Aggregation(operations);
}

/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
*
* @param stages must not be {@literal null} or empty.
* @since 4.1
*/
public static Aggregation newAggregation(AggregationStage... stages) {
return new Aggregation(stages);
}

/**
* Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
*
@@ -130,6 +140,17 @@ public static AggregationUpdate newUpdate(AggregationOperation... operations) {
return AggregationUpdate.from(Arrays.asList(operations));
}

/**
* Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
*
* @param operations can be {@literal empty} but must not be {@literal null}.
* @return new instance of {@link AggregationUpdate}.
* @since 4.1
*/
public static AggregationUpdate newUpdate(AggregationStage... operations) {
return AggregationUpdate.updateFrom(Arrays.asList(operations));
}

/**
* Returns a copy of this {@link Aggregation} with the given {@link AggregationOptions} set. Note that options are
* supported in MongoDB version 2.6+.
@@ -141,7 +162,7 @@ public static AggregationUpdate newUpdate(AggregationOperation... operations) {
public Aggregation withOptions(AggregationOptions options) {

Assert.notNull(options, "AggregationOptions must not be null");
return new Aggregation(this.pipeline.getOperations(), options);
return new Aggregation(this.pipeline.getStages(), options);
}

/**
@@ -150,8 +171,8 @@ public Aggregation withOptions(AggregationOptions options) {
* @param type must not be {@literal null}.
* @param operations must not be {@literal null} or empty.
*/
public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationOperation> operations) {
return newAggregation(type, operations.toArray(new AggregationOperation[operations.size()]));
public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationStage> operations) {
return newAggregation(type, operations.toArray(AggregationStage[]::new));
}

/**
@@ -164,6 +185,17 @@ public static <T> TypedAggregation<T> newAggregation(Class<T> type, AggregationO
return new TypedAggregation<T>(type, operations);
}

/**
* Creates a new {@link TypedAggregation} for the given type and {@link AggregationOperation}s.
*
* @param type must not be {@literal null}.
* @param stages must not be {@literal null} or empty.
* @since 4.1
*/
public static <T> TypedAggregation<T> newAggregation(Class<T> type, AggregationStage... stages) {
return new TypedAggregation<>(type, stages);
}

/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
*
@@ -173,6 +205,15 @@ protected Aggregation(AggregationOperation... aggregationOperations) {
this(asAggregationList(aggregationOperations));
}

/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
*
* @param aggregationOperations must not be {@literal null} or empty.
*/
protected Aggregation(AggregationStage... aggregationOperations) {
this(Arrays.asList(aggregationOperations));
}

/**
* @param aggregationOperations must not be {@literal null} or empty.
* @return
@@ -189,7 +230,7 @@ protected static List<AggregationOperation> asAggregationList(AggregationOperati
*
* @param aggregationOperations must not be {@literal null} or empty.
*/
protected Aggregation(List<AggregationOperation> aggregationOperations) {
protected Aggregation(List<? extends AggregationStage> aggregationOperations) {
this(aggregationOperations, DEFAULT_OPTIONS);
}

@@ -199,7 +240,7 @@ protected Aggregation(List<AggregationOperation> aggregationOperations) {
* @param aggregationOperations must not be {@literal null}.
* @param options must not be {@literal null} or empty.
*/
protected Aggregation(List<AggregationOperation> aggregationOperations, AggregationOptions options) {
protected Aggregation(List<? extends AggregationStage> aggregationOperations, AggregationOptions options) {

Assert.notNull(aggregationOperations, "AggregationOperations must not be null");
Assert.notNull(options, "AggregationOptions must not be null");
@@ -638,6 +679,17 @@ public static FacetOperationBuilder facet(AggregationOperation... aggregationOpe
return facet().and(aggregationOperations);
}

/**
* Creates a new {@link FacetOperationBuilder} given {@link Aggregation}.
*
* @param stages the sub-pipeline, must not be {@literal null}.
* @return new instance of {@link FacetOperation}.
* @since 4.1
*/
public static FacetOperationBuilder facet(AggregationStage... stages) {
return facet().and(stages);
}

/**
* Creates a new {@link LookupOperation}.
*
@@ -668,14 +720,14 @@ public static LookupOperation lookup(Field from, Field localField, Field foreign

/**
* Entrypoint for creating {@link LookupOperation $lookup} using a fluent builder API.
*
* <pre class="code">
* Aggregation.lookup().from("restaurants")
* .localField("restaurant_name")
* .foreignField("name")
* .let(newVariable("orders_drink").forField("drink"))
* .pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
* .as("matches")
* Aggregation.lookup().from("restaurants").localField("restaurant_name").foreignField("name")
* .let(newVariable("orders_drink").forField("drink"))
* .pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
* .as("matches")
* </pre>
*
* @return new instance of {@link LookupOperationBuilder}.
* @since 4.1
*/
Original file line number Diff line number Diff line change
@@ -15,10 +15,10 @@
*/
package org.springframework.data.mongodb.core.aggregation;

import java.util.Collections;
import java.util.List;

import org.bson.Document;
import org.springframework.util.CollectionUtils;

/**
* Represents one single operation in an aggregation pipeline.
@@ -29,30 +29,24 @@
* @author Christoph Strobl
* @since 1.3
*/
public interface AggregationOperation {
public interface AggregationOperation extends MultiOperationAggregationStage {

/**
* Turns the {@link AggregationOperation} into a {@link Document} by using the given
* {@link AggregationOperationContext}.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the Document
* @deprecated since 2.2 in favor of {@link #toPipelineStages(AggregationOperationContext)}.
* @return
*/
@Deprecated
@Override
Document toDocument(AggregationOperationContext context);

/**
* Turns the {@link AggregationOperation} into list of {@link Document stages} by using the given
* {@link AggregationOperationContext}. This allows a single {@link AggregationOptions} to add additional stages for
* eg. {@code $sort} or {@code $limit}.
* More the exception than the default.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the pipeline stages to run through. Never {@literal null}.
* @since 2.2
* @return never {@literal null}.
*/
@Override
default List<Document> toPipelineStages(AggregationOperationContext context) {
return Collections.singletonList(toDocument(context));
return List.of(toDocument(context));
}

/**
@@ -63,6 +57,6 @@ default List<Document> toPipelineStages(AggregationOperationContext context) {
* @since 3.0.2
*/
default String getOperator() {
return toDocument(Aggregation.DEFAULT_CONTEXT).keySet().iterator().next();
return CollectionUtils.lastElement(toPipelineStages(Aggregation.DEFAULT_CONTEXT)).keySet().iterator().next();
}
}
Original file line number Diff line number Diff line change
@@ -45,15 +45,19 @@ class AggregationOperationRenderer {
* @param rootContext must not be {@literal null}.
* @return the {@link List} of {@link Document}.
*/
static List<Document> toDocument(List<AggregationOperation> operations, AggregationOperationContext rootContext) {
static List<Document> toDocument(List<? extends AggregationStage> operations, AggregationOperationContext rootContext) {

List<Document> operationDocuments = new ArrayList<Document>(operations.size());

AggregationOperationContext contextToUse = rootContext;

for (AggregationOperation operation : operations) {
for (AggregationStage operation : operations) {

operationDocuments.addAll(operation.toPipelineStages(contextToUse));
if(operation instanceof MultiOperationAggregationStage mops) {
operationDocuments.addAll(mops.toPipelineStages(contextToUse));
} else {
operationDocuments.add(operation.toDocument(contextToUse));
}

if (operation instanceof FieldsExposingAggregationOperation) {

Loading
Oops, something went wrong.