From c02968d00a93b943a346e81bbb1759545dba9b42 Mon Sep 17 00:00:00 2001
From: Christoph Strobl <cstrobl@vmware.com>
Date: Mon, 27 Feb 2023 09:41:43 +0100
Subject: [PATCH 1/3] Prepare issue branch.

---
 pom.xml                                  | 2 +-
 spring-data-mongodb-benchmarks/pom.xml   | 2 +-
 spring-data-mongodb-distribution/pom.xml | 2 +-
 spring-data-mongodb/pom.xml              | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/pom.xml b/pom.xml
index d864b2e4e6..6b73e01d6e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
 
 	<groupId>org.springframework.data</groupId>
 	<artifactId>spring-data-mongodb-parent</artifactId>
-	<version>4.1.0-SNAPSHOT</version>
+	<version>4.1.x-GH-4306-SNAPSHOT</version>
 	<packaging>pom</packaging>
 
 	<name>Spring Data MongoDB</name>
diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml
index 1b2a1390e6..ad9b59e791 100644
--- a/spring-data-mongodb-benchmarks/pom.xml
+++ b/spring-data-mongodb-benchmarks/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>org.springframework.data</groupId>
 		<artifactId>spring-data-mongodb-parent</artifactId>
-		<version>4.1.0-SNAPSHOT</version>
+		<version>4.1.x-GH-4306-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 
diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml
index 8db8d798fb..762d6ad834 100644
--- a/spring-data-mongodb-distribution/pom.xml
+++ b/spring-data-mongodb-distribution/pom.xml
@@ -15,7 +15,7 @@
 	<parent>
 		<groupId>org.springframework.data</groupId>
 		<artifactId>spring-data-mongodb-parent</artifactId>
-		<version>4.1.0-SNAPSHOT</version>
+		<version>4.1.x-GH-4306-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 
diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml
index 9a57f7eb52..99c5f3b8c8 100644
--- a/spring-data-mongodb/pom.xml
+++ b/spring-data-mongodb/pom.xml
@@ -13,7 +13,7 @@
 	<parent>
 		<groupId>org.springframework.data</groupId>
 		<artifactId>spring-data-mongodb-parent</artifactId>
-		<version>4.1.0-SNAPSHOT</version>
+		<version>4.1.x-GH-4306-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 

From 5305c9542bb8cb862f9a0c5e4971bc7c0b1d9dd7 Mon Sep 17 00:00:00 2001
From: Christoph Strobl <cstrobl@vmware.com>
Date: Mon, 27 Feb 2023 10:27:47 +0100
Subject: [PATCH 2/3] Favor pipelineStages over toDocument in
 AggregationOperation.

This commit makes sure to use available pipeline stages to figure out the operation.
---
 .../aggregation/AggregationOperation.java     |  3 +-
 .../AggregationOperationUnitTests.java        | 49 +++++++++++++++++++
 .../AggregationPipelineUnitTests.java         | 46 +++++++++++++++++
 3 files changed, 97 insertions(+), 1 deletion(-)
 create mode 100644 spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationUnitTests.java
 create mode 100644 spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationPipelineUnitTests.java

diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java
index d9690f2a11..78d11b82f3 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java
@@ -19,6 +19,7 @@
 import java.util.List;
 
 import org.bson.Document;
+import org.springframework.util.CollectionUtils;
 
 /**
  * Represents one single operation in an aggregation pipeline.
@@ -63,6 +64,6 @@ default List<Document> toPipelineStages(AggregationOperationContext context) {
 	 * @since 3.0.2
 	 */
 	default String getOperator() {
-		return toDocument(Aggregation.DEFAULT_CONTEXT).keySet().iterator().next();
+		return CollectionUtils.lastElement(toPipelineStages(Aggregation.DEFAULT_CONTEXT)).keySet().iterator().next();
 	}
 }
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationUnitTests.java
new file mode 100644
index 0000000000..27246fbbcc
--- /dev/null
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationUnitTests.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core.aggregation;
+
+import static org.assertj.core.api.Assertions.*;
+
+import java.util.List;
+
+import org.bson.Document;
+import org.junit.jupiter.api.Test;
+
+/**
+ * @author Christoph Strobl
+ */
+class AggregationOperationUnitTests {
+
+	@Test // GH-4306
+	void getOperatorShouldFavorToPipelineStages() {
+
+		AggregationOperation op = new AggregationOperation() {
+
+			@Override
+			public Document toDocument(AggregationOperationContext context) {
+				return new Document();
+			}
+
+			@Override
+			public List<Document> toPipelineStages(AggregationOperationContext context) {
+				return List.of(new Document("$spring", "data"));
+			}
+		};
+
+		assertThat(op.getOperator()).isEqualTo("$spring");
+	}
+
+}
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationPipelineUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationPipelineUnitTests.java
new file mode 100644
index 0000000000..df8775f6b2
--- /dev/null
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/AggregationPipelineUnitTests.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core.aggregation;
+
+import java.util.List;
+
+import org.bson.Document;
+import org.junit.jupiter.api.Test;
+
+/**
+ * @author Christoph Strobl
+ */
+class AggregationPipelineUnitTests {
+
+	@Test // Gh-4306
+	void verifyMustNotFailIfOnlyPipelineStagesUsed() {
+		AggregationPipeline.of(new OverridesPipelineStagesAndThrowsErrorOnToDocument()).verify();
+	}
+
+	static class OverridesPipelineStagesAndThrowsErrorOnToDocument implements AggregationOperation {
+
+		@Override
+		public Document toDocument(AggregationOperationContext context) {
+			throw new IllegalStateException("oh no");
+		}
+
+		@Override
+		public List<Document> toPipelineStages(AggregationOperationContext context) {
+			return List.of(Aggregation.project("data").toDocument(context));
+		}
+	}
+
+}

