Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ void TiDBTableScan::constructTableScanForRemoteRead(tipb::TableScan * tipb_table
tipb_table_scan->set_table_id(table_id);
for (const auto & column : partition_table_scan.columns())
*tipb_table_scan->add_columns() = column;
for (const auto & filter : partition_table_scan.pushed_down_filter_conditions())
*tipb_table_scan->add_pushed_down_filter_conditions() = filter;
tipb_table_scan->set_desc(partition_table_scan.desc());
for (auto id : partition_table_scan.primary_column_ids())
tipb_table_scan->add_primary_column_ids(id);
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ size_t DMFileReader::skipNextBlock()

scan_context->total_dmfile_skipped_rows += read_rows;
next_row_offset += read_rows;
last_read_skipped = true;
return read_rows;
}

Expand Down Expand Up @@ -396,6 +397,8 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter)
// merge blocks
Block res = vstackBlocks(std::move(blocks));
res.setStartOffset(start_row_offset);

last_read_skipped = false;
return res;
}

Expand Down Expand Up @@ -639,6 +642,8 @@ Block DMFileReader::read()
e.rethrow();
}
}

last_read_skipped = false;
return res;
}

Expand All @@ -654,7 +659,7 @@ void DMFileReader::readFromDisk(
if (auto iter = column_streams.find(stream_name); iter != column_streams.end())
{
auto & top_stream = iter->second;
bool should_seek = force_seek || shouldSeek(start_pack_id) || skip_packs > 0;
bool should_seek = force_seek || shouldSeek(start_pack_id) || skip_packs > 0 || last_read_skipped;

auto data_type = dmfile->getColumnStat(column_define.id).type;
data_type->deserializeBinaryBulkWithMultipleStreams( //
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ class DMFileReader

std::unique_ptr<ColumnSharingCacheMap> col_data_cache{};
std::unordered_map<ColId, bool> last_read_from_cache{};

/// call skipNextBlock() before read()
bool last_read_skipped{false};
};

} // namespace DM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,26 @@ Block LateMaterializationBlockInputStream::readImpl()
// If filter is nullptr, it means that these push down filters are always true.
if (!filter)
{
Block rest_column_block = rest_column_stream->read();
IColumn::Filter col_filter;
col_filter.resize(filter_column_block.rows());
Block rest_column_block;
if (bitmap_filter->get(col_filter, filter_column_block.startOffset(), filter_column_block.rows()))
{
rest_column_block = rest_column_stream->read();
}
else
{
rest_column_block = rest_column_stream->read();
size_t passed_count = countBytesInFilter(col_filter);
for (auto & col : rest_column_block)
{
col.column = col.column->filter(col_filter, passed_count);
}
for (auto & col : filter_column_block)
{
col.column = col.column->filter(col_filter, passed_count);
}
}
return hstackBlocks({std::move(filter_column_block), std::move(rest_column_block)}, header);
}

Expand All @@ -66,7 +85,7 @@ Block LateMaterializationBlockInputStream::readImpl()
if (size_t passed_count = countBytesInFilter(*filter); passed_count == 0)
{
// if all rows are filtered, skip the next block of rest_column_stream
if (rest_column_stream->skipNextBlock() == 0)
if (size_t skipped_rows = rest_column_stream->skipNextBlock(); skipped_rows == 0)
{
// if we fail to skip, we need to call read() of rest_column_stream, but ignore the result
// NOTE: skipNextBlock() return 0 only if failed to skip or meets the end of stream,
Expand All @@ -77,6 +96,7 @@ Block LateMaterializationBlockInputStream::readImpl()
}
else
{
RUNTIME_CHECK(skipped_rows == rows);
LOG_DEBUG(log, "Late materialization skip read block at start_offset: {}, rows: {}", filter_column_block.startOffset(), filter_column_block.rows());
}
}
Expand Down