Skip to content

Commit

Permalink
Add builders for window functions (#747)
Browse files Browse the repository at this point in the history
  • Loading branch information
stIncMale committed Jul 13, 2021
1 parent 9014c64 commit 78b27e8
Show file tree
Hide file tree
Showing 21 changed files with 3,083 additions and 12 deletions.
2 changes: 1 addition & 1 deletion bson/src/main/org/bson/BsonNumber.java
Expand Up @@ -19,7 +19,7 @@
import org.bson.types.Decimal128;

/**
* Base class for the three numeric BSON types. This class mirrors the functionality provided by {@code java.lang.Number}.
* Base class for the numeric BSON types. This class mirrors the functionality provided by {@code java.lang.Number}.
*
* @since 3.0
*/
Expand Down
4 changes: 3 additions & 1 deletion config/codenarc/codenarc.xml
Expand Up @@ -128,6 +128,9 @@
<rule-config name='NestedBlockDepth'>
<property name='doNotApplyToFilesMatching' value='.*Specification.groovy'/>
</rule-config>
<rule-config name='ClassSize'>
<property name='doNotApplyToFilesMatching' value='.*Specification.groovy'/>
</rule-config>
<exclude name='CrapMetric'/>
<exclude name='AbcMetric'/>
<exclude name='MethodSize'/>
Expand All @@ -148,4 +151,3 @@
</ruleset-ref>

</ruleset>

32 changes: 29 additions & 3 deletions docs/reference/content/builders/aggregation.md
Expand Up @@ -10,8 +10,9 @@ title = "Aggregates"

## Aggregates

The [`Aggregates`]({{< apiref "mongodb-driver-core" "com/mongodb/client/model/Aggregates" >}}) class provides static factory methods that build [aggregation
pipeline operators]({{< docsref "reference/operator/aggregation/" >}}). Each method returns an instance of the
The [`Aggregates`]({{< apiref "mongodb-driver-core" "com/mongodb/client/model/Aggregates" >}}) class provides static factory methods
that build [aggregation pipeline stages]({{< docsref "meta/aggregation-quick-reference/#stages" >}}).
Each method returns an instance of the
[`Bson`]({{< relref "bson/documents.md#bson" >}}) type, which can in turn be passed to the `aggregate` method of `MongoCollection`.

For brevity, you may choose to import the methods of the `Aggregates` class statically:
Expand Down Expand Up @@ -427,9 +428,34 @@ This stage returns a document that looks like this:
```
### SetWindowFields
{{% note class="important" %}}
Support for `$setWindowFields` is in beta. Backwards-breaking changes may be made before the final release.
{{% /note %}}
The [`$setWindowFields`](https://dochub.mongodb.org/core/window-functions-set-window-fields) pipeline stage
allows using window operators. This stage partitions the input documents similarly to the [`$group`](#group) pipeline stage,
optionally sorts them, computes fields in the documents by computing window functions over windows specified per function
(a window is a subset of a partition), and outputs the documents. The important difference from the `$group` pipeline stage is that
documents belonging to the same partition or window are not folded into a single document.
The driver includes the [`WindowedComputations`]({{< apiref "mongodb-driver-core" "com/mongodb/client/model/WindowedComputations" >}})
class with factory methods for supported window operators.
This example computes the accumulated rainfall and the average temperature over the past month per each locality
from more fine-grained measurements presented via the `rainfall` and `temperature` fields:
```java
Window pastMonth = Windows.timeRange(-1, Windows.Bound.CURRENT, MongoTimeUnit.MONTH);
setWindowFields("$localityId", Sorts.ascending("measurementDateTime"),
WindowedComputations.sum("monthlyRainfall", "$rainfall", pastMonth),
WindowedComputations.avg("monthlyAvgTemp", "$temperature", pastMonth));
```
### Creating a Pipeline
The above pipeline operators are typically combined into a list and passed to the `aggregate` method of a `MongoCollection`. For instance:
The above pipeline stages are typically combined into a list and passed to the `aggregate` method of a `MongoCollection`. For instance:
```java
collection.aggregate(Arrays.asList(match(eq("author", "Dave")),
Expand Down
36 changes: 31 additions & 5 deletions docs/reference/content/driver-scala/builders/aggregation.md
Expand Up @@ -10,8 +10,9 @@ title = "Aggregation"

## Aggregation

The [`Aggregates`]({{< apiref "mongo-scala-driver" "org/mongodb/scala/model/Aggregates$" >}}) class provides static factory methods that build [aggregation
pipeline operators]({{< docsref "reference/operator/aggregation/" >}}). Each method returns an instance of the
The [`Aggregates`]({{< apiref "mongo-scala-driver" "org/mongodb/scala/model/Aggregates$" >}}) class provides static factory methods
that build [aggregation pipeline stages]({{< docsref "meta/aggregation-quick-reference/#stages" >}}).
Each method returns an instance of the
[`Bson`]({{< relref "bson/documents.md#bson" >}}) type, which can in turn be passed to the `aggregate` method of `MongoCollection`.

For brevity, you may choose to import the methods of the `Aggregates` class statically:
Expand Down Expand Up @@ -129,9 +130,9 @@ expression and outputs to the next stage a document for each distinct grouping.
expression on which to group, and zero or more
[accumulators]({{< docsref "reference/operator/aggregation/group/#accumulator-operator" >}}) which are evaluated for each
grouping. To simplify the expression of accumulators, the driver includes an
[`Accumulators`]({{< apiref "mongo-scala-driver" "org/mongodb/scala/model/Aggregates$" >}}) class with static factory methods for each of the supported
accumulators. In the example below, it's assumed that the `sum` and `avg` methods of the `Accumulators` class have been statically
imported.
[`Accumulators`]({{< apiref "mongo-scala-driver" "org/mongodb/scala/model/Accumulators$" >}}) singleton object with factory methods
for each of the supported accumulators.
In the example below, it's assumed that the `sum` and `avg` methods of the `Accumulators` class have been statically imported.

This example groups documents by the value of the `customerId` field, and for each group accumulates the sum and average of the values of
the `quantity` field into the `totalQuantity` and `averageQuantity` fields, respectively.
Expand Down Expand Up @@ -175,6 +176,31 @@ This example writes the pipeline to the `authors` collection:
out("authors")
```

### SetWindowFields

{{% note class="important" %}}
Support for `$setWindowFields` is in beta. Backwards-breaking changes may be made before the final release.
{{% /note %}}

The [`$setWindowFields`](https://dochub.mongodb.org/core/window-functions-set-window-fields) pipeline stage
allows using window operators. This stage partitions the input documents similarly to the [`$group`](#group) pipeline stage,
optionally sorts them, computes fields in the documents by computing window functions over windows specified per function
(a window is a subset of a partition), and outputs the documents. The important difference from the `$group` pipeline stage is that
documents belonging to the same partition or window are not folded into a single document.

The driver includes the [`WindowedComputations`]({{< apiref "mongo-scala-driver" "org/mongodb/scala/model/WindowedComputations$" >}})
singleton object with factory methods for supported window operators.

This example computes the accumulated rainfall and the average temperature over the past month per each locality
from more fine-grained measurements presented via the `rainfall` and `temperature` fields:

```scala
val pastMonth: Window = Windows.timeRange(-1, Windows.Bound.CURRENT, MongoTimeUnit.MONTH)
setWindowFields("$localityId", Sorts.ascending("measurementDateTime"),
WindowedComputations.sum("monthlyRainfall", "$rainfall", pastMonth),
WindowedComputations.avg("monthlyAvgTemp", "$temperature", pastMonth))
```

### Creating a Pipeline

The above pipeline operators are typically combined into a list and passed to the `aggregate` method of a `MongoCollection`. For instance:
Expand Down
3 changes: 3 additions & 0 deletions docs/reference/content/whats-new.md
Expand Up @@ -28,6 +28,9 @@ New features of the 4.3 Java driver release include:
[Netty](https://netty.io/) [`io.netty.handler.ssl.SslContext`]({{< nettyapiref "io/netty/handler/ssl/SslContext.html" >}}),
which may be used as a convenient way to utilize [OpenSSL](https://www.openssl.org/) as an alternative
to the TLS/SSL protocol implementation in a JDK.
* Added [builders]({{< apiref "mongodb-driver-core" "com/mongodb/client/model/Aggregates.html#setWindowFields(TExpression,org.bson.conversions.Bson,java.util.List)" >}})
for the new [`$setWindowFields`](https://dochub.mongodb.org/core/window-functions-set-window-fields)
pipeline stage of an aggregation pipeline.

# What's new in 4.2

Expand Down
122 changes: 122 additions & 0 deletions driver-core/src/main/com/mongodb/client/model/Aggregates.java
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.client.model;

import com.mongodb.MongoNamespace;
import com.mongodb.annotations.Beta;
import com.mongodb.lang.Nullable;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
Expand All @@ -28,6 +29,7 @@
import org.bson.conversions.Bson;

import java.util.List;
import java.util.Objects;

import static java.util.Arrays.asList;
import static org.bson.assertions.Assertions.notNull;
Expand Down Expand Up @@ -594,6 +596,60 @@ public static Bson sample(final int size) {
return new BsonDocument("$sample", new BsonDocument("size", new BsonInt32(size)));
}

/**
* Creates a {@code $setWindowFields} pipeline stage, which allows using window operators.
* This stage partitions the input documents similarly to the {@link #group(Object, List) $group} pipeline stage,
* optionally sorts them, computes fields in the documents by computing window functions over {@linkplain Window windows} specified per
* function, and outputs the documents. The important difference from the {@code $group} pipeline stage is that
* documents belonging to the same partition or window are not folded into a single document.
*
* @param partitionBy Optional partitioning of data specified like {@code id} in {@link #group(Object, List)}.
* If {@code null}, then all documents belong to the same partition.
* @param sortBy Fields to sort by. The syntax is identical to {@code sort} in {@link #sort(Bson)} (see {@link Sorts}).
* Sorting is required by certain functions and may be required by some windows (see {@link Windows} for more details).
* Sorting is used only for the purpose of computing window functions and does not guarantee ordering
* of the output documents.
* @param output A nonempty array of {@linkplain WindowedComputation windowed computations}.
* @param <TExpression> The {@code partitionBy} expression type.
* @return The {@code $setWindowFields} pipeline stage.
* @mongodb.driver.dochub core/window-functions-set-window-fields $setWindowFields
* @mongodb.server.release 5.0
* @since 4.3
*/
@Beta
public static <TExpression> Bson setWindowFields(@Nullable final TExpression partitionBy, @Nullable final Bson sortBy,
final WindowedComputation... output) {
notNull("output", output);
return setWindowFields(partitionBy, sortBy, asList(output));
}

/**
* Creates a {@code $setWindowFields} pipeline stage, which allows using window operators.
* This stage partitions the input documents similarly to the {@link #group(Object, List) $group} pipeline stage,
* optionally sorts them, computes fields in the documents by computing window functions over {@linkplain Window windows} specified per
* function, and outputs the documents. The important difference from the {@code $group} pipeline stage is that
* documents belonging to the same partition or window are not folded into a single document.
*
* @param partitionBy Optional partitioning of data specified like {@code id} in {@link #group(Object, List)}.
* If {@code null}, then all documents belong to the same partition.
* @param sortBy Fields to sort by. The syntax is identical to {@code sort} in {@link #sort(Bson)} (see {@link Sorts}).
* Sorting is required by certain functions and may be required by some windows (see {@link Windows} for more details).
* Sorting is used only for the purpose of computing window functions and does not guarantee ordering
* of the output documents.
* @param output A nonempty list of {@linkplain WindowedComputation windowed computations}.
* @param <TExpression> The {@code partitionBy} expression type.
* @return The {@code $setWindowFields} pipeline stage.
* @mongodb.driver.dochub core/window-functions-set-window-fields $setWindowFields
* @mongodb.server.release 5.0
* @since 4.3
*/
@Beta
public static <TExpression> Bson setWindowFields(@Nullable final TExpression partitionBy, @Nullable final Bson sortBy,
final List<WindowedComputation> output) {
notNull("output", output);
return new SetWindowFieldsStage<>(partitionBy, sortBy, output);
}

static void writeBucketOutput(final CodecRegistry codecRegistry, final BsonDocumentWriter writer,
@Nullable final List<BsonField> output) {
if (output != null) {
Expand Down Expand Up @@ -1490,6 +1546,72 @@ public String toString() {
}
}

private static final class SetWindowFieldsStage<TExpression> implements Bson {
@Nullable
private final TExpression partitionBy;
@Nullable
private final Bson sortBy;
private final List<WindowedComputation> output;

SetWindowFieldsStage(@Nullable final TExpression partitionBy, @Nullable final Bson sortBy, final List<WindowedComputation> output) {
this.partitionBy = partitionBy;
this.sortBy = sortBy;
this.output = output;
}

@Override
public <TDocument> BsonDocument toBsonDocument(final Class<TDocument> tDocumentClass, final CodecRegistry codecRegistry) {
BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument());
writer.writeStartDocument();
writer.writeStartDocument("$setWindowFields");
if (partitionBy != null) {
writer.writeName("partitionBy");
BuildersHelper.encodeValue(writer, partitionBy, codecRegistry);
}
if (sortBy != null) {
writer.writeName("sortBy");
BuildersHelper.encodeValue(writer, sortBy, codecRegistry);
}
writer.writeStartDocument("output");
for (WindowedComputation windowedComputation : output) {
BsonField field = windowedComputation.toBsonField();
writer.writeName(field.getName());
BuildersHelper.encodeValue(writer, field.getValue(), codecRegistry);
}
writer.writeEndDocument(); // end output
writer.writeEndDocument(); // end $setWindowFields
writer.writeEndDocument();
return writer.getDocument();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final SetWindowFieldsStage<?> that = (SetWindowFieldsStage<?>) o;
return Objects.equals(partitionBy, that.partitionBy) && Objects.equals(sortBy, that.sortBy) && output.equals(that.output);
}

@Override
public int hashCode() {
return Objects.hash(partitionBy, sortBy, output);
}

@Override
public String toString() {
return "Stage{"
+ "name='$setWindowFields'"
+ ", partitionBy=" + partitionBy
+ ", sortBy=" + sortBy
+ ", output=" + output
+ '}';
}
}

private Aggregates() {
}
}
58 changes: 58 additions & 0 deletions driver-core/src/main/com/mongodb/client/model/MongoTimeUnit.java
@@ -0,0 +1,58 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.client.model;

import com.mongodb.annotations.Beta;

/**
* Units for specifying time-based bounds for {@linkplain Window windows} and output units for some time-based
* {@linkplain WindowedComputation windowed computations}.
*
* @mongodb.server.release 5.0
* @since 4.3
*/
@Beta
public enum MongoTimeUnit {
YEAR("year", false),
QUARTER("quarter", false),
MONTH("month", false),
WEEK("week", true),
DAY("day", true),
HOUR("hour", true),
MINUTE("minute", true),
SECOND("second", true),
MILLISECOND("millisecond", true);

private final String value;
private final boolean fixed;

MongoTimeUnit(final String value, final boolean fixed) {
this.value = value;
this.fixed = fixed;
}

String value() {
return value;
}

/**
* Returns {@code true} iff the unit represents a fixed duration.
* E.g., a minute is a fixed duration equal to 60_000 milliseconds, while the duration of a month varies.
*/
boolean fixed() {
return fixed;
}
}
32 changes: 32 additions & 0 deletions driver-core/src/main/com/mongodb/client/model/Window.java
@@ -0,0 +1,32 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.client.model;

import com.mongodb.annotations.Beta;
import org.bson.conversions.Bson;

import java.util.List;

/**
* A subset of documents within a partition in the {@link Aggregates#setWindowFields(Object, Bson, List) $setWindowFields} pipeline stage
* of an aggregation pipeline (see {@code partitionBy} in {@link Aggregates#setWindowFields(Object, Bson, List)}).
*
* @see Windows
* @since 4.3
*/
@Beta
public interface Window extends Bson {
}

0 comments on commit 78b27e8

Please sign in to comment.