Skip to content

Commit

Permalink
Merge pull request #22661 from taosdata/enh/reserve
Browse files Browse the repository at this point in the history
stream change ver
  • Loading branch information
hjxilinx committed Aug 31, 2023
2 parents 7bd5439 + 83a5e2b commit 8645f76
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 81 deletions.
3 changes: 2 additions & 1 deletion include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ extern "C" {

typedef struct SStreamTask SStreamTask;

#define SSTREAM_TASK_VER 1
#define SSTREAM_TASK_VER 2
enum {
STREAM_STATUS__NORMAL = 0,
STREAM_STATUS__STOP,
Expand Down Expand Up @@ -371,6 +371,7 @@ struct SStreamTask {
int32_t transferStateAlignCnt;
struct SStreamMeta* pMeta;
SSHashObj* pNameMap;
char reserve[256];
};

typedef struct SMetaHbInfo {
Expand Down
64 changes: 33 additions & 31 deletions source/dnode/mnode/impl/inc/mndDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -615,25 +615,25 @@ void tDeleteSubscribeObj(SMqSubscribeObj* pSub);
int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub);
void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver);

//typedef struct {
// int32_t epoch;
// SArray* consumers; // SArray<SMqConsumerEp*>
//} SMqSubActionLogEntry;

//SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
//int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry);
//void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry);
// typedef struct {
// int32_t epoch;
// SArray* consumers; // SArray<SMqConsumerEp*>
// } SMqSubActionLogEntry;

// SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
// int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry);
// void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry);
//
//typedef struct {
// char key[TSDB_SUBSCRIBE_KEY_LEN];
// SArray* logs; // SArray<SMqSubActionLogEntry*>
//} SMqSubActionLogObj;
// typedef struct {
// char key[TSDB_SUBSCRIBE_KEY_LEN];
// SArray* logs; // SArray<SMqSubActionLogEntry*>
// } SMqSubActionLogObj;
//
//SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog);
//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog);
//int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog);
//void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog);
// SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog);
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog);
// int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog);
// void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog);

typedef struct {
int32_t oldConsumerNum;
Expand All @@ -647,12 +647,12 @@ typedef struct {
} SMqRebOutputVg;

typedef struct {
SArray* rebVgs; // SArray<SMqRebOutputVg>
SArray* newConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t>
SArray* modifyConsumers; // SArray<int64_t>
SMqSubscribeObj* pSub;
// SMqSubActionLogEntry* pLogEntry;
SArray* rebVgs; // SArray<SMqRebOutputVg>
SArray* newConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t>
SArray* modifyConsumers; // SArray<int64_t>
SMqSubscribeObj* pSub;
// SMqSubActionLogEntry* pLogEntry;
} SMqRebOutputObj;

typedef struct SStreamConf {
Expand All @@ -674,8 +674,8 @@ typedef struct {
int32_t totalLevel;
int64_t smaId; // 0 for unused
// info
int64_t uid;
int8_t status;
int64_t uid;
int8_t status;
SStreamConf conf;
// source and target
int64_t sourceDbUid;
Expand All @@ -690,13 +690,13 @@ typedef struct {
int32_t fixedSinkVgId; // 0 for shuffle

// transformation
char* sql;
char* ast;
char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>>
char* sql;
char* ast;
char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>>

SArray* pHTasksList; // generate the results for already stored ts data
int64_t hTaskUid; // stream task for history ts data
SArray* pHTasksList; // generate the results for already stored ts data
int64_t hTaskUid; // stream task for history ts data

SSchemaWrapper outputSchema;
SSchemaWrapper tagSchema;
Expand All @@ -709,6 +709,8 @@ typedef struct {

// 3.0.5.
int64_t checkpointId;
char reserve[256];

} SStreamObj;

int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
Expand Down
4 changes: 4 additions & 0 deletions source/dnode/mnode/impl/src/mndDef.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
// 3.0.50 ver = 3
if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1;

if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1) < 0) return -1;

tEndEncode(pEncoder);
return pEncoder->pos;
}
Expand Down Expand Up @@ -157,6 +159,8 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
if (sver >= 3) {
if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1;
}
if (tDecodeCStrTo(pDecoder, pObj->reserve) < 0) return -1;

tEndDecode(pDecoder);
return 0;
}
Expand Down
29 changes: 16 additions & 13 deletions source/dnode/mnode/impl/src/mndStream.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/

#include "mndStream.h"
#include "audit.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
Expand All @@ -28,9 +29,8 @@
#include "parser.h"
#include "tmisce.h"
#include "tname.h"
#include "audit.h"

#define MND_STREAM_VER_NUMBER 3
#define MND_STREAM_VER_NUMBER 4
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 60
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
Expand Down Expand Up @@ -874,15 +874,18 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
code = TSDB_CODE_ACTION_IN_PROGRESS;

char detail[2000] = {0};
sprintf(detail, "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 ", "
sprintf(detail,
"checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64
", "
"fillHistory:%d, igExists:%d, "
"igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", "
"maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, "
"igExpired:%d, igUpdate:%d, lastTs:%" PRId64
", "
"maxDelay:%" PRId64
", numOfTags:%d, sourceDB:%s, "
"targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark,
createStreamReq.fillHistory, createStreamReq.igExists,
createStreamReq.igExpired, createStreamReq.igUpdate, createStreamReq.lastTs,
createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,
createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate,
createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,
createStreamReq.targetStbFullName, createStreamReq.triggerType, createStreamReq.watermark);

auditRecord(pReq, pMnode->clusterId, "createStream", createStreamReq.name, "", detail);
Expand Down Expand Up @@ -2301,12 +2304,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
doExtractTasksFromStream(pMnode);
}

for(int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId};
int32_t index = *(int32_t*) taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId};
int32_t index = *(int32_t *)taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));

STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index);
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, index);
pStatusEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) {
mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status));
Expand Down
74 changes: 38 additions & 36 deletions source/libs/stream/src/streamTask.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,46 +134,12 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;

tEndEncode(pEncoder);
return pEncoder->pos;
}

int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
int64_t ver;
int64_t skip64;
int8_t skip8;
int32_t skip32;
int16_t skip16;
SEpSet epSet;

if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &ver) < 0) return -1;

if (ver != SSTREAM_TASK_VER) return -1;

if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI16(pDecoder, &skip16) < 0) return -1;

if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;

if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;

if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;

tEndDecode(pDecoder);
return 0;
}

int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
Expand Down Expand Up @@ -245,6 +211,42 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;

tEndDecode(pDecoder);
return 0;
}

int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
int64_t ver;
int64_t skip64;
int8_t skip8;
int32_t skip32;
int16_t skip16;
SEpSet epSet;

if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &ver) < 0) return -1;

if (ver != SSTREAM_TASK_VER) return -1;

if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI16(pDecoder, &skip16) < 0) return -1;

if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;

if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;

if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;

tEndDecode(pDecoder);
return 0;
Expand Down Expand Up @@ -483,7 +485,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
pTask->status.taskStatus = TASK_STATUS__STOP;
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);

while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */!streamTaskIsIdle(pTask)) {
while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) {
qDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel);
taosMsleep(100);
}
Expand Down

0 comments on commit 8645f76

Please sign in to comment.