Skip to content

Commit

Permalink
[#13653] CDCSDK: Deleting stream IDs lead to stale entries in the cdc…
Browse files Browse the repository at this point in the history
…_state table causing tserver crash

Summary:
During the analysis we found that in case a stream_id is deleted, the metadata related to it is not getting cleared from the cdc_state table - and now even if a new stream is created, the previous (deleted) stream is causing interference to the functioning which is ultimately leading to a tserver crash.

To fix this we will ignore those deleted stream metadata entries as part //setCDCCheckpoint//, and will remove those entries when //UpdatePeersAndMetrics// thread is enabled again.

Test Plan: Running all the c and java testcases

Reviewers: srangavajjula, vkushwaha, abharadwaj, aagrawal, skumar

Reviewed By: aagrawal, skumar

Subscribers: ycdcxcluster

Differential Revision: https://phabricator.dev.yugabyte.com/D18986
  • Loading branch information
sureshdash2022-yb committed Aug 20, 2022
1 parent 0a9521a commit 21e562a
Show file tree
Hide file tree
Showing 8 changed files with 532 additions and 3 deletions.
47 changes: 44 additions & 3 deletions ent/src/yb/cdc/cdc_service.cc
Expand Up @@ -550,14 +550,31 @@ class CDCServiceImpl::Impl {
CoarseMonoClock::Now() >
it->cdc_state_checkpoint.last_active_time +
MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_cdc_intent_retention_ms))) {
LOG(ERROR) << "Stream ID: " << producer_tablet.stream_id
<< " expired for Tablet ID: " << producer_tablet.tablet_id
<< " with active time :"
<< it->cdc_state_checkpoint.last_active_time.time_since_epoch();
return STATUS_FORMAT(
InternalError, "stream ID $0 is expired for Tablet ID $1", producer_tablet.stream_id,
producer_tablet.tablet_id);
}
VLOG(1) << "Tablet :" << producer_tablet.ToString()
<< " found in CDCSerive Cache with active time: "
<< ": " << it->cdc_state_checkpoint.last_active_time.time_since_epoch();
}
return Status::OK();
}

Result<TabletCheckpoint> TEST_GetTabletInfoFromCache(const ProducerTabletInfo& producer_tablet) {
SharedLock<rw_spinlock> l(mutex_);
auto it = tablet_checkpoints_.find(producer_tablet);
if (it != tablet_checkpoints_.end()) {
return it->cdc_state_checkpoint;
}
return STATUS_FORMAT(
InternalError, "Tablet info: $0 not found in cache.", producer_tablet.ToString());
}

void UpdateActiveTime(const ProducerTabletInfo& producer_tablet) {
SharedLock<rw_spinlock> l(mutex_);
auto it = tablet_checkpoints_.find(producer_tablet);
Expand Down Expand Up @@ -1165,6 +1182,11 @@ Result<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> CDCService
return all_tablets;
}

Result<TabletCheckpoint> CDCServiceImpl::TEST_GetTabletInfoFromCache(
const ProducerTabletInfo& producer_tablet) {
return impl_->TEST_GetTabletInfoFromCache(producer_tablet);
}

