Skip to content

Commit

Permalink
Merge branch 'main' into release/ver-3.3.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaopingcs committed May 10, 2024
2 parents 296edc2 + f7f9496 commit eb6094f
Show file tree
Hide file tree
Showing 36 changed files with 2,011 additions and 1,948 deletions.
10 changes: 5 additions & 5 deletions include/common/rsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ extern "C" {

#include "tarray.h"

void stopRsync();
void startRsync();
int uploadRsync(char* id, char* path);
int downloadRsync(char* id, char* path);
int deleteRsync(char* id);
void stopRsync();
void startRsync();
int32_t uploadRsync(const char* id, const char* path);
int32_t downloadRsync(const char* id, const char* path);
int32_t deleteRsync(const char* id);

#ifdef __cplusplus
}
Expand Down
1 change: 0 additions & 1 deletion include/dnode/vnode/tqCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
bool isLeader, bool restored);
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta);
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
Expand Down
1 change: 0 additions & 1 deletion include/libs/executor/storageapi.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ extern "C" {
#define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
#define CACHESCAN_RETRIEVE_LAST 0x8
#define CACHESCAN_RETRIEVE_PK 0x10

#define META_READER_LOCK 0x0
#define META_READER_NOLOCK 0x1
Expand Down
185 changes: 185 additions & 0 deletions include/libs/stream/streammsg.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef TDENGINE_STREAMMSG_H
#define TDENGINE_STREAMMSG_H

#include "tmsg.h"

#ifdef __cplusplus
extern "C" {
#endif

typedef struct SStreamChildEpInfo {
int32_t nodeId;
int32_t childId;
int32_t taskId;
SEpSet epSet;
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
} SStreamChildEpInfo;

int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);

// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished
typedef struct {
int64_t streamId;
int64_t checkpointId;
int32_t taskId;
int32_t nodeId;
SEpSet mgmtEps;
int32_t mnodeId;
int32_t transId;
int8_t mndTrigger;
int64_t expireTime;
} SStreamCheckpointSourceReq;

int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);

typedef struct {
int64_t streamId;
int64_t checkpointId;
int32_t taskId;
int32_t nodeId;
int32_t mnodeId;
int64_t expireTime;
int8_t success;
} SStreamCheckpointSourceRsp;

int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);

typedef struct SStreamTaskNodeUpdateMsg {
int32_t transId; // to identify the msg
int64_t streamId;
int32_t taskId;
SArray* pNodeList; // SArray<SNodeUpdateInfo>
} SStreamTaskNodeUpdateMsg;

int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg);
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);

typedef struct {
int64_t reqId;
int64_t stage;
int64_t streamId;
int32_t upstreamNodeId;
int32_t upstreamTaskId;
int32_t downstreamNodeId;
int32_t downstreamTaskId;
int32_t childId;
} SStreamTaskCheckReq;

int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);

typedef struct {
int64_t reqId;
int64_t streamId;
int32_t upstreamNodeId;
int32_t upstreamTaskId;
int32_t downstreamNodeId;
int32_t downstreamTaskId;
int32_t childId;
int64_t oldStage;
int8_t status;
} SStreamTaskCheckRsp;

int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);

typedef struct {
SMsgHead msgHead;
int64_t streamId;
int64_t checkpointId;
int32_t downstreamTaskId;
int32_t downstreamNodeId;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
int32_t childId;
} SStreamCheckpointReadyMsg;

int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);

struct SStreamDispatchReq {
int32_t type;
int64_t stage; // nodeId from upstream task
int64_t streamId;
int32_t taskId;
int32_t msgId; // msg id to identify if the incoming msg from the same sender
int32_t srcVgId;
int32_t upstreamTaskId;
int32_t upstreamChildId;
int32_t upstreamNodeId;
int32_t upstreamRelTaskId;
int32_t blockNum;
int64_t totalLen;
SArray* dataLen; // SArray<int32_t>
SArray* data; // SArray<SRetrieveTableRsp*>
};

int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const struct SStreamDispatchReq* pReq);
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, struct SStreamDispatchReq* pReq);
void tCleanupStreamDispatchReq(struct SStreamDispatchReq* pReq);

struct SStreamRetrieveReq {
int64_t streamId;
int64_t reqId;
int32_t srcTaskId;
int32_t srcNodeId;
int32_t dstTaskId;
int32_t dstNodeId;
int32_t retrieveLen;
SRetrieveTableRsp* pRetrieve;
};

int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const struct SStreamRetrieveReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, struct SStreamRetrieveReq* pReq);
void tCleanupStreamRetrieveReq(struct SStreamRetrieveReq* pReq);

typedef struct SStreamTaskCheckpointReq {
int64_t streamId;
int32_t taskId;
int32_t nodeId;
} SStreamTaskCheckpointReq;

int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq);
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq);

typedef struct SStreamHbMsg {
int32_t vgId;
int32_t numOfTasks;
SArray* pTaskStatus; // SArray<STaskStatusEntry>
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
} SStreamHbMsg;

int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg);

typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
int32_t reqType;
} SStreamTaskRunReq;

#ifdef __cplusplus
}
#endif

#endif // TDENGINE_STREAMMSG_H

0 comments on commit eb6094f

Please sign in to comment.