Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor the fill history operation #21735

Merged
merged 99 commits into from Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
2977a4d
refactor: do some internal refactor.
hjxilinx May 30, 2023
be8fd9e
enh(stream): create additional task for history data processing.
hjxilinx May 30, 2023
d9c364d
Merge branch '3.0' into refact/fillhistory
hjxilinx May 30, 2023
eda0608
enh(stream): refactor and serialize the attributes of history tasks.
hjxilinx May 30, 2023
e237b4a
refactor: do some internal refactor.
hjxilinx May 31, 2023
bbb6359
fix(query): add check for version range when dumping partial rows of …
hjxilinx May 31, 2023
077e1a1
Merge branch '3.0' into refact/fillhistory
hjxilinx Jun 5, 2023
fcc706c
enh(stream): refactor the fill history task.
hjxilinx Jun 5, 2023
e8549ce
refactor: do some internal refactor.
hjxilinx Jun 5, 2023
0dd9330
enh(stream): make history task for stream running.
hjxilinx Jun 5, 2023
6a3c935
fix(stream): fix error in fill history scan.
hjxilinx Jun 7, 2023
06cf358
refactor: do some internal refactor.
hjxilinx Jun 7, 2023
cb26dd9
refactor(stream): do some internal refactor.
hjxilinx Jun 8, 2023
f582705
refactor: do some internal refactor.
hjxilinx Jun 8, 2023
2b6ace6
refactor: do some internal refactor.
hjxilinx Jun 8, 2023
6c13491
refactor: do some internal refactor.
hjxilinx Jun 8, 2023
6726eda
fix(stream): add more logs.
hjxilinx Jun 8, 2023
c9aa59c
refactor: do some internal refactor.
hjxilinx Jun 8, 2023
0fd11aa
other: add some logs.
hjxilinx Jun 8, 2023
53246ed
other: merge other branch.
hjxilinx Jun 9, 2023
63ef045
fix: fix syntax error.
hjxilinx Jun 9, 2023
69c9eda
fix(stream): fix race condition.
hjxilinx Jun 9, 2023
78a240b
enh(stream): add dispatch msg.
hjxilinx Jun 12, 2023
fb24ed1
enh(stream): set correct task status.
hjxilinx Jun 12, 2023
c58bde9
stream operator fill history
54liuyao Jun 13, 2023
97da2a8
fix(stream): set the fill history status.
hjxilinx Jun 13, 2023
e392fb1
Merge remote-tracking branch 'origin/refact/fillhistory' into refact/…
hjxilinx Jun 13, 2023
b44447e
enh(stream): support restore from disk.
hjxilinx Jun 14, 2023
5282492
refactor: do some internal refactor.
hjxilinx Jun 14, 2023
11f0c3b
refactor: do some internal refactor.
hjxilinx Jun 14, 2023
7c6fbd7
enh(stream): do some internal refactor and support secondary scan for…
hjxilinx Jun 14, 2023
9a3708e
pause&resume fill history
54liuyao Jun 14, 2023
53377c2
fix(stream): wait for stream task completed.
hjxilinx Jun 14, 2023
1302874
refactor: do some internal refactor.
hjxilinx Jun 15, 2023
5572415
refactor: remove one function.
hjxilinx Jun 15, 2023
1742874
fix(test): fix unit test error.
hjxilinx Jun 15, 2023
3b2f2f0
fix(test): fix link error in unit test.
hjxilinx Jun 15, 2023
693942b
trans state
54liuyao Jun 15, 2023
74e0138
Merge branch '3.0' into refact/fillhistory
hjxilinx Jun 15, 2023
f3df37b
fix(stream): fix memory leak.
hjxilinx Jun 15, 2023
a3e1882
fix: fix dead lock.
hjxilinx Jun 15, 2023
0aa141e
use stream task state
54liuyao Jun 15, 2023
9f91717
support fill history
Jun 15, 2023
8cb8c05
support fill history
Jun 15, 2023
0ef8afb
support fill history
Jun 16, 2023
f4b9d63
support fill history
Jun 16, 2023
9338012
fix(stream): fix memory leak and failed to close vnode.
hjxilinx Jun 16, 2023
4c82558
Merge remote-tracking branch 'origin/refact/fillhistory' into refact/…
hjxilinx Jun 16, 2023
1e05f5c
support fill history
Jun 16, 2023
6436566
fix(stream): update the info, and do some internal refactor.
hjxilinx Jun 16, 2023
0eeaab9
Merge branch 'refact/fillhistory1' into refact/fillhistory
Jun 19, 2023
41c1939
fill history pause&resume
54liuyao Jun 19, 2023
870d75b
fill history pause&resume
54liuyao Jun 20, 2023
b6d4e98
fix restart crash
Jun 20, 2023
aafbdcb
set task status
54liuyao Jun 20, 2023
74bea44
stream op transform
54liuyao Jun 20, 2023
3cfda2c
scan op transform
54liuyao Jun 20, 2023
fbed0ac
free mem
54liuyao Jun 20, 2023
1bed2b1
free mem
54liuyao Jun 21, 2023
d23c80b
free mem
54liuyao Jun 21, 2023
13d9136
add ci
54liuyao Jun 21, 2023
ea7b8e9
fix(stream): fix memory leak.
hjxilinx Jun 21, 2023
ceb78fd
Merge branch '3.0' into refact/fillhistory
hjxilinx Jun 24, 2023
a8de369
Merge branch 'refact/fillhistory' of github.com:taosdata/tdengine int…
hjxilinx Jun 24, 2023
0322fdc
fix(stream): fix memory leak.
hjxilinx Jun 24, 2023
7c23630
fix(stream): set correct stream error code.
hjxilinx Jun 25, 2023
6857881
Merge branch '3.0' into refact/fillhistory
hjxilinx Jun 25, 2023
6ac1795
fix: fix syntax error.
hjxilinx Jun 25, 2023
e9d3198
mem leak
54liuyao Jun 25, 2023
c950c73
Merge branch 'refact/fillhistory' of https://github.com/taosdata/TDen…
54liuyao Jun 25, 2023
7e7feef
fix: fix memory leak.
hjxilinx Jun 25, 2023
e236bf8
set fill history info
54liuyao Jun 25, 2023
000325f
fix(stream): fix memory leak.
hjxilinx Jun 25, 2023
9d8f6f3
Merge remote-tracking branch 'origin/refact/fillhistory' into refact/…
hjxilinx Jun 25, 2023
402c091
fix(stream): fix sma error.
hjxilinx Jun 25, 2023
f9be16b
fix: update test case.
hjxilinx Jun 25, 2023
daafe24
fix(stream): fix memory leak.
hjxilinx Jun 26, 2023
437eb93
fix(stream): fix error while fill history exists.
hjxilinx Jun 26, 2023
b73444b
fix(stream): start stream task in case of scan history completing.
hjxilinx Jun 27, 2023
3289ad6
other: merge 3.0
hjxilinx Jun 27, 2023
32fddef
fix(stream): fix error retrieve data from source task.And optimize th…
hjxilinx Jun 27, 2023
4bb78df
fix(stream): reduce the sleep time.
hjxilinx Jun 27, 2023
a19e63f
fix(stream): fix error in handling fill history.
hjxilinx Jun 27, 2023
3710ea4
refactor: do some internal refactor.
hjxilinx Jun 28, 2023
57fcd55
refactor: do some internal refactor.
hjxilinx Jun 28, 2023
ed900d6
Merge branch '3.0' into refact/fillhistory
hjxilinx Jun 28, 2023
f430960
fix(stream): fix syntax error.
hjxilinx Jun 28, 2023
78739e4
fix(stream): fix syntax error.
hjxilinx Jun 28, 2023
8867a57
fix(stream): fix syntax error.
hjxilinx Jun 28, 2023
2fc5eeb
session win range
54liuyao Jun 28, 2023
863da2a
set task status of ins_stream_tasks
54liuyao Jul 3, 2023
bc5e709
set task status of ins_stream_tasks
54liuyao Jul 3, 2023
2005756
Merge branch '3.0' into refact/fillhistory
hjxilinx Jul 3, 2023
2dbccc1
refactor: do some internal refactor.
hjxilinx Jul 3, 2023
393b36d
Merge branch 'refact/fillhistory' of github.com:taosdata/tdengine int…
hjxilinx Jul 3, 2023
418849e
test: disable stream meta storage format compatible test cases.
hjxilinx Jul 4, 2023
77530e1
refactor: record the downstream input Queue blocking time.
hjxilinx Jul 4, 2023
86b1e49
test: update test cases.
hjxilinx Jul 4, 2023
ff2bf35
fix(stream): fix error in set the version range for secondary scan.
hjxilinx Jul 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmake/rocksdb_CMakeLists.txt.in
Expand Up @@ -5,8 +5,8 @@ if (${BUILD_CONTRIB})
URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz
URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
Expand All @@ -18,8 +18,8 @@ else()
URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz
URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
Expand Down
5 changes: 5 additions & 0 deletions include/common/tcommon.h
Expand Up @@ -54,6 +54,11 @@ typedef struct SSessionKey {
uint64_t groupId;
} SSessionKey;

