diff --git a/pom.xml b/pom.xml
index d864b2e4e6..6b73e01d6e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
 
 	<groupId>org.springframework.data</groupId>
 	<artifactId>spring-data-mongodb-parent</artifactId>
-	<version>4.1.0-SNAPSHOT</version>
+	<version>4.1.x-GH-4306-SNAPSHOT</version>
 	<packaging>pom</packaging>
 
 	<name>Spring Data MongoDB</name>
diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml
index 1b2a1390e6..ad9b59e791 100644
--- a/spring-data-mongodb-benchmarks/pom.xml
+++ b/spring-data-mongodb-benchmarks/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>org.springframework.data</groupId>
 		<artifactId>spring-data-mongodb-parent</artifactId>
-		<version>4.1.0-SNAPSHOT</version>
+		<version>4.1.x-GH-4306-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 
diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml
index 8db8d798fb..762d6ad834 100644
--- a/spring-data-mongodb-distribution/pom.xml
+++ b/spring-data-mongodb-distribution/pom.xml
@@ -15,7 +15,7 @@
 	<parent>
 		<groupId>org.springframework.data</groupId>
 		<artifactId>spring-data-mongodb-parent</artifactId>
-		<version>4.1.0-SNAPSHOT</version>
+		<version>4.1.x-GH-4306-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 
diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml
index 9a57f7eb52..99c5f3b8c8 100644
--- a/spring-data-mongodb/pom.xml
+++ b/spring-data-mongodb/pom.xml
@@ -13,7 +13,7 @@
 	<parent>
 		<groupId>org.springframework.data</groupId>
 		<artifactId>spring-data-mongodb-parent</artifactId>
-		<version>4.1.0-SNAPSHOT</version>
+		<version>4.1.x-GH-4306-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java
index 4727c0b8db..5d070edd48 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java
@@ -30,6 +30,7 @@
 import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
 import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
 import org.springframework.data.mongodb.core.aggregation.AggregationResults;
+import org.springframework.data.mongodb.core.aggregation.AggregationStage;
 import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
 import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
 import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
@@ -296,6 +297,19 @@ default MongoCollection<Document> createView(String name, Class<?> source, Aggre
 		return createView(name, source, AggregationPipeline.of(stages));
 	}
 
+	/**
+	 * Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
+	 * on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
+	 *
+	 * @param name the name of the view to create.
+	 * @param source the type defining the views source collection.
+	 * @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
+	 * @since 4.1
+	 */
+	default MongoCollection<Document> createView(String name, Class<?> source, AggregationStage... stages) {
+		return createView(name, source, AggregationPipeline.of(stages));
+	}
+
 	/**
 	 * Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
 	 * another collection or view identified by the given {@link #getCollectionName(Class) source type}.
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
index b88b6d6b53..865de80b2b 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
@@ -648,7 +648,7 @@ public MongoCollection<Document> createView(String name, Class<?> source, Aggreg
 			@Nullable ViewOptions options) {
 
 		return createView(name, getCollectionName(source),
-				queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
+				queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
 				options);
 	}
 
@@ -657,7 +657,7 @@ public MongoCollection<Document> createView(String name, String source, Aggregat
 			@Nullable ViewOptions options) {
 
 		return createView(name, source,
-				queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
+				queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
 				options);
 	}
 
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java
index 323ca9dd95..9b9f2b0116 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java
@@ -25,13 +25,13 @@
 import org.bson.Document;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscription;
-
 import org.springframework.data.geo.GeoResult;
 import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
 import org.springframework.data.mongodb.core.aggregation.Aggregation;
 import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
 import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
 import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
+import org.springframework.data.mongodb.core.aggregation.AggregationStage;
 import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
 import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
 import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
@@ -256,6 +256,19 @@ default Mono<MongoCollection<Document>> createView(String name, Class<?> source,
 		return createView(name, source, AggregationPipeline.of(stages));
 	}
 
+	/**
+	 * Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
+	 * on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
+	 *
+	 * @param name the name of the view to create.
+	 * @param source the type defining the views source collection.
+	 * @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
+	 * @since 4.1
+	 */
+	default Mono<MongoCollection<Document>> createView(String name, Class<?> source, AggregationStage... stages) {
+		return createView(name, source, AggregationPipeline.of(stages));
+	}
+
 	/**
 	 * Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
 	 * another collection or view identified by the given {@link #getCollectionName(Class) source type}.
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
index d49dffafe8..f782bb022d 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
@@ -676,7 +676,7 @@ public Mono<MongoCollection<Document>> createView(String name, Class<?> source,
 			@Nullable ViewOptions options) {
 
 		return createView(name, getCollectionName(source),
-				queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
+				queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
 				options);
 	}
 
@@ -685,7 +685,7 @@ public Mono<MongoCollection<Document>> createView(String name, String source, Ag
 			@Nullable ViewOptions options) {
 
 		return createView(name, source,
-				queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
+				queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
 				options);
 	}
 
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/Aggregation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/Aggregation.java
index cb9e70dd17..112ebe1747 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/Aggregation.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/Aggregation.java
@@ -102,12 +102,12 @@ public class Aggregation {
 	private final AggregationOptions options;
 
 	/**
-	 * Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
+	 * Creates a new {@link Aggregation} from the given {@link AggregationStage}s.
 	 *
 	 * @param operations must not be {@literal null} or empty.
 	 */
