Skip to content

Commit ec8daa2

Browse files
authored
feat: supports aggregateWindow in LINQ expressions (#282)
1 parent a5a8f4d commit ec8daa2

File tree

11 files changed

+323
-7
lines changed

11 files changed

+323
-7
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
## 3.4.0 [unreleased]
22

3+
### Breaking Changes
4+
Changed type of `Duration.magnitude` from `int?` to `long?`.
5+
6+
### Features
7+
1. [#282](https://github.com/influxdata/influxdb-client-csharp/pull/282): Add support for AggregateWindow function [LINQ]
8+
39
## 3.3.0 [2022-02-04]
410

511
### Bug Fixes

Client.Linq.Test/DomainObjects.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,12 @@ public class DataEntityWithLong
8686
{
8787
public long EndWithTicks { get; set; }
8888
}
89+
90+
class SensorDateTimeAsField
91+
{
92+
[Column("data")]
93+
public int Value { get; set; }
94+
95+
[Column( "dataTime")] public DateTime DateTimeField { get; set; }
96+
}
8997
}

Client.Linq.Test/InfluxDBQueryVisitorTest.cs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -929,6 +929,112 @@ public void FilterByLong()
929929
Assert.AreEqual("p3", endWithTicksAssignment?.Id.Name);
930930
Assert.AreEqual("637656739543829486", (endWithTicksAssignment?.Init as IntegerLiteral)?.Value);
931931
}
932+
933+
[Test]
934+
public void AggregateWindow()
935+
{
936+
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
937+
where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(40), "mean")
938+
where s.Value == 5
939+
select s;
940+
var visitor = BuildQueryVisitor(query);
941+
942+
StringAssert.Contains("aggregateWindow(every: p3, period: p4, fn: p5)", visitor.BuildFluxQuery());
943+
944+
var ast = visitor.BuildFluxAST();
945+
946+
Assert.NotNull(ast);
947+
Assert.NotNull(ast.Body);
948+
Assert.AreEqual(6, ast.Body.Count);
949+
950+
var everyAssignment = ((OptionStatement) ast.Body[2]).Assignment as VariableAssignment;
951+
Assert.AreEqual("p3", everyAssignment?.Id.Name);
952+
Assert.AreEqual(20000000, (everyAssignment.Init as DurationLiteral)?.Values[0].Magnitude);
953+
Assert.AreEqual("us", (everyAssignment.Init as DurationLiteral)?.Values[0].Unit);
954+
955+
var periodAssignment = ((OptionStatement) ast.Body[3]).Assignment as VariableAssignment;
956+
Assert.AreEqual("p4", periodAssignment?.Id.Name);
957+
Assert.AreEqual(40000000, (periodAssignment.Init as DurationLiteral)?.Values[0].Magnitude);
958+
Assert.AreEqual("us", (periodAssignment.Init as DurationLiteral)?.Values[0].Unit);
959+
960+
var fnAssignment = ((OptionStatement) ast.Body[4]).Assignment as VariableAssignment;
961+
Assert.AreEqual("p5", fnAssignment?.Id.Name);
962+
Assert.AreEqual("mean", (fnAssignment.Init as Identifier)?.Name);
963+
}
964+
965+
[Test]
966+
public void AggregateWindowFluxQuery()
967+
{
968+
var queries = new[]
969+
{
970+
(
971+
from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
972+
where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(40), "mean")
973+
select s,
974+
"aggregateWindow(every: p3, period: p4, fn: p5)",
975+
""
976+
),
977+
(
978+
from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
979+
where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), null, "mean")
980+
select s,
981+
"aggregateWindow(every: p3, fn: p4)",
982+
""
983+
),
984+
(
985+
from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
986+
where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), null, "mean")
987+
where s.Value == 5
988+
select s,
989+
"aggregateWindow(every: p3, fn: p4)",
990+
" |> filter(fn: (r) => (r[\"data\"] == p5))"
991+
),
992+
(
993+
from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
994+
where s.Value == 5
995+
where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), null, "mean")
996+
select s,
997+
"aggregateWindow(every: p4, fn: p5)",
998+
" |> filter(fn: (r) => (r[\"data\"] == p3))"
999+
),
1000+
(
1001+
from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
1002+
where s.Deployment == "prod"
1003+
where s.Value == 5
1004+
where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), null, "mean")
1005+
select s,
1006+
"filter(fn: (r) => (r[\"deployment\"] == p3)) |> aggregateWindow(every: p5, fn: p6)",
1007+
" |> filter(fn: (r) => (r[\"data\"] == p4))"
1008+
),
1009+
(
1010+
from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
1011+
where s.Deployment == "prod" && s.Value == 5 && s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), null, "mean")
1012+
select s,
1013+
"filter(fn: (r) => (r[\"deployment\"] == p3)) |> aggregateWindow(every: p5, fn: p6)",
1014+
" |> filter(fn: (r) => (r[\"data\"] == p4))"
1015+
)
1016+
};
1017+
1018+
foreach (var (queryable, expected, filter) in queries)
1019+
{
1020+
var visitor = BuildQueryVisitor(queryable);
1021+
1022+
var flux = "start_shifted = int(v: time(v: p2))\n\nfrom(bucket: p1) |> range(start: time(v: start_shifted)) |> " + expected + " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") |> drop(columns: [\"_start\", \"_stop\", \"_measurement\"])" + filter;
1023+
Assert.AreEqual(flux, visitor.BuildFluxQuery());
1024+
}
1025+
}
1026+
1027+
[Test]
1028+
public void AggregateWindowOnlyForTimestamp()
1029+
{
1030+
var query = from s in InfluxDBQueryable<SensorDateTimeAsField>.Queryable("my-bucket", "my-org", _queryApi)
1031+
where s.DateTimeField.AggregateWindow(TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(40), "mean")
1032+
where s.Value == 5
1033+
select s;
1034+
1035+
var nse = Assert.Throws<NotSupportedException>(() => BuildQueryVisitor(query));
1036+
Assert.AreEqual("AggregateWindow() has to be used only for Timestamp member, e.g. [Column(IsTimestamp = true)].", nse?.Message);
1037+
}
9321038