void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
GetChangesResponsePB* resp,
RpcContext context) {
Expand Down Expand Up @@ -1385,6 +1407,7 @@ Status CDCServiceImpl::UpdatePeersCdcMinReplicatedIndex(
std::vector<client::internal::RemoteTabletServer *> servers;
RETURN_NOT_OK(GetTServers(tablet_id, &servers));

auto ts_leader = VERIFY_RESULT(GetLeaderTServer(tablet_id));
for (const auto &server : servers) {
if (server->IsLocal()) {
// We modify our log directly. Avoid calling itself through the proxy.
Expand All @@ -1400,6 +1423,12 @@ Status CDCServiceImpl::UpdatePeersCdcMinReplicatedIndex(
cdc_checkpoint_min.cdc_sdk_op_id.ToPB(update_index_req.add_cdc_sdk_consumed_ops());
update_index_req.add_cdc_sdk_ops_expiration_ms(
cdc_checkpoint_min.cdc_sdk_op_id_expiration.ToMilliseconds());
// Don't update active time for the TABLET LEADER. Only update in FOLLOWERS.
if (server->permanent_uuid() != ts_leader->permanent_uuid()) {
for (auto& stream_id : cdc_checkpoint_min.active_stream_list) {
update_index_req.add_stream_ids(stream_id);
}
}

rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms));
Expand Down Expand Up @@ -1685,7 +1714,7 @@ Result<std::shared_ptr<client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
<< ", last replicated time: " << last_replicated_time_str;

// Add the {tablet_id, stream_id} pair to the set if its checkpoint is OpId::Max().
if (checkpoint == OpId::Max().ToString()) {
if (tablet_stream_to_be_deleted && checkpoint == OpId::Max().ToString()) {
tablet_stream_to_be_deleted->insert({tablet_id, stream_id});
}

Expand Down Expand Up @@ -1725,7 +1754,11 @@ Result<std::shared_ptr<client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
// Check stream associated with the tablet is active or not.
// Don't consider those inactive stream for the min_checkpoint calculation.
CoarseTimePoint latest_active_time = CoarseTimePoint ::min();
if (record.source_type == CDCSDK) {
// if current tsever is the tablet LEADER, send the FOLLOWER tablets to
// update their active_time in their CDCService Cache.
std::shared_ptr<tablet::TabletPeer> tablet_peer;
Status s = tablet_manager_->GetTabletPeer(tablet_id, &tablet_peer);
if (s.ok() && record.source_type == CDCSDK && IsTabletPeerLeader(tablet_peer)) {
auto status = impl_->CheckStreamActive(producer_tablet);
if (!status.ok()) {
// Inactive stream read from cdc_state table are not considered for the minimum
Expand All @@ -1740,7 +1773,7 @@ Result<std::shared_ptr<client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
}
continue;
}

tablet_min_checkpoint_map[tablet_id].active_stream_list.insert(stream_id);
latest_active_time = impl_->GetLatestActiveTime(producer_tablet, *result);
}

Expand Down Expand Up @@ -2178,6 +2211,14 @@ Result<std::shared_ptr<client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
cdc_sdk_op,
cdc_sdk_op_id_expiration);
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);

if (req->stream_ids_size() > 0) {
for (int stream_idx = 0; stream_idx < req->stream_ids_size(); stream_idx++) {
ProducerTabletInfo producer_tablet = {
"" /* UUID */, req->stream_ids(stream_idx), req->tablet_ids(i)};
impl_->UpdateActiveTime(producer_tablet);
}
}
}
context.RespondSuccess();
}
Expand Down
3 changes: 3 additions & 0 deletions ent/src/yb/cdc/cdc_service.h
Expand Up @@ -86,6 +86,7 @@ struct TabletCDCCheckpointInfo {
OpId cdc_sdk_op_id = OpId::Invalid();
MonoDelta cdc_sdk_op_id_expiration = MonoDelta::kZero;
CoarseTimePoint cdc_sdk_most_active_time = CoarseTimePoint::min();
std::unordered_set<CDCStreamId> active_stream_list;
};

using TabletOpIdMap = std::unordered_map<TabletId, TabletCDCCheckpointInfo>;
Expand Down Expand Up @@ -118,6 +119,8 @@ class CDCServiceImpl : public CDCServiceIf {
GetCheckpointResponsePB* resp,
rpc::RpcContext rpc) override;

Result<TabletCheckpoint> TEST_GetTabletInfoFromCache(const ProducerTabletInfo& producer_tablet);

// Update peers in other tablet servers about the latest minimum applied cdc index for a specific
// tablet.
void UpdateCdcReplicatedIndex(const UpdateCdcReplicatedIndexRequestPB* req,
Expand Down

0 comments on commit 21e562a

Please sign in to comment.