Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge stable sort intead of qsort as insert unordered data #22934

Merged
merged 2 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/common/tdataformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData);
int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow);
int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
void tRowDestroy(SRow *pRow);
void tRowSort(SArray *aRowP);
int32_t tRowSort(SArray *aRowP);
int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag);
int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag);

Expand Down
11 changes: 11 additions & 0 deletions include/util/talgo.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, const void
*/
void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __ext_compar_fn_t comparFn);

/**
* merge sort, with the compare function requiring additional parameters support
*
* @param src
* @param numOfElem
* @param size
* @param comparFn
* @return int32_t 0 for success, other for failure.
*/
int32_t taosMergeSort(void *src, int64_t numOfElem, int64_t size, __compar_fn_t comparFn);

/**
* binary search, with range support
*
Expand Down
9 changes: 8 additions & 1 deletion include/util/tarray.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,19 @@ void taosArrayDestroyEx(SArray* pArray, FDelete fp);
void taosArraySwap(SArray* a, SArray* b);

/**
* sort the array
* sort the array use qsort
* @param pArray
* @param compar
*/
void taosArraySort(SArray* pArray, __compar_fn_t comparFn);

/**
* sort the array use merge sort
* @param pArray
* @param compar
*/
int32_t taosArrayMSort(SArray* pArray, __compar_fn_t comparFn);

