Skip to content

Commit

Permalink
Refactor DDL Module to support a large number of tables with frequent…
Browse files Browse the repository at this point in the history
… ddl actions (#7437)

ref #1664, ref #3777, close #6532, ref #7630
  • Loading branch information
hongyunyan committed Jun 15, 2023
1 parent 04bf517 commit 865ba65
Show file tree
Hide file tree
Showing 157 changed files with 3,267 additions and 3,032 deletions.
2 changes: 1 addition & 1 deletion contrib/client-c
7 changes: 0 additions & 7 deletions dbms/src/Common/FailPoint.cpp
Expand Up @@ -33,11 +33,6 @@ namespace DB
M(exception_between_rename_table_data_and_metadata) \
M(exception_between_create_database_meta_and_directory) \
M(exception_before_rename_table_old_meta_removed) \
M(exception_after_step_1_in_exchange_partition) \
M(exception_before_step_2_rename_in_exchange_partition) \
M(exception_after_step_2_in_exchange_partition) \
M(exception_before_step_3_rename_in_exchange_partition) \
M(exception_after_step_3_in_exchange_partition) \
M(region_exception_after_read_from_storage_some_error) \
M(region_exception_after_read_from_storage_all_error) \
M(exception_before_dmfile_remove_encryption) \
Expand Down Expand Up @@ -67,7 +62,6 @@ namespace DB
M(exception_mpp_hash_probe) \
M(exception_before_drop_segment) \
M(exception_after_drop_segment) \
M(exception_between_schema_change_in_the_same_diff) \
M(force_ps_wal_compact) \
M(pause_before_full_gc_prepare) \
M(force_owner_mgr_state) \
Expand Down Expand Up @@ -125,7 +119,6 @@ namespace DB
M(pause_after_copr_streams_acquired) \
M(pause_query_init)


#define APPLY_FOR_RANDOM_FAILPOINTS(M) \
M(random_tunnel_wait_timeout_failpoint) \
M(random_tunnel_write_failpoint) \
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Common/TiFlashMetrics.h
Expand Up @@ -110,8 +110,7 @@ namespace DB
F(type_passthrough_zstd_compression, {"type", "passthrough_zstd_compression"})) \
M(tiflash_schema_version, "Current version of tiflash cached schema", Gauge) \
M(tiflash_schema_applying, "Whether the schema is applying or not (holding lock)", Gauge) \
M(tiflash_schema_apply_count, "Total number of each kinds of apply", Counter, F(type_diff, {"type", "diff"}), \
F(type_full, {"type", "full"}), F(type_failed, {"type", "failed"}), \
M(tiflash_schema_apply_count, "Total number of each kinds of apply", Counter, F(type_failed, {"type", "failed"}), \
F(type_drop_keyspace, {"type", "drop_keyspace"})) \
M(tiflash_schema_trigger_count, "Total number of each kinds of schema sync trigger", Counter, /**/ \
F(type_timer, {"type", "timer"}), F(type_raft_decode, {"type", "raft_decode"}), F(type_cop_read, {"type", "cop_read"})) \
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Debug/DBGInvoker.cpp
Expand Up @@ -90,6 +90,8 @@ DBGInvoker::DBGInvoker()
regSchemalessFunc("gc_schemas", dbgFuncGcSchemas);
regSchemalessFunc("reset_schemas", dbgFuncResetSchemas);
regSchemalessFunc("is_tombstone", dbgFuncIsTombstone);
regSchemalessFunc("refresh_table_schema", dbgFuncRefreshTableSchema);
regSchemalessFunc("refresh_mapped_table_schema", dbgFuncRefreshMappedTableSchema);

regSchemalessFunc("region_split", MockRaftCommand::dbgFuncRegionBatchSplit);
regSchemalessFunc("region_prepare_merge", MockRaftCommand::dbgFuncPrepareMerge);
Expand Down Expand Up @@ -118,6 +120,8 @@ DBGInvoker::DBGInvoker()

regSchemalessFunc("mapped_database", dbgFuncMappedDatabase);
regSchemalessFunc("mapped_table", dbgFuncMappedTable);
regSchemalessFunc("mapped_table_exists", dbgFuncTableExists);
regSchemalessFunc("mapped_database_exists", dbgFuncDatabaseExists);
regSchemafulFunc("query_mapped", dbgFuncQueryMapped);
regSchemalessFunc("get_tiflash_replica_count", dbgFuncGetTiflashReplicaCount);
regSchemalessFunc("get_partition_tables_tiflash_replica_count", dbgFuncGetPartitionTablesTiflashReplicaCount);
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Debug/MockRaftStoreProxy.cpp
Expand Up @@ -27,6 +27,7 @@
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/tests/region_helper.h>
#include <TestUtils/TiFlashTestEnv.h>
#include <TiDB/Schema/TiDBSchemaManager.h>
#include <google/protobuf/text_format.h>

namespace DB
Expand Down Expand Up @@ -825,7 +826,7 @@ TableID MockRaftStoreProxy::bootstrapTable(
MockTiDB::instance().newTable("d", "prevt" + toString(random()), columns, tso, "", "dt");
UInt64 table_id = MockTiDB::instance().newTable("d", "t" + toString(random()), columns, tso, "", "dt");

auto schema_syncer = tmt.getSchemaSyncer();
auto schema_syncer = tmt.getSchemaSyncerManager();
schema_syncer->syncSchemas(ctx, NullspaceID);
this->table_id = table_id;
return table_id;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Debug/MockSchemaGetter.h
Expand Up @@ -42,6 +42,11 @@ struct MockSchemaGetter
return MockTiDB::instance().getTableInfoByID(table_id);
}

static std::tuple<TiDB::DBInfoPtr, TiDB::TableInfoPtr> getDatabaseAndTableInfo(DatabaseID db_id, TableID table_id)
{
return std::make_tuple(getDatabase(db_id), getTableInfo(db_id, table_id));
}

static std::vector<TiDB::DBInfoPtr> listDBs()
{
std::vector<TiDB::DBInfoPtr> res;
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Debug/MockSchemaNameMapper.h
Expand Up @@ -21,16 +21,17 @@ namespace DB

struct MockSchemaNameMapper : public SchemaNameMapper
{
String mapDatabaseName(const TiDB::DBInfo & db_info) const override { return db_info.name; }
String mapTableName(const TiDB::TableInfo & table_info) const override { return table_info.name; }
String mapDatabaseName(const TiDB::DBInfo & db_info) const override { return "db_" + std::to_string(db_info.id); }
String mapDatabaseName(DatabaseID database_id, KeyspaceID /*keyspace_id*/) const override { return "db_" + std::to_string(database_id); }
String mapTableName(const TiDB::TableInfo & table_info) const override { return "t_" + std::to_string(table_info.id); }

String mapPartitionName(const TiDB::TableInfo & table_info) const override
{
return table_info.name + "_" + std::to_string(table_info.id);
}

String debugDatabaseName(const TiDB::DBInfo & db_info) const override { return db_info.name; }
String debugTableName(const TiDB::TableInfo & table_info) const override { return table_info.name; }
String debugDatabaseName(const TiDB::DBInfo & db_info) const override { return fmt::format("db_{}", db_info.id); }
String debugTableName(const TiDB::TableInfo & table_info) const override { return fmt::format("t_{}", table_info.id); }
};

} // namespace DB
19 changes: 16 additions & 3 deletions dbms/src/Debug/MockTiDB.cpp
Expand Up @@ -153,7 +153,14 @@ DatabaseID MockTiDB::newDataBase(const String & database_name)

if (databases.find(database_name) == databases.end())
{
schema_id = databases.size() + 1;
if (databases.empty())
{
schema_id = 1;
}
else
{
schema_id = databases.cbegin()->second + 1;
}
databases.emplace(database_name, schema_id);
}

Expand Down Expand Up @@ -224,6 +231,7 @@ TiDB::TableInfoPtr MockTiDB::parseColumns(
{
String & name = string_tokens[index];
index_info.idx_cols[index].name = name;
index_info.idx_cols[index].offset = pk_column_pos_map[name];
index_info.idx_cols[index].length = -1;
}
}
Expand Down Expand Up @@ -273,13 +281,15 @@ TableID MockTiDB::newTable(
return addTable(database_name, std::move(*table_info));
}

int MockTiDB::newTables(
std::vector<TableID> MockTiDB::newTables(
const String & database_name,
const std::vector<std::tuple<String, ColumnsDescription, String>> & tables,
Timestamp tso,
const String & engine_type)
{
std::lock_guard lock(tables_mutex);
std::vector<TableID> table_ids;
table_ids.reserve(tables.size());
if (databases.find(database_name) == databases.end())
{
throw Exception("MockTiDB not found db: " + database_name, ErrorCodes::LOGICAL_ERROR);
Expand Down Expand Up @@ -310,6 +320,8 @@ int MockTiDB::newTables(
opt.old_schema_id = table->database_id;
opt.old_table_id = table->id();
diff.affected_opts.push_back(std::move(opt));

table_ids.push_back(table->id());
}

if (diff.affected_opts.empty())
Expand All @@ -318,7 +330,8 @@ int MockTiDB::newTables(
diff.schema_id = diff.affected_opts[0].schema_id;
diff.version = version;
version_diff[version] = diff;
return 0;

return table_ids;
}

TableID MockTiDB::addTable(const String & database_name, TiDB::TableInfo && table_info)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockTiDB.h
Expand Up @@ -82,7 +82,7 @@ class MockTiDB : public ext::Singleton<MockTiDB>
const String & handle_pk_name,
const String & engine_type);

int newTables(
std::vector<TableID> newTables(
const String & database_name,
const std::vector<std::tuple<String, ColumnsDescription, String>> & tables,
Timestamp tso,
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Expand Up @@ -16,6 +16,7 @@
#include <Debug/dbgFuncCoprocessorUtils.h>
#include <Debug/dbgNaturalDag.h>
#include <Debug/dbgQueryExecutor.h>
#include <Debug/dbgTools.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/IManageableStorage.h>
Expand Down Expand Up @@ -49,7 +50,9 @@ BlockInputStreamPtr dbgFuncTiDBQuery(Context & context, const ASTs & args)
context,
query,
[&](const String & database_name, const String & table_name) {
auto storage = context.getTable(database_name, table_name);
auto mapped_database_name = mappedDatabase(context, database_name);
auto mapped_table_name = mappedTable(context, database_name, table_name);
auto storage = context.getTable(mapped_database_name, mapped_table_name.second);
auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
if (!managed_storage //
|| !(managed_storage->engineType() == ::TiDB::StorageEngine::DT
Expand All @@ -58,7 +61,6 @@ BlockInputStreamPtr dbgFuncTiDBQuery(Context & context, const ASTs & args)
return managed_storage->getTableInfo();
},
properties);

return executeQuery(context, region_id, properties, query_tasks, func_wrap_output_stream);
}

Expand Down
9 changes: 1 addition & 8 deletions dbms/src/Debug/dbgFuncMockRaftCommand.cpp
Expand Up @@ -60,16 +60,9 @@ void MockRaftCommand::dbgFuncRegionBatchSplit(Context & context, const ASTs & ar
std::vector<Field> end_keys1;
std::vector<Field> end_keys2;

std::unordered_map<String, size_t> column_name_columns_index_map;
for (size_t i = 0; i < table_info.columns.size(); i++)
{
column_name_columns_index_map.emplace(table_info.columns[i].name, i);
}

for (size_t i = 0; i < handle_column_size; i++)
{
auto idx = column_name_columns_index_map[table_info.getPrimaryIndexInfo().idx_cols[i].name];
auto & column_info = table_info.columns[idx];
auto & column_info = table_info.columns[table_info.getPrimaryIndexInfo().idx_cols[i].offset];

auto start_field1 = RegionBench::convertField(column_info, typeid_cast<const ASTLiteral &>(*args[3 + i]).value);
TiDB::DatumBumpy start_datum1 = TiDB::DatumBumpy(start_field1, column_info.tp);
Expand Down
26 changes: 7 additions & 19 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Expand Up @@ -43,6 +43,7 @@
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/TiKVRange.h>
#include <Storages/Transaction/tests/region_helper.h>
#include <TiDB/Schema/TiDBSchemaManager.h>
#include <fmt/core.h>

namespace DB
Expand Down Expand Up @@ -75,12 +76,6 @@ RegionPtr GenDbgRegionSnapshotWithData(Context & context, const ASTs & args)
size_t handle_column_size = is_common_handle ? table_info.getPrimaryIndexInfo().idx_cols.size() : 1;
RegionPtr region;

std::unordered_map<String, size_t> column_name_columns_index_map;
for (size_t i = 0; i < table_info.columns.size(); i++)
{
column_name_columns_index_map.emplace(table_info.columns[i].name, i);
}

if (!is_common_handle)
{
auto start = static_cast<HandleID>(safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[3]).value));
Expand All @@ -94,8 +89,7 @@ RegionPtr GenDbgRegionSnapshotWithData(Context & context, const ASTs & args)
std::vector<Field> end_keys;
for (size_t i = 0; i < handle_column_size; i++)
{
auto idx = column_name_columns_index_map[table_info.getPrimaryIndexInfo().idx_cols[i].name];
auto & column_info = table_info.columns[idx];
auto & column_info = table_info.columns[table_info.getPrimaryIndexInfo().idx_cols[i].offset];
auto start_field = RegionBench::convertField(column_info, typeid_cast<const ASTLiteral &>(*args[3 + i]).value);
TiDB::DatumBumpy start_datum = TiDB::DatumBumpy(start_field, column_info.tp);
start_keys.emplace_back(start_datum.field());
Expand Down Expand Up @@ -136,9 +130,9 @@ RegionPtr GenDbgRegionSnapshotWithData(Context & context, const ASTs & args)
std::vector<Field> keys; // handle key
for (size_t i = 0; i < table_info.getPrimaryIndexInfo().idx_cols.size(); i++)
{
auto idx = column_name_columns_index_map[table_info.getPrimaryIndexInfo().idx_cols[i].name];
auto & column_info = table_info.columns[idx];
auto start_field = RegionBench::convertField(column_info, fields[idx]);
auto & idx_col = table_info.getPrimaryIndexInfo().idx_cols[i];
auto & column_info = table_info.columns[idx_col.offset];
auto start_field = RegionBench::convertField(column_info, fields[idx_col.offset]);
TiDB::DatumBumpy start_datum = TiDB::DatumBumpy(start_field, column_info.tp);
keys.emplace_back(start_datum.field());
}
Expand Down Expand Up @@ -213,15 +207,9 @@ void MockRaftCommand::dbgFuncRegionSnapshot(Context & context, const ASTs & args
std::vector<Field> start_keys;
std::vector<Field> end_keys;

std::unordered_map<String, size_t> column_name_columns_index_map;
for (size_t i = 0; i < table_info.columns.size(); i++)
{
column_name_columns_index_map.emplace(table_info.columns[i].name, i);
}
for (size_t i = 0; i < handle_column_size; i++)
{
auto idx = column_name_columns_index_map[table_info.getPrimaryIndexInfo().idx_cols[i].name];
const auto & column_info = table_info.columns[idx];
const auto & column_info = table_info.columns[table_info.getPrimaryIndexInfo().idx_cols[i].offset];
auto start_field = RegionBench::convertField(column_info, typeid_cast<const ASTLiteral &>(*args[1 + i]).value);
TiDB::DatumBumpy start_datum = TiDB::DatumBumpy(start_field, column_info.tp);
start_keys.emplace_back(start_datum.field());
Expand Down Expand Up @@ -610,7 +598,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio

if (!atomic_decode(false))
{
tmt.getSchemaSyncer()->syncSchemas(context, keyspace_id);
tmt.getSchemaSyncerManager()->syncSchemas(context, keyspace_id);

if (!atomic_decode(true))
throw Exception("Pre-decode " + region->toString() + " cache to table " + std::to_string(table_id) + " block failed",
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Expand Up @@ -14,6 +14,7 @@

#include <Debug/MockTiDB.h>
#include <Debug/dbgFuncMockTiDBTable.h>
#include <Debug/dbgTools.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
Expand Down Expand Up @@ -304,7 +305,8 @@ void MockTiDBTable::dbgFuncCreateTiDBTables(Context & context, const ASTs & args
if (args.size() < 2)
throw Exception("Args not matched, should be: db_name, table_name, [table_name], ..., [table_name]", ErrorCodes::BAD_ARGUMENTS);
const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
auto db = context.getDatabase(database_name);
auto mapped_database_name = mappedDatabase(context, database_name);
auto db = context.getDatabase(mapped_database_name);

std::vector<std::tuple<String, ColumnsDescription, String>> tables;

Expand Down
9 changes: 2 additions & 7 deletions dbms/src/Debug/dbgFuncRegion.cpp
Expand Up @@ -61,15 +61,10 @@ void dbgFuncPutRegion(Context & context, const ASTs & args, DBGInvoker::Printer
{
std::vector<Field> start_keys;
std::vector<Field> end_keys;
std::unordered_map<String, size_t> column_name_columns_index_map;
for (size_t i = 0; i < table_info.columns.size(); i++)
{
column_name_columns_index_map.emplace(table_info.columns[i].name, i);
}

for (size_t i = 0; i < handle_column_size; i++)
{
auto idx = column_name_columns_index_map[table_info.getPrimaryIndexInfo().idx_cols[i].name];
const auto & column_info = table_info.columns[idx];
const auto & column_info = table_info.columns[table_info.getPrimaryIndexInfo().idx_cols[i].offset];
auto start_field = RegionBench::convertField(column_info, typeid_cast<const ASTLiteral &>(*args[1 + i]).value);
TiDB::DatumBumpy start_datum = TiDB::DatumBumpy(start_field, column_info.tp);
start_keys.emplace_back(start_datum.field());
Expand Down

0 comments on commit 865ba65

Please sign in to comment.