Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: event window query #18925

Merged
merged 6 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 72 additions & 70 deletions include/common/ttokendef.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,76 +267,78 @@
#define TK_BY 249
#define TK_SESSION 250
#define TK_STATE_WINDOW 251
#define TK_SLIDING 252
#define TK_FILL 253
#define TK_VALUE 254
#define TK_NONE 255
#define TK_PREV 256
#define TK_LINEAR 257
#define TK_NEXT 258
#define TK_HAVING 259
#define TK_RANGE 260
#define TK_EVERY 261
#define TK_ORDER 262
#define TK_SLIMIT 263
#define TK_SOFFSET 264
#define TK_LIMIT 265
#define TK_OFFSET 266
#define TK_ASC 267
#define TK_NULLS 268
#define TK_ABORT 269
#define TK_AFTER 270
#define TK_ATTACH 271
#define TK_BEFORE 272
#define TK_BEGIN 273
#define TK_BITAND 274
#define TK_BITNOT 275
#define TK_BITOR 276
#define TK_BLOCKS 277
#define TK_CHANGE 278
#define TK_COMMA 279
#define TK_COMPACT 280
#define TK_CONCAT 281
#define TK_CONFLICT 282
#define TK_COPY 283
#define TK_DEFERRED 284
#define TK_DELIMITERS 285
#define TK_DETACH 286
#define TK_DIVIDE 287
#define TK_DOT 288
#define TK_EACH 289
#define TK_FAIL 290
#define TK_FILE 291
#define TK_FOR 292
#define TK_GLOB 293
#define TK_ID 294
#define TK_IMMEDIATE 295
#define TK_IMPORT 296
#define TK_INITIALLY 297
#define TK_INSTEAD 298
#define TK_ISNULL 299
#define TK_KEY 300
#define TK_MODULES 301
#define TK_NK_BITNOT 302
#define TK_NK_SEMI 303
#define TK_NOTNULL 304
#define TK_OF 305
#define TK_PLUS 306
#define TK_PRIVILEGE 307
#define TK_RAISE 308
#define TK_REPLACE 309
#define TK_RESTRICT 310
#define TK_ROW 311
#define TK_SEMI 312
#define TK_STAR 313
#define TK_STATEMENT 314
#define TK_STRING 315
#define TK_TIMES 316
#define TK_UPDATE 317
#define TK_VALUES 318
#define TK_VARIABLE 319
#define TK_VIEW 320
#define TK_WAL 321
#define TK_EVENT_WINDOW 252
#define TK_START 253
#define TK_SLIDING 254
#define TK_FILL 255
#define TK_VALUE 256
#define TK_NONE 257
#define TK_PREV 258
#define TK_LINEAR 259
#define TK_NEXT 260
#define TK_HAVING 261
#define TK_RANGE 262
#define TK_EVERY 263
#define TK_ORDER 264
#define TK_SLIMIT 265
#define TK_SOFFSET 266
#define TK_LIMIT 267
#define TK_OFFSET 268
#define TK_ASC 269
#define TK_NULLS 270
#define TK_ABORT 271
#define TK_AFTER 272
#define TK_ATTACH 273
#define TK_BEFORE 274
#define TK_BEGIN 275
#define TK_BITAND 276
#define TK_BITNOT 277
#define TK_BITOR 278
#define TK_BLOCKS 279
#define TK_CHANGE 280
#define TK_COMMA 281
#define TK_COMPACT 282
#define TK_CONCAT 283
#define TK_CONFLICT 284
#define TK_COPY 285
#define TK_DEFERRED 286
#define TK_DELIMITERS 287
#define TK_DETACH 288
#define TK_DIVIDE 289
#define TK_DOT 290
#define TK_EACH 291
#define TK_FAIL 292
#define TK_FILE 293
#define TK_FOR 294
#define TK_GLOB 295
#define TK_ID 296
#define TK_IMMEDIATE 297
#define TK_IMPORT 298
#define TK_INITIALLY 299
#define TK_INSTEAD 300
#define TK_ISNULL 301
#define TK_KEY 302
#define TK_MODULES 303
#define TK_NK_BITNOT 304
#define TK_NK_SEMI 305
#define TK_NOTNULL 306
#define TK_OF 307
#define TK_PLUS 308
#define TK_PRIVILEGE 309
#define TK_RAISE 310
#define TK_REPLACE 311
#define TK_RESTRICT 312
#define TK_ROW 313
#define TK_SEMI 314
#define TK_STAR 315
#define TK_STATEMENT 316
#define TK_STRING 317
#define TK_TIMES 318
#define TK_UPDATE 319
#define TK_VALUES 320
#define TK_VARIABLE 321
#define TK_VIEW 322
#define TK_WAL 323