/**
* search the array
* @param pArray
Expand Down
10 changes: 7 additions & 3 deletions source/common/src/tdataformat.c
Original file line number Diff line number Diff line change
Expand Up @@ -610,9 +610,13 @@ static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart,
return code;
}

void tRowSort(SArray *aRowP) {
if (TARRAY_SIZE(aRowP) <= 1) return;
taosArraySort(aRowP, tRowPCmprFn);
int32_t tRowSort(SArray *aRowP) {
if (TARRAY_SIZE(aRowP) <= 1) return 0;
int32_t code = taosArrayMSort(aRowP, tRowPCmprFn);
if (code != TSDB_CODE_SUCCESS) {
uError("taosArrayMSort failed caused by %d", code);
}
return code;
}

int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag) {
Expand Down
4 changes: 2 additions & 2 deletions source/libs/executor/src/dataInserter.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
}

if (disorderTs) {
tRowSort(tbData.aRowP);
if ((terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) {
if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) ||
(terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) {
goto _end;
}
}
Expand Down
4 changes: 2 additions & 2 deletions source/libs/parser/src/parInsertUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -495,9 +495,9 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
tColDataSortMerge(pTableCxt->pData->aCol);
} else {
if (!pTableCxt->ordered) {
tRowSort(pTableCxt->pData->aRowP);
code = tRowSort(pTableCxt->pData->aRowP);
}
if (!pTableCxt->ordered || pTableCxt->duplicateTs) {
if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0);
}
}
Expand Down
83 changes: 83 additions & 0 deletions source/util/src/talgo.c
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,86 @@ void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar,

taosMemoryFree(buf);
}

static void taosMerge(void *src, int32_t start, int32_t leftend, int32_t end, int64_t size, const void *param,
__ext_compar_fn_t comparFn, void *tmp) {
int32_t leftSize = leftend - start + 1;
int32_t rightSize = end - leftend;

void *leftBuf = tmp;
void *rightBuf = (char *)tmp + (leftSize * size);

memcpy(leftBuf, elePtrAt(src, size, start), leftSize * size);
memcpy(rightBuf, elePtrAt(src, size, leftend + 1), rightSize * size);

int32_t i = 0, j = 0, k = start;

while (i < leftSize && j < rightSize) {
int32_t ret = comparFn(elePtrAt(leftBuf, size, i), elePtrAt(rightBuf, size, j), param);
if (ret <= 0) {
memcpy(elePtrAt(src, size, k), elePtrAt(leftBuf, size, i), size);
i++;
} else {
memcpy(elePtrAt(src, size, k), elePtrAt(rightBuf, size, j), size);
j++;
}
k++;
}

while (i < leftSize) {
memcpy(elePtrAt(src, size, k), elePtrAt(leftBuf, size, i), size);
i++;
k++;
}

while (j < rightSize) {
memcpy(elePtrAt(src, size, k), elePtrAt(rightBuf, size, j), size);
j++;
k++;
}
}

static int32_t taosMergeSortHelper(void *src, int64_t numOfElem, int64_t size, const void *param,
__ext_compar_fn_t comparFn) {
// short array sort, instead of merge sort process
const int32_t THRESHOLD_SIZE = 6;
char *buf = taosMemoryCalloc(1, size); // prepare the swap buffer
if (buf == NULL) return TSDB_CODE_OUT_OF_MEMORY;
for (int32_t start = 0; start < numOfElem - 1; start += THRESHOLD_SIZE) {
int32_t end = (start + THRESHOLD_SIZE - 1) <= numOfElem - 1 ? (start + THRESHOLD_SIZE - 1) : numOfElem - 1;
tInsertSort(src, size, start, end, param, comparFn, buf);
}
taosMemoryFreeClear(buf);

if (numOfElem > THRESHOLD_SIZE) {
int32_t currSize;
void *tmp = taosMemoryMalloc(numOfElem * size);
if (tmp == NULL) return TSDB_CODE_OUT_OF_MEMORY;

for (currSize = THRESHOLD_SIZE; currSize <= numOfElem - 1; currSize = 2 * currSize) {
int32_t leftStart;
for (leftStart = 0; leftStart < numOfElem - 1; leftStart += 2 * currSize) {
int32_t leftend = leftStart + currSize - 1;
int32_t rightEnd =
(leftStart + 2 * currSize - 1 < numOfElem - 1) ? (leftStart + 2 * currSize - 1) : (numOfElem - 1);
if (leftend >= rightEnd) break;

taosMerge(src, leftStart, leftend, rightEnd, size, param, comparFn, tmp);
}
}

taosMemoryFreeClear(tmp);
}
return 0;
}

int32_t msortHelper(const void *p1, const void *p2, const void *param) {
__compar_fn_t comparFn = param;
return comparFn(p1, p2);
}


int32_t taosMergeSort(void *src, int64_t numOfElem, int64_t size, __compar_fn_t comparFn) {
void *param = comparFn;
return taosMergeSortHelper(src, numOfElem, size, param, msortHelper);
}
4 changes: 4 additions & 0 deletions source/util/src/tarray.c
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,10 @@ void taosArraySort(SArray* pArray, __compar_fn_t compar) {
taosSort(pArray->pData, pArray->size, pArray->elemSize, compar);
}

int32_t taosArrayMSort(SArray* pArray, __compar_fn_t compar) {
return taosMergeSort(pArray->pData, pArray->size, pArray->elemSize, compar);
}

void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags) {
return taosbsearch(key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags);
}
Expand Down
8 changes: 8 additions & 0 deletions source/util/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,11 @@ add_test(
NAME pageBufferTest
COMMAND pageBufferTest
)

# talgoTest
add_executable(talgoTest "talgoTest.cpp")
target_link_libraries(talgoTest os util gtest_main)
add_test(
NAME talgoTest
COMMAND talgoTest
)
104 changes: 104 additions & 0 deletions source/util/test/talgoTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#include <gtest/gtest.h>
#include <stdlib.h>
#include "talgo.h"

struct TestStruct {
int a;
float b;
};

// Define a custom comparison function for testing
int cmpFunc(const void* a, const void* b) {
const TestStruct* pa = reinterpret_cast<const TestStruct*>(a);
const TestStruct* pb = reinterpret_cast<const TestStruct*>(b);
if (pa->a < pb->a) {
return -1;
} else if (pa->a > pb->a) {
return 1;
} else {
return 0;
}
}

TEST(utilTest, taosMSort) {
// Create an array of test data
TestStruct arr[] = {{4, 2.5}, {3, 6}, {2, 1.5}, {3, 2}, {1, 3.5}, {3, 5}};

// Sort the array using taosSort
taosMergeSort(arr, 6, sizeof(TestStruct), cmpFunc);

for (int i = 0; i < sizeof(arr) / sizeof(TestStruct); i++) {
printf("%d: %d %f\n", i, arr[i].a, arr[i].b);
}

// Check that the array is sorted correctly
EXPECT_EQ(arr[0].a, 1);
EXPECT_EQ(arr[1].a, 2);
EXPECT_EQ(arr[2].a, 3);
EXPECT_EQ(arr[2].b, 6);
EXPECT_EQ(arr[3].a, 3);
EXPECT_EQ(arr[3].b, 2);
EXPECT_EQ(arr[4].a, 3);
EXPECT_EQ(arr[4].b, 5);
EXPECT_EQ(arr[5].a, 4);
}

int cmpInt(const void* a, const void* b) {
int int_a = *((int*)a);
int int_b = *((int*)b);

if (int_a == int_b)
return 0;
else if (int_a < int_b)
return -1;
else
return 1;
}

TEST(utilTest, taosMSort2) {
clock_t start_time, end_time;
double cpu_time_used;

int times = 10000;
start_time = clock();
for (int i = 0; i < 10000; i++) {
TestStruct arr[] = {{4, 2.5}, {3, 6}, {2, 1.5}, {3, 2}, {1, 3.5}, {3, 5}};
taosMergeSort(arr, 6, sizeof(TestStruct), cmpFunc);
}
end_time = clock();
cpu_time_used = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
printf("taosMSort %d times: %f s\n", times, cpu_time_used);

start_time = clock();
for (int i = 0; i < 10000; i++) {
TestStruct arr[] = {{4, 2.5}, {3, 6}, {2, 1.5}, {3, 2}, {1, 3.5}, {3, 5}};
taosSort(arr, 6, sizeof(TestStruct), cmpFunc);
}
end_time = clock();
cpu_time_used = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
printf("taosSort %d times: %f s\n", times, cpu_time_used);

const int arraySize = 1000000;
int data1[arraySize];
int data2[arraySize];
for (int i = 0; i < arraySize; ++i) {
data1[i] = taosRand();
data2[i] = data1[i];
}
start_time = clock();
taosMergeSort(data1, arraySize, sizeof(int), cmpInt);
end_time = clock();
cpu_time_used = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
printf("taosMSort length:%d cost: %f s\n", arraySize, cpu_time_used);

start_time = clock();
taosSort(data2, arraySize, sizeof(int), cmpInt);
end_time = clock();
cpu_time_used = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
printf("taosSort length:%d cost: %f s\n", arraySize, cpu_time_used);

for (int i = 0; i < arraySize - 1; i++) {
EXPECT_EQ(data1[i], data2[i]);
ASSERT_LE(data1[i], data1[i+1]);
}
}