-
Notifications
You must be signed in to change notification settings - Fork 82
/
Copy pathWindowCommon.h
306 lines (261 loc) · 10.7 KB
/
WindowCommon.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
#pragma once
#include <Core/ColumnWithTypeAndName.h>
#include <Columns/ColumnsDateTime.h>
#include <Interpreters/Streaming/SessionInfo.h>
#include <Interpreters/Streaming/TableFunctionDescription_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <base/types.h>
#include <Common/DateLUTImpl.h>
#include <Common/IntervalKind.h>
#include <Common/TypePromotion.h>
namespace DB
{
class ASTFunction;
class Chunk;
namespace Streaming
{
/// DISPATCH_FOR_WINDOW_INTERVAL(interval_kind, M)
/// Expands to a `switch` over the runtime value `interval_kind` that invokes `M(IntervalKind::<Kind>)`
/// with the matching IntervalKind enumerator, one case per kind from Nanosecond through Year,
/// so `M` can instantiate per-kind code (e.g. templates) for the runtime kind.
/// Each case body is wrapped in its own braces so that `M` may declare local variables: without
/// them, jumping to a later case label past an initialized declaration would not compile.
/// NOTE(review): the expansion already ends in `while (0);`. A call site written with its own `;`
/// therefore produces an extra empty statement, and `if (c) DISPATCH_FOR_WINDOW_INTERVAL(k, M); else ...`
/// does not compile. The trailing semicolon is kept because existing call sites may omit their own.
#define DISPATCH_FOR_WINDOW_INTERVAL(interval_kind, M) \
do \
{ \
switch (interval_kind) \
{ \
case IntervalKind::Nanosecond: { \
M(IntervalKind::Nanosecond); \
break; \
} \
case IntervalKind::Microsecond: { \
M(IntervalKind::Microsecond); \
break; \
} \
case IntervalKind::Millisecond: { \
M(IntervalKind::Millisecond); \
break; \
} \
case IntervalKind::Second: { \
M(IntervalKind::Second); \
break; \
} \
case IntervalKind::Minute: { \
M(IntervalKind::Minute); \
break; \
} \
case IntervalKind::Hour: { \
M(IntervalKind::Hour); \
break; \
} \
case IntervalKind::Day: { \
M(IntervalKind::Day); \
break; \
} \
case IntervalKind::Week: { \
M(IntervalKind::Week); \
break; \
} \
case IntervalKind::Month: { \
M(IntervalKind::Month); \
break; \
} \
case IntervalKind::Quarter: { \
M(IntervalKind::Quarter); \
break; \
} \
case IntervalKind::Year: { \
M(IntervalKind::Year); \
break; \
} \
} \
} while (0);
/// Kinds of streaming windows handled by the helpers in this header (see toWindowType).
/// `None` is the zero-valued enumerator. Values are pinned explicitly so that inserting or
/// reordering enumerators cannot silently renumber the existing ones.
enum class WindowType
{
    None = 0,
    Hop = 1,
    Tumble = 2,
    Session = 3
};
/// Usage texts for the tumble / hop / session table functions.
/// NOTE(review): presumably embedded in argument-validation errors; confirm against the code that uses them.
/// Declared `inline` (C++17) so the whole program shares a single definition. A plain namespace-scope
/// `const String` in a header has internal linkage, so every translation unit including this header
/// would construct its own copy of each string during static initialization.
inline const String TUMBLE_HELP_MESSAGE = "Function 'tumble' requires from 2 to 4 parameters: "
                                          "<name of the table>, [timestamp column], <tumble window size>, [time zone]";
inline const String HOP_HELP_MESSAGE = "Function 'hop' requires from 3 to 5 parameters: "
                                       "<name of the table>, [timestamp column], <hop interval size>, <hop window size>, [time zone]";
inline const String SESSION_HELP_MESSAGE = "Function 'session' requires at least 2 parameters: "
                                           "<name of the stream>, [timestamp column], <timeout interval>, [max session time], [session range comparision] | [start_prediction, end_prediction]";
/// Check whether `ast` is a call to the tumble / hop / table / session / changelog table function,
/// respectively. Only declared here; the definitions live out of line.
/// NOTE(review): behavior for a null `ast` is not visible from this header — confirm in the definitions.
bool isTableFunctionTumble(const ASTFunction * ast);
bool isTableFunctionHop(const ASTFunction * ast);
bool isTableFunctionTable(const ASTFunction * ast);
bool isTableFunctionSession(const ASTFunction * ast);
bool isTableFunctionChangelog(const ASTFunction * ast);
/// Check the argument list of a tumble / hop / session table-function call and return it.
/// Note: the extracted arguments are complete. Omitted optional parameters are included as
/// empty ASTPtr placeholders, so every position has a fixed meaning.
/// For example:
/// tumble(table, interval 5 second)
/// v
/// [table, timestamp(nullptr), win_interval, timezone(nullptr)]
/// NOTE(review): how an invalid argument list is reported (presumably an exception) is not visible here.
ASTs checkAndExtractTumbleArguments(const ASTFunction * func_ast);
ASTs checkAndExtractHopArguments(const ASTFunction * func_ast);
ASTs checkAndExtractSessionArguments(const ASTFunction * func_ast);
/// An interval length (`interval`) paired with its unit (`unit`).
/// Converts to `false` when `interval` is zero, i.e. when no interval has been set.
struct WindowInterval
{
    Int64 interval{0};
    IntervalKind::Kind unit{IntervalKind::Second};

