diff --git a/src/benchTmq.c b/src/benchTmq.c index 861f2c8d..c92c6f9e 100644 --- a/src/benchTmq.c +++ b/src/benchTmq.c @@ -96,7 +96,7 @@ static tmq_list_t * buildTopicList() { } static int32_t data_msg_process(TAOS_RES* msg, tmqThreadInfo* pInfo, int32_t msgIndex) { - char* buf = (char*)calloc(1, 16*1024); + char* buf = (char*)calloc(1, 64*1024+8); if (NULL == buf) { errorPrint("consumer id %d calloc memory fail.\n", pInfo->id); return 0; @@ -201,6 +201,7 @@ static void* tmqConsume(void* arg) { // "sequential" or "parallel" if (0 != strncasecmp(pConsumerInfo->createMode, "sequential", 10)) { + char* tPtr = pConsumerInfo->groupId; // "share" or "independent" char groupId[16] = {0}; if (0 != strncasecmp(pConsumerInfo->groupMode, "share", 5)) { @@ -210,11 +211,15 @@ static void* tmqConsume(void* arg) { memset(groupId, 0, sizeof(groupId)); rand_string(groupId, sizeof(groupId) - 1, 0); infoPrint("consumer id: %d generate rand group id: %s\n", pThreadInfo->id, groupId); - //pConsumerInfo->groupId = groupId; + tPtr = groupId; } } - buildConsumerAndSubscribe(pThreadInfo, groupId); + int ret = buildConsumerAndSubscribe(pThreadInfo, tPtr); + if (0 != ret) { + infoPrint("%s\n", "buildConsumerAndSubscribe() fail in tmqConsume()"); + return NULL; + } } int64_t totalMsgs = 0; @@ -342,7 +347,12 @@ int subscribeTestProcess() { // "sequential" or "parallel" if (0 == strncasecmp(pConsumerInfo->createMode, "sequential", 10)) { - buildConsumerAndSubscribe(pThreadInfo, pConsumerInfo->groupId); + int retVal = buildConsumerAndSubscribe(pThreadInfo, pConsumerInfo->groupId); + if (0 != retVal) { + infoPrint("%s\n", "buildConsumerAndSubscribe() fail!"); + ret = -1; + goto tmq_over; + } } pthread_create(pids + i, NULL, tmqConsume, pThreadInfo); }