[#12068,#14047] xCluster: Fix shutdown path of CDCConsumer
Summary:
Making a variety of changes to clean up the shutdown/destructor path for CDCConsumer -> CDCPoller -> TwoDCOutputClient.

- Adding a separate Shutdown() function to CDCPoller and TwoDCOutputClient
  - CDCConsumer now calls each poller's Shutdown() from its own Shutdown()
  - Removing the `ShouldContinuePolling` and `RemoveFromPollersMap` callbacks passed to cdc_poller, and having cdc_consumer handle the removals in `TriggerDeletionOfOldPollers`
- Fix `CDCPoller::CheckOffline()` to actually check if the poller is offline
- Ensure that CDCPoller uses `shared_from_this()` for all callbacks, so that its destructor can't run while callbacks are still in flight
- Change TwoDCOutputClient to use `shared_from_this()` as well, so that shutdowns during its callbacks are handled safely
- Introduce a threadpool to TwoDCOutputClient so that callbacks no longer run on the reactor threads
- Change shutdown_ to an atomic bool in TwoDCOutputClient
- Make sure to unregister poll handles before submitting the callback to the threadpool, to avoid deadlocks during shutdown (see the sketch after this list)
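
The overall pattern, as a hypothetical and much-simplified sketch (`Poller`, `ScheduleAsyncRead`, and `CompletePendingRead` are illustrative stand-ins, not the real CDCPoller/TwoDCOutputClient interfaces): the poller keeps itself alive through its own callbacks via `shared_from_this()`, and `Shutdown()` flips an atomic flag that every callback checks before scheduling more work.

```cpp
// Hypothetical sketch of the shutdown pattern described above; names are
// illustrative, not the actual yb::tserver classes.
#include <atomic>
#include <functional>
#include <memory>
#include <utility>

class Poller : public std::enable_shared_from_this<Poller> {
 public:
  void Poll() {
    if (shutdown_.load()) {
      return;  // Analogue of CheckOffline(): never schedule work once offline.
    }
    // Capture a shared_ptr so the destructor cannot run while an async
    // callback is still in flight.
    auto self = shared_from_this();
    ScheduleAsyncRead([self](int result) { self->HandleReadDone(result); });
  }

  void Shutdown() {
    // Mark offline; in-flight callbacks observe the flag and return early.
    shutdown_.store(true);
  }

  // Test hook standing in for the reactor/threadpool delivering a completion.
  void CompletePendingRead() {
    if (pending_) {
      auto cb = std::move(pending_);
      pending_ = nullptr;
      cb(0);
    }
  }

 private:
  void HandleReadDone(int /*result*/) {
    if (shutdown_.load()) {
      return;  // A late callback after Shutdown() is a safe no-op.
    }
    Poll();  // Otherwise schedule the next poll cycle.
  }

  // Stand-in for the real async RPC: just remembers the completion callback.
  void ScheduleAsyncRead(std::function<void(int)> cb) { pending_ = std::move(cb); }

  std::atomic<bool> shutdown_{false};
  std::function<void(int)> pending_;
};

int main() {
  auto poller = std::make_shared<Poller>();
  poller->Poll();                 // Schedules one "async" read.
  poller->Shutdown();             // Poller goes offline.
  poller->CompletePendingRead();  // The late callback returns immediately.
  return 0;
}
```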

Test Plan: Jenkins

Reviewers: rahuldesirazu, hsunder, nicolas

Reviewed By: hsunder, nicolas

Subscribers: ycdcxcluster, ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D19175
hulien22 committed Oct 11, 2022
1 parent 50bea63 commit b659155
Showing 10 changed files with 300 additions and 237 deletions.
70 changes: 33 additions & 37 deletions ent/src/yb/cdc/cdc_rpc.cc
@@ -120,7 +120,7 @@ class CDCWriteRpc : public rpc::Rpc, public client::internal::TabletRpc {
void InvokeCallback(const Status &status) {
if (!called_) {
called_ = true;
callback_(status, resp_);
callback_(status, std::move(resp_));
} else {
LOG(WARNING) << "Multiple invocation of CDCWriteRpc: "
<< status.ToString() << " : " << resp_.DebugString();
@@ -160,48 +160,44 @@ rpc::RpcCommandPtr CreateCDCWriteRpc(

class CDCReadRpc : public rpc::Rpc, public client::internal::TabletRpc {
public:
CDCReadRpc(CoarseTimePoint deadline,
client::internal::RemoteTablet *tablet,
client::YBClient *client,
GetChangesRequestPB* req,
GetChangesCDCRpcCallback callback)
CDCReadRpc(
CoarseTimePoint deadline,
client::internal::RemoteTablet *tablet,
client::YBClient *client,
GetChangesRequestPB *req,
GetChangesCDCRpcCallback callback)
: rpc::Rpc(deadline, client->messenger(), &client->proxy_cache()),
trace_(new Trace),
invoker_(false /* local_tserver_only */,
false /* consistent_prefix */,
client,
this,
this,
tablet,
/* table =*/ nullptr,
mutable_retrier(),
trace_.get(),
master::IncludeInactive::kTrue),
invoker_(
false /* local_tserver_only */,
false /* consistent_prefix */,
client,
this,
this,
tablet,
/* table =*/nullptr,
mutable_retrier(),
trace_.get(),
master::IncludeInactive::kTrue),
callback_(std::move(callback)) {
req_.Swap(req);
}

virtual ~CDCReadRpc() {
CHECK(called_);
}
virtual ~CDCReadRpc() { CHECK(called_); }

void SendRpc() override {
invoker_.Execute(tablet_id());
}
void SendRpc() override { invoker_.Execute(tablet_id()); }

void Finished(const Status &status) override {
auto retained = shared_from_this(); // Ensure we don't destruct until after the callback.
auto retained = shared_from_this(); // Ensure we don't destruct until after the callback.
Status new_status = status;
if (invoker_.Done(&new_status)) {
InvokeCallback(new_status);
}
}

void Failed(const Status &status) override { }
void Failed(const Status &status) override {}

void Abort() override {
rpc::Rpc::Abort();
}
void Abort() override { rpc::Rpc::Abort(); }

const tserver::TabletServerErrorPB *response_error() const override {
// Clear the contents of last_error_, since this function is invoked again on retry.
@@ -235,18 +231,17 @@ class CDCReadRpc : public rpc::Rpc, public client::internal::TabletRpc {
void SendRpcToTserver(int attempt_num) override {
// should be fast because the proxy cache has EndPoint from the tablet lookup.
cdc_proxy_ = std::make_shared<CDCServiceProxy>(
&invoker_.client().proxy_cache(), invoker_.ProxyEndpoint());
&invoker_.client().proxy_cache(), invoker_.ProxyEndpoint());

auto self = std::static_pointer_cast<CDCReadRpc>(shared_from_this());
InvokeAsync(cdc_proxy_.get(),
InvokeAsync(
cdc_proxy_.get(),
PrepareController(),
std::bind(&CDCReadRpc::Finished, self, Status::OK()));
}

private:
const std::string &tablet_id() const {
return req_.tablet_id();
}
const std::string &tablet_id() const { return req_.tablet_id(); }

std::string ToString() const override {
return Format("CDCReadRpc: $0, retrier: $1", req_, retrier());
@@ -255,16 +250,17 @@ class CDCReadRpc : public rpc::Rpc, public client::internal::TabletRpc {
void InvokeCallback(const Status &status) {
if (!called_) {
called_ = true;
// Can std::move since this is only called once the rpc is in a Done state, at which point
// resp_ will no longer be modified or accessed.
callback_(status, std::move(resp_));
} else {
LOG(WARNING) << "Multiple invocation of CDCReadRpc: "
<< status.ToString() << " : " << resp_.DebugString();
LOG(WARNING) << "Multiple invocation of CDCReadRpc: " << status.ToString() << " : "
<< req_.DebugString();
}
}

void InvokeAsync(CDCServiceProxy *cdc_proxy,
rpc::RpcController *controller,
rpc::ResponseCallback callback) {
void InvokeAsync(
CDCServiceProxy *cdc_proxy, rpc::RpcController *controller, rpc::ResponseCallback callback) {
cdc_proxy->GetChangesAsync(req_, &resp_, controller, std::move(callback));
}

4 changes: 2 additions & 2 deletions ent/src/yb/cdc/cdc_rpc.h
@@ -38,7 +38,7 @@ class CDCRecordPB;
class GetChangesRequestPB;
class GetChangesResponsePB;

typedef std::function<void(const Status&, const tserver::WriteResponsePB&)> WriteCDCRecordCallback;
typedef std::function<void(const Status&, tserver::WriteResponsePB&&)> WriteCDCRecordCallback;

// deadline - operation deadline, i.e. timeout.
// tablet - tablet to write the CDC record to.
@@ -54,7 +54,7 @@ MUST_USE_RESULT rpc::RpcCommandPtr CreateCDCWriteRpc(
bool use_local_tserver);


typedef std::function<void(Status, GetChangesResponsePB&&)> GetChangesCDCRpcCallback;
typedef std::function<void(const Status&, GetChangesResponsePB&&)> GetChangesCDCRpcCallback;

MUST_USE_RESULT rpc::RpcCommandPtr CreateGetChangesCDCRpc(
CoarseTimePoint deadline,
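
For context on the typedef changes above: passing the response by rvalue reference lets the RPC hand ownership of the protobuf to the callback instead of copying it, since the RPC never touches the response again after the single callback invocation. A minimal, hypothetical sketch of that pattern (`Response`, `FakeRpc`, and the callback alias are illustrative, not the yb types):

```cpp
// Hypothetical sketch of a callback that takes the response by rvalue
// reference; Response and FakeRpc are stand-ins, not the yb protobufs.
#include <functional>
#include <string>
#include <utility>

struct Response {
  std::string payload;  // Stand-in for a large protobuf message.
};

using GetChangesCallback = std::function<void(const std::string&, Response&&)>;

class FakeRpc {
 public:
  explicit FakeRpc(GetChangesCallback cb) : callback_(std::move(cb)) {}

  void Finish() {
    if (!called_) {
      called_ = true;
      // resp_ is never touched again after this point, so moving is safe.
      callback_("OK", std::move(resp_));
    }
  }

 private:
  bool called_ = false;
  Response resp_{std::string(1 << 20, 'x')};  // Pretend this is expensive to copy.
  GetChangesCallback callback_;
};

int main() {
  FakeRpc rpc([](const std::string& status, Response&& resp) {
    Response owned = std::move(resp);  // Callback takes ownership, no copy.
    (void)status;
    (void)owned;
  });
  rpc.Finish();
  return 0;
}
```
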
80 changes: 44 additions & 36 deletions ent/src/yb/tserver/cdc_consumer.cc
@@ -179,9 +179,15 @@ void CDCConsumer::Shutdown() {
uuid_master_addrs_.clear();
{
std::lock_guard<rw_spinlock> producer_pollers_map_write_lock(producer_pollers_map_mutex_);
// Shutdown the remote and local clients, and abort any of their ongoing rpcs.
for (auto &uuid_and_client : remote_clients_) {
uuid_and_client.second->Shutdown();
}

// Shutdown the pollers and output clients.
for (const auto& poller : producer_pollers_map_) {
poller.second->Shutdown();
}
producer_pollers_map_.clear();
}
local_client_->client->Shutdown();
@@ -205,6 +211,7 @@ void CDCConsumer::RunThread() {
}
}

TriggerDeletionOfOldPollers();
TriggerPollForNewTablets();

auto s = PublishXClusterSafeTime();
@@ -437,20 +444,12 @@ void CDCConsumer::TriggerPollForNewTablets() {
streams_with_local_tserver_optimization_.find(producer_tablet_info.stream_id) !=
streams_with_local_tserver_optimization_.end();
auto cdc_poller = std::make_shared<CDCPoller>(
producer_tablet_info, consumer_tablet_info,
std::bind(&CDCConsumer::ShouldContinuePolling, this, producer_tablet_info,
consumer_tablet_info),
std::bind(&CDCConsumer::RemoveFromPollersMap, this, producer_tablet_info),
thread_pool_.get(),
rpcs_.get(),
local_client_,
remote_clients_[uuid],
this,
use_local_tserver,
global_transaction_status_table_,
enable_replicate_transaction_status_table_);
LOG_WITH_PREFIX(INFO) << Format("Start polling for producer tablet $0",
producer_tablet_info.tablet_id);
producer_tablet_info, consumer_tablet_info, thread_pool_.get(), rpcs_.get(),
local_client_, remote_clients_[uuid], this, use_local_tserver,
global_transaction_status_table_, enable_replicate_transaction_status_table_);
LOG_WITH_PREFIX(INFO) << Format(
"Start polling for producer tablet $0, consumer tablet $1", producer_tablet_info,
consumer_tablet_info.tablet_id);
producer_pollers_map_[producer_tablet_info] = cdc_poller;
cdc_poller->Poll();
}
@@ -466,40 +465,49 @@ void CDCConsumer::TriggerPollForNewTablets() {
}
}

void CDCConsumer::RemoveFromPollersMap(const ProducerTabletInfo producer_tablet_info) {
LOG_WITH_PREFIX(INFO) << Format("Stop polling for producer tablet $0",
producer_tablet_info.tablet_id);
std::shared_ptr<CDCClient> client_to_delete; // decrement refcount to 0 outside lock
void CDCConsumer::TriggerDeletionOfOldPollers() {
// Shutdown outside of master_data_mutex_ lock, to not block any heartbeats.
std::vector<std::shared_ptr<CDCClient>> clients_to_delete;
std::vector<std::shared_ptr<CDCPoller>> pollers_to_shutdown;
{
SharedLock<rw_spinlock> read_lock_master(master_data_mutex_);
std::lock_guard<rw_spinlock> write_lock_pollers(producer_pollers_map_mutex_);
producer_pollers_map_.erase(producer_tablet_info);
// Check if no more objects with this UUID exist after registry refresh.
if (!ContainsKey(uuid_master_addrs_, producer_tablet_info.universe_uuid)) {
auto it = remote_clients_.find(producer_tablet_info.universe_uuid);
if (it != remote_clients_.end()) {
client_to_delete = it->second;
remote_clients_.erase(it);
for (auto it = producer_pollers_map_.cbegin(); it != producer_pollers_map_.cend();) {
const ProducerTabletInfo producer_info = it->first;
const cdc::ConsumerTabletInfo& consumer_info = it->second->GetConsumerTabletInfo();
// Check if we need to delete this poller.
if (ShouldContinuePolling(producer_info, consumer_info)) {
++it;
continue;
}

LOG_WITH_PREFIX(INFO) << Format(
"Stop polling for producer tablet $0, consumer tablet $1", producer_info,
consumer_info.tablet_id);
pollers_to_shutdown.emplace_back(it->second);
it = producer_pollers_map_.erase(it);

// Check if no more objects with this UUID exist after registry refresh.
if (!ContainsKey(uuid_master_addrs_, producer_info.universe_uuid)) {
auto clients_it = remote_clients_.find(producer_info.universe_uuid);
if (clients_it != remote_clients_.end()) {
clients_to_delete.emplace_back(clients_it->second);
remote_clients_.erase(clients_it);
}
}
}
}
if (client_to_delete != nullptr) {
client_to_delete->Shutdown();
for (const auto& poller : pollers_to_shutdown) {
poller->Shutdown();
}
for (const auto& client : clients_to_delete) {
client->Shutdown();
}
}

bool CDCConsumer::ShouldContinuePolling(
const ProducerTabletInfo producer_tablet_info,
const cdc::ConsumerTabletInfo consumer_tablet_info) {
{
std::lock_guard<std::mutex> l(should_run_mutex_);
if (!should_run_) {
return false;
}
}

SharedLock<rw_spinlock> read_lock_master(master_data_mutex_);

const auto& it = producer_consumer_tablet_map_from_master_.find(producer_tablet_info);
// We either no longer need to poll for this tablet, or a different tablet should be polling
// for it now instead of this one (due to a local tablet split).
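
The new `TriggerDeletionOfOldPollers()` above follows a collect-under-lock, act-outside-lock pattern so that potentially blocking `Shutdown()` calls never hold up heartbeats. A hypothetical, stripped-down sketch of that pattern (`Consumer`, `Worker`, and `ShouldKeep` are illustrative names, and a plain `std::mutex` stands in for the spinlocks):

```cpp
// Hypothetical sketch of the collect-under-lock / shut-down-outside-lock
// pattern; Consumer, Worker, and ShouldKeep are illustrative stand-ins.
#include <map>
#include <memory>
#include <mutex>
#include <vector>

struct Worker {
  void Shutdown() {
    // In the real code this may block while in-flight callbacks drain,
    // which is exactly why it must not run under the consumer's lock.
  }
};

class Consumer {
 public:
  void AddWorker(int id) {
    std::lock_guard<std::mutex> lock(mutex_);
    workers_[id] = std::make_shared<Worker>();
  }

  void ReapStoppedWorkers() {
    std::vector<std::shared_ptr<Worker>> to_shutdown;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      for (auto it = workers_.begin(); it != workers_.end();) {
        if (ShouldKeep(it->first)) {
          ++it;
          continue;
        }
        to_shutdown.push_back(it->second);
        it = workers_.erase(it);  // erase() returns the next valid iterator.
      }
    }
    // Potentially blocking Shutdown() calls run without the lock held, so
    // other threads (e.g. heartbeat handlers) are not stalled behind them.
    for (const auto& worker : to_shutdown) {
      worker->Shutdown();
    }
  }

 private:
  bool ShouldKeep(int /*id*/) const { return false; }  // Always reap, for the demo.

  std::mutex mutex_;
  std::map<int, std::shared_ptr<Worker>> workers_;
};

int main() {
  Consumer consumer;
  consumer.AddWorker(1);
  consumer.ReapStoppedWorkers();  // Worker 1 is removed, then shut down unlocked.
  return 0;
}
```
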
5 changes: 4 additions & 1 deletion ent/src/yb/tserver/cdc_consumer.h
@@ -132,9 +132,12 @@ class CDCConsumer {
// polled for.
void TriggerPollForNewTablets();

// Loop through pollers and check if they should still be polling, if not, shut them down.
void TriggerDeletionOfOldPollers();

bool ShouldContinuePolling(
const cdc::ProducerTabletInfo producer_tablet_info,
const cdc::ConsumerTabletInfo consumer_tablet_info) EXCLUDES(should_run_mutex_);
const cdc::ConsumerTabletInfo consumer_tablet_info) REQUIRES_SHARED(master_data_mutex_);

void RemoveFromPollersMap(const cdc::ProducerTabletInfo producer_tablet_info);

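
On the `REQUIRES_SHARED(master_data_mutex_)` change above: the annotation tells Clang's thread-safety analysis that callers of `ShouldContinuePolling` must already hold the lock (at least in shared mode), rather than the function acquiring it itself. A small, hypothetical sketch of how such an annotation is wired up (the macro names and `RWMutex` wrapper are illustrative, not yb's actual headers):

```cpp
// Hypothetical sketch of a Clang thread-safety-annotated shared lock; the
// macros and RWMutex wrapper are illustrative, not yb's real annotations.
#if defined(__clang__)
#define THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x))
#else
#define THREAD_ANNOTATION_ATTRIBUTE__(x)  // No-op without the analysis.
#endif

#define CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(capability(x))
#define ACQUIRE_SHARED(...) THREAD_ANNOTATION_ATTRIBUTE__(acquire_shared_capability(__VA_ARGS__))
#define RELEASE_SHARED(...) THREAD_ANNOTATION_ATTRIBUTE__(release_shared_capability(__VA_ARGS__))
#define REQUIRES_SHARED(...) THREAD_ANNOTATION_ATTRIBUTE__(requires_shared_capability(__VA_ARGS__))

#include <shared_mutex>

class CAPABILITY("mutex") RWMutex {
 public:
  void ReaderLock() ACQUIRE_SHARED() { impl_.lock_shared(); }
  void ReaderUnlock() RELEASE_SHARED() { impl_.unlock_shared(); }

 private:
  std::shared_mutex impl_;
};

class Consumer {
 public:
  void Tick() {
    mutex_.ReaderLock();
    ShouldContinuePolling(42);  // OK: the shared lock is held here.
    mutex_.ReaderUnlock();
  }

 private:
  // With -Wthread-safety, calling this without holding mutex_ (shared or
  // exclusive) is flagged at compile time; the function no longer locks.
  bool ShouldContinuePolling(int id) REQUIRES_SHARED(mutex_) { return id != 0; }

  RWMutex mutex_;
};

int main() {
  Consumer consumer;
  consumer.Tick();
  return 0;
}
```
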