    operator bool() const { return 0 != interval; }
};
/// Interval and time helpers (declared here, defined out of line).
/// checkIntervalAST: validate that `ast` is an interval expression, reporting `msg` on failure.
/// NOTE(review): the failure mode (presumably an exception carrying `msg`) is not visible here.
void checkIntervalAST(const ASTPtr & ast, const String & msg = "Invalid interval");
/// extractInterval: read the interval count and unit from an interval AST or an interval column.
/// The first overload writes them to the out-parameters; the others return them as a WindowInterval.
void extractInterval(const ASTFunction * ast, Int64 & interval, IntervalKind::Kind & kind);
WindowInterval extractInterval(const ASTFunction * ast);
WindowInterval extractInterval(const ColumnWithTypeAndName & interval_column);
/// toStartTime / addTime: presumably round a timestamp down to the start of its `num_units` x `kind`
/// window, or add `num_units` x `kind` to it, evaluated in `time_zone` (inferred from names — confirm).
/// The UInt32 overloads take seconds (`time_sec`). The Int64 overloads take a value with `time_scale`
/// (presumably the DateTime64 scale — confirm).
UInt32 toStartTime(UInt32 time_sec, IntervalKind::Kind kind, Int64 num_units, const DateLUTImpl & time_zone);
Int64 toStartTime(Int64 dt, IntervalKind::Kind kind, Int64 num_units, const DateLUTImpl & time_zone, UInt32 time_scale);
UInt32 addTime(UInt32 time_sec, IntervalKind::Kind kind, Int64 num_units, const DateLUTImpl & time_zone);
Int64 addTime(Int64 dt, IntervalKind::Kind kind, Int64 num_units, const DateLUTImpl & time_zone, UInt32 time_scale);
/// Map a window function name to its WindowType.
WindowType toWindowType(const String & func_name);
/// BaseScaleInterval converts an interval expressed in any IntervalKind to one of two common base scales,
/// so that intervals of different kinds can be combined and compared within the same scale.
///   BaseScale-1: Nanosecond  Range: Nanosecond, Microsecond, Millisecond, Second, Minute, Hour, Day, Week
///   BaseScale-2: Month       Range: Month, Quarter, Year
/// Examples: '1m' -> '60000000000ns', '1y' -> '12M'.
/// Months, quarters and years have no fixed length in nanoseconds, so they use a separate month-based scale.
/// Arithmetic between two intervals requires both to share the same `scale` (checked with `assert`).
class BaseScaleInterval
{
public:
    static constexpr IntervalKind::Kind SCALE_NANOSECOND = IntervalKind::Nanosecond;
    static constexpr IntervalKind::Kind SCALE_MONTH = IntervalKind::Month;

    /// Length of the interval, counted in units of `scale`.
    Int64 num_units = 0;
    /// Base scale of `num_units`: SCALE_NANOSECOND or SCALE_MONTH.
    IntervalKind::Kind scale = SCALE_NANOSECOND;
    /// Kind the interval was expressed in before conversion to the base scale.
    IntervalKind::Kind src_kind = SCALE_NANOSECOND;

    BaseScaleInterval() = default;

