Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Fix the case with distinct==True in function TupleUnion::readInput.
Signed-off-by: Jigao Luo <luojigao@outlook.com>
  • Loading branch information
JigaoLuo committed Jul 21, 2022
1 parent 7598702 commit 17f9ffe
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
24 changes: 14 additions & 10 deletions dbcon/joblist/tupleunion.cpp
Expand Up @@ -1113,7 +1113,9 @@ void TupleUnion::readInput(uint32_t which)
getOutput(&l_outputRG, &outRow, &outRGData);
memUsageBefore = allocator.getMemUsage();

for (uint32_t i = 0; i < l_tmpRG.getRowCount(); i++, tmpRow.nextRow())
const uint32_t tmpRGRowCount = l_tmpRG.getRowCount();
uint32_t tmpOutputRowCount = l_outputRG.getRowCount();
for (uint32_t i = 0; i < tmpRGRowCount; i++, tmpRow.nextRow())
{
pair<Uniquer_t::iterator, bool> inserted;
inserted = uniquer->insert(RowPosition(which | RowPosition::normalizedFlag, i));
Expand All @@ -1124,11 +1126,13 @@ void TupleUnion::readInput(uint32_t which)
const_cast<RowPosition&>(*(inserted.first)) =
RowPosition(rowMemory.size() - 1, l_outputRG.getRowCount());
memDiff += outRow.getRealSize();
uint32_t tmpRowCount = 0;
addToOutput(&outRow, &l_outputRG, true, outRGData, tmpRowCount);
addToOutput(&outRow, &l_outputRG, true, outRGData, tmpOutputRowCount);
}
}

fRowsReturned += tmpRGRowCount;
l_outputRG.setRowCount(tmpOutputRowCount);

memUsageAfter = allocator.getMemUsage();
memDiff += (memUsageAfter - memUsageBefore);
memUsage += memDiff;
Expand All @@ -1151,16 +1155,16 @@ void TupleUnion::readInput(uint32_t which)
{
std::vector<std::function<void(const Row& in, Row* out, uint32_t col)>> normalizeFunctions = inferNormalizeFunctions(inRow, &outRow);
const uint32_t inputRGRowCount = l_inputRG.getRowCount();
uint32_t tmpRowCount = l_outputRG.getRowCount();
uint32_t tmpOutputRowCount = l_outputRG.getRowCount();

for (uint32_t i = 0; i < inputRGRowCount; i++, inRow.nextRow())
{
normalize(inRow, &outRow, normalizeFunctions);
addToOutput(&outRow, &l_outputRG, false, outRGData, tmpRowCount);
addToOutput(&outRow, &l_outputRG, false, outRGData, tmpOutputRowCount);
}

fRowsReturned += inputRGRowCount;
l_outputRG.setRowCount(tmpRowCount);
l_outputRG.setRowCount(tmpOutputRowCount);
}

more = dl->next(it, &inRGData);
Expand Down Expand Up @@ -1271,12 +1275,12 @@ void TupleUnion::getOutput(RowGroup* rg, Row* row, RGData* data)
rg->getRow(rg->getRowCount(), row);
}

void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit, RGData& data, uint32_t& tmpRowCount)
void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit, RGData& data, uint32_t& tmpOutputRowCount)
{
r->nextRow();
tmpRowCount++;
tmpOutputRowCount++;

if (UNLIKELY(tmpRowCount == 8192))
if (UNLIKELY(tmpOutputRowCount == 8192))
{
rg->setRowCount(8192);
{
Expand All @@ -1287,7 +1291,7 @@ void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit, RGData& data, ui
rg->setData(&data);
rg->resetRowGroup(0);
rg->getRow(0, r);
tmpRowCount = 0;
tmpOutputRowCount = 0;

if (keepit)
rowMemory.push_back(data);
Expand Down
2 changes: 1 addition & 1 deletion dbcon/joblist/tupleunion.h
Expand Up @@ -122,7 +122,7 @@ class TupleUnion : public JobStep, public TupleDeliveryStep
};

void getOutput(rowgroup::RowGroup* rg, rowgroup::Row* row, rowgroup::RGData* data);
void addToOutput(rowgroup::Row* r, rowgroup::RowGroup* rg, bool keepit, rowgroup::RGData& data, uint32_t& tmpRowCount);
void addToOutput(rowgroup::Row* r, rowgroup::RowGroup* rg, bool keepit, rowgroup::RGData& data, uint32_t& tmpOutputRowCount);
void normalize(const rowgroup::Row& in, rowgroup::Row* out, std::vector<std::function<void(const rowgroup::Row& in, rowgroup::Row* out, uint32_t col)>>& infer_normalize_functions);
void writeNull(rowgroup::Row* out, uint32_t col);
void readInput(uint32_t);
Expand Down

0 comments on commit 17f9ffe

Please sign in to comment.