-	public static Aggregation newAggregation(List<? extends AggregationOperation> operations) {
-		return newAggregation(operations.toArray(new AggregationOperation[operations.size()]));
+	public static Aggregation newAggregation(List<? extends AggregationStage> operations) {
+		return newAggregation(operations.toArray(AggregationStage[]::new));
 	}
 
 	/**
@@ -119,6 +119,16 @@ public static Aggregation newAggregation(AggregationOperation... operations) {
 		return new Aggregation(operations);
 	}
 
+	/**
+	 * Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
+	 *
+	 * @param stages must not be {@literal null} or empty.
+	 * @since 4.1
+	 */
+	public static Aggregation newAggregation(AggregationStage... stages) {
+		return new Aggregation(stages);
+	}
+
 	/**
 	 * Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
 	 *
@@ -130,6 +140,17 @@ public static AggregationUpdate newUpdate(AggregationOperation... operations) {
 		return AggregationUpdate.from(Arrays.asList(operations));
 	}
 
+	/**
+	 * Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
+	 *
+	 * @param operations can be {@literal empty} but must not be {@literal null}.
+	 * @return new instance of {@link AggregationUpdate}.
+	 * @since 4.1
+	 */
+	public static AggregationUpdate newUpdate(AggregationStage... operations) {
+		return AggregationUpdate.updateFrom(Arrays.asList(operations));
+	}
+
 	/**
 	 * Returns a copy of this {@link Aggregation} with the given {@link AggregationOptions} set. Note that options are
 	 * supported in MongoDB version 2.6+.
@@ -141,7 +162,7 @@ public static AggregationUpdate newUpdate(AggregationOperation... operations) {
 	public Aggregation withOptions(AggregationOptions options) {
 
 		Assert.notNull(options, "AggregationOptions must not be null");
-		return new Aggregation(this.pipeline.getOperations(), options);
+		return new Aggregation(this.pipeline.getStages(), options);
 	}
 
 	/**
@@ -150,8 +171,8 @@ public Aggregation withOptions(AggregationOptions options) {
 	 * @param type must not be {@literal null}.
 	 * @param operations must not be {@literal null} or empty.
 	 */
-	public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationOperation> operations) {
-		return newAggregation(type, operations.toArray(new AggregationOperation[operations.size()]));
+	public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationStage> operations) {
+		return newAggregation(type, operations.toArray(AggregationStage[]::new));
 	}
 
 	/**
@@ -164,6 +185,17 @@ public static <T> TypedAggregation<T> newAggregation(Class<T> type, AggregationO
 		return new TypedAggregation<T>(type, operations);
 	}
 
+	/**
+	 * Creates a new {@link TypedAggregation} for the given type and {@link AggregationOperation}s.
+	 *
+	 * @param type must not be {@literal null}.
+	 * @param stages must not be {@literal null} or empty.
+	 * @since 4.1
+	 */
+	public static <T> TypedAggregation<T> newAggregation(Class<T> type, AggregationStage... stages) {
+		return new TypedAggregation<>(type, stages);
+	}
+
 	/**
 	 * Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
 	 *
@@ -173,6 +205,15 @@ protected Aggregation(AggregationOperation... aggregationOperations) {
 		this(asAggregationList(aggregationOperations));
 	}
 
+	/**
+	 * Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
+	 *
+	 * @param aggregationOperations must not be {@literal null} or empty.
+	 */
+	protected Aggregation(AggregationStage... aggregationOperations) {
+		this(Arrays.asList(aggregationOperations));
+	}
+
 	/**
 	 * @param aggregationOperations must not be {@literal null} or empty.
 	 * @return
@@ -189,7 +230,7 @@ protected static List<AggregationOperation> asAggregationList(AggregationOperati
 	 *
 	 * @param aggregationOperations must not be {@literal null} or empty.
 	 */
