Skip to content

Commit

Permalink
[#13929] DocDB: Fix CDC Service GetChanges when tablet was deleted
Browse files Browse the repository at this point in the history
Summary:
CDC Service has pretty weird logic for tablet peer state.
It ignores all failures except IsNotFound.
Since before D19068/d4df77709fc39c3cf2fca4d907c6a094898ff2df tablet peer was set, it was not crash.

But this logic is naturally wrong, we cannot rely that tablet peer would be non null in case of other failures.
So after D19068 it started to crash.

Fixed to check for general failure.

Test Plan: ybd debug --cxx-test twodc-test --gtest_filter TwoDCTestParams/TwoDCTest.DeleteTableChecksCQL/2 -n 28

Reviewers: slingam

Reviewed By: slingam

Subscribers: rahuldesirazu, nicolas, bogdan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D19622
  • Loading branch information
spolitov committed Sep 19, 2022
1 parent baa31bf commit 0ffebf7
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 30 deletions.
51 changes: 27 additions & 24 deletions ent/src/yb/cdc/cdc_service.cc
Expand Up @@ -656,7 +656,7 @@ bool YsqlTableHasPrimaryKey(const client::YBSchema& schema) {
return true;
}

bool IsTabletPeerLeader(const std::shared_ptr<tablet::TabletPeer>& peer) {
bool IsTabletPeerLeader(const tablet::TabletPeerPtr& peer) {
return peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY;
}

Expand Down Expand Up @@ -1044,12 +1044,11 @@ Result<SetCDCCheckpointResponsePB> CDCServiceImpl::SetCDCCheckpoint(
LOG(WARNING) << "Setting the checkpoint explicitly even though the checkpoint type is implicit";
}

auto tablet_peer_result = context_->GetServingTablet(req.tablet_id());
auto status = ResultToStatus(tablet_peer_result);
auto tablet_peer = context_->LookupTablet(req.tablet_id());

// Case-1 The connected tserver does not contain the requested tablet_id.
// Case-2 The connected tserver does not contain the tablet LEADER.
if (status.IsNotFound() || !IsTabletPeerLeader(*tablet_peer_result)) {
if (!tablet_peer || !IsTabletPeerLeader(tablet_peer)) {
// Get tablet LEADER.
auto result = GetLeaderTServer(req.tablet_id());
RETURN_NOT_OK_SET_CODE(result, CDCError(CDCErrorPB::NOT_LEADER));
Expand All @@ -1062,11 +1061,10 @@ Result<SetCDCCheckpointResponsePB> CDCServiceImpl::SetCDCCheckpoint(
RETURN_NOT_OK_SET_CODE(
cdc_proxy->SetCDCCheckpoint(req, &resp, &rpc), CDCError(CDCErrorPB::INTERNAL_ERROR));
return SetCDCCheckpointResponsePB();
} else if (!status.ok()) {
RETURN_NOT_OK_SET_CODE(status, CDCError(CDCErrorPB::LEADER_NOT_READY));
}

auto tablet_peer = std::move(*tablet_peer_result);
RETURN_NOT_OK_SET_CODE(CheckCanServeTabletData(*tablet_peer->tablet_metadata()),
CDCError(CDCErrorPB::LEADER_NOT_READY));

ProducerTabletInfo producer_tablet{"" /* UUID */, req.stream_id(), req.tablet_id()};
OpId checkpoint;
Expand Down Expand Up @@ -1264,30 +1262,36 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
CheckTabletValidForStream(producer_tablet),
resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);

auto tablet_peer_result = context_->GetServingTablet(req->tablet_id());
auto status = ResultToStatus(tablet_peer_result);
auto tablet_peer = ResultToValue(std::move(tablet_peer_result), {});
auto tablet_peer = context_->LookupTablet(req->tablet_id());

auto original_leader_term = tablet_peer ? tablet_peer->LeaderTerm() : OpId::kUnknownTerm;

// If we can't serve this tablet...
if (status.IsNotFound() || !IsTabletPeerLeader(tablet_peer)) {
if (!tablet_peer || !IsTabletPeerLeader(tablet_peer)) {
if (req->serve_as_proxy()) {
// Forward GetChanges() to tablet leader. This commonly happens in Kubernetes setups.
auto context_ptr = std::make_shared<RpcContext>(std::move(context));
TabletLeaderGetChanges(req, resp, context_ptr, tablet_peer);
// Otherwise, figure out the proper return code.
} else if (status.IsNotFound()) {
SetupErrorAndRespond(resp->mutable_error(), status, CDCErrorPB::TABLET_NOT_FOUND, &context);
} else if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::NOT_LEADER) {
} else if (!tablet_peer) {
SetupErrorAndRespond(
resp->mutable_error(),
STATUS_FORMAT(NotFound, "Tablet $0 not found", req->tablet_id()),
CDCErrorPB::TABLET_NOT_FOUND,
&context);
} else if (tablet_peer && tablet_peer->LeaderStatus() == consensus::LeaderStatus::NOT_LEADER) {
// TODO: we may be able to get some changes, even if we're not the leader.
SetupErrorAndRespond(resp->mutable_error(),
STATUS(NotFound, Format("Not leader for $0", req->tablet_id())),
CDCErrorPB::TABLET_NOT_FOUND, &context);
SetupErrorAndRespond(
resp->mutable_error(),
STATUS_FORMAT(NotFound, "Not leader for $0", req->tablet_id()),
CDCErrorPB::TABLET_NOT_FOUND,
&context);
} else {
SetupErrorAndRespond(resp->mutable_error(),
SetupErrorAndRespond(
resp->mutable_error(),
STATUS(LeaderNotReadyToServe, "Not ready to serve"),
CDCErrorPB::LEADER_NOT_READY, &context);
CDCErrorPB::LEADER_NOT_READY,
&context);
}
return;
}
Expand Down Expand Up @@ -1364,6 +1368,7 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
}

// Read the latest changes from the Log.
Status status;
if (record.source_type == XCLUSTER) {
status = GetChangesForXCluster(
stream_id, req->tablet_id(), op_id, record, tablet_peer, session,
Expand Down Expand Up @@ -1417,15 +1422,13 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
resp->mutable_error(),
status.IsNotFound() ? CDCErrorPB::CHECKPOINT_TOO_OLD : CDCErrorPB::UNKNOWN_ERROR,
context);
tablet_peer_result = context_->GetServingTablet(req->tablet_id());
status = ResultToStatus(tablet_peer_result);
tablet_peer = ResultToValue(std::move(tablet_peer_result), {});
tablet_peer = context_->LookupTablet(req->tablet_id());

// Verify leadership was maintained for the duration of the GetChanges() read.
if (status.IsNotFound() || !IsTabletPeerLeader(tablet_peer) ||
if (!tablet_peer || !IsTabletPeerLeader(tablet_peer) ||
tablet_peer->LeaderTerm() != original_leader_term) {
SetupErrorAndRespond(resp->mutable_error(),
STATUS(NotFound, Format("Not leader for $0", req->tablet_id())),
STATUS_FORMAT(NotFound, "Not leader for $0", req->tablet_id()),
CDCErrorPB::TABLET_NOT_FOUND, &context);
return;
}
Expand Down
10 changes: 10 additions & 0 deletions src/yb/tablet/tablet_metadata.cc
Expand Up @@ -1484,5 +1484,15 @@ std::vector<TableId> RaftGroupMetadata::GetAllColocatedTables() {
return table_ids;
}

Status CheckCanServeTabletData(const RaftGroupMetadata& metadata) {
auto data_state = metadata.tablet_data_state();
if (!CanServeTabletData(data_state)) {
return STATUS_FORMAT(
IllegalState, "Tablet $0 data state not ready: $1", metadata.raft_group_id(),
TabletDataState_Name(data_state));
}
return Status::OK();
}

} // namespace tablet
} // namespace yb
2 changes: 2 additions & 0 deletions src/yb/tablet/tablet_metadata.h
Expand Up @@ -621,6 +621,8 @@ inline bool CanServeTabletData(TabletDataState state) {
state == TabletDataState::TABLET_DATA_SPLIT_COMPLETED;
}

Status CheckCanServeTabletData(const RaftGroupMetadata& metadata);

} // namespace tablet
} // namespace yb

Expand Down
7 changes: 1 addition & 6 deletions src/yb/tserver/ts_tablet_manager.cc
Expand Up @@ -1797,12 +1797,7 @@ TabletPeerPtr TSTabletManager::LookupTabletUnlocked(const Key& tablet_id) const
template <class Key>
Result<TabletPeerPtr> TSTabletManager::DoGetServingTablet(const Key& tablet_id) const {
auto tablet = VERIFY_RESULT(GetTablet(tablet_id));
TabletDataState data_state = tablet->tablet_metadata()->tablet_data_state();
if (!CanServeTabletData(data_state)) {
return STATUS_FORMAT(
IllegalState, "Tablet $0 data state not ready: $1", tablet_id,
TabletDataState_Name(data_state));
}
RETURN_NOT_OK(CheckCanServeTabletData(*tablet->tablet_metadata()));
return tablet;
}

Expand Down

0 comments on commit 0ffebf7

Please sign in to comment.