Skip to content

Commit

Permalink
Merge pull request #20656 from taosdata/fix/main_bugfix_wxy
Browse files Browse the repository at this point in the history
fix: create stream does not support event_window
  • Loading branch information
xiao-yu-wang committed Mar 27, 2023
2 parents af53a3a + c4791d5 commit a14b3e6
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
23 changes: 13 additions & 10 deletions source/libs/parser/src/parCalcConst.c
Expand Up @@ -183,16 +183,18 @@ static int32_t calcConstProject(SNode* pProject, bool dual, SNode** pNew) {
} else {
code = scalarCalculateConstants(pProject, pNew);
}
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_VALUE == nodeType(*pNew) && NULL != pAssociation) {
if (TSDB_CODE_SUCCESS == code) {
strcpy(((SExprNode*)*pNew)->aliasName, aliasName);
int32_t size = taosArrayGetSize(pAssociation);
for (int32_t i = 0; i < size; ++i) {
SNode** pCol = taosArrayGetP(pAssociation, i);
nodesDestroyNode(*pCol);
*pCol = nodesCloneNode(*pNew);
if (NULL == *pCol) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
if (QUERY_NODE_VALUE == nodeType(*pNew) && NULL != pAssociation) {
int32_t size = taosArrayGetSize(pAssociation);
for (int32_t i = 0; i < size; ++i) {
SNode** pCol = taosArrayGetP(pAssociation, i);
nodesDestroyNode(*pCol);
*pCol = nodesCloneNode(*pNew);
if (NULL == *pCol) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
}
}
}
Expand Down Expand Up @@ -383,7 +385,8 @@ static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator*
int32_t index = 0;
SNode* pProj = NULL;
WHERE_EACH(pProj, pSetOp->pProjectionList) {
if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) && isSetUselessCol(pSetOp, index, (SExprNode*)pProj)) {
if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) &&
isSetUselessCol(pSetOp, index, (SExprNode*)pProj)) {
ERASE_NODE(pSetOp->pProjectionList);
eraseSetOpChildProjection(pSetOp, index);
continue;
Expand Down
12 changes: 8 additions & 4 deletions source/libs/parser/src/parTranslater.c
Expand Up @@ -1342,8 +1342,8 @@ static bool isCountNotNullValue(SFunctionNode* pFunc) {
// count(1) is rewritten as count(ts) for scannning optimization
static int32_t rewriteCountNotNullValue(STranslateContext* pCxt, SFunctionNode* pCount) {
SValueNode* pValue = (SValueNode*)nodesListGetNode(pCount->pParameterList, 0);
STableNode* pTable = NULL;
int32_t code = findTable(pCxt, NULL, &pTable);
STableNode* pTable = NULL;
int32_t code = findTable(pCxt, NULL, &pTable);
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
Expand Down Expand Up @@ -1424,7 +1424,7 @@ static int32_t translateAggFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
}
if (isCountNotNullValue(pFunc)) {
return rewriteCountNotNullValue(pCxt, pFunc);
}
}
if (isCountTbname(pFunc)) {
return rewriteCountTbname(pCxt, pFunc);
}
Expand Down Expand Up @@ -5923,11 +5923,15 @@ static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STabl
return code;
}

static bool isEventWindowQuery(SSelectStmt* pSelect) {
return NULL != pSelect->pWindow && QUERY_NODE_EVENT_WINDOW == nodeType(pSelect->pWindow);
}

static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
crossTableWithUdaf(pSelect)) {
crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
}
if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) {
Expand Down

0 comments on commit a14b3e6

Please sign in to comment.