-	protected Aggregation(List<AggregationOperation> aggregationOperations) {
+	protected Aggregation(List<? extends AggregationStage> aggregationOperations) {
 		this(aggregationOperations, DEFAULT_OPTIONS);
 	}
 
@@ -199,7 +240,7 @@ protected Aggregation(List<AggregationOperation> aggregationOperations) {
 	 * @param aggregationOperations must not be {@literal null}.
 	 * @param options must not be {@literal null} or empty.
 	 */
-	protected Aggregation(List<AggregationOperation> aggregationOperations, AggregationOptions options) {
+	protected Aggregation(List<? extends AggregationStage> aggregationOperations, AggregationOptions options) {
 
 		Assert.notNull(aggregationOperations, "AggregationOperations must not be null");
 		Assert.notNull(options, "AggregationOptions must not be null");
@@ -638,6 +679,17 @@ public static FacetOperationBuilder facet(AggregationOperation... aggregationOpe
 		return facet().and(aggregationOperations);
 	}
 
+	/**
+	 * Creates a new {@link FacetOperationBuilder} given {@link Aggregation}.
+	 *
+	 * @param stages the sub-pipeline, must not be {@literal null}.
+	 * @return new instance of {@link FacetOperation}.
+	 * @since 4.1
+	 */
+	public static FacetOperationBuilder facet(AggregationStage... stages) {
+		return facet().and(stages);
+	}
+
 	/**
 	 * Creates a new {@link LookupOperation}.
 	 *
@@ -668,14 +720,14 @@ public static LookupOperation lookup(Field from, Field localField, Field foreign
 
 	/**
 	 * Entrypoint for creating {@link LookupOperation $lookup} using a fluent builder API.
+	 * 
 	 * <pre class="code">
-	 * Aggregation.lookup().from("restaurants")
-	 * 	.localField("restaurant_name")
-	 * 	.foreignField("name")
-	 * 	.let(newVariable("orders_drink").forField("drink"))
-	 * 	.pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
-	 * 	.as("matches")
+	 * Aggregation.lookup().from("restaurants").localField("restaurant_name").foreignField("name")
+	 * 		.let(newVariable("orders_drink").forField("drink"))
+	 * 		.pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
+	 * 		.as("matches")
 	 * </pre>
+	 * 
 	 * @return new instance of {@link LookupOperationBuilder}.
 	 * @since 4.1
 	 */
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java
index d9690f2a11..99758871c8 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java
@@ -15,10 +15,10 @@
  */
 package org.springframework.data.mongodb.core.aggregation;
 
-import java.util.Collections;
 import java.util.List;
 
 import org.bson.Document;
+import org.springframework.util.CollectionUtils;
 
 /**
  * Represents one single operation in an aggregation pipeline.
@@ -29,30 +29,24 @@
  * @author Christoph Strobl
  * @since 1.3
  */
-public interface AggregationOperation {
+public interface AggregationOperation extends MultiOperationAggregationStage {
 
 	/**
-	 * Turns the {@link AggregationOperation} into a {@link Document} by using the given
-	 * {@link AggregationOperationContext}.
-	 *
 	 * @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
-	 * @return the Document
-	 * @deprecated since 2.2 in favor of {@link #toPipelineStages(AggregationOperationContext)}.
+	 * @return
 	 */
-	@Deprecated
+	@Override
 	Document toDocument(AggregationOperationContext context);
 
 	/**
-	 * Turns the {@link AggregationOperation} into list of {@link Document stages} by using the given
-	 * {@link AggregationOperationContext}. This allows a single {@link AggregationOptions} to add additional stages for
-	 * eg. {@code $sort} or {@code $limit}.
+	 * More the exception than the default.
 	 *
 	 * @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
-	 * @return the pipeline stages to run through. Never {@literal null}.
-	 * @since 2.2
+	 * @return never {@literal null}.
 	 */
+	@Override
 	default List<Document> toPipelineStages(AggregationOperationContext context) {
-		return Collections.singletonList(toDocument(context));
+		return List.of(toDocument(context));
 	}
 
 	/**
@@ -63,6 +57,6 @@ default List<Document> toPipelineStages(AggregationOperationContext context) {
 	 * @since 3.0.2
 	 */
 	default String getOperator() {
-		return toDocument(Aggregation.DEFAULT_CONTEXT).keySet().iterator().next();
+		return CollectionUtils.lastElement(toPipelineStages(Aggregation.DEFAULT_CONTEXT)).keySet().iterator().next();
 	}
 }
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationRenderer.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationRenderer.java
index cc3be58520..06170a6191 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationRenderer.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationRenderer.java
@@ -45,15 +45,19 @@ class AggregationOperationRenderer {
 	 * @param rootContext must not be {@literal null}.
 	 * @return the {@link List} of {@link Document}.
 	 */
-	static List<Document> toDocument(List<AggregationOperation> operations, AggregationOperationContext rootContext) {
+	static List<Document> toDocument(List<? extends AggregationStage> operations, AggregationOperationContext rootContext) {
 
 		List<Document> operationDocuments = new ArrayList<Document>(operations.size());
 
 		AggregationOperationContext contextToUse = rootContext;
 
-		for (AggregationOperation operation : operations) {
+		for (AggregationStage operation : operations) {
 
-			operationDocuments.addAll(operation.toPipelineStages(contextToUse));
+			if(operation instanceof MultiOperationAggregationStage mops) {
+				operationDocuments.addAll(mops.toPipelineStages(contextToUse));
+			} else {
+				operationDocuments.add(operation.toDocument(contextToUse));
+			}
 
 			if (operation instanceof FieldsExposingAggregationOperation) {
 
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationPipeline.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationPipeline.java
index b3c1ab56d5..cb8880d0b9 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationPipeline.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationPipeline.java
@@ -23,9 +23,10 @@
 
 import org.bson.Document;
 import org.springframework.util.Assert;
+import org.springframework.util.ClassUtils;
 
 /**
- * The {@link AggregationPipeline} holds the collection of {@link AggregationOperation aggregation stages}.
+ * The {@link AggregationPipeline} holds the collection of {@link AggregationStage aggregation stages}.
  *
  * @author Christoph Strobl
  * @author Mark Paluch
@@ -33,12 +34,23 @@
  */
 public class AggregationPipeline {
 
-	private final List<AggregationOperation> pipeline;
+	private final List<AggregationStage> pipeline;
 
 	public static AggregationPipeline of(AggregationOperation... stages) {
 		return new AggregationPipeline(Arrays.asList(stages));
 	}
 
+	/**
+	 * Create a new {@link AggregationPipeline} out of the given {@link AggregationStage stages}.
+	 *
+	 * @param stages the pipeline stages.
+	 * @return new instance of {@link AggregationPipeline}.
+	 * @since 4.1
+	 */
+	public static AggregationPipeline of(AggregationStage... stages) {
+		return new AggregationPipeline(Arrays.asList(stages));
+	}
+
 	/**
 	 * Create an empty pipeline
 	 */
@@ -49,12 +61,12 @@ public AggregationPipeline() {
 	/**
 	 * Create a new pipeline with given {@link AggregationOperation stages}.
 	 *
-	 * @param aggregationOperations must not be {@literal null}.
+	 * @param aggregationStages must not be {@literal null}.
 	 */
-	public AggregationPipeline(List<AggregationOperation> aggregationOperations) {
+	public AggregationPipeline(List<? extends AggregationStage> aggregationStages) {
 
-		Assert.notNull(aggregationOperations, "AggregationOperations must not be null");
-		pipeline = new ArrayList<>(aggregationOperations);
+		Assert.notNull(aggregationStages, "AggregationStages must not be null");
+		pipeline = new ArrayList<>(aggregationStages);
 	}
 
 	/**
@@ -64,10 +76,21 @@ public AggregationPipeline(List<AggregationOperation> aggregationOperations) {
 	 * @return this.
 	 */
 	public AggregationPipeline add(AggregationOperation aggregationOperation) {
+		return add((AggregationStage) aggregationOperation);
+	}
+
+	/**
+	 * Append the given {@link AggregationOperation stage} to the pipeline.
+	 *
+	 * @param stage must not be {@literal null}.
+	 * @return this.
+	 * @since 4.1
+	 */
+	public AggregationPipeline add(AggregationStage stage) {
 
-		Assert.notNull(aggregationOperation, "AggregationOperation must not be null");
+		Assert.notNull(stage, "AggregationOperation must not be null");
 
-		pipeline.add(aggregationOperation);
+		pipeline.add(stage);
 		return this;
 	}
 
@@ -76,7 +99,17 @@ public AggregationPipeline add(AggregationOperation aggregationOperation) {
 	 *
 	 * @return never {@literal null}.
 	 */
-	public List<AggregationOperation> getOperations() {
+	public List<AggregationStage> getOperations() {
+		return getStages();
+	}
+
+	/**
+	 * Get the list of {@link AggregationOperation aggregation stages}.
+	 *
+	 * @return never {@literal null}.
+	 * @since 4.1
+	 */
+	public List<AggregationStage> getStages() {
 		return Collections.unmodifiableList(pipeline);
 	}
 
@@ -95,14 +128,14 @@ public boolean isOutOrMerge() {
 			return false;
 		}
 
-		AggregationOperation operation = pipeline.get(pipeline.size() - 1);
+		AggregationStage operation = pipeline.get(pipeline.size() - 1);
 		return isOut(operation) || isMerge(operation);
 	}
 
 	void verify() {
 
 		// check $out/$merge is the last operation if it exists
-		for (AggregationOperation operation : pipeline) {
+		for (AggregationStage operation : pipeline) {
 
 			if (isOut(operation) && !isLast(operation)) {
 				throw new IllegalArgumentException("The $out operator must be the last stage in the pipeline");
@@ -134,13 +167,13 @@ public boolean isEmpty() {
 		return pipeline.isEmpty();
 	}
 
-	private boolean containsOperation(Predicate<AggregationOperation> predicate) {
+	private boolean containsOperation(Predicate<AggregationStage> predicate) {
 
 		if (isEmpty()) {
 			return false;
 		}
 
-		for (AggregationOperation element : pipeline) {
+		for (AggregationStage element : pipeline) {
 			if (predicate.test(element)) {
 				return true;
 			}
@@ -149,19 +182,29 @@ private boolean containsOperation(Predicate<AggregationOperation> predicate) {
 		return false;
 	}
 
-	private boolean isLast(AggregationOperation aggregationOperation) {
+	private boolean isLast(AggregationStage aggregationOperation) {
 		return pipeline.indexOf(aggregationOperation) == pipeline.size() - 1;
 	}
 
-	private static boolean isUnionWith(AggregationOperation operator) {
-		return operator instanceof UnionWithOperation || operator.getOperator().equals("$unionWith");
+	private static boolean isUnionWith(AggregationStage stage) {
+		return isSpecificStage(stage, UnionWithOperation.class, "$unionWith");
+	}
+
+	private static boolean isMerge(AggregationStage stage) {
+		return isSpecificStage(stage, MergeOperation.class, "$merge");
 	}
 
-	private static boolean isMerge(AggregationOperation operator) {
-		return operator instanceof MergeOperation || operator.getOperator().equals("$merge");
+	private static boolean isOut(AggregationStage stage) {
+		return isSpecificStage(stage, OutOperation.class, "$out");
 	}
 
-	private static boolean isOut(AggregationOperation operator) {
-		return operator instanceof OutOperation || operator.getOperator().equals("$out");
+	private static boolean isSpecificStage(AggregationStage stage, Class<?> type, String operator) {
+		if (ClassUtils.isAssignable(type, stage.getClass())) {
+			return true;
+		}
+		if (stage instanceof AggregationOperation operation) {
+			return operation.getOperator().equals(operator);
+		}
+		return stage.toDocument(Aggregation.DEFAULT_CONTEXT).keySet().iterator().next().equals(operator);
 	}
 }
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationStage.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationStage.java
new file mode 100644
index 0000000000..bab565e1d3
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationStage.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core.aggregation;
+
+import org.bson.Document;
+
+/**
+ * Abstraction for a single
+ * <a href="https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/#stages">Aggregation Pipeline
+ * Stage</a> to be used within an {@link AggregationPipeline}.
+ * <p>
+ * An {@link AggregationStage} may operate upon domain specific types but will render to a ready to use store native
+ * representation within a given {@link AggregationOperationContext context}. The most straight forward way of writing a
+ * custom {@link AggregationStage} is just returning the raw document.
+ * 
+ * <pre class="code">
+ * AggregationStage stage = (ctx) -> Document.parse("{ $sort : { borough : 1 } }");
+ * </pre>
+ * 
+ * @author Christoph Strobl
+ * @since 4.1
+ */
+public interface AggregationStage {
+
+	/**
+	 * Turns the {@link AggregationStage} into a {@link Document} by using the given {@link AggregationOperationContext}.
+	 *
+	 * @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
+	 * @return the ready to use {@link Document} representing the stage.
+	 */
+	Document toDocument(AggregationOperationContext context);
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationUpdate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationUpdate.java
index 6d4105d62d..a88b4fb316 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationUpdate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationUpdate.java
@@ -25,7 +25,6 @@
 import java.util.stream.Collectors;
 
 import org.bson.Document;
-
 import org.springframework.data.mongodb.core.query.Query;
 import org.springframework.data.mongodb.core.query.SerializationUtils;
 import org.springframework.data.mongodb.core.query.UpdateDefinition;
@@ -71,7 +70,8 @@
  *
  * @author Christoph Strobl
  * @author Mark Paluch
- * @see <a href="https://docs.mongodb.com/manual/reference/method/db.collection.update/#update-with-aggregation-pipeline">MongoDB
+ * @see <a href=
+ *      "https://docs.mongodb.com/manual/reference/method/db.collection.update/#update-with-aggregation-pipeline">MongoDB
  *      Reference Documentation</a>
  * @since 3.0
  */
@@ -92,11 +92,11 @@ protected AggregationUpdate() {
 	 *
 	 * @param pipeline must not be {@literal null}.
 	 */
-	protected AggregationUpdate(List<AggregationOperation> pipeline) {
+	protected AggregationUpdate(List<? extends AggregationStage> pipeline) {
 
 		super(pipeline);
 
-		for (AggregationOperation operation : pipeline) {
+		for (AggregationStage operation : pipeline) {
 			if (operation instanceof FieldsExposingAggregationOperation) {
 				((FieldsExposingAggregationOperation) operation).getFields().forEach(it -> {
 					keysTouched.add(it.getName());
@@ -123,6 +123,16 @@ public static AggregationUpdate from(List<AggregationOperation> pipeline) {
 		return new AggregationUpdate(pipeline);
 	}
 
+	/**
+	 * Create a new AggregationUpdate from the given {@link AggregationStage stages}.
+	 *
+	 * @return new instance of {@link AggregationUpdate}.
+	 * @since 4.1
+	 */
+	public static AggregationUpdate updateFrom(List<? extends AggregationStage> stages) {
+		return new AggregationUpdate(stages);
+	}
+
 	/**
 	 * Adds new fields to documents. {@code $set} outputs documents that contain all existing fields from the input
 	 * documents and newly added fields.
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/FacetOperation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/FacetOperation.java
index c515f918c0..f0a8cc8d63 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/FacetOperation.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/FacetOperation.java
@@ -78,6 +78,23 @@ public FacetOperationBuilder and(AggregationOperation... operations) {
 		return new FacetOperationBuilder(facets, Arrays.asList(operations));
 	}
 
+	/**
+	 * Creates a new {@link FacetOperationBuilder} to append a new facet using {@literal operations}. <br />
+	 * {@link FacetOperationBuilder} takes a pipeline of {@link AggregationStage stages} to categorize documents into a
+	 * single facet.
+	 *
+	 * @param stages must not be {@literal null} or empty.
+	 * @return
+	 * @since 4.1
+	 */
+	public FacetOperationBuilder and(AggregationStage... stages) {
+
+		Assert.notNull(stages, "Stages must not be null");
+		Assert.notEmpty(stages, "Stages must not be empty");
+
+		return new FacetOperationBuilder(facets, Arrays.asList(stages));
+	}
+
 	@Override
 	public Document toDocument(AggregationOperationContext context) {
 		return new Document(getOperator(), facets.toDocument(context));
@@ -102,11 +119,11 @@ public ExposedFields getFields() {
 	public static class FacetOperationBuilder {
 
 		private final Facets current;
-		private final List<AggregationOperation> operations;
+		private final List<AggregationStage> operations;
 
-		private FacetOperationBuilder(Facets current, List<AggregationOperation> operations) {
+		private FacetOperationBuilder(Facets current, List<? extends AggregationStage> operations) {
 			this.current = current;
-			this.operations = operations;
+			this.operations = new ArrayList<>(operations);
 		}
 
 		/**
@@ -176,7 +193,7 @@ protected Document toDocument(AggregationOperationContext context) {
 		 * @param operations must not be {@literal null}.
 		 * @return the new {@link Facets}.
 		 */
-		Facets and(String fieldName, List<AggregationOperation> operations) {
+		Facets and(String fieldName, List<? extends AggregationStage> operations) {
 
 			Assert.hasText(fieldName, "FieldName must not be null or empty");
 			Assert.notNull(operations, "AggregationOperations must not be null");
@@ -197,21 +214,21 @@ Facets and(String fieldName, List<AggregationOperation> operations) {
 	private static class Facet {
 
 		private final ExposedField exposedField;
-		private final List<AggregationOperation> operations;
+		private final List<AggregationStage> stages;
 
 		/**
 		 * Creates a new {@link Facet} given {@link ExposedField} and {@link AggregationOperation} pipeline.
 		 *
 		 * @param exposedField must not be {@literal null}.
-		 * @param operations must not be {@literal null}.
+		 * @param stages must not be {@literal null}.
 		 */
-		Facet(ExposedField exposedField, List<AggregationOperation> operations) {
+		Facet(ExposedField exposedField, List<? extends AggregationStage> stages) {
 
 			Assert.notNull(exposedField, "ExposedField must not be null");
-			Assert.notNull(operations, "AggregationOperations must not be null");
+			Assert.notNull(stages, "AggregationOperations must not be null");
 
 			this.exposedField = exposedField;
-			this.operations = operations;
+			this.stages = new ArrayList<>(stages);
 		}
 
 		ExposedField getExposedField() {
@@ -219,7 +236,7 @@ ExposedField getExposedField() {
 		}
 
 		protected List<Document> toDocuments(AggregationOperationContext context) {
-			return AggregationOperationRenderer.toDocument(operations, context);
+			return AggregationOperationRenderer.toDocument(stages, context);
 		}
 	}
 }
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/LookupOperation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/LookupOperation.java
index 44d0f1569e..2bd20f509c 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/LookupOperation.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/LookupOperation.java
@@ -230,7 +230,7 @@ public interface PipelineBuilder extends LetBuilder {
 		AsBuilder pipeline(AggregationPipeline pipeline);
 
 		/**
-		 * Specifies the {@link AggregationPipeline#getOperations() stages} that determine the resulting documents.
+		 * Specifies the {@link AggregationPipeline#getStages() stages} that determine the resulting documents.
 		 *
 		 * @param stages must not be {@literal null} can be empty.
 		 * @return never {@literal null}.
@@ -239,6 +239,17 @@ default AsBuilder pipeline(AggregationOperation... stages) {
 			return pipeline(AggregationPipeline.of(stages));
 		}
 
+		/**
+		 * Specifies the {@link AggregationPipeline#getStages() stages} that determine the resulting documents.
+		 *
+		 * @param stages must not be {@literal null} can be empty.
+		 * @return never {@literal null}.
+		 * @since 4.1
+		 */
+		default AsBuilder pipeline(AggregationStage... stages) {
+			return pipeline(AggregationPipeline.of(stages));
+		}
+
 		/**
 		 * @param name the name of the new array field to add to the input documents, must not be {@literal null} or empty.
 		 * @return new instance of {@link LookupOperation}.
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStage.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStage.java
new file mode 100644
index 0000000000..e594b2b3b0
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStage.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core.aggregation;
+
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import org.bson.Document;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+/**
+ * An {@link AggregationStage} that may consist of a main operation and potential follow up stages for eg. {@code $sort}
+ * or {@code $limit}.
+ * <p>
+ * The {@link MultiOperationAggregationStage} may operate upon domain specific types but will render to the store native
+ * representation within a given {@link AggregationOperationContext context}.
+ * <p>
+ * {@link #toDocument(AggregationOperationContext)} will render a synthetic {@link Document} that contains the ordered
+ * stages. The list returned from {@link #toPipelineStages(AggregationOperationContext)}
+ * 
+ * <pre class="code">
+ * [
+ *   { $match: { $text: { $search: "operating" } } },
+ *   { $sort: { score: { $meta: "textScore" }, posts: -1 } }
+ * ]
+ * </pre>
+ * 
+ * will be represented as
+ * 
+ * <pre class="code">
+ * {
+ *   $match: { $text: { $search: "operating" } },
+ *   $sort: { score: { $meta: "textScore" }, posts: -1 }
+ * }
+ * </pre>
+ * 
+ * In case stages appear multiple times the order no longer can be guaranteed when calling
+ * {@link #toDocument(AggregationOperationContext)}, so consumers of the API should rely on
+ * {@link #toPipelineStages(AggregationOperationContext)}. Nevertheless, by default the values will be collected into a
+ * list rendering to
+ * 
+ * <pre class="code">
+ * {
+ *   $match: [{ $text: { $search: "operating" } }, { $text: ... }],
+ *   $sort: { score: { $meta: "textScore" }, posts: -1 }
+ * }
+ * </pre>
+ * 
+ * @author Christoph Strobl
+ * @since 4.1
+ */
+public interface MultiOperationAggregationStage extends AggregationStage {
+
+	/**
+	 * Returns a synthetic {@link Document stage} that contains the {@link #toPipelineStages(AggregationOperationContext)
+	 * actual stages} by folding them into a single {@link Document}. In case of colliding entries, those used multiple
+	 * times thus having the same key, the entries will be held as a list for the given operator.
+	 *
+	 * @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
+	 * @return never {@literal null}.
+	 */
+	@Override
+	default Document toDocument(AggregationOperationContext context) {
+
+		List<Document> documents = toPipelineStages(context);
+		if (documents.size() == 1) {
+			return documents.get(0);
+		}
+
+		MultiValueMap<String, Document> stages = new LinkedMultiValueMap<>(documents.size());
+		for (Document current : documents) {
+			String key = current.keySet().iterator().next();
+			stages.add(key, current.get(key, Document.class));
+		}
+		return new Document(stages.entrySet().stream()
+				.collect(Collectors.toMap(Entry::getKey, v -> v.getValue().size() == 1 ? v.getValue().get(0) : v.getValue())));
+	}
+
+	/**
+	 * Turns the {@link MultiOperationAggregationStage} into list of {@link Document stages} by using the given
+	 * {@link AggregationOperationContext}. This allows an {@link AggregationStage} to add follow up stages for eg.
+	 * {@code $sort} or {@code $limit}.
+	 *
+	 * @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
+	 * @return the pipeline stages to run through. Never {@literal null}.
+	 */
+	List<Document> toPipelineStages(AggregationOperationContext context);
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/TypedAggregation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/TypedAggregation.java
index a600d3f69f..f73e1119da 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/TypedAggregation.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/TypedAggregation.java
@@ -15,6 +15,7 @@
  */
 package org.springframework.data.mongodb.core.aggregation;
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.springframework.util.Assert;
@@ -24,6 +25,7 @@
  *
  * @author Thomas Darimont
  * @author Oliver Gierke
+ * @author Christoph Strobl
  */
 public class TypedAggregation<I> extends Aggregation {
 
@@ -39,13 +41,24 @@ public TypedAggregation(Class<I> inputType, AggregationOperation... operations)
 		this(inputType, asAggregationList(operations));
 	}
 
+	/**
+	 * Creates a new {@link TypedAggregation} from the given {@link AggregationStage stages}.
+	 *
+	 * @param inputType must not be {@literal null}.
+	 * @param stages must not be {@literal null} or empty.
+	 * @since 4.1
+	 */
+	public TypedAggregation(Class<I> inputType, AggregationStage... stages) {
+		this(inputType, Arrays.asList(stages));
+	}
+
 	/**
 	 * Creates a new {@link TypedAggregation} from the given {@link AggregationOperation}s.
 	 *
 	 * @param inputType must not be {@literal null}.
 	 * @param operations must not be {@literal null} or empty.
 	 */
-	public TypedAggregation(Class<I> inputType, List<AggregationOperation> operations) {
+	public TypedAggregation(Class<I> inputType, List<? extends AggregationStage> operations) {
 		this(inputType, operations, DEFAULT_OPTIONS);
 	}
 
@@ -57,7 +70,7 @@ public TypedAggregation(Class<I> inputType, List<AggregationOperation> operation
 	 * @param operations must not be {@literal null} or empty.
 	 * @param options must not be {@literal null}.
 	 */
-	public TypedAggregation(Class<I> inputType, List<AggregationOperation> operations, AggregationOptions options) {
+	public TypedAggregation(Class<I> inputType, List<? extends AggregationStage> operations, AggregationOptions options) {
 
 		super(operations, options);
 
@@ -77,6 +90,6 @@ public Class<I> getInputType() {
 	public TypedAggregation<I> withOptions(AggregationOptions options) {
 
 		Assert.notNull(options, "AggregationOptions must not be null");
-		return new TypedAggregation<I>(inputType, pipeline.getOperations(), options);
+		return new TypedAggregation<>(inputType, pipeline.getStages(), options);
 	}
 }
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java
index 0ee488c336..2d24bd597b 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java
@@ -27,6 +27,7 @@
 import org.springframework.data.mongodb.core.aggregation.Aggregation;
 import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
 import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
+import org.springframework.data.mongodb.core.aggregation.AggregationStage;
 import org.springframework.data.mongodb.core.convert.MongoConverter;
 import org.springframework.data.mongodb.core.query.Collation;
 import org.springframework.data.mongodb.core.query.Meta;
@@ -109,14 +110,14 @@ static AggregationOptions.Builder applyMeta(AggregationOptions.Builder builder,
 	 * @param accessor
 	 * @param targetType
 	 */
-	static void appendSortIfPresent(List<AggregationOperation> aggregationPipeline, ConvertingParameterAccessor accessor,
+	static void appendSortIfPresent(List<? extends AggregationStage> aggregationPipeline, ConvertingParameterAccessor accessor,
 			Class<?> targetType) {
 
 		if (accessor.getSort().isUnsorted()) {
 			return;
 		}
 
-		aggregationPipeline.add(ctx -> {
+		((List<AggregationStage>) aggregationPipeline).add(ctx -> {
 
 			Document sort = new Document();
 			for (Order order : accessor.getSort()) {
@@ -134,7 +135,7 @@ static void appendSortIfPresent(List<AggregationOperation> aggregationPipeline,
 	 * @param aggregationPipeline
 	 * @param accessor
 	 */
-	static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregationPipeline,
+	static void appendLimitAndOffsetIfPresent(List<? extends AggregationStage> aggregationPipeline,
 			ConvertingParameterAccessor accessor) {
 		appendLimitAndOffsetIfPresent(aggregationPipeline, accessor, LongUnaryOperator.identity(),
 				IntUnaryOperator.identity());
@@ -150,7 +151,7 @@ static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregation
 	 * @param limitOperator
 	 * @since 3.3
 	 */
-	static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregationPipeline,
+	static void appendLimitAndOffsetIfPresent(List<? extends AggregationStage> aggregationPipeline,
 			ConvertingParameterAccessor accessor, LongUnaryOperator offsetOperator, IntUnaryOperator limitOperator) {
 
 		Pageable pageable = accessor.getPageable();
@@ -159,10 +160,10 @@ static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregation
 		}
 
 		if (pageable.getOffset() > 0) {
-			aggregationPipeline.add(Aggregation.skip(offsetOperator.applyAsLong(pageable.getOffset())));
+			((List<AggregationStage>) aggregationPipeline).add(Aggregation.skip(offsetOperator.applyAsLong(pageable.getOffset())));
 		}
 
-		aggregationPipeline.add(Aggregation.limit(limitOperator.applyAsInt(pageable.getPageSize())));
+		((List<AggregationStage>) aggregationPipeline).add(Aggregation.limit(limitOperator.applyAsInt(pageable.getPageSize())));
 	}
 
 	/**
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationUnitTests.java
new file mode 100644
index 0000000000..27246fbbcc
--- /dev/null
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationUnitTests.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core.aggregation;
+
+import static org.assertj.core.api.Assertions.*;
+
+import java.util.List;
+
+import org.bson.Document;
+import org.junit.jupiter.api.Test;
+
+/**
+ * @author Christoph Strobl
+ */
+class AggregationOperationUnitTests {
+
+	@Test // GH-4306
+	void getOperatorShouldFavorToPipelineStages() {
+
+		AggregationOperation op = new AggregationOperation() {
+
+			@Override
+			public Document toDocument(AggregationOperationContext context) {
+				return new Document();
+			}
+
+			@Override
+			public List<Document> toPipelineStages(AggregationOperationContext context) {
+				return List.of(new Document("$spring", "data"));
+			}
+		};
+
+		assertThat(op.getOperator()).isEqualTo("$spring");
+	}
+
+}
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationPipelineUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationPipelineUnitTests.java
new file mode 100644
index 0000000000..df8775f6b2
--- /dev/null
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationPipelineUnitTests.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core.aggregation;
+
+import java.util.List;
+
+import org.bson.Document;
+import org.junit.jupiter.api.Test;
+
+/**
+ * @author Christoph Strobl
+ */
+class AggregationPipelineUnitTests {
+
+	@Test // Gh-4306
+	void verifyMustNotFailIfOnlyPipelineStagesUsed() {
+		AggregationPipeline.of(new OverridesPipelineStagesAndThrowsErrorOnToDocument()).verify();
+	}
+
+	static class OverridesPipelineStagesAndThrowsErrorOnToDocument implements AggregationOperation {
+
+		@Override
+		public Document toDocument(AggregationOperationContext context) {
+			throw new IllegalStateException("oh no");
+		}
+
+		@Override
+		public List<Document> toPipelineStages(AggregationOperationContext context) {
+			return List.of(Aggregation.project("data").toDocument(context));
+		}
+	}
+
+}
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStageUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStageUnitTests.java
new file mode 100644
index 0000000000..dee36bc146
--- /dev/null
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStageUnitTests.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core.aggregation;
+
+import static org.springframework.data.mongodb.test.util.Assertions.*;
+
+import java.util.List;
+
+import org.bson.Document;
+import org.junit.jupiter.api.Test;
+
+/**
+ * @author Christoph Strobl
+ */
+class MultiOperationAggregationStageUnitTests {
+
+	@Test // GH-4306
+	void toDocumentRendersSingleOperation() {
+
+		MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"));
+
+		assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("{ $text: { $search: 'operating' } }");
+	}
+
+	@Test // GH-4306
+	void toDocumentRendersMultiOperation() {
+
+		MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"),
+				Document.parse("{ $sort: { score: { $meta: 'textScore' } } }"));
+
+		assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
+				{
+					$text: { $search: 'operating' },
+					$sort: { score: { $meta: 'textScore' } }
+				}
+				""");
+	}
+
+	@Test // GH-4306
+	void toDocumentCollectsDuplicateOperation() {
+
+		MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"),
+				Document.parse("{ $sort: { score: { $meta: 'textScore' } } }"), Document.parse("{ $sort: { posts: -1 } }"));
+
+		assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
+				{
+					$text: { $search: 'operating' },
+					$sort: [
+						{ score: { $meta: 'textScore' } },
+						{ posts: -1 }
+					]
+				}
+				""");
+	}
+}