From 38e406867a36f3102959c2a27f5036643ef45002 Mon Sep 17 00:00:00 2001
From: Christoph Strobl <cstrobl@vmware.com>
Date: Tue, 28 Feb 2023 13:25:47 +0100
Subject: [PATCH 3/3] Introduce AggregationStage API

With the introduction of AggregationStage we move the API closer to the MongoDB terminology removing kognitive overhead.
Also the change allows us to switch back and forth with the default implementations of toDocument and toDocuments which let's us remove the deprecation warnings having dedicated interfaces that indicate what to implement in order to comply with the usage pattern.
---
 .../data/mongodb/core/MongoOperations.java    |  14 +++
 .../data/mongodb/core/MongoTemplate.java      |   4 +-
 .../mongodb/core/ReactiveMongoOperations.java |  15 ++-
 .../mongodb/core/ReactiveMongoTemplate.java   |   4 +-
 .../mongodb/core/aggregation/Aggregation.java |  80 +++++++++++---
 .../aggregation/AggregationOperation.java     |  21 ++--
 .../AggregationOperationRenderer.java         |  10 +-
 .../core/aggregation/AggregationPipeline.java |  83 ++++++++++----
 .../core/aggregation/AggregationStage.java    |  45 ++++++++
 .../core/aggregation/AggregationUpdate.java   |  18 ++-
 .../core/aggregation/FacetOperation.java      |  37 +++++--
 .../core/aggregation/LookupOperation.java     |  13 ++-
 .../MultiOperationAggregationStage.java       | 103 ++++++++++++++++++
 .../core/aggregation/TypedAggregation.java    |  19 +++-
 .../repository/query/AggregationUtils.java    |  13 ++-
 ...ltiOperationAggregationStageUnitTests.java |  68 ++++++++++++
 16 files changed, 467 insertions(+), 80 deletions(-)
 create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationStage.java
 create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStage.java
 create mode 100644 spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStageUnitTests.java

diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java
index 4727c0b8db..5d070edd48 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java
@@ -30,6 +30,7 @@
 import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
 import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
 import org.springframework.data.mongodb.core.aggregation.AggregationResults;
+import org.springframework.data.mongodb.core.aggregation.AggregationStage;
 import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
 import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
 import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
@@ -296,6 +297,19 @@ default MongoCollection<Document> createView(String name, Class<?> source, Aggre
 		return createView(name, source, AggregationPipeline.of(stages));
 	}
 
+	/**
+	 * Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
+	 * on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
+	 *
+	 * @param name the name of the view to create.
+	 * @param source the type defining the views source collection.
+	 * @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
+	 * @since 4.1
+	 */
+	default MongoCollection<Document> createView(String name, Class<?> source, AggregationStage... stages) {
+		return createView(name, source, AggregationPipeline.of(stages));
+	}
+
 	/**
 	 * Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
 	 * another collection or view identified by the given {@link #getCollectionName(Class) source type}.
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
index b88b6d6b53..865de80b2b 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
@@ -648,7 +648,7 @@ public MongoCollection<Document> createView(String name, Class<?> source, Aggreg
 			@Nullable ViewOptions options) {
 
 		return createView(name, getCollectionName(source),
-				queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
+				queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
 				options);
 	}
 
@@ -657,7 +657,7 @@ public MongoCollection<Document> createView(String name, String source, Aggregat
 			@Nullable ViewOptions options) {
 
 		return createView(name, source,
-				queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
+				queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
 				options);
 	}
 
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java
index 323ca9dd95..9b9f2b0116 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java
@@ -25,13 +25,13 @@
 import org.bson.Document;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscription;
-
 import org.springframework.data.geo.GeoResult;
 import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
 import org.springframework.data.mongodb.core.aggregation.Aggregation;
 import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
 import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
 import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
+import org.springframework.data.mongodb.core.aggregation.AggregationStage;
 import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
 import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
 import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
@@ -256,6 +256,19 @@ default Mono<MongoCollection<Document>> createView(String name, Class<?> source,
 		return createView(name, source, AggregationPipeline.of(stages));
 	}
 
+	/**
+	 * Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
+	 * on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
+	 *
+	 * @param name the name of the view to create.
+	 * @param source the type defining the views source collection.
+	 * @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
+	 * @since 4.1
+	 */
+	default Mono<MongoCollection<Document>> createView(String name, Class<?> source, AggregationStage... stages) {
+		return createView(name, source, AggregationPipeline.of(stages));
+	}
+
 	/**
 	 * Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
 	 * another collection or view identified by the given {@link #getCollectionName(Class) source type}.
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
index d49dffafe8..f782bb022d 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
@@ -676,7 +676,7 @@ public Mono<MongoCollection<Document>> createView(String name, Class<?> source,
 			@Nullable ViewOptions options) {
 
 		return createView(name, getCollectionName(source),
-				queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
+				queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
 				options);
 	}
 
@@ -685,7 +685,7 @@ public Mono<MongoCollection<Document>> createView(String name, String source, Ag
 			@Nullable ViewOptions options) {
 
 		return createView(name, source,
-				queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
+				queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
 				options);
 	}
 
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/Aggregation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/Aggregation.java
index cb9e70dd17..112ebe1747 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/Aggregation.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/Aggregation.java
@@ -102,12 +102,12 @@ public class Aggregation {
 	private final AggregationOptions options;
 
 	/**
-	 * Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
+	 * Creates a new {@link Aggregation} from the given {@link AggregationStage}s.
 	 *
 	 * @param operations must not be {@literal null} or empty.
 	 */
