Skip to content

Commit

Permalink
[#17118] CDCSDK: Remove tables with no active streams from namespace_…
Browse files Browse the repository at this point in the history
…to_cdcsdk_unprocessed_table_map_

Summary:
As part of the CreateTable RPC in CatalogManager, every successfully created table is added to namespace_to_cdcsdk_unprocessed_table_map_, and later we check whether the table needs to be added to a CDCSDK stream.
But if there were no active streams in the namespace, we have to remove the table from "namespace_to_cdcsdk_unprocessed_table_map_", which was missed earlier (https://phabricator.dev.yugabyte.com/D24801).
Now changes are made in the 'AddNewTableToCDCDKStreamsMetadata' method, to remove such tables which need not be added to any stream.

Additionally, added some VLOGs for ease of debugging.
Jira Issue(s): DB-6403

Test Plan: ./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestAdd100TableToNamespaceWithActiveStream

Reviewers: skumar

Reviewed By: skumar

Subscribers: bogdan, xCluster

Differential Revision: https://phabricator.dev.yugabyte.com/D25018
  • Loading branch information
Adithya Bharadwaj committed May 4, 2023
1 parent da3e29e commit 3d3480b
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/yb/integration-tests/cdcsdk_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ Status CDCSDKTestBase::CreateDatabase(
const std::string& namespace_name,
bool colocated) {
auto conn = VERIFY_RESULT(cluster->Connect());
RETURN_NOT_OK(conn.ExecuteFormat(
"CREATE DATABASE $0$1", namespace_name, colocated ? " colocated = true" : ""));
RETURN_NOT_OK(conn.ExecuteFormat(
"CREATE DATABASE $0$1", namespace_name, colocated ? " with colocation = true" : ""));
return Status::OK();
}

Expand Down
30 changes: 30 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4616,6 +4616,36 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAddTableToNamespaceWithActive
ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_2));
}

// Creates one table, creates a CDCSDK stream, then creates 100 more tables in the same namespace
// and waits until the stream's metadata lists all 101 table ids.
TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAdd100TableToNamespaceWithActiveStream)) {
  ASSERT_OK(SetUpWithParams(1, 1, true));

  // Seed the namespace with a single table before the stream is created.
  const uint32_t num_tablets = 1;
  auto initial_table =
      ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName, num_tablets));
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> initial_tablets;
  ASSERT_OK(test_client()->GetTablets(
      initial_table, 0, &initial_tablets, /* partition_list_version =*/nullptr));
  ASSERT_EQ(initial_tablets.size(), num_tablets);

  // The id itself is not used below; the lookup only has to succeed.
  TableId initial_table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));
  CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream(IMPLICIT));

  // Create the additional tables after the stream already exists.
  const int num_new_tables = 100;
  auto db_conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
  for (int table_idx = 1; table_idx <= num_new_tables; ++table_idx) {
    const std::string new_table_name = "test_table_" + std::to_string(table_idx);
    ASSERT_OK(db_conn.ExecuteFormat(
        "CREATE TABLE $0(key int PRIMARY KEY, value_1 int);", new_table_name));
  }

  // Poll until the stream reports the initial table plus every newly created one. A failed
  // lookup is treated as "not yet" rather than as an error.
  const auto stream_has_all_tables = [&]() -> Result<bool> {
    auto stream_table_ids = GetCDCStreamTableIds(stream_id);
    if (!stream_table_ids.ok()) {
      return false;
    }

    return (stream_table_ids.get().size() == num_new_tables + 1);
  };
  ASSERT_OK(WaitFor(
      stream_has_all_tables,
      MonoDelta::FromSeconds(180),
      "Could not find all the added table's in the stream's metadata"));
}

TEST_F(
CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAddTableToNamespaceWithActiveStreamMasterRestart)) {
FLAGS_catalog_manager_bg_task_wait_ms = 60 * 1000;
Expand Down
19 changes: 19 additions & 0 deletions src/yb/master/xrepl_catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,8 @@ Status CatalogManager::DeleteCDCStreamsMetadataForTables(const vector<TableId>&
Status CatalogManager::AddNewTableToCDCDKStreamsMetadata(
const TableId& table_id, const NamespaceId& ns_id) {
LockGuard lock(cdcsdk_unprocessed_table_mutex_);
VLOG(1) << "Added table: " << table_id << ", under namesapce: " << ns_id
<< ", to namespace_to_cdcsdk_unprocessed_table_map_ to be processed by CDC streams";
namespace_to_cdcsdk_unprocessed_table_map_[ns_id].insert(table_id);

return Status::OK();
Expand Down Expand Up @@ -936,6 +938,9 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables(
break;
}
}
if (found_unprocessed_tables == FLAGS_cdcsdk_table_processing_limit_per_run) {
break;
}
}
}

Expand Down Expand Up @@ -979,11 +984,23 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables(
if (std::find(ltm->table_id().begin(), ltm->table_id().end(), unprocessed_table_id) ==
ltm->table_id().end()) {
(*table_to_unprocessed_streams_map)[unprocessed_table_id].push_back(stream_info);
VLOG(1) << "Will try and add table: " << unprocessed_table_id
<< ", to stream: " << stream_info->id();
}
}
}
}

for (const auto& [ns_id, unprocessed_table_ids] : namespace_to_unprocessed_table_map) {
for (const auto& unprocessed_table_id : unprocessed_table_ids) {
if (!table_to_unprocessed_streams_map->contains(unprocessed_table_id)) {
// This means we found no active CDCSDK stream where this table was missing, hence we can
// remove this table from 'namespace_to_cdcsdk_unprocessed_table_map_'.
RemoveTableFromCDCSDKUnprocessedMap(unprocessed_table_id, ns_id);
}
}
}

return Status::OK();
}

Expand Down Expand Up @@ -1073,6 +1090,8 @@ Status CatalogManager::AddTabletEntriesToCDCSDKStreamsForNewTables(
if (s.IsNotFound()) {
// The table has been deleted. We will remove the table's entry from the stream's metadata.
RemoveTableFromCDCSDKUnprocessedMap(table_id, streams.begin()->get()->namespace_id());
VLOG(1) << "Removed table: " << table_id
<< ", from namespace_to_cdcsdk_unprocessed_table_map_ , beacuse table not found";
} else {
LOG(WARNING) << "Encountered error calling: 'GetTableLocations' for table: " << table_id
<< "while trying to add tablet details to cdc_state table. Error: " << s;
Expand Down

0 comments on commit 3d3480b

Please sign in to comment.