#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601
Expand Down
5 changes: 4 additions & 1 deletion include/libs/nodes/nodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ typedef enum ENodeType {
QUERY_NODE_COLUMN_REF,
QUERY_NODE_WHEN_THEN,
QUERY_NODE_CASE_WHEN,
QUERY_NODE_EVENT_WINDOW,

// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,
Expand Down Expand Up @@ -265,7 +266,9 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_DELETE,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN,
QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT,
QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT
} ENodeType;

/**
Expand Down
17 changes: 16 additions & 1 deletion include/libs/nodes/plannodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,12 @@ typedef struct SMergeLogicNode {
bool groupSort;
} SMergeLogicNode;

typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
typedef enum EWindowType {
WINDOW_TYPE_INTERVAL = 1,
WINDOW_TYPE_SESSION,
WINDOW_TYPE_STATE,
WINDOW_TYPE_EVENT
} EWindowType;

typedef enum EWindowAlgorithm {
INTERVAL_ALGO_HASH = 1,
Expand All @@ -212,6 +217,8 @@ typedef struct SWindowLogicNode {
SNode* pTspk;
SNode* pTsEnd;
SNode* pStateExpr;
SNode* pStartCond;
SNode* pEndCond;
int8_t triggerType;
int64_t watermark;
int64_t deleteMark;
Expand Down Expand Up @@ -498,6 +505,14 @@ typedef struct SStateWinodwPhysiNode {

typedef SStateWinodwPhysiNode SStreamStateWinodwPhysiNode;

typedef struct SEventWinodwPhysiNode {
SWinodwPhysiNode window;
SNode* pStartCond;
SNode* pEndCond;
} SEventWinodwPhysiNode;

typedef SEventWinodwPhysiNode SStreamEventWinodwPhysiNode;

typedef struct SSortPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
Expand Down
7 changes: 7 additions & 0 deletions include/libs/nodes/querynodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,13 @@ typedef struct SIntervalWindowNode {
SNode* pFill;
} SIntervalWindowNode;

typedef struct SEventWindowNode {
ENodeType type; // QUERY_NODE_EVENT_WINDOW
SNode* pCol; // timestamp primary key
SNode* pStartCond;
SNode* pEndCond;
} SEventWindowNode;

typedef enum EFillMode {
FILL_MODE_NONE = 1,
FILL_MODE_VALUE,
Expand Down
12 changes: 12 additions & 0 deletions source/libs/nodes/src/nodesCloneFuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ static int32_t stateWindowNodeCopy(const SStateWindowNode* pSrc, SStateWindowNod
return TSDB_CODE_SUCCESS;
}

static int32_t eventWindowNodeCopy(const SEventWindowNode* pSrc, SEventWindowNode* pDst) {
CLONE_NODE_FIELD(pCol);
CLONE_NODE_FIELD(pStartCond);
CLONE_NODE_FIELD(pEndCond);
return TSDB_CODE_SUCCESS;
}

static int32_t sessionWindowNodeCopy(const SSessionWindowNode* pSrc, SSessionWindowNode* pDst) {
CLONE_NODE_FIELD_EX(pCol, SColumnNode*);
CLONE_NODE_FIELD_EX(pGap, SValueNode*);
Expand Down Expand Up @@ -462,6 +469,8 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
CLONE_NODE_FIELD(pTspk);
CLONE_NODE_FIELD(pTsEnd);
CLONE_NODE_FIELD(pStateExpr);
CLONE_NODE_FIELD(pStartCond);
CLONE_NODE_FIELD(pEndCond);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(deleteMark);
Expand Down Expand Up @@ -709,6 +718,9 @@ SNode* nodesCloneNode(const SNode* pNode) {
case QUERY_NODE_STATE_WINDOW:
code = stateWindowNodeCopy((const SStateWindowNode*)pNode, (SStateWindowNode*)pDst);
break;
case QUERY_NODE_EVENT_WINDOW:
code = eventWindowNodeCopy((const SEventWindowNode*)pNode, (SEventWindowNode*)pDst);
break;
case QUERY_NODE_SESSION_WINDOW:
code = sessionWindowNodeCopy((const SSessionWindowNode*)pNode, (SSessionWindowNode*)pDst);
break;
Expand Down
77 changes: 77 additions & 0 deletions source/libs/nodes/src/nodesCodeFuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const char* nodesNodeName(ENodeType type) {
return "WhenThen";
case QUERY_NODE_CASE_WHEN:
return "CaseWhen";
case QUERY_NODE_EVENT_WINDOW:
return "EventWindow";
case QUERY_NODE_SET_OPERATOR:
return "SetOperator";
case QUERY_NODE_SELECT_STMT:
Expand Down Expand Up @@ -233,6 +235,10 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiLastRowScan";
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
return "PhysiTableCountScan";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
return "PhysiMergeEventWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return "PhysiStreamEventWindow";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "PhysiProject";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
Expand Down Expand Up @@ -2272,6 +2278,37 @@ static int32_t jsonToPhysiStateWindowNode(const SJson* pJson, void* pObj) {
return code;
}

static const char* jkEventWindowPhysiPlanStartCond = "StartCond";
static const char* jkEventWindowPhysiPlanEndCond = "EndCond";

static int32_t physiEventWindowNodeToJson(const void* pObj, SJson* pJson) {
const SEventWinodwPhysiNode* pNode = (const SEventWinodwPhysiNode*)pObj;

int32_t code = physiWindowNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowPhysiPlanStartCond, nodeToJson, pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowPhysiPlanEndCond, nodeToJson, pNode->pEndCond);
}

return code;
}

static int32_t jsonToPhysiEventWindowNode(const SJson* pJson, void* pObj) {
SEventWinodwPhysiNode* pNode = (SEventWinodwPhysiNode*)pObj;

int32_t code = jsonToPhysiWindowNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowPhysiPlanStartCond, &pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowPhysiPlanEndCond, &pNode->pEndCond);
}

return code;
}

static const char* jkPartitionPhysiPlanExprs = "Exprs";
static const char* jkPartitionPhysiPlanPartitionKeys = "PartitionKeys";
static const char* jkPartitionPhysiPlanTargets = "Targets";
Expand Down Expand Up @@ -3660,6 +3697,36 @@ static int32_t jsonToSessionWindowNode(const SJson* pJson, void* pObj) {
return code;
}

static const char* jkEventWindowTsPrimaryKey = "TsPrimaryKey";
static const char* jkEventWindowStartCond = "StartCond";
static const char* jkEventWindowEndCond = "EndCond";

static int32_t eventWindowNodeToJson(const void* pObj, SJson* pJson) {
const SEventWindowNode* pNode = (const SEventWindowNode*)pObj;

int32_t code = tjsonAddObject(pJson, jkEventWindowTsPrimaryKey, nodeToJson, pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowStartCond, nodeToJson, pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowEndCond, nodeToJson, pNode->pEndCond);
}
return code;
}

static int32_t jsonToEventWindowNode(const SJson* pJson, void* pObj) {
SEventWindowNode* pNode = (SEventWindowNode*)pObj;

int32_t code = jsonToNodeObject(pJson, jkEventWindowTsPrimaryKey, &pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowStartCond, &pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowEndCond, &pNode->pEndCond);
}
return code;
}

static const char* jkIntervalWindowInterval = "Interval";
static const char* jkIntervalWindowOffset = "Offset";
static const char* jkIntervalWindowSliding = "Sliding";
Expand Down Expand Up @@ -4615,6 +4682,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return whenThenNodeToJson(pObj, pJson);
case QUERY_NODE_CASE_WHEN:
return caseWhenNodeToJson(pObj, pJson);
case QUERY_NODE_EVENT_WINDOW:
return eventWindowNodeToJson(pObj, pJson);
case QUERY_NODE_SET_OPERATOR:
return setOperatorToJson(pObj, pJson);
case QUERY_NODE_SELECT_STMT:
Expand Down Expand Up @@ -4712,6 +4781,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return physiStateWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return physiEventWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return physiPartitionNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
Expand Down Expand Up @@ -4787,6 +4859,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToWhenThenNode(pJson, pObj);
case QUERY_NODE_CASE_WHEN:
return jsonToCaseWhenNode(pJson, pObj);
case QUERY_NODE_EVENT_WINDOW:
return jsonToEventWindowNode(pJson, pObj);
case QUERY_NODE_SET_OPERATOR:
return jsonToSetOperator(pJson, pObj);
case QUERY_NODE_SELECT_STMT:
Expand Down Expand Up @@ -4871,6 +4945,9 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return jsonToPhysiStateWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return jsonToPhysiEventWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return jsonToPhysiPartitionNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
Expand Down
Loading