-	public static Aggregation newAggregation(List<? extends AggregationOperation> operations) {
-		return newAggregation(operations.toArray(new AggregationOperation[operations.size()]));
+	public static Aggregation newAggregation(List<? extends AggregationStage> operations) {
+		return newAggregation(operations.toArray(AggregationStage[]::new));
 	}
 
 	/**
@@ -119,6 +119,16 @@ public static Aggregation newAggregation(AggregationOperation... operations) {
 		return new Aggregation(operations);
 	}
 
+	/**
+	 * Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
+	 *
+	 * @param stages must not be {@literal null} or empty.
+	 * @since 4.1
+	 */
+	public static Aggregation newAggregation(AggregationStage... stages) {
+		return new Aggregation(stages);
+	}
+
 	/**
 	 * Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
 	 *
@@ -130,6 +140,17 @@ public static AggregationUpdate newUpdate(AggregationOperation... operations) {
 		return AggregationUpdate.from(Arrays.asList(operations));
 	}
 
+	/**
+	 * Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
+	 *
+	 * @param operations can be {@literal empty} but must not be {@literal null}.
+	 * @return new instance of {@link AggregationUpdate}.
+	 * @since 4.1
+	 */
+	public static AggregationUpdate newUpdate(AggregationStage... operations) {
+		return AggregationUpdate.updateFrom(Arrays.asList(operations));
+	}
+
 	/**
 	 * Returns a copy of this {@link Aggregation} with the given {@link AggregationOptions} set. Note that options are
 	 * supported in MongoDB version 2.6+.
@@ -141,7 +162,7 @@ public static AggregationUpdate newUpdate(AggregationOperation... operations) {
 	public Aggregation withOptions(AggregationOptions options) {
 
 		Assert.notNull(options, "AggregationOptions must not be null");
-		return new Aggregation(this.pipeline.getOperations(), options);
+		return new Aggregation(this.pipeline.getStages(), options);
 	}
 
 	/**
@@ -150,8 +171,8 @@ public Aggregation withOptions(AggregationOptions options) {
 	 * @param type must not be {@literal null}.
 	 * @param operations must not be {@literal null} or empty.
 	 */
-	public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationOperation> operations) {
-		return newAggregation(type, operations.toArray(new AggregationOperation[operations.size()]));
+	public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationStage> operations) {
+		return newAggregation(type, operations.toArray(AggregationStage[]::new));
 	}
 
 	/**
@@ -164,6 +185,17 @@ public static <T> TypedAggregation<T> newAggregation(Class<T> type, AggregationO
 		return new TypedAggregation<T>(type, operations);
 	}
 
+	/**
+	 * Creates a new {@link TypedAggregation} for the given type and {@link AggregationOperation}s.
+	 *
+	 * @param type must not be {@literal null}.
+	 * @param stages must not be {@literal null} or empty.
+	 * @since 4.1
+	 */
+	public static <T> TypedAggregation<T> newAggregation(Class<T> type, AggregationStage... stages) {
+		return new TypedAggregation<>(type, stages);
+	}
+
 	/**
 	 * Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
 	 *
@@ -173,6 +205,15 @@ protected Aggregation(AggregationOperation... aggregationOperations) {
 		this(asAggregationList(aggregationOperations));
 	}
 
+	/**
+	 * Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
+	 *
+	 * @param aggregationOperations must not be {@literal null} or empty.
+	 */
+	protected Aggregation(AggregationStage... aggregationOperations) {
+		this(Arrays.asList(aggregationOperations));
+	}
+
 	/**
 	 * @param aggregationOperations must not be {@literal null} or empty.
 	 * @return
@@ -189,7 +230,7 @@ protected static List<AggregationOperation> asAggregationList(AggregationOperati
 	 *
 	 * @param aggregationOperations must not be {@literal null} or empty.
 	 */
