Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: Fix RegionBlockReaderTest helper functions (#5899) #5917

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 38 additions & 23 deletions dbms/src/Storages/Transaction/RowCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,18 @@ struct RowEncoderV2
/// Cache encoded individual columns.
for (size_t i_col = 0, i_val = 0; i_col < table_info.columns.size(); i_col++)
{
if (i_val == fields.size())
break;

const auto & column_info = table_info.columns[i_col];
const auto & field = fields[i_val];
if ((table_info.pk_is_handle || table_info.is_common_handle) && column_info.hasPriKeyFlag())
{
// for common handle/pk is handle table,
// the field with primary key flag is usually encoded to key instead of value
continue;
}

if (column_info.id > std::numeric_limits<typename RowV2::Types<false>::ColumnIDType>::max())
is_big = true;
if (!field.isNull())
Expand All @@ -213,9 +221,6 @@ struct RowEncoderV2
null_column_ids.emplace(column_info.id);
}
i_val++;

if (i_val == fields.size())
break;
}
is_big = is_big || value_length > std::numeric_limits<RowV2::Types<false>::ValueOffsetType>::max();

Expand Down Expand Up @@ -314,7 +319,7 @@ bool appendRowV2ToBlock(
ColumnID pk_handle_id,
bool force_decode)
{
UInt8 row_flag = readLittleEndian<UInt8>(&raw_value[1]);
auto row_flag = readLittleEndian<UInt8>(&raw_value[1]);
bool is_big = row_flag & RowV2::BigRowMask;
return is_big ? appendRowV2ToBlockImpl<true>(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, force_decode)
: appendRowV2ToBlockImpl<false>(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, force_decode);
Expand Down Expand Up @@ -360,9 +365,10 @@ bool appendRowV2ToBlockImpl(
decodeUInts<ColumnID, typename RowV2::Types<is_big>::ColumnIDType>(cursor, raw_value, num_null_columns, null_column_ids);
decodeUInts<size_t, typename RowV2::Types<is_big>::ValueOffsetType>(cursor, raw_value, num_not_null_columns, value_offsets);
size_t values_start_pos = cursor;
size_t id_not_null = 0, id_null = 0;
size_t idx_not_null = 0;
size_t idx_null = 0;
// Merge ordered not null/null columns to keep order.
while (id_not_null < not_null_column_ids.size() || id_null < null_column_ids.size())
while (idx_not_null < not_null_column_ids.size() || idx_null < null_column_ids.size())
{
if (column_ids_iter == column_ids_iter_end)
{
Expand All @@ -371,24 +377,32 @@ bool appendRowV2ToBlockImpl(
}

bool is_null;
if (id_not_null < not_null_column_ids.size() && id_null < null_column_ids.size())
is_null = not_null_column_ids[id_not_null] > null_column_ids[id_null];
if (idx_not_null < not_null_column_ids.size() && idx_null < null_column_ids.size())
is_null = not_null_column_ids[idx_not_null] > null_column_ids[idx_null];
else
is_null = id_null < null_column_ids.size();
is_null = idx_null < null_column_ids.size();

auto next_datum_column_id = is_null ? null_column_ids[id_null] : not_null_column_ids[id_not_null];
if (column_ids_iter->first > next_datum_column_id)
auto next_datum_column_id = is_null ? null_column_ids[idx_null] : not_null_column_ids[idx_not_null];
const auto next_column_id = column_ids_iter->first;
if (next_column_id > next_datum_column_id)
{
// extra column
// The next column id to read is bigger than the column id of next datum in encoded row.
// It means this is the datum of extra column. May happen when reading after dropping
// a column.
if (!force_decode)
return false;
// Ignore the extra column and continue to parse other datum
if (is_null)
id_null++;
idx_null++;
else
id_not_null++;
idx_not_null++;
}
else if (column_ids_iter->first < next_datum_column_id)
else if (next_column_id < next_datum_column_id)
{
// The next column id to read is less than the column id of next datum in encoded row.
// It means this is the datum of missing column. May happen when reading after adding
// a column.
// Fill with default value and continue to read data for next column id.
const auto & column_info = column_infos[column_ids_iter->second];
if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode))
return false;
Expand All @@ -397,23 +411,24 @@ bool appendRowV2ToBlockImpl(
}
else
{
// if pk_handle_id is a valid column id, then it means the table's pk_is_handle is true
// If pk_handle_id is a valid column id, then it means the table's pk_is_handle is true
// we can just ignore the pk value encoded in value part
if (unlikely(column_ids_iter->first == pk_handle_id))
if (unlikely(next_column_id == pk_handle_id))
{
column_ids_iter++;
block_column_pos++;
if (is_null)
{
id_null++;
idx_null++;
}
else
{
id_not_null++;
idx_not_null++;
}
continue;
}

// Parse the datum.
auto * raw_column = const_cast<IColumn *>((block.getByPosition(block_column_pos)).column.get());
const auto & column_info = column_infos[column_ids_iter->second];
if (is_null)
Expand All @@ -432,15 +447,15 @@ bool appendRowV2ToBlockImpl(
}
// ColumnNullable::insertDefault just insert a null value
raw_column->insertDefault();
id_null++;
idx_null++;
}
else
{
size_t start = id_not_null ? value_offsets[id_not_null - 1] : 0;
size_t length = value_offsets[id_not_null] - start;
size_t start = idx_not_null ? value_offsets[idx_not_null - 1] : 0;
size_t length = value_offsets[idx_not_null] - start;
if (!raw_column->decodeTiDBRowV2Datum(values_start_pos + start, raw_value, length, force_decode))
return false;
id_not_null++;
idx_not_null++;
}
column_ids_iter++;
block_column_pos++;
Expand Down
45 changes: 28 additions & 17 deletions dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#pragma once
#include <Storages/Transaction/DecodingStorageSchemaSnapshot.h>
#include <Storages/Transaction/RowCodec.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/TypeMapping.h>
#include <Storages/Transaction/Types.h>

namespace DB::tests
{
Expand Down Expand Up @@ -146,7 +148,7 @@ struct ColumnIDValue<T, true>
{
static constexpr bool value_is_null = true;
using ValueType = std::decay_t<T>;
ColumnIDValue(ColumnID id_)
explicit ColumnIDValue(ColumnID id_)
: id(id_)
{}
ColumnID id;
Expand Down Expand Up @@ -211,46 +213,55 @@ void getTableInfoFieldsInternal(OrderedColumnInfoFields & column_info_fields, Ty
}

template <typename... Types>
std::pair<TableInfo, std::vector<Field>> getTableInfoAndFields(ColumnIDs handle_ids, bool is_common_handle, Types &&... column_value_ids)
std::pair<TableInfo, std::vector<Field>> getTableInfoAndFields(ColumnIDs pk_col_ids, bool is_common_handle, Types &&... column_value_ids)
{
OrderedColumnInfoFields column_info_fields;
getTableInfoFieldsInternal(column_info_fields, std::forward<Types>(column_value_ids)...);
TableInfo table_info;
std::vector<Field> fields;
bool pk_is_handle = pk_col_ids.size() == 1 && pk_col_ids[0] != ::DB::TiDBPkColumnID;

for (auto & column_info_field : column_info_fields)
{
auto & column = std::get<0>(column_info_field.second);
auto & field = std::get<1>(column_info_field.second);
if (std::find(handle_ids.begin(), handle_ids.end(), column.id) != handle_ids.end())
if (std::find(pk_col_ids.begin(), pk_col_ids.end(), column.id) != pk_col_ids.end())
{
column.setPriKeyFlag();
if (column.tp != TiDB::TypeLong && column.tp != TiDB::TypeTiny && column.tp != TiDB::TypeLongLong && column.tp != TiDB::TypeShort && column.tp != TiDB::TypeInt24)
{
pk_is_handle = false;
}
}
table_info.columns.emplace_back(std::move(column));
fields.emplace_back(std::move(field));
}
if (!is_common_handle)
{
if (handle_ids[0] != EXTRA_HANDLE_COLUMN_ID)
table_info.pk_is_handle = true;
}
else

table_info.pk_is_handle = pk_is_handle;
table_info.is_common_handle = is_common_handle;
if (is_common_handle)
{
table_info.is_common_handle = true;
TiDB::IndexInfo index_info;
for (auto handle_id : handle_ids)
// TiFlash maintains the column name of primary key
// for common handle table
TiDB::IndexInfo pk_index_info;
pk_index_info.is_primary = true;
pk_index_info.idx_name = "PRIMARY";
pk_index_info.is_unique = true;
for (auto pk_col_id : pk_col_ids)
{
TiDB::IndexColumnInfo index_column_info;
for (auto & column : table_info.columns)
{
if (column.id == handle_id)
if (column.id == pk_col_id)
{
index_column_info.name = column.name;
break;
}
}
index_info.idx_cols.emplace_back(index_column_info);
pk_index_info.idx_cols.emplace_back(index_column_info);
}
table_info.index_infos.emplace_back(index_info);
table_info.index_infos.emplace_back(pk_index_info);
}

return std::make_pair(std::move(table_info), std::move(fields));
Expand All @@ -272,7 +283,7 @@ inline DecodingStorageSchemaSnapshotConstPtr getDecodingStorageSchemaSnapshot(co
store_columns.emplace_back(VERSION_COLUMN_ID, VERSION_COLUMN_NAME, VERSION_COLUMN_TYPE);
store_columns.emplace_back(TAG_COLUMN_ID, TAG_COLUMN_NAME, TAG_COLUMN_TYPE);
ColumnID handle_id = EXTRA_HANDLE_COLUMN_ID;
for (auto & column_info : table_info.columns)
for (const auto & column_info : table_info.columns)
{
if (table_info.pk_is_handle)
{
Expand Down Expand Up @@ -301,7 +312,7 @@ size_t valueStartPos(const TableInfo & table_info)

inline Block decodeRowToBlock(const String & row_value, DecodingStorageSchemaSnapshotConstPtr decoding_schema)
{
auto & sorted_column_id_with_pos = decoding_schema->sorted_column_id_with_pos;
const auto & sorted_column_id_with_pos = decoding_schema->sorted_column_id_with_pos;
auto iter = sorted_column_id_with_pos.begin();
const size_t value_column_num = 3;
// skip first three column which is EXTRA_HANDLE_COLUMN, VERSION_COLUMN, TAG_COLUMN
Expand Down Expand Up @@ -347,4 +358,4 @@ T getValueByRowV1(const T & v)
return static_cast<T>(std::move((*block.getByPosition(0).column)[0].template safeGet<NearestType>()));
}

} // namespace DB::tests
} // namespace DB::tests
Loading