Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
xiao-yu-wang committed Apr 10, 2023
2 parents d1decca + 74abd01 commit e2f82f4
Show file tree
Hide file tree
Showing 177 changed files with 6,397 additions and 5,597 deletions.
2 changes: 1 addition & 1 deletion cmake/cmake.version
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "3.0.3.1")
SET(TD_VER_NUMBER "3.0.3.2")
ENDIF ()

IF (DEFINED VERCOMPATIBLE)
Expand Down
2 changes: 1 addition & 1 deletion cmake/taosadapter_CMakeLists.txt.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG d8059ff
GIT_TAG cb1e89c
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
Expand Down
2 changes: 1 addition & 1 deletion cmake/taostools_CMakeLists.txt.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 2864326
GIT_TAG 149ac34
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
Expand Down
3 changes: 0 additions & 3 deletions docs/en/07-develop/07-tmq.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ You configure the following parameters when creating a consumer:
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `enable.auto.commit` | boolean | Commit automatically | Specify `true` or `false`. |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds |
| `enable.heartbeat.background` | boolean | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | |
| `experimental.snapshot.enable` | boolean | Specify whether to consume messages from the WAL or from TSBS | |
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages |

Expand Down Expand Up @@ -368,7 +367,6 @@ conf := &tmq.ConfigMap{
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
}
Expand Down Expand Up @@ -418,7 +416,6 @@ Python programs use the following parameters:
| `auto.commit.interval.ms` | string | Interval for automatic commits, in milliseconds | |
| `auto.offset.reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `experimental.snapshot.enable` | string | Specify whether to consume messages from the WAL or from TSDB | Specify `true` or `false` |
| `enable.heartbeat.background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false` |

</TabItem>

Expand Down
1 change: 0 additions & 1 deletion docs/examples/go/sub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func main() {
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
})
Expand Down
3 changes: 0 additions & 3 deletions docs/zh/07-develop/07-tmq.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ CREATE TOPIC topic_name AS DATABASE db_name;
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交 | 合法值:`true`, `false`。 |
| `auto.commit.interval.ms` | integer | 以毫秒为单位的消费记录自动提交消费位点时间间 | 默认 5000 m |
| `enable.heartbeat.background` | boolean | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 默认开启 |
| `experimental.snapshot.enable` | boolean | 是否允许从 TSDB 消费数据 | 实验功能,默认关闭 |
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句) | |

Expand Down Expand Up @@ -366,7 +365,6 @@ conf := &tmq.ConfigMap{
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
}
Expand Down Expand Up @@ -418,7 +416,6 @@ consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
| `auto.commit.interval.ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值:5000 ms |
| `auto.offset.reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
| `experimental.snapshot.enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` |
| `enable.heartbeat.background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |

</TabItem>