9331039
private InfluxDBQueryVisitor BuildQueryVisitor(IQueryable queryable, Expression expression = null)
9341040
{

Client.Linq.Test/ItInfluxDBQueryableTest.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,23 @@ orderby s.Timestamp
454454

455455
Assert.AreEqual(8, count);
456456
}
457+
458+
[Test]
459+
public void QueryAggregateWindow()
460+
{
461+
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _client.GetQueryApiSync())
462+
where s.Timestamp.AggregateWindow(TimeSpan.FromDays(4), null, "mean")
463+
where s.Timestamp > new DateTime(2020, 11, 15, 0, 0, 0, DateTimeKind.Utc)
464+
where s.Timestamp < new DateTime(2020, 11, 18, 0, 0, 0, DateTimeKind.Utc)
465+
select s;
466+
467+
var sensors = query.ToList();
468+
469+
Assert.AreEqual(2, sensors.Count);
470+
// (28 + 12 + 89) / 3 = 43
471+
Assert.AreEqual(43, sensors[0].Value);
472+
Assert.AreEqual(43, sensors.Last().Value);
473+
}
457474

458475
[TearDown]
459476
protected void After()
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using System;
2+
using InfluxDB.Client.Api.Domain;
3+
using InfluxDB.Client.Core.Test;
4+
using InfluxDB.Client.Linq.Internal;
5+
using NUnit.Framework;
6+
7+
namespace Client.Linq.Test
8+
{
9+
[TestFixture]
10+
public class VariableAggregatorTest : AbstractTest
11+
{
12+
[Test]
13+
public void TimeStamp()
14+
{
15+
var data = new[]
16+
{
17+
(
18+
TimeSpan.FromMilliseconds(1),
19+
1000
20+
),
21+
(
22+
TimeSpan.FromMilliseconds(-1),
23+
-1000
24+
),
25+
(
26+
TimeSpan.FromDays(2 * 365),
27+
63072000000000
28+
)
29+
};
30+
31+
foreach (var (timeSpan, expected) in data)
32+
{
33+
var aggregator = new VariableAggregator();
34+
aggregator.AddNamedVariable(timeSpan);
35+
36+
var duration =
37+
(((aggregator.GetStatements()[0] as OptionStatement)?.Assignment as VariableAssignment)?.Init as
38+
DurationLiteral)?.Values[0];
39+
Assert.NotNull(duration);
40+
Assert.AreEqual(expected, duration.Magnitude);
41+
}
42+
}
43+
}
44+
}

