Skip to content

Commit

Permalink
Merge pull request #23763 from taosdata/fix/pause_stream
Browse files Browse the repository at this point in the history
fix(stream): fix error in generating token in bucket.
  • Loading branch information
hjxilinx committed Nov 21, 2023
2 parents a74063f + 068252f commit 8c4558c
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 15 deletions.
2 changes: 2 additions & 0 deletions source/common/src/systable.c
Expand Up @@ -156,6 +156,8 @@ static const SSysDbTableSchema streamSchema[] = {
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};

static const SSysDbTableSchema streamTaskSchema[] = {
Expand Down
16 changes: 15 additions & 1 deletion source/dnode/mnode/impl/src/mndStream.c
Expand Up @@ -1481,7 +1481,6 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
if (pShow->pIter == NULL) break;

SColumnInfoData *pColInfo;
SName n;
int32_t cols = 0;

char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
Expand Down Expand Up @@ -1534,6 +1533,21 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);

char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
sinkQuota[0] = '0';
char dstStr[20] = {0};
STR_TO_VARSTR(dstStr, sinkQuota)
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false);

char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
strcpy(scanHistoryIdle, "100a");

memset(dstStr, 0, tListLen(dstStr));
STR_TO_VARSTR(dstStr, scanHistoryIdle)
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false);

numOfRows++;
sdbRelease(pSdb, pStream);
}
Expand Down
8 changes: 6 additions & 2 deletions source/dnode/vnode/src/tq/tqStreamTask.c
Expand Up @@ -213,8 +213,10 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;

bool alreadyRestored = pTq->pVnode->restored;

// do not launch the stream tasks, if it is a follower or not restored vnode.
if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) {
if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
return TSDB_CODE_SUCCESS;
}

Expand Down Expand Up @@ -256,7 +258,9 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
return -1;
}

tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, restored:%d", vgId, numOfTasks,
alreadyRestored);

pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID;
Expand Down
3 changes: 2 additions & 1 deletion source/libs/stream/inc/streamInt.h
Expand Up @@ -75,7 +75,8 @@ struct STokenBucket {
double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second
double quotaRemain; // not consumed bytes per second
double quotaRate; // number of token per second
int64_t fillTimestamp; // fill timestamp
int64_t tokenFillTimestamp; // fill timestamp
int64_t quotaFillTimestamp; // fill timestamp
};

struct SStreamQueue {
Expand Down
2 changes: 1 addition & 1 deletion source/libs/stream/src/streamCheckpoint.c
Expand Up @@ -299,7 +299,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
continue;
}

ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId &&
ASSERT(p->chkInfo.checkpointId <= p->checkpointingId && p->checkpointingId == checkpointId &&
p->chkInfo.checkpointVer <= p->chkInfo.processedVer);

p->chkInfo.checkpointId = p->checkpointingId;
Expand Down
20 changes: 12 additions & 8 deletions source/libs/stream/src/streamQueue.c
Expand Up @@ -388,32 +388,36 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
pBucket->quotaRemain = pBucket->quotaCapacity;

pBucket->fillTimestamp = taosGetTimestampMs();
pBucket->tokenFillTimestamp = taosGetTimestampMs();
pBucket->quotaFillTimestamp = taosGetTimestampMs();
stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
return TSDB_CODE_SUCCESS;
}

// Replenish the sink token bucket: the discrete token counter and the byte
// quota each accrue independently, based on the wall-clock time elapsed since
// their own last refill, and are capped at their configured capacities.
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
  int64_t ts = taosGetTimestampMs();

  ASSERT(pBucket->numOfToken >= 0);

  // tokens accrue at numRate per second since the last token refill
  int64_t tokenElapsed = ts - pBucket->tokenFillTimestamp;
  int32_t addedTokens = (tokenElapsed / 1000.0) * pBucket->numRate;
  if (addedTokens > 0) {
    int32_t total = pBucket->numOfToken + addedTokens;
    pBucket->numOfToken = TMIN(total, pBucket->numCapacity);
    pBucket->tokenFillTimestamp = ts;
  }

  // quota accrues at quotaRate per second since the last quota refill
  int64_t quotaElapsed = ts - pBucket->quotaFillTimestamp;
  double addedQuota = (quotaElapsed / 1000.0) * pBucket->quotaRate;
  if (addedQuota > 0) {
    double total = pBucket->quotaRemain + addedQuota;
    pBucket->quotaRemain = TMIN(total, pBucket->quotaCapacity);
    pBucket->quotaFillTimestamp = ts;
  }

  if (addedTokens > 0 || addedQuota > 0) {
    stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
            ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
            pBucket->numOfToken, addedTokens, tokenElapsed, pBucket->quotaRemain, addedQuota, quotaElapsed, ts, id);
  }
}

Expand Down
1 change: 0 additions & 1 deletion source/libs/stream/src/streamState.c
Expand Up @@ -1086,7 +1086,6 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
}

int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
stDebug("try to write to cf parname");
#ifdef USE_ROCKSDB
if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
Expand Down
2 changes: 1 addition & 1 deletion tests/system-test/0-others/information_schema.py
Expand Up @@ -217,7 +217,7 @@ def ins_columns_check(self):
tdSql.checkEqual(20470,len(tdSql.queryResult))

tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdSql.checkEqual(208, len(tdSql.queryResult))
tdSql.checkEqual(210, len(tdSql.queryResult))

tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult))
Expand Down

0 comments on commit 8c4558c

Please sign in to comment.