Expand Down
7 changes: 2 additions & 5 deletions include/common/tcommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ typedef struct SBlockID {
typedef struct SDataBlockInfo {
STimeWindow window;
int32_t rowSize;
int32_t rows; // todo hide this attribute
int64_t rows; // todo hide this attribute
uint32_t capacity;
SBlockID id;
int16_t hasVarCol;
Expand All @@ -208,15 +208,12 @@ typedef struct SSDataBlock {
} SSDataBlock;

enum {
FETCH_TYPE__DATA = 1,
FETCH_TYPE__META,
FETCH_TYPE__SEP,
FETCH_TYPE__DATA = 0,
FETCH_TYPE__NONE,
};

typedef struct {
int8_t fetchType;
STqOffsetVal offset;
union {
SSDataBlock data;
void* meta;
Expand Down
2 changes: 1 addition & 1 deletion include/common/tdatablock.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ int32_t getJsonValueLen(const char* data);

int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows);
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, int32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
Expand Down
2 changes: 1 addition & 1 deletion include/common/tmsgcb.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ typedef enum {
WRITE_QUEUE,
APPLY_QUEUE,
SYNC_QUEUE,
SYNC_CTRL_QUEUE,
SYNC_RD_QUEUE,
STREAM_QUEUE,
QUEUE_MAX,
} EQueueType;
Expand Down
2 changes: 1 addition & 1 deletion include/common/tmsgdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ enum {

TD_NEW_MSG_SEG(TDMT_SYNC_MSG)
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT_ELECTION, "sync-elect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_BATCH, "sync-client-request-batch", NULL, NULL)
Expand Down
36 changes: 18 additions & 18 deletions include/common/ttokendef.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,24 +127,24 @@
#define TK_NK_EQ 109
#define TK_USING 110
#define TK_TAGS 111
#define TK_COMMENT 112
#define TK_BOOL 113
#define TK_TINYINT 114
#define TK_SMALLINT 115
#define TK_INT 116
#define TK_INTEGER 117
#define TK_BIGINT 118
#define TK_FLOAT 119
#define TK_DOUBLE 120
#define TK_BINARY 121
#define TK_NCHAR 122
#define TK_UNSIGNED 123
#define TK_JSON 124
#define TK_VARCHAR 125
#define TK_MEDIUMBLOB 126
#define TK_BLOB 127
#define TK_VARBINARY 128
#define TK_DECIMAL 129
#define TK_BOOL 112
#define TK_TINYINT 113
#define TK_SMALLINT 114
#define TK_INT 115
#define TK_INTEGER 116
#define TK_BIGINT 117
#define TK_FLOAT 118
#define TK_DOUBLE 119
#define TK_BINARY 120
#define TK_NCHAR 121
#define TK_UNSIGNED 122
#define TK_JSON 123
#define TK_VARCHAR 124
#define TK_MEDIUMBLOB 125
#define TK_BLOB 126
#define TK_VARBINARY 127
#define TK_DECIMAL 128
#define TK_COMMENT 129
#define TK_MAX_DELAY 130
#define TK_WATERMARK 131
#define TK_ROLLUP 132
Expand Down
9 changes: 5 additions & 4 deletions include/libs/executor/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
* @param SReadHandle
* @return
*/
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema);
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id);

/**
* set the task Id, usually used by message queue process
Expand All @@ -89,6 +89,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);

int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);

/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
Expand Down Expand Up @@ -197,11 +198,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
//
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit);

int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
void qStreamSetOpen(qTaskInfo_t tinfo);

SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);

int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo);
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);

const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);

Expand Down
2 changes: 1 addition & 1 deletion include/libs/function/function.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ typedef struct SResultDataInfo {
typedef struct SInputColumnInfoData {
int32_t totalRows; // total rows in current columnar data
int32_t startRowIndex; // handle started row index
int32_t numOfRows; // the number of rows needs to be handled
int64_t numOfRows; // the number of rows needs to be handled
int32_t numOfInputCols; // PTS is not included
bool colDataSMAIsSet; // if agg is set or not
SColumnInfoData *pPTS; // primary timestamp column
Expand Down
1 change: 1 addition & 0 deletions include/libs/nodes/querynodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ typedef struct SSelectStmt {
bool hasUniqueFunc;
bool hasTailFunc;
bool hasInterpFunc;
bool hasInterpPseudoColFunc;
bool hasLastRowFunc;
bool hasLastFunc;
bool hasTimeLineFunc;
Expand Down
1 change: 1 addition & 0 deletions include/libs/qcom/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ typedef enum {
JOB_TASK_STATUS_INIT,
JOB_TASK_STATUS_EXEC,
JOB_TASK_STATUS_PART_SUCC,
JOB_TASK_STATUS_FETCH,
JOB_TASK_STATUS_SUCC,
JOB_TASK_STATUS_FAIL,
JOB_TASK_STATUS_DROP,
Expand Down
23 changes: 11 additions & 12 deletions include/libs/stream/streamState.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,19 @@ extern "C" {
#ifndef _STREAM_STATE_H_
#define _STREAM_STATE_H_

typedef struct SStreamTask SStreamTask;

typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2);

typedef struct STdbState {
SStreamTask* pOwner;
TDB* db;
TTB* pStateDb;
TTB* pFuncStateDb;
TTB* pFillStateDb; // todo refactor
TTB* pSessionStateDb;
TTB* pParNameDb;
TTB* pParTagDb;
TXN* txn;
struct SStreamTask* pOwner;

TDB* db;
TTB* pStateDb;
TTB* pFuncStateDb;
TTB* pFillStateDb; // todo refactor
TTB* pSessionStateDb;
TTB* pParNameDb;
TTB* pParTagDb;
TXN* txn;
} STdbState;

// incremental state storage
Expand All @@ -45,7 +44,7 @@ typedef struct {
int32_t number;
} SStreamState;

SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
SStreamState* streamStateOpen(char* path, struct SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState);
Expand Down
66 changes: 7 additions & 59 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,27 +223,12 @@ static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
return queue->qItem;
}

static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
if (dequeueFlag == STREAM_QUEUE__FAILED) {
ASSERT(queue->qItem != NULL);
return streamQueueCurItem(queue);
} else {
queue->qItem = NULL;
taosGetQitem(queue->qall, &queue->qItem);
if (queue->qItem == NULL) {
taosReadAllQitems(queue->queue, queue->qall);
taosGetQitem(queue->qall, &queue->qItem);
}
return streamQueueCurItem(queue);
}
}
void* streamQueueNextItem(SStreamQueue* queue);

SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit);
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit);

void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit);

SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit);
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit);

typedef struct {
char* qmsg;
Expand Down Expand Up @@ -295,7 +280,7 @@ typedef struct {
SEpSet epSet;
} SStreamChildEpInfo;

typedef struct SStreamTask {
struct SStreamTask {
int64_t streamId;
int32_t taskId;
int32_t totalLevel;
Expand Down Expand Up @@ -362,8 +347,7 @@ typedef struct SStreamTask {

int64_t checkpointingId;
int32_t checkpointAlignCnt;

} SStreamTask;
};

int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
Expand All @@ -372,43 +356,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);

static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type;
if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem);
if (pSubmitClone == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1;
}
qDebug("task %d %p submit enqueue %p %p %p %d %" PRId64, pTask->taskId, pTask, pItem, pSubmitClone,
pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen, pSubmitClone->submit.ver);
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
// qStreamInput(pTask->exec.executor, pSubmitClone);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (type == STREAM_INPUT__GET_RES) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
}

if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
}

#if 0
// TODO: back pressure
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif
return 0;
}
int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem);

static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
Expand Down Expand Up @@ -587,7 +535,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);

int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);

int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);
Expand Down
4 changes: 2 additions & 2 deletions include/libs/wal/wal.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ typedef struct {
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
int8_t curInvalid;
int8_t curStopped;
// int8_t curInvalid;
// int8_t curStopped;
TdThreadMutex mutex;
SWalFilterCond cond;
// TODO remove it
Expand Down
Loading

0 comments on commit e2f82f4

Please sign in to comment.