Skip to content

Commit

Permalink
[#21204] CDCSDK: Support for deletion of slot row from cdc_state table
Browse files Browse the repository at this point in the history
Summary:
This diff adds support to delete the cdc_state table entry for the slot, when the stream corresponding to the slot is deleted.

The master background task sets the checkpoint to max for all the state table entries corresponding to the stream that is being deleted. The slot row entry is no exception to this. We use this max checkpoint information to find out the slot entries that need to be deleted.
The slot entry is used for setting of `cdc_sdk_safe_time` for the other entries for that stream, hence while deleting we need to ensure that the slot entry is deleted only after all the entries with valid tablet_id for that stream are deleted. We accomplish this by deleting the entries with valid tablet_id first and then deleting slot entry in the next pass of `UpdatePeersAndMetrics`.
Jira: DB-10134

Test Plan:
Jenkins: .*CDCSDK.*
./yb_build.sh --cxx-test integration-tests_cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestSlotRowDeletionWithSingleStream

./yb_build.sh --cxx-test integration-tests_cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestSlotRowDeletionWithMultipleStreams

Reviewers: asrinivasan, skumar, stiwary, siddharth.shah

Reviewed By: asrinivasan

Subscribers: ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D34828
  • Loading branch information
Sumukh-Phalgaonkar committed May 10, 2024
1 parent ead65ba commit bc46258
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 43 deletions.
66 changes: 52 additions & 14 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2417,7 +2417,8 @@ Result<bool> CDCServiceImpl::CheckBeforeImageActive(

Result<std::unordered_map<NamespaceId, uint64_t>>
CDCServiceImpl::GetNamespaceMinRecordIdCommitTimeMap(
const CDCStateTableRange& table_range, Status* iteration_status) {
const CDCStateTableRange& table_range, Status* iteration_status,
StreamIdSet* slot_entries_to_be_deleted) {
std::unordered_map<NamespaceId, uint64_t> namespace_to_min_record_id_commit_time;

// Iterate over all the slot entries and find the minimum record_id_commit_time for each
Expand All @@ -2441,6 +2442,12 @@ CDCServiceImpl::GetNamespaceMinRecordIdCommitTimeMap(
Format("The slot entry for the stream $0 did not have a value for record_id_commit_time"),
stream_id);

if (slot_entries_to_be_deleted && entry.checkpoint == OpId::Max()) {
LOG(INFO) << "Stream : " << stream_id << " is being deleted";
slot_entries_to_be_deleted->insert(stream_id);
continue;
}

auto stream_metadata = VERIFY_RESULT(GetStream(stream_id));
auto namespace_id = stream_metadata->GetNamespaceId();

Expand All @@ -2458,7 +2465,8 @@ CDCServiceImpl::GetNamespaceMinRecordIdCommitTimeMap(
}

Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
const TabletId& input_tablet_id, TabletIdStreamIdSet* tablet_stream_to_be_deleted) {
const TabletId& input_tablet_id, TabletIdStreamIdSet* tablet_stream_to_be_deleted,
StreamIdSet* slot_entries_to_be_deleted) {
TabletIdCDCCheckpointMap tablet_min_checkpoint_map;
std::unordered_set<xrepl::StreamId> refreshed_metadata_set;

Expand All @@ -2470,9 +2478,10 @@ Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(

// Get the minimum record_id_commit_time for each namespace by looking at all the slot entries.
std::unordered_map<NamespaceId, uint64_t> namespace_to_min_record_id_commit_time;
StreamIdSet streams_with_tablet_entries_to_be_deleted;
if (FLAGS_ysql_TEST_enable_replication_slot_consumption) {
namespace_to_min_record_id_commit_time =
VERIFY_RESULT(GetNamespaceMinRecordIdCommitTimeMap(table_range, &iteration_status));
namespace_to_min_record_id_commit_time = VERIFY_RESULT(GetNamespaceMinRecordIdCommitTimeMap(
table_range, &iteration_status, slot_entries_to_be_deleted));
}

for (auto entry_result : table_range) {
Expand Down Expand Up @@ -2542,6 +2551,7 @@ Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
VLOG(2) << "We will remove the entry for the stream: " << stream_id
<< ", from cdc_state table.";
tablet_stream_to_be_deleted->insert({tablet_id, stream_id});
streams_with_tablet_entries_to_be_deleted.insert(stream_id);
RemoveStreamFromCache(stream_id);
}
continue;
Expand Down Expand Up @@ -2569,14 +2579,16 @@ Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
// For replication slot consumption we can set the cdc_sdk_safe_time to the minimum
// acknowledged commit time among all the slots on the namespace.
if (FLAGS_ysql_TEST_enable_replication_slot_consumption) {
// This is possible when Update Peers and Metrics thread comes into action before the slot
// entry is added to the cdc_state table.
if (!namespace_to_min_record_id_commit_time.contains(namespace_id)) {
LOG(WARNING) << "Did not find any value for record_id_commit_time for the namespace: "
<< namespace_id;
continue;
if (slot_entries_to_be_deleted && !slot_entries_to_be_deleted->contains(stream_id)) {
// This is possible when Update Peers and Metrics thread comes into action before the slot
// entry is added to the cdc_state table.
if (!namespace_to_min_record_id_commit_time.contains(namespace_id)) {
LOG(WARNING) << "Did not find any value for record_id_commit_time for the namespace: "
<< namespace_id;
continue;
}
cdc_sdk_safe_time = HybridTime(namespace_to_min_record_id_commit_time[namespace_id]);
}
cdc_sdk_safe_time = HybridTime(namespace_to_min_record_id_commit_time[namespace_id]);
} else if (entry.cdc_sdk_safe_time) {
cdc_sdk_safe_time = HybridTime(*entry.cdc_sdk_safe_time);
}
Expand All @@ -2598,6 +2610,7 @@ Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
<< ", tablet_id: " << tablet_id
<< ", from cdc_state table since it has OpId::Max().";
tablet_stream_to_be_deleted->insert({tablet_id, stream_id});
streams_with_tablet_entries_to_be_deleted.insert(stream_id);
}

// If a tablet_id, stream_id pair is in "uninitialized state", we don't need to send the
Expand Down Expand Up @@ -2665,6 +2678,15 @@ Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(

RETURN_NOT_OK(iteration_status);

// Delete the slot entry in the state table in the next pass of Update Peers and Metrics, if
// entries with valid tablet_id are being deleted in this pass. This will ensure that the slot
// entry is the last entry to be deleted from the state table for a particular stream.
for (const auto& stream : streams_with_tablet_entries_to_be_deleted) {
if (slot_entries_to_be_deleted && slot_entries_to_be_deleted->contains(stream)) {
slot_entries_to_be_deleted->erase(stream);
}
}

YB_LOG_EVERY_N_SECS(INFO, 300) << "Read " << count << " records from "
<< kCdcStateTableName;
return tablet_min_checkpoint_map;
Expand Down Expand Up @@ -2960,7 +2982,9 @@ void CDCServiceImpl::UpdatePeersAndMetrics() {
// Don't exit from this thread even if below method throw error, because
// if we fail to read cdc_state table, lets wait for the next retry after 60 secs.
TabletIdStreamIdSet cdc_state_entries_to_delete;
auto result = PopulateTabletCheckPointInfo("", &cdc_state_entries_to_delete);
StreamIdSet slot_entries_to_be_deleted;
auto result =
PopulateTabletCheckPointInfo("", &cdc_state_entries_to_delete, &slot_entries_to_be_deleted);
if (!result.ok()) {
LOG(WARNING) << "Failed to populate tablets checkpoint info: " << result.status();
continue;
Expand All @@ -2987,7 +3011,8 @@ void CDCServiceImpl::UpdatePeersAndMetrics() {
TEST_SYNC_POINT("UpdateTabletPeersWithMaxCheckpoint::Done");

WARN_NOT_OK(
DeleteCDCStateTableMetadata(cdc_state_entries_to_delete, failed_tablet_ids),
DeleteCDCStateTableMetadata(
cdc_state_entries_to_delete, failed_tablet_ids, slot_entries_to_be_deleted),
"Unable to cleanup CDC State table metadata");

rate_limiter_->SetBytesPerSecond(
Expand All @@ -2998,7 +3023,8 @@ void CDCServiceImpl::UpdatePeersAndMetrics() {

Status CDCServiceImpl::DeleteCDCStateTableMetadata(
const TabletIdStreamIdSet& cdc_state_entries_to_delete,
const std::unordered_set<TabletId>& failed_tablet_ids) {
const std::unordered_set<TabletId>& failed_tablet_ids,
const StreamIdSet& slot_entries_to_be_deleted) {
// Iterating over set and deleting entries from the cdc_state table.
for (const auto& [tablet_id, stream_id] : cdc_state_entries_to_delete) {
if (failed_tablet_ids.contains(tablet_id)) {
Expand All @@ -3023,6 +3049,18 @@ Status CDCServiceImpl::DeleteCDCStateTableMetadata(
<< " is deleted";
}
}

std::vector<CDCStateTableKey> slot_entry_keys_to_be_deleted;
for (const auto& stream_id : slot_entries_to_be_deleted) {
slot_entry_keys_to_be_deleted.push_back({kCDCSDKSlotEntryTabletId, stream_id});
}
if (!slot_entry_keys_to_be_deleted.empty()) {
Status s = cdc_state_table_->DeleteEntries(slot_entry_keys_to_be_deleted);
if (!s.ok()) {
LOG(WARNING) << "Unable to flush operations to delete slot entries from state table: " << s;
return s.CloneAndPrepend("Error deleting slot rows from cdc_state table");
}
}
return Status::OK();
}

Expand Down
10 changes: 7 additions & 3 deletions src/yb/cdc/cdc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ struct TabletCDCCheckpointInfo {

using TabletIdCDCCheckpointMap = std::unordered_map<TabletId, TabletCDCCheckpointInfo>;
using TabletIdStreamIdSet = std::set<std::pair<TabletId, xrepl::StreamId>>;
using StreamIdSet = std::set<xrepl::StreamId>;
using RollBackTabletIdCheckpointMap =
std::unordered_map<const std::string*, std::pair<int64_t, OpId>>;
class CDCServiceImpl : public CDCServiceIf {
Expand Down Expand Up @@ -407,7 +408,8 @@ class CDCServiceImpl : public CDCServiceIf {
// This method deletes entries from the cdc_state table that are contained in the set.
Status DeleteCDCStateTableMetadata(
const TabletIdStreamIdSet& cdc_state_entries_to_delete,
const std::unordered_set<TabletId>& failed_tablet_ids);
const std::unordered_set<TabletId>& failed_tablet_ids,
const StreamIdSet& slot_entries_to_be_deleted);

MicrosTime GetLastReplicatedTime(const std::shared_ptr<tablet::TabletPeer>& tablet_peer);

Expand Down Expand Up @@ -435,11 +437,13 @@ class CDCServiceImpl : public CDCServiceIf {
const tablet::TabletPeerPtr& tablet_peer);

Result<std::unordered_map<NamespaceId, uint64_t>> GetNamespaceMinRecordIdCommitTimeMap(
const CDCStateTableRange& table_range, Status* iteration_status);
const CDCStateTableRange& table_range, Status* iteration_status,
StreamIdSet* slot_entries_to_be_deleted);

Result<TabletIdCDCCheckpointMap> PopulateTabletCheckPointInfo(
const TabletId& input_tablet_id = "",
TabletIdStreamIdSet* tablet_stream_to_be_deleted = nullptr);
TabletIdStreamIdSet* tablet_stream_to_be_deleted = nullptr,
StreamIdSet* slot_entries_to_be_deleted = nullptr);

Status SetInitialCheckPoint(
const OpId& checkpoint, const std::string& tablet_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class CDCSDKConsumptionConsistentChangesTest : public CDCSDKYsqlTest {
void TestVWALRestartOnLongTxns(FeedbackType feedback_type);
void TestConcurrentConsumptionFromMultipleVWAL(CDCSDKSnapshotOption snapshot_option);
void TestCommitTimeTieWithPublicationRefreshRecord(bool special_record_in_separate_response);
void TestSlotRowDeletion(bool multiple_streams);
};

TEST_F(CDCSDKConsumptionConsistentChangesTest, TestVirtualWAL) {
Expand Down Expand Up @@ -2054,7 +2055,7 @@ void CDCSDKConsumptionConsistentChangesTest::TestCommitTimeTieWithPublicationRef

// Get the Consistent Snapshot Time for stream 2.
auto slot_row = ASSERT_RESULT(ReadSlotEntryFromStateTable(stream_2));
auto cdcsdk_consistent_snapshot_time = slot_row.last_pub_refresh_time;
auto cdcsdk_consistent_snapshot_time = slot_row->last_pub_refresh_time;
HybridTime commit_time;

ASSERT_OK(conn.Execute("BEGIN;"));
Expand Down Expand Up @@ -2268,8 +2269,8 @@ TEST_F(

ASSERT_OK(UpdateAndPersistLSN(stream_id, last_record_lsn, restart_lsn));
auto slot_row = ASSERT_RESULT(ReadSlotEntryFromStateTable(stream_id));
ASSERT_EQ(slot_row.restart_lsn, restart_lsn - 1);
ASSERT_EQ(slot_row.confirmed_flush_lsn, last_record_lsn);
ASSERT_EQ(slot_row->restart_lsn, restart_lsn - 1);
ASSERT_EQ(slot_row->confirmed_flush_lsn, last_record_lsn);

// Sleep to ensure that we will receive publication refresh record.
SleepFor(MonoDelta::FromMicroseconds(2 * publication_refresh_interval));
Expand Down Expand Up @@ -2546,8 +2547,8 @@ TEST_F(

// The safetime for both the table's tablets should be equal to slot's commit time.
auto slot_row = ASSERT_RESULT(ReadSlotEntryFromStateTable(stream_id));
ASSERT_EQ(tablet_peer_1->get_cdc_sdk_safe_time(), slot_row.record_id_commit_time);
ASSERT_EQ(tablet_peer_2->get_cdc_sdk_safe_time(), slot_row.record_id_commit_time);
ASSERT_EQ(tablet_peer_1->get_cdc_sdk_safe_time(), slot_row->record_id_commit_time);
ASSERT_EQ(tablet_peer_2->get_cdc_sdk_safe_time(), slot_row->record_id_commit_time);
}

// This test verifies the behaviour of VWAL when the publication refresh interval is changed when
Expand Down Expand Up @@ -2610,7 +2611,7 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestChangingPublicationRefreshInt

// Verify the value of pub_refresh_times in the state table.
auto slot_row = ASSERT_RESULT(ReadSlotEntryFromStateTable(stream_id));
ASSERT_EQ(slot_row.pub_refresh_times, GetPubRefreshTimesString(pub_refresh_times));
ASSERT_EQ(slot_row->pub_refresh_times, GetPubRefreshTimesString(pub_refresh_times));

// Update the publication's table list and change the refresh interval.
ASSERT_OK(UpdatePublicationTableList(stream_id, {table_1.table_id(), table_2.table_id()}));
Expand Down Expand Up @@ -2648,7 +2649,7 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestChangingPublicationRefreshInt

// Verify the value of pub_refresh_times in the state table.
slot_row = ASSERT_RESULT(ReadSlotEntryFromStateTable(stream_id));
ASSERT_EQ(slot_row.pub_refresh_times, GetPubRefreshTimesString(pub_refresh_times));
ASSERT_EQ(slot_row->pub_refresh_times, GetPubRefreshTimesString(pub_refresh_times));

// Update the publication's table list and change the refresh interval.
ASSERT_OK(UpdatePublicationTableList(stream_id, {table_1.table_id()}));
Expand Down Expand Up @@ -2680,7 +2681,7 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestChangingPublicationRefreshInt

// Verify the value of pub_refresh_times in the state table.
slot_row = ASSERT_RESULT(ReadSlotEntryFromStateTable(stream_id));
ASSERT_EQ(slot_row.pub_refresh_times, GetPubRefreshTimesString(pub_refresh_times));
ASSERT_EQ(slot_row->pub_refresh_times, GetPubRefreshTimesString(pub_refresh_times));

// Send acknowledgemetn for txn 1. This should not trim the pub_refresh_times list. Upon restart
// the very first record to be popped should be a publication refresh record.
Expand Down Expand Up @@ -2722,7 +2723,7 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestChangingPublicationRefreshInt

// Verify that the pub_refresh_times has been trimmed in state table slot entry.
slot_row = ASSERT_RESULT(ReadSlotEntryFromStateTable(stream_id));
ASSERT_EQ(slot_row.pub_refresh_times, GetPubRefreshTimesString(pub_refresh_times));
ASSERT_EQ(slot_row->pub_refresh_times, GetPubRefreshTimesString(pub_refresh_times));

ASSERT_OK(UpdatePublicationTableList(stream_id, {table_1.table_id()}));

Expand Down Expand Up @@ -2752,7 +2753,7 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestChangingPublicationRefreshInt

// Verify the value of pub_refresh_times in the state table.
slot_row = ASSERT_RESULT(ReadSlotEntryFromStateTable(stream_id));
ASSERT_EQ(slot_row.pub_refresh_times, GetPubRefreshTimesString(pub_refresh_times));
ASSERT_EQ(slot_row->pub_refresh_times, GetPubRefreshTimesString(pub_refresh_times));
}

TEST_F(CDCSDKConsumptionConsistentChangesTest, TestLSNDeterminismWithChangingPubRefreshInterval) {
Expand Down Expand Up @@ -2923,5 +2924,67 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestConsumptionAfterDroppingTable
CheckRecordCount(get_consistent_changes_resp, dml_records_per_table);
}

TEST_F(CDCSDKConsumptionConsistentChangesTest, TestSlotRowDeletionWithSingleStream) {
TestSlotRowDeletion(false);
}

TEST_F(CDCSDKConsumptionConsistentChangesTest, TestSlotRowDeletionWithMultipleStreams) {
TestSlotRowDeletion(true);
}

void CDCSDKConsumptionConsistentChangesTest::TestSlotRowDeletion(bool multiple_streams) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_retention_barrier_no_revision_interval_secs) = 1;

ASSERT_OK(SetUpWithParams(1, 1, false, true));
auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));

google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr));
ASSERT_EQ(tablets.size(), 1);

auto tablet_peer =
ASSERT_RESULT(GetLeaderPeerForTablet(test_cluster(), tablets.begin()->tablet_id()));

// Set the replica identity to full, so that cdc_sdk_safe_time will be set.
ASSERT_OK(conn.Execute("ALTER TABLE test_table REPLICA IDENTITY FULL"));
auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream());

if (multiple_streams) {
ASSERT_RESULT(CreateConsistentSnapshotStream());
}

ASSERT_OK(WriteRowsWithConn(1, 2, &test_cluster_, &conn));

ASSERT_OK(InitVirtualWAL(stream_id, {table.table_id()}));
auto change_resp = ASSERT_RESULT(GetConsistentChangesFromCDC(stream_id));
ASSERT_GE(change_resp.cdc_sdk_proto_records_size(), 3);

auto slot_row = ASSERT_RESULT(ReadSlotEntryFromStateTable(stream_id));
ASSERT_TRUE(slot_row.has_value());

ASSERT_TRUE(DeleteCDCStream(stream_id));

// The slot row will be deleted from the state table.
ASSERT_OK(WaitFor(
[&]() -> Result<bool> {
auto slot_row = VERIFY_RESULT(ReadSlotEntryFromStateTable(stream_id));
return !slot_row.has_value();
},
MonoDelta::FromSeconds(10), "Timed out waiting for slot entry deletion from state table"));

if (multiple_streams) {
// Since one stream still exists, the retention barriers will not be lifted.
ASSERT_NE(tablet_peer->get_cdc_sdk_safe_time(), HybridTime::kInvalid);
ASSERT_NE(tablet_peer->get_cdc_min_replicated_index(), OpId::Max().index);
} else {
// Since the only stream that existed is now deleted, the retention barriers will be unset.
VerifyTransactionParticipant(tablet_peer->tablet_id(), OpId::Max());
ASSERT_EQ(tablet_peer->get_cdc_sdk_safe_time(), HybridTime::kInvalid);
ASSERT_EQ(tablet_peer->get_cdc_min_replicated_index(), OpId::Max().index);
}
}

} // namespace cdc
} // namespace yb
30 changes: 15 additions & 15 deletions src/yb/integration-tests/cdcsdk_ysql_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3724,10 +3724,9 @@ Result<string> CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) {
RETURN_NOT_OK(s);
return expected_row;
}

Result<CDCSDKYsqlTest::CdcStateTableSlotRow> CDCSDKYsqlTest::ReadSlotEntryFromStateTable(
const xrepl::StreamId& stream_id) {
CdcStateTableSlotRow slot_row;
Result<std::optional<CDCSDKYsqlTest::CdcStateTableSlotRow>>
CDCSDKYsqlTest::ReadSlotEntryFromStateTable(const xrepl::StreamId& stream_id) {
std::optional<CdcStateTableSlotRow> slot_row = std::nullopt;
CDCStateTable cdc_state_table(test_client());
Status s;
auto table_range =
Expand All @@ -3737,18 +3736,19 @@ Result<string> CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) {
RETURN_NOT_OK(row_result);
auto& row = *row_result;
if (row.key.tablet_id == kCDCSDKSlotEntryTabletId && row.key.stream_id == stream_id) {
slot_row.confirmed_flush_lsn = *(row.confirmed_flush_lsn);
slot_row.restart_lsn = *(row.restart_lsn);
slot_row.xmin = *(row.xmin);
slot_row.record_id_commit_time = HybridTime(*(row.record_id_commit_time));
slot_row.last_pub_refresh_time = HybridTime(*(row.last_pub_refresh_time));
slot_row.pub_refresh_times = *(row.pub_refresh_times);
slot_row = CdcStateTableSlotRow();
slot_row->confirmed_flush_lsn = *(row.confirmed_flush_lsn);
slot_row->restart_lsn = *(row.restart_lsn);
slot_row->xmin = *(row.xmin);
slot_row->record_id_commit_time = HybridTime(*(row.record_id_commit_time));
slot_row->last_pub_refresh_time = HybridTime(*(row.last_pub_refresh_time));
slot_row->pub_refresh_times = *(row.pub_refresh_times);
LOG(INFO) << "Read cdc_state table slot entry for slot with stream id: " << stream_id
<< " confirmed_flush_lsn: " << slot_row.confirmed_flush_lsn
<< " restart_lsn: " << slot_row.restart_lsn << " xmin: " << slot_row.xmin
<< " record_id_commit_time: " << slot_row.record_id_commit_time.ToUint64()
<< " last_pub_refresh_time: " << slot_row.last_pub_refresh_time.ToUint64()
<< " pub_refresh_times: " << slot_row.pub_refresh_times;
<< " confirmed_flush_lsn: " << slot_row->confirmed_flush_lsn
<< " restart_lsn: " << slot_row->restart_lsn << " xmin: " << slot_row->xmin
<< " record_id_commit_time: " << slot_row->record_id_commit_time.ToUint64()
<< " last_pub_refresh_time: " << slot_row->last_pub_refresh_time.ToUint64()
<< " pub_refresh_times: " << slot_row->pub_refresh_times;
}
}
RETURN_NOT_OK(s);
Expand Down
Loading

0 comments on commit bc46258

Please sign in to comment.