-	protected Aggregation(List<AggregationOperation> aggregationOperations) {
+	protected Aggregation(List<? extends AggregationStage> aggregationOperations) {
 		this(aggregationOperations, DEFAULT_OPTIONS);
 	}
 
@@ -199,7 +240,7 @@ protected Aggregation(List<AggregationOperation> aggregationOperations) {
 	 * @param aggregationOperations must not be {@literal null}.
 	 * @param options must not be {@literal null} or empty.
 	 */
-	protected Aggregation(List<AggregationOperation> aggregationOperations, AggregationOptions options) {
+	protected Aggregation(List<? extends AggregationStage> aggregationOperations, AggregationOptions options) {
 
 		Assert.notNull(aggregationOperations, "AggregationOperations must not be null");
 		Assert.notNull(options, "AggregationOptions must not be null");
@@ -638,6 +679,17 @@ public static FacetOperationBuilder facet(AggregationOperation... aggregationOpe
 		return facet().and(aggregationOperations);
 	}
 
+	/**
+	 * Creates a new {@link FacetOperationBuilder} given {@link Aggregation}.
+	 *
+	 * @param stages the sub-pipeline, must not be {@literal null}.
+	 * @return new instance of {@link FacetOperation}.
+	 * @since 4.1
+	 */
+	public static FacetOperationBuilder facet(AggregationStage... stages) {
+		return facet().and(stages);
+	}
+
 	/**
 	 * Creates a new {@link LookupOperation}.
 	 *
@@ -668,14 +720,14 @@ public static LookupOperation lookup(Field from, Field localField, Field foreign
 
 	/**
 	 * Entrypoint for creating {@link LookupOperation $lookup} using a fluent builder API.
+	 * 
 	 * <pre class="code">
-	 * Aggregation.lookup().from("restaurants")
-	 * 	.localField("restaurant_name")
-	 * 	.foreignField("name")
-	 * 	.let(newVariable("orders_drink").forField("drink"))
-	 * 	.pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
-	 * 	.as("matches")
+	 * Aggregation.lookup().from("restaurants").localField("restaurant_name").foreignField("name")
+	 * 		.let(newVariable("orders_drink").forField("drink"))
+	 * 		.pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
+	 * 		.as("matches")
 	 * </pre>
+	 * 
 	 * @return new instance of {@link LookupOperationBuilder}.
 	 * @since 4.1
 	 */
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java
index 78d11b82f3..99758871c8 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java
@@ -15,7 +15,6 @@
  */
 package org.springframework.data.mongodb.core.aggregation;
 
-import java.util.Collections;
 import java.util.List;
 
 import org.bson.Document;
@@ -30,30 +29,24 @@
  * @author Christoph Strobl
  * @since 1.3
  */
-public interface AggregationOperation {
+public interface AggregationOperation extends MultiOperationAggregationStage {
 
 	/**
-	 * Turns the {@link AggregationOperation} into a {@link Document} by using the given
-	 * {@link AggregationOperationContext}.
-	 *
 	 * @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
-	 * @return the Document
-	 * @deprecated since 2.2 in favor of {@link #toPipelineStages(AggregationOperationContext)}.
+	 * @return
 	 */
-	@Deprecated
+	@Override
 	Document toDocument(AggregationOperationContext context);
 
 	/**
-	 * Turns the {@link AggregationOperation} into list of {@link Document stages} by using the given
-	 * {@link AggregationOperationContext}. This allows a single {@link AggregationOptions} to add additional stages for
-	 * eg. {@code $sort} or {@code $limit}.
+	 * More the exception than the default.
 	 *
 	 * @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
-	 * @return the pipeline stages to run through. Never {@literal null}.
-	 * @since 2.2
+	 * @return never {@literal null}.
 	 */
+	@Override
 	default List<Document> toPipelineStages(AggregationOperationContext context) {
-		return Collections.singletonList(toDocument(context));
+		return List.of(toDocument(context));
 	}
 
 	/**
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationRenderer.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationRenderer.java
index cc3be58520..06170a6191 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationRenderer.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationRenderer.java
@@ -45,15 +45,19 @@ class AggregationOperationRenderer {
 	 * @param rootContext must not be {@literal null}.
 	 * @return the {@link List} of {@link Document}.
 	 */
-	static List<Document> toDocument(List<AggregationOperation> operations, AggregationOperationContext rootContext) {
+	static List<Document> toDocument(List<? extends AggregationStage> operations, AggregationOperationContext rootContext) {
 
 		List<Document> operationDocuments = new ArrayList<Document>(operations.size());
 
 		AggregationOperationContext contextToUse = rootContext;
 
-		for (AggregationOperation operation : operations) {
+		for (AggregationStage operation : operations) {
 
-			operationDocuments.addAll(operation.toPipelineStages(contextToUse));
+			if(operation instanceof MultiOperationAggregationStage mops) {
+				operationDocuments.addAll(mops.toPipelineStages(contextToUse));
+			} else {
+				operationDocuments.add(operation.toDocument(contextToUse));
+			}
 
 			if (operation instanceof FieldsExposingAggregationOperation) {
 
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationPipeline.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationPipeline.java
index b3c1ab56d5..cb8880d0b9 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationPipeline.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationPipeline.java
@@ -23,9 +23,10 @@
 
 import org.bson.Document;
 import org.springframework.util.Assert;
+import org.springframework.util.ClassUtils;
 
 /**
- * The {@link AggregationPipeline} holds the collection of {@link AggregationOperation aggregation stages}.
+ * The {@link AggregationPipeline} holds the collection of {@link AggregationStage aggregation stages}.
  *
  * @author Christoph Strobl
  * @author Mark Paluch
@@ -33,12 +34,23 @@
  */
 public class AggregationPipeline {
 
-	private final List<AggregationOperation> pipeline;
+	private final List<AggregationStage> pipeline;
 
 	public static AggregationPipeline of(AggregationOperation... stages) {
 		return new AggregationPipeline(Arrays.asList(stages));
 	}
 
+	/**
+	 * Create a new {@link AggregationPipeline} out of the given {@link AggregationStage stages}.
+	 *
+	 * @param stages the pipeline stages.
+	 * @return new instance of {@link AggregationPipeline}.
+	 * @since 4.1
+	 */
+	public static AggregationPipeline of(AggregationStage... stages) {
+		return new AggregationPipeline(Arrays.asList(stages));
+	}
+
 	/**
 	 * Create an empty pipeline
 	 */
@@ -49,12 +61,12 @@ public AggregationPipeline() {
 	/**
 	 * Create a new pipeline with given {@link AggregationOperation stages}.
 	 *
-	 * @param aggregationOperations must not be {@literal null}.
+	 * @param aggregationStages must not be {@literal null}.
 	 */
-	public AggregationPipeline(List<AggregationOperation> aggregationOperations) {
+	public AggregationPipeline(List<? extends AggregationStage> aggregationStages) {
 
-		Assert.notNull(aggregationOperations, "AggregationOperations must not be null");
-		pipeline = new ArrayList<>(aggregationOperations);
+		Assert.notNull(aggregationStages, "AggregationStages must not be null");
+		pipeline = new ArrayList<>(aggregationStages);
 	}
 
 	/**
@@ -64,10 +76,21 @@ public AggregationPipeline(List<AggregationOperation> aggregationOperations) {
 	 * @return this.
 	 */
 	public AggregationPipeline add(AggregationOperation aggregationOperation) {
+		return add((AggregationStage) aggregationOperation);
+	}
+
+	/**
+	 * Append the given {@link AggregationOperation stage} to the pipeline.
+	 *
+	 * @param stage must not be {@literal null}.
+	 * @return this.
+	 * @since 4.1
+	 */
+	public AggregationPipeline add(AggregationStage stage) {
 
-		Assert.notNull(aggregationOperation, "AggregationOperation must not be null");
+		Assert.notNull(stage, "AggregationOperation must not be null");
 
-		pipeline.add(aggregationOperation);
+		pipeline.add(stage);
 		return this;
 	}
 
@@ -76,7 +99,17 @@ public AggregationPipeline add(AggregationOperation aggregationOperation) {
 	 *
 	 * @return never {@literal null}.
 	 */
-	public List<AggregationOperation> getOperations() {
+	public List<AggregationStage> getOperations() {
+		return getStages();
+	}
+
+	/**
+	 * Get the list of {@link AggregationOperation aggregation stages}.
+	 *
+	 * @return never {@literal null}.
+	 * @since 4.1
+	 */
+	public List<AggregationStage> getStages() {
 		return Collections.unmodifiableList(pipeline);
 	}
 
@@ -95,14 +128,14 @@ public boolean isOutOrMerge() {
 			return false;
 		}
 
-		AggregationOperation operation = pipeline.get(pipeline.size() - 1);
+		AggregationStage operation = pipeline.get(pipeline.size() - 1);
 		return isOut(operation) || isMerge(operation);
 	}
 
 	void verify() {
 
 		// check $out/$merge is the last operation if it exists
-		for (AggregationOperation operation : pipeline) {
+		for (AggregationStage operation : pipeline) {
 
 			if (isOut(operation) && !isLast(operation)) {
 				throw new IllegalArgumentException("The $out operator must be the last stage in the pipeline");
@@ -134,13 +167,13 @@ public boolean isEmpty() {
 		return pipeline.isEmpty();
 	}
 
-	private boolean containsOperation(Predicate<AggregationOperation> predicate) {
+	private boolean containsOperation(Predicate<AggregationStage> predicate) {
 
 		if (isEmpty()) {
 			return false;
 		}
 
-		for (AggregationOperation element : pipeline) {
+		for (AggregationStage element : pipeline) {
 			if (predicate.test(element)) {
 				return true;
 			}
@@ -149,19 +182,29 @@ private boolean containsOperation(Predicate<AggregationOperation> predicate) {
 		return false;
 	}
 
-	private boolean isLast(AggregationOperation aggregationOperation) {
+	private boolean isLast(AggregationStage aggregationOperation) {
 		return pipeline.indexOf(aggregationOperation) == pipeline.size() - 1;
 	}
 
-	private static boolean isUnionWith(AggregationOperation operator) {
-		return operator instanceof UnionWithOperation || operator.getOperator().equals("$unionWith");
+	private static boolean isUnionWith(AggregationStage stage) {
+		return isSpecificStage(stage, UnionWithOperation.class, "$unionWith");
+	}
+
+	private static boolean isMerge(AggregationStage stage) {
+		return isSpecificStage(stage, MergeOperation.class, "$merge");
 	}
 
-	private static boolean isMerge(AggregationOperation operator) {
-		return operator instanceof MergeOperation || operator.getOperator().equals("$merge");
+	private static boolean isOut(AggregationStage stage) {
+		return isSpecificStage(stage, OutOperation.class, "$out");
 	}
 
-	private static boolean isOut(AggregationOperation operator) {
-		return operator instanceof OutOperation || operator.getOperator().equals("$out");
+	private static boolean isSpecificStage(AggregationStage stage, Class<?> type, String operator) {
+		if (ClassUtils.isAssignable(type, stage.getClass())) {
+			return true;
+		}
+		if (stage instanceof AggregationOperation operation) {
+			return operation.getOperator().equals(operator);
+		}
+		return stage.toDocument(Aggregation.DEFAULT_CONTEXT).keySet().iterator().next().equals(operator);
 	}
 }
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationStage.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationStage.java
new file mode 100644
index 0000000000..bab565e1d3
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationStage.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core.aggregation;
+
+import org.bson.Document;
+
+/**
+ * Abstraction for a single
+ * <a href="https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/#stages">Aggregation Pipeline
+ * Stage</a> to be used within an {@link AggregationPipeline}.
+ * <p>
+ * An {@link AggregationStage} may operate upon domain specific types but will render to a ready to use store native
+ * representation within a given {@link AggregationOperationContext context}. The most straight forward way of writing a
+ * custom {@link AggregationStage} is just returning the raw document.
+ * 
+ * <pre class="code">
+ * AggregationStage stage = (ctx) -> Document.parse("{ $sort : { borough : 1 } }");
+ * </pre>
+ * 
+ * @author Christoph Strobl
+ * @since 4.1
+ */
+public interface AggregationStage {
+
+	/**
+	 * Turns the {@link AggregationStage} into a {@link Document} by using the given {@link AggregationOperationContext}.
+	 *
+	 * @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
+	 * @return the ready to use {@link Document} representing the stage.
+	 */
+	Document toDocument(AggregationOperationContext context);
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationUpdate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationUpdate.java
index 6d4105d62d..a88b4fb316 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationUpdate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationUpdate.java
@@ -25,7 +25,6 @@
 import java.util.stream.Collectors;
 
 import org.bson.Document;
-
 import org.springframework.data.mongodb.core.query.Query;
 import org.springframework.data.mongodb.core.query.SerializationUtils;
 import org.springframework.data.mongodb.core.query.UpdateDefinition;
@@ -71,7 +70,8 @@
  *
  * @author Christoph Strobl
  * @author Mark Paluch
- * @see <a href="https://docs.mongodb.com/manual/reference/method/db.collection.update/#update-with-aggregation-pipeline">MongoDB
+ * @see <a href=
+ *      "https://docs.mongodb.com/manual/reference/method/db.collection.update/#update-with-aggregation-pipeline">MongoDB
  *      Reference Documentation</a>
  * @since 3.0
  */
@@ -92,11 +92,11 @@ protected AggregationUpdate() {
 	 *
 	 * @param pipeline must not be {@literal null}.
 	 */
-	protected AggregationUpdate(List<AggregationOperation> pipeline) {
+	protected AggregationUpdate(List<? extends AggregationStage> pipeline) {
 
 		super(pipeline);
 
-		for (AggregationOperation operation : pipeline) {
+		for (AggregationStage operation : pipeline) {
 			if (operation instanceof FieldsExposingAggregationOperation) {
 				((FieldsExposingAggregationOperation) operation).getFields().forEach(it -> {
 					keysTouched.add(it.getName());
@@ -123,6 +123,16 @@ public static AggregationUpdate from(List<AggregationOperation> pipeline) {
 		return new AggregationUpdate(pipeline);
 	}
 
+	/**
+	 * Create a new AggregationUpdate from the given {@link AggregationStage stages}.
+	 *
+	 * @return new instance of {@link AggregationUpdate}.
+	 * @since 4.1
+	 */
+	public static AggregationUpdate updateFrom(List<? extends AggregationStage> stages) {
+		return new AggregationUpdate(stages);
+	}
+
 	/**
 	 * Adds new fields to documents. {@code $set} outputs documents that contain all existing fields from the input
 	 * documents and newly added fields.
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/FacetOperation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/FacetOperation.java
index c515f918c0..f0a8cc8d63 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/FacetOperation.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/FacetOperation.java
@@ -78,6 +78,23 @@ public FacetOperationBuilder and(AggregationOperation... operations) {
 		return new FacetOperationBuilder(facets, Arrays.asList(operations));
 	}
 
+	/**
+	 * Creates a new {@link FacetOperationBuilder} to append a new facet using {@literal operations}. <br />
+	 * {@link FacetOperationBuilder} takes a pipeline of {@link AggregationStage stages} to categorize documents into a
+	 * single facet.
+	 *
+	 * @param stages must not be {@literal null} or empty.
+	 * @return
+	 * @since 4.1
+	 */
+	public FacetOperationBuilder and(AggregationStage... stages) {
+
+		Assert.notNull(stages, "Stages must not be null");
+		Assert.notEmpty(stages, "Stages must not be empty");
+
+		return new FacetOperationBuilder(facets, Arrays.asList(stages));
+	}
+
 	@Override
 	public Document toDocument(AggregationOperationContext context) {
 		return new Document(getOperator(), facets.toDocument(context));
@@ -102,11 +119,11 @@ public ExposedFields getFields() {
 	public static class FacetOperationBuilder {
 
 		private final Facets current;
-		private final List<AggregationOperation> operations;
+		private final List<AggregationStage> operations;
 
-		private FacetOperationBuilder(Facets current, List<AggregationOperation> operations) {
+		private FacetOperationBuilder(Facets current, List<? extends AggregationStage> operations) {
 			this.current = current;
-			this.operations = operations;
+			this.operations = new ArrayList<>(operations);
 		}
 
 		/**
@@ -176,7 +193,7 @@ protected Document toDocument(AggregationOperationContext context) {
 		 * @param operations must not be {@literal null}.
 		 * @return the new {@link Facets}.
 		 */
-		Facets and(String fieldName, List<AggregationOperation> operations) {
+		Facets and(String fieldName, List<? extends AggregationStage> operations) {
 
 			Assert.hasText(fieldName, "FieldName must not be null or empty");
 			Assert.notNull(operations, "AggregationOperations must not be null");
@@ -197,21 +214,21 @@ Facets and(String fieldName, List<AggregationOperation> operations) {
 	private static class Facet {
 
 		private final ExposedField exposedField;
-		private final List<AggregationOperation> operations;
+		private final List<AggregationStage> stages;
 
 		/**
 		 * Creates a new {@link Facet} given {@link ExposedField} and {@link AggregationOperation} pipeline.
 		 *
 		 * @param exposedField must not be {@literal null}.
-		 * @param operations must not be {@literal null}.
+		 * @param stages must not be {@literal null}.
 		 */
-		Facet(ExposedField exposedField, List<AggregationOperation> operations) {
+		Facet(ExposedField exposedField, List<? extends AggregationStage> stages) {
 
 			Assert.notNull(exposedField, "ExposedField must not be null");
-			Assert.notNull(operations, "AggregationOperations must not be null");
+			Assert.notNull(stages, "AggregationOperations must not be null");
 
 			this.exposedField = exposedField;
-			this.operations = operations;
+			this.stages = new ArrayList<>(stages);
 		}
 
 		ExposedField getExposedField() {
@@ -219,7 +236,7 @@ ExposedField getExposedField() {
 		}
 
 		protected List<Document> toDocuments(AggregationOperationContext context) {
-			return AggregationOperationRenderer.toDocument(operations, context);
+			return AggregationOperationRenderer.toDocument(stages, context);
 		}
 	}
 }
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/LookupOperation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/LookupOperation.java
index 44d0f1569e..2bd20f509c 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/LookupOperation.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/LookupOperation.java
@@ -230,7 +230,7 @@ public interface PipelineBuilder extends LetBuilder {
 		AsBuilder pipeline(AggregationPipeline pipeline);
 
 		/**
-		 * Specifies the {@link AggregationPipeline#getOperations() stages} that determine the resulting documents.
+		 * Specifies the {@link AggregationPipeline#getStages() stages} that determine the resulting documents.
 		 *
 		 * @param stages must not be {@literal null} can be empty.
 		 * @return never {@literal null}.
@@ -239,6 +239,17 @@ default AsBuilder pipeline(AggregationOperation... stages) {
 			return pipeline(AggregationPipeline.of(stages));
 		}
 
+		/**
+		 * Specifies the {@link AggregationPipeline#getStages() stages} that determine the resulting documents.
+		 *
+		 * @param stages must not be {@literal null} can be empty.
+		 * @return never {@literal null}.
+		 * @since 4.1
+		 */
+		default AsBuilder pipeline(AggregationStage... stages) {
+			return pipeline(AggregationPipeline.of(stages));
+		}
+
 		/**
 		 * @param name the name of the new array field to add to the input documents, must not be {@literal null} or empty.
 		 * @return new instance of {@link LookupOperation}.
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStage.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStage.java
new file mode 100644
index 0000000000..e594b2b3b0
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStage.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core.aggregation;
+
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import org.bson.Document;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+/**
+ * An {@link AggregationStage} that may consist of a main operation and potential follow up stages for eg. {@code $sort}
+ * or {@code $limit}.
+ * <p>
+ * The {@link MultiOperationAggregationStage} may operate upon domain specific types but will render to the store native
+ * representation within a given {@link AggregationOperationContext context}.
+ * <p>
+ * {@link #toDocument(AggregationOperationContext)} will render a synthetic {@link Document} that contains the ordered
+ * stages. The list returned from {@link #toPipelineStages(AggregationOperationContext)}
+ * 
+ * <pre class="code">
+ * [
+ *   { $match: { $text: { $search: "operating" } } },
+ *   { $sort: { score: { $meta: "textScore" }, posts: -1 } }
+ * ]
+ * </pre>
+ * 
+ * will be represented as
+ * 
+ * <pre class="code">
+ * {
+ *   $match: { $text: { $search: "operating" } },
+ *   $sort: { score: { $meta: "textScore" }, posts: -1 }
+ * }
+ * </pre>
+ * 
+ * In case stages appear multiple times the order no longer can be guaranteed when calling
+ * {@link #toDocument(AggregationOperationContext)}, so consumers of the API should rely on
+ * {@link #toPipelineStages(AggregationOperationContext)}. Nevertheless, by default the values will be collected into a
+ * list rendering to
+ * 
+ * <pre class="code">
+ * {
+ *   $match: [{ $text: { $search: "operating" } }, { $text: ... }],
+ *   $sort: { score: { $meta: "textScore" }, posts: -1 }
+ * }
+ * </pre>
+ * 
+ * @author Christoph Strobl
+ * @since 4.1
+ */
+public interface MultiOperationAggregationStage extends AggregationStage {
+
+	/**
+	 * Returns a synthetic {@link Document stage} that contains the {@link #toPipelineStages(AggregationOperationContext)
+	 * actual stages} by folding them into a single {@link Document}. In case of colliding entries, those used multiple
+	 * times thus having the same key, the entries will be held as a list for the given operator.
+	 *
+	 * @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
+	 * @return never {@literal null}.
+	 */
+	@Override
+	default Document toDocument(AggregationOperationContext context) {
+
+		List<Document> documents = toPipelineStages(context);
+		if (documents.size() == 1) {
+			return documents.get(0);
+		}
+
+		MultiValueMap<String, Document> stages = new LinkedMultiValueMap<>(documents.size());
+		for (Document current : documents) {
+			String key = current.keySet().iterator().next();
+			stages.add(key, current.get(key, Document.class));
+		}
+		return new Document(stages.entrySet().stream()
+				.collect(Collectors.toMap(Entry::getKey, v -> v.getValue().size() == 1 ? v.getValue().get(0) : v.getValue())));
+	}
+
+	/**
+	 * Turns the {@link MultiOperationAggregationStage} into list of {@link Document stages} by using the given
+	 * {@link AggregationOperationContext}. This allows an {@link AggregationStage} to add follow up stages for eg.
+	 * {@code $sort} or {@code $limit}.
+	 *
+	 * @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
+	 * @return the pipeline stages to run through. Never {@literal null}.
+	 */
+	List<Document> toPipelineStages(AggregationOperationContext context);
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/TypedAggregation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/TypedAggregation.java
index a600d3f69f..f73e1119da 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/TypedAggregation.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/TypedAggregation.java
@@ -15,6 +15,7 @@
  */
 package org.springframework.data.mongodb.core.aggregation;
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.springframework.util.Assert;
@@ -24,6 +25,7 @@
  *
  * @author Thomas Darimont
  * @author Oliver Gierke
+ * @author Christoph Strobl
  */
 public class TypedAggregation<I> extends Aggregation {
 
@@ -39,13 +41,24 @@ public TypedAggregation(Class<I> inputType, AggregationOperation... operations)
 		this(inputType, asAggregationList(operations));
 	}
 
+	/**
+	 * Creates a new {@link TypedAggregation} from the given {@link AggregationStage stages}.
+	 *
+	 * @param inputType must not be {@literal null}.
+	 * @param stages must not be {@literal null} or empty.
+	 * @since 4.1
+	 */
+	public TypedAggregation(Class<I> inputType, AggregationStage... stages) {
+		this(inputType, Arrays.asList(stages));
+	}
+
 	/**
 	 * Creates a new {@link TypedAggregation} from the given {@link AggregationOperation}s.
 	 *
 	 * @param inputType must not be {@literal null}.
 	 * @param operations must not be {@literal null} or empty.
 	 */
-	public TypedAggregation(Class<I> inputType, List<AggregationOperation> operations) {
+	public TypedAggregation(Class<I> inputType, List<? extends AggregationStage> operations) {
 		this(inputType, operations, DEFAULT_OPTIONS);
 	}
 
@@ -57,7 +70,7 @@ public TypedAggregation(Class<I> inputType, List<AggregationOperation> operation
 	 * @param operations must not be {@literal null} or empty.
 	 * @param options must not be {@literal null}.
 	 */
-	public TypedAggregation(Class<I> inputType, List<AggregationOperation> operations, AggregationOptions options) {
+	public TypedAggregation(Class<I> inputType, List<? extends AggregationStage> operations, AggregationOptions options) {
 
 		super(operations, options);
 
@@ -77,6 +90,6 @@ public Class<I> getInputType() {
 	public TypedAggregation<I> withOptions(AggregationOptions options) {
 
 		Assert.notNull(options, "AggregationOptions must not be null");
-		return new TypedAggregation<I>(inputType, pipeline.getOperations(), options);
+		return new TypedAggregation<>(inputType, pipeline.getStages(), options);
 	}
 }
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java
index 0ee488c336..2d24bd597b 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java
@@ -27,6 +27,7 @@
 import org.springframework.data.mongodb.core.aggregation.Aggregation;
 import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
 import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
+import org.springframework.data.mongodb.core.aggregation.AggregationStage;
 import org.springframework.data.mongodb.core.convert.MongoConverter;
 import org.springframework.data.mongodb.core.query.Collation;
 import org.springframework.data.mongodb.core.query.Meta;
@@ -109,14 +110,14 @@ static AggregationOptions.Builder applyMeta(AggregationOptions.Builder builder,
 	 * @param accessor
 	 * @param targetType
 	 */
-	static void appendSortIfPresent(List<AggregationOperation> aggregationPipeline, ConvertingParameterAccessor accessor,
+	static void appendSortIfPresent(List<? extends AggregationStage> aggregationPipeline, ConvertingParameterAccessor accessor,
 			Class<?> targetType) {
 
 		if (accessor.getSort().isUnsorted()) {
 			return;
 		}
 
-		aggregationPipeline.add(ctx -> {
+		((List<AggregationStage>) aggregationPipeline).add(ctx -> {
 
 			Document sort = new Document();
 			for (Order order : accessor.getSort()) {
@@ -134,7 +135,7 @@ static void appendSortIfPresent(List<AggregationOperation> aggregationPipeline,
 	 * @param aggregationPipeline
 	 * @param accessor
 	 */
-	static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregationPipeline,
+	static void appendLimitAndOffsetIfPresent(List<? extends AggregationStage> aggregationPipeline,
 			ConvertingParameterAccessor accessor) {
 		appendLimitAndOffsetIfPresent(aggregationPipeline, accessor, LongUnaryOperator.identity(),
 				IntUnaryOperator.identity());
@@ -150,7 +151,7 @@ static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregation
 	 * @param limitOperator
 	 * @since 3.3
 	 */
-	static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregationPipeline,
+	static void appendLimitAndOffsetIfPresent(List<? extends AggregationStage> aggregationPipeline,
 			ConvertingParameterAccessor accessor, LongUnaryOperator offsetOperator, IntUnaryOperator limitOperator) {
 
 		Pageable pageable = accessor.getPageable();
@@ -159,10 +160,10 @@ static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregation
 		}
 
 		if (pageable.getOffset() > 0) {
-			aggregationPipeline.add(Aggregation.skip(offsetOperator.applyAsLong(pageable.getOffset())));
+			((List<AggregationStage>) aggregationPipeline).add(Aggregation.skip(offsetOperator.applyAsLong(pageable.getOffset())));
 		}
 
-		aggregationPipeline.add(Aggregation.limit(limitOperator.applyAsInt(pageable.getPageSize())));
+		((List<AggregationStage>) aggregationPipeline).add(Aggregation.limit(limitOperator.applyAsInt(pageable.getPageSize())));
 	}
 
 	/**
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStageUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStageUnitTests.java
new file mode 100644
index 0000000000..dee36bc146
--- /dev/null
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStageUnitTests.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core.aggregation;
+
+import static org.springframework.data.mongodb.test.util.Assertions.*;
+
+import java.util.List;
+
+import org.bson.Document;
+import org.junit.jupiter.api.Test;
+
+/**
+ * @author Christoph Strobl
+ */
+class MultiOperationAggregationStageUnitTests {
+
+	@Test // GH-4306
+	void toDocumentRendersSingleOperation() {
+
+		MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"));
+
+		assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("{ $text: { $search: 'operating' } }");
+	}
+
+	@Test // GH-4306
+	void toDocumentRendersMultiOperation() {
+
+		MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"),
+				Document.parse("{ $sort: { score: { $meta: 'textScore' } } }"));
+
+		assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
+				{
+					$text: { $search: 'operating' },
+					$sort: { score: { $meta: 'textScore' } }
+				}
+				""");
+	}
+
+	@Test // GH-4306
+	void toDocumentCollectsDuplicateOperation() {
+
+		MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"),
+				Document.parse("{ $sort: { score: { $meta: 'textScore' } } }"), Document.parse("{ $sort: { posts: -1 } }"));
+
+		assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
+				{
+					$text: { $search: 'operating' },
+					$sort: [
+						{ score: { $meta: 'textScore' } },
+						{ posts: -1 }
+					]
+				}
+				""");
+	}
+}