Skip to content

Commit

Permalink
[#22343] xCluster: Unify xCluster Stream creation
Browse files Browse the repository at this point in the history
Summary:
Remove the option to create an xCluster stream via the cdc_service. That path merely forwarded the call to the master leader and was used only in TEST code.
Creation of xCluster streams requires custom options to be set on it. This code was scattered across multiple places and there used to be several client function calls. Unified all these into one `XClusterClient::CreateXClusterStream<Async>`.
xCluster streams use a static set of options that never changes, so `XClusterSourceManager` now sets them to the correct values. The client code still sends the same options so that it keeps working with older clusters; that client code can be cleaned up in the future.

Fixes #22343
Jira: DB-11249

Test Plan: Jenkins

Reviewers: jhe, slingam, xCluster

Reviewed By: jhe

Subscribers: stiwary, skumar, ycdcxcluster, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D34138
  • Loading branch information
hari90 committed May 10, 2024
1 parent 593563b commit 99fdbc1
Show file tree
Hide file tree
Showing 25 changed files with 268 additions and 401 deletions.
108 changes: 20 additions & 88 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -781,16 +781,6 @@ client::YBClient* CDCServiceImpl::client() { return impl_->async_client_init_->c

namespace {

// Reports whether a YSQL table schema has a user-specified primary key.
// Tables created without one carry a ybrowid column, so finding a column whose
// order equals PgSystemAttrNum::kYBRowId means there is no user-defined key.
bool YsqlTableHasPrimaryKey(const client::YBSchema& schema) {
  const auto ybrowid_order = static_cast<int32_t>(PgSystemAttrNum::kYBRowId);
  bool has_primary_key = true;
  for (const auto& column : schema.columns()) {
    if (column.order() == ybrowid_order) {
      // The ybrowid column is present: no user-specified primary key.
      has_primary_key = false;
      break;
    }
  }
  return has_primary_key;
}

std::unordered_map<std::string, std::string> GetCreateCDCStreamOptions(
const CreateCDCStreamRequestPB* req) {
std::unordered_map<std::string, std::string> options;
Expand Down Expand Up @@ -873,29 +863,6 @@ bool GetFromOpId(const GetChangesRequestPB* req, OpId* op_id, CDCSDKCheckpointPB
return true;
}

// Verifies that CDC can be set up on the given table.
//   - YEDIS (Redis) tables are rejected with InvalidArgument.
//   - YCQL tables are accepted immediately, since they always have a user-specified primary key.
//   - Any other table (YSQL) is accepted only if it has a user-specified primary key.
Status CheckCdcCompatibility(const std::shared_ptr<client::YBTable>& table) {
  const auto table_type = table->table_type();

  if (table_type == client::YBTableType::REDIS_TABLE_TYPE) {
    return STATUS(InvalidArgument, "Cannot setup CDC on YEDIS_TABLE");
  }

  if (table_type == client::YBTableType::YQL_TABLE_TYPE) {
    // Nothing more to verify for CQL tables.
    LOG(INFO) << "Returning while checking CDC compatibility, table is a YCQL table";
    return Status::OK();
  }

  // Remaining case: a YSQL table, which must have a user-specified primary key.
  const bool has_primary_key = YsqlTableHasPrimaryKey(table->schema());
  if (!has_primary_key) {
    return STATUS(InvalidArgument, "Cannot setup CDC on table without primary key");
  }

  return Status::OK();
}

CoarseTimePoint GetDeadline(const RpcContext& context, client::YBClient* client) {
CoarseTimePoint deadline = context.GetClientDeadline();
if (deadline == CoarseTimePoint::max()) { // Not specified by user.
Expand Down Expand Up @@ -1076,59 +1043,30 @@ void CDCServiceImpl::CreateCDCStream(
return;
}

RPC_CHECK_AND_RETURN_ERROR(
req->has_table_id() || req->has_namespace_name(),
STATUS(InvalidArgument, "Table ID or Database name is required to create CDC stream"),
resp->mutable_error(),
CDCErrorPB::INVALID_REQUEST,
context);

bool is_xcluster = req->source_type() == XCLUSTER;
if (is_xcluster || req->has_table_id()) {
std::shared_ptr<client::YBTable> table;
Status s = client()->OpenTable(req->table_id(), &table);
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::TABLE_NOT_FOUND, context);

// We don't allow CDC on YEDIS and tables without a primary key.
if (req->record_format() != CDCRecordFormat::WAL) {
s = CheckCdcCompatibility(table);
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);
}

std::unordered_map<std::string, std::string> options = GetCreateCDCStreamOptions(req);
if (req->has_table_id() || req->source_type() == XCLUSTER) {
RPC_STATUS_RETURN_ERROR(
STATUS(InvalidArgument, "xCluster stream should be created on master"),
resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);
}

auto stream_id = RPC_VERIFY_RESULT(
client()->CreateCDCStream(req->table_id(), options), resp->mutable_error(),
CDCErrorPB::INTERNAL_ERROR, context);
RPC_CHECK_AND_RETURN_ERROR(
req->has_namespace_name(),
STATUS(InvalidArgument, "Table ID or Database name is required to create CDCSDK stream"),
resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);

resp->set_stream_id(stream_id.ToString());

// Add stream to cache.
AddStreamMetadataToCache(
stream_id,
std::make_shared<StreamMetadata>(
"",
std::vector<TableId>{req->table_id()},
req->record_type(),
req->record_format(),
req->source_type(),
req->checkpoint_type(),
StreamModeTransactional(req->transactional())));
} else if (req->has_namespace_name()) {
// Return error if we see that no checkpoint type has been populated.
RPC_CHECK_AND_RETURN_ERROR(
req->has_checkpoint_type(),
STATUS(InvalidArgument, "Checkpoint type is required to create a CDCSDK stream"),
resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);
// Return error if we see that no checkpoint type has been populated.
RPC_CHECK_AND_RETURN_ERROR(
req->has_checkpoint_type(),
STATUS(InvalidArgument, "Checkpoint type is required to create a CDCSDK stream"),
resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);

auto deadline = GetDeadline(context, client());
Status status = CreateCDCStreamForNamespace(req, resp, deadline);
CDCError error(status);
auto deadline = GetDeadline(context, client());
Status status = CreateCDCStreamForNamespace(req, resp, deadline);
CDCError error(status);

if (!status.ok()) {
SetupErrorAndRespond(resp->mutable_error(), status, error.value(), &context);
return;
}
if (!status.ok()) {
SetupErrorAndRespond(resp->mutable_error(), status, error.value(), &context);
return;
}

context.RespondSuccess();
Expand Down Expand Up @@ -4151,12 +4089,6 @@ void CDCServiceImpl::RemoveStreamFromCache(const xrepl::StreamId& stream_id) {
stream_metadata_.erase(stream_id);
}

// Stores `stream_metadata` in the stream metadata cache under `stream_id` via InsertOrUpdate.
// mutex_ is held for the duration of the cache update.
void CDCServiceImpl::AddStreamMetadataToCache(
    const xrepl::StreamId& stream_id, const std::shared_ptr<StreamMetadata>& stream_metadata) {
  std::lock_guard cache_lock(mutex_);
  InsertOrUpdate(&stream_metadata_, stream_id, stream_metadata);
}

Status CDCServiceImpl::CheckTabletValidForStream(const TabletStreamInfo& info) {
auto result = VERIFY_RESULT(impl_->PreCheckTabletValidForStream(info));
if (result) {
Expand Down
11 changes: 0 additions & 11 deletions src/yb/cdc/cdc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,6 @@ typedef std::unordered_map<HostPort, std::shared_ptr<CDCServiceProxy>, HostPortH

YB_STRONGLY_TYPED_BOOL(CreateMetricsEntityIfNotFound);

static const char* const kRecordType = "record_type";
static const char* const kRecordFormat = "record_format";
static const char* const kRetentionSec = "retention_sec";
static const char* const kSourceType = "source_type";
static const char* const kCheckpointType = "checkpoint_type";
static const char* const kStreamState = "state";
static const char* const kNamespaceId = "NAMESPACEID";
static const char* const kCDCSDKSnapshotDoneKey = "snapshot_done_key";

struct TabletCheckpoint {
Expand Down Expand Up @@ -334,10 +327,6 @@ class CDCServiceImpl : public CDCServiceIf {

void RemoveStreamFromCache(const xrepl::StreamId& stream_id);

void AddStreamMetadataToCache(
const xrepl::StreamId& stream_id, const std::shared_ptr<StreamMetadata>& stream_metadata)
EXCLUDES(mutex_);

Status CheckTabletValidForStream(const TabletStreamInfo& producer_info);

void TabletLeaderGetChanges(
Expand Down
8 changes: 8 additions & 0 deletions src/yb/cdc/cdc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,13 @@ using EnumLabelCache = std::unordered_map<NamespaceName, EnumOidLabelMap>;
using CompositeAttsMap = std::unordered_map<uint32_t, std::vector<master::PgAttributePB>>;
using CompositeTypeCache = std::unordered_map<NamespaceName, CompositeAttsMap>;

// Keys of the legacy string-keyed stream options stored on a CDC/xCluster stream.
// Declared `inline constexpr` so that every translation unit including this header shares a
// single definition instead of getting its own internal-linkage copy.
inline constexpr const char* kRecordType = "record_type";
inline constexpr const char* kRecordFormat = "record_format";
inline constexpr const char* kSourceType = "source_type";
inline constexpr const char* kCheckpointType = "checkpoint_type";
inline constexpr const char* kStreamState = "state";
inline constexpr const char* kNamespaceId = "NAMESPACEID";
// NOTE: Do not add new options here. Create them as explicit PB fields.

} // namespace cdc
} // namespace yb
23 changes: 3 additions & 20 deletions src/yb/cdc/xcluster_producer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
#include "yb/client/schema.h"

#include "yb/client/table_handle.h"
#include "yb/client/xcluster_client.h"
#include "yb/client/yb_op.h"
#include "yb/client/session.h"
#include "yb/integration-tests/cdc_test_util.h"
#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/yb_mini_cluster_test_base.h"
#include "yb/master/catalog_manager.h"
#include "yb/master/master_auto_flags_manager.h"
#include "yb/master/master_cluster.pb.h"
#include "yb/master/mini_master.h"
Expand Down Expand Up @@ -78,7 +77,8 @@ class XClusterProducerTest : public MiniClusterTestWithClient<MiniCluster> {
&client_->proxy_cache(), HostPort::FromBoundEndpoint(tablet_server_->bound_rpc_addr()));

ASSERT_OK(CreateTable());
stream_id_ = ASSERT_RESULT(CreateCDCStream());
stream_id_ = ASSERT_RESULT(client::XClusterClient(*client_).CreateXClusterStream(
table_->id(), /*active=*/true, cdc::StreamModeTransactional::kFalse));
}

Status CreateTable() {
Expand All @@ -99,23 +99,6 @@ class XClusterProducerTest : public MiniClusterTestWithClient<MiniCluster> {
return Status::OK();
}

// Test helper: asks the CDC service (through cdc_proxy_) to create an xCluster stream on the
// test table, using the IMPLICIT checkpoint type and the WAL record format.
// Returns the new stream id, or the error status carried in the response.
Result<xrepl::StreamId> CreateCDCStream() {
  CreateCDCStreamRequestPB request;
  request.set_table_id(table_->id());
  request.set_source_type(XCLUSTER);
  request.set_checkpoint_type(IMPLICIT);
  request.set_record_format(WAL);

  CreateCDCStreamResponsePB response;
  rpc::RpcController controller;
  RETURN_NOT_OK(cdc_proxy_->CreateCDCStream(request, &response, &controller));

  // The RPC itself can succeed while the service reports an error in the response.
  if (response.has_error()) {
    return StatusFromPB(response.error().status());
  }

  return xrepl::StreamId::FromString(response.stream_id());
}

Status InsertRows(uint32_t start, uint32_t end) {
return WriteRows(start, end, /* delete_op */ false);
}
Expand Down
14 changes: 7 additions & 7 deletions src/yb/cdc/xcluster_producer_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "yb/cdc/cdc_service_context.h"
#include "yb/cdc/cdc_state_table.h"
#include "yb/client/meta_cache.h"
#include "yb/client/xcluster_client.h"
#include "yb/consensus/consensus.h"
#include "yb/consensus/log.h"
#include "yb/consensus/log_cache.h"
Expand Down Expand Up @@ -140,17 +141,16 @@ Status XClusterProducerBootstrap::CreateAllBootstrapStreams() {
return Status::OK();
}

std::unordered_map<std::string, std::string> options;
options.reserve(2);
options.emplace(cdc::kRecordType, CDCRecordType_Name(cdc::CDCRecordType::CHANGE));
options.emplace(cdc::kRecordFormat, CDCRecordFormat_Name(cdc::CDCRecordFormat::WAL));

// Generate bootstrap ids & setup the CDC streams, for use with the XCluster Consumer.
for (const auto& table_id : req_.table_ids()) {
// Mark this stream as being bootstrapped, to help in finding dangling streams.
// Mark this stream as being bootstrapped, to help in finding dangling streams. Set state to
// inactive since it will take some time for the streams to be used. When the target cluster is
// setup it will switch the streams to active state. Transactional flag will also get set at
// that point.
// TODO: Turn this into a batch RPC.
const auto& bootstrap_id = VERIFY_RESULT(
cdc_service_->client()->CreateCDCStream(table_id, options, /* active */ false));
client::XClusterClient(*cdc_service_->client())
.CreateXClusterStream(table_id, /* active */ false, StreamModeTransactional::kFalse));
creation_state_->created_cdc_streams.push_back(bootstrap_id);

bootstrap_ids_and_tables_.emplace_back(bootstrap_id, table_id);
Expand Down
2 changes: 2 additions & 0 deletions src/yb/cdc/xrepl_stream_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ namespace {
// This function is to handle the upgrade scenario where the DB is upgraded from a version
// without CDCSDK changes to the one with it. So in case some required options are missing,
// the default values will be added for the same.
// (DEPRECATE_EOL 2024.1) This can be removed since XClusterSourceManager populates these defaults
// on new streams and CDCStreamLoader backfills them for older streams.
void AddDefaultOptionsIfMissing(std::unordered_map<std::string, std::string>* options) {
InsertIfNotPresent(options, kSourceType, CDCRequestSource_Name(CDCRequestSource::XCLUSTER));
InsertIfNotPresent(options, kCheckpointType, CDCCheckpointType_Name(CDCCheckpointType::IMPLICIT));
Expand Down
1 change: 1 addition & 0 deletions src/yb/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ set(CLIENT_SRCS

set(CLIENT_LIBS
yb_common
cdc_service_proto
master_proto
master_rpc
master_util
Expand Down
67 changes: 28 additions & 39 deletions src/yb/client/client-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1677,20 +1677,18 @@ void GetColocatedTabletSchemaRpc::ProcessResponse(const Status& status) {
user_cb_.Run(new_status);
}

class CreateCDCStreamRpc
class CreateXClusterStreamRpc
: public ClientMasterRpc<CreateCDCStreamRequestPB, CreateCDCStreamResponsePB> {
public:
CreateCDCStreamRpc(
YBClient* client,
CreateCDCStreamCallback user_cb,
const TableId& table_id,
const std::unordered_map<std::string, std::string>& options,
cdc::StreamModeTransactional transactional,
CreateXClusterStreamRpc(
YBClient* client, CreateCDCStreamCallback user_cb, const TableId& table_id,
const google::protobuf::RepeatedPtrField<yb::master::CDCStreamOptionsPB>& options,
master::SysCDCStreamEntryPB::State state, cdc::StreamModeTransactional transactional,
CoarseTimePoint deadline);

string ToString() const override;

virtual ~CreateCDCStreamRpc();
virtual ~CreateXClusterStreamRpc();

private:
void CallRemoteMethod() override;
Expand All @@ -1700,40 +1698,33 @@ class CreateCDCStreamRpc
const std::string table_id_;
};

CreateCDCStreamRpc::CreateCDCStreamRpc(
YBClient* client,
CreateCDCStreamCallback user_cb,
const TableId& table_id,
const std::unordered_map<std::string, std::string>& options,
cdc::StreamModeTransactional transactional,
CreateXClusterStreamRpc::CreateXClusterStreamRpc(
YBClient* client, CreateCDCStreamCallback user_cb, const TableId& table_id,
const google::protobuf::RepeatedPtrField<yb::master::CDCStreamOptionsPB>& options,
master::SysCDCStreamEntryPB::State state, cdc::StreamModeTransactional transactional,
CoarseTimePoint deadline)
: ClientMasterRpc(client, deadline),
user_cb_(std::move(user_cb)),
table_id_(table_id) {
: ClientMasterRpc(client, deadline), user_cb_(std::move(user_cb)), table_id_(table_id) {
req_.set_table_id(table_id_);
req_.mutable_options()->Reserve(narrow_cast<int>(options.size()));
// (DEPRECATE_EOL 2024.1) Master now sets the options explicitly, so they do not need to be sent.
*req_.mutable_options() = options;
req_.set_transactional(transactional);
for (const auto& option : options) {
auto* op = req_.add_options();
op->set_key(option.first);
op->set_value(option.second);
}
req_.set_initial_state(state);
}

CreateCDCStreamRpc::~CreateCDCStreamRpc() {
}
CreateXClusterStreamRpc::~CreateXClusterStreamRpc() {}

void CreateCDCStreamRpc::CallRemoteMethod() {
void CreateXClusterStreamRpc::CallRemoteMethod() {
master_replication_proxy()->CreateCDCStreamAsync(
req_, &resp_, mutable_retrier()->mutable_controller(),
std::bind(&CreateCDCStreamRpc::Finished, this, Status::OK()));
std::bind(&CreateXClusterStreamRpc::Finished, this, Status::OK()));
}

string CreateCDCStreamRpc::ToString() const {
return Substitute("CreateCDCStream(table_id: $0, num_attempts: $1)", table_id_, num_attempts());
string CreateXClusterStreamRpc::ToString() const {
return Substitute(
"CreateXClusterStream(table_id: $0, num_attempts: $1)", table_id_, num_attempts());
}

void CreateCDCStreamRpc::ProcessResponse(const Status& status) {
void CreateXClusterStreamRpc::ProcessResponse(const Status& status) {
if (status.ok()) {
user_cb_(xrepl::StreamId::FromString(resp_.stream_id()));
} else {
Expand Down Expand Up @@ -2513,15 +2504,13 @@ Result<IndexPermissions> YBClient::Data::WaitUntilIndexPermissionsAtLeast(
return actual_index_permissions;
}

void YBClient::Data::CreateCDCStream(
YBClient* client,
const TableId& table_id,
const std::unordered_map<std::string, std::string>& options,
cdc::StreamModeTransactional transactional,
CoarseTimePoint deadline,
CreateCDCStreamCallback callback) {
auto rpc = StartRpc<internal::CreateCDCStreamRpc>(
client, callback, table_id, options, transactional, deadline);
void YBClient::Data::CreateXClusterStream(
YBClient* client, const TableId& table_id,
const google::protobuf::RepeatedPtrField<yb::master::CDCStreamOptionsPB>& options,
master::SysCDCStreamEntryPB::State state, cdc::StreamModeTransactional transactional,
CoarseTimePoint deadline, CreateCDCStreamCallback callback) {
auto rpc = StartRpc<internal::CreateXClusterStreamRpc>(
client, callback, table_id, options, state, transactional, deadline);
}

void YBClient::Data::DeleteCDCStream(
Expand Down
11 changes: 5 additions & 6 deletions src/yb/client/client-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,11 @@ class YBClient::Data {
const CoarseTimePoint deadline,
const CoarseDuration max_wait = std::chrono::seconds(2));

void CreateCDCStream(YBClient* client,
const TableId& table_id,
const std::unordered_map<std::string, std::string>& options,
cdc::StreamModeTransactional transactional,
CoarseTimePoint deadline,
CreateCDCStreamCallback callback);
void CreateXClusterStream(
YBClient* client, const TableId& table_id,
const google::protobuf::RepeatedPtrField<yb::master::CDCStreamOptionsPB>& options,
master::SysCDCStreamEntryPB::State state, cdc::StreamModeTransactional transactional,
CoarseTimePoint deadline, CreateCDCStreamCallback callback);

void DeleteCDCStream(
YBClient* client,
Expand Down
Loading

0 comments on commit 99fdbc1

Please sign in to comment.