typedef struct SVersionRange {
uint64_t minVer;
uint64_t maxVer;
} SVersionRange;

static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) {
SWinKey* pWin1 = (SWinKey*)pKey1;
SWinKey* pWin2 = (SWinKey*)pKey2;
Expand Down
10 changes: 3 additions & 7 deletions include/common/tdatablock.h
Expand Up @@ -177,7 +177,6 @@ static FORCE_INLINE void colDataSetDouble(SColumnInfoData* pColumnInfoData, uint
int32_t getJsonValueLen(const char* data);

int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData);
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
Expand All @@ -187,6 +186,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);

int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);

int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx);
void colDataTrim(SColumnInfoData* pColumnInfoData);

Expand All @@ -208,7 +208,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(uint32_t numOfCols);

int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);

int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
Expand Down Expand Up @@ -237,11 +236,10 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColIn
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);

int32_t blockGetEncodeSize(const SSDataBlock* pBlock);
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
const char* blockDecode(SSDataBlock* pBlock, const char* pData);

void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag);
void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);

Expand All @@ -251,9 +249,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);

static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
}
void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList);

#ifdef __cplusplus
}
Expand Down
7 changes: 4 additions & 3 deletions include/common/tmsgdef.h
Expand Up @@ -252,7 +252,9 @@ enum {
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "stream-recover-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
Expand Down Expand Up @@ -297,8 +299,7 @@ enum {

TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE, "vnode-stream-recover1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, "vnode-stream-recover2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)

Expand Down
28 changes: 15 additions & 13 deletions include/libs/executor/executor.h
Expand Up @@ -55,6 +55,9 @@ typedef struct {

void* pStateBackend;
struct SStorageAPI api;

int8_t fillHistory;
STimeWindow winRange;
} SReadHandle;

// in queue mode, data streams are seperated by msg
Expand Down Expand Up @@ -193,14 +196,6 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key);
/**
* return the scan info, in the form of tuple of two items, including table uid and current timestamp
* @param tinfo
* @param uid
* @param ts
* @return
*/
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);

SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);

Expand All @@ -220,15 +215,22 @@ void* qExtractReaderFromStreamScanner(void* scanner);

int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);

int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo);
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
void qStreamCloseTsdbReader(void* task);
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo);
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo);
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo);
void resetTaskInfo(qTaskInfo_t tinfo);

void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo);

int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);

#ifdef __cplusplus
}
#endif
Expand Down
26 changes: 2 additions & 24 deletions include/libs/executor/storageapi.h
Expand Up @@ -234,29 +234,6 @@ typedef struct SStoreSnapshotFn {
int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid);
} SStoreSnapshotFn;

/**
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
void metaReaderReleaseLock(SMetaReader *pReader);
void metaReaderClear(SMetaReader *pReader);
int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);

int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
bool *acquired);
int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen, double selectivityRatio);
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t
payloadLen);
*/

typedef struct SStoreMeta {
SMTbCursor* (*openTableMetaCursor)(void* pVnode); // metaOpenTbCursor
void (*closeTableMetaCursor)(SMTbCursor* pTbCur); // metaCloseTbCursor
Expand Down Expand Up @@ -403,7 +380,7 @@ typedef struct SStateStore {
SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key);

struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize,
uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark);
uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char*id);

void (*streamFileStateDestroy)(struct SStreamFileState* pFileState);
void (*streamFileStateClear)(struct SStreamFileState* pFileState);
Expand All @@ -415,6 +392,7 @@ typedef struct SStateStore {
int32_t (*streamStateCommit)(SStreamState* pState);
void (*streamStateDestroy)(SStreamState* pState, bool remove);
int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts);
} SStateStore;

typedef struct SStorageAPI {
Expand Down
52 changes: 30 additions & 22 deletions include/libs/function/function.h
Expand Up @@ -129,30 +129,38 @@ typedef struct SSerializeDataHandle {
} SSerializeDataHandle;

// incremental state storage

typedef struct SBackendCfWrapper {
void *rocksdb;
void **pHandle;
void *writeOpts;
void *readOpts;
void **cfOpts;
void *dbOpt;
void *param;
void *env;
SListNode *pComparNode;
void *pBackend;
void *compactFactory;
TdThreadRwlock rwLock;
bool remove;
int64_t backendId;
char idstr[64];
} SBackendCfWrapper;
typedef struct STdbState {
void *rocksdb;
void **pHandle;
void *writeOpts;
void *readOpts;
void **cfOpts;
void *dbOpt;
SBackendCfWrapper *pBackendCfWrapper;
int64_t backendCfWrapperId;
char idstr[64];

struct SStreamTask *pOwner;
void *param;
void *env;
SListNode *pComparNode;
void *pBackend;
char idstr[64];
void *compactFactory;
TdThreadRwlock rwLock;

void *db;
void *pStateDb;
void *pFuncStateDb;
void *pFillStateDb; // todo refactor
void *pSessionStateDb;
void *pParNameDb;
void *pParTagDb;
void *txn;
void *db;
void *pStateDb;
void *pFuncStateDb;
void *pFillStateDb; // todo refactor
void *pSessionStateDb;
void *pParNameDb;
void *pParTagDb;
void *txn;
} STdbState;

typedef struct {
Expand Down
2 changes: 2 additions & 0 deletions include/libs/stream/streamState.h
Expand Up @@ -138,6 +138,8 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname);
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);

void streamStateReloadInfo(SStreamState* pState, TSKEY ts);

/***compare func **/

typedef struct SStateChekpoint {
Expand Down