Skip to content

Commit 3e924b7

Browse files
committed
[#28033] CDC: Propagate the GetChanges() errors back to client
Summary: Call to GetConsistentChanges() suppresses all the error status it gets from virtual_wal's GetConsistentChangesInternal() apart from few ones (such as 'stream being inactive' and 'intent records already GCed'). This behaviour of not responding most of the errors back to walsender causes problems such as walsender being unaware of internal issue keep trying and thus being stuck. This diff corrects this issue by sending back most of the errors back to walsender which GetConsistentChanges() gets from virtual_wal's GetConsistentChangesInternal(). The error cases which are skipped from propagating back to client are: when tablet peer is not started yet, and when the tablet is not in kAvailable state. Jira: DB-17654, DB-16451 Test Plan: ./yb_build.sh --cxx-test cdcsdk_consumption_consistent_changes-test --gtest_filter=CDCSDKConsumptionConsistentChangesTest.TestBeforeImageNotExistErrorPropagation ./yb_build.sh release --cxx-test cdcsdk_consumption_consistent_changes-test --gtest_filter=CDCSDKConsumptionConsistentChangesTest.TestBeforeImageNotExistErrorPropagation Reviewers: sumukh.phalgaonkar, skumar, stiwary, asrinivasan, #db-approvers Reviewed By: sumukh.phalgaonkar, #db-approvers Subscribers: ycdcxcluster Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D45633
1 parent 5d91b11 commit 3e924b7

File tree

6 files changed

+96
-41
lines changed

6 files changed

+96
-41
lines changed

src/yb/cdc/cdc_service.cc

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,10 @@ DEFINE_RUNTIME_bool(cdc_enable_implicit_checkpointing, false,
206206
DEFINE_RUNTIME_uint32(cdc_max_virtual_wal_per_tserver, 5,
207207
"Maximum VirtualWAL instances that can be present on a tserver at any time.");
208208

209+
DEFINE_test_flag(bool, mimic_tablet_not_in_available_state, false,
210+
"If true, this is used to mimic the behavior of the tablet as if it is not in an available "
211+
"state.");
212+
209213
DECLARE_int32(log_min_seconds_to_retain);
210214

211215
static bool ValidateMaxRefreshInterval(const char* flag_name, uint32 value) {
@@ -1601,6 +1605,13 @@ void CDCServiceImpl::GetChanges(
16011605
status.IsTabletSplit() ? CDCErrorPB::TABLET_SPLIT : CDCErrorPB::INVALID_REQUEST, context);
16021606
}
16031607

1608+
// Mocking that the tablet peer is not in TabletObjectState::kAvailable state.
1609+
if (PREDICT_FALSE(FLAGS_TEST_mimic_tablet_not_in_available_state)) {
1610+
RPC_STATUS_RETURN_ERROR(
1611+
STATUS(IllegalState, "Tablet not running: tablet object has invalid state"),
1612+
resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
1613+
}
1614+
16041615
auto tablet_peer = context_->LookupTablet(req->tablet_id());
16051616

16061617
auto original_leader_term = tablet_peer ? tablet_peer->LeaderTerm() : OpId::kUnknownTerm;
@@ -4989,25 +5000,6 @@ Result<tablet::TabletPeerPtr> CDCServiceImpl::GetServingTablet(const TabletId& t
49895000
return context_->GetServingTablet(tablet_id);
49905001
}
49915002

4992-
bool IsStreamInactiveError(Status status) {
4993-
if (!status.ok() && status.IsInternalError() &&
4994-
status.message().ToBuffer().find("expired for Tablet") != std::string::npos) {
4995-
return true;
4996-
}
4997-
4998-
return false;
4999-
}
5000-
5001-
bool IsIntentGCError(Status status) {
5002-
if (!status.ok() && status.IsInternalError() &&
5003-
status.message().ToBuffer().find("CDCSDK Trying to fetch already GCed intents") !=
5004-
std::string::npos) {
5005-
return true;
5006-
}
5007-
5008-
return false;
5009-
}
5010-
50115003
Status CDCServiceImpl::PersistActivePidInSlotEntry(
50125004
const xrepl::StreamId& stream_id, uint64_t active_pid) {
50135005
cdc::CDCStateTableEntry entry(kCDCSDKSlotEntryTabletId, stream_id);
@@ -5239,11 +5231,7 @@ void CDCServiceImpl::GetConsistentChanges(
52395231
Format("GetConsistentChanges failed for stream_id: $0 with error: $1", stream_id, s);
52405232
if (!s.IsTryAgain()) {
52415233
LOG(WARNING) << msg;
5242-
// Propogate the error to the client only when the stream has expired or the intents have been
5243-
// garbage collected.
5244-
if (IsStreamInactiveError(s) || IsIntentGCError(s)) {
5245-
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
5246-
}
5234+
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
52475235
} else {
52485236
YB_LOG_EVERY_N_SECS(WARNING, 300) << msg;
52495237
}

src/yb/cdc/cdcsdk_producer.cc

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -588,16 +588,17 @@ Status PopulateBeforeImage(
588588
tablet_peer, commit_time.Decremented(), row_message, cdc_sdk_safe_time,
589589
std::forward<Args>(args)...);
590590
if (!status.ok()) {
591-
LOG(DFATAL) << "Failed to get the BeforeImage for tablet: " << tablet_peer->tablet_id()
592-
<< " with read time: " << commit_time
593-
<< " for change record type: " << row_message->op()
594-
<< " row_message: " << row_message->DebugString()
595-
<< " with error status: " << status;
591+
LOG(WARNING) << "Failed to get the BeforeImage for tablet: " << tablet_peer->tablet_id()
592+
<< " with read time: " << commit_time
593+
<< " for change record type: " << row_message->op()
594+
<< " row_message: " << row_message->DebugString()
595+
<< " with error status: " << status;
596+
} else {
597+
VLOG(2) << "Successfully got the BeforeImage for tablet: " << tablet_peer->tablet_id()
598+
<< " with read time: " << commit_time
599+
<< " for change record type: " << row_message->op()
600+
<< " row_message: " << row_message->DebugString();
596601
}
597-
VLOG(2) << "Successfully got the BeforeImage for tablet: " << tablet_peer->tablet_id()
598-
<< " with read time: " << commit_time
599-
<< " for change record type: " << row_message->op()
600-
<< " row_message: " << row_message->DebugString();
601602
return status;
602603
}
603604

src/yb/cdc/cdcsdk_virtual_wal.cc

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,30 @@ Status CDCSDKVirtualWAL::GetConsistentChangesInternal(
840840
return Status::OK();
841841
}
842842

843+
bool IsRetryableError(const Status& status) {
844+
// The following error cases are considered retryable:
845+
// - when the tablet peer is not started yet, or
846+
// - the tablet is not in available state.
847+
// If any additional errors need to be made retryable in VWAL, add them to the list above.
848+
849+
DCHECK(!status.ok()) << "Status is not expected to be OK when calling IsRetryableError, "
850+
<< "status: " << status.ToString();
851+
852+
// Tablet peer is not started yet
853+
if (status.IsIllegalState() &&
854+
status.message().ToBuffer().find("is not started yet") != std::string::npos) {
855+
return true;
856+
}
857+
858+
// Tablet is not in kAvailable state
859+
if (status.IsIllegalState() &&
860+
status.message().ToBuffer().find("Tablet not running") != std::string::npos) {
861+
return true;
862+
}
863+
864+
return false;
865+
}
866+
843867
Status CDCSDKVirtualWAL::GetChangesInternal(
844868
const std::unordered_set<TabletId> tablet_to_poll_list, HostPort hostport,
845869
CoarseTimePoint deadline) {
@@ -885,6 +909,11 @@ Status CDCSDKVirtualWAL::GetChangesInternal(
885909
}
886910
continue;
887911
} else {
912+
// Replace the status code with 'kTryAgain' for the errors which are retryable so they
913+
// don't get propagated to walsender.
914+
if (IsRetryableError(s)) {
915+
s = s.CloneAndReplaceCode(Status::Code::kTryAgain);
916+
}
888917
LOG_WITH_PREFIX(WARNING) << "GetChanges failed for tablet_id: " << tablet_id
889918
<< " with error: " << s.CloneAndPrepend(error_msg).ToString();
890919
RETURN_NOT_OK(s);

src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2682,6 +2682,49 @@ TEST_F(
26822682
ASSERT_EQ(tablet_peer_2->get_cdc_sdk_safe_time(), slot_row->record_id_commit_time);
26832683
}
26842684

2685+
TEST_F(CDCSDKConsumptionConsistentChangesTest, TestBeforeImageNotExistErrorPropagation) {
2686+
ASSERT_OK(SetUpWithParams(1, 1, false));
2687+
uint32_t num_cols = 3;
2688+
auto table = ASSERT_RESULT(CreateTable(
2689+
&test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, false, "", "public", num_cols));
2690+
2691+
auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
2692+
// Set the replica identity to FULL. This is needed to get before image in update operation.
2693+
ASSERT_OK(conn.ExecuteFormat("ALTER TABLE $0 REPLICA IDENTITY FULL", kTableName));
2694+
2695+
auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());
2696+
ASSERT_OK(InitVirtualWAL(stream_id, {table.table_id()}));
2697+
2698+
// Setting the flag to mimic tablet not in available state. The expectation is that
2699+
// CDCServiceImpl::GetConsistentChanges() should not return an error since such error is expected
2700+
// to be retryable.
2701+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_mimic_tablet_not_in_available_state) = true;
2702+
ASSERT_OK(GetConsistentChangesFromCDC(stream_id));
2703+
2704+
// Resetting the test flag.
2705+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_mimic_tablet_not_in_available_state) = false;
2706+
2707+
map<std::string, uint32_t> col_val_map1, col_val_map2;
2708+
col_val_map1.insert({"col2", 9});
2709+
col_val_map1.insert({"col3", 10});
2710+
col_val_map2.insert({"col2", 10});
2711+
col_val_map2.insert({"col3", 11});
2712+
ASSERT_OK(UpdateRowsHelper(1, 2, &test_cluster_, true, 1, col_val_map1, col_val_map2, num_cols));
2713+
2714+
// Getting the consistent changes from stream and expecting a non-ok status due to "Failed to get
2715+
// the beforeimage" error. This error occurs when an UPDATE follows an INSERT for the same row in
2716+
// the same transaction, and the before image is unavailable.
2717+
// However, when packed rows are enabled, all the UPDATE operation/s of the same row gets absorbed
2718+
// into the INSERT operation and hence the before image is not required. In that case, we expect
2719+
// GetConsistentChanges to return the records without any error.
2720+
if (FLAGS_ysql_enable_packed_row) {
2721+
ASSERT_OK(GetConsistentChangesFromCDC(stream_id));
2722+
} else {
2723+
ASSERT_NOK_STR_CONTAINS(
2724+
GetConsistentChangesFromCDC(stream_id), "Failed to get the beforeimage");
2725+
}
2726+
}
2727+
26852728
// This test verifies the behaviour of VWAL when the publication refresh interval is changed when
26862729
// consumption is in progress.
26872730
TEST_F(CDCSDKConsumptionConsistentChangesTest, TestChangingPublicationRefreshInterval) {

src/yb/integration-tests/cdcsdk_ysql_test_base.cc

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1560,14 +1560,7 @@ Result<GetConsistentChangesResponsePB> CDCSDKYsqlTest::GetConsistentChangesFromC
15601560

15611561
if (status.ok() && change_resp.has_error()) {
15621562
status = StatusFromPB(change_resp.error().status());
1563-
if (status.IsNotFound() || status.IsInvalidArgument()) {
1564-
RETURN_NOT_OK(status);
1565-
} else if (status.IsInternalError()) {
1566-
auto err_msg = status.message().ToBuffer();
1567-
if ((err_msg.find("expired for Tablet") ||
1568-
err_msg.find("CDCSDK Trying to fetch already GCed intents")))
1569-
RETURN_NOT_OK(status);
1570-
}
1563+
RETURN_NOT_OK(status);
15711564
}
15721565

15731566
return false;

src/yb/integration-tests/cdcsdk_ysql_test_base.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ DECLARE_int32(retryable_request_timeout_secs);
139139
DECLARE_bool(save_index_into_wal_segments);
140140
DECLARE_bool(TEST_skip_process_apply);
141141
DECLARE_bool(TEST_ysql_yb_enable_implicit_dynamic_tables_logical_replication);
142+
DECLARE_bool(TEST_mimic_tablet_not_in_available_state);
142143

143144
namespace yb {
144145

0 commit comments

Comments
 (0)