Skip to content

Commit

Permalink
[#17432] xCluster: Coalesce terms like producer_id, universe_id into …
Browse files Browse the repository at this point in the history
…replication_group_id

Summary:
producer_id, universe_id, producer_universe_uuid, producer_uuid and universe_uuid all mean the same thing. These terms describe a collection of tables replicated from one producer universe to one consumer universe. There could be multiple such groups between the same or different universe pairs.

This change renames these to replication_group_id and makes it a strongly typed string type, ReplicationGroupId. Strongly typed strings help prevent other strings, such as stream_id or table_id, from being implicitly assigned to this type. They also help avoid mistakes when function parameters are passed in the wrong order.
Jira: DB-6605

Test Plan: All xCluster and CDC tests

Reviewers: jhe, xCluster

Reviewed By: jhe

Subscribers: slingam, bogdan, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D25526
  • Loading branch information
hari90 committed May 20, 2023
1 parent f5186b2 commit d5d857e
Show file tree
Hide file tree
Showing 32 changed files with 848 additions and 721 deletions.
32 changes: 16 additions & 16 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ class CDCServiceImpl::Impl {
const TabletId& tablet_id,
std::vector<ProducerTabletInfo>* producer_entries_modified) {
ProducerTabletInfo producer_tablet{
.universe_uuid = "", .stream_id = stream_id, .tablet_id = tablet_id};
.replication_group_id = {}, .stream_id = stream_id, .tablet_id = tablet_id};
CoarseTimePoint time;
int64_t active_time;

Expand Down Expand Up @@ -528,7 +528,8 @@ class CDCServiceImpl::Impl {
const OpId& split_op_id) {
std::lock_guard<rw_spinlock> l(mutex_);
for (const auto& tablet : tablets) {
ProducerTabletInfo producer_info{info.universe_uuid, info.stream_id, tablet->tablet_id()};
ProducerTabletInfo producer_info{
info.replication_group_id, info.stream_id, tablet->tablet_id()};
tablet_checkpoints_.emplace(TabletCheckpointInfo{
.producer_tablet_info = producer_info,
.cdc_state_checkpoint =
Expand Down Expand Up @@ -559,7 +560,8 @@ class CDCServiceImpl::Impl {
std::lock_guard<rw_spinlock> l(mutex_);
for (const auto& tablet : tablets) {
// Add every tablet in the stream.
ProducerTabletInfo producer_info{info.universe_uuid, info.stream_id, tablet.tablet_id()};
ProducerTabletInfo producer_info{
info.replication_group_id, info.stream_id, tablet.tablet_id()};
tablet_checkpoints_.emplace(TabletCheckpointInfo{
.producer_tablet_info = producer_info,
.cdc_state_checkpoint =
Expand Down Expand Up @@ -1185,7 +1187,7 @@ Result<SetCDCCheckpointResponsePB> CDCServiceImpl::SetCDCCheckpoint(
CheckCanServeTabletData(*tablet_peer->tablet_metadata()),
CDCError(CDCErrorPB::LEADER_NOT_READY));

ProducerTabletInfo producer_tablet{"" /* UUID */, req.stream_id(), req.tablet_id()};
ProducerTabletInfo producer_tablet{{}, req.stream_id(), req.tablet_id()};
RETURN_NOT_OK_SET_CODE(
CheckTabletValidForStream(producer_tablet), CDCError(CDCErrorPB::INVALID_REQUEST));

Expand Down Expand Up @@ -1344,17 +1346,15 @@ void CDCServiceImpl::GetTabletListToPollForCDC(
CDCSDKCheckpointPB parent_checkpoint_pb;
{
auto session = client()->NewSession();
ProducerTabletInfo parent_tablet = {
"" /* UUID */, req->table_info().stream_id(), req->tablet_id()};
ProducerTabletInfo parent_tablet = {{}, req->table_info().stream_id(), req->tablet_id()};
auto result = GetLastCheckpoint(parent_tablet, session, CDCRequestSource::CDCSDK);
RPC_RESULT_RETURN_ERROR(result, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
(*result).ToPB(&parent_checkpoint_pb);
}

for (const auto& child_tablet_id : child_tablet_ids) {
auto session = client()->NewSession();
ProducerTabletInfo cur_child_tablet = {
"" /* UUID */, req->table_info().stream_id(), child_tablet_id};
ProducerTabletInfo cur_child_tablet = {{}, req->table_info().stream_id(), child_tablet_id};

auto tablet_checkpoint_pair_pb = resp->add_tablet_checkpoint_pairs();
tablet_checkpoint_pair_pb->mutable_tablet_locations()->CopyFrom(
Expand Down Expand Up @@ -1519,7 +1519,7 @@ void CDCServiceImpl::GetChanges(
session->SetDeadline(deadline);

// Check that requested tablet_id is part of the CDC stream.
producer_tablet = {"" /* UUID */, stream_id, req->tablet_id()};
producer_tablet = {{}, stream_id, req->tablet_id()};

auto status = CheckTabletValidForStream(producer_tablet);
if (!status.ok()) {
Expand Down Expand Up @@ -2002,7 +2002,7 @@ void CDCServiceImpl::UpdateCDCMetrics() {
StreamMetadata& record = **get_stream_metadata;

bool is_leader = (tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY);
ProducerTabletInfo tablet_info = {"" /* universe_uuid */, stream_id, tablet_id};
ProducerTabletInfo tablet_info = {{}, stream_id, tablet_id};
tablets_in_cdc_state_table.insert(tablet_info);

if (record.GetSourceType() == CDCSDK) {
Expand Down Expand Up @@ -2410,7 +2410,7 @@ Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
}

// Check that requested tablet_id is part of the CDC stream.
ProducerTabletInfo producer_tablet = {"" /* UUID */, stream_id, tablet_id};
ProducerTabletInfo producer_tablet = {{}, stream_id, tablet_id};

// Check stream associated with the tablet is active or not.
// Don't consider those inactive stream for the min_checkpoint calculation.
Expand Down Expand Up @@ -2992,7 +2992,7 @@ void CDCServiceImpl::GetCheckpoint(
CDCErrorPB::LEADER_NOT_READY, context);

// Check that requested tablet_id is part of the CDC stream.
ProducerTabletInfo producer_tablet = {"" /* UUID */, req->stream_id(), req->tablet_id()};
ProducerTabletInfo producer_tablet = {{}, req->stream_id(), req->tablet_id()};
auto s = CheckTabletValidForStream(producer_tablet);
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);

Expand Down Expand Up @@ -3552,7 +3552,7 @@ Status CDCServiceImpl::BootstrapProducerHelperParallelized(
for (int i = 0; i < get_op_id_resp->op_ids_size(); i++) {
const std::string bootstrap_id = leader_tablets.at(i).first;
const std::string tablet_id = leader_tablets.at(i).second;
ProducerTabletInfo producer_tablet{"" /* Universe UUID */, bootstrap_id, tablet_id};
ProducerTabletInfo producer_tablet{{} /* Universe UUID */, bootstrap_id, tablet_id};
auto op_id = OpId::FromPB(get_op_id_resp->op_ids(i));

// Add op_id for tablet.
Expand Down Expand Up @@ -4226,7 +4226,7 @@ Status CDCServiceImpl::UpdateSnapshotDone(

// Also update the active_time in the streaming row.
if (!colocated_table_id.empty()) {
ProducerTabletInfo producer_tablet = {"" /* UUID */, stream_id, tablet_id};
ProducerTabletInfo producer_tablet = {{}, stream_id, tablet_id};
auto streaming_safe_time = VERIFY_RESULT(GetSafeTime(producer_tablet, session));
RETURN_NOT_OK(UpdateActiveTime(producer_tablet, session, current_time, streaming_safe_time));
}
Expand Down Expand Up @@ -4415,7 +4415,7 @@ void CDCServiceImpl::IsBootstrapRequired(

if (req->has_stream_id() && !req->stream_id().empty()) {
// Check that requested tablet_id is part of the CDC stream.
ProducerTabletInfo producer_tablet = {"" /* UUID */, req->stream_id(), tablet_id};
ProducerTabletInfo producer_tablet = {{}, req->stream_id(), tablet_id};
auto s = CheckTabletValidForStream(producer_tablet);
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);

Expand Down Expand Up @@ -4576,7 +4576,7 @@ void CDCServiceImpl::CheckReplicationDrain(
continue;
}

ProducerTabletInfo producer_tablet = {"" /* UUID */, stream_id, tablet_id};
ProducerTabletInfo producer_tablet = {{}, stream_id, tablet_id};
auto s = CheckTabletValidForStream(producer_tablet);
if (!s.ok()) {
LOG_WITH_FUNC(WARNING) << "Tablet not valid for stream: " << s << ". Skipping.";
Expand Down
46 changes: 31 additions & 15 deletions src/yb/cdc/cdc_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
namespace yb {
namespace cdc {

YB_STRONGLY_TYPED_STRING(ReplicationGroupId);

// Maps a tablet id -> stream id -> replication error -> error detail.
typedef std::unordered_map<ReplicationErrorPb, std::string> ReplicationErrorMap;
typedef std::unordered_map<CDCStreamId, ReplicationErrorMap> StreamReplicationErrorMap;
Expand All @@ -61,32 +63,34 @@ struct ConsumerTabletInfo {
};

struct ProducerTabletInfo {
std::string universe_uuid; /* needed on Consumer side for uniqueness. Empty on Producer */
CDCStreamId stream_id; /* unique ID on Producer, but not on Consumer. */
// Needed on Consumer side for uniqueness. Empty on Producer.
ReplicationGroupId replication_group_id;
// Unique ID on Producer, but not on Consumer.
CDCStreamId stream_id;
std::string tablet_id;

bool operator==(const ProducerTabletInfo& other) const {
return universe_uuid == other.universe_uuid &&
stream_id == other.stream_id &&
return replication_group_id == other.replication_group_id && stream_id == other.stream_id &&
tablet_id == other.tablet_id;
}

std::string ToString() const {
return Format("{ universe_uuid: $0 stream_id: $1 tablet_id: $2 }",
universe_uuid, stream_id, tablet_id);
return Format(
"{ replication_group_id: $0 stream_id: $1 tablet_id: $2 }", replication_group_id, stream_id,
tablet_id);
}

// String used as a descriptor id for metrics.
std::string MetricsString() const {
std::stringstream ss;
ss << universe_uuid << ":" << stream_id << ":" << tablet_id;
ss << replication_group_id << ":" << stream_id << ":" << tablet_id;
return ss.str();
}

struct Hash {
std::size_t operator()(const ProducerTabletInfo& p) const noexcept {
std::size_t hash = 0;
boost::hash_combine(hash, p.universe_uuid);
boost::hash_combine(hash, p.replication_group_id);
boost::hash_combine(hash, p.stream_id);
boost::hash_combine(hash, p.tablet_id);

Expand Down Expand Up @@ -120,17 +124,29 @@ inline size_t hash_value(const ProducerTabletInfo& p) noexcept {
return ProducerTabletInfo::Hash()(p);
}

inline bool IsAlterReplicationUniverseId(const std::string& universe_uuid) {
return GStringPiece(universe_uuid).ends_with(".ALTER");
constexpr char kAlterReplicationGroupSuffix[] = ".ALTER";

// Returns a ReplicationGroupId formed by appending kAlterReplicationGroupSuffix to the given
// replication group id string.
inline ReplicationGroupId GetAlterReplicationGroupId(const std::string& replication_group_id) {
return ReplicationGroupId(replication_group_id + kAlterReplicationGroupSuffix);
}

// Overload for a strongly typed id: converts it to its string form and delegates to the
// std::string overload, which appends kAlterReplicationGroupSuffix.
inline ReplicationGroupId GetAlterReplicationGroupId(
const ReplicationGroupId& replication_group_id) {
return GetAlterReplicationGroupId(replication_group_id.ToString());
}

// Returns true if the string form of replication_group_id ends with
// kAlterReplicationGroupSuffix.
inline bool IsAlterReplicationGroupId(const ReplicationGroupId& replication_group_id) {
return GStringPiece(replication_group_id.ToString()).ends_with(kAlterReplicationGroupSuffix);
}

inline std::string GetOriginalReplicationUniverseId(const std::string& universe_uuid) {
inline ReplicationGroupId GetOriginalReplicationGroupId(
const ReplicationGroupId& replication_group_id) {
// Remove the .ALTER suffix from replication_group_id if applicable.
GStringPiece clean_universe_id(universe_uuid);
if (clean_universe_id.ends_with(".ALTER")) {
clean_universe_id.remove_suffix(sizeof(".ALTER")-1 /* exclude \0 ending */);
GStringPiece clean_id(replication_group_id.ToString());
if (clean_id.ends_with(kAlterReplicationGroupSuffix)) {
clean_id.remove_suffix(sizeof(kAlterReplicationGroupSuffix) - 1 /* exclude \0 ending */);
}
return clean_universe_id.ToString();
return ReplicationGroupId(clean_id.ToString());
}

Result<std::optional<qlexpr::QLRow>> FetchOptionalCdcStreamInfo(
Expand Down
30 changes: 10 additions & 20 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1610,18 +1610,14 @@ Status YBClient::BootstrapProducer(
}

Status YBClient::UpdateConsumerOnProducerSplit(
const string& producer_id,
const cdc::ReplicationGroupId& replication_group_id,
const CDCStreamId& stream_id,
const master::ProducerSplitTabletInfoPB& split_info) {
if (producer_id.empty()) {
return STATUS(InvalidArgument, "Producer id is required.");
}
if (stream_id.empty()) {
return STATUS(InvalidArgument, "Stream id is required.");
}
SCHECK(!replication_group_id.empty(), InvalidArgument, "Producer id is required.");
SCHECK(!stream_id.empty(), InvalidArgument, "Stream id is required.");

UpdateConsumerOnProducerSplitRequestPB req;
req.set_producer_id(producer_id);
req.set_producer_id(replication_group_id.ToString());
req.set_stream_id(stream_id);
req.mutable_producer_split_tablet_info()->CopyFrom(split_info);

Expand All @@ -1631,25 +1627,19 @@ Status YBClient::UpdateConsumerOnProducerSplit(
}

Status YBClient::UpdateConsumerOnProducerMetadata(
const string& producer_id,
const cdc::ReplicationGroupId& replication_group_id,
const CDCStreamId& stream_id,
const tablet::ChangeMetadataRequestPB& meta_info,
uint32_t colocation_id,
uint32_t producer_schema_version,
uint32_t consumer_schema_version,
master::UpdateConsumerOnProducerMetadataResponsePB *resp) {
if (producer_id.empty()) {
return STATUS(InvalidArgument, "Producer id is required.");
}
if (stream_id.empty()) {
return STATUS(InvalidArgument, "Stream id is required.");
}
if (resp == nullptr) {
return STATUS(InvalidArgument, "Response pointer is required.");
}
master::UpdateConsumerOnProducerMetadataResponsePB* resp) {
SCHECK(!replication_group_id.empty(), InvalidArgument, "ReplicationGroup id is required.");
SCHECK(!stream_id.empty(), InvalidArgument, "Stream id is required.");
SCHECK(resp != nullptr, InvalidArgument, "Response pointer is required.");

master::UpdateConsumerOnProducerMetadataRequestPB req;
req.set_producer_id(producer_id);
req.set_producer_id(replication_group_id.ToString());
req.set_stream_id(stream_id);
req.set_colocation_id(colocation_id);
req.set_producer_schema_version(producer_schema_version);
Expand Down
11 changes: 6 additions & 5 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -594,19 +594,20 @@ class YBClient {
BootstrapProducerCallback callback);

// Update consumer pollers after a producer side tablet split.
Status UpdateConsumerOnProducerSplit(const std::string& producer_id,
const TableId& table_id,
const master::ProducerSplitTabletInfoPB& split_info);
Status UpdateConsumerOnProducerSplit(
const cdc::ReplicationGroupId& replication_group_id,
const TableId& table_id,
const master::ProducerSplitTabletInfoPB& split_info);

// Update after a producer DDL change. Returns if caller should wait for a similar Consumer DDL.
Status UpdateConsumerOnProducerMetadata(
const std::string& producer_id,
const cdc::ReplicationGroupId& replication_group_id,
const CDCStreamId& stream_id,
const tablet::ChangeMetadataRequestPB& meta_info,
uint32_t colocation_id,
uint32_t producer_schema_version,
uint32_t consumer_schema_version,
master::UpdateConsumerOnProducerMetadataResponsePB *resp);
master::UpdateConsumerOnProducerMetadataResponsePB* resp);

void GetTableLocations(
const TableId& table_id, int32_t max_tablets, RequireTabletsRunning require_tablets_running,
Expand Down
Loading

0 comments on commit d5d857e

Please sign in to comment.