Optimize TupleAggregateStep::doThreadedAggregate() by removing unnecessary I/O. #2957

Draft pull request: 26 commits into base branch develop.

Commits (26)
7a819d2
Refactor different aggregation cases into one if/else case each.
phoeinx Jul 21, 2023
18536d0
Remove unnecessary processing step for GROUP-BY-only cases.
phoeinx Jul 21, 2023
b53655b
Refactor loop index variables.
phoeinx Jul 21, 2023
0ebb079
Update/fix AGGR on DISTINCT COLUMN case when multiple buckets are enf…
phoeinx Jul 27, 2023
a927858
Refactor row output function.
phoeinx Jul 27, 2023
88095ad
Refactor: use more descriptive variables for differentiation between d…
phoeinx Aug 1, 2023
c2e31bd
Add proper exception for unhandled case.
phoeinx Aug 1, 2023
7b2e106
Rename output function to be more semantically correct.
phoeinx Aug 1, 2023
fa6118d
Improve comments for doThreadedAggregate.
phoeinx Aug 1, 2023
4700e9f
Refactor better comments into doThreadedAggregate.
phoeinx Aug 1, 2023
83f2b37
Add benchmarks for optimize groupby feature
phoeinx Sep 14, 2023
ab1da4c
Revert changes made to force two-bucket aggregation
phoeinx Sep 17, 2023
041c019
Add Columnstore.xml changes for reproducible benchmarking.
phoeinx Sep 17, 2023
6af7334
WIP for fix of too few output records.
phoeinx Sep 17, 2023
ad9b6c6
WIP: fix disk-based aggregation.
phoeinx Sep 22, 2023
8aace7a
WIP: Get Patch working with adapted output rowgroups method.
phoeinx Sep 24, 2023
701c7b2
Remove files added through ill-advised "git add .". Update .gitignore.
phoeinx Sep 25, 2023
860c947
Revert libmarias3 to state at develop HEAD
phoeinx Sep 25, 2023
34892a8
Cleanup rowaggregation.cpp
phoeinx Sep 25, 2023
9725cdf
Cleanup RowGroupStorage::getNextOutputRGData()
phoeinx Sep 25, 2023
1f78226
Further cleanup of rowstorage.cpp.
phoeinx Sep 25, 2023
b66464a
Fix output disk-based aggregated RowGroups bug.
phoeinx Sep 26, 2023
c949434
Fix AGG(DISTINCT ...) (single-threaded & multi-threaded).
phoeinx Sep 27, 2023
9f73b82
Fix DISTINCT & GROUP BY Aggregation.
phoeinx Sep 27, 2023
f9bb80f
Add optimization for COUNT(DISTINCT <col_name>) case.
phoeinx Sep 28, 2023
6862da7
Revert Columnstore.xml to default for CI suite.
phoeinx Sep 28, 2023
2 changes: 2 additions & 0 deletions .gitignore
@@ -176,6 +176,7 @@ mariadb-columnstore-regression-test/
build/Testing/
tests/*\[1\]_tests.cmake
tests/*\[1\]_include.cmake
tests/Testing/Temporary
.boost
build/.cmake
*.vtg
@@ -185,5 +186,6 @@ versioning/BRM/brmshmimpl.h
versioning/BRM/shmkeys.cpp
obj/
build/build
build/utils/loggingcpp


1 change: 1 addition & 0 deletions benchmarks/groupbybench/.gitignore
@@ -0,0 +1 @@
*.csv
5 changes: 5 additions & 0 deletions benchmarks/groupbybench/create_bench_database.sql
@@ -0,0 +1,5 @@
CREATE OR REPLACE DATABASE bench;
USE bench;
CREATE TABLE `bench_real` (`c` varchar(30)) ENGINE=Columnstore;
CREATE TABLE `bench_two_groups` (`c` varchar(1)) ENGINE=Columnstore;
CREATE TABLE `bench_ten_groups` (`c` varchar(1)) ENGINE=Columnstore;
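
These three tables drive the benchmark queries below: bench_real is loaded from a real dump (roughly 36 million rows, judging by the dump_11_08_22_36kk.csv file name), while bench_two_groups and bench_ten_groups presumably contain only two and ten distinct one-character values, stressing the case where the aggregation output is tiny compared to the input.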
12 changes: 12 additions & 0 deletions benchmarks/groupbybench/develop_groupbybench.sh
@@ -0,0 +1,12 @@
# Get the directory containing this script so it can be run from anywhere.
script_full_path=$(realpath "$0")
script_dir_path=$(dirname "$script_full_path")

cd "$script_dir_path" || exit

hyperfine --prepare 'sync; echo 3 | sudo tee /proc/sys/vm/drop_caches' \
'mariadb --database="bench" < query_bench_real.sql' --export-csv develop_bench_real.csv
hyperfine --prepare 'sync; echo 3 | sudo tee /proc/sys/vm/drop_caches' \
'mariadb --database="bench" < query_two_groups.sql' --export-csv develop_bench_two_groups.csv
hyperfine --prepare 'sync; echo 3 | sudo tee /proc/sys/vm/drop_caches' \
'mariadb --database="bench" < query_ten_groups.sql' --export-csv develop_bench_ten_groups.csv
12 changes: 12 additions & 0 deletions benchmarks/groupbybench/feature_groupbybench.sh
@@ -0,0 +1,12 @@
# Get the directory containing this script so it can be run from anywhere.
script_full_path=$(realpath "$0")
script_dir_path=$(dirname "$script_full_path")

cd "$script_dir_path" || exit

hyperfine --prepare 'sync; echo 3 | sudo tee /proc/sys/vm/drop_caches' \
'mariadb --database="bench" < query_bench_real.sql' --export-csv feature_bench_real.csv
hyperfine --prepare 'sync; echo 3 | sudo tee /proc/sys/vm/drop_caches' \
'mariadb --database="bench" < query_two_groups.sql' --export-csv feature_bench_two_groups.csv
hyperfine --prepare 'sync; echo 3 | sudo tee /proc/sys/vm/drop_caches' \
'mariadb --database="bench" < query_ten_groups.sql' --export-csv feature_bench_ten_groups.csv
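
Note on the benchmark setup: the --prepare hook ('sync; echo 3 | sudo tee /proc/sys/vm/drop_caches') flushes dirty pages and drops the kernel page cache before every timed run, so hyperfine measures cold-cache executions. That is the relevant scenario for this PR, since the optimization removes unnecessary I/O that a warm cache would otherwise partially hide.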
1 change: 1 addition & 0 deletions benchmarks/groupbybench/query_bench_real.sql
@@ -0,0 +1 @@
select count(*) from (select 1 from bench_real GROUP BY c) s;
1 change: 1 addition & 0 deletions benchmarks/groupbybench/query_ten_groups.sql
@@ -0,0 +1 @@
select count(*) from (select 1 from bench_ten_groups GROUP BY c) s;
1 change: 1 addition & 0 deletions benchmarks/groupbybench/query_two_groups.sql
@@ -0,0 +1 @@
select count(*) from (select 1 from bench_two_groups GROUP BY c) s;
9 changes: 9 additions & 0 deletions benchmarks/groupbybench/setup_groupby_bench.sh
@@ -0,0 +1,9 @@
# Get the directory containing this script so it can be run from anywhere.
script_full_path=$(realpath "$0")
script_dir_path=$(dirname "$script_full_path")

mariadb < "$script_dir_path/create_bench_database.sql"

cpimport -s ',' bench bench_real "$script_dir_path/bench_data/dump_11_08_22_36kk.csv"
cpimport -s ',' bench bench_two_groups "$script_dir_path/bench_data/two_groups.csv"
cpimport -s ',' bench bench_ten_groups "$script_dir_path/bench_data/ten_groups.csv"
188 changes: 111 additions & 77 deletions dbcon/joblist/tupleaggregatestep.cpp
@@ -347,6 +347,7 @@ TupleAggregateStep::TupleAggregateStep(const SP_ROWAGG_UM_t& agg, const RowGroup
fNumOfBuckets =
calcNumberOfBuckets(memLimit, fNumOfThreads, fNumOfBuckets, fNumOfRowGroups, fRowGroupIn.getRowSize(),
fRowGroupOut.getRowSize(), fRm->getAllowDiskAggregation());

// A thread works on whole buckets, so there is no point in running more threads than buckets.
fNumOfThreads = std::min(fNumOfThreads, fNumOfBuckets);

fMemUsage.reset(new uint64_t[fNumOfThreads]);
@@ -450,7 +451,7 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
rowGroupIn->initRow(&rowIn);
auto* subDistAgg = dynamic_cast<RowAggregationUM*>(multiDist->subAggregators()[j].get());

while (subDistAgg->nextOutputRowGroup())
{
rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
rgDataVec.emplace_back(subDistAgg->moveCurrentRGData());
@@ -474,7 +475,7 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
rowGroupIn->initRow(&rowIn);
auto* subAgg = dynamic_cast<RowAggregationUM*>(aggDist->aggregator().get());

while (subAgg->nextOutputRowGroup())
{
rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData());
rgDataVec.emplace_back(subAgg->moveCurrentRGData());
@@ -629,7 +630,7 @@ bool TupleAggregateStep::nextDeliveredRowGroup()
{
for (; fBucketNum < fNumOfBuckets; fBucketNum++)
{
while (fAggregators[fBucketNum]->nextOutputRowGroup())
{
fAggregators[fBucketNum]->finalize();
fRowGroupDelivered.setData(fAggregators[fBucketNum]->getOutputRowGroup()->getRGData());
@@ -5656,14 +5657,27 @@ void TupleAggregateStep::doAggregate()
return;
}

/** @brief Aggregate input row groups in a two-phase, multi-threaded aggregation.
* The second phase handles three different aggregation cases differently:
* 1. The query contains at least one aggregation on a DISTINCT column, e.g. SUM(DISTINCT col1), AND at
* least one GROUP BY column.
* 2. The query contains at least one aggregation on a DISTINCT column but no GROUP BY column.
* 3. The query contains no aggregation on a DISTINCT column, but at least one GROUP BY column.
* DISTINCT selects (e.g. SELECT DISTINCT col1 FROM ...) are handled in tupleannexstep.cpp.
*/
uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp)
{
uint32_t i;
RGData rgData;
// initialize return value variable
uint64_t rowCount = 0;

try
{
/*
* Phase 1: Distribute the input rows into buckets depending on the hash value of each row's GROUP BY
* columns. Then distribute the buckets equally over the aggregators in fAggregators (number of
* fAggregators == fNumOfBuckets). Each previously created hash bucket is represented as one RowGroup in
* an fAggregator.
*/

if (!fDoneAggregate)
{
initializeMultiThread();
@@ -5672,9 +5686,9 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
runners.reserve(fNumOfThreads); // to prevent a resize during use

// Start the aggregator threads
for (uint32_t threadNum = 0; threadNum < fNumOfThreads; threadNum++)
{
runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, threadNum)));
}

// Now wait for all those threads
@@ -5688,151 +5702,152 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
// much memory on average
uint32_t threads = std::max(1U, fNumOfThreads / 2);
runners.reserve(threads);
for (uint32_t threadNum = 0; threadNum < threads; ++threadNum)
{
runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, threadNum)));
}
jobstepThreadPool.join(runners);
}

/*
* Phase 2: Depending on the query type (see below), do the aggregation per previously created RowGroup
* of rows that need to be aggregated, and output the results.
*/

auto* distinctAggregator = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());
const bool hasGroupByColumns = fAggregator->aggMapKeyLength() > 0;

// Case 1: The query contains at least one aggregation on a DISTINCT column AND at least one GROUP BY
// column, e.g. SELECT SUM(DISTINCT col1) FROM test GROUP BY col2;
if (distinctAggregator && hasGroupByColumns)
{
// 2nd phase multi-threaded aggregate
if (!fEndOfResult)
{
// Do multi-threaded second phase aggregation (per row group created for GROUP BY statement)
if (!fDoneAggregate)
{
vector<uint64_t> runners; // thread pool handles
fRowGroupsDeliveredData.resize(fNumOfBuckets);

uint32_t bucketsPerThread = fNumOfBuckets / fNumOfThreads;
uint32_t numThreads = ((fNumOfBuckets % fNumOfThreads) == 0 ? fNumOfThreads : fNumOfThreads + 1);
// uint32_t bucketsPerThread = 1;
// uint32_t numThreads = fNumOfBuckets;

runners.reserve(numThreads);

for (uint32_t threadNum = 0; threadNum < numThreads; threadNum++)
{
runners.push_back(jobstepThreadPool.invoke(
ThreadedSecondPhaseAggregator(this, threadNum * bucketsPerThread, bucketsPerThread)));
}

jobstepThreadPool.join(runners);
}

// Deliver results
fDoneAggregate = true;
bool done = true;

while (nextDeliveredRowGroup() && !cancelled())
{
done = false;
rowCount = fRowGroupOut.getRowCount();

if (rowCount != 0)
{
if (!cleanUpAndOutputRowGroup(bs, dlp))
break;
}

done = true;
}

if (done)
{
fEndOfResult = true;
}
}
}
// Case 2: The query contains at least one aggregation on a DISTINCT column but no GROUP BY column,
// e.g. SELECT SUM(DISTINCT col1) FROM test;
else if (distinctAggregator)
{

if (!fEndOfResult)
{
if (!fDoneAggregate)
{
// Do aggregation over all row groups. As all row groups need to be aggregated together, there is no
// easy way of multi-threading this, so it is done in a single thread for now.
for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; bucketNum++)
{
// The distinctAggregator accumulates the aggregation results of all buckets: the sub-aggregator of
// each bucket aggregator is handed to it, and an aggregation step is done after each addition.
auto* bucketMultiDistinctAggregator = dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[bucketNum].get());
auto* bucketDistinctAggregator = dynamic_cast<RowAggregationDistinct*>(fAggregators[bucketNum].get());
distinctAggregator->aggregator(bucketDistinctAggregator->aggregator());

if (bucketMultiDistinctAggregator)
{
(dynamic_cast<RowAggregationMultiDistinct*>(distinctAggregator))
->subAggregators(bucketMultiDistinctAggregator->subAggregators());
}

distinctAggregator->doDistinctAggregation();
}
}

// Deliver results
fDoneAggregate = true;
bool done = true;
while (fAggregator->nextRowGroup() && !cancelled())
{
done = false;
fAggregator->finalize();
rowCount = fRowGroupOut.getRowCount();
fRowsReturned += rowCount;
fRowGroupDelivered.setData(fRowGroupOut.getRGData());

if (rowCount != 0)
{
if (!cleanUpAndOutputRowGroup(bs, dlp))
break;
}
done = true;
}
if (done)
{
fEndOfResult = true;
}
}
}
// Case 3: The query contains no aggregation on a DISTINCT column, but at least one GROUP BY column,
// e.g. SELECT SUM(col1) FROM test GROUP BY col2;
// As the first-phase aggregation algorithm calculates aggregation results online (e.g. for SUM()), the
// aggregation is already finished at this point and the row groups just need to be delivered.
// TODO: The previous code meant that no more aggregation steps are done here. Is this always the case?
else if (hasGroupByColumns)
{
fDoneAggregate = true;
bool done = true;

//@bug4459
while (nextDeliveredRowGroup() && !cancelled())
{
done = false;
rowCount = fRowGroupOut.getRowCount();
fRowsReturned += rowCount;

if (rowCount != 0)
{
if (!cleanUpAndOutputRowGroup(bs, dlp))
break;
}

done = true;
}

if (done)
{
if (done) {
fEndOfResult = true;
}
}
else
{
throw logic_error(
"TupleAggregateStep::doThreadedAggregate: no DISTINCT columns nested in an aggregation function and "
"no GROUP BY columns found. Should not reach here.");
}
} // try
catch (...)
{
handleException(std::current_exception(), logging::tupleAggregateStepErr,
@@ -5872,6 +5887,25 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
return rowCount;
}
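
As an aside, to make the control flow of doThreadedAggregate() easier to follow, here is a minimal, self-contained sketch of the two-phase idea the comments above describe: phase 1 distributes rows into buckets by hashing the GROUP BY key, phase 2 aggregates each bucket independently. It is illustrative only; it shares no types or functions with ColumnStore, and names such as numBuckets are invented for the example.

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <thread>
    #include <unordered_map>
    #include <utility>
    #include <vector>

    using Row = std::pair<int64_t, int64_t>;  // (GROUP BY key, value)

    int main()
    {
      const std::vector<Row> input = {{1, 10}, {2, 20}, {1, 5}, {3, 7}, {2, 1}};
      const size_t numBuckets = 4;

      // Phase 1: distribute rows into buckets by the hash of the GROUP BY key.
      // Equal keys always land in the same bucket, so buckets are independent.
      std::vector<std::vector<Row>> buckets(numBuckets);
      for (const Row& row : input)
        buckets[std::hash<int64_t>{}(row.first) % numBuckets].push_back(row);

      // Phase 2: aggregate every bucket in its own thread (here: SUM per key).
      // No locking is needed because no key spans two buckets.
      std::vector<std::unordered_map<int64_t, int64_t>> partials(numBuckets);
      std::vector<std::thread> workers;
      for (size_t b = 0; b < numBuckets; ++b)
        workers.emplace_back([&buckets, &partials, b] {
          for (const Row& row : buckets[b])
            partials[b][row.first] += row.second;
        });
      for (std::thread& w : workers)
        w.join();

      // Delivery: each per-bucket map already holds the final results for its keys.
      for (const auto& partial : partials)
        for (const auto& [key, sum] : partial)
          std::cout << key << " -> " << sum << '\n';
      return 0;
    }

Because equal keys can never end up in different buckets, the per-bucket results can be delivered as-is, which corresponds to Case 3 above. DISTINCT aggregations (Cases 1 and 2) additionally have to deduplicate values before the final aggregate, which is why the second phase dispatches on distinctAggregator.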

bool TupleAggregateStep::cleanUpAndOutputRowGroup(ByteStream& bs, RowGroupDL* dlp)
{
if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
pruneAuxColumns();

if (dlp)
{
RGData rgData = fRowGroupDelivered.duplicate();
dlp->insert(rgData);
}
else
{
bs.restart();
fRowGroupDelivered.serializeRGData(bs);
return false;
}
return true;
}
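
A note on the extracted helper: cleanUpAndOutputRowGroup() encodes the delivery mode in its return value. In the RowGroupDL path the delivered RowGroup is duplicated into the datalist and the caller may keep looping, while in the ByteStream path it returns false, so every call site breaks after serializing a single RowGroup; one RowGroup is handed over per doThreadedAggregate() call.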

void TupleAggregateStep::pruneAuxColumns()
{
uint64_t rowCount = fRowGroupOut.getRowCount();