Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(tmq): use tdb to store check info #16177

Merged
merged 1 commit into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -2555,10 +2555,14 @@ typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
int64_t ntbUid;
SArray* colIdList; // SArray<int16_t>
} SCheckAlterInfo;
} STqCheckInfo;

int32_t tEncodeSCheckAlterInfo(SEncoder* pEncoder, const SCheckAlterInfo* pInfo);
int32_t tDecodeSCheckAlterInfo(SDecoder* pDecoder, SCheckAlterInfo* pInfo);
int32_t tEncodeSTqCheckInfo(SEncoder* pEncoder, const STqCheckInfo* pInfo);
int32_t tDecodeSTqCheckInfo(SDecoder* pDecoder, STqCheckInfo* pInfo);

typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
} STqDelCheckInfoReq;

typedef struct {
int32_t vgId;
Expand Down
3 changes: 2 additions & 1 deletion include/common/tmsgdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_CHECK_ALTER_INFO, "vnode-alter-check-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ADD_CHECK_INFO, "vnode-add-check-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE_CHECK_INFO, "vnode-delete-check-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
Expand Down
2 changes: 1 addition & 1 deletion include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
void streamMetaClose(SStreamMeta* streamMeta);

int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);

Expand Down
8 changes: 2 additions & 6 deletions source/common/src/tmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -4262,7 +4262,6 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp
tDecoderClear(&decoder);
return 0;
}

int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) {
if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1;
if (tEncodeI64(encoder, pOffset->offset) < 0) return -1;
Expand Down Expand Up @@ -4300,7 +4299,6 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder *decoder, SMqCMCommitOffsetReq *pRe
tEndDecode(decoder);
return 0;
}

int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
Expand Down Expand Up @@ -5590,7 +5588,6 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
return 0;
}

#if 1
int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
if (pVal->type == TMQ_OFFSET__RESET_NONE) {
snprintf(buf, maxLen, "offset(reset to none)");
Expand All @@ -5609,7 +5606,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
}
return 0;
}
#endif

bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
if (pLeft->type == pRight->type) {
Expand Down Expand Up @@ -5643,7 +5639,7 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
return 0;
}

int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) {
int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) {
if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1;
int32_t sz = taosArrayGetSize(pInfo->colIdList);
Expand All @@ -5655,7 +5651,7 @@ int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo)
return pEncoder->pos;
}

int32_t tDecodeSCheckAlterInfo(SDecoder *pDecoder, SCheckAlterInfo *pInfo) {
int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) {
if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1;
int32_t sz;
Expand Down
3 changes: 2 additions & 1 deletion source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
Expand Down
3 changes: 2 additions & 1 deletion source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
Expand Down
4 changes: 2 additions & 2 deletions source/dnode/mnode/impl/src/mndOffset.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@

#define _DEFAULT_SOURCE
#include "mndOffset.h"
#include "mndPrivilege.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
Expand Down Expand Up @@ -305,7 +305,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
sdbRelease(pSdb, pOffset);
}

return code;
return code;
}

int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
Expand Down
44 changes: 39 additions & 5 deletions source/dnode/mnode/impl/src/mndTopic.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ int32_t mndInitTopic(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_CHECK_ALTER_INFO_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ADD_CHECK_INFO_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DELETE_CHECK_INFO_RSP, mndTransProcessRsp);

mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
Expand Down Expand Up @@ -450,7 +451,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);

if (topicObj.ntbUid != 0) {
SCheckAlterInfo info;
STqCheckInfo info;
memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
info.ntbUid = topicObj.ntbUid;
info.colIdList = topicObj.ntbColIds;
Expand All @@ -470,7 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
// encoder check alter info
int32_t len;
int32_t code;
tEncodeSize(tEncodeSCheckAlterInfo, &info, len, code);
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
if (code < 0) {
sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans);
Expand All @@ -481,7 +482,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
if (tEncodeSCheckAlterInfo(&encoder, &info) < 0) {
if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans);
return -1;
Expand All @@ -493,7 +494,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + len;
action.msgType = TDMT_VND_CHECK_ALTER_INFO;
action.msgType = TDMT_VND_ADD_CHECK_INFO;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
Expand Down Expand Up @@ -659,12 +660,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {

mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);

#if 0
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0);
mndTransDrop(pTrans);
mndReleaseTopic(pMnode, pTopic);
return -1;
}
#endif

// TODO check if rebalancing
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
Expand All @@ -675,6 +678,37 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
return -1;
}

if (pTopic->ntbUid != 0) {
// broadcast to all vnode
void *pIter = NULL;
SVgObj *pVgroup = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}

void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);

STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
action.msgType = TDMT_VND_DELETE_CHECK_INFO;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans);
return -1;
}
}
}

int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
mndReleaseTopic(pMnode, pTopic);

Expand Down
14 changes: 8 additions & 6 deletions source/dnode/vnode/src/inc/tq.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,15 @@ typedef struct {
struct STQ {
SVnode* pVnode;
char* path;
SHashObj* pushMgr; // consumerId -> STqHandle*
SHashObj* handles; // subKey -> STqHandle
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
SHashObj* pPushMgr; // consumerId -> STqHandle*
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo

STqOffsetStore* pOffsetStore;

TDB* pMetaStore;
TDB* pMetaDB;
TTB* pExecStore;

TTB* pAlterInfoStore;
TTB* pCheckStore;

SStreamMeta* pStreamMeta;
};
Expand Down Expand Up @@ -155,6 +154,9 @@ int32_t tqMetaClose(STQ* pTq);
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
int32_t tqMetaRestoreHandle(STQ* pTq);
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
int32_t tqMetaRestoreCheckInfo(STQ* pTq);

typedef struct {
int32_t size;
Expand Down
15 changes: 9 additions & 6 deletions source/dnode/vnode/src/inc/vnodeInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,16 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver);
// tq-mq
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
// tq-stream
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
Expand Down
Loading