Skip to content

Commit

Permalink
fix(stream): fix memory leak.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjxilinx committed Jul 26, 2023
1 parent 12986ff commit cdffabc
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 33 deletions.
3 changes: 0 additions & 3 deletions source/dnode/snode/src/snode.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,6 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
}

streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId);
streamMetaRemoveTask(pSnode->pMeta, pReq->taskId);

streamMetaReleaseTask(pSnode->pMeta, pTask);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0;
}
Expand Down
5 changes: 2 additions & 3 deletions source/dnode/vnode/src/tq/tq.c
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
tFreeStreamTask(pTask);
taosWUnLockLatch(&pStreamMeta->lock);
return -1;
}
Expand Down Expand Up @@ -1261,6 +1262,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t remain = streamAlignTransferState(pTask);
if (remain > 0) {
tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}

Expand Down Expand Up @@ -1475,9 +1477,6 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL
}

streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId);
streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);

streamMetaReleaseTask(pTq->pStreamMeta, pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
Expand Down
6 changes: 1 addition & 5 deletions source/libs/stream/src/streamExec.c
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,10 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskResumeFromHalt(pStreamTask);

qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);

int32_t taskId = pTask->id.taskId;
pTask->status.taskStatus = TASK_STATUS__DROPPING;

// free it and remove it from disk meta-store
streamMetaUnregisterTask(pMeta, pTask->id.taskId);
streamMetaRemoveTask(pMeta, taskId);
streamMetaReleaseTask(pMeta, pTask);
streamMetaUnregisterTask(pMeta, taskId);

// save to disk
taosWLockLatch(&pMeta->lock);
Expand Down
11 changes: 5 additions & 6 deletions source/libs/stream/src/streamMeta.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
}

int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
taosWLockLatch(&pMeta->lock);
int32_t code = tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(taskId), pMeta->txn);
taosWUnLockLatch(&pMeta->lock);

if (code != 0) {
qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, taskId, tstrerror(terrno));
} else {
Expand All @@ -248,6 +245,8 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
return -1;
}

taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);

if (streamMetaSaveTask(pMeta, pTask) < 0) {
tFreeStreamTask(pTask);
return -1;
Expand All @@ -257,8 +256,6 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
tFreeStreamTask(pTask);
return -1;
}

taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
} else {
return 0;
}
Expand Down Expand Up @@ -361,13 +358,15 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
ASSERT(pTask->status.timerActive == 0);

int32_t num = taosArrayGetSize(pMeta->pTaskList);
qDebug("s-task:%s set the drop task flag, remain running s-task:%d", pTask->id.idStr, num - 1);
doRemoveIdFromList(pMeta, num, pTask->id.taskId);

// remove the ref by timer
if (pTask->triggerParam != 0) {
taosTmrStop(pTask->schedTimer);
}

streamMetaRemoveTask(pMeta, taskId);
streamMetaReleaseTask(pMeta, pTask);
} else {
qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
}
Expand Down
2 changes: 1 addition & 1 deletion source/libs/stream/src/streamTask.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ static void freeItem(void* p) {
}

void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:%s", pTask->id.idStr);
qDebug("free s-task:%s, %p", pTask->id.idStr, pTask);

int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) {
Expand Down
18 changes: 3 additions & 15 deletions tests/script/tsim/stream/basic3.sim
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/cfg.sh -n dnode1 -c keepColumnName -v 1
system sh/exec.sh -n dnode1 -s start

sleep 5000

sleep 1000
sql connect

print ========== interval\session\state window
Expand All @@ -32,7 +30,6 @@ sql create stream streamd6 into streamt6 as select ca, _wstart,_wend, count(*),


sql alter local 'keepColumnName' '1'

sql CREATE STABLE `meters_test_data` (`ts` TIMESTAMP, `close` FLOAT, `parttime` TIMESTAMP, `parttime_str` VARCHAR(32)) TAGS (`id` VARCHAR(32));

sql_error create stream realtime_meters fill_history 1 into realtime_meters as select last(parttime),first(close),last(close) from meters_test_data partition by tbname state_window(parttime_str);
Expand All @@ -58,30 +55,25 @@ sql_error create stream streamd11 into streamd11 as select _wstart, _wend, count
sql alter local 'keepColumnName' '0'

sql create stream realtime_meters fill_history 1 into realtime_meters as select last(parttime),first(close),last(close) from meters_test_data partition by tbname state_window(parttime_str);

sql desc realtime_meters;

if $rows == 0 then
return -1
endi

sql create stream streamd7 into streamt7 as select _wstart, _wend, count(*), first(ca), last(ca) from t1 interval(10s);

sql create stream streamd7 into streamt7 as select _wstart t1, _wend t2, count(*), first(ca), last(ca) from t1 interval(10s);
sql desc streamt7;

if $rows == 0 then
return -1
endi

sql create stream streamd71 into streamt71 as select _wstart, _wend, count(*) as ca, first(ca), last(ca) as c2 from t1 interval(10s);

sql desc streamt71;

if $rows == 0 then
return -1
endi

sleep 3000
sleep 1000

sql drop stream if exists streamd1;
sql drop stream if exists streamd2;
Expand All @@ -93,23 +85,19 @@ sql drop stream if exists streamd6;
sql create stream streamd10 into streamd10 as select _wstart, _wend, count(*), first(ca), last(cb) as c2 from t1 interval(10s);

sql desc streamd10;

if $rows == 0 then
return -1
endi

sql_error create stream streamd11 into streamd11 as select _wstart, _wend, count(*), last(ca), last(ca) from t1 interval(10s);


sql create stream streamd12 into streamd12 as select _wstart, _wend, count(*), last(ca), last(cb) as c2 from t1 interval(10s);

sql desc streamd12;

if $rows == 0 then
return -1
endi


_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check
Expand Down

0 comments on commit cdffabc

Please sign in to comment.