    /// Convert `num_units` of `kind` to its base scale, remembering `kind` as `src_kind`.
    /// FIXME: the multiplications are not overflow-checked. Int64 nanoseconds cover only about 292 years
    /// (~15'250 weeks), and signed overflow is undefined behavior.
    static constexpr BaseScaleInterval toBaseScale(Int64 num_units, IntervalKind::Kind kind)
    {
        switch (kind)
        {
            /// Based on SCALE_NANOSECOND
            case IntervalKind::Nanosecond:
                return BaseScaleInterval{num_units, SCALE_NANOSECOND, kind};
            case IntervalKind::Microsecond:
                return BaseScaleInterval{num_units * 1'000, SCALE_NANOSECOND, kind};
            case IntervalKind::Millisecond:
                return BaseScaleInterval{num_units * 1'000000, SCALE_NANOSECOND, kind};
            case IntervalKind::Second:
                return BaseScaleInterval{num_units * 1'000000000, SCALE_NANOSECOND, kind};
            case IntervalKind::Minute:
                return BaseScaleInterval{num_units * 60'000000000, SCALE_NANOSECOND, kind};
            case IntervalKind::Hour:
                return BaseScaleInterval{num_units * 3600'000000000, SCALE_NANOSECOND, kind};
            case IntervalKind::Day:
                return BaseScaleInterval{num_units * 86400'000000000, SCALE_NANOSECOND, kind};
            case IntervalKind::Week:
                return BaseScaleInterval{num_units * 604800'000000000, SCALE_NANOSECOND, kind};
            /// Based on SCALE_MONTH
            case IntervalKind::Month:
                return BaseScaleInterval{num_units, SCALE_MONTH, kind};
            case IntervalKind::Quarter:
                return BaseScaleInterval{num_units * 3, SCALE_MONTH, kind};
            case IntervalKind::Year:
                return BaseScaleInterval{num_units * 12, SCALE_MONTH, kind};
        }
        UNREACHABLE();
    }

    static BaseScaleInterval toBaseScale(const WindowInterval & interval)
    {
        return toBaseScale(interval.interval, interval.unit);
    }

    /// Express this interval as a count of `to_kind` units (defined out of line).
    Int64 toIntervalKind(IntervalKind::Kind to_kind) const;

    /// Add `bs` (same scale) to this interval in place. Keeps this interval's `src_kind`.
    BaseScaleInterval & operator+=(const BaseScaleInterval & bs)
    {
        assert(scale == bs.scale);
        num_units += bs.num_units;
        return *this;
    }

    /// Subtract `bs` (same scale) from this interval in place. Keeps this interval's `src_kind`.
    BaseScaleInterval & operator-=(const BaseScaleInterval & bs)
    {
        assert(scale == bs.scale);
        num_units -= bs.num_units;
        return *this;
    }

    /// Sum of two same-scale intervals. Neither operand is modified; the result keeps the left operand's `src_kind`.
    /// [[nodiscard]]: discarding the result is a no-op, so use `+=` to accumulate in place.
    [[nodiscard]] BaseScaleInterval operator+(const BaseScaleInterval & bs) const
    {
        BaseScaleInterval result = *this;
        result += bs;
        return result;
    }

    /// Difference of two same-scale intervals. Neither operand is modified; the result keeps the left operand's `src_kind`.
    /// [[nodiscard]]: discarding the result is a no-op, so use `-=` to subtract in place.
    [[nodiscard]] BaseScaleInterval operator-(const BaseScaleInterval & bs) const
    {
        BaseScaleInterval result = *this;
        result -= bs;
        return result;
    }

    /// How many times `bs` (same scale, non-zero) fits into this interval, using integer division.
    Int64 operator/(const BaseScaleInterval & bs) const
    {
        assert(scale == bs.scale);
        assert(bs.num_units != 0);
        return num_units / bs.num_units;
    }

    /// This interval divided by `num` (non-zero) using integer division, in the same scale and `src_kind`.
    BaseScaleInterval operator/(Int64 num) const
    {
        assert(num != 0);
        return {num_units / num, scale, src_kind};
    }

    String toString() const;

protected:
    constexpr BaseScaleInterval(Int64 num_units_, IntervalKind::Kind scale_, IntervalKind::Kind src_kind_)
        : num_units(num_units_), scale(scale_), src_kind(src_kind_)
    {
    }
};
/// Shared-ownership handle to a BaseScaleInterval. The "Based" spelling is kept as-is because renaming it
/// may break existing users of the alias.
using BasedScaleIntervalPtr = std::shared_ptr<BaseScaleInterval>;
/// Build an interval AST node for `num_units` of `kind`, or for the given WindowInterval.
ASTPtr makeASTInterval(Int64 num_units, IntervalKind kind);
ASTPtr makeASTInterval(const WindowInterval & interval);
/// Write interval ASTs for `bs1` and `bs2` into `ast1` / `ast2`, expressed in one common kind.
/// NOTE(review): inferred from the name — confirm which kind is chosen in the definition.
void convertToSameKindIntervalAST(const BaseScaleInterval & bs1, const BaseScaleInterval & bs2, ASTPtr & ast1, ASTPtr & ast2);
/// Pick a time scale for an interval of `num_units` x `kind`.
/// NOTE(review): presumably the DateTime64 scale needed to represent it — confirm.
UInt32 getAutoScaleByInterval(Int64 num_units, IntervalKind kind);
/// Window Params
struct WindowParams;
using WindowParamsPtr = std::shared_ptr<WindowParams>;

/// Parameters common to every streaming window kind; concrete kinds derive from this struct.
/// Scalar members carry default initializers so they are never left indeterminate, even if a
/// constructor path does not assign them. The out-of-line constructor may override these defaults.
struct WindowParams : public TypePromotion<WindowParams>
{
    WindowType type = WindowType::None;
    TableFunctionDescriptionPtr desc;
    String time_col_name;
    bool time_col_is_datetime64 = false; /// DateTime64 or DateTime
    /// NOTE(review): presumably the DateTime64 scale of the time column (0 for DateTime) — confirm.
    UInt32 time_scale = 0;
    const DateLUTImpl * time_zone = nullptr;

    /// Build the parameters for the window described by `desc`.
    /// NOTE(review): presumably returns the matching derived *WindowParams — confirm in the definition.
    static WindowParamsPtr create(const TableFunctionDescriptionPtr & desc);

protected:
    WindowParams(TableFunctionDescriptionPtr window_desc);
    virtual ~WindowParams() = default;
};
/// Parameters of a tumble window, built for __tumble(time_expr, win_interval, [timezone]).
/// `window_interval` is the window length counted in `interval_kind` units
/// (NOTE(review): inferred from the names — confirm in the constructor).
struct TumbleWindowParams : WindowParams
{
    Int64 window_interval{0};
    IntervalKind::Kind interval_kind{IntervalKind::Second};

    TumbleWindowParams(TableFunctionDescriptionPtr window_desc);
};
/// Parameters of a hop window, built for __hop(time_expr, hop_interval, win_interval, [timezone]).
/// The interval fields share the single unit `interval_kind`.
struct HopWindowParams : WindowParams
{
    Int64 window_interval{0};
    Int64 slide_interval{0};
    /// Base interval: the greatest common divisor of window_interval and slide_interval.
    /// Splitting every window into base windows of this size lets one base window be shared by several windows.
    Int64 gcd_interval{0};
    IntervalKind::Kind interval_kind{IntervalKind::Second};

    HopWindowParams(TableFunctionDescriptionPtr window_desc);
};
/// __session(timestamp_expr, timeout_interval, max_emit_interval, start_cond, start_with_inclusion, end_cond, end_with_inclusion)
/// Parameters of a session window. Members carry default initializers, consistent with
/// TumbleWindowParams / HopWindowParams, so none is left indeterminate.
/// NOTE(review): the out-of-line constructor is expected to set the real values — confirm.
struct SessionWindowParams : WindowParams
{
    Int64 session_timeout = 0;
    Int64 max_session_size = 0;
    IntervalKind::Kind interval_kind = IntervalKind::Second;
    bool start_with_inclusion = false;
    bool end_with_inclusion = false;
    /// So far, only for session window, we evaluate the watermark and window for the events in Aggregate Transform
    /// For other windows, we assigned the watermark in window assignment step.
    bool pushdown_window_assignment = true;

    SessionWindowParams(TableFunctionDescriptionPtr window_desc);
};
/// A window bounded by `start` and `end`; it is valid only when `start` lies strictly before `end`.
/// The members have no default initializers, so value-initialize (`Window{}`) when the bounds are not known yet.
struct Window
{
    Int64 start;
    Int64 end;

    bool isValid() const { return start < end; }
    operator bool() const { return isValid(); }
};
/// A window together with the time buckets that hold its data during window aggregation.
struct WindowWithBuckets
{
    Window window;
    /// Time buckets that hold this window's data; a hop window spans several base time buckets.
    std::vector<Int64> buckets{};
};

using WindowsWithBuckets = std::vector<WindowWithBuckets>;
/// Window assignment helpers (declared here, defined out of line).
/// assignWindow: compute windows of size `interval` for the time column at `time_col_pos` in `columns`,
/// interpreting it as DateTime64 or DateTime per `time_col_is_datetime64`, in `time_zone`.
/// NOTE(review): presumably appends window start/end columns to `columns` — confirm in the definition.
void assignWindow(
Columns & columns, const WindowInterval & interval, size_t time_col_pos, bool time_col_is_datetime64, const DateLUTImpl & time_zone);
/// reassignWindow: set the window bounds in `chunk` to those of `window`. `start_pos` / `end_pos` are optional
/// column positions. NOTE(review): inferred from names — confirm what happens when a position is absent.
void reassignWindow(
Chunk & chunk, const Window & window, bool time_col_is_datetime64, std::optional<size_t> start_pos, std::optional<size_t> end_pos);
}
}