Client.Linq/InfluxDBQueryable.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,5 +237,30 @@ public static InfluxDBQueryable<T> ToInfluxQueryable<T>(this IQueryable<T> sourc
237237

238238
return queryable;
239239
}
240+
241+
/// <summary>
242+
/// The extension to use Flux window operator. For more info see https://docs.influxdata.com/flux/v0.x/stdlib/universe/aggregatewindow/
243+
///
244+
/// <example>
245+
/// <code>
246+
/// var query = from s in InfluxDBQueryable&lt;Sensor&gt;.Queryable("my-bucket", "my-org", _queryApi)
247+
/// where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(40), "mean")
248+
/// where s.Value == 5
249+
/// select s;
250+
/// </code>
251+
/// </example>
252+
/// </summary>
253+
/// <param name="timestamp">The entity value which is market as a Timestamp.</param>
254+
/// <param name="every">Duration of time between windows.</param>
255+
/// <param name="period">Duration of the window.</param>
256+
/// <param name="fn">Aggregate or selector function used to operate on each window of time.</param>
257+
/// <returns>NotSupportedException if it's called outside LINQ expression.</returns>
258+
/// <exception cref="NotSupportedException">Caused by calling outside of LINQ expression.</exception>
259+
// ReSharper disable UnusedParameter.Global
260+
public static bool AggregateWindow(this DateTime timestamp, TimeSpan every, TimeSpan? period = null, string fn = "mean")
261+
{
262+
throw new NotSupportedException("This should be used only in LINQ expression. " +
263+
"Something like: 'where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(40), \"mean\")'.");
264+
}
240265
}
241266
}

Client.Linq/Internal/QueryAggregator.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ internal class QueryAggregator
6565
private readonly List<string> _filterByTags;
6666
private readonly List<string> _filterByFields;
6767
private readonly List<(string, string, bool, string)> _orders;
68+
private (string Every, string Period, string Fn)? _aggregateWindow;
6869

6970
internal QueryAggregator()
7071
{
@@ -73,6 +74,7 @@ internal QueryAggregator()
7374
_filterByTags = new List<string>();
7475
_filterByFields = new List<string>();
7576
_orders = new List<(string, string, bool, string)>();
77+
_aggregateWindow = null;
7678
}
7779

7880
internal void AddBucket(string bucket)
@@ -91,6 +93,12 @@ internal void AddRangeStop(string rangeStop, RangeExpressionType expressionType)
9193
_rangeStopAssignment = rangeStop;
9294
_rangeStopExpression = expressionType;
9395
}
96+
97+
internal void AddAggregateWindow(string everyVariable, string periodVariable, string fnVariable)
98+
{
99+
_aggregateWindow = (everyVariable, periodVariable, fnVariable);
100+
}
101+
94102

95103
internal void AddLimitN(string limitNAssignment)
96104
{
@@ -155,6 +163,7 @@ internal string BuildFluxQuery(QueryableOptimizerSettings settings)
155163
BuildOperator("from", "bucket", _bucketAssignment),
156164
BuildRange(transforms),
157165
BuildFilter(_filterByTags),
166+
BuildAggregateWindow(_aggregateWindow),
158167
"pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
159168
};
160169

@@ -209,6 +218,25 @@ internal string BuildFluxQuery(QueryableOptimizerSettings settings)
209218
return query.ToString();
210219
}
211220

