Skip to content

Commit

Permalink
Implement materialized index scans by materializing lazy scans (ad-freiburg#1323)
Browse files Browse the repository at this point in the history

This makes the code much simpler, and makes no difference for almost all queries. The expensive part (reading from disk and decompressing) is still done in parallel, only the writing to the `IdTable` is now serialized + there is an additional copy compared to before. An example query that is slower now because of this change is: materialize a large index scan (for example, for the predicate `rdf:type`) and group by subject (there is a shortcut for grouping by object when there are few objects). But such queries will become lazy soon anyway (see ad-freiburg#1350) and then this will be irrelevant.
  • Loading branch information
joka921 authored and realHannes committed Jun 15, 2024
1 parent a60aad6 commit 46afb77
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 124 deletions.
12 changes: 12 additions & 0 deletions src/engine/idTable/IdTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,18 @@ class IdTable {
}
}

// Append every row of `table` to the end of this `IdTable`. Both tables
// must have the same number of columns.
// NOTE(review): presumably `table` must not alias `*this` — the `resize`
// below would invalidate the source columns; confirm with callers.
void insertAtEnd(const IdTable& table) {
  AD_CORRECTNESS_CHECK(table.numColumns() == numColumns());
  // Remember where the appended rows begin, then grow to the final size.
  const size_t offset = size();
  resize(numRows() + table.numRows_);
  // Copy column by column into the freshly created tail region.
  for (size_t col = 0; col < numColumns(); ++col) {
    std::ranges::copy(table.getColumn(col), getColumn(col).begin() + offset);
  }
}

// Check whether two `IdTables` have the same content. Mostly used for unit
// testing.
bool operator==(const IdTable& other) const requires(!isView) {
Expand Down
127 changes: 11 additions & 116 deletions src/index/CompressedRelation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,110 +314,19 @@ IdTable CompressedRelationReader::scan(
auto columnIndices = prepareColumnIndices(scanSpec, additionalColumns);
IdTable result(columnIndices.size(), allocator_);

// Get all the blocks that possibly might contain our pair of col0Id and
// col1Id
// Compute an upper bound for the size and reserve enough space in the result.
auto relevantBlocks = getRelevantBlocks(scanSpec, blocks);
auto beginBlock = relevantBlocks.begin();
auto endBlock = relevantBlocks.end();

// The first and the last block might be incomplete (that is, only
// a part of these blocks is actually part of the result,
// set up a lambda which allows us to read these blocks, and returns
// the result as a vector.
auto readIncompleteBlock = [&](const auto& block) {
return readPossiblyIncompleteBlock(scanSpec, block, std::nullopt,
columnIndices);
};

// The first and the last block might be incomplete, compute
// and store the partial results from them.
std::optional<DecompressedBlock> firstBlockResult;
std::optional<DecompressedBlock> lastBlockResult;
size_t totalResultSize = 0;
if (beginBlock < endBlock) {
firstBlockResult = readIncompleteBlock(*beginBlock);
totalResultSize += firstBlockResult.value().size();
++beginBlock;
cancellationHandle->throwIfCancelled();
}
if (beginBlock < endBlock) {
lastBlockResult = readIncompleteBlock(*(endBlock - 1));
totalResultSize += lastBlockResult.value().size();
endBlock--;
cancellationHandle->throwIfCancelled();
auto sizes = relevantBlocks |
std::views::transform(&CompressedBlockMetadata::numRows_);
auto upperBoundSize = std::accumulate(sizes.begin(), sizes.end(), 0ULL);
result.reserve(upperBoundSize);

for (const auto& block :
lazyScan(scanSpec, {relevantBlocks.begin(), relevantBlocks.end()},
{additionalColumns.begin(), additionalColumns.end()},
cancellationHandle)) {
result.insertAtEnd(block);
}

// Determine the total size of the result.
// First accumulate the complete blocks in the "middle"
totalResultSize += std::accumulate(beginBlock, endBlock, 0UL,
[](const auto& count, const auto& block) {
return count + block.numRows_;
});
result.resize(totalResultSize);
cancellationHandle->throwIfCancelled();

size_t rowIndexOfNextBlockStart = 0;
// Lambda that appends a possibly incomplete block (the first or last block)
// to the `result`.
auto addIncompleteBlockIfExists =
[&rowIndexOfNextBlockStart, &result](
const std::optional<DecompressedBlock>& incompleteBlock) mutable {
if (!incompleteBlock.has_value()) {
return;
}
AD_CORRECTNESS_CHECK(incompleteBlock->numColumns() ==
result.numColumns());
for (auto i : ad_utility::integerRange(result.numColumns())) {
std::ranges::copy(
incompleteBlock->getColumn(i),
result.getColumn(i).data() + rowIndexOfNextBlockStart);
}
rowIndexOfNextBlockStart += incompleteBlock->numRows();
};

addIncompleteBlockIfExists(firstBlockResult);
cancellationHandle->throwIfCancelled();

// Insert the complete blocks from the middle in parallel
if (beginBlock < endBlock) {
#pragma omp parallel
#pragma omp single
for (; beginBlock < endBlock; ++beginBlock) {
const auto& block = *beginBlock;

// Read the block serially, only read the second column.
AD_CORRECTNESS_CHECK(block.offsetsAndCompressedSize_.size() >= 2);
CompressedBlock compressedBuffer =
readCompressedBlockFromFile(block, columnIndices);

// A lambda that owns the compressed block decompresses it to the
// correct position in the result. It may safely be run in parallel
auto decompressLambda = [rowIndexOfNextBlockStart, &block, &result,
compressedBuffer =
std::move(compressedBuffer)]() mutable {
ad_utility::TimeBlockAndLog tbl{"Decompression a block"};

decompressBlockToExistingIdTable(compressedBuffer, block.numRows_,
result, rowIndexOfNextBlockStart);
};

// Register an OpenMP task that performs the decompression of this
// block in parallel
#pragma omp task
{
if (!cancellationHandle->isCancelled()) {
decompressLambda();
}
}

// update the pointers
rowIndexOfNextBlockStart += block.numRows_;
} // end of parallel region
}
cancellationHandle->throwIfCancelled();
// Add the last block.
addIncompleteBlockIfExists(lastBlockResult);
AD_CORRECTNESS_CHECK(rowIndexOfNextBlockStart == result.size());
cancellationHandle->throwIfCancelled();
return result;
}
Expand Down Expand Up @@ -674,20 +583,6 @@ DecompressedBlock CompressedRelationReader::decompressBlock(
return decompressedBlock;
}

// ____________________________________________________________________________
void CompressedRelationReader::decompressBlockToExistingIdTable(
    const CompressedBlock& compressedBlock, size_t numRowsToRead,
    IdTable& table, size_t offsetInTable) {
  // The target region [offsetInTable, offsetInTable + numRowsToRead) must
  // already exist in `table`; this function does not grow it.
  AD_CORRECTNESS_CHECK(table.numRows() >= offsetInTable + numRowsToRead);
  // TODO<joka921, C++23> use zip_view.
  AD_CORRECTNESS_CHECK(compressedBlock.size() == table.numColumns());
  // Decompress each compressed column directly into its slot in the table,
  // starting at row `offsetInTable`.
  for (size_t columnIdx = 0; columnIdx < compressedBlock.size(); ++columnIdx) {
    decompressColumn(compressedBlock[columnIdx], numRowsToRead,
                     table.getColumn(columnIdx).data() + offsetInTable);
  }
}

// ____________________________________________________________________________
template <typename Iterator>
void CompressedRelationReader::decompressColumn(
Expand Down
8 changes: 0 additions & 8 deletions src/index/CompressedRelation.h
Original file line number Diff line number Diff line change
Expand Up @@ -544,14 +544,6 @@ class CompressedRelationReader {
DecompressedBlock decompressBlock(const CompressedBlock& compressedBlock,
size_t numRowsToRead) const;

// Similar to `decompressBlock`, but the block is directly decompressed into
// the `table`, starting at the `offsetInTable`-th row. The `table` and the
// `compressedBlock` must have the same number of columns, and the `table`
// must have at least `numRowsToRead + offsetInTable` rows.
static void decompressBlockToExistingIdTable(
const CompressedBlock& compressedBlock, size_t numRowsToRead,
IdTable& table, size_t offsetInTable);

// Helper function used by `decompressBlock`. Decompress the `compressedColumn` and
// store the result at the `iterator`. For the `numRowsToRead` argument, see
Expand Down

0 comments on commit 46afb77

Please sign in to comment.