diff --git a/ent/src/yb/cdc/cdc_service.cc b/ent/src/yb/cdc/cdc_service.cc index 23bdf9a134e6..1e2e61ed8b79 100644 --- a/ent/src/yb/cdc/cdc_service.cc +++ b/ent/src/yb/cdc/cdc_service.cc @@ -656,7 +656,7 @@ bool YsqlTableHasPrimaryKey(const client::YBSchema& schema) { return true; } -bool IsTabletPeerLeader(const std::shared_ptr& peer) { +bool IsTabletPeerLeader(const tablet::TabletPeerPtr& peer) { return peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY; } @@ -1044,12 +1044,11 @@ Result CDCServiceImpl::SetCDCCheckpoint( LOG(WARNING) << "Setting the checkpoint explicitly even though the checkpoint type is implicit"; } - auto tablet_peer_result = context_->GetServingTablet(req.tablet_id()); - auto status = ResultToStatus(tablet_peer_result); + auto tablet_peer = context_->LookupTablet(req.tablet_id()); // Case-1 The connected tserver does not contain the requested tablet_id. // Case-2 The connected tserver does not contain the tablet LEADER. - if (status.IsNotFound() || !IsTabletPeerLeader(*tablet_peer_result)) { + if (!tablet_peer || !IsTabletPeerLeader(tablet_peer)) { // Get tablet LEADER. auto result = GetLeaderTServer(req.tablet_id()); RETURN_NOT_OK_SET_CODE(result, CDCError(CDCErrorPB::NOT_LEADER)); @@ -1062,11 +1061,10 @@ Result CDCServiceImpl::SetCDCCheckpoint( RETURN_NOT_OK_SET_CODE( cdc_proxy->SetCDCCheckpoint(req, &resp, &rpc), CDCError(CDCErrorPB::INTERNAL_ERROR)); return SetCDCCheckpointResponsePB(); - } else if (!status.ok()) { - RETURN_NOT_OK_SET_CODE(status, CDCError(CDCErrorPB::LEADER_NOT_READY)); } - auto tablet_peer = std::move(*tablet_peer_result); + RETURN_NOT_OK_SET_CODE(CheckCanServeTabletData(*tablet_peer->tablet_metadata()), + CDCError(CDCErrorPB::LEADER_NOT_READY)); ProducerTabletInfo producer_tablet{"" /* UUID */, req.stream_id(), req.tablet_id()}; OpId checkpoint; @@ -1264,30 +1262,36 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req, CheckTabletValidForStream(producer_tablet), resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context); - auto tablet_peer_result = context_->GetServingTablet(req->tablet_id()); - auto status = ResultToStatus(tablet_peer_result); - auto tablet_peer = ResultToValue(std::move(tablet_peer_result), {}); + auto tablet_peer = context_->LookupTablet(req->tablet_id()); auto original_leader_term = tablet_peer ? tablet_peer->LeaderTerm() : OpId::kUnknownTerm; // If we can't serve this tablet... - if (status.IsNotFound() || !IsTabletPeerLeader(tablet_peer)) { + if (!tablet_peer || !IsTabletPeerLeader(tablet_peer)) { if (req->serve_as_proxy()) { // Forward GetChanges() to tablet leader. This commonly happens in Kubernetes setups. auto context_ptr = std::make_shared(std::move(context)); TabletLeaderGetChanges(req, resp, context_ptr, tablet_peer); // Otherwise, figure out the proper return code. - } else if (status.IsNotFound()) { - SetupErrorAndRespond(resp->mutable_error(), status, CDCErrorPB::TABLET_NOT_FOUND, &context); - } else if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::NOT_LEADER) { + } else if (!tablet_peer) { + SetupErrorAndRespond( + resp->mutable_error(), + STATUS_FORMAT(NotFound, "Tablet $0 not found", req->tablet_id()), + CDCErrorPB::TABLET_NOT_FOUND, + &context); + } else if (tablet_peer && tablet_peer->LeaderStatus() == consensus::LeaderStatus::NOT_LEADER) { // TODO: we may be able to get some changes, even if we're not the leader. - SetupErrorAndRespond(resp->mutable_error(), - STATUS(NotFound, Format("Not leader for $0", req->tablet_id())), - CDCErrorPB::TABLET_NOT_FOUND, &context); + SetupErrorAndRespond( + resp->mutable_error(), + STATUS_FORMAT(NotFound, "Not leader for $0", req->tablet_id()), + CDCErrorPB::TABLET_NOT_FOUND, + &context); } else { - SetupErrorAndRespond(resp->mutable_error(), + SetupErrorAndRespond( + resp->mutable_error(), STATUS(LeaderNotReadyToServe, "Not ready to serve"), - CDCErrorPB::LEADER_NOT_READY, &context); + CDCErrorPB::LEADER_NOT_READY, + &context); } return; } @@ -1364,6 +1368,7 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req, } // Read the latest changes from the Log. + Status status; if (record.source_type == XCLUSTER) { status = GetChangesForXCluster( stream_id, req->tablet_id(), op_id, record, tablet_peer, session, @@ -1417,15 +1422,13 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req, resp->mutable_error(), status.IsNotFound() ? CDCErrorPB::CHECKPOINT_TOO_OLD : CDCErrorPB::UNKNOWN_ERROR, context); - tablet_peer_result = context_->GetServingTablet(req->tablet_id()); - status = ResultToStatus(tablet_peer_result); - tablet_peer = ResultToValue(std::move(tablet_peer_result), {}); + tablet_peer = context_->LookupTablet(req->tablet_id()); // Verify leadership was maintained for the duration of the GetChanges() read. - if (status.IsNotFound() || !IsTabletPeerLeader(tablet_peer) || + if (!tablet_peer || !IsTabletPeerLeader(tablet_peer) || tablet_peer->LeaderTerm() != original_leader_term) { SetupErrorAndRespond(resp->mutable_error(), - STATUS(NotFound, Format("Not leader for $0", req->tablet_id())), + STATUS_FORMAT(NotFound, "Not leader for $0", req->tablet_id()), CDCErrorPB::TABLET_NOT_FOUND, &context); return; } diff --git a/src/yb/tablet/tablet_metadata.cc b/src/yb/tablet/tablet_metadata.cc index 8ce8a5952636..d879244dff8f 100644 --- a/src/yb/tablet/tablet_metadata.cc +++ b/src/yb/tablet/tablet_metadata.cc @@ -1484,5 +1484,15 @@ std::vector RaftGroupMetadata::GetAllColocatedTables() { return table_ids; } +Status CheckCanServeTabletData(const RaftGroupMetadata& metadata) { + auto data_state = metadata.tablet_data_state(); + if (!CanServeTabletData(data_state)) { + return STATUS_FORMAT( + IllegalState, "Tablet $0 data state not ready: $1", metadata.raft_group_id(), + TabletDataState_Name(data_state)); + } + return Status::OK(); +} + } // namespace tablet } // namespace yb diff --git a/src/yb/tablet/tablet_metadata.h b/src/yb/tablet/tablet_metadata.h index 480879ffa3c2..ed82accc1308 100644 --- a/src/yb/tablet/tablet_metadata.h +++ b/src/yb/tablet/tablet_metadata.h @@ -621,6 +621,8 @@ inline bool CanServeTabletData(TabletDataState state) { state == TabletDataState::TABLET_DATA_SPLIT_COMPLETED; } +Status CheckCanServeTabletData(const RaftGroupMetadata& metadata); + } // namespace tablet } // namespace yb diff --git a/src/yb/tserver/ts_tablet_manager.cc b/src/yb/tserver/ts_tablet_manager.cc index 35a85ca48615..a974dc40a27e 100644 --- a/src/yb/tserver/ts_tablet_manager.cc +++ b/src/yb/tserver/ts_tablet_manager.cc @@ -1797,12 +1797,7 @@ TabletPeerPtr TSTabletManager::LookupTabletUnlocked(const Key& tablet_id) const template Result TSTabletManager::DoGetServingTablet(const Key& tablet_id) const { auto tablet = VERIFY_RESULT(GetTablet(tablet_id)); - TabletDataState data_state = tablet->tablet_metadata()->tablet_data_state(); - if (!CanServeTabletData(data_state)) { - return STATUS_FORMAT( - IllegalState, "Tablet $0 data state not ready: $1", tablet_id, - TabletDataState_Name(data_state)); - } + RETURN_NOT_OK(CheckCanServeTabletData(*tablet->tablet_metadata())); return tablet; }