Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Apply the patch from Gagan.
Signed-off-by: Jigao Luo <luojigao@outlook.com>
  • Loading branch information
JigaoLuo committed Jul 9, 2022
1 parent b3d415a commit 7598702
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
23 changes: 15 additions & 8 deletions dbcon/joblist/tupleunion.cpp
Expand Up @@ -1124,7 +1124,8 @@ void TupleUnion::readInput(uint32_t which)
const_cast<RowPosition&>(*(inserted.first)) =
RowPosition(rowMemory.size() - 1, l_outputRG.getRowCount());
memDiff += outRow.getRealSize();
addToOutput(&outRow, &l_outputRG, true, outRGData);
uint32_t tmpRowCount = 0;
addToOutput(&outRow, &l_outputRG, true, outRGData, tmpRowCount);
}
}

Expand All @@ -1149,11 +1150,17 @@ void TupleUnion::readInput(uint32_t which)
else
{
std::vector<std::function<void(const Row& in, Row* out, uint32_t col)>> normalizeFunctions = inferNormalizeFunctions(inRow, &outRow);
for (uint32_t i = 0; i < l_inputRG.getRowCount(); i++, inRow.nextRow())
const uint32_t inputRGRowCount = l_inputRG.getRowCount();
uint32_t tmpRowCount = l_outputRG.getRowCount();

for (uint32_t i = 0; i < inputRGRowCount; i++, inRow.nextRow())
{
normalize(inRow, &outRow, normalizeFunctions);
addToOutput(&outRow, &l_outputRG, false, outRGData);
addToOutput(&outRow, &l_outputRG, false, outRGData, tmpRowCount);
}

fRowsReturned += inputRGRowCount;
l_outputRG.setRowCount(tmpRowCount);
}

more = dl->next(it, &inRGData);
Expand Down Expand Up @@ -1264,14 +1271,14 @@ void TupleUnion::getOutput(RowGroup* rg, Row* row, RGData* data)
rg->getRow(rg->getRowCount(), row);
}

void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit, RGData& data)
void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit, RGData& data, uint32_t& tmpRowCount)
{
r->nextRow();
rg->incRowCount();
fRowsReturned++;
tmpRowCount++;

if (rg->getRowCount() == 8192)
if (UNLIKELY(tmpRowCount == 8192))
{
rg->setRowCount(8192);
{
boost::mutex::scoped_lock lock(sMutex);
output->insert(data);
Expand All @@ -1280,13 +1287,13 @@ void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit, RGData& data)
rg->setData(&data);
rg->resetRowGroup(0);
rg->getRow(0, r);
tmpRowCount = 0;

if (keepit)
rowMemory.push_back(data);
}
}


void TupleUnion::normalize(const Row& in, Row* out, std::vector<std::function<void(const Row& in, Row* out, uint32_t col)>>& normalizeFunctions)
{
uint32_t i;
Expand Down
2 changes: 1 addition & 1 deletion dbcon/joblist/tupleunion.h
Expand Up @@ -122,7 +122,7 @@ class TupleUnion : public JobStep, public TupleDeliveryStep
};

void getOutput(rowgroup::RowGroup* rg, rowgroup::Row* row, rowgroup::RGData* data);
void addToOutput(rowgroup::Row* r, rowgroup::RowGroup* rg, bool keepit, rowgroup::RGData& data);
void addToOutput(rowgroup::Row* r, rowgroup::RowGroup* rg, bool keepit, rowgroup::RGData& data, uint32_t& tmpRowCount);
void normalize(const rowgroup::Row& in, rowgroup::Row* out, std::vector<std::function<void(const rowgroup::Row& in, rowgroup::Row* out, uint32_t col)>>& infer_normalize_functions);
void writeNull(rowgroup::Row* out, uint32_t col);
void readInput(uint32_t);
Expand Down

0 comments on commit 7598702

Please sign in to comment.