diff --git a/src/yb/master/catalog_entity_info.h b/src/yb/master/catalog_entity_info.h index 8f83b633dd10..b61d6cd9be5a 100644 --- a/src/yb/master/catalog_entity_info.h +++ b/src/yb/master/catalog_entity_info.h @@ -243,6 +243,8 @@ class TabletInfo : public RefCountedThreadSafe, // Accessors for the latest known tablet replica locations. // These locations include only the members of the latest-reported Raft // configuration whose tablet servers have ever heartbeated to this Master. + // TODO: Make Set/Update private so users are forced to use the catalog manager wrappers which + // update the tablet locations version. void SetReplicaLocations(std::shared_ptr replica_locations); std::shared_ptr GetReplicaLocations() const; Result GetLeader() const; diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index d552bea4ae56..fa19ba297f6d 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -241,14 +241,6 @@ DEFINE_test_flag(bool, get_ysql_catalog_version_from_sys_catalog, false, "from the sys_catalog."); -// TODO: should this be a test flag? -DEFINE_RUNTIME_bool(catalog_manager_wait_for_new_tablets_to_elect_leader, true, - "Whether the catalog manager should wait for a newly created tablet to " - "elect a leader before considering it successfully created. " - "This is disabled in some tests where we explicitly manage leader " - "election."); -TAG_FLAG(catalog_manager_wait_for_new_tablets_to_elect_leader, hidden); - // TODO: should this be a test flag? DEFINE_RUNTIME_int32(catalog_manager_inject_latency_in_delete_table_ms, 0, "Number of milliseconds that the master will sleep in DeleteTable."); @@ -265,10 +257,6 @@ DEFINE_RUNTIME_int32(max_create_tablets_per_ts, 50, "The number of tablets per TS that can be requested for a new table."); TAG_FLAG(max_create_tablets_per_ts, advanced); -DEFINE_RUNTIME_int32(catalog_manager_report_batch_size, 1, - "The max number of tablets evaluated in the heartbeat as a single SysCatalog update."); -TAG_FLAG(catalog_manager_report_batch_size, advanced); - // TODO: Is this code even useful? DEFINE_RUNTIME_int32(master_failover_catchup_timeout_ms, 30 * 1000 * yb::kTimeMultiplier, // 30 sec "Amount of time to give a newly-elected leader master to load" @@ -277,19 +265,8 @@ DEFINE_RUNTIME_int32(master_failover_catchup_timeout_ms, 30 * 1000 * yb::kTimeMu TAG_FLAG(master_failover_catchup_timeout_ms, advanced); TAG_FLAG(master_failover_catchup_timeout_ms, experimental); -DEFINE_RUNTIME_bool(master_tombstone_evicted_tablet_replicas, true, - "Whether the Master should tombstone (delete) tablet replicas that " - "are no longer part of the latest reported raft config."); -TAG_FLAG(master_tombstone_evicted_tablet_replicas, hidden); - DECLARE_bool(master_ignore_deleted_on_load); -// Temporary. Can be removed after long-run testing. -// TODO: how temporary is this? -DEFINE_RUNTIME_bool(master_ignore_stale_cstate, true, - "Whether Master processes the raft config when the version is lower."); -TAG_FLAG(master_ignore_stale_cstate, hidden); - // TODO: should this be a test flag? 
DEFINE_RUNTIME_bool(catalog_manager_check_ts_count_for_create_table, true, "Whether the master should ensure that there are enough live tablet " @@ -304,9 +281,6 @@ DEFINE_test_flag(bool, catalog_manager_check_yql_partitions_exist_for_is_create_ DEFINE_test_flag(uint64, inject_latency_during_remote_bootstrap_secs, 0, "Number of seconds to sleep during a remote bootstrap."); -DEFINE_test_flag(uint64, inject_latency_during_tablet_report_ms, 0, - "Number of milliseconds to sleep during the processing of a tablet batch."); - DEFINE_test_flag(bool, catalog_manager_simulate_system_table_create_failure, false, "This is only used in tests to simulate a failure where the table information is " "persisted in syscatalog, but the tablet information is not yet persisted and " @@ -400,19 +374,10 @@ DEFINE_RUNTIME_bool(enable_register_ts_from_raft, true, DECLARE_int32(tserver_unresponsive_timeout_ms); -DEFINE_RUNTIME_bool(use_create_table_leader_hint, true, - "Whether the Master should hint which replica for each tablet should " - "be leader initially on tablet creation."); - DEFINE_test_flag(bool, create_table_leader_hint_min_lexicographic, false, "Whether the Master should hint replica with smallest lexicographic rank for each " "tablet as leader initially on tablet creation."); -DEFINE_RUNTIME_double(heartbeat_safe_deadline_ratio, .20, - "When the heartbeat deadline has this percentage of time remaining, " - "the master should halt tablet report processing so it can respond in time."); -DECLARE_int32(heartbeat_rpc_timeout_ms); - DEFINE_test_flag(int32, num_missing_tablets, 0, "Simulates missing tablets in a table"); DEFINE_RUNTIME_int32(partitions_vtable_cache_refresh_secs, 30, @@ -8086,703 +8051,73 @@ void CatalogManager::NotifyTabletDeleteFinished( } } -bool CatalogManager::ReplicaMapDiffersFromConsensusState(const scoped_refptr& tablet, - const ConsensusStatePB& cstate) { - auto locs = tablet->GetReplicaLocations(); - if (locs->size() != implicit_cast(cstate.config().peers_size())) { - return true; - } - for (auto iter = cstate.config().peers().begin(); iter != cstate.config().peers().end(); iter++) { - if (locs->find(iter->permanent_uuid()) == locs->end()) { - return true; - } - } - return false; -} - -namespace { - -int64_t GetCommittedConsensusStateOpIdIndex(const ReportedTabletPB& report) { - if (!report.has_committed_consensus_state() || - !report.committed_consensus_state().config().has_opid_index()) { - return consensus::kInvalidOpIdIndex; - } - - return report.committed_consensus_state().config().opid_index(); -} - -} // namespace - -bool CatalogManager::ProcessCommittedConsensusState( - TSDescriptor* ts_desc, - bool is_incremental, - const ReportedTabletPB& report, - const LeaderEpoch& epoch, - std::map* table_write_locks, - const TabletInfoPtr& tablet, - const TabletInfo::WriteLock& tablet_lock, - std::map>* tables, - std::vector* rpcs) { - const ConsensusStatePB& prev_cstate = tablet_lock->pb.committed_consensus_state(); - ConsensusStatePB cstate = report.committed_consensus_state(); - bool tablet_was_mutated = false; - - // 6a. The master only processes reports for replicas with committed - // consensus configurations since it needs the committed index to only - // cache the most up-to-date config. Since it's possible for TOMBSTONED - // replicas with no ConsensusMetadata on disk to be reported as having no - // committed config opid_index, we skip over those replicas. 
- if (!cstate.config().has_opid_index()) { - LOG(WARNING) << "Missing opid_index in reported config: " << report.ShortDebugString(); - return false; - } - if (PREDICT_TRUE(FLAGS_master_ignore_stale_cstate) && - (cstate.current_term() < prev_cstate.current_term() || - GetCommittedConsensusStateOpIdIndex(report) < prev_cstate.config().opid_index())) { - LOG(WARNING) << "Stale heartbeat for Tablet " << tablet->ToString() - << " on TS " << ts_desc->permanent_uuid() - << " cstate=" << cstate.ShortDebugString() - << ", prev_cstate=" << prev_cstate.ShortDebugString(); - return false; - } - - // We do not expect reports from a TS for a config that it is not part of. This can happen if a - // TS is removed from the config while it is remote bootstrapping. In this case, we must ignore - // the heartbeats to avoid incorrectly adding this TS to the config in - // UpdateTabletReplicaInLocalMemory. - bool found_ts_in_config = false; - for (const auto& peer : cstate.config().peers()) { - if (peer.permanent_uuid() == ts_desc->permanent_uuid()) { - found_ts_in_config = true; - break; - } - } - if (!found_ts_in_config) { - LOG(WARNING) << Format("Ignoring heartbeat from tablet server that is not part of reported " - "consensus config. ts_desc: $0, cstate: $1.", *ts_desc, cstate); - return false; - } - - // 6b. Disregard the leader state if the reported leader is not a member - // of the committed config. - if (cstate.leader_uuid().empty() || - !IsRaftConfigMember(cstate.leader_uuid(), cstate.config())) { - cstate.clear_leader_uuid(); - tablet_was_mutated = true; - } - - // 6c. Mark the tablet as RUNNING if it makes sense to do so. - // - // We need to wait for a leader before marking a tablet as RUNNING, or - // else we could incorrectly consider a tablet created when only a - // minority of its replicas were successful. In that case, the tablet - // would be stuck in this bad state forever. - // - FLAG added to avoid waiting during mock tests. - if (!tablet_lock->is_running() && - report.state() == tablet::RUNNING && - (cstate.has_leader_uuid() || - !FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader)) { - DCHECK_EQ(SysTabletsEntryPB::CREATING, tablet_lock->pb.state()) - << "Tablet in unexpected state: " << tablet->ToString() - << ": " << tablet_lock->pb.ShortDebugString(); - VLOG(1) << "Tablet " << tablet->ToString() << " is now online"; - tablet_lock.mutable_data()->set_state(SysTabletsEntryPB::RUNNING, - "Tablet reported with an active leader"); - tablet_was_mutated = true; - } - - // 6d. Update the consensus state if: - // - A config change operation was committed (reflected by a change to - // the committed config's opid_index). - // - The new cstate has a leader, and either the old cstate didn't, or - // there was a term change. - if (cstate.config().opid_index() > prev_cstate.config().opid_index() || - (cstate.has_leader_uuid() && - (!prev_cstate.has_leader_uuid() || - cstate.current_term() > prev_cstate.current_term()))) { - - // 6d(i). Retain knowledge of the leader even if it wasn't reported in - // the latest config. - // - // When a config change is reported to the master, it may not include the - // leader because the follower doing the reporting may not know who the - // leader is yet (it may have just started up). It is safe to reuse - // the previous leader if the reported cstate has the same term as the - // previous cstate, and the leader was known for that term. 
- if (cstate.current_term() == prev_cstate.current_term()) { - if (!cstate.has_leader_uuid() && prev_cstate.has_leader_uuid()) { - cstate.set_leader_uuid(prev_cstate.leader_uuid()); - // Sanity check to detect consensus divergence bugs. - } else if (cstate.has_leader_uuid() && prev_cstate.has_leader_uuid() && - cstate.leader_uuid() != prev_cstate.leader_uuid()) { - string msg = Substitute("Previously reported cstate for tablet $0 gave " - "a different leader for term $1 than the current cstate. " - "Previous cstate: $2. Current cstate: $3.", - tablet->ToString(), cstate.current_term(), - prev_cstate.ShortDebugString(), cstate.ShortDebugString()); - LOG(DFATAL) << msg; - return false; - } - } - - // 6d(ii). Delete any replicas from the previous config that are not in the new one. - if (FLAGS_master_tombstone_evicted_tablet_replicas) { - std::unordered_set current_member_uuids; - for (const consensus::RaftPeerPB &peer : cstate.config().peers()) { - InsertOrDie(¤t_member_uuids, peer.permanent_uuid()); - } - for (const consensus::RaftPeerPB &prev_peer : prev_cstate.config().peers()) { - const string& peer_uuid = prev_peer.permanent_uuid(); - if (!ContainsKey(current_member_uuids, peer_uuid)) { - // Don't delete a tablet server that hasn't reported in yet (Bootstrapping). - shared_ptr dummy_ts_desc; - if (!master_->ts_manager()->LookupTSByUUID(peer_uuid, &dummy_ts_desc)) { - continue; - } - // Otherwise, the TabletServer needs to remove this peer. - rpcs->push_back(std::make_shared( - master_, AsyncTaskPool(), peer_uuid, tablet->table(), tablet->tablet_id(), - TABLET_DATA_TOMBSTONED, prev_cstate.config().opid_index(), - epoch, GetDeleteReplicaTaskThrottler(peer_uuid), - Substitute("TS $0 not found in new config with opid_index $1", - peer_uuid, cstate.config().opid_index()))); - } - } - } - // 6d(iii). Update the in-memory ReplicaLocations for this tablet using the new config. - VLOG(2) << "Updating replicas for tablet " << tablet->tablet_id() - << " using config reported by " << ts_desc->permanent_uuid() - << " to that committed in log index " << cstate.config().opid_index() - << " with leader state from term " << cstate.current_term(); - UpdateTabletReplicasAfterConfigChange(tablet, ts_desc->permanent_uuid(), cstate, report); - - // 6d(iv). Update the consensus state. Don't use 'prev_cstate' after this. - LOG(INFO) << "Tablet: " << tablet->tablet_id() << " reported consensus state change." - << " New consensus state: " << cstate.ShortDebugString() - << " from " << ts_desc->permanent_uuid(); - *tablet_lock.mutable_data()->pb.mutable_committed_consensus_state() = cstate; - tablet_was_mutated = true; - } else { - // Report opid_index is equal to the previous opid_index. If some - // replica is reporting the same consensus configuration we already know about, but we - // haven't yet heard from all the tservers in the config, update the in-memory - // ReplicaLocations. - LOG(INFO) << "Tablet server " << ts_desc->permanent_uuid() << " sent " - << (is_incremental ? "incremental" : "full tablet") - << " report for " << tablet->tablet_id() - << ", prev state op id: " << prev_cstate.config().opid_index() - << ", prev state term: " << prev_cstate.current_term() - << ", prev state has_leader_uuid: " << prev_cstate.has_leader_uuid() - << ". Consensus state: " << cstate.ShortDebugString(); - if (GetAtomicFlag(&FLAGS_enable_register_ts_from_raft) && - ReplicaMapDiffersFromConsensusState(tablet, cstate)) { - LOG(INFO) << Format("Tablet replica map differs from reported consensus state. Replica map: " - "$0. 
Reported consensus state: $1.", *tablet->GetReplicaLocations(), - cstate.ShortDebugString()); - UpdateTabletReplicasAfterConfigChange(tablet, ts_desc->permanent_uuid(), cstate, report); - } else { - UpdateTabletReplicaInLocalMemory(ts_desc, &cstate, report, tablet); - } - } - - if (FLAGS_use_create_table_leader_hint && !cstate.has_leader_uuid()) { - StartElectionIfReady(cstate, epoch, tablet.get()); - } - - // 7. Send an AlterSchema RPC if the tablet has an old schema version. - if (table_write_locks->count(tablet->table()->id())) { - const TableInfo::WriteLock& table_lock = (*table_write_locks)[tablet->table()->id()]; - if (report.has_schema_version() && - report.schema_version() != table_lock->pb.version()) { - if (report.schema_version() > table_lock->pb.version()) { - LOG(ERROR) << "TS " << ts_desc->permanent_uuid() - << " has reported a schema version greater than the current one " - << " for tablet " << tablet->ToString() - << ". Expected version " << table_lock->pb.version() - << " got " << report.schema_version() - << " (corruption)"; - } else { - // TODO: For Alter (rolling apply to tablets), this is an expected transitory state. - LOG(INFO) << "TS " << ts_desc->permanent_uuid() - << " does not have the latest schema for tablet " << tablet->ToString() - << ". Expected version " << table_lock->pb.version() - << " got " << report.schema_version(); - } - // All metadata related changes for the tablet is passed as part of RESTORE_ON_TABLET rpcs - // and we should not trigger anything else during restore so as to not race schema versions. - // TODO(Sanket): What if restore is stuck then this block is muted forever. - auto restore_result = IsTableUndergoingPitrRestore(*tablet->table()); - LOG_IF(DFATAL, !restore_result.ok()) - << "Failed to determine if table has PITR restore in progress"; - if (!restore_result.ok() || !*restore_result) { - // It's possible that the tablet being reported is a laggy replica, and in fact - // the leader has already received an AlterTable RPC. That's OK, though -- - // it'll safely ignore it if we send another. - TransactionId txn_id = TransactionId::Nil(); - if (table_lock->pb.has_transaction() && - table_lock->pb.transaction().has_transaction_id()) { - LOG(INFO) << "Parsing transaction ID for tablet ID " << tablet->tablet_id(); - auto txn_id_res = FullyDecodeTransactionId( - table_lock->pb.transaction().transaction_id()); - if (!txn_id_res.ok()) { - LOG(WARNING) << "Parsing transaction ID failed for tablet ID " << tablet->tablet_id(); - return false; - } - txn_id = txn_id_res.get(); - } - LOG(INFO) << "Triggering AlterTable with transaction ID " << txn_id - << " due to heartbeat delay for tablet ID " << tablet->tablet_id(); - rpcs->push_back(std::make_shared( - master_, AsyncTaskPool(), tablet, tablet->table(), txn_id, epoch)); - } - } - } +void CatalogManager::GetReportedAndOrphanedTabletsFromReport( + int num_tablets, + const TabletReportPB& full_report, + TabletReportUpdatesPB* full_report_update, + ReportedTablets* reported_tablets, + set* orphaned_tablets) { + // Lock the catalog to iterate over tablet_ids_map_ & table_ids_map_. + SharedLock lock(mutex_); - // Send AlterSchema RPC for colocated tables of this tablet if they are outdated. - for (const auto& id_to_version : report.table_to_version()) { - // Skip Primary table. - if (tablet->table()->id() == id_to_version.first) { - continue; - } - if (tables->count(id_to_version.first)) { - const auto& table_lock = (*table_write_locks)[id_to_version.first]; - // Ignore if same version. 
- if (table_lock->pb.version() == id_to_version.second) { + // Fill the above variables before processing + full_report_update->mutable_tablets()->Reserve(num_tablets); + for (const ReportedTabletPB& report : full_report.updated_tablets()) { + const string& tablet_id = report.tablet_id(); + + // 1a. Find the tablet, deleting/skipping it if it can't be found. + scoped_refptr tablet = FindPtrOrNull(*tablet_map_, tablet_id); + if (!tablet || FLAGS_TEST_simulate_sys_catalog_data_loss) { + // Every tablet in the report that is processed gets a heartbeat response entry. + ReportedTabletUpdatesPB* update = full_report_update->add_tablets(); + update->set_tablet_id(tablet_id); + + if (GetAtomicFlag(&FLAGS_master_enable_deletion_check_for_orphaned_tablets) && + !deleted_tablets_loaded_from_sys_catalog_.contains(tablet_id)) { + // See the comment in deleted_tablets_loaded_from_sys_catalog_ declaration for an + // explanation of this logic. + LOG_WITH_PREFIX(ERROR) << Format( + "Skipping deletion of orphaned tablet $0, since master has never registered this " + "tablet.", tablet_id); continue; } - if (id_to_version.second > table_lock->pb.version()) { - LOG(ERROR) << "TS " << ts_desc->permanent_uuid() - << " has reported a schema version greater than the current one " - << " for table " << id_to_version.first - << ". Expected version " << table_lock->pb.version() - << " got " << id_to_version.second - << " (corruption)"; - } else { - LOG(INFO) << "TS " << ts_desc->permanent_uuid() - << " does not have the latest schema for table " << id_to_version.first - << ". Expected version " << table_lock->pb.version() - << " got " << id_to_version.second; - } - // All metadata related changes for the tablet is passed as part of RESTORE_ON_TABLET rpcs - // and we should not trigger anything else during restore so as to not race schema versions. - // TODO(Sanket): What if restore is stuck then this block is muted forever. - auto restore_result = IsTableUndergoingPitrRestore(*(*tables)[id_to_version.first]); - LOG_IF(DFATAL, !restore_result.ok()) - << "Failed to determine if table has PITR restore in progress"; - if (!restore_result.ok() || !*restore_result) { - LOG(INFO) << "Triggering AlterTable for table id " << id_to_version.first - << " due to heartbeat delay for tablet ID " << tablet->tablet_id(); - rpcs->push_back(std::make_shared( - master_, AsyncTaskPool(), tablet, (*tables)[id_to_version.first], - TransactionId::Nil(), epoch)); - } - } - } - - return tablet_was_mutated; -} -Status CatalogManager::ProcessTabletReportBatch( - TSDescriptor* ts_desc, - bool is_incremental, - ReportedTablets::iterator begin, - ReportedTablets::iterator end, - const LeaderEpoch& epoch, - TabletReportUpdatesPB* full_report_update, - std::vector* rpcs) { - // First Pass. Iterate in TabletId Order to discover all Table locks we'll need. - - // Maps a table ID to its corresponding TableInfo. - std::map table_info_map; - - std::map table_write_locks; - for (auto reported_tablet = begin; reported_tablet != end; ++reported_tablet) { - auto table = reported_tablet->info->table(); - table_info_map[table->id()] = table; - // Acquire locks for all colocated tables reported - // in sorted order of table ids (We use a map). - for (const auto& [table_id, table_info] : reported_tablet->tables) { - table_info_map[table_info->id()] = table_info; - } - } - - // Need to acquire locks in Id order to prevent deadlock. 
- for (auto& [table_id, table] : table_info_map) { - table_write_locks[table_id] = table->LockForWrite(); - } - - map tablet_write_locks; - // Second Pass. - // Process each tablet. The list is sorted by ID. This may not be in the order that the tablets - // appear in 'full_report', but that has no bearing on correctness. - vector mutated_tablets; // refcount protected by ReportedTablet::info - std::unordered_map> new_running_tablets; - for (auto it = begin; it != end; ++it) { - const auto& tablet_id = it->tablet_id; - const TabletInfoPtr& tablet = it->info; - const ReportedTabletPB& report = *it->report; - const TableInfoPtr& table = tablet->table(); - - // Prepare an heartbeat response entry for this tablet, now that we're going to process it. - // Every tablet in the report that is processed gets one, even if there are no changes to it. - ReportedTabletUpdatesPB* update = full_report_update->add_tablets(); - update->set_tablet_id(tablet_id); - - // Get tablet lock on demand. This works in the batch case because the loop is ordered. - tablet_write_locks[tablet_id] = tablet->LockForWrite(); - auto& table_lock = table_write_locks[table->id()]; - auto& tablet_lock = tablet_write_locks[tablet_id]; - - TRACE_EVENT1("master", "HandleReportedTablet", "tablet_id", report.tablet_id()); - RETURN_NOT_OK_PREPEND(CheckIsLeaderAndReady(), - Substitute("This master is no longer the leader, unable to handle report for tablet $0", - tablet_id)); - - VLOG(3) << "tablet report: " << report.ShortDebugString() - << " peer: " << ts_desc->permanent_uuid(); - - // Delete the tablet if it (or its table) have been deleted. - if (tablet_lock->is_deleted() || - table_lock->started_deleting()) { - const string msg = tablet_lock->pb.state_msg(); - update->set_state_msg(msg); - LOG(INFO) << "Got report from deleted tablet " << tablet->ToString() - << " (" << msg << "): Sending delete request for this tablet"; - // TODO(unknown): Cancel tablet creation, instead of deleting, in cases - // where that might be possible (tablet creation timeout & replacement). - rpcs->push_back(std::make_shared( - master_, AsyncTaskPool(), ts_desc->permanent_uuid(), table, tablet_id, - TABLET_DATA_DELETED, boost::none, epoch, - GetDeleteReplicaTaskThrottler(ts_desc->permanent_uuid()), msg)); + // If a TS reported an unknown tablet, send a delete tablet rpc to the TS. + LOG(INFO) << "Null tablet reported, possibly the TS was not around when the" + " table was being deleted. Sending Delete tablet RPC to this TS."; + orphaned_tablets->insert(tablet_id); continue; } - - if (!table_lock->is_running()) { - const string msg = tablet_lock->pb.state_msg(); - LOG(INFO) << "Got report from tablet " << tablet->tablet_id() << " for non-running table " - << table->ToString() << ": " << msg; - update->set_state_msg(msg); + if (!tablet->table() || tables_->FindTableOrNull(tablet->table()->id()) == nullptr) { + auto table_id = tablet->table() == nullptr ? "(null)" : tablet->table()->id(); + LOG(INFO) << "Got report from an orphaned tablet " << tablet_id << " on table " << table_id; + orphaned_tablets->insert(tablet_id); + // Every tablet in the report that is processed gets a heartbeat response entry. + ReportedTabletUpdatesPB* update = full_report_update->add_tablets(); + update->set_tablet_id(tablet_id); continue; } - // Tombstone a replica that is no longer part of the Raft config (and - // not already tombstoned or deleted outright). 
- // - // If the report includes a committed raft config, we only tombstone if the opid_index of the - // committed raft config is strictly less than the latest reported committed config. This - // prevents us from spuriously deleting replicas that have just been added to the committed - // config and are in the process of copying. - const ConsensusStatePB& prev_cstate = tablet_lock->pb.committed_consensus_state(); - const int64_t prev_opid_index = prev_cstate.config().opid_index(); - const int64_t report_opid_index = GetCommittedConsensusStateOpIdIndex(report); - if (FLAGS_master_tombstone_evicted_tablet_replicas && - report.tablet_data_state() != TABLET_DATA_TOMBSTONED && - report.tablet_data_state() != TABLET_DATA_DELETED && - report_opid_index < prev_opid_index && - !IsRaftConfigMember(ts_desc->permanent_uuid(), prev_cstate.config())) { - const string delete_msg = (report_opid_index == consensus::kInvalidOpIdIndex) ? - "Replica has no consensus available" : - Substitute("Replica with old config index $0", report_opid_index); - rpcs->push_back(std::make_shared( - master_, AsyncTaskPool(), ts_desc->permanent_uuid(), table, tablet_id, - TABLET_DATA_TOMBSTONED, prev_opid_index, epoch, - GetDeleteReplicaTaskThrottler(ts_desc->permanent_uuid()), - Substitute("$0 (current committed config index is $1)", - delete_msg, prev_opid_index))); - continue; - } - - // Skip a non-deleted tablet which reports an error. - if (report.has_error()) { - Status s = StatusFromPB(report.error()); - DCHECK(!s.ok()); - DCHECK_EQ(report.state(), tablet::FAILED); - LOG(WARNING) << "Tablet " << tablet->ToString() << " has failed on TS " - << ts_desc->permanent_uuid() << ": " << s.ToString(); - continue; - } - - // Hide the tablet if it (or its table) has been hidden and the tablet hasn't. - if ((tablet_lock->is_hidden() || - table_lock->started_hiding()) && - report.has_is_hidden() && - !report.is_hidden()) { - const string msg = tablet_lock->pb.state_msg(); - LOG(INFO) << "Got report from hidden tablet " << tablet->ToString() - << " (" << msg << "): Sending hide request for this tablet"; - auto task = std::make_shared( - master_, AsyncTaskPool(), ts_desc->permanent_uuid(), table, tablet_id, - TABLET_DATA_DELETED, boost::none, epoch, - GetDeleteReplicaTaskThrottler(ts_desc->permanent_uuid()), msg); - task->set_hide_only(true); - rpcs->push_back(task); - } - - // Process the report's consensus state. - // The report will not have a committed_consensus_state if it is in the - // middle of starting up, such as during tablet bootstrap. - // If we received an incremental report, and the tablet is starting up, we will update the - // replica so that the balancer knows how many tablets are in the middle of remote bootstrap. - if (report.has_committed_consensus_state()) { - const bool tablet_was_running = tablet_lock->is_running(); - if (ProcessCommittedConsensusState( - ts_desc, is_incremental, report, epoch, &table_write_locks, tablet, tablet_lock, - &it->tables, rpcs)) { - // If the tablet was mutated, add it to the tablets to be re-persisted. - // - // Done here and not on a per-mutation basis to avoid duplicate entries. 
- mutated_tablets.push_back(tablet.get()); - if (!tablet_was_running && tablet_lock->is_running()) { - new_running_tablets[table->id()].insert(tablet->id()); - } - } - } else if (is_incremental && - (report.state() == tablet::NOT_STARTED || report.state() == tablet::BOOTSTRAPPING)) { - // When a tablet server is restarted, it sends a full tablet report with all of its tablets - // in the NOT_STARTED state, so this would make the load balancer think that all the - // tablets are being remote bootstrapped at once, so only process incremental reports here. - UpdateTabletReplicaInLocalMemory(ts_desc, nullptr /* consensus */, report, tablet); - } - } // Finished one round of batch processing. - - // Write all tablet mutations to the catalog table. - // - // SysCatalogTable::Write will short-circuit the case where the data has not - // in fact changed since the previous version and avoid any unnecessary mutations. - if (!mutated_tablets.empty()) { - Status s = sys_catalog_->Upsert(epoch, mutated_tablets); - if (!s.ok()) { - LOG(WARNING) << "Error updating tablets: " << s; - return s; - } - } - - // Update the table state if all its tablets are now running. - for (auto& [table_id, tablets] : new_running_tablets) { - SchedulePostTabletCreationTasks(table_info_map[table_id], epoch, tablets); - } - - // Filter the mutated tablets to find which tablets were modified. Need to actually commit the - // state of the tablets before updating the system.partitions table, so get this first. - vector yql_partitions_mutated_tablets = - GetYqlPartitionsVtable().FilterRelevantTablets(mutated_tablets); - - // Publish the in-memory tablet mutations and release the locks. - for (auto& l : tablet_write_locks) { - l.second.Commit(); - } - tablet_write_locks.clear(); - - // Unlock the tables; we no longer need to access their state. - for (auto& l : table_write_locks) { - l.second.Commit(); - } - table_write_locks.clear(); - - // Update the relevant tablet entries in system.partitions. - if (!yql_partitions_mutated_tablets.empty()) { - Status s = GetYqlPartitionsVtable() - .ProcessMutatedTablets(yql_partitions_mutated_tablets, tablet_write_locks); - } - - // Third Pass. Process all tablet schema version changes. - // (This is separate from tablet state mutations because only table on-disk state is changed.) 
- for (auto it = begin; it != end; ++it) { - const ReportedTabletPB& report = *it->report; - if (!report.has_schema_version()) { - continue; - } - const TabletInfoPtr& tablet = it->info; - auto leader = tablet->GetLeader(); - if (leader.ok() && leader.get()->permanent_uuid() == ts_desc->permanent_uuid()) { - RETURN_NOT_OK(HandleTabletSchemaVersionReport(tablet.get(), report.schema_version(), epoch)); - } - } - - return Status::OK(); -} - -Status CatalogManager::ProcessTabletReport(TSDescriptor* ts_desc, - const TabletReportPB& full_report, - const LeaderEpoch& epoch, - TabletReportUpdatesPB* full_report_update, - RpcContext* rpc) { - int num_tablets = full_report.updated_tablets_size(); - TRACE_EVENT2("master", "ProcessTabletReport", - "requestor", rpc->requestor_string(), - "num_tablets", num_tablets); - - VLOG_WITH_PREFIX(2) << "Received tablet report from " << RequestorString(rpc) << "(" - << ts_desc->permanent_uuid() << "): " << full_report.DebugString(); - - if (!ts_desc->has_tablet_report() && full_report.is_incremental()) { - LOG_WITH_PREFIX(WARNING) - << "Invalid tablet report from " << ts_desc->permanent_uuid() - << ": Received an incremental tablet report when a full one was needed"; - // We should respond with success in order to send reply that we need full report. - return Status::OK(); - } - - // TODO: on a full tablet report, we may want to iterate over the tablets we think - // the server should have, compare vs the ones being reported, and somehow mark - // any that have been "lost" (eg somehow the tablet metadata got corrupted or something). - - ReportedTablets reported_tablets; - - // Tablet Deletes to process after the catalog lock below. - set orphaned_tablets; - - { - // Lock the catalog to iterate over tablet_ids_map_ & table_ids_map_. - SharedLock lock(mutex_); - - // Fill the above variables before processing - full_report_update->mutable_tablets()->Reserve(num_tablets); - for (const ReportedTabletPB& report : full_report.updated_tablets()) { - const string& tablet_id = report.tablet_id(); - - // 1a. Find the tablet, deleting/skipping it if it can't be found. - scoped_refptr tablet = FindPtrOrNull(*tablet_map_, tablet_id); - if (!tablet || FLAGS_TEST_simulate_sys_catalog_data_loss) { - // Every tablet in the report that is processed gets a heartbeat response entry. - ReportedTabletUpdatesPB* update = full_report_update->add_tablets(); - update->set_tablet_id(tablet_id); - - if (GetAtomicFlag(&FLAGS_master_enable_deletion_check_for_orphaned_tablets) && - !deleted_tablets_loaded_from_sys_catalog_.contains(tablet_id)) { - // See the comment in deleted_tablets_loaded_from_sys_catalog_ declaration for an - // explanation of this logic. - LOG_WITH_PREFIX(ERROR) << Format( - "Skipping deletion of orphaned tablet $0, since master has never registered this " - "tablet.", tablet_id); - continue; - } - - // If a TS reported an unknown tablet, send a delete tablet rpc to the TS. - LOG(INFO) << "Null tablet reported, possibly the TS was not around when the" - " table was being deleted. Sending Delete tablet RPC to this TS."; - orphaned_tablets.insert(tablet_id); - continue; - } - if (!tablet->table() || tables_->FindTableOrNull(tablet->table()->id()) == nullptr) { - auto table_id = tablet->table() == nullptr ? "(null)" : tablet->table()->id(); - LOG(INFO) << "Got report from an orphaned tablet " << tablet_id << " on table " << table_id; - orphaned_tablets.insert(tablet_id); - // Every tablet in the report that is processed gets a heartbeat response entry. 
- ReportedTabletUpdatesPB* update = full_report_update->add_tablets(); - update->set_tablet_id(tablet_id); + // 1b. Found the tablet, update local state. + reported_tablets->push_back(ReportedTablet { + .tablet_id = tablet_id, + .info = tablet, + .report = &report, + .tables = {} + }); + // For colocated tablet, update all the tables that need processing. + for (const auto& id_to_version : report.table_to_version()) { + auto table_info = tables_->FindTableOrNull(id_to_version.first); + if(!table_info) { + // TODO(Sanket): Do we need to suitably handle these orphaned tables? continue; } - - // 1b. Found the tablet, update local state. - reported_tablets.push_back(ReportedTablet { - .tablet_id = tablet_id, - .info = tablet, - .report = &report, - .tables = {} - }); - // For colocated tablet, update all the tables that need processing. - for (const auto& id_to_version : report.table_to_version()) { - auto table_info = tables_->FindTableOrNull(id_to_version.first); - if(!table_info) { - // TODO(Sanket): Do we need to suitably handle these orphaned tables? - continue; - } - VLOG_WITH_PREFIX(1) << "Tablet " << report.tablet_id() << " reported table " - << id_to_version.first << " in its colocated list"; - reported_tablets[reported_tablets.size() - 1].tables.emplace( - id_to_version.first, table_info); - } - } - } - - std::sort(reported_tablets.begin(), reported_tablets.end(), [](const auto& lhs, const auto& rhs) { - return lhs.tablet_id < rhs.tablet_id; - }); - - // Process any delete requests from orphaned tablets, identified above. - for (const auto& tablet_id : orphaned_tablets) { - SendDeleteTabletRequest( - tablet_id, TABLET_DATA_DELETED /* delete_type */, - boost::none /* cas_config_opid_index_less_or_equal */, - nullptr /* table */, - ts_desc, - "Report from an orphaned tablet" /* reason */, - epoch); - } - - // Calculate the deadline for this expensive loop coming up. - const auto safe_deadline = rpc->GetClientDeadline() - - (FLAGS_heartbeat_rpc_timeout_ms * 1ms * FLAGS_heartbeat_safe_deadline_ratio); - - // Process tablets by batches. - for (auto tablet_iter = reported_tablets.begin(); tablet_iter != reported_tablets.end();) { - auto batch_begin = tablet_iter; - tablet_iter += std::min( - reported_tablets.end() - tablet_iter, FLAGS_catalog_manager_report_batch_size); - - // Keeps track of all RPCs that should be sent when we're done with a single batch. - std::vector rpcs; - auto status = ProcessTabletReportBatch( - ts_desc, full_report.is_incremental(), batch_begin, tablet_iter, epoch, full_report_update, - &rpcs); - if (!status.ok()) { - for (auto& rpc : rpcs) { - rpc->AbortAndReturnPrevState(status); - } - return status; - } - - // 13. Send all queued RPCs. - for (auto& rpc : rpcs) { - DCHECK(rpc->table()); - rpc->table()->AddTask(rpc); - WARN_NOT_OK(ScheduleTask(rpc), Substitute("Failed to send $0", rpc->description())); - } - rpcs.clear(); - - // 14. Check deadline. Need to exit before processing all batches if we're close to timing out. - if (tablet_iter != reported_tablets.end()) { - - // [TESTING] Inject latency before processing a batch to test deadline. - if (PREDICT_FALSE(FLAGS_TEST_inject_latency_during_tablet_report_ms > 0)) { - LOG(INFO) << "Sleeping in CatalogManager::ProcessTabletReport for " - << FLAGS_TEST_inject_latency_during_tablet_report_ms << " ms"; - SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_inject_latency_during_tablet_report_ms)); - } - - // Return from here at configured safe heartbeat deadline to give the response packet time. 
- if (safe_deadline < CoarseMonoClock::Now()) { - LOG(INFO) << "Reached Heartbeat deadline. Returning early after processing " - << full_report_update->tablets_size() << " tablets"; - full_report_update->set_processing_truncated(true); - return Status::OK(); - } - } - } // Loop to process the next batch until fully iterated. - - if (!full_report.is_incremental()) { - // A full report may take multiple heartbeats. - // The TS communicates how much is left to process for the full report beyond this specific HB. - bool completed_full_report = !full_report.has_remaining_tablet_count() - || full_report.remaining_tablet_count() == 0; - if (full_report.updated_tablets_size() == 0) { - LOG(INFO) << ts_desc->permanent_uuid() << " sent full tablet report with 0 tablets."; - } else if (!ts_desc->has_tablet_report()) { - LOG(INFO) << ts_desc->permanent_uuid() - << (completed_full_report ? " finished" : " receiving") << " first full report: " - << full_report.updated_tablets_size() << " tablets."; + VLOG_WITH_PREFIX(1) << "Tablet " << report.tablet_id() << " reported table " + << id_to_version.first << " in its colocated list"; + (*reported_tablets)[reported_tablets->size() - 1].tables.emplace( + id_to_version.first, table_info); } - // We have a tablet report only once we're done processing all the chunks of the initial report. - ts_desc->set_has_tablet_report(completed_full_report); } - - // 14. Queue background processing if we had updates. - if (full_report.updated_tablets_size() > 0) { - background_tasks_->WakeIfHasPendingUpdates(); - } - - return Status::OK(); } Status CatalogManager::CreateTablegroup( @@ -10571,124 +9906,6 @@ Status CatalogManager::RegisterTsFromRaftConfig(const consensus::RaftPeerPB& pee RegisteredThroughHeartbeat::kFalse); } -void CatalogManager::UpdateTabletReplicasAfterConfigChange( - const scoped_refptr& tablet, - const std::string& sender_uuid, - const ConsensusStatePB& consensus_state, - const ReportedTabletPB& report) { - auto replica_locations = std::make_shared(); - auto prev_rl = tablet->GetReplicaLocations(); - - for (const consensus::RaftPeerPB& peer : consensus_state.config().peers()) { - shared_ptr ts_desc; - if (!peer.has_permanent_uuid()) { - LOG_WITH_PREFIX(WARNING) << "Missing UUID for peer" << peer.ShortDebugString(); - continue; - } - if (!master_->ts_manager()->LookupTSByUUID(peer.permanent_uuid(), &ts_desc)) { - if (!GetAtomicFlag(&FLAGS_enable_register_ts_from_raft)) { - LOG_WITH_PREFIX(WARNING) << "Tablet server has never reported in. " - << "Not including in replica locations map yet. Peer: " << peer.ShortDebugString() - << "; Tablet: " << tablet->ToString(); - continue; - } - - LOG_WITH_PREFIX(INFO) << "Tablet server has never reported in. Registering the ts using " - << "the raft config. Peer: " << peer.ShortDebugString() - << "; Tablet: " << tablet->ToString(); - Status s = RegisterTsFromRaftConfig(peer); - if (!s.ok()) { - LOG_WITH_PREFIX(WARNING) << "Could not register ts from raft config: " << s - << " Skip updating the replica map."; - continue; - } - - // Guaranteed to find the ts since we just registered. - master_->ts_manager()->LookupTSByUUID(peer.permanent_uuid(), &ts_desc); - if (!ts_desc.get()) { - LOG_WITH_PREFIX(WARNING) << "Could not find ts with uuid " << peer.permanent_uuid() - << " after registering from raft config. Skip updating the replica" - << " map."; - continue; - } - } - - // Do not update replicas in the NOT_STARTED or BOOTSTRAPPING state (unless they are stale). 
- bool use_existing = false; - const TabletReplica* existing_replica = nullptr; - auto it = prev_rl->find(ts_desc->permanent_uuid()); - if (it != prev_rl->end()) { - existing_replica = &it->second; - } - if (existing_replica && peer.permanent_uuid() != sender_uuid) { - // IsStarting returns true if state == NOT_STARTED or state == BOOTSTRAPPING. - use_existing = existing_replica->IsStarting() && !existing_replica->IsStale(); - } - if (use_existing) { - InsertOrDie(replica_locations.get(), existing_replica->ts_desc->permanent_uuid(), - *existing_replica); - } else { - // The RaftGroupStatePB in the report is only applicable to the replica that is owned by the - // sender. Initialize the other replicas with an unknown state. - const RaftGroupStatePB replica_state = - (sender_uuid == ts_desc->permanent_uuid()) ? report.state() : RaftGroupStatePB::UNKNOWN; - - TabletReplica replica; - CreateNewReplicaForLocalMemory( - ts_desc.get(), &consensus_state, report, replica_state, &replica); - auto result = replica_locations.get()->insert({replica.ts_desc->permanent_uuid(), replica}); - LOG_IF(FATAL, !result.second) << "duplicate uuid: " << replica.ts_desc->permanent_uuid(); - if (existing_replica) { - result.first->second.UpdateDriveInfo(existing_replica->drive_info); - result.first->second.UpdateLeaderLeaseInfo(existing_replica->leader_lease_info); - } - } - } - - // Update the local tablet replica set. This deviates from persistent state during bootstrapping. - tablet->SetReplicaLocations(replica_locations); - tablet_locations_version_.fetch_add(1, std::memory_order_acq_rel); -} - -void CatalogManager::UpdateTabletReplicaInLocalMemory(TSDescriptor* ts_desc, - const ConsensusStatePB* consensus_state, - const ReportedTabletPB& report, - const scoped_refptr& tablet) { - TabletReplica replica; - CreateNewReplicaForLocalMemory(ts_desc, consensus_state, report, report.state(), &replica); - tablet->UpdateReplicaLocations(replica); - tablet_locations_version_.fetch_add(1, std::memory_order_acq_rel); -} - -void CatalogManager::CreateNewReplicaForLocalMemory(TSDescriptor* ts_desc, - const ConsensusStatePB* consensus_state, - const ReportedTabletPB& report, - const RaftGroupStatePB& state, - TabletReplica* new_replica) { - // Tablets in state NOT_STARTED or BOOTSTRAPPING don't have a consensus. - if (consensus_state == nullptr) { - new_replica->role = PeerRole::NON_PARTICIPANT; - new_replica->member_type = PeerMemberType::UNKNOWN_MEMBER_TYPE; - } else { - CHECK(consensus_state != nullptr) << "No cstate: " << ts_desc->permanent_uuid() - << " - " << state; - new_replica->role = GetConsensusRole(ts_desc->permanent_uuid(), *consensus_state); - new_replica->member_type = GetConsensusMemberType(ts_desc->permanent_uuid(), *consensus_state); - } - if (report.has_should_disable_lb_move()) { - new_replica->should_disable_lb_move = report.should_disable_lb_move(); - } - if (report.has_fs_data_dir()) { - new_replica->fs_data_dir = report.fs_data_dir(); - } - new_replica->state = state; - new_replica->ts_desc = ts_desc; - if (!ts_desc->registered_through_heartbeat()) { - auto last_heartbeat = ts_desc->LastHeartbeatTime(); - new_replica->time_updated = last_heartbeat ? 
last_heartbeat : MonoTime::kMin; - } -} - Result CatalogManager::GetServingTablet(const TabletId& tablet_id) const { return GetServingTablet(Slice(tablet_id)); } @@ -10779,6 +9996,10 @@ Status CatalogManager::EnableBgTasks() { return Status::OK(); } +void CatalogManager::WakeBgTaskIfPendingUpdates() { + background_tasks_->WakeIfHasPendingUpdates(); +} + Status CatalogManager::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB& req) { const TabletId& tablet_id = req.tablet_id(); std::unique_lock l(remote_bootstrap_mtx_, std::try_to_lock); @@ -11192,6 +10413,29 @@ void CatalogManager::SendDeleteTabletRequest( WARN_NOT_OK(status, Substitute("Failed to send delete request for tablet $0", tablet_id)); } +std::shared_ptr CatalogManager::MakeDeleteReplicaTask( + const TabletServerId& peer_uuid, const TableInfoPtr& table, const TabletId& tablet_id, + tablet::TabletDataState delete_type, + boost::optional cas_config_opid_index_less_or_equal, LeaderEpoch epoch, + const std::string& reason) { + return std::make_shared( + master_, AsyncTaskPool(), peer_uuid, table, tablet_id, delete_type, + cas_config_opid_index_less_or_equal, epoch, GetDeleteReplicaTaskThrottler(peer_uuid), + reason); +} + +void CatalogManager::SetTabletReplicaLocations( + const TabletInfoPtr& tablet, const std::shared_ptr& replica_locations) { + tablet->SetReplicaLocations(replica_locations); + tablet_locations_version_.fetch_add(1, std::memory_order_acq_rel); +} + +void CatalogManager::UpdateTabletReplicaLocations( + const TabletInfoPtr& tablet, const TabletReplica& replica) { + tablet->UpdateReplicaLocations(replica); + tablet_locations_version_.fetch_add(1, std::memory_order_acq_rel); +} + void CatalogManager::SendLeaderStepDownRequest( const scoped_refptr& tablet, const ConsensusStatePB& cstate, const string& change_config_ts_uuid, bool should_remove, const LeaderEpoch& epoch, diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index 455ee2c91ebc..14dc6d6dc50c 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -541,15 +541,74 @@ class CatalogManager : public tserver::TabletPeerLookupIf, // Returns the system tablet in catalog manager by the id. Result> GetSystemTablet(const TabletId& id) override; - // Handle a tablet report from the given tablet server. - // - // The RPC context is provided for logging/tracing purposes, - // but this function does not itself respond to the RPC. - Status ProcessTabletReport(TSDescriptor* ts_desc, - const TabletReportPB& report, - const LeaderEpoch& epoch, - TabletReportUpdatesPB *report_update, - rpc::RpcContext* rpc); + // TODO(asrivastava): Get rid of this struct and move the logic for this function into the + // master heartbeat service code. + struct ReportedTablet { + TabletId tablet_id; + TabletInfoPtr info; + const ReportedTabletPB* report; + std::map> tables; + }; + using ReportedTablets = std::vector; + void GetReportedAndOrphanedTabletsFromReport( + int num_tablets, + const TabletReportPB& full_report, + TabletReportUpdatesPB* full_report_update, + ReportedTablets* reported_tablets, + std::set* orphaned_tablets); + + // Send the "delete tablet request" to the specified TS/tablet. + // The specified 'reason' will be logged on the TS. 
+ void SendDeleteTabletRequest(const TabletId& tablet_id, + tablet::TabletDataState delete_type, + const boost::optional& cas_config_opid_index_less_or_equal, + const scoped_refptr& table, + TSDescriptor* ts_desc, + const std::string& reason, + const LeaderEpoch& epoch, + HideOnly hide_only = HideOnly::kFalse, + KeepData keep_data = KeepData::kFalse); + + std::shared_ptr MakeDeleteReplicaTask( + const TabletServerId& peer_uuid, const TableInfoPtr& table, const TabletId& tablet_id, + tablet::TabletDataState delete_type, + boost::optional cas_config_opid_index_less_or_equal, LeaderEpoch epoch, + const std::string& reason); + + void SetTabletReplicaLocations( + const TabletInfoPtr& tablet, const std::shared_ptr& replica_locations); + void UpdateTabletReplicaLocations(const TabletInfoPtr& tablet, const TabletReplica& replica); + + void WakeBgTaskIfPendingUpdates(); + + // Get the ycql system.partitions vtable. Note that this has EXCLUDES(mutex_), in order to + // maintain lock ordering. + const YQLPartitionsVTable& GetYqlPartitionsVtable() const EXCLUDES(mutex_); + + Status HandleTabletSchemaVersionReport( + TabletInfo *tablet, uint32_t version, + const LeaderEpoch& epoch, + const scoped_refptr& table = nullptr) override; + + // For a table that is currently in PREPARING state, if all its tablets have transitioned to + // RUNNING state, then collect and start the required post tablet creation async tasks. Table is + // advanced to the RUNNING state after all of these tasks complete successfully. + // new_running_tablets is the new set of tablets that are being transitioned to RUNNING state + // (dirty copy is modified) and yet to be persisted. These should be persisted before the table + // lock is released. Note: + // WriteLock on the table is required. + void SchedulePostTabletCreationTasks( + const TableInfoPtr& table_info, const LeaderEpoch& epoch, + const std::set& new_running_tablets = {}); + + void StartElectionIfReady( + const consensus::ConsensusStatePB& cstate, const LeaderEpoch& epoch, TabletInfo* tablet); + + Result IsTableUndergoingPitrRestore(const TableInfo& table_info); + + // Register the tablet server with the ts manager using the Raft config. This is called for + // servers that are part of the Raft config but haven't registered as yet. + Status RegisterTsFromRaftConfig(const consensus::RaftPeerPB& peer); // Create a new Namespace with the specified attributes. // @@ -1820,34 +1879,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf, IncludeInactive include_inactive = IncludeInactive::kFalse, PartitionsOnly partitions_only = PartitionsOnly::kFalse); - // Check whether the tservers in the current replica map differs from those in the cstate when - // processing a tablet report. Ignore the roles reported by the cstate, just compare the - // tservers. - bool ReplicaMapDiffersFromConsensusState(const scoped_refptr& tablet, - const consensus::ConsensusStatePB& consensus_state); - - void UpdateTabletReplicasAfterConfigChange( - const scoped_refptr& tablet, - const std::string& sender_uuid, - const consensus::ConsensusStatePB& consensus_state, - const ReportedTabletPB& report); - - // Register a tablet server whenever it heartbeats with a consensus configuration. This is - // needed because we have logic in the Master that states that if a tablet - // server that is part of a consensus configuration has not heartbeated to the Master yet, we - // leave it out of the consensus configuration reported to clients. 
- // TODO: See if we can remove this logic, as it seems confusing. - void UpdateTabletReplicaInLocalMemory(TSDescriptor* ts_desc, - const consensus::ConsensusStatePB* consensus_state, - const ReportedTabletPB& report, - const scoped_refptr& tablet_to_update); - - static void CreateNewReplicaForLocalMemory(TSDescriptor* ts_desc, - const consensus::ConsensusStatePB* consensus_state, - const ReportedTabletPB& report, - const tablet::RaftGroupStatePB& state, - TabletReplica* new_replica); - // Extract the set of tablets that can be deleted and the set of tablets // that must be processed because not running yet. // Returns a map of table_id -> {tablet_info1, tablet_info2, etc.}. @@ -1910,11 +1941,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf, DeferredAssignmentActions* deferred, TabletInfos* new_tablets); - Status HandleTabletSchemaVersionReport( - TabletInfo *tablet, uint32_t version, - const LeaderEpoch& epoch, - const scoped_refptr& table = nullptr) override; - // Send the create tablet requests to the selected peers of the consensus configurations. // The creation is async, and at the moment there is no error checking on the // caller side. We rely on the assignment timeout. If we don't see the tablet @@ -2012,18 +2038,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf, const scoped_refptr& tablet, const std::string& leader_uuid, const std::string& reason, HideOnly hide_only, const LeaderEpoch& epoch); - // Send the "delete tablet request" to the specified TS/tablet. - // The specified 'reason' will be logged on the TS. - void SendDeleteTabletRequest(const TabletId& tablet_id, - tablet::TabletDataState delete_type, - const boost::optional& cas_config_opid_index_less_or_equal, - const scoped_refptr& table, - TSDescriptor* ts_desc, - const std::string& reason, - const LeaderEpoch& epoch, - HideOnly hide_only = HideOnly::kFalse, - KeepData keep_data = KeepData::kFalse); - // Start a task to request the specified tablet leader to step down and optionally to remove // the server that is over-replicated. A new tablet server can be specified to start an election // immediately to become the new leader. If new_leader_ts_uuid is empty, the election will be run @@ -2132,10 +2146,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf, // Creates a new TableInfo object. scoped_refptr NewTableInfo(TableId id, bool colocated) override; - // Register the tablet server with the ts manager using the Raft config. This is called for - // servers that are part of the Raft config but haven't registered as yet. - Status RegisterTsFromRaftConfig(const consensus::RaftPeerPB& peer); - template Status Load(const std::string& title, SysCatalogLoadingState* state); @@ -2153,8 +2163,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf, Result IsTablePartOfSomeSnapshotSchedule(const TableInfo& table_info) override; - Result IsTableUndergoingPitrRestore(const TableInfo& table_info); - // Is this table part of xCluster or CDCSDK? 
bool IsTablePartOfXRepl(const TableId& table_id) const REQUIRES_SHARED(mutex_); @@ -2475,9 +2483,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf, std::unique_ptr xcluster_manager_; - void StartElectionIfReady( - const consensus::ConsensusStatePB& cstate, const LeaderEpoch& epoch, TabletInfo* tablet); - Status CanAddPartitionsToTable( size_t desired_partitions, const PlacementInfoPB& placement_info) override; @@ -2582,45 +2587,10 @@ class CatalogManager : public tserver::TabletPeerLookupIf, // Helper function to refresh the tablespace info. Status DoRefreshTablespaceInfo(const LeaderEpoch& epoch); - // Processes committed consensus state for specified tablet from ts_desc. - // Returns true if tablet was mutated. - bool ProcessCommittedConsensusState( - TSDescriptor* ts_desc, - bool is_incremental, - const ReportedTabletPB& report, - const LeaderEpoch& epoch, - std::map* table_write_locks, - const TabletInfoPtr& tablet, - const TabletInfo::WriteLock& tablet_lock, - std::map>* tables, - std::vector* rpcs); - - struct ReportedTablet { - TabletId tablet_id; - TabletInfoPtr info; - const ReportedTabletPB* report; - std::map> tables; - }; - using ReportedTablets = std::vector; - - // Process tablets batch while processing tablet report. - Status ProcessTabletReportBatch( - TSDescriptor* ts_desc, - bool is_incremental, - ReportedTablets::iterator begin, - ReportedTablets::iterator end, - const LeaderEpoch& epoch, - TabletReportUpdatesPB* full_report_update, - std::vector* rpcs); - size_t GetNumLiveTServersForPlacement(const PlacementId& placement_id); TSDescriptorVector GetAllLiveNotBlacklistedTServers() const override; - // Get the ycql system.partitions vtable. Note that this has EXCLUDES(mutex_), in order to - // maintain lock ordering. - const YQLPartitionsVTable& GetYqlPartitionsVtable() const EXCLUDES(mutex_); - void InitializeTableLoadState( const TableId& table_id, TSDescriptorVector ts_descs, CMPerTableLoadState* state); @@ -3122,17 +3092,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf, Status DropXClusterStreamsOfTables(const std::unordered_set& table_ids) EXCLUDES(mutex_); - // For a table that is currently in PREPARING state, if all its tablets have transitioned to - // RUNNING state, then collect and start the required post tablet creation async tasks. Table is - // advanced to the RUNNING state after all of these tasks complete successfully. - // new_running_tablets is the new set of tablets that are being transitioned to RUNNING state - // (dirty copy is modified) and yet to be persisted. These should be persisted before the table - // lock is released. Note: - // WriteLock on the table is required. - void SchedulePostTabletCreationTasks( - const TableInfoPtr& table_info, const LeaderEpoch& epoch, - const std::set& new_running_tablets = {}); - void SchedulePostTabletCreationTasksForPendingTables(const LeaderEpoch& epoch) EXCLUDES(mutex_); // Checks if the table is a consumer in an xCluster replication universe. 
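Note on the header changes above: the new public SetTabletReplicaLocations / UpdateTabletReplicaLocations wrappers exist so that every replica-map mutation also bumps the tablet-locations version counter (see the TODO added in catalog_entity_info.h, which proposes making the raw Set/Update methods private). A minimal sketch of that wrapper pattern follows; the types and names here are simplified stand-ins for illustration, not the real YugabyteDB classes.

```cpp
// Sketch of the version-bump wrapper pattern behind SetTabletReplicaLocations.
// All types below (Tablet, Catalog, ReplicaMap) are hypothetical stand-ins.
#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

struct TabletReplica {};  // placeholder for the real replica descriptor
using ReplicaMap = std::unordered_map<std::string, TabletReplica>;

class Tablet {
 public:
  void SetReplicaLocations(std::shared_ptr<ReplicaMap> replicas) {
    replicas_ = std::move(replicas);
  }

 private:
  std::shared_ptr<ReplicaMap> replicas_;
};

class Catalog {
 public:
  // Callers go through this wrapper instead of Tablet::SetReplicaLocations so
  // that every replica-map mutation also advances the locations version, which
  // cache consumers use to detect staleness.
  void SetTabletReplicaLocations(Tablet* tablet, std::shared_ptr<ReplicaMap> replicas) {
    tablet->SetReplicaLocations(std::move(replicas));
    locations_version_.fetch_add(1, std::memory_order_acq_rel);
  }

  uint64_t locations_version() const {
    return locations_version_.load(std::memory_order_acquire);
  }

 private:
  std::atomic<uint64_t> locations_version_{0};
};
```

Routing all mutations through the catalog-level wrapper keeps the map swap and the version bump in one place, so location caches can detect staleness with a single atomic read instead of taking the tablet lock.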
diff --git a/src/yb/master/master_heartbeat_service.cc b/src/yb/master/master_heartbeat_service.cc index 0ffcaafabcf3..98f40b253a9f 100644 --- a/src/yb/master/master_heartbeat_service.cc +++ b/src/yb/master/master_heartbeat_service.cc @@ -15,13 +15,19 @@ #include "yb/common/common_util.h" #include "yb/common/pg_catversions.h" +#include "yb/consensus/metadata.pb.h" +#include "yb/consensus/opid_util.h" +#include "yb/consensus/quorum_util.h" + #include "yb/master/catalog_entity_info.pb.h" #include "yb/master/catalog_manager.h" #include "yb/master/master_heartbeat.service.h" #include "yb/master/master_service_base.h" #include "yb/master/master_service_base-internal.h" #include "yb/master/ts_manager.h" +#include "yb/master/yql_partitions_vtable.h" +#include "yb/util/debug/trace_event.h" #include "yb/util/flags.h" DEFINE_UNKNOWN_int32(tablet_report_limit, 1000, @@ -45,6 +51,40 @@ TAG_FLAG(master_enable_universe_uuid_heartbeat_check, advanced); DEFINE_test_flag(bool, skip_processing_tablet_metadata, false, "Whether to skip processing tablet metadata for TSHeartbeat."); +DEFINE_RUNTIME_int32(catalog_manager_report_batch_size, 1, + "The max number of tablets evaluated in the heartbeat as a single SysCatalog update."); +TAG_FLAG(catalog_manager_report_batch_size, advanced); + +DEFINE_RUNTIME_bool(catalog_manager_wait_for_new_tablets_to_elect_leader, true, + "Whether the catalog manager should wait for a newly created tablet to " + "elect a leader before considering it successfully created. " + "This is disabled in some tests where we explicitly manage leader " + "election."); +TAG_FLAG(catalog_manager_wait_for_new_tablets_to_elect_leader, hidden); + +DEFINE_RUNTIME_double(heartbeat_safe_deadline_ratio, .20, + "When the heartbeat deadline has this percentage of time remaining, " + "the master should halt tablet report processing so it can respond in time."); + +// Temporary. Can be removed after long-run testing. +// TODO: how temporary is this? 
+DEFINE_RUNTIME_bool(master_ignore_stale_cstate, true, + "Whether Master processes the raft config when the version is lower."); +TAG_FLAG(master_ignore_stale_cstate, hidden); + +DEFINE_RUNTIME_bool(master_tombstone_evicted_tablet_replicas, true, + "Whether the Master should tombstone (delete) tablet replicas that " + "are no longer part of the latest reported raft config."); +TAG_FLAG(master_tombstone_evicted_tablet_replicas, hidden); + +DEFINE_RUNTIME_bool(use_create_table_leader_hint, true, + "Whether the Master should hint which replica for each tablet should " + "be leader initially on tablet creation."); + +DEFINE_test_flag(uint64, inject_latency_during_tablet_report_ms, 0, + "Number of milliseconds to sleep during the processing of a tablet batch."); + +DECLARE_bool(enable_register_ts_from_raft); DECLARE_bool(enable_heartbeat_pg_catalog_versions_cache); DECLARE_int32(heartbeat_rpc_timeout_ms); @@ -55,307 +95,1133 @@ namespace master { namespace { +using std::string; + +using namespace std::literals; +using namespace std::placeholders; + +using consensus::ConsensusStatePB; +using consensus::PeerMemberType; +using tablet::RaftGroupStatePB; +using tablet::TABLET_DATA_DELETED; +using tablet::TABLET_DATA_TOMBSTONED; + class MasterHeartbeatServiceImpl : public MasterServiceBase, public MasterHeartbeatIf { public: explicit MasterHeartbeatServiceImpl(Master* master) - : MasterServiceBase(master), MasterHeartbeatIf(master->metric_entity()) {} + : MasterServiceBase(master), MasterHeartbeatIf(master->metric_entity()), + master_(master), catalog_manager_(master->catalog_manager_impl()) {} static Status CheckUniverseUuidMatchFromTserver( const UniverseUuid& tserver_universe_uuid, - const UniverseUuid& master_universe_uuid) { - if (!GetAtomicFlag(&FLAGS_master_enable_universe_uuid_heartbeat_check)) { - return Status::OK(); + const UniverseUuid& master_universe_uuid); + + void TSHeartbeat(const TSHeartbeatRequestPB* req, + TSHeartbeatResponsePB* resp, + rpc::RpcContext rpc) override; + + private: + Master* master_; + CatalogManager* catalog_manager_; + + Status ProcessTabletReport( + TSDescriptor* ts_desc, + const TabletReportPB& full_report, + const LeaderEpoch& epoch, + TabletReportUpdatesPB* full_report_update, + rpc::RpcContext* rpc); + + Status ProcessTabletReportBatch( + TSDescriptor* ts_desc, + bool is_incremental, + CatalogManager::ReportedTablets::iterator begin, + CatalogManager::ReportedTablets::iterator end, + const LeaderEpoch& epoch, + TabletReportUpdatesPB* full_report_update, + std::vector* rpcs); + + bool ProcessCommittedConsensusState( + TSDescriptor* ts_desc, + bool is_incremental, + const ReportedTabletPB& report, + const LeaderEpoch& epoch, + std::map* table_write_locks, + const TabletInfoPtr& tablet, + const TabletInfo::WriteLock& tablet_lock, + std::map>* tables, + std::vector* rpcs); + + void UpdateTabletReplicasAfterConfigChange( + const scoped_refptr& tablet, + const std::string& sender_uuid, + const ConsensusStatePB& consensus_state, + const ReportedTabletPB& report); + + void UpdateTabletReplicaInLocalMemory( + TSDescriptor* ts_desc, + const ConsensusStatePB* consensus_state, + const ReportedTabletPB& report, + const scoped_refptr& tablet); + + void CreateNewReplicaForLocalMemory( + TSDescriptor* ts_desc, + const ConsensusStatePB* consensus_state, + const ReportedTabletPB& report, + const RaftGroupStatePB& state, + TabletReplica* new_replica); + + bool ReplicaMapDiffersFromConsensusState( + const scoped_refptr& tablet, const ConsensusStatePB& cstate); +}; + +Status 
MasterHeartbeatServiceImpl::CheckUniverseUuidMatchFromTserver(
+    const UniverseUuid& tserver_universe_uuid,
+    const UniverseUuid& master_universe_uuid) {
+  if (!GetAtomicFlag(&FLAGS_master_enable_universe_uuid_heartbeat_check)) {
+    return Status::OK();
+  }
+
+  // If the universe uuid on the master is empty, return an error. Eventually this field will be
+  // set via a background thread.
+  if (master_universe_uuid.IsNil()) {
+    return STATUS(TryAgain, "universe_uuid is not yet set in the cluster config");
+  }
+
+  if (tserver_universe_uuid.IsNil()) {
+    // The tserver must send universe_uuid with the request, ask it to retry.
+    return STATUS(TryAgain, "universe_uuid needs to be set in the request");
+  }
+
+  if (tserver_universe_uuid != master_universe_uuid) {
+    return STATUS(InvalidArgument,
+        Format("Received wrong universe_uuid $0, expected $1",
+               tserver_universe_uuid.ToString(), master_universe_uuid.ToString()));
+  }
+  return Status::OK();
+}
+
+void MasterHeartbeatServiceImpl::TSHeartbeat(
+    const TSHeartbeatRequestPB* req,
+    TSHeartbeatResponsePB* resp,
+    rpc::RpcContext rpc) {
+  LongOperationTracker long_operation_tracker("TSHeartbeat", 1s);
+
+  // If CatalogManager is not initialized, we don't even know whether or not we will
+  // be a leader (so we can't tell whether or not we can accept tablet reports).
+  SCOPED_LEADER_SHARED_LOCK(l, server_->catalog_manager_impl());
+
+  if (req->common().ts_instance().permanent_uuid().empty()) {
+    // In FSManager, we have already added empty UUID protection so that the TServer will
+    // crash before even sending a heartbeat to the Master. This check only covers the case
+    // where a newly updated Master might receive an empty UUID from an old version of the
+    // TServer that doesn't have the crash code in FSManager.
+    rpc.RespondFailure(STATUS(InvalidArgument, "Received empty UUID from instance: ",
+                              req->common().ts_instance().ShortDebugString()));
+    return;
+  }
+
+  consensus::ConsensusStatePB cpb;
+  Status s = server_->catalog_manager_impl()->GetCurrentConfig(&cpb);
+  if (!s.ok()) {
+    // For now, we skip setting the config on errors (hopefully the next heartbeat will work).
+    // We could enhance this to fail the RPC if there are too many errors, on a case-by-case
+    // basis.
+    LOG(WARNING) << "Could not set master raft config : " << s.ToString();
+  } else if (cpb.has_config()) {
+    if (cpb.config().opid_index() > req->config_index()) {
+      *resp->mutable_master_config() = std::move(cpb.config());
+      LOG(INFO) << "Set config at index " << resp->master_config().opid_index()
+                << " for ts uuid " << req->common().ts_instance().permanent_uuid();
+    }
+  } // Do nothing if config not ready.
+
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, &rpc)) {
+    resp->set_leader_master(false);
+    return;
+  }
+
+  resp->mutable_master_instance()->CopyFrom(server_->instance_pb());
+  resp->set_leader_master(true);
+
+  // At the time of this check, we need to know that we're the master leader to access the
+  // cluster config.
+  SysClusterConfigEntryPB cluster_config;
+  s = server_->catalog_manager_impl()->GetClusterConfig(&cluster_config);
+  if (!s.ok()) {
+    LOG(WARNING) << "Unable to get cluster configuration: " << s.ToString();
+    rpc.RespondFailure(s);
+    return;
+  }
+
+  auto tserver_universe_uuid_res = UniverseUuid::FromString(req->universe_uuid());
+  if (!tserver_universe_uuid_res) {
+    LOG(WARNING) << "Could not decode request universe_uuid: "
+                 << tserver_universe_uuid_res.status().ToString();
+    rpc.RespondFailure(tserver_universe_uuid_res.status());
+    return;
+  }
+  auto tserver_universe_uuid = *tserver_universe_uuid_res;
-  // If the universe uuid on the master is empty, return an error. Eventually this field will be
-  // set via a background thread.
+  auto master_universe_uuid_res = UniverseUuid::FromString(
+      FLAGS_TEST_master_universe_uuid.empty() ?
+      cluster_config.universe_uuid() : FLAGS_TEST_master_universe_uuid);
+  if (!master_universe_uuid_res) {
+    LOG(WARNING) << "Could not decode cluster config universe_uuid: "
+                 << master_universe_uuid_res.status().ToString();
+    rpc.RespondFailure(master_universe_uuid_res.status());
+    return;
+  }
+  auto master_universe_uuid = *master_universe_uuid_res;
+
+  s = CheckUniverseUuidMatchFromTserver(tserver_universe_uuid, master_universe_uuid);
+
+  if (!s.ok()) {
+    LOG(WARNING) << "Failed CheckUniverseUuidMatchFromTserver check: " << s.ToString();
     if (master_universe_uuid.IsNil()) {
-      return STATUS(TryAgain, "universe_uuid is not yet set in the cluster config");
+      auto* error = resp->mutable_error();
+      error->set_code(MasterErrorPB::INVALID_CLUSTER_CONFIG);
+      StatusToPB(s, error->mutable_status());
+      rpc.RespondSuccess();
+      return;
     }

     if (tserver_universe_uuid.IsNil()) {
-      // The tserver must send universe_uuid with the request, ask it to retry.
-      return STATUS(TryAgain, "universe_uuid needs to be set in the request");
+      resp->set_universe_uuid((*master_universe_uuid_res).ToString());
+      auto* error = resp->mutable_error();
+      error->set_code(MasterErrorPB::INVALID_REQUEST);
+      StatusToPB(s, error->mutable_status());
+      rpc.RespondSuccess();
+      return;
     }

-    if (tserver_universe_uuid != master_universe_uuid) {
-      return STATUS(InvalidArgument,
-          Format("Received wrong universe_uuid $0, expected $1",
-                 tserver_universe_uuid.ToString(), master_universe_uuid.ToString()));
-    }
-    return Status::OK();
+    rpc.RespondFailure(s);
+    return;
   }

-  void TSHeartbeat(const TSHeartbeatRequestPB* req,
-                   TSHeartbeatResponsePB* resp,
-                   rpc::RpcContext rpc) override {
-    LongOperationTracker long_operation_tracker("TSHeartbeat", 1s);
-
-    // If CatalogManager is not initialized don't even know whether or not we will
-    // be a leader (so we can't tell whether or not we can accept tablet reports).
-    SCOPED_LEADER_SHARED_LOCK(l, server_->catalog_manager_impl());
-
-    if (req->common().ts_instance().permanent_uuid().empty()) {
-      // In FSManager, we have already added empty UUID protection so that TServer will
-      // crash before even sending heartbeat to Master. Here is only for the case that
-      // new updated Master might received empty UUID from old version of TServer that
-      // doesn't have the crash code in FSManager.
-      rpc.RespondFailure(STATUS(InvalidArgument, "Recevied Empty UUID from instance: ",
-                                req->common().ts_instance().ShortDebugString()));
+  // If the TS is registering, register in the TS manager.
+  if (req->has_registration()) {
+    Status s = server_->ts_manager()->RegisterTS(req->common().ts_instance(),
+                                                 req->registration(),
+                                                 server_->MakeCloudInfoPB(),
+                                                 &server_->proxy_cache());
+    if (!s.ok()) {
+      LOG(WARNING) << "Unable to register tablet server (" << rpc.requestor_string() << "): "
+                   << s.ToString();
+      // TODO: add service-specific errors.
+      rpc.RespondFailure(s);
       return;
     }
+    resp->set_cluster_uuid(cluster_config.cluster_uuid());
+  }

-    consensus::ConsensusStatePB cpb;
-    Status s = server_->catalog_manager_impl()->GetCurrentConfig(&cpb);
+  s = server_->catalog_manager_impl()->FillHeartbeatResponse(req, resp);
+  if (!s.ok()) {
+    LOG(WARNING) << "Unable to fill heartbeat response: " << s.ToString();
+    rpc.RespondFailure(s);
+    return;
+  }
+
+  // Look up the TS -- if it just registered above, it will be found here.
+  // This allows the TS to register and tablet-report in the same RPC.
+  TSDescriptorPtr ts_desc;
+  s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc);
+  if (s.IsNotFound()) {
+    LOG(INFO) << "Got heartbeat from unknown tablet server { "
+              << req->common().ts_instance().ShortDebugString() << " } as "
+              << rpc.requestor_string()
+              << "; Asking this server to re-register. Status from ts lookup: " << s;
+    resp->set_needs_reregister(true);
+    resp->set_needs_full_tablet_report(true);
+    rpc.RespondSuccess();
+    return;
+  } else if (!s.ok()) {
+    LOG(WARNING) << "Unable to look up tablet server for heartbeat request "
+                 << req->DebugString() << " from " << rpc.requestor_string()
+                 << "\nStatus: " << s.ToString();
+    rpc.RespondFailure(s.CloneAndPrepend("Unable to lookup TS"));
+    return;
+  }
+
+  ts_desc->UpdateHeartbeat(req);
+  resp->set_tablet_report_limit(FLAGS_tablet_report_limit);
+
+  // Set the TServer metrics in TS Descriptor.
+  if (req->has_metrics()) {
+    ts_desc->UpdateMetrics(req->metrics());
+  }
+
+  if (req->has_tablet_report()) {
+    s = ProcessTabletReport(
+        ts_desc.get(), req->tablet_report(), l.epoch(), resp->mutable_tablet_report(), &rpc);
     if (!s.ok()) {
-      // For now, we skip setting the config on errors (hopefully next heartbeat will work).
-      // We could enhance to fail rpc, if there are too many error, on a case by case error basis.
-      LOG(WARNING) << "Could not set master raft config : " << s.ToString();
-    } else if (cpb.has_config()) {
-      if (cpb.config().opid_index() > req->config_index()) {
-        *resp->mutable_master_config() = std::move(cpb.config());
-        LOG(INFO) << "Set config at index " << resp->master_config().opid_index() << " for ts uuid "
-                  << req->common().ts_instance().permanent_uuid();
+      rpc.RespondFailure(s.CloneAndPrepend("Failed to process tablet report"));
+      return;
+    }
+  }
+
+  if (!req->has_tablet_report() || req->tablet_report().is_incremental()) {
+    // Only process metadata if we have plenty of time to process the work (> 50% of
+    // timeout).
+    auto safe_time_left = CoarseMonoClock::Now() + (FLAGS_heartbeat_rpc_timeout_ms * 1ms / 2);
+    if (rpc.GetClientDeadline() > safe_time_left &&
+        PREDICT_TRUE(!ANNOTATE_UNPROTECTED_READ(FLAGS_TEST_skip_processing_tablet_metadata))) {
+      std::unordered_map id_to_leader_metrics;
+      for (auto& info : req->leader_info()) {
+        id_to_leader_metrics[info.tablet_id()] = info;
+      }
-    } // Do nothing if config not ready.
+ for (const auto& metadata : req->storage_metadata()) { + std::optional leader_metrics; + auto iter = id_to_leader_metrics.find(metadata.tablet_id()); + if (iter != id_to_leader_metrics.end()) { + leader_metrics = iter->second; + } + server_->catalog_manager_impl()->ProcessTabletMetadata(ts_desc.get()->permanent_uuid(), + metadata, leader_metrics); + } + } - if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, &rpc)) { - resp->set_leader_master(false); - return; + for (const auto& consumer_replication_state : req->xcluster_consumer_replication_status()) { + server_->catalog_manager_impl()->StoreXClusterConsumerReplicationStatus( + consumer_replication_state); } - resp->mutable_master_instance()->CopyFrom(server_->instance_pb()); - resp->set_leader_master(true); + // Only process the full compaction statuses if we have plenty of time to process the work (> + // 50% of timeout). + safe_time_left = CoarseMonoClock::Now() + (FLAGS_heartbeat_rpc_timeout_ms * 1ms / 2); + if (rpc.GetClientDeadline() > safe_time_left) { + for (const auto& full_compaction_status : req->full_compaction_statuses()) { + server_->catalog_manager_impl()->ProcessTabletReplicaFullCompactionStatus( + ts_desc->permanent_uuid(), full_compaction_status); + } + } - // At the time of this check, we need to know that we're the master leader to access the - // cluster config. - SysClusterConfigEntryPB cluster_config; - s = server_->catalog_manager_impl()->GetClusterConfig(&cluster_config); - if (!s.ok()) { - LOG(WARNING) << "Unable to get cluster configuration: " << s.ToString(); - rpc.RespondFailure(s); - return; + // Only set once. It may take multiple heartbeats to receive a full tablet report. + if (!ts_desc->has_tablet_report()) { + resp->set_needs_full_tablet_report(true); + } + } + + // Retrieve all the nodes known by the master. + std::vector> descs; + server_->ts_manager()->GetAllLiveDescriptors(&descs); + for (const auto& desc : descs) { + *resp->add_tservers() = *desc->GetTSInformationPB(); + } + + // Retrieve the ysql catalog schema version. + if (FLAGS_ysql_enable_db_catalog_version_mode) { + DbOidToCatalogVersionMap versions; + uint64_t fingerprint; // can only be used when versions is not empty. + s = server_->catalog_manager_impl()->GetYsqlAllDBCatalogVersions( + FLAGS_enable_heartbeat_pg_catalog_versions_cache /* use_cache */, + &versions, &fingerprint); + if (s.ok() && !versions.empty()) { + // Return versions back via heartbeat response if the tserver does not provide + // a fingerprint or the tserver's fingerprint does not match the master's + // fingerprint. The tserver does not provide a fingerprint when it has + // not received any catalog versions yet after it starts. 
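// Editor's illustration (hypothetical values): if the master's fingerprint over its
// {db_oid -> (current_version, last_breaking_version)} map is 0x1234 and the tserver
// heartbeats back the same 0x1234, the versions payload below is skipped entirely; a
// missing fingerprint (freshly started tserver) or a mismatch triggers a full resend.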
+ if (!req->has_ysql_db_catalog_versions_fingerprint() || + req->ysql_db_catalog_versions_fingerprint() != fingerprint) { + auto* const mutable_version_data = resp->mutable_db_catalog_version_data(); + for (const auto& it : versions) { + auto* const catalog_version = mutable_version_data->add_db_catalog_versions(); + catalog_version->set_db_oid(it.first); + catalog_version->set_current_version(it.second.current_version); + catalog_version->set_last_breaking_version(it.second.last_breaking_version); + } + if (FLAGS_log_ysql_catalog_versions) { + VLOG_WITH_FUNC(2) << "responding (to ts " + << req->common().ts_instance().permanent_uuid() + << ") db catalog versions: " + << resp->db_catalog_version_data().ShortDebugString(); + } + } else if (FLAGS_log_ysql_catalog_versions) { + VLOG_WITH_FUNC(2) << "responding (to ts " + << req->common().ts_instance().permanent_uuid() + << ") without db catalog versions: fingerprints matched"; + } + } else { + LOG(WARNING) << "Could not get YSQL db catalog versions for heartbeat response: " + << s.ToUserMessage(); + } + } else { + uint64_t last_breaking_version = 0; + uint64_t catalog_version = 0; + s = server_->catalog_manager_impl()->GetYsqlCatalogVersion( + &catalog_version, &last_breaking_version); + if (s.ok()) { + resp->set_ysql_catalog_version(catalog_version); + resp->set_ysql_last_breaking_catalog_version(last_breaking_version); + if (FLAGS_log_ysql_catalog_versions) { + VLOG_WITH_FUNC(1) << "responding (to ts " << req->common().ts_instance().permanent_uuid() + << ") catalog version: " << catalog_version + << ", breaking version: " << last_breaking_version; + } + } else { + LOG(WARNING) << "Could not get YSQL catalog version for heartbeat response: " + << s.ToUserMessage(); } + } + + uint64_t transaction_tables_version = server_->catalog_manager()->GetTransactionTablesVersion(); + resp->set_transaction_tables_version(transaction_tables_version); + + if (req->has_auto_flags_config_version() && + req->auto_flags_config_version() < server_->GetAutoFlagConfigVersion()) { + *resp->mutable_auto_flags_config() = server_->GetAutoFlagsConfig(); + } + + rpc.RespondSuccess(); +} + +Status MasterHeartbeatServiceImpl::ProcessTabletReport( + TSDescriptor* ts_desc, + const TabletReportPB& full_report, + const LeaderEpoch& epoch, + TabletReportUpdatesPB* full_report_update, + rpc::RpcContext* rpc) { + int num_tablets = full_report.updated_tablets_size(); + TRACE_EVENT2("master", "ProcessTabletReport", + "requestor", rpc->requestor_string(), + "num_tablets", num_tablets); + + VLOG(2) << "Received tablet report from " << rpc::RequestorString(rpc) << "(" + << ts_desc->permanent_uuid() << "): " << full_report.DebugString(); + + if (!ts_desc->has_tablet_report() && full_report.is_incremental()) { + LOG(WARNING) + << "Invalid tablet report from " << ts_desc->permanent_uuid() + << ": Received an incremental tablet report when a full one was needed"; + // We should respond with success in order to send reply that we need full report. + return Status::OK(); + } + + // TODO: on a full tablet report, we may want to iterate over the tablets we think + // the server should have, compare vs the ones being reported, and somehow mark + // any that have been "lost" (eg somehow the tablet metadata got corrupted or something). 
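// Editor's note: the code below first splits the report in two: tablets the catalog
// knows about go into reported_tablets (sorted by id and processed in batches), while
// tablets the master has no record of go into orphaned_tablets, each of which is
// answered with a TABLET_DATA_DELETED delete request before batch processing begins.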
+ + CatalogManager::ReportedTablets reported_tablets; - auto tserver_universe_uuid_res = UniverseUuid::FromString(req->universe_uuid()); - if (!tserver_universe_uuid_res) { - LOG(WARNING) << "Could not decode request universe_uuid: " << - tserver_universe_uuid_res.status().ToString(); - rpc.RespondFailure(tserver_universe_uuid_res.status()); + // Tablet Deletes to process after the catalog lock below. + std::set orphaned_tablets; + + catalog_manager_->GetReportedAndOrphanedTabletsFromReport( + num_tablets, full_report, full_report_update, &reported_tablets, &orphaned_tablets); + + std::sort(reported_tablets.begin(), reported_tablets.end(), [](const auto& lhs, const auto& rhs) { + return lhs.tablet_id < rhs.tablet_id; + }); + + // Process any delete requests from orphaned tablets, identified above. + for (const auto& tablet_id : orphaned_tablets) { + catalog_manager_->SendDeleteTabletRequest( + tablet_id, + tablet::TABLET_DATA_DELETED /* delete_type */, + boost::none /* cas_config_opid_index_less_or_equal */, + nullptr /* table */, + ts_desc, + "Report from an orphaned tablet" /* reason */, + epoch); + } + + // Calculate the deadline for this expensive loop coming up. + const auto safe_deadline = rpc->GetClientDeadline() - + (FLAGS_heartbeat_rpc_timeout_ms * 1ms * FLAGS_heartbeat_safe_deadline_ratio); + + // Process tablets by batches. + for (auto tablet_iter = reported_tablets.begin(); tablet_iter != reported_tablets.end();) { + auto batch_begin = tablet_iter; + tablet_iter += std::min( + reported_tablets.end() - tablet_iter, FLAGS_catalog_manager_report_batch_size); + + // Keeps track of all RPCs that should be sent when we're done with a single batch. + std::vector rpcs; + auto status = ProcessTabletReportBatch( + ts_desc, full_report.is_incremental(), batch_begin, tablet_iter, epoch, full_report_update, + &rpcs); + if (!status.ok()) { + for (auto& rpc : rpcs) { + rpc->AbortAndReturnPrevState(status); + } + return status; } - auto tserver_universe_uuid = *tserver_universe_uuid_res; - auto master_universe_uuid_res = UniverseUuid::FromString( - FLAGS_TEST_master_universe_uuid.empty() ? - cluster_config.universe_uuid() : FLAGS_TEST_master_universe_uuid); - if (!master_universe_uuid_res) { - LOG(WARNING) << "Could not decode cluster config universe_uuid: " << - master_universe_uuid_res.status().ToString(); - rpc.RespondFailure(master_universe_uuid_res.status()); + // 13. Send all queued RPCs. + for (auto& rpc : rpcs) { + DCHECK(rpc->table()); + rpc->table()->AddTask(rpc); + WARN_NOT_OK(catalog_manager_->ScheduleTask(rpc), + Format("Failed to send $0", rpc->description())); } - auto master_universe_uuid = *master_universe_uuid_res; + rpcs.clear(); - s = CheckUniverseUuidMatchFromTserver(tserver_universe_uuid, master_universe_uuid); + // 14. Check deadline. Need to exit before processing all batches if we're close to timing out. + if (tablet_iter != reported_tablets.end()) { - if (!s.ok()) { - LOG(WARNING) << "Failed CheckUniverseUuidMatchFromTserver check: " << s.ToString(); - if (master_universe_uuid.IsNil()) { - auto* error = resp->mutable_error(); - error->set_code(MasterErrorPB::INVALID_CLUSTER_CONFIG); - StatusToPB(s, error->mutable_status()); - rpc.RespondSuccess(); - return; + // [TESTING] Inject latency before processing a batch to test deadline. 
+      if (PREDICT_FALSE(FLAGS_TEST_inject_latency_during_tablet_report_ms > 0)) {
+        LOG(INFO) << "Sleeping in MasterHeartbeatServiceImpl::ProcessTabletReport for "
+                  << FLAGS_TEST_inject_latency_during_tablet_report_ms << " ms";
+        SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_inject_latency_during_tablet_report_ms));
+      }
-    if (tserver_universe_uuid.IsNil()) {
-      resp->set_universe_uuid((*master_universe_uuid_res).ToString());
-      auto* error = resp->mutable_error();
-      error->set_code(MasterErrorPB::INVALID_REQUEST);
-      StatusToPB(s, error->mutable_status());
-      rpc.RespondSuccess();
-      return;
+      // Return from here at the configured safe heartbeat deadline to give the response packet
+      // time.
+      if (safe_deadline < CoarseMonoClock::Now()) {
+        LOG(INFO) << "Reached heartbeat deadline. Returning early after processing "
+                  << full_report_update->tablets_size() << " tablets";
+        full_report_update->set_processing_truncated(true);
+        return Status::OK();
       }
+    }
+  } // Loop to process the next batch until fully iterated.
-    rpc.RespondFailure(s);
-    return;
+  if (!full_report.is_incremental()) {
+    // A full report may take multiple heartbeats.
+    // The TS communicates how much is left to process for the full report beyond this specific
+    // heartbeat.
+    bool completed_full_report = !full_report.has_remaining_tablet_count()
+        || full_report.remaining_tablet_count() == 0;
+    if (full_report.updated_tablets_size() == 0) {
+      LOG(INFO) << ts_desc->permanent_uuid() << " sent full tablet report with 0 tablets.";
+    } else if (!ts_desc->has_tablet_report()) {
+      LOG(INFO) << ts_desc->permanent_uuid()
+                << (completed_full_report ? " finished" : " receiving") << " first full report: "
+                << full_report.updated_tablets_size() << " tablets.";
     }
+    // We have a tablet report only once we're done processing all the chunks of the initial
+    // report.
+    ts_desc->set_has_tablet_report(completed_full_report);
+  }

-    // If the TS is registering, register in the TS manager.
-    if (req->has_registration()) {
-      Status s = server_->ts_manager()->RegisterTS(req->common().ts_instance(),
-                                                   req->registration(),
-                                                   server_->MakeCloudInfoPB(),
-                                                   &server_->proxy_cache());
-      if (!s.ok()) {
-        LOG(WARNING) << "Unable to register tablet server (" << rpc.requestor_string() << "): "
-                     << s.ToString();
-        // TODO: add service-specific errors.
-        rpc.RespondFailure(s);
-        return;
+  // 15. Queue background processing if we had updates.
+  if (full_report.updated_tablets_size() > 0) {
+    catalog_manager_->WakeBgTaskIfPendingUpdates();
+  }
+
+  return Status::OK();
+}
+
+int64_t GetCommittedConsensusStateOpIdIndex(const ReportedTabletPB& report) {
+  if (!report.has_committed_consensus_state() ||
+      !report.committed_consensus_state().config().has_opid_index()) {
+    return consensus::kInvalidOpIdIndex;
+  }
+
+  return report.committed_consensus_state().config().opid_index();
+}
+
+Status MasterHeartbeatServiceImpl::ProcessTabletReportBatch(
+    TSDescriptor* ts_desc,
+    bool is_incremental,
+    CatalogManager::ReportedTablets::iterator begin,
+    CatalogManager::ReportedTablets::iterator end,
+    const LeaderEpoch& epoch,
+    TabletReportUpdatesPB* full_report_update,
+    std::vector* rpcs) {
+  // First Pass. Iterate in TabletId order to discover all the Table locks we'll need.
+
+  // Maps a table ID to its corresponding TableInfo.
+  std::map<TableId, TableInfoPtr> table_info_map;
+
+  std::map<TableId, TableInfo::WriteLock> table_write_locks;
+  for (auto reported_tablet = begin; reported_tablet != end; ++reported_tablet) {
+    auto table = reported_tablet->info->table();
+    table_info_map[table->id()] = table;
+    // Acquire locks for all colocated tables reported
+    // in sorted order of table ids (we use a map).
+    for (const auto& [table_id, table_info] : reported_tablet->tables) {
+      table_info_map[table_info->id()] = table_info;
+    }
+  }
+
+  // Need to acquire locks in Id order to prevent deadlock.
+  for (auto& [table_id, table] : table_info_map) {
+    table_write_locks[table_id] = table->LockForWrite();
+  }
+
+  std::map<TabletId, TabletInfo::WriteLock> tablet_write_locks;
+  // Second Pass.
+  // Process each tablet. The list is sorted by ID. This may not be in the order that the tablets
+  // appear in 'full_report', but that has no bearing on correctness.
+  std::vector<TabletInfo*> mutated_tablets;  // refcount protected by ReportedTablet::info
+  std::unordered_map<TableId, std::set<TabletId>> new_running_tablets;
+  for (auto it = begin; it != end; ++it) {
+    const auto& tablet_id = it->tablet_id;
+    const TabletInfoPtr& tablet = it->info;
+    const ReportedTabletPB& report = *it->report;
+    const TableInfoPtr& table = tablet->table();
+
+    // Prepare a heartbeat response entry for this tablet, now that we're going to process it.
+    // Every tablet in the report that is processed gets one, even if there are no changes to it.
+    ReportedTabletUpdatesPB* update = full_report_update->add_tablets();
+    update->set_tablet_id(tablet_id);
+
+    // Get the tablet lock on demand. This works in the batch case because the loop is ordered.
+    tablet_write_locks[tablet_id] = tablet->LockForWrite();
+    auto& table_lock = table_write_locks[table->id()];
+    auto& tablet_lock = tablet_write_locks[tablet_id];
+
+    TRACE_EVENT1("master", "HandleReportedTablet", "tablet_id", report.tablet_id());
+    RETURN_NOT_OK_PREPEND(catalog_manager_->CheckIsLeaderAndReady(),
+        Format("This master is no longer the leader, unable to handle report for tablet $0",
+               tablet_id));
+
+    VLOG(3) << "tablet report: " << report.ShortDebugString()
+            << " peer: " << ts_desc->permanent_uuid();
+
+    // Delete the tablet if it (or its table) has been deleted.
+    if (tablet_lock->is_deleted() ||
+        table_lock->started_deleting()) {
+      const string msg = tablet_lock->pb.state_msg();
+      update->set_state_msg(msg);
+      LOG(INFO) << "Got report from deleted tablet " << tablet->ToString()
+                << " (" << msg << "): Sending delete request for this tablet";
+      // TODO(unknown): Cancel tablet creation, instead of deleting, in cases
+      // where that might be possible (tablet creation timeout & replacement).
+      rpcs->push_back(catalog_manager_->MakeDeleteReplicaTask(
+          ts_desc->permanent_uuid(), table, tablet_id, TABLET_DATA_DELETED,
+          boost::none /* cas_config_opid_index_less_or_equal */, epoch, msg));
+      continue;
+    }
+
+    if (!table_lock->is_running()) {
+      const string msg = tablet_lock->pb.state_msg();
+      LOG(INFO) << "Got report from tablet " << tablet->tablet_id() << " for non-running table "
+                << table->ToString() << ": " << msg;
+      update->set_state_msg(msg);
+      continue;
+    }
+
+    // Tombstone a replica that is no longer part of the Raft config (and
+    // not already tombstoned or deleted outright).
+    //
+    // If the report includes a committed raft config, we only tombstone if the opid_index of the
+    // committed raft config is strictly less than the latest reported committed config. This
+    // prevents us from spuriously deleting replicas that have just been added to the committed
+    // config and are in the process of copying.
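// Editor's illustration (assumed values): suppose the master's committed config for this
// tablet has opid_index = 7. A TS that is absent from that config and reports
// opid_index = 5, or no committed config at all (kInvalidOpIdIndex), is tombstoned with
// cas_config_opid_index_less_or_equal = 7. A report with opid_index >= 7 is never
// tombstoned here, which protects replicas newly added to a just-committed config that
// are still remote bootstrapping.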
+    const ConsensusStatePB& prev_cstate = tablet_lock->pb.committed_consensus_state();
+    const int64_t prev_opid_index = prev_cstate.config().opid_index();
+    const int64_t report_opid_index = GetCommittedConsensusStateOpIdIndex(report);
+    if (FLAGS_master_tombstone_evicted_tablet_replicas &&
+        report.tablet_data_state() != TABLET_DATA_TOMBSTONED &&
+        report.tablet_data_state() != TABLET_DATA_DELETED &&
+        report_opid_index < prev_opid_index &&
+        !IsRaftConfigMember(ts_desc->permanent_uuid(), prev_cstate.config())) {
+      const string delete_msg = (report_opid_index == consensus::kInvalidOpIdIndex) ?
+          "Replica has no consensus available" :
+          Format("Replica with old config index $0", report_opid_index);
+      rpcs->push_back(catalog_manager_->MakeDeleteReplicaTask(
+          ts_desc->permanent_uuid(), table, tablet_id, TABLET_DATA_TOMBSTONED,
+          prev_opid_index, epoch,
+          Format("$0 (current committed config index is $1)", delete_msg, prev_opid_index)));
+      continue;
+    }
+
+    // Skip a non-deleted tablet which reports an error.
+    if (report.has_error()) {
+      Status s = StatusFromPB(report.error());
+      DCHECK(!s.ok());
+      DCHECK_EQ(report.state(), tablet::FAILED);
+      LOG(WARNING) << "Tablet " << tablet->ToString() << " has failed on TS "
+                   << ts_desc->permanent_uuid() << ": " << s.ToString();
+      continue;
+    }
+
+    // Hide the replica if the tablet (or its table) has been hidden in the catalog but the
+    // report says this replica has not been hidden yet.
+    if ((tablet_lock->is_hidden() ||
+         table_lock->started_hiding()) &&
+        report.has_is_hidden() &&
+        !report.is_hidden()) {
+      const string msg = tablet_lock->pb.state_msg();
+      LOG(INFO) << "Got report from hidden tablet " << tablet->ToString()
+                << " (" << msg << "): Sending hide request for this tablet";
+      auto task = catalog_manager_->MakeDeleteReplicaTask(
+          ts_desc->permanent_uuid(), table, tablet_id, TABLET_DATA_DELETED,
+          boost::none /* cas_config_opid_index_less_or_equal */, epoch, msg);
+      task->set_hide_only(true);
+      rpcs->push_back(task);
+    }
+
+    // Process the report's consensus state.
+    // The report will not have a committed_consensus_state if it is in the
+    // middle of starting up, such as during tablet bootstrap.
+    // If we received an incremental report, and the tablet is starting up, we will update the
+    // replica so that the balancer knows how many tablets are in the middle of remote bootstrap.
+    if (report.has_committed_consensus_state()) {
+      const bool tablet_was_running = tablet_lock->is_running();
+      if (ProcessCommittedConsensusState(
+              ts_desc, is_incremental, report, epoch, &table_write_locks, tablet, tablet_lock,
+              &it->tables, rpcs)) {
+        // If the tablet was mutated, add it to the tablets to be re-persisted.
+        //
+        // Done here and not on a per-mutation basis to avoid duplicate entries.
+        mutated_tablets.push_back(tablet.get());
+        if (!tablet_was_running && tablet_lock->is_running()) {
+          new_running_tablets[table->id()].insert(tablet->id());
+        }
+      }
+    } else if (is_incremental &&
+               (report.state() == tablet::NOT_STARTED || report.state() == tablet::BOOTSTRAPPING)) {
+      // When a tablet server is restarted, it sends a full tablet report with all of its tablets
+      // in the NOT_STARTED state, so this would make the load balancer think that all the
+      // tablets are being remote bootstrapped at once, so only process incremental reports here.
+      UpdateTabletReplicaInLocalMemory(ts_desc, nullptr /* consensus */, report, tablet);
+    }
+  } // Finished one round of batch processing.
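Editor's sketch: the first pass above relies on std::map's sorted iteration to take every
table lock in ascending id order; reduced to its essence (std::mutex standing in for
TableInfo::WriteLock, names hypothetical):

#include <map>
#include <mutex>
#include <string>
#include <vector>

// Acquire all locks in ascending key order and return them as RAII guards.
// Every caller walks the same global order, so lock-order deadlock is impossible.
std::vector<std::unique_lock<std::mutex>> LockAllInOrder(
    std::map<std::string, std::mutex>& locks_by_table_id) {
  std::vector<std::unique_lock<std::mutex>> held;
  for (auto& [table_id, mu] : locks_by_table_id) {  // std::map iterates sorted keys
    held.emplace_back(mu);
  }
  return held;
}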
- s = server_->catalog_manager_impl()->FillHeartbeatResponse(req, resp); + // Write all tablet mutations to the catalog table. + // + // SysCatalogTable::Write will short-circuit the case where the data has not + // in fact changed since the previous version and avoid any unnecessary mutations. + if (!mutated_tablets.empty()) { + Status s = catalog_manager_->sys_catalog()->Upsert(epoch, mutated_tablets); if (!s.ok()) { - LOG(WARNING) << "Unable to fill heartbeat response: " << s.ToString(); - rpc.RespondFailure(s); + LOG(WARNING) << "Error updating tablets: " << s; + return s; } + } - // Look up the TS -- if it just registered above, it will be found here. - // This allows the TS to register and tablet-report in the same RPC. - TSDescriptorPtr ts_desc; - s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc); - if (s.IsNotFound()) { - LOG(INFO) << "Got heartbeat from unknown tablet server { " - << req->common().ts_instance().ShortDebugString() << " } as " - << rpc.requestor_string() - << "; Asking this server to re-register. Status from ts lookup: " << s; - resp->set_needs_reregister(true); - resp->set_needs_full_tablet_report(true); - rpc.RespondSuccess(); - return; - } else if (!s.ok()) { - LOG(WARNING) << "Unable to look up tablet server for heartbeat request " - << req->DebugString() << " from " << rpc.requestor_string() - << "\nStatus: " << s.ToString(); - rpc.RespondFailure(s.CloneAndPrepend("Unable to lookup TS")); - return; + // Update the table state if all its tablets are now running. + for (auto& [table_id, tablets] : new_running_tablets) { + catalog_manager_->SchedulePostTabletCreationTasks(table_info_map[table_id], epoch, tablets); + } + + // Filter the mutated tablets to find which tablets were modified. Need to actually commit the + // state of the tablets before updating the system.partitions table, so get this first. + std::vector yql_partitions_mutated_tablets = + catalog_manager_->GetYqlPartitionsVtable().FilterRelevantTablets(mutated_tablets); + + // Publish the in-memory tablet mutations and release the locks. + for (auto& l : tablet_write_locks) { + l.second.Commit(); + } + tablet_write_locks.clear(); + + // Unlock the tables; we no longer need to access their state. + for (auto& l : table_write_locks) { + l.second.Commit(); + } + table_write_locks.clear(); + + // Update the relevant tablet entries in system.partitions. + if (!yql_partitions_mutated_tablets.empty()) { + Status s = catalog_manager_->GetYqlPartitionsVtable() + .ProcessMutatedTablets(yql_partitions_mutated_tablets, tablet_write_locks); + } + + // Third Pass. Process all tablet schema version changes. + // (This is separate from tablet state mutations because only table on-disk state is changed.) 
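// Editor's note: in the loop below only the current leader's report can advance the
// table's schema version (the GetLeader()/permanent_uuid() check), so a lagging follower
// replica cannot trigger HandleTabletSchemaVersionReport with a stale version.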
+ for (auto it = begin; it != end; ++it) { + const ReportedTabletPB& report = *it->report; + if (!report.has_schema_version()) { + continue; } + const TabletInfoPtr& tablet = it->info; + auto leader = tablet->GetLeader(); + if (leader.ok() && leader.get()->permanent_uuid() == ts_desc->permanent_uuid()) { + RETURN_NOT_OK(catalog_manager_->HandleTabletSchemaVersionReport( + tablet.get(), report.schema_version(), epoch)); + } + } + + return Status::OK(); +} - ts_desc->UpdateHeartbeat(req); - resp->set_tablet_report_limit(FLAGS_tablet_report_limit); +bool MasterHeartbeatServiceImpl::ProcessCommittedConsensusState( + TSDescriptor* ts_desc, + bool is_incremental, + const ReportedTabletPB& report, + const LeaderEpoch& epoch, + std::map* table_write_locks, + const TabletInfoPtr& tablet, + const TabletInfo::WriteLock& tablet_lock, + std::map>* tables, + std::vector* rpcs) { + const ConsensusStatePB& prev_cstate = tablet_lock->pb.committed_consensus_state(); + ConsensusStatePB cstate = report.committed_consensus_state(); + bool tablet_was_mutated = false; - // Set the TServer metrics in TS Descriptor. - if (req->has_metrics()) { - ts_desc->UpdateMetrics(req->metrics()); + // 6a. The master only processes reports for replicas with committed + // consensus configurations since it needs the committed index to only + // cache the most up-to-date config. Since it's possible for TOMBSTONED + // replicas with no ConsensusMetadata on disk to be reported as having no + // committed config opid_index, we skip over those replicas. + if (!cstate.config().has_opid_index()) { + LOG(WARNING) << "Missing opid_index in reported config: " << report.ShortDebugString(); + return false; + } + if (PREDICT_TRUE(FLAGS_master_ignore_stale_cstate) && + (cstate.current_term() < prev_cstate.current_term() || + GetCommittedConsensusStateOpIdIndex(report) < prev_cstate.config().opid_index())) { + LOG(WARNING) << "Stale heartbeat for Tablet " << tablet->ToString() + << " on TS " << ts_desc->permanent_uuid() + << " cstate=" << cstate.ShortDebugString() + << ", prev_cstate=" << prev_cstate.ShortDebugString(); + return false; + } + + // We do not expect reports from a TS for a config that it is not part of. This can happen if a + // TS is removed from the config while it is remote bootstrapping. In this case, we must ignore + // the heartbeats to avoid incorrectly adding this TS to the config in + // UpdateTabletReplicaInLocalMemory. + bool found_ts_in_config = false; + for (const auto& peer : cstate.config().peers()) { + if (peer.permanent_uuid() == ts_desc->permanent_uuid()) { + found_ts_in_config = true; + break; } + } + if (!found_ts_in_config) { + LOG(WARNING) << Format("Ignoring heartbeat from tablet server that is not part of reported " + "consensus config. ts_desc: $0, cstate: $1.", *ts_desc, cstate); + return false; + } - if (req->has_tablet_report()) { - s = server_->catalog_manager_impl()->ProcessTabletReport( - ts_desc.get(), req->tablet_report(), l.epoch(), resp->mutable_tablet_report(), &rpc); - if (!s.ok()) { - rpc.RespondFailure(s.CloneAndPrepend("Failed to process tablet report")); - return; + // 6b. Disregard the leader state if the reported leader is not a member + // of the committed config. + if (cstate.leader_uuid().empty() || + !IsRaftConfigMember(cstate.leader_uuid(), cstate.config())) { + cstate.clear_leader_uuid(); + tablet_was_mutated = true; + } + + // 6c. Mark the tablet as RUNNING if it makes sense to do so. 
+ // + // We need to wait for a leader before marking a tablet as RUNNING, or + // else we could incorrectly consider a tablet created when only a + // minority of its replicas were successful. In that case, the tablet + // would be stuck in this bad state forever. + // - FLAG added to avoid waiting during mock tests. + if (!tablet_lock->is_running() && + report.state() == tablet::RUNNING && + (cstate.has_leader_uuid() || + !FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader)) { + DCHECK_EQ(SysTabletsEntryPB::CREATING, tablet_lock->pb.state()) + << "Tablet in unexpected state: " << tablet->ToString() + << ": " << tablet_lock->pb.ShortDebugString(); + VLOG(1) << "Tablet " << tablet->ToString() << " is now online"; + tablet_lock.mutable_data()->set_state(SysTabletsEntryPB::RUNNING, + "Tablet reported with an active leader"); + tablet_was_mutated = true; + } + + // 6d. Update the consensus state if: + // - A config change operation was committed (reflected by a change to + // the committed config's opid_index). + // - The new cstate has a leader, and either the old cstate didn't, or + // there was a term change. + if (cstate.config().opid_index() > prev_cstate.config().opid_index() || + (cstate.has_leader_uuid() && + (!prev_cstate.has_leader_uuid() || + cstate.current_term() > prev_cstate.current_term()))) { + + // 6d(i). Retain knowledge of the leader even if it wasn't reported in + // the latest config. + // + // When a config change is reported to the master, it may not include the + // leader because the follower doing the reporting may not know who the + // leader is yet (it may have just started up). It is safe to reuse + // the previous leader if the reported cstate has the same term as the + // previous cstate, and the leader was known for that term. + if (cstate.current_term() == prev_cstate.current_term()) { + if (!cstate.has_leader_uuid() && prev_cstate.has_leader_uuid()) { + cstate.set_leader_uuid(prev_cstate.leader_uuid()); + // Sanity check to detect consensus divergence bugs. + } else if (cstate.has_leader_uuid() && prev_cstate.has_leader_uuid() && + cstate.leader_uuid() != prev_cstate.leader_uuid()) { + string msg = Format("Previously reported cstate for tablet $0 gave " + "a different leader for term $1 than the current cstate. " + "Previous cstate: $2. Current cstate: $3.", + tablet->ToString(), cstate.current_term(), + prev_cstate.ShortDebugString(), cstate.ShortDebugString()); + LOG(DFATAL) << msg; + return false; } } - if (!req->has_tablet_report() || req->tablet_report().is_incremental()) { - // Only process metadata if we have plenty of time to process the work (> 50% of - // timeout). - auto safe_time_left = CoarseMonoClock::Now() + (FLAGS_heartbeat_rpc_timeout_ms * 1ms / 2); - if (rpc.GetClientDeadline() > safe_time_left && - PREDICT_TRUE(!ANNOTATE_UNPROTECTED_READ(FLAGS_TEST_skip_processing_tablet_metadata))) { - std::unordered_map id_to_leader_metrics; - for (auto& info : req->leader_info()) { - id_to_leader_metrics[info.tablet_id()] = info; - } - for (const auto& metadata : req->storage_metadata()) { - std::optional leader_metrics; - auto iter = id_to_leader_metrics.find(metadata.tablet_id()); - if (iter != id_to_leader_metrics.end()) { - leader_metrics = iter->second; + // 6d(ii). Delete any replicas from the previous config that are not in the new one. 
+    if (FLAGS_master_tombstone_evicted_tablet_replicas) {
+      std::unordered_set<std::string> current_member_uuids;
+      for (const consensus::RaftPeerPB& peer : cstate.config().peers()) {
+        InsertOrDie(&current_member_uuids, peer.permanent_uuid());
+      }
+      for (const consensus::RaftPeerPB& prev_peer : prev_cstate.config().peers()) {
+        const string& peer_uuid = prev_peer.permanent_uuid();
+        if (!ContainsKey(current_member_uuids, peer_uuid)) {
+          // Don't delete a tablet server that hasn't reported in yet (Bootstrapping).
+          std::shared_ptr<TSDescriptor> dummy_ts_desc;
+          if (!master_->ts_manager()->LookupTSByUUID(peer_uuid, &dummy_ts_desc)) {
+            continue;
+          }
+          // Otherwise, the TabletServer needs to remove this peer.
+          rpcs->push_back(catalog_manager_->MakeDeleteReplicaTask(
+              peer_uuid, tablet->table(), tablet->tablet_id(), TABLET_DATA_TOMBSTONED,
+              prev_cstate.config().opid_index(), epoch,
+              Format("TS $0 not found in new config with opid_index $1",
+                     peer_uuid, cstate.config().opid_index())));
+        }
+      }
+    }
+    // 6d(iii). Update the in-memory ReplicaLocations for this tablet using the new config.
+    VLOG(2) << "Updating replicas for tablet " << tablet->tablet_id()
+            << " using config reported by " << ts_desc->permanent_uuid()
+            << " to that committed in log index " << cstate.config().opid_index()
+            << " with leader state from term " << cstate.current_term();
+    UpdateTabletReplicasAfterConfigChange(tablet, ts_desc->permanent_uuid(), cstate, report);

-      for (const auto& consumer_replication_state : req->xcluster_consumer_replication_status()) {
-        server_->catalog_manager_impl()->StoreXClusterConsumerReplicationStatus(
-            consumer_replication_state);
-      }

+    // 6d(iv). Update the consensus state. Don't use 'prev_cstate' after this.
+    LOG(INFO) << "Tablet: " << tablet->tablet_id() << " reported consensus state change."
+              << " New consensus state: " << cstate.ShortDebugString()
+              << " from " << ts_desc->permanent_uuid();
+    *tablet_lock.mutable_data()->pb.mutable_committed_consensus_state() = cstate;
+    tablet_was_mutated = true;
+  } else {
+    // Report opid_index is equal to the previous opid_index. If some
+    // replica is reporting the same consensus configuration we already know about, but we
+    // haven't yet heard from all the tservers in the config, update the in-memory
+    // ReplicaLocations.
+    LOG(INFO) << "Tablet server " << ts_desc->permanent_uuid() << " sent "
+              << (is_incremental ? "incremental" : "full tablet")
+              << " report for " << tablet->tablet_id()
+              << ", prev state op id: " << prev_cstate.config().opid_index()
+              << ", prev state term: " << prev_cstate.current_term()
+              << ", prev state has_leader_uuid: " << prev_cstate.has_leader_uuid()
+              << ". Consensus state: " << cstate.ShortDebugString();
+    if (GetAtomicFlag(&FLAGS_enable_register_ts_from_raft) &&
+        ReplicaMapDiffersFromConsensusState(tablet, cstate)) {
+      LOG(INFO) << Format("Tablet replica map differs from reported consensus state. Replica map: "
+                          "$0. Reported consensus state: $1.", *tablet->GetReplicaLocations(),
+                          cstate.ShortDebugString());
+      UpdateTabletReplicasAfterConfigChange(tablet, ts_desc->permanent_uuid(), cstate, report);
+    } else {
+      UpdateTabletReplicaInLocalMemory(ts_desc, &cstate, report, tablet);
+    }
+  }

-      // Only process the full compaction statuses if we have plenty of time to process the work (>
-      // 50% of timeout).
-      safe_time_left = CoarseMonoClock::Now() + (FLAGS_heartbeat_rpc_timeout_ms * 1ms / 2);
-      if (rpc.GetClientDeadline() > safe_time_left) {
-        for (const auto& full_compaction_status : req->full_compaction_statuses()) {
-          server_->catalog_manager_impl()->ProcessTabletReplicaFullCompactionStatus(
-              ts_desc->permanent_uuid(), full_compaction_status);
+  if (FLAGS_use_create_table_leader_hint && !cstate.has_leader_uuid()) {
+    catalog_manager_->StartElectionIfReady(cstate, epoch, tablet.get());
+  }
+
+  // 7. Send an AlterSchema RPC if the tablet has an old schema version.
+  if (table_write_locks->count(tablet->table()->id())) {
+    const TableInfo::WriteLock& table_lock = (*table_write_locks)[tablet->table()->id()];
+    if (report.has_schema_version() &&
+        report.schema_version() != table_lock->pb.version()) {
+      if (report.schema_version() > table_lock->pb.version()) {
+        LOG(ERROR) << "TS " << ts_desc->permanent_uuid()
+                   << " has reported a schema version greater than the current one"
+                   << " for tablet " << tablet->ToString()
+                   << ". Expected version " << table_lock->pb.version()
+                   << " got " << report.schema_version()
+                   << " (corruption)";
+      } else {
+        // TODO: For Alter (rolling apply to tablets), this is an expected transitory state.
+        LOG(INFO) << "TS " << ts_desc->permanent_uuid()
+                  << " does not have the latest schema for tablet " << tablet->ToString()
+                  << ". Expected version " << table_lock->pb.version()
+                  << " got " << report.schema_version();
+      }
+      // All metadata-related changes for the tablet are passed as part of RESTORE_ON_TABLET rpcs
+      // and we should not trigger anything else during restore so as to not race schema versions.
+      // TODO(Sanket): What if restore is stuck? Then this block is muted forever.
+      auto restore_result = catalog_manager_->IsTableUndergoingPitrRestore(*tablet->table());
+      LOG_IF(DFATAL, !restore_result.ok())
+          << "Failed to determine if table has PITR restore in progress";
+      if (!restore_result.ok() || !*restore_result) {
+        // It's possible that the tablet being reported is a laggy replica, and in fact
+        // the leader has already received an AlterTable RPC. That's OK, though --
+        // it'll safely ignore it if we send another.
+        TransactionId txn_id = TransactionId::Nil();
+        if (table_lock->pb.has_transaction() &&
+            table_lock->pb.transaction().has_transaction_id()) {
+          LOG(INFO) << "Parsing transaction ID for tablet ID " << tablet->tablet_id();
+          auto txn_id_res = FullyDecodeTransactionId(
+              table_lock->pb.transaction().transaction_id());
+          if (!txn_id_res.ok()) {
+            LOG(WARNING) << "Parsing transaction ID failed for tablet ID " << tablet->tablet_id();
+            return false;
+          }
+          txn_id = txn_id_res.get();
+        }
+        LOG(INFO) << "Triggering AlterTable with transaction ID " << txn_id
+                  << " due to heartbeat delay for tablet ID " << tablet->tablet_id();
+        rpcs->push_back(std::make_shared<AsyncAlterTable>(
+            master_, catalog_manager_->AsyncTaskPool(), tablet, tablet->table(), txn_id, epoch));
+      }
+    }
+  }

-      // Only set once. It may take multiple heartbeats to receive a full tablet report.
-      if (!ts_desc->has_tablet_report()) {
-        resp->set_needs_full_tablet_report(true);
+  // Send an AlterSchema RPC for colocated tables of this tablet if they are outdated.
+  for (const auto& id_to_version : report.table_to_version()) {
+    // Skip the primary table.
+    if (tablet->table()->id() == id_to_version.first) {
+      continue;
+    }
+    if (tables->count(id_to_version.first)) {
+      const auto& table_lock = (*table_write_locks)[id_to_version.first];
+      // Ignore if same version.
+      if (table_lock->pb.version() == id_to_version.second) {
+        continue;
+      }
+      if (id_to_version.second > table_lock->pb.version()) {
+        LOG(ERROR) << "TS " << ts_desc->permanent_uuid()
+                   << " has reported a schema version greater than the current one"
+                   << " for table " << id_to_version.first
+                   << ". Expected version " << table_lock->pb.version()
+                   << " got " << id_to_version.second
+                   << " (corruption)";
+      } else {
+        LOG(INFO) << "TS " << ts_desc->permanent_uuid()
+                  << " does not have the latest schema for table " << id_to_version.first
+                  << ". Expected version " << table_lock->pb.version()
+                  << " got " << id_to_version.second;
+      }
+      // All metadata-related changes for the tablet are passed as part of RESTORE_ON_TABLET rpcs
+      // and we should not trigger anything else during restore so as to not race schema versions.
+      // TODO(Sanket): What if restore is stuck? Then this block is muted forever.
+      auto restore_result =
+          catalog_manager_->IsTableUndergoingPitrRestore(*(*tables)[id_to_version.first]);
+      LOG_IF(DFATAL, !restore_result.ok())
+          << "Failed to determine if table has PITR restore in progress";
+      if (!restore_result.ok() || !*restore_result) {
+        LOG(INFO) << "Triggering AlterTable for table id " << id_to_version.first
+                  << " due to heartbeat delay for tablet ID " << tablet->tablet_id();
+        rpcs->push_back(std::make_shared<AsyncAlterTable>(
+            master_, catalog_manager_->AsyncTaskPool(), tablet, (*tables)[id_to_version.first],
+            TransactionId::Nil(), epoch));
+      }
+    }
+  }

-      // Retrieve all the nodes known by the master.
-      std::vector<std::shared_ptr<TSDescriptor>> descs;
-      server_->ts_manager()->GetAllLiveDescriptors(&descs);
-      for (const auto& desc : descs) {
-        *resp->add_tservers() = *desc->GetTSInformationPB();
-      }
-
-      // Retrieve the ysql catalog schema version.
-      if (FLAGS_ysql_enable_db_catalog_version_mode) {
-        DbOidToCatalogVersionMap versions;
-        uint64_t fingerprint;  // can only be used when versions is not empty.
-        s = server_->catalog_manager_impl()->GetYsqlAllDBCatalogVersions(
-            FLAGS_enable_heartbeat_pg_catalog_versions_cache /* use_cache */,
-            &versions, &fingerprint);
-        if (s.ok() && !versions.empty()) {
-          // Return versions back via heartbeat response if the tserver does not provide
-          // a fingerprint or the tserver's fingerprint does not match the master's
-          // fingerprint. The tserver does not provide a fingerprint when it has
-          // not received any catalog versions yet after it starts.
-        if (!req->has_ysql_db_catalog_versions_fingerprint() ||
-            req->ysql_db_catalog_versions_fingerprint() != fingerprint) {
-          auto* const mutable_version_data = resp->mutable_db_catalog_version_data();
-          for (const auto& it : versions) {
-            auto* const catalog_version = mutable_version_data->add_db_catalog_versions();
-            catalog_version->set_db_oid(it.first);
-            catalog_version->set_current_version(it.second.current_version);
-            catalog_version->set_last_breaking_version(it.second.last_breaking_version);
-          }
-          if (FLAGS_log_ysql_catalog_versions) {
-            VLOG_WITH_FUNC(2) << "responding (to ts "
-                              << req->common().ts_instance().permanent_uuid()
-                              << ") db catalog versions: "
-                              << resp->db_catalog_version_data().ShortDebugString();
-          }
-        } else if (FLAGS_log_ysql_catalog_versions) {
-          VLOG_WITH_FUNC(2) << "responding (to ts "
-                            << req->common().ts_instance().permanent_uuid()
-                            << ") without db catalog versions: fingerprints matched";
-        }
-      } else {
-        LOG(WARNING) << "Could not get YSQL db catalog versions for heartbeat response: "
-                     << s.ToUserMessage();
+  return tablet_was_mutated;
+}
+
+void MasterHeartbeatServiceImpl::UpdateTabletReplicasAfterConfigChange(
+    const scoped_refptr<TabletInfo>& tablet,
+    const std::string& sender_uuid,
+    const ConsensusStatePB& consensus_state,
+    const ReportedTabletPB& report) {
+  auto replica_locations = std::make_shared<TabletReplicaMap>();
+  auto prev_rl = tablet->GetReplicaLocations();
+
+  for (const consensus::RaftPeerPB& peer : consensus_state.config().peers()) {
+    std::shared_ptr<TSDescriptor> ts_desc;
+    if (!peer.has_permanent_uuid()) {
+      LOG(WARNING) << "Missing UUID for peer: " << peer.ShortDebugString();
+      continue;
+    }
+    if (!master_->ts_manager()->LookupTSByUUID(peer.permanent_uuid(), &ts_desc)) {
+      if (!GetAtomicFlag(&FLAGS_enable_register_ts_from_raft)) {
+        LOG(WARNING) << "Tablet server has never reported in. "
+                     << "Not including in replica locations map yet. Peer: "
+                     << peer.ShortDebugString()
+                     << "; Tablet: " << tablet->ToString();
+        continue;
+      }
-      } else {
-        uint64_t last_breaking_version = 0;
-        uint64_t catalog_version = 0;
-        s = server_->catalog_manager_impl()->GetYsqlCatalogVersion(
-            &catalog_version, &last_breaking_version);
-        if (s.ok()) {
-          resp->set_ysql_catalog_version(catalog_version);
-          resp->set_ysql_last_breaking_catalog_version(last_breaking_version);
-          if (FLAGS_log_ysql_catalog_versions) {
-            VLOG_WITH_FUNC(1) << "responding (to ts " << req->common().ts_instance().permanent_uuid()
-                              << ") catalog version: " << catalog_version
-                              << ", breaking version: " << last_breaking_version;
-          }
-        } else {
-          LOG(WARNING) << "Could not get YSQL catalog version for heartbeat response: "
-                       << s.ToUserMessage();
+
+      LOG(INFO) << "Tablet server has never reported in. Registering the ts using "
+                << "the raft config. Peer: " << peer.ShortDebugString()
+                << "; Tablet: " << tablet->ToString();
+      Status s = catalog_manager_->RegisterTsFromRaftConfig(peer);
+      if (!s.ok()) {
+        LOG(WARNING) << "Could not register ts from raft config: " << s
+                     << " Skip updating the replica map.";
+        continue;
+      }
+
+      // Guaranteed to find the ts since we just registered.
+      master_->ts_manager()->LookupTSByUUID(peer.permanent_uuid(), &ts_desc);
+      if (!ts_desc.get()) {
+        LOG(WARNING) << "Could not find ts with uuid " << peer.permanent_uuid()
+                     << " after registering from raft config.
Skip updating the replica" + << " map."; + continue; } } - uint64_t transaction_tables_version = server_->catalog_manager()->GetTransactionTablesVersion(); - resp->set_transaction_tables_version(transaction_tables_version); + // Do not update replicas in the NOT_STARTED or BOOTSTRAPPING state (unless they are stale). + bool use_existing = false; + const TabletReplica* existing_replica = nullptr; + auto it = prev_rl->find(ts_desc->permanent_uuid()); + if (it != prev_rl->end()) { + existing_replica = &it->second; + } + if (existing_replica && peer.permanent_uuid() != sender_uuid) { + // IsStarting returns true if state == NOT_STARTED or state == BOOTSTRAPPING. + use_existing = existing_replica->IsStarting() && !existing_replica->IsStale(); + } + if (use_existing) { + InsertOrDie(replica_locations.get(), existing_replica->ts_desc->permanent_uuid(), + *existing_replica); + } else { + // The RaftGroupStatePB in the report is only applicable to the replica that is owned by the + // sender. Initialize the other replicas with an unknown state. + const RaftGroupStatePB replica_state = + (sender_uuid == ts_desc->permanent_uuid()) ? report.state() : RaftGroupStatePB::UNKNOWN; - if (req->has_auto_flags_config_version() && - req->auto_flags_config_version() < server_->GetAutoFlagConfigVersion()) { - *resp->mutable_auto_flags_config() = server_->GetAutoFlagsConfig(); + TabletReplica replica; + CreateNewReplicaForLocalMemory( + ts_desc.get(), &consensus_state, report, replica_state, &replica); + auto result = replica_locations.get()->insert({replica.ts_desc->permanent_uuid(), replica}); + LOG_IF(FATAL, !result.second) << "duplicate uuid: " << replica.ts_desc->permanent_uuid(); + if (existing_replica) { + result.first->second.UpdateDriveInfo(existing_replica->drive_info); + result.first->second.UpdateLeaderLeaseInfo(existing_replica->leader_lease_info); + } } + } - rpc.RespondSuccess(); + // Update the local tablet replica set. This deviates from persistent state during bootstrapping. + catalog_manager_->SetTabletReplicaLocations(tablet, replica_locations); +} + +void MasterHeartbeatServiceImpl::UpdateTabletReplicaInLocalMemory( + TSDescriptor* ts_desc, + const ConsensusStatePB* consensus_state, + const ReportedTabletPB& report, + const scoped_refptr& tablet) { + TabletReplica replica; + CreateNewReplicaForLocalMemory(ts_desc, consensus_state, report, report.state(), &replica); + catalog_manager_->UpdateTabletReplicaLocations(tablet, replica); +} + +void MasterHeartbeatServiceImpl::CreateNewReplicaForLocalMemory( + TSDescriptor* ts_desc, + const ConsensusStatePB* consensus_state, + const ReportedTabletPB& report, + const RaftGroupStatePB& state, + TabletReplica* new_replica) { + // Tablets in state NOT_STARTED or BOOTSTRAPPING don't have a consensus. 
+  if (consensus_state == nullptr) {
+    new_replica->role = PeerRole::NON_PARTICIPANT;
+    new_replica->member_type = PeerMemberType::UNKNOWN_MEMBER_TYPE;
+  } else {
+    new_replica->role = GetConsensusRole(ts_desc->permanent_uuid(), *consensus_state);
+    new_replica->member_type = GetConsensusMemberType(ts_desc->permanent_uuid(), *consensus_state);
+  }
+  if (report.has_should_disable_lb_move()) {
+    new_replica->should_disable_lb_move = report.should_disable_lb_move();
+  }
+  if (report.has_fs_data_dir()) {
+    new_replica->fs_data_dir = report.fs_data_dir();
+  }
+  new_replica->state = state;
+  new_replica->ts_desc = ts_desc;
+  if (!ts_desc->registered_through_heartbeat()) {
+    auto last_heartbeat = ts_desc->LastHeartbeatTime();
+    new_replica->time_updated = last_heartbeat ? last_heartbeat : MonoTime::kMin;
+  }
+}

-};

+bool MasterHeartbeatServiceImpl::ReplicaMapDiffersFromConsensusState(
+    const scoped_refptr<TabletInfo>& tablet, const ConsensusStatePB& cstate) {
+  auto locs = tablet->GetReplicaLocations();
+  if (locs->size() != implicit_cast<size_t>(cstate.config().peers_size())) {
+    return true;
+  }
+  for (const auto& peer : cstate.config().peers()) {
+    if (locs->find(peer.permanent_uuid()) == locs->end()) {
+      return true;
+    }
+  }
+  return false;
+}

} // namespace
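Editor's sketch: ReplicaMapDiffersFromConsensusState reduces to a set comparison on peer
UUIDs; a self-contained rendering of that check (names hypothetical):

#include <set>
#include <string>
#include <vector>

// True when the replica map and the Raft config do not contain exactly the
// same peer UUIDs (peers in a config are unique, so size + membership suffice).
bool DiffersByUuidSet(const std::set<std::string>& replica_uuids,
                      const std::vector<std::string>& config_peer_uuids) {
  if (replica_uuids.size() != config_peer_uuids.size()) {
    return true;
  }
  for (const auto& uuid : config_peer_uuids) {
    if (replica_uuids.count(uuid) == 0) {
      return true;  // a config peer is missing from the replica map
    }
  }
  return false;  // same size and all peers present => the two sets agree
}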