Skip to content

Commit

Permalink
[yugabyte#14846] CDCSDK: Add support for tablet split for newly added…
Browse files Browse the repository at this point in the history
… tables

Summary:
Add support for streaming changes through a CDCSDK stream across tablet splits of tablets that belong to newly added tables.

In the method AddTabletEntriesToCDCSDKStreamsForNewTables, when we add the new table's details to the stream's metadata and to the cdc_state table, we now also add the table to 'cdcsdk_tables_to_stream_map_'.
This ensures that tablets belonging to the new table are not deleted directly after a successful tablet split, and are instead hidden, as needed.

Test Plan:
Added ctests:
TestTabletSplitOnAddedTableForCDC
TestTabletSplitOnAddedTableForCDCWithMasterRestart

Reviewers: skumar, sdash

Reviewed By: sdash

Subscribers: bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D20841
  • Loading branch information
Adithya Bharadwaj authored and jayant07-yb committed Dec 7, 2022
1 parent 0fee9a6 commit b803175
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 0 deletions.
127 changes: 127 additions & 0 deletions ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
Expand Up @@ -7999,6 +7999,133 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestBeginCommitRecordValidationWi
ASSERT_EQ(actual_commit_records, expected_commit_records);
}

// Verifies that a table created after a CDCSDK stream already exists can still be streamed
// across a tablet split: GetChanges on the pre-split tablet must keep returning the written
// records, and only once those have been consumed must the call fail.
TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestTabletSplitOnAddedTableForCDC)) {
  ASSERT_OK(SetUpWithParams(1, 1, false));

  const uint32_t num_tablets = 1;
  auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName, num_tablets));
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
  ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr));
  ASSERT_EQ(tablets.size(), num_tablets);

  // Only check that the first table resolves by name; its id is not needed below.
  ASSERT_OK(GetTableId(&test_cluster_, kNamespaceName, kTableName));
  CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream(IMPLICIT));

  std::unordered_set<TabletId> expected_tablet_ids;
  for (const auto& tablet : tablets) {
    expected_tablet_ids.insert(tablet.tablet_id());
  }

  // The second table is created only after the stream exists, i.e. it is the "newly added"
  // table whose tablets this test splits.
  auto table_2 =
      ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, "test_table_1", num_tablets));
  TableId table_2_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, "test_table_1"));
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets_2;
  ASSERT_OK(
      test_client()->GetTablets(table_2, 0, &tablets_2, /* partition_list_version =*/nullptr));
  for (const auto& tablet : tablets_2) {
    expected_tablet_ids.insert(tablet.tablet_id());
  }
  // Both tables contribute distinct tablets, so the set must hold one entry per tablet.
  ASSERT_EQ(expected_tablet_ids.size(), num_tablets * 2);

  // Verify that table_2's tablets have been added to the cdc_state table.
  CheckTabletsInCDCStateTable(expected_tablet_ids, test_client());
  // NOTE(review): the purpose of this fixed 1 s pause is not evident from the test itself.
  SleepFor(MonoDelta::FromSeconds(1));

  auto resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets_2));
  ASSERT_FALSE(resp.has_error());

  // Two initial GetChanges calls; the second resumes from the checkpoint returned by the first.
  GetChangesResponsePB change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_2));
  change_resp =
      ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_2, &change_resp.cdc_sdk_checkpoint()));

  // Write to the new table, then flush and compact before waiting for its tablet to split.
  ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true, 2, "test_table_1"));
  ASSERT_OK(test_client()->FlushTables(
      {table_2_id}, /* add_indexes = */ false, /* timeout_secs = */ 30,
      /* is_compaction = */ true));
  ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets());
  WaitUntilSplitIsSuccesful(tablets_2.Get(0).tablet_id(), table_2);

  // Verify GetChanges returns records even after tablet split, i.e tablets of the newly added
  // table are hidden instead of being deleted.
  change_resp =
      ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_2, &change_resp.cdc_sdk_checkpoint()));
  ASSERT_GE(change_resp.cdc_sdk_proto_records_size(), 200);

  // Now that all the required records have been streamed, verify that the tablet split error is
  // reported.
  ASSERT_NOK(GetChangesFromCDC(stream_id, tablets_2, &change_resp.cdc_sdk_checkpoint()));
}

