Skip to content

Commit

Permalink
Create storage instance with tombstone timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang committed Nov 26, 2023
1 parent 8d72656 commit e189878
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 40 deletions.
5 changes: 5 additions & 0 deletions dbms/src/Debug/MockSchemaGetter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ struct MockSchemaGetter
return MockTiDB::instance().getTableInfoByID(table_id);
}

static std::pair<TiDB::TableInfoPtr, bool> getTableInfoAndCheckMvcc(DatabaseID db_id, TableID table_id)
{
return {getTableInfo(db_id, table_id), false};
}

static std::tuple<TiDB::DBInfoPtr, TiDB::TableInfoPtr> getDatabaseAndTableInfo(DatabaseID db_id, TableID table_id)
{
return std::make_tuple(getDatabase(db_id), getTableInfo(db_id, table_id));
Expand Down
83 changes: 68 additions & 15 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,16 @@ bool isReservedDatabase(Context & context, const String & database_name)
template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyCreateTable(DatabaseID database_id, TableID table_id)
{
auto table_info = getter.getTableInfo(database_id, table_id);
if (table_info == nullptr) // the database maybe dropped
TableInfoPtr table_info;
bool get_by_mvcc = false;
std::tie(table_info, get_by_mvcc) = getter.getTableInfoAndCheckMvcc(database_id, table_id);
if (table_info == nullptr)
{
LOG_INFO(
log,
"table is not exist in TiKV, may have been dropped, applyCreateTable is ignored, table_id={}",
"table is not exist in TiKV, may have been dropped, applyCreateTable is ignored, database_id={} "
"table_id={}",
database_id,
table_id);
return;
}
Expand All @@ -87,16 +91,28 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateTable(DatabaseID database_id,
}

// If table is partition table, we will create the logical table here.
// Because we get the table_info, so we can ensure new_db_info will not be nullptr.
auto new_db_info = getter.getDatabase(database_id);
applyCreateStorageInstance(new_db_info, table_info);
if (new_db_info == nullptr)
{
// the database has been dropped
LOG_INFO(
log,
"database is not exist in TiKV, may have been dropped, applyCreateTable is ignored, database_id={} "
"table_id={}",
database_id,
table_id);
return;
}
applyCreateStorageInstance(new_db_info, table_info, get_by_mvcc);

// Register the partition_id -> logical_table_id mapping
for (const auto & part_def : table_info->partition.definitions)
{
LOG_DEBUG(
log,
"register table to table_id_map for partition table, logical_table_id={} physical_table_id={}",
"register table to table_id_map for partition table, database_id={} logical_table_id={} "
"physical_table_id={}",
database_id,
table_id,
part_def.id);
table_id_map.emplacePartitionTableID(part_def.id, table_id);
Expand Down Expand Up @@ -722,6 +738,16 @@ void SchemaBuilder<Getter, NameMapper>::applyRecoverTable(DatabaseID database_id
return;
}

applyRecoverLogicalTable(db_info, table_info);
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyRecoverLogicalTable(
const TiDB::DBInfoPtr & db_info,
const TiDB::TableInfoPtr & table_info)
{
assert(db_info != nullptr);
assert(table_info != nullptr);
if (table_info->isLogicalPartitionTable())
{
for (const auto & part_def : table_info->partition.definitions)
Expand Down Expand Up @@ -952,6 +978,7 @@ String createTableStmt(
const DBInfo & db_info,
const TableInfo & table_info,
const SchemaNameMapper & name_mapper,
const UInt64 tombstone,
const LoggerPtr & log)
{
LOG_DEBUG(log, "Analyzing table info : {}", table_info.serialize());
Expand Down Expand Up @@ -985,13 +1012,14 @@ String createTableStmt(
}
writeString("), '", stmt_buf);
writeEscapedString(table_info.serialize(), stmt_buf);
writeString("')", stmt_buf);
writeString(fmt::format("', {})", tombstone), stmt_buf);
}
else
{
throw TiFlashException(
fmt::format("Unknown engine type : {}", static_cast<int32_t>(table_info.engine_type)),
Errors::DDL::Internal);
Errors::DDL::Internal,
"Unknown engine type : {}",
fmt::underlying(table_info.engine_type));
}

return stmt;
Expand All @@ -1000,8 +1028,12 @@ String createTableStmt(
template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyCreateStorageInstance(
const TiDB::DBInfoPtr & db_info,
const TableInfoPtr & table_info)
const TableInfoPtr & table_info,
bool is_tombstone)
{
assert(db_info != nullptr);
assert(table_info != nullptr);

GET_METRIC(tiflash_schema_internal_ddl_count, type_create_table).Increment();
LOG_INFO(
log,
Expand All @@ -1023,7 +1055,13 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateStorageInstance(
table_info->engine_type = tmt_context.getEngineType();
}

String stmt = createTableStmt(*db_info, *table_info, name_mapper, log);
UInt64 tombstone_ts = 0;
if (is_tombstone)
{
tombstone_ts = context.getTMTContext().getPDClient()->getTS();
}

String stmt = createTableStmt(*db_info, *table_info, name_mapper, tombstone_ts, log);

LOG_INFO(
log,
Expand Down Expand Up @@ -1201,7 +1239,11 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
table_id_map.emplaceTableID(table->id, db->id);
LOG_DEBUG(log, "register table to table_id_map, database_id={} table_id={}", db->id, table->id);

applyCreateStorageInstance(db, table);
// `SchemaGetter::listTables` only return non-tombstone tables.
// So `syncAllSchema` will not create tombstone tables. But if there are new rows/new snapshot
// sent to TiFlash, TiFlash can create the instance by `applyTable` with force==true in the
// related process.
applyCreateStorageInstance(db, table, false);
if (table->isLogicalPartitionTable())
{
for (const auto & part_def : table->partition.definitions)
Expand Down Expand Up @@ -1292,12 +1334,22 @@ bool SchemaBuilder<Getter, NameMapper>::applyTable(
// dropped but not physically removed by TiDB/TiKV gc_safepoint.
// It is need for TiFlash correctly decoding the data and get ready for `RECOVER TABLE`
// and `RECOVER DATABASE`.
auto table_info = getter.getTableInfo(database_id, logical_table_id, /*try_mvcc*/ force);
TableInfoPtr table_info;
bool get_by_mvcc = false;
if (!force)
{
table_info = getter.getTableInfo(database_id, logical_table_id, /*try_mvcc*/ false);
}
else
{
std::tie(table_info, get_by_mvcc) = getter.getTableInfoAndCheckMvcc(database_id, logical_table_id);
}
if (table_info == nullptr)
{
LOG_WARNING(
log,
"table is not exist in TiKV, applyTable need retry, database_id={} logical_table_id={}",
"table is not exist in TiKV, applyTable need retry, get_by_mvcc={} database_id={} logical_table_id={}",
get_by_mvcc,
database_id,
logical_table_id);
return false;
Expand Down Expand Up @@ -1351,7 +1403,8 @@ bool SchemaBuilder<Getter, NameMapper>::applyTable(
}

// Create the instance with the latest table info
applyCreateStorageInstance(db_info, table_info);
// If the table info is get by mvcc, it means the table is actually in "dropped" status
applyCreateStorageInstance(db_info, table_info, get_by_mvcc);
return true;
}

Expand Down
5 changes: 4 additions & 1 deletion dbms/src/TiDB/Schema/SchemaBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ struct SchemaBuilder

void applyCreateSchema(const TiDB::DBInfoPtr & db_info);

void applyCreateStorageInstance(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info);
void applyCreateStorageInstance(
const TiDB::DBInfoPtr & db_info,
const TiDB::TableInfoPtr & table_info,
bool is_tombstone);

void applyDropTable(DatabaseID database_id, TableID table_id);

Expand Down
21 changes: 10 additions & 11 deletions dbms/src/TiDB/Schema/SchemaGetter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,40 +283,39 @@ TiDB::DBInfoPtr SchemaGetter::getDatabase(DatabaseID db_id)
}

template <bool mvcc_get>
TiDB::TableInfoPtr SchemaGetter::getTableInfoImpl(DatabaseID db_id, TableID table_id)
std::pair<TiDB::TableInfoPtr, bool> SchemaGetter::getTableInfoImpl(DatabaseID db_id, TableID table_id)
{
String db_key = getDBKey(db_id);
if (!checkDBExists(db_key))
{
LOG_ERROR(log, "The database does not exist, database_id={}", db_id);
return nullptr;
}
// Note: Do not check the existence of `db_key` here, otherwise we can not
// get the table info after database is dropped.
String table_key = getTableKey(table_id);
String table_info_json = TxnStructure::hGet(snap, db_key, table_key);
bool get_by_mvcc = false;
if (table_info_json.empty())
{
if constexpr (!mvcc_get)
{
return nullptr;
return {nullptr, false};
}

LOG_WARNING(log, "The table is dropped in TiKV, try to get the latest table_info, table_id={}", table_id);
table_info_json = TxnStructure::mvccGet(snap, db_key, table_key);
get_by_mvcc = true;
if (table_info_json.empty())
{
LOG_ERROR(
log,
"The table is dropped in TiKV, and the latest table_info is still empty, it should be GCed, "
"table_id={}",
table_id);
return nullptr;
return {nullptr, get_by_mvcc};
}
}
LOG_DEBUG(log, "Get Table Info from TiKV, table_id={} {}", table_id, table_info_json);
return std::make_shared<TiDB::TableInfo>(table_info_json, keyspace_id);
return {std::make_shared<TiDB::TableInfo>(table_info_json, keyspace_id), get_by_mvcc};
}
template TiDB::TableInfoPtr SchemaGetter::getTableInfoImpl<false>(DatabaseID db_id, TableID table_id);
template TiDB::TableInfoPtr SchemaGetter::getTableInfoImpl<true>(DatabaseID db_id, TableID table_id);
template std::pair<TiDB::TableInfoPtr, bool> SchemaGetter::getTableInfoImpl<false>(DatabaseID db_id, TableID table_id);
template std::pair<TiDB::TableInfoPtr, bool> SchemaGetter::getTableInfoImpl<true>(DatabaseID db_id, TableID table_id);

std::tuple<TiDB::DBInfoPtr, TiDB::TableInfoPtr> SchemaGetter::getDatabaseAndTableInfo(
DatabaseID db_id,
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/TiDB/Schema/SchemaGetter.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,13 @@ struct SchemaGetter
TiDB::TableInfoPtr getTableInfo(DatabaseID db_id, TableID table_id, bool try_mvcc = true)
{
if (try_mvcc)
return getTableInfoImpl</*mvcc_get*/ true>(db_id, table_id);
return getTableInfoImpl</*mvcc_get*/ false>(db_id, table_id);
return getTableInfoImpl</*mvcc_get*/ true>(db_id, table_id).first;
return getTableInfoImpl</*mvcc_get*/ false>(db_id, table_id).first;
}

std::pair<TiDB::TableInfoPtr, bool> getTableInfoAndCheckMvcc(DatabaseID db_id, TableID table_id)
{
return getTableInfoImpl</*mvcc_get*/ true>(db_id, table_id);
}

std::tuple<TiDB::DBInfoPtr, TiDB::TableInfoPtr> getDatabaseAndTableInfo(DatabaseID db_id, TableID table_id);
Expand All @@ -182,7 +187,7 @@ struct SchemaGetter

private:
template <bool mvcc_get>
TiDB::TableInfoPtr getTableInfoImpl(DatabaseID db_id, TableID table_id);
std::pair<TiDB::TableInfoPtr, bool> getTableInfoImpl(DatabaseID db_id, TableID table_id);
};

} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ bool TiDBSchemaSyncer<mock_getter, mock_mapper>::syncTableSchema(Context & conte
GET_METRIC(tiflash_schema_trigger_count, type_sync_table_schema).Increment();
// Notice: must use the same getter
syncSchemasByGetter(context, getter);
// Try to sync the table schema with `force==true`. Even the table is tombstone (but not physically
// dropped in TiKV), it will sync the table schema to handle snapshot or raft commands that come after
// table is dropped.
std::tie(need_update_id_mapping, message)
= trySyncTableSchema(context, physical_table_id, getter, true, "sync table schema fail");
if (likely(!need_update_id_mapping))
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/TiDB/Schema/TiDBSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class TiDBSchemaSyncer : public SchemaSyncer

LoggerPtr log;

TableIDMap table_id_map;
DatabaseInfoCache databases;
TableIDMap table_id_map;

Getter createSchemaGetter(KeyspaceID keyspace_id)
{
Expand Down

0 comments on commit e189878

Please sign in to comment.