Skip to content

Commit

Permalink
single table subscribe is done
Browse files Browse the repository at this point in the history
  • Loading branch information
localvar committed May 12, 2020
1 parent a369376 commit 577cf92
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 31 deletions.
14 changes: 12 additions & 2 deletions src/query/src/queryExecutor.c
Original file line number Diff line number Diff line change
Expand Up @@ -4185,6 +4185,12 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo);
} else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) {

if(pQInfo->groupInfo.numOfTables == 1) {
SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
SGroupItem* pItem = taosArrayGet(pa, 0);
cond.twindow = pItem->info->win;
}

pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo);
}
}
Expand Down Expand Up @@ -4903,6 +4909,12 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo,
pQuery->current->lastKey, pQuery->window.ekey);
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
STableIdInfo tidInfo;
tidInfo.uid = pQuery->current->id.uid;
tidInfo.tid = pQuery->current->id.tid;
tidInfo.key = pQuery->current->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
}

if (!isTSCompQuery(pQuery)) {
Expand Down Expand Up @@ -5195,7 +5207,6 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key);

printf("createTableIdList: uid = %ld, key = %ld\n", pTableIdInfo->uid, pTableIdInfo->key);
taosArrayPush(*pTableIdList, pTableIdInfo);
pMsg += sizeof(STableIdInfo);
}
Expand Down Expand Up @@ -5759,7 +5770,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
// not a problem at present because we only use their 1st int64_t field
STableIdInfo* pTableId = taosArraySearch( pTableIdList, compareTableIdInfo, &id );
if (pTableId != NULL ) {
printf("create QInfoImpl: %ld %ld\n", pTableId->uid, pTableId->key);
window.skey = pTableId->key;
} else {
window.skey = INT64_MIN;
Expand Down
73 changes: 44 additions & 29 deletions tests/examples/c/subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,32 +56,46 @@ void run_test(TAOS* taos) {
taos_query(taos, "drop database if exists test;");

usleep(100000);
taos_query(taos, "create database test tables 5;");
//taos_query(taos, "create database test tables 5;");
taos_query(taos, "create database test;");
usleep(100000);
taos_query(taos, "use test;");

usleep(100000);
taos_query(taos, "create table meters(ts timestamp, a int, b binary(20)) tags(loc binary(20), area int);");

taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:00:00.000', 0, 'china');");
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:01:00.000', 0, 'china');");
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:02:00.000', 0, 'china');");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:00:00.000', 0, 'china');");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:01:00.000', 0, 'china');");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:02:00.000', 0, 'china');");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.000', 0, 'china');");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:00:00.000', 0, 'UK');");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');");
taos_query(taos, "insert into t3 using meters tags('tianjin', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
taos_query(taos, "insert into t4 using meters tags('wuhan', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
taos_query(taos, "insert into t5 using meters tags('jinan', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
taos_query(taos, "insert into t6 using meters tags('haikou', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
taos_query(taos, "insert into t7 using meters tags('nanjing', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
taos_query(taos, "insert into t8 using meters tags('lanzhou', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
taos_query(taos, "insert into t9 using meters tags('tokyo', 0) values('2020-01-01 00:01:02.000', 0, 'japan');");
taos_query(taos, "create table meters(ts timestamp, a int) tags(area int);");

taos_query(taos, "create table t0 using meters tags(0);");
taos_query(taos, "create table t1 using meters tags(1);");
taos_query(taos, "create table t2 using meters tags(2);");
taos_query(taos, "create table t3 using meters tags(3);");
taos_query(taos, "create table t4 using meters tags(4);");
taos_query(taos, "create table t5 using meters tags(5);");
taos_query(taos, "create table t6 using meters tags(6);");
taos_query(taos, "create table t7 using meters tags(7);");
taos_query(taos, "create table t8 using meters tags(8);");
taos_query(taos, "create table t9 using meters tags(9);");

taos_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);");
taos_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);");
taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);");

// super tables subscription
usleep(1000000);

TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
TAOS_RES* res = taos_consume(tsub);
Expand All @@ -90,23 +104,23 @@ void run_test(TAOS* taos) {
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);

taos_query(taos, "insert into t0 values('2020-01-01 00:03:00.000', 0, 'china');");
taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0, 'china');");
taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);

taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0, 'UK');");
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0, 'UK');");
taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);

taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0, 'china');");
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 1);

// keep progress information and restart subscription
taos_unsubscribe(tsub, 1);
taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0, 'china');");
taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 24);
Expand All @@ -133,7 +147,7 @@ void run_test(TAOS* taos) {
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);

taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0, 'china');");
taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 1);

Expand Down Expand Up @@ -197,7 +211,7 @@ int main(int argc, char *argv[]) {
// init TAOS
taos_init();

TAOS* taos = taos_connect(host, user, passwd, "test", 0);
TAOS* taos = taos_connect(host, user, passwd, "", 0);
if (taos == NULL) {
printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
exit(1);
Expand All @@ -209,6 +223,7 @@ int main(int argc, char *argv[]) {
exit(0);
}

taos_query(taos, "use test;");
TAOS_SUB* tsub = NULL;
if (async) {
// create an asynchronized subscription, the callback function will be called every 1s
Expand Down

0 comments on commit 577cf92

Please sign in to comment.