// Same scenario as the tablet-split test for a table added after stream creation, but the master
// is shut down and started again before any checkpoint is set or change is requested. GetChanges
// on the pre-split tablet must still return the written records after the split, and must fail
// only once those records have been consumed.
TEST_F(
    CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestTabletSplitOnAddedTableForCDCWithMasterRestart)) {
  ASSERT_OK(SetUpWithParams(1, 1, false));

  const uint32_t num_tablets = 1;
  auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName, num_tablets));
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
  ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr));
  ASSERT_EQ(tablets.size(), num_tablets);

  // Only check that the first table resolves by name; its id is not needed below.
  ASSERT_OK(GetTableId(&test_cluster_, kNamespaceName, kTableName));
  CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream(IMPLICIT));

  std::unordered_set<TabletId> expected_tablet_ids;
  for (const auto& tablet : tablets) {
    expected_tablet_ids.insert(tablet.tablet_id());
  }

  // The second table is created only after the stream exists, i.e. it is the "newly added"
  // table whose tablets this test splits.
  auto table_2 =
      ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, "test_table_1", num_tablets));
  TableId table_2_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, "test_table_1"));
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets_2;
  ASSERT_OK(
      test_client()->GetTablets(table_2, 0, &tablets_2, /* partition_list_version =*/nullptr));
  for (const auto& tablet : tablets_2) {
    expected_tablet_ids.insert(tablet.tablet_id());
  }
  // Both tables contribute distinct tablets, so the set must hold one entry per tablet.
  ASSERT_EQ(expected_tablet_ids.size(), num_tablets * 2);

  // Verify that table_2's tablets have been added to the cdc_state table.
  CheckTabletsInCDCStateTable(expected_tablet_ids, test_client());

  // Restart the master after the new table has been registered with the stream.
  test_cluster_.mini_cluster_->mini_master()->Shutdown();
  ASSERT_OK(test_cluster_.mini_cluster_->StartMasters());
  LOG(INFO) << "Restarted Master";
  // NOTE(review): presumably gives the restarted master time to become ready again; a fixed
  // 30 s sleep is slow, so consider waiting on an explicit readiness condition instead.
  SleepFor(MonoDelta::FromSeconds(30));

  auto resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets_2));
  ASSERT_FALSE(resp.has_error());

  // Two initial GetChanges calls; the second resumes from the checkpoint returned by the first.
  GetChangesResponsePB change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_2));
  change_resp =
      ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_2, &change_resp.cdc_sdk_checkpoint()));

  // Write to the new table, then flush and compact before waiting for its tablet to split.
  ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true, 2, "test_table_1"));
  ASSERT_OK(test_client()->FlushTables(
      {table_2_id}, /* add_indexes = */ false, /* timeout_secs = */ 30,
      /* is_compaction = */ true));
  ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets());
  WaitUntilSplitIsSuccesful(tablets_2.Get(0).tablet_id(), table_2);

  // Verify GetChanges returns records even after tablet split, i.e tablets of the newly added
  // table are hidden instead of being deleted.
  change_resp =
      ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_2, &change_resp.cdc_sdk_checkpoint()));
  ASSERT_GE(change_resp.cdc_sdk_proto_records_size(), 200);

  // Now that all the required records have been streamed, verify that the tablet split error is
  // reported.
  ASSERT_NOK(GetChangesFromCDC(stream_id, tablets_2, &change_resp.cdc_sdk_checkpoint()));
}

} // namespace enterprise
} // namespace cdc
} // namespace yb
8 changes: 8 additions & 0 deletions ent/src/yb/master/catalog_manager_ent.cc
Expand Up @@ -3995,6 +3995,14 @@ Status CatalogManager::AddTabletEntriesToCDCSDKStreamsForNewTables(
continue;
}

// Add the table/ stream pair details to 'cdcsdk_tables_to_stream_map_', so that parent
// tablets on which tablet split is successful will be hidden rather than deleted straight
// away, as needed.
{
LockGuard lock(mutex_);
cdcsdk_tables_to_stream_map_[table_id].insert(stream->id());
}

stream_lock.Commit();
stream->cdcsdk_unprocessed_tables.erase(table_id);
LOG(INFO) << "Added tablets of table: " << table_id
Expand Down

0 comments on commit b803175

Please sign in to comment.