221+
private string BuildAggregateWindow((string Every, string Period, string Fn)? aggregateWindow)
222+
{
223+
if (aggregateWindow == null)
224+
{
225+
return null;
226+
}
227+
228+
var (every, period, fn) = aggregateWindow.Value;
229+
var list = new List<string>
230+
{
231+
$"every: {every}",
232+
period != null ? $"period: {period}" : null,
233+
$"fn: {fn}"
234+
};
235+
236+
237+
return $"aggregateWindow({JoinList(list, ", ")})";
238+
}
239+
212240
private string BuildDrop(QueryableOptimizerSettings settings)
213241
{
214242
var columns = new List<string>();

Client.Linq/Internal/QueryExpressionTreeVisitor.cs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,17 @@
22
using System.Collections.Generic;
33
using System.Linq;
44
using System.Linq.Expressions;
5+
using InfluxDB.Client.Api.Domain;
56
using InfluxDB.Client.Core;
67
using InfluxDB.Client.Linq.Internal.Expressions;
78
using Remotion.Linq.Clauses;
89
using Remotion.Linq.Clauses.Expressions;
910
using Remotion.Linq.Clauses.ResultOperators;
1011
using Remotion.Linq.Parsing;
12+
using BinaryExpression = System.Linq.Expressions.BinaryExpression;
13+
using Expression = System.Linq.Expressions.Expression;
14+
using MemberExpression = System.Linq.Expressions.MemberExpression;
15+
using UnaryExpression = System.Linq.Expressions.UnaryExpression;
1116

1217
namespace InfluxDB.Client.Linq.Internal
1318
{
@@ -140,7 +145,51 @@ protected override Expression VisitUnary(UnaryExpression expression)
140145

141146
return base.VisitUnary(expression);
142147
}
143-
148+
149+
protected override Expression VisitMethodCall(MethodCallExpression expression)
150+
{
151+
if (expression.Method.Name.Equals("AggregateWindow"))
152+
{
153+
var member = (MemberExpression)expression.Arguments[0];
154+
if (_context.MemberResolver.ResolveMemberType(member.Member) != MemberType.Timestamp)
155+
{
156+
throw new NotSupportedException(
157+
"AggregateWindow() has to be used only for Timestamp member, e.g. [Column(IsTimestamp = true)].");
158+
}
159+
160+
//
161+
// every
162+
//
163+
var every = (TimeSpan) ((ConstantExpression)expression.Arguments[1]).Value;
164+
Arguments.CheckNotNull(every, "every");
165+
var everyVariable = _context.Variables.AddNamedVariable(every);
166+
167+
//
168+
// period
169+
//
170+
string periodVariable = null;
171+
var period = ((ConstantExpression)expression.Arguments[2]).Value as TimeSpan?;
172+
if (period.HasValue)
173+
{
174+
Arguments.CheckNotNull(period, "period");
175+
periodVariable = _context.Variables.AddNamedVariable(period);
176+
}
177+
178+
//
179+
// fn
180+
//
181+
var fn = ((ConstantExpression)expression.Arguments[3]).Value as string;
182+
Arguments.CheckNonEmptyString(fn, "fn");
183+
var fnVariable = _context.Variables.AddNamedVariable(new Identifier("Identifier", "mean"));
184+
185+
_context.QueryAggregator.AddAggregateWindow(everyVariable, periodVariable, fnVariable);
186+
187+
return expression;
188+
}
189+
190+
return base.VisitMethodCall(expression);
191+
}
192+
144193
protected override Exception CreateUnhandledItemException<T>(T unhandledItem, string visitMethod)
145194
{
146195
var message = $"The expression '{unhandledItem}', type: '{typeof(T)}' is not supported.";
@@ -353,7 +402,7 @@ internal static void NormalizeExpressions(List<IExpressionPart> parts)
353402
foreach (var index in indexes)
354403
{
355404
// ()
356-
if (parts[index + 1] is RightParenthesis)
405+
if (parts.Count > index + 1 && parts[index + 1] is RightParenthesis)
357406
{
358407
parts.RemoveAt(index + 1);
359408
parts.RemoveAt(index);
@@ -369,6 +418,8 @@ internal static void NormalizeExpressions(List<IExpressionPart> parts)
369418
{
370419
parts.RemoveAt(parts.Count - 1);
371420
parts.RemoveAt(0);
421+
422+
NormalizeExpressions(parts);
372423
}
373424
}
374425
}

0 commit comments

Comments
 (0)