From 99fdbc1c15526ab457c9441264f1e9b298256079 Mon Sep 17 00:00:00 2001 From: Hari Krishna Sunder Date: Thu, 9 May 2024 12:54:54 -0700 Subject: [PATCH] [#22343] xCluster: Unify xCluster Stream creation Summary: Remove the option to create an xCluster stream via the cdc_service. This was just forwarding the call to master leader, and used only in TEST code. Creation of xCluster streams requires custom options to be set on it. This code was scattered across multiple places and there used to be several client function calls. Unified all these into one `XClusterClient::CreateXClusterStream`. xCluster streams use a static set of options which does not change. So now `XClusterSourceManager` sets these to the correct value. The Client code still sends the same options to make sure we work with older clusters. The client code can be cleaned up in the future Fixes #22343 Jira: DB-11249 Test Plan: Jenkins Reviewers: jhe, slingam, xCluster Reviewed By: jhe Subscribers: stiwary, skumar, ycdcxcluster, ybase Differential Revision: https://phorge.dev.yugabyte.com/D34138 --- src/yb/cdc/cdc_service.cc | 108 ++++-------------- src/yb/cdc/cdc_service.h | 11 -- src/yb/cdc/cdc_types.h | 8 ++ src/yb/cdc/xcluster_producer-test.cc | 23 +--- src/yb/cdc/xcluster_producer_bootstrap.cc | 14 +-- src/yb/cdc/xrepl_stream_metadata.cc | 2 + src/yb/client/CMakeLists.txt | 1 + src/yb/client/client-internal.cc | 67 +++++------ src/yb/client/client-internal.h | 11 +- src/yb/client/client-test.cc | 44 +++---- src/yb/client/client.cc | 34 ------ src/yb/client/client.h | 13 --- src/yb/client/xcluster_client.cc | 53 ++++++++- src/yb/client/xcluster_client.h | 13 +++ .../integration-tests/cdc_service-int-test.cc | 73 +++++++----- .../integration-tests/cdc_service-txn-test.cc | 9 +- src/yb/integration-tests/cdc_test_util.cc | 23 +--- src/yb/integration-tests/cdc_test_util.h | 5 +- .../integration-tests/cdcsdk_stream-test.cc | 12 +- src/yb/integration-tests/cdcsdk_ysql-test.cc | 2 +- .../xcluster/xcluster-tablet-split-itest.cc | 9 +- src/yb/master/master_xrepl-test.cc | 66 ++++------- .../xcluster/xcluster_source_manager.cc | 25 ++-- .../master/xcluster/xcluster_source_manager.h | 9 +- src/yb/master/xrepl_catalog_manager.cc | 34 +++--- 25 files changed, 268 insertions(+), 401 deletions(-) diff --git a/src/yb/cdc/cdc_service.cc b/src/yb/cdc/cdc_service.cc index 7095225e1255..869f72b5aff0 100644 --- a/src/yb/cdc/cdc_service.cc +++ b/src/yb/cdc/cdc_service.cc @@ -781,16 +781,6 @@ client::YBClient* CDCServiceImpl::client() { return impl_->async_client_init_->c namespace { -bool YsqlTableHasPrimaryKey(const client::YBSchema& schema) { - for (const auto& col : schema.columns()) { - if (col.order() == static_cast(PgSystemAttrNum::kYBRowId)) { - // ybrowid column is added for tables that don't have user-specified primary key. - return false; - } - } - return true; -} - std::unordered_map GetCreateCDCStreamOptions( const CreateCDCStreamRequestPB* req) { std::unordered_map options; @@ -873,29 +863,6 @@ bool GetFromOpId(const GetChangesRequestPB* req, OpId* op_id, CDCSDKCheckpointPB return true; } -// Check for compatibility whether CDC can be setup on the table -// This essentially checks that the table should not be a REDIS table since we do not support it -// and if it's a YSQL or YCQL one, it should have a primary key -Status CheckCdcCompatibility(const std::shared_ptr& table) { - // return if it is a CQL table because they always have a user specified primary key - if (table->table_type() == client::YBTableType::YQL_TABLE_TYPE) { - LOG(INFO) << "Returning while checking CDC compatibility, table is a YCQL table"; - return Status::OK(); - } - - if (table->table_type() == client::YBTableType::REDIS_TABLE_TYPE) { - return STATUS(InvalidArgument, "Cannot setup CDC on YEDIS_TABLE"); - } - - // Check if YSQL table has a primary key. CQL tables always have a - // user specified primary key. - if (!YsqlTableHasPrimaryKey(table->schema())) { - return STATUS(InvalidArgument, "Cannot setup CDC on table without primary key"); - } - - return Status::OK(); -} - CoarseTimePoint GetDeadline(const RpcContext& context, client::YBClient* client) { CoarseTimePoint deadline = context.GetClientDeadline(); if (deadline == CoarseTimePoint::max()) { // Not specified by user. @@ -1076,59 +1043,30 @@ void CDCServiceImpl::CreateCDCStream( return; } - RPC_CHECK_AND_RETURN_ERROR( - req->has_table_id() || req->has_namespace_name(), - STATUS(InvalidArgument, "Table ID or Database name is required to create CDC stream"), - resp->mutable_error(), - CDCErrorPB::INVALID_REQUEST, - context); - - bool is_xcluster = req->source_type() == XCLUSTER; - if (is_xcluster || req->has_table_id()) { - std::shared_ptr table; - Status s = client()->OpenTable(req->table_id(), &table); - RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::TABLE_NOT_FOUND, context); - - // We don't allow CDC on YEDIS and tables without a primary key. - if (req->record_format() != CDCRecordFormat::WAL) { - s = CheckCdcCompatibility(table); - RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context); - } - - std::unordered_map options = GetCreateCDCStreamOptions(req); + if (req->has_table_id() || req->source_type() == XCLUSTER) { + RPC_STATUS_RETURN_ERROR( + STATUS(InvalidArgument, "xCluster stream should be created on master"), + resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context); + } - auto stream_id = RPC_VERIFY_RESULT( - client()->CreateCDCStream(req->table_id(), options), resp->mutable_error(), - CDCErrorPB::INTERNAL_ERROR, context); + RPC_CHECK_AND_RETURN_ERROR( + req->has_namespace_name(), + STATUS(InvalidArgument, "Table ID or Database name is required to create CDCSDK stream"), + resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context); - resp->set_stream_id(stream_id.ToString()); - - // Add stream to cache. - AddStreamMetadataToCache( - stream_id, - std::make_shared( - "", - std::vector{req->table_id()}, - req->record_type(), - req->record_format(), - req->source_type(), - req->checkpoint_type(), - StreamModeTransactional(req->transactional()))); - } else if (req->has_namespace_name()) { - // Return error if we see that no checkpoint type has been populated. - RPC_CHECK_AND_RETURN_ERROR( - req->has_checkpoint_type(), - STATUS(InvalidArgument, "Checkpoint type is required to create a CDCSDK stream"), - resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context); + // Return error if we see that no checkpoint type has been populated. + RPC_CHECK_AND_RETURN_ERROR( + req->has_checkpoint_type(), + STATUS(InvalidArgument, "Checkpoint type is required to create a CDCSDK stream"), + resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context); - auto deadline = GetDeadline(context, client()); - Status status = CreateCDCStreamForNamespace(req, resp, deadline); - CDCError error(status); + auto deadline = GetDeadline(context, client()); + Status status = CreateCDCStreamForNamespace(req, resp, deadline); + CDCError error(status); - if (!status.ok()) { - SetupErrorAndRespond(resp->mutable_error(), status, error.value(), &context); - return; - } + if (!status.ok()) { + SetupErrorAndRespond(resp->mutable_error(), status, error.value(), &context); + return; } context.RespondSuccess(); @@ -4151,12 +4089,6 @@ void CDCServiceImpl::RemoveStreamFromCache(const xrepl::StreamId& stream_id) { stream_metadata_.erase(stream_id); } -void CDCServiceImpl::AddStreamMetadataToCache( - const xrepl::StreamId& stream_id, const std::shared_ptr& stream_metadata) { - std::lock_guard l(mutex_); - InsertOrUpdate(&stream_metadata_, stream_id, stream_metadata); -} - Status CDCServiceImpl::CheckTabletValidForStream(const TabletStreamInfo& info) { auto result = VERIFY_RESULT(impl_->PreCheckTabletValidForStream(info)); if (result) { diff --git a/src/yb/cdc/cdc_service.h b/src/yb/cdc/cdc_service.h index 3f4febd1dd3a..a55018b1eb40 100644 --- a/src/yb/cdc/cdc_service.h +++ b/src/yb/cdc/cdc_service.h @@ -73,13 +73,6 @@ typedef std::unordered_map, HostPortH YB_STRONGLY_TYPED_BOOL(CreateMetricsEntityIfNotFound); -static const char* const kRecordType = "record_type"; -static const char* const kRecordFormat = "record_format"; -static const char* const kRetentionSec = "retention_sec"; -static const char* const kSourceType = "source_type"; -static const char* const kCheckpointType = "checkpoint_type"; -static const char* const kStreamState = "state"; -static const char* const kNamespaceId = "NAMESPACEID"; static const char* const kCDCSDKSnapshotDoneKey = "snapshot_done_key"; struct TabletCheckpoint { @@ -334,10 +327,6 @@ class CDCServiceImpl : public CDCServiceIf { void RemoveStreamFromCache(const xrepl::StreamId& stream_id); - void AddStreamMetadataToCache( - const xrepl::StreamId& stream_id, const std::shared_ptr& stream_metadata) - EXCLUDES(mutex_); - Status CheckTabletValidForStream(const TabletStreamInfo& producer_info); void TabletLeaderGetChanges( diff --git a/src/yb/cdc/cdc_types.h b/src/yb/cdc/cdc_types.h index 9a75ee159b9d..96c3442652f0 100644 --- a/src/yb/cdc/cdc_types.h +++ b/src/yb/cdc/cdc_types.h @@ -61,5 +61,13 @@ using EnumLabelCache = std::unordered_map; using CompositeAttsMap = std::unordered_map>; using CompositeTypeCache = std::unordered_map; +static const char* const kRecordType = "record_type"; +static const char* const kRecordFormat = "record_format"; +static const char* const kSourceType = "source_type"; +static const char* const kCheckpointType = "checkpoint_type"; +static const char* const kStreamState = "state"; +static const char* const kNamespaceId = "NAMESPACEID"; +// NOTE: Do not add new options here. Create them as explicit PB fields. + } // namespace cdc } // namespace yb diff --git a/src/yb/cdc/xcluster_producer-test.cc b/src/yb/cdc/xcluster_producer-test.cc index 856d18182ac4..ad565e8e75b5 100644 --- a/src/yb/cdc/xcluster_producer-test.cc +++ b/src/yb/cdc/xcluster_producer-test.cc @@ -19,12 +19,11 @@ #include "yb/client/schema.h" #include "yb/client/table_handle.h" +#include "yb/client/xcluster_client.h" #include "yb/client/yb_op.h" #include "yb/client/session.h" -#include "yb/integration-tests/cdc_test_util.h" #include "yb/integration-tests/mini_cluster.h" #include "yb/integration-tests/yb_mini_cluster_test_base.h" -#include "yb/master/catalog_manager.h" #include "yb/master/master_auto_flags_manager.h" #include "yb/master/master_cluster.pb.h" #include "yb/master/mini_master.h" @@ -78,7 +77,8 @@ class XClusterProducerTest : public MiniClusterTestWithClient { &client_->proxy_cache(), HostPort::FromBoundEndpoint(tablet_server_->bound_rpc_addr())); ASSERT_OK(CreateTable()); - stream_id_ = ASSERT_RESULT(CreateCDCStream()); + stream_id_ = ASSERT_RESULT(client::XClusterClient(*client_).CreateXClusterStream( + table_->id(), /*active=*/true, cdc::StreamModeTransactional::kFalse)); } Status CreateTable() { @@ -99,23 +99,6 @@ class XClusterProducerTest : public MiniClusterTestWithClient { return Status::OK(); } - Result CreateCDCStream() { - CreateCDCStreamRequestPB req; - CreateCDCStreamResponsePB resp; - req.set_table_id(table_->id()); - req.set_source_type(XCLUSTER); - req.set_checkpoint_type(IMPLICIT); - req.set_record_format(WAL); - - rpc::RpcController rpc; - RETURN_NOT_OK(cdc_proxy_->CreateCDCStream(req, &resp, &rpc)); - if (resp.has_error()) { - return StatusFromPB(resp.error().status()); - } - - return xrepl::StreamId::FromString(resp.stream_id()); - } - Status InsertRows(uint32_t start, uint32_t end) { return WriteRows(start, end, /* delete_op */ false); } diff --git a/src/yb/cdc/xcluster_producer_bootstrap.cc b/src/yb/cdc/xcluster_producer_bootstrap.cc index 19bbb84e2659..1705366e8139 100644 --- a/src/yb/cdc/xcluster_producer_bootstrap.cc +++ b/src/yb/cdc/xcluster_producer_bootstrap.cc @@ -16,6 +16,7 @@ #include "yb/cdc/cdc_service_context.h" #include "yb/cdc/cdc_state_table.h" #include "yb/client/meta_cache.h" +#include "yb/client/xcluster_client.h" #include "yb/consensus/consensus.h" #include "yb/consensus/log.h" #include "yb/consensus/log_cache.h" @@ -140,17 +141,16 @@ Status XClusterProducerBootstrap::CreateAllBootstrapStreams() { return Status::OK(); } - std::unordered_map options; - options.reserve(2); - options.emplace(cdc::kRecordType, CDCRecordType_Name(cdc::CDCRecordType::CHANGE)); - options.emplace(cdc::kRecordFormat, CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL)); - // Generate bootstrap ids & setup the CDC streams, for use with the XCluster Consumer. for (const auto& table_id : req_.table_ids()) { - // Mark this stream as being bootstrapped, to help in finding dangling streams. + // Mark this stream as being bootstrapped, to help in finding dangling streams. Set state to + // inactive since it will take some time for the streams to be used. When the target cluster is + // setup it will switch the streams to active state. Transactional flag will also get set at + // that point. // TODO: Turn this into a batch RPC. const auto& bootstrap_id = VERIFY_RESULT( - cdc_service_->client()->CreateCDCStream(table_id, options, /* active */ false)); + client::XClusterClient(*cdc_service_->client()) + .CreateXClusterStream(table_id, /* active */ false, StreamModeTransactional::kFalse)); creation_state_->created_cdc_streams.push_back(bootstrap_id); bootstrap_ids_and_tables_.emplace_back(bootstrap_id, table_id); diff --git a/src/yb/cdc/xrepl_stream_metadata.cc b/src/yb/cdc/xrepl_stream_metadata.cc index 3e40d791e5c6..99aa4a014c52 100644 --- a/src/yb/cdc/xrepl_stream_metadata.cc +++ b/src/yb/cdc/xrepl_stream_metadata.cc @@ -36,6 +36,8 @@ namespace { // This function is to handle the upgrade scenario where the DB is upgraded from a version // without CDCSDK changes to the one with it. So in case some required options are missing, // the default values will be added for the same. +// (DEPRECATE_EOL 2024.1) This can be removed since XClusterSourceManager populates these defaults +// on new streams and CDCStreamLoader backfills them for older streams. void AddDefaultOptionsIfMissing(std::unordered_map* options) { InsertIfNotPresent(options, kSourceType, CDCRequestSource_Name(CDCRequestSource::XCLUSTER)); InsertIfNotPresent(options, kCheckpointType, CDCCheckpointType_Name(CDCCheckpointType::IMPLICIT)); diff --git a/src/yb/client/CMakeLists.txt b/src/yb/client/CMakeLists.txt index 12c54cc577b0..4da1ff4186b7 100644 --- a/src/yb/client/CMakeLists.txt +++ b/src/yb/client/CMakeLists.txt @@ -70,6 +70,7 @@ set(CLIENT_SRCS set(CLIENT_LIBS yb_common + cdc_service_proto master_proto master_rpc master_util diff --git a/src/yb/client/client-internal.cc b/src/yb/client/client-internal.cc index 109fe0a8af7e..70318ba0d5fc 100644 --- a/src/yb/client/client-internal.cc +++ b/src/yb/client/client-internal.cc @@ -1677,20 +1677,18 @@ void GetColocatedTabletSchemaRpc::ProcessResponse(const Status& status) { user_cb_.Run(new_status); } -class CreateCDCStreamRpc +class CreateXClusterStreamRpc : public ClientMasterRpc { public: - CreateCDCStreamRpc( - YBClient* client, - CreateCDCStreamCallback user_cb, - const TableId& table_id, - const std::unordered_map& options, - cdc::StreamModeTransactional transactional, + CreateXClusterStreamRpc( + YBClient* client, CreateCDCStreamCallback user_cb, const TableId& table_id, + const google::protobuf::RepeatedPtrField& options, + master::SysCDCStreamEntryPB::State state, cdc::StreamModeTransactional transactional, CoarseTimePoint deadline); string ToString() const override; - virtual ~CreateCDCStreamRpc(); + virtual ~CreateXClusterStreamRpc(); private: void CallRemoteMethod() override; @@ -1700,40 +1698,33 @@ class CreateCDCStreamRpc const std::string table_id_; }; -CreateCDCStreamRpc::CreateCDCStreamRpc( - YBClient* client, - CreateCDCStreamCallback user_cb, - const TableId& table_id, - const std::unordered_map& options, - cdc::StreamModeTransactional transactional, +CreateXClusterStreamRpc::CreateXClusterStreamRpc( + YBClient* client, CreateCDCStreamCallback user_cb, const TableId& table_id, + const google::protobuf::RepeatedPtrField& options, + master::SysCDCStreamEntryPB::State state, cdc::StreamModeTransactional transactional, CoarseTimePoint deadline) - : ClientMasterRpc(client, deadline), - user_cb_(std::move(user_cb)), - table_id_(table_id) { + : ClientMasterRpc(client, deadline), user_cb_(std::move(user_cb)), table_id_(table_id) { req_.set_table_id(table_id_); - req_.mutable_options()->Reserve(narrow_cast(options.size())); + // (DEPRECATE_EOL 2024.1) Master now sets the options explicitly, so they do not need to be sent. + *req_.mutable_options() = options; req_.set_transactional(transactional); - for (const auto& option : options) { - auto* op = req_.add_options(); - op->set_key(option.first); - op->set_value(option.second); - } + req_.set_initial_state(state); } -CreateCDCStreamRpc::~CreateCDCStreamRpc() { -} +CreateXClusterStreamRpc::~CreateXClusterStreamRpc() {} -void CreateCDCStreamRpc::CallRemoteMethod() { +void CreateXClusterStreamRpc::CallRemoteMethod() { master_replication_proxy()->CreateCDCStreamAsync( req_, &resp_, mutable_retrier()->mutable_controller(), - std::bind(&CreateCDCStreamRpc::Finished, this, Status::OK())); + std::bind(&CreateXClusterStreamRpc::Finished, this, Status::OK())); } -string CreateCDCStreamRpc::ToString() const { - return Substitute("CreateCDCStream(table_id: $0, num_attempts: $1)", table_id_, num_attempts()); +string CreateXClusterStreamRpc::ToString() const { + return Substitute( + "CreateXClusterStream(table_id: $0, num_attempts: $1)", table_id_, num_attempts()); } -void CreateCDCStreamRpc::ProcessResponse(const Status& status) { +void CreateXClusterStreamRpc::ProcessResponse(const Status& status) { if (status.ok()) { user_cb_(xrepl::StreamId::FromString(resp_.stream_id())); } else { @@ -2513,15 +2504,13 @@ Result YBClient::Data::WaitUntilIndexPermissionsAtLeast( return actual_index_permissions; } -void YBClient::Data::CreateCDCStream( - YBClient* client, - const TableId& table_id, - const std::unordered_map& options, - cdc::StreamModeTransactional transactional, - CoarseTimePoint deadline, - CreateCDCStreamCallback callback) { - auto rpc = StartRpc( - client, callback, table_id, options, transactional, deadline); +void YBClient::Data::CreateXClusterStream( + YBClient* client, const TableId& table_id, + const google::protobuf::RepeatedPtrField& options, + master::SysCDCStreamEntryPB::State state, cdc::StreamModeTransactional transactional, + CoarseTimePoint deadline, CreateCDCStreamCallback callback) { + auto rpc = StartRpc( + client, callback, table_id, options, state, transactional, deadline); } void YBClient::Data::DeleteCDCStream( diff --git a/src/yb/client/client-internal.h b/src/yb/client/client-internal.h index 491d1b97d119..9180ac031abb 100644 --- a/src/yb/client/client-internal.h +++ b/src/yb/client/client-internal.h @@ -309,12 +309,11 @@ class YBClient::Data { const CoarseTimePoint deadline, const CoarseDuration max_wait = std::chrono::seconds(2)); - void CreateCDCStream(YBClient* client, - const TableId& table_id, - const std::unordered_map& options, - cdc::StreamModeTransactional transactional, - CoarseTimePoint deadline, - CreateCDCStreamCallback callback); + void CreateXClusterStream( + YBClient* client, const TableId& table_id, + const google::protobuf::RepeatedPtrField& options, + master::SysCDCStreamEntryPB::State state, cdc::StreamModeTransactional transactional, + CoarseTimePoint deadline, CreateCDCStreamCallback callback); void DeleteCDCStream( YBClient* client, diff --git a/src/yb/client/client-test.cc b/src/yb/client/client-test.cc index abf767ff6e66..67e22e7e2716 100644 --- a/src/yb/client/client-test.cc +++ b/src/yb/client/client-test.cc @@ -54,6 +54,7 @@ #include "yb/client/table_info.h" #include "yb/client/tablet_server.h" #include "yb/client/value.h" +#include "yb/client/xcluster_client.h" #include "yb/client/yb_op.h" #include "yb/dockv/partial_row.h" @@ -1682,38 +1683,27 @@ TEST_F(ClientTest, TestGetTableSchemaByIdMissingTable) { ASSERT_TRUE(TableNotFound(s)) << s; } -TEST_F(ClientTest, TestCreateCDCStreamAsync) { - std::promise> promise; - std::unordered_map options; - client_->CreateCDCStream( - client_table_.table()->id(), options, cdc::StreamModeTransactional::kFalse, - [&promise](const auto& stream) { promise.set_value(stream); }); - auto stream = promise.get_future().get(); - ASSERT_OK(stream); - ASSERT_FALSE(stream->IsNil()); +TEST_F(ClientTest, TestCreateXClusterStream) { + auto stream_id = ASSERT_RESULT(XClusterClient(*client_).CreateXClusterStream( + client_table_.table()->id(), /*active=*/true, cdc::StreamModeTransactional::kFalse)); + ASSERT_FALSE(stream_id.IsNil()); } -TEST_F(ClientTest, TestCreateCDCStreamMissingTable) { - std::promise> promise; - std::unordered_map options; - client_->CreateCDCStream( - "MissingTableId", options, cdc::StreamModeTransactional::kFalse, - [&promise](const auto& stream) { promise.set_value(stream); }); - auto stream = promise.get_future().get(); - ASSERT_NOK(stream); - ASSERT_TRUE(TableNotFound(stream.status())) << stream.status(); +TEST_F(ClientTest, TestCreateXClusterStreamMissingTable) { + auto result = XClusterClient(*client_).CreateXClusterStream( + "MissingTableId", /*active=*/true, cdc::StreamModeTransactional::kFalse); + ASSERT_NOK(result); + ASSERT_TRUE(TableNotFound(result.status())) << result.status(); } -TEST_F(ClientTest, TestDeleteCDCStreamAsync) { - std::unordered_map options; - auto result = client_->CreateCDCStream( - client_table_.table()->id(), options, cdc::StreamModeTransactional::kFalse); - ASSERT_TRUE(result.ok()); +TEST_F(ClientTest, TestDeleteXClusterStream) { + auto stream_id = ASSERT_RESULT(XClusterClient(*client_).CreateXClusterStream( + client_table_.table()->id(), /*active=*/true, cdc::StreamModeTransactional::kFalse)); - // Delete the created CDC stream. - Synchronizer sync; - client_->DeleteCDCStream(*result, sync.AsStatusCallback()); - ASSERT_OK(sync.Wait()); + ASSERT_NOK_STR_CONTAINS( + client_->DeleteCDCStream(stream_id, /*force_delete=*/false), + "Cannot delete an xCluster Stream in replication"); + ASSERT_OK(client_->DeleteCDCStream(stream_id, /*force_delete=*/true)); } TEST_F(ClientTest, TestDeleteCDCStreamMissingId) { diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 18084c3cab8f..efff1546ac91 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -1477,40 +1477,6 @@ Status YBClient::DeleteUDType(const std::string& namespace_name, return Status::OK(); } -Result YBClient::CreateCDCStream( - const TableId& table_id, - const std::unordered_map& options, - bool active, - const xrepl::StreamId& db_stream_id) { - // Setting up request. - CreateCDCStreamRequestPB req; - req.set_table_id(table_id); - if (db_stream_id) { - req.set_db_stream_id(db_stream_id.ToString()); - } - req.mutable_options()->Reserve(narrow_cast(options.size())); - for (const auto& option : options) { - auto new_option = req.add_options(); - new_option->set_key(option.first); - new_option->set_value(option.second); - } - req.set_initial_state(active ? master::SysCDCStreamEntryPB::ACTIVE - : master::SysCDCStreamEntryPB::INITIATED); - - CreateCDCStreamResponsePB resp; - CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, CreateCDCStream); - return xrepl::StreamId::FromString(resp.stream_id()); -} - -void YBClient::CreateCDCStream( - const TableId& table_id, - const std::unordered_map& options, - cdc::StreamModeTransactional transactional, - CreateCDCStreamCallback callback) { - auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout(); - data_->CreateCDCStream(this, table_id, options, transactional, deadline, callback); -} - Result YBClient::CreateCDCSDKStreamForNamespace( const NamespaceId& namespace_id, const std::unordered_map& options, diff --git a/src/yb/client/client.h b/src/yb/client/client.h index 20c14ada2558..13101aca1a57 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -597,19 +597,6 @@ class YBClient { // CDC Stream related methods. - // Create a new CDC stream. - Result CreateCDCStream( - const TableId& table_id, - const std::unordered_map& options, - bool active = true, - const xrepl::StreamId& db_stream_id = xrepl::StreamId::Nil()); - - void CreateCDCStream( - const TableId& table_id, - const std::unordered_map& options, - cdc::StreamModeTransactional transactional, - CreateCDCStreamCallback callback); - Result CreateCDCSDKStreamForNamespace( const NamespaceId& namespace_id, const std::unordered_map& options, bool populate_namespace_id_as_table_id = false, diff --git a/src/yb/client/xcluster_client.cc b/src/yb/client/xcluster_client.cc index 31506a86c00f..89af4d6d73aa 100644 --- a/src/yb/client/xcluster_client.cc +++ b/src/yb/client/xcluster_client.cc @@ -13,6 +13,7 @@ #include "yb/client/xcluster_client.h" +#include "yb/cdc/cdc_service.pb.h" #include "yb/client/client.h" #include "yb/client/client-internal.h" #include "yb/master/master_defaults.h" @@ -34,13 +35,16 @@ DECLARE_bool(use_node_to_node_encryption); namespace yb::client { XClusterClient::XClusterClient(client::YBClient& yb_client) : yb_client_(yb_client) {} +CoarseTimePoint XClusterClient::GetDeadline() const { + return CoarseMonoClock::Now() + yb_client_.default_admin_operation_timeout(); +} + template Result XClusterClient::SyncLeaderMasterRpc( const RequestPB& req, const char* method_name, const Method& method) { ResponsePB resp; - RETURN_NOT_OK(yb_client_.data_->SyncLeaderMasterRpc( - CoarseMonoClock::Now() + yb_client_.default_admin_operation_timeout(), req, &resp, - method_name, method)); + RETURN_NOT_OK( + yb_client_.data_->SyncLeaderMasterRpc(GetDeadline(), req, &resp, method_name, method)); return resp; } @@ -334,6 +338,27 @@ Status XClusterClient::RepairOutboundXClusterReplicationGroupRemoveTable( return Status::OK(); } +Result XClusterClient::CreateXClusterStream( + const TableId& table_id, bool active, cdc::StreamModeTransactional transactional) { + std::promise> promise; + auto future = promise.get_future(); + + CreateXClusterStreamAsync( + table_id, active, transactional, + [&promise](Result result) { promise.set_value(std::move(result)); }); + + return future.get(); +} + +void XClusterClient::CreateXClusterStreamAsync( + const TableId& table_id, bool active, cdc::StreamModeTransactional transactional, + CreateCDCStreamCallback callback) { + yb_client_.data_->CreateXClusterStream( + &yb_client_, table_id, GetXClusterStreamOptions(), + (active ? master::SysCDCStreamEntryPB::ACTIVE : master::SysCDCStreamEntryPB::INITIATED), + transactional, GetDeadline(), std::move(callback)); +} + XClusterRemoteClient::XClusterRemoteClient(const std::string& certs_for_cdc_dir, MonoDelta timeout) : certs_for_cdc_dir_(certs_for_cdc_dir), timeout_(timeout) {} @@ -531,4 +556,26 @@ Status XClusterRemoteClient::AddNamespaceToDbScopedUniverseReplication( return Status::OK(); } +google::protobuf::RepeatedPtrField GetXClusterStreamOptions() { + google::protobuf::RepeatedPtrField<::yb::master::CDCStreamOptionsPB> options; + options.Reserve(4); + auto source_type = options.Add(); + source_type->set_key(cdc::kSourceType); + source_type->set_value(CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER)); + + auto record_type = options.Add(); + record_type->set_key(cdc::kRecordType); + record_type->set_value(CDCRecordType_Name(cdc::CDCRecordType::CHANGE)); + + auto record_format = options.Add(); + record_format->set_key(cdc::kRecordFormat); + record_format->set_value(CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL)); + + auto checkpoint_type = options.Add(); + checkpoint_type->set_key(cdc::kCheckpointType); + checkpoint_type->set_value(CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT)); + + return options; +} + } // namespace yb::client diff --git a/src/yb/client/xcluster_client.h b/src/yb/client/xcluster_client.h index 629bbc2504fe..4f3ed01ca6ad 100644 --- a/src/yb/client/xcluster_client.h +++ b/src/yb/client/xcluster_client.h @@ -31,6 +31,7 @@ YB_STRONGLY_TYPED_UUID_DECL(UniverseUuid); class IsOperationDoneResult; namespace master { +class CDCStreamOptionsPB; class MasterReplicationProxy; class GetXClusterStreamsResponsePB; } // namespace master @@ -127,7 +128,16 @@ class XClusterClient { Status RepairOutboundXClusterReplicationGroupRemoveTable( const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id); + Result CreateXClusterStream( + const TableId& table_id, bool active, cdc::StreamModeTransactional transactional); + + void CreateXClusterStreamAsync( + const TableId& table_id, bool active, cdc::StreamModeTransactional transactional, + CreateCDCStreamCallback callback); + private: + CoarseTimePoint GetDeadline() const; + template Result SyncLeaderMasterRpc( const RequestPB& req, const char* method_name, const Method& method); @@ -186,5 +196,8 @@ class XClusterRemoteClient { std::unique_ptr xcluster_client_; }; +// TODO: Move xcluster_util to common and this into it. +google::protobuf::RepeatedPtrField GetXClusterStreamOptions(); + } // namespace client } // namespace yb diff --git a/src/yb/integration-tests/cdc_service-int-test.cc b/src/yb/integration-tests/cdc_service-int-test.cc index c224004967d4..764593862912 100644 --- a/src/yb/integration-tests/cdc_service-int-test.cc +++ b/src/yb/integration-tests/cdc_service-int-test.cc @@ -494,7 +494,7 @@ TEST_F(CDCServiceTest, TestCompoundKey) { ASSERT_OK(table.Create(kTableNameCompoundKey, tablet_count(), client_.get(), &builder)); // Create a stream on the table - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table.table()->id())); std::string tablet_id = GetTablet(table.name()); @@ -553,8 +553,21 @@ TEST_F(CDCServiceTest, TestCompoundKey) { } } -TEST_F(CDCServiceTest, TestCreateCDCStream) { - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); +TEST_F(CDCServiceTest, TestCreateXClusterStream) { + const auto& table_id = table_.table()->id(); + + // Creating a xCluster stream on the cdc_service RPC should fail. + RpcController rpc; + CreateCDCStreamRequestPB create_req; + CreateCDCStreamResponsePB create_resp; + create_req.set_table_id(table_id); + ASSERT_OK(cdc_proxy_->CreateCDCStream(create_req, &create_resp, &rpc)); + ASSERT_TRUE(create_resp.has_error()); + ASSERT_STR_CONTAINS( + create_resp.error().status().message(), "xCluster stream should be created on master"); + + // Creating a xCluster stream via the client on the master should succeed. + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_id)); NamespaceId ns_id; std::vector table_ids; @@ -564,11 +577,11 @@ TEST_F(CDCServiceTest, TestCreateCDCStream) { ASSERT_EQ(table_ids.front(), table_.table()->id()); } -TEST_F(CDCServiceTest, TestCreateCDCStreamWithDefaultRententionTime) { +TEST_F(CDCServiceTest, TestCreateXClusterStreamWithDefaultRententionTime) { // Set default WAL retention time to 10 hours. ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_wal_retention_time_secs) = 36000; - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); NamespaceId ns_id; std::vector table_ids; @@ -580,9 +593,9 @@ TEST_F(CDCServiceTest, TestCreateCDCStreamWithDefaultRententionTime) { VerifyWalRetentionTime(cluster_.get(), kCDCTestTableName, FLAGS_cdc_wal_retention_time_secs); } -TEST_F(CDCServiceTest, TestDeleteCDCStream) { +TEST_F(CDCServiceTest, TestDeleteXClusterStream) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 0; - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); NamespaceId ns_id; std::vector table_ids; @@ -635,7 +648,7 @@ TEST_F(CDCServiceTest, TestDeleteCDCStream) { TEST_F(CDCServiceTest, TestSafeTime) { docdb::DisableYcqlPackedRow(); - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_collect_cdc_metrics) = true; std::string tablet_id = GetTablet(); @@ -686,7 +699,7 @@ TEST_F(CDCServiceTest, TestSafeTime) { } TEST_F(CDCServiceTest, TestMetricsOnDeletedReplication) { - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_collect_cdc_metrics) = true; std::string tablet_id = GetTablet(); @@ -748,7 +761,7 @@ TEST_F(CDCServiceTest, TestMetricsOnDeletedReplication) { } TEST_F(CDCServiceTest, TestWALPrematureGCErrorCode) { - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); std::string tablet_id = GetTablet(); auto tablet_peer = ASSERT_RESULT( cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTablet(tablet_id)); @@ -772,7 +785,7 @@ TEST_F(CDCServiceTest, TestWALPrematureGCErrorCode) { TEST_F(CDCServiceTest, TestGetChanges) { docdb::DisableYcqlPackedRow(); - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_collect_cdc_metrics) = true; std::string tablet_id = GetTablet(); @@ -909,7 +922,7 @@ TEST_F(CDCServiceTest, TestGetChanges) { TEST_F(CDCServiceTest, YB_DISABLE_TEST_ON_MACOS(TestGetChangesWithDeadline)) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_populate_end_markers_transactions) = false; docdb::DisableYcqlPackedRow(); - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); ANNOTATE_UNPROTECTED_WRITE(FLAGS_log_segment_size_bytes) = 100; ANNOTATE_UNPROTECTED_WRITE(FLAGS_get_changes_honor_deadline) = true; ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_read_safe_deadline_ratio) = 0.30; @@ -994,7 +1007,7 @@ TEST_F(CDCServiceTest, TestGetChangesInvalidStream) { } TEST_F(CDCServiceTest, TestGetCheckpoint) { - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); std::string tablet_id = GetTablet(); @@ -1070,7 +1083,7 @@ TEST_F(CDCServiceTestMultipleServersOneTablet, TestGetChangesRpcTabletConsensusI endpoint = HostPort::FromBoundEndpoint( cluster_->mini_tablet_server(follower_idx[0])->bound_rpc_addr()); cdc_proxy_ = std::make_unique(&client_->proxy_cache(), endpoint); - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id(), cdc::CDCSDK)); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); GetChangesResponsePB change_resp; ASSERT_NO_FATALS(GetAllChanges(GetTablet(), stream_id_, &change_resp)); @@ -1103,7 +1116,7 @@ TEST_F(CDCServiceTestMultipleServersOneTablet, TestMetricsAfterServerFailure) { // Test that the metric value is not time since epoch after a leadership change. docdb::DisableYcqlPackedRow(); SetAtomicFlag(0, &FLAGS_cdc_state_checkpoint_update_interval_ms); - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_collect_cdc_metrics) = false; std::string tablet_id = GetTablet(); @@ -1150,7 +1163,7 @@ TEST_F(CDCServiceTestMultipleServersOneTablet, TestUpdateLagMetrics) { // Enable BG thread to generate metrics. SetAtomicFlag(true, &FLAGS_enable_collect_cdc_metrics); - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); std::string tablet_id = GetTablet(); TabletStreamInfo tablet_info = {stream_id_, tablet_id}; const auto& tservers = cluster_->mini_tablet_servers(); @@ -1277,7 +1290,7 @@ TEST_F(CDCServiceTestMultipleServersOneTablet, TestMetricsUponRegainingLeadershi SetAtomicFlag(1000, &FLAGS_min_leader_stepdown_retry_interval_ms); // Trigger metrics updates manually in the test, instead of relying on background thread. SetAtomicFlag(false, &FLAGS_enable_collect_cdc_metrics); - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); std::string tablet_id = GetTablet(); const auto& tservers = cluster_->mini_tablet_servers(); @@ -1399,7 +1412,7 @@ class CDCServiceTestMultipleServers : public CDCServiceTest { }; TEST_F(CDCServiceTestMultipleServers, TestListTablets) { - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); std::string tablet_id = GetTablet(); @@ -1444,7 +1457,7 @@ TEST_F(CDCServiceTestMultipleServers, TestListTablets) { TEST_F(CDCServiceTestMultipleServers, TestGetChangesProxyRouting) { docdb::DisableYcqlPackedRow(); - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); // Figure out [1] all tablets and [2] which ones are local to the first server. std::vector local_tablets, all_tablets; @@ -1545,7 +1558,7 @@ TEST_F(CDCServiceTestMultipleServers, TestGetChangesProxyRouting) { TEST_F(CDCServiceTest, TestOnlyGetLocalChanges) { docdb::DisableYcqlPackedRow(); - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); std::string tablet_id = GetTablet(); @@ -1659,7 +1672,7 @@ TEST_F(CDCServiceTest, TestOnlyGetLocalChanges) { } TEST_F(CDCServiceTest, TestCheckpointUpdatedForRemoteRows) { - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); std::string tablet_id = GetTablet(); @@ -1719,7 +1732,7 @@ TEST_F(CDCServiceTest, TestCheckpointUpdate) { docdb::DisableYcqlPackedRow(); ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 0; - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); std::string tablet_id = GetTablet(); @@ -1840,7 +1853,7 @@ class CDCServiceTestMaxRentionTime : public CDCServiceTest { TEST_F(CDCServiceTestMaxRentionTime, TestLogRetentionByOpId_MaxRentionTime) { docdb::DisableYcqlPackedRow(); - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); std::string tablet_id = GetTablet(); @@ -1979,7 +1992,7 @@ TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestBootstrapProducer) { } TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestLogCDCMinReplicatedIndexIsDurable) { - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); std::string tablet_id = GetTablet(); @@ -2038,7 +2051,7 @@ class CDCServiceTestMinSpace : public CDCServiceTest { TEST_F(CDCServiceTestMinSpace, TestLogRetentionByOpId_MinSpace) { docdb::DisableYcqlPackedRow(); - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); std::string tablet_id = GetTablet(); @@ -2109,7 +2122,7 @@ TEST_F(CDCLogAndMetaIndex, TestLogAndMetaCdcIndex) { std::vector stream_ids; for (int i = 0; i < kNStreams; i++) { - stream_ids.push_back(ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id()))); + stream_ids.push_back(ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id()))); } std::string tablet_id = GetTablet(); @@ -2173,7 +2186,7 @@ TEST_F(CDCLogAndMetaIndexReset, TestLogAndMetaCdcIndexAreReset) { std::vector stream_ids; for (int i = 0; i < kNStreams; i++) { - stream_ids.push_back(ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id()))); + stream_ids.push_back(ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id()))); } std::string tablet_id = GetTablet(); @@ -2313,7 +2326,7 @@ TEST_F(CDCServiceTestThreeServers, TestNewLeaderUpdatesLogCDCAppliedIndex) { } LOG(INFO) << "Inserted " << kNRecords << " records"; - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); LOG(INFO) << "Created cdc stream " << stream_id_; std::shared_ptr tablet_peer; @@ -2389,7 +2402,7 @@ class CDCServiceLowRpc: public CDCServiceTest { TEST_F(CDCServiceLowRpc, TestGetChangesRpcMax) { docdb::DisableYcqlPackedRow(); - stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); std::string tablet_id = GetTablet(); @@ -2446,7 +2459,7 @@ TEST_F(CDCServiceTestThreeServers, TestCheckpointIsMinOverMultipleStreams) { auto stream_id2 = xrepl::StreamId::GenerateRandom(); std::vector stream_ids{&stream_id1, &stream_id2, &stream_id_}; for (auto* stream_id : stream_ids) { - *stream_id = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + *stream_id = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); } constexpr int kNRecords = 30; constexpr int kGettingLeaderTimeoutSecs = 20; diff --git a/src/yb/integration-tests/cdc_service-txn-test.cc b/src/yb/integration-tests/cdc_service-txn-test.cc index 26973c12588e..c2a71c982790 100644 --- a/src/yb/integration-tests/cdc_service-txn-test.cc +++ b/src/yb/integration-tests/cdc_service-txn-test.cc @@ -175,8 +175,7 @@ TEST_F(CDCServiceTxnTest, TestGetChanges) { auto schema = ASSERT_RESULT(tserver->tablet_manager()->GetTablet(tablet_id))->shared_tablet()->schema(); - // Create CDC stream on table. - auto stream_id = CreateCDCStream(cdc_proxy_, table_.table()->id()); + auto stream_id = CreateXClusterStream(*client_, table_.table()->id()); GetChangesRequestPB change_req; GetChangesResponsePB change_resp; @@ -252,8 +251,7 @@ TEST_F(CDCServiceTxnTest, TestGetChangesForPendingTransaction) { auto schema = ASSERT_RESULT(tserver->tablet_manager()->GetTablet(tablet_id))->shared_tablet()->schema(); - // Create CDC stream on table. - auto stream_id = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + auto stream_id = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); GetChangesRequestPB change_req; GetChangesResponsePB change_resp; @@ -330,8 +328,7 @@ TEST_F(CDCServiceTxnTest, MetricsTest) { client_->GetTablets(table_->name(), 0, &tablets, /* partition_list_version =*/ nullptr)); ASSERT_EQ(tablets.size(), 1); - // Create CDC stream on table. - auto stream_id = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id())); + auto stream_id = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); auto tablet_id = tablets.Get(0).tablet_id(); diff --git a/src/yb/integration-tests/cdc_test_util.cc b/src/yb/integration-tests/cdc_test_util.cc index bf670796569f..fd95fad0b2ec 100644 --- a/src/yb/integration-tests/cdc_test_util.cc +++ b/src/yb/integration-tests/cdc_test_util.cc @@ -16,6 +16,7 @@ #include #include "yb/cdc/cdc_service.pb.h" +#include "yb/client/xcluster_client.h" #include "yb/consensus/log.h" #include "yb/rpc/rpc_controller.h" @@ -80,24 +81,10 @@ void AssertIntKey( ASSERT_EQ(int_val.int32_value(), value); } -Result CreateCDCStream( - const std::unique_ptr& cdc_proxy, - const TableId& table_id, - cdc::CDCRequestSource source_type) { - CreateCDCStreamRequestPB req; - CreateCDCStreamResponsePB resp; - req.set_table_id(table_id); - req.set_source_type(source_type); - req.set_checkpoint_type(IMPLICIT); - req.set_record_format(CDCRecordFormat::WAL); - - rpc::RpcController rpc; - RETURN_NOT_OK(cdc_proxy->CreateCDCStream(req, &resp, &rpc)); - if (resp.has_error()) { - return StatusFromPB(resp.error().status()); - } - - return xrepl::StreamId::FromString(resp.stream_id()); +Result CreateXClusterStream(client::YBClient& client, const TableId& table_id) { + // Test streams are used as soon as they are created so set state to active. + return client::XClusterClient(client).CreateXClusterStream( + table_id, /* active */ true, cdc::StreamModeTransactional::kFalse); } void WaitUntilWalRetentionSecs(std::function get_wal_retention_secs, diff --git a/src/yb/integration-tests/cdc_test_util.h b/src/yb/integration-tests/cdc_test_util.h index 1faf6eefd8aa..4a773283c30a 100644 --- a/src/yb/integration-tests/cdc_test_util.h +++ b/src/yb/integration-tests/cdc_test_util.h @@ -35,10 +35,7 @@ void AssertIntKey( const Schema& schema, const google::protobuf::RepeatedPtrField& key, int32_t value); -Result CreateCDCStream( - const std::unique_ptr& cdc_proxy, - const TableId& table_id, - cdc::CDCRequestSource source_type = XCLUSTER); +Result CreateXClusterStream(client::YBClient& client, const TableId& table_id); // For any tablet that belongs to a table whose name starts with 'table_name_start', this method // will verify that its WAL retention time matches the provided time. diff --git a/src/yb/integration-tests/cdcsdk_stream-test.cc b/src/yb/integration-tests/cdcsdk_stream-test.cc index 3bdb866ce36a..f6bd4e54e57c 100644 --- a/src/yb/integration-tests/cdcsdk_stream-test.cc +++ b/src/yb/integration-tests/cdcsdk_stream-test.cc @@ -368,18 +368,8 @@ TEST_F(CDCSDKStreamTest, CDCWithXclusterEnabled) { // Creating xCluster streams now. std::vector created_xcluster_streams; for (uint32_t i = 0; i < num_of_streams; ++i) { - RpcController rpc; - CreateCDCStreamRequestPB create_req; - CreateCDCStreamResponsePB create_resp; - - create_req.set_table_id(table.table_id()); - ASSERT_OK(cdc_proxy_->CreateCDCStream(create_req, &create_resp, &rpc)); - - // Assert that there is no DB stream ID in the response while creating xCluster stream. - ASSERT_FALSE(create_resp.has_db_stream_id()); - created_xcluster_streams.emplace_back( - ASSERT_RESULT(xrepl::StreamId::FromString(create_resp.stream_id()))); + ASSERT_RESULT(cdc::CreateXClusterStream(*test_client(), table.table_id()))); } std::sort(created_xcluster_streams.begin(), created_xcluster_streams.end()); diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index ca99f387aa21..a574614ad935 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -2850,7 +2850,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestXClusterLogGCedWithTabletBoot ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version=*/nullptr)); ASSERT_EQ(tablets.size(), num_tablets); - auto stream_id = ASSERT_RESULT(cdc::CreateCDCStream(cdc_proxy_, table_id)); + auto stream_id = ASSERT_RESULT(cdc::CreateXClusterStream(*test_client(), table_id)); // Insert some records. ASSERT_OK(WriteRows(0 /* start */, 100 /* end */, &test_cluster_)); diff --git a/src/yb/integration-tests/xcluster/xcluster-tablet-split-itest.cc b/src/yb/integration-tests/xcluster/xcluster-tablet-split-itest.cc index 7ed6bf8c04f8..e861b1a9f43a 100644 --- a/src/yb/integration-tests/xcluster/xcluster-tablet-split-itest.cc +++ b/src/yb/integration-tests/xcluster/xcluster-tablet-split-itest.cc @@ -22,6 +22,7 @@ #include "yb/client/client_fwd.h" #include "yb/client/session.h" #include "yb/client/table.h" +#include "yb/client/xcluster_client.h" #include "yb/client/yb_table_name.h" #include "yb/dockv/partition.h" #include "yb/common/ql_value.h" @@ -248,10 +249,11 @@ TEST_F(CdcTabletSplitITest, GetChangesOnSplitParentTablet) { docdb::DisableYcqlPackedRow(); ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 0; constexpr auto kNumRows = kDefaultNumRows; - // Create a cdc stream for this tablet. auto cdc_proxy = std::make_unique(&client_->proxy_cache(), HostPort::FromBoundEndpoint(cluster_->mini_tablet_servers().front()->bound_rpc_addr())); - auto stream_id = ASSERT_RESULT(cdc::CreateCDCStream(cdc_proxy, table_->id())); + // Create a xCluster stream for this tablet. + auto stream_id = ASSERT_RESULT(client::XClusterClient(*client_).CreateXClusterStream( + table_->id(), /*active=*/true, cdc::StreamModeTransactional::kFalse)); // Ensure that the cdc_state table is ready before inserting rows and splitting. ASSERT_OK(WaitForCdcStateTableToBeReady()); @@ -1201,7 +1203,8 @@ TEST_F(NotSupportedTabletSplitITest, SplittingWithCdcStream) { // Create a cdc stream for this tablet. auto cdc_proxy = std::make_unique(&client_->proxy_cache(), HostPort::FromBoundEndpoint(cluster_->mini_tablet_servers().front()->bound_rpc_addr())); - auto stream_id = ASSERT_RESULT(cdc::CreateCDCStream(cdc_proxy, table_->id())); + auto stream_id = ASSERT_RESULT(client::XClusterClient(*client_).CreateXClusterStream( + table_->id(), /*active=*/true, cdc::StreamModeTransactional::kFalse)); // Ensure that the cdc_state table is ready before inserting rows and splitting. ASSERT_OK(WaitForCdcStateTableToBeReady()); diff --git a/src/yb/master/master_xrepl-test.cc b/src/yb/master/master_xrepl-test.cc index 905fde9d73a6..fb8ad7a90255 100644 --- a/src/yb/master/master_xrepl-test.cc +++ b/src/yb/master/master_xrepl-test.cc @@ -13,7 +13,7 @@ #include #include -#include "yb/cdc/cdc_service.h" +#include "yb/cdc/cdc_service.pb.h" #include "yb/cdc/cdc_state_table.h" #include "yb/common/schema.h" @@ -79,10 +79,10 @@ class MasterTestXRepl : public MasterTestBase { cdc::CDCRecordType record_type = cdc::CDCRecordType::CHANGE); Result GetCDCStream(const xrepl::StreamId& stream_id); Result GetCDCStream(const std::string& cdcsdk_ysql_replication_slot_name); - Status DeleteCDCStream(const xrepl::StreamId& stream_id); + Status DeleteCDCStream(const xrepl::StreamId& stream_id, bool force = false); Result DeleteCDCStream( const std::vector& stream_ids, - const std::vector& cdcsdk_ysql_replication_slot_name); + const std::vector& cdcsdk_ysql_replication_slot_name, bool force = false); Result ListCDCStreams(); Result ListCDCSDKStreams(); Result IsObjectPartOfXRepl(const TableId& table_id); @@ -213,11 +213,15 @@ Result MasterTestXRepl::GetCDCStream( return resp; } -Status MasterTestXRepl::DeleteCDCStream(const xrepl::StreamId& stream_id) { +Status MasterTestXRepl::DeleteCDCStream(const xrepl::StreamId& stream_id, bool force) { DeleteCDCStreamRequestPB req; DeleteCDCStreamResponsePB resp; req.add_stream_id(stream_id.ToString()); + if (force) { + req.set_force_delete(true); + } + RETURN_NOT_OK(proxy_replication_->DeleteCDCStream(req, &resp, ResetAndGetController())); if (resp.has_error()) { RETURN_NOT_OK(StatusFromPB(resp.error().status())); @@ -227,7 +231,7 @@ Status MasterTestXRepl::DeleteCDCStream(const xrepl::StreamId& stream_id) { Result MasterTestXRepl::DeleteCDCStream( const std::vector& stream_ids, - const std::vector& cdcsdk_ysql_replication_slot_names) { + const std::vector& cdcsdk_ysql_replication_slot_names, bool force) { DeleteCDCStreamRequestPB req; DeleteCDCStreamResponsePB resp; for (const auto& stream_id : stream_ids) { @@ -237,6 +241,10 @@ Result MasterTestXRepl::DeleteCDCStream( req.add_cdcsdk_ysql_replication_slot_name(replication_slot_name); } + if (force) { + req.set_force_delete(true); + } + RETURN_NOT_OK(proxy_replication_->DeleteCDCStream(req, &resp, ResetAndGetController())); if (resp.has_error()) { RETURN_NOT_OK(StatusFromPB(resp.error().status())); @@ -665,7 +673,13 @@ TEST_F(MasterTestXRepl, TestDeleteCDCStream) { auto resp = ASSERT_RESULT(GetCDCStream(stream_id)); ASSERT_EQ(resp.stream().table_id().Get(0), table_id); - ASSERT_OK(DeleteCDCStream(stream_id)); + ASSERT_NOK_STR_CONTAINS( + DeleteCDCStream(stream_id), "Cannot delete an xCluster Stream in replication"); + + resp = ASSERT_RESULT(GetCDCStream(stream_id)); + ASSERT_EQ(resp.stream().table_id().Get(0), table_id); + + ASSERT_OK(DeleteCDCStream(stream_id, /*force=*/true)); resp = ASSERT_RESULT(GetCDCStream(stream_id)); ASSERT_TRUE(resp.has_error()); @@ -713,7 +727,9 @@ TEST_F(MasterTestXRepl, TestDeleteCDCStreamWithStreamIdAndReplicationSlotName) { // Delete streams: // 1. Using stream_id // 2. Using replication slot name - auto delete_resp = ASSERT_RESULT(DeleteCDCStream({stream_id_1}, {kPgReplicationSlotName})); + std::vector slot_names = {kPgReplicationSlotName}; + // xCluster stream has to be force deleted since we created it as ACTIVE. + auto delete_resp = ASSERT_RESULT(DeleteCDCStream({stream_id_1}, slot_names, /*force=*/true)); resp = ASSERT_RESULT(GetCDCStream(stream_id_1)); ASSERT_TRUE(resp.has_error()); @@ -758,42 +774,6 @@ TEST_F(MasterTestXRepl, TestDeleteTableWithCDCStream) { ASSERT_OK(GetCDCStream(stream_id)); } -// Just disabled on sanitizers because it doesn't need to run often. It's just a unit test. -TEST_F(MasterTestXRepl, YB_DISABLE_TEST_IN_SANITIZERS(TestDeleteCDCStreamNoForceDelete)) { - // #12255. Added 'force_delete' flag, but only run this check if the client code specifies it. - TableId table_id; - ASSERT_OK(CreateTableWithTableId(&table_id)); - - auto stream_id = xrepl::StreamId::Nil(); - // CreateCDCStream, simulating a fully-created XCluster configuration. - { - CreateCDCStreamRequestPB req; - CreateCDCStreamResponsePB resp; - - req.set_table_id(table_id); - req.set_initial_state(SysCDCStreamEntryPB::ACTIVE); - auto source_type_option = req.add_options(); - source_type_option->set_key(cdc::kRecordFormat); - source_type_option->set_value(CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL)); - ASSERT_OK(proxy_replication_->CreateCDCStream(req, &resp, ResetAndGetController())); - if (resp.has_error()) { - ASSERT_OK(StatusFromPB(resp.error().status())); - } - stream_id = ASSERT_RESULT(CreateCDCStream(table_id)); - } - - auto resp = ASSERT_RESULT(GetCDCStream(stream_id)); - ASSERT_EQ(resp.stream().table_id().Get(0), table_id); - - // Should succeed because we don't use the 'force_delete' safety check in this API call. - ASSERT_OK(DeleteCDCStream(stream_id)); - - resp.Clear(); - resp = ASSERT_RESULT(GetCDCStream(stream_id)); - ASSERT_TRUE(resp.has_error()); - ASSERT_EQ(MasterErrorPB::OBJECT_NOT_FOUND, resp.error().code()); -} - TEST_F(MasterTestXRepl, TestCreateDropCDCStreamWithReplicationSlotName) { // Default of FLAGS_cdc_state_table_num_tablets is to fallback to num_tablet_servers which is 0. // So we need to explicitly set it here. diff --git a/src/yb/master/xcluster/xcluster_source_manager.cc b/src/yb/master/xcluster/xcluster_source_manager.cc index 6167bf973956..c2ccddbe3d96 100644 --- a/src/yb/master/xcluster/xcluster_source_manager.cc +++ b/src/yb/master/xcluster/xcluster_source_manager.cc @@ -17,6 +17,7 @@ #include "yb/cdc/cdc_service.proxy.h" #include "yb/cdc/cdc_state_table.h" #include "yb/cdc/xcluster_types.h" +#include "yb/client/xcluster_client.h" #include "yb/master/catalog_manager.h" #include "yb/master/master.h" #include "yb/master/xcluster/master_xcluster_util.h" @@ -421,22 +422,13 @@ class XClusterCreateStreamContextImpl : public XClusterCreateStreamsContext { Result> XClusterSourceManager::CreateStreamsForDbScoped( const std::vector& table_ids, const LeaderEpoch& epoch) { - google::protobuf::RepeatedPtrField<::yb::master::CDCStreamOptionsPB> options; - auto record_type_option = options.Add(); - record_type_option->set_key(cdc::kRecordType); - record_type_option->set_value(CDCRecordType_Name(cdc::CDCRecordType::CHANGE)); - auto record_format_option = options.Add(); - record_format_option->set_key(cdc::kRecordFormat); - record_format_option->set_value(CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL)); - return CreateStreamsInternal( - table_ids, SysCDCStreamEntryPB::INITIATED, options, /*transactional=*/true, epoch); + table_ids, SysCDCStreamEntryPB::INITIATED, cdc::StreamModeTransactional::kTrue, epoch); } Result> XClusterSourceManager::CreateStreamsInternal( const std::vector& table_ids, SysCDCStreamEntryPB::State state, - const google::protobuf::RepeatedPtrField<::yb::master::CDCStreamOptionsPB>& options, - bool transactional, const LeaderEpoch& epoch) { + cdc::StreamModeTransactional transactional, const LeaderEpoch& epoch) { SCHECK( state == SysCDCStreamEntryPB::ACTIVE || state == SysCDCStreamEntryPB::INITIATED, InvalidArgument, "Stream state must be either ACTIVE or INITIATED"); @@ -457,7 +449,8 @@ Result> XClusterSourceManager::Cre metadata.add_table_id(table_id); metadata.set_transactional(transactional); - metadata.mutable_options()->CopyFrom(options); + // We use a static set of options for all xCluster streams. + *metadata.mutable_options() = client::GetXClusterStreamOptions(); metadata.set_state(state); RecordOutboundStream(stream, table_id); @@ -875,10 +868,8 @@ std::vector XClusterSourceManager::GetStreamsForTable( } Result XClusterSourceManager::CreateNewXClusterStreamForTable( - const TableId& table_id, bool transactional, - const std::optional& initial_state, - const google::protobuf::RepeatedPtrField<::yb::master::CDCStreamOptionsPB>& options, - const LeaderEpoch& epoch) { + const TableId& table_id, cdc::StreamModeTransactional transactional, + const std::optional& initial_state, const LeaderEpoch& epoch) { auto table_info = VERIFY_RESULT(catalog_manager_.FindTableById(table_id)); RETURN_NOT_OK(catalog_manager_.SetXReplWalRetentionForTable(table_info, epoch)); @@ -886,7 +877,7 @@ Result XClusterSourceManager::CreateNewXClusterStreamForTable( const auto state = initial_state ? *initial_state : SysCDCStreamEntryPB::ACTIVE; auto create_context = - VERIFY_RESULT(CreateStreamsInternal({table_id}, state, options, transactional, epoch)); + VERIFY_RESULT(CreateStreamsInternal({table_id}, state, transactional, epoch)); RSTATUS_DCHECK_EQ( create_context->streams_.size(), 1, IllegalState, "Unexpected Expected number of streams created"); diff --git a/src/yb/master/xcluster/xcluster_source_manager.h b/src/yb/master/xcluster/xcluster_source_manager.h index d463e01d442d..b96ef48b934b 100644 --- a/src/yb/master/xcluster/xcluster_source_manager.h +++ b/src/yb/master/xcluster/xcluster_source_manager.h @@ -73,10 +73,8 @@ class XClusterSourceManager { Status DoProcessHiddenTablets() EXCLUDES(retained_hidden_tablets_mutex_); Result CreateNewXClusterStreamForTable( - const TableId& table_id, bool transactional, - const std::optional& initial_state, - const google::protobuf::RepeatedPtrField<::yb::master::CDCStreamOptionsPB>& options, - const LeaderEpoch& epoch); + const TableId& table_id, cdc::StreamModeTransactional transactional, + const std::optional& initial_state, const LeaderEpoch& epoch); protected: XClusterSourceManager( @@ -188,8 +186,7 @@ class XClusterSourceManager { Result> CreateStreamsInternal( const std::vector& table_ids, SysCDCStreamEntryPB::State state, - const google::protobuf::RepeatedPtrField<::yb::master::CDCStreamOptionsPB>& options, - bool transactional, const LeaderEpoch& epoch); + cdc::StreamModeTransactional transactional, const LeaderEpoch& epoch); // Checkpoint the xCluster stream to the given location. Invokes callback with true if bootstrap // is required, and false is bootstrap is not required. diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index ff141f2f6b0b..1db506e564c7 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -20,6 +20,7 @@ #include "yb/client/table_handle.h" #include "yb/client/table_info.h" +#include "yb/client/xcluster_client.h" #include "yb/common/colocated_util.h" #include "yb/common/common_flags.h" #include "yb/common/pg_system_attr.h" @@ -207,6 +208,8 @@ class CDCStreamLoader : public Visitor { bool checkpoint_type_present = false; // Iterate over all the options to check if checkpoint_type and source_type are present. + // (DEPRECATE_EOL 2024.1) This can be removed since XClusterSourceManager creates all new + // streams with these options from 2024.1, and older streams were backfilled. for (auto option : metadata.options()) { if (option.key() == cdc::kSourceType) { source_type_present = true; @@ -810,12 +813,20 @@ Status CatalogManager::CreateCDCStream( if (source_type_option_value == CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER) || (req->has_table_id() && id_type_option_value != cdc::kNamespaceId)) { // xCluster mode. + SCHECK_PB_FIELDS_NOT_EMPTY(*req, table_id); + SCHECK_NE( + id_type_option_value, cdc::kNamespaceId, InvalidArgument, + "NamespaceId option should not be set for xCluster streams"); + + // User specified req->options() are ignored. xCluster sets its own predefined static set of + // options. + std::optional initial_state = std::nullopt; if (req->has_initial_state()) { initial_state = req->initial_state(); } auto stream_id = VERIFY_RESULT(xcluster_manager_->CreateNewXClusterStreamForTable( - req->table_id(), req->transactional(), initial_state, req->options(), epoch)); + req->table_id(), cdc::StreamModeTransactional(req->transactional()), initial_state, epoch)); resp->set_stream_id(stream_id.ToString()); return Status::OK(); } @@ -3587,13 +3598,6 @@ Status CatalogManager::CreateCdcStreamsIfReplicationValidated( // Create CDC stream for each validated table, after persisting the replication state change. if (!validated_tables.empty()) { - std::unordered_map options; - options.reserve(4); - options.emplace(cdc::kRecordType, CDCRecordType_Name(cdc::CDCRecordType::CHANGE)); - options.emplace(cdc::kRecordFormat, CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL)); - options.emplace(cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER)); - options.emplace(cdc::kCheckpointType, CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT)); - // Keep track of the bootstrap_id, table_id, and options of streams to update after // the last GetCDCStreamCallback finishes. Will be updated by multiple async // GetCDCStreamCallback. @@ -3613,12 +3617,14 @@ Status CatalogManager::CreateCdcStreamsIfReplicationValidated( stream_options, universe->ReplicationGroupId(), table, xcluster_rpc, std::placeholders::_1, stream_update_infos, update_infos_lock)); } else { - xcluster_rpc->client()->CreateCDCStream( - table, options, transactional, - std::bind( - &CatalogManager::AddCDCStreamToUniverseAndInitConsumer, this, - universe->ReplicationGroupId(), table, std::placeholders::_1, - nullptr /* on_success_cb */)); + // Streams are used as soon as they are created so set state to active. + client::XClusterClient(*xcluster_rpc->client()) + .CreateXClusterStreamAsync( + table, /*active=*/true, transactional, + std::bind( + &CatalogManager::AddCDCStreamToUniverseAndInitConsumer, this, + universe->ReplicationGroupId(), table, std::placeholders::_1, + nullptr /* on_success_cb */)); } } }