service: storage_proxy: make hint write handlers retirable
Whether a write handler should be retirable is now controlled by a
parameter passed to `create_write_response_handler`. We plumb it down
from `send_to_endpoint`, which is called by the hints manager.

This will cause hint write handlers to time out immediately when we
shut down or when a destination node is marked as dead.

Fixes scylladb#8079
kbr-scylla committed May 26, 2023
1 parent cc7cc5d commit e6abd0a
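For context, `retirable` uses the same strongly-typed boolean idiom as the existing `allow_hints` flag: it is `bool_class<struct retirable_tag>`, moved in this commit from storage_proxy.cc into storage_proxy.hh, so call sites must spell out `retirable::yes` or `retirable::no` rather than a bare `true`/`false`. Below is a minimal, self-contained sketch of that idiom; the `bool_class` template here only imitates Seastar's, and `register_handler` is a hypothetical stand-in for `create_write_response_handler`, used solely to show how the flag travels through the call chain as a distinct type.

```cpp
// Minimal sketch (not the real Seastar type) of the bool_class idiom behind
// service::retirable and service::allow_hints: a tag-parameterised wrapper
// that turns boolean flags into distinct, self-documenting types.
#include <iostream>

template <typename Tag>
class bool_class {
    bool _value;
public:
    static const bool_class yes;
    static const bool_class no;
    constexpr explicit bool_class(bool v) noexcept : _value(v) {}
    constexpr explicit operator bool() const noexcept { return _value; }
};
template <typename Tag> const bool_class<Tag> bool_class<Tag>::yes{true};
template <typename Tag> const bool_class<Tag> bool_class<Tag>::no{false};

using retirable   = bool_class<struct retirable_tag>;
using allow_hints = bool_class<struct allow_hints_tag>;

// Hypothetical stand-in for create_write_response_handler(): each flag
// arrives as its own type, so arguments cannot be swapped silently.
void register_handler(allow_hints hints, retirable r) {
    std::cout << "allow_hints=" << bool(hints)
              << " retirable=" << bool(r) << '\n';
}

int main() {
    // Hint writes: retirable, so the handler is timed out as soon as the
    // node shuts down or the destination is marked dead.
    register_handler(allow_hints::no, retirable::yes);
    // Regular mutations keep the previous behaviour.
    register_handler(allow_hints::yes, retirable::no);
    // register_handler(true, false);                      // does not compile
    // register_handler(retirable::yes, allow_hints::no);  // does not compile
}
```

In the commit itself, the hint paths (the `hint_wrapper` overload and `send_hint_to_endpoint`) and remote view updates pass `retirable::yes`, while regular mutations, read repair, CAS, and batchlog writes pass `retirable::no`.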
Showing 3 changed files with 37 additions and 28 deletions.
3 changes: 2 additions & 1 deletion db/view/view.cc
@@ -1605,7 +1605,8 @@ static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, gms::in
std::move(pending_endpoints),
db::write_type::VIEW,
std::move(tr_state),
-allow_hints);
+allow_hints,
+service::retirable::yes);
}

static bool should_update_synchronously(const schema& s) {
Expand Down
49 changes: 27 additions & 22 deletions service/storage_proxy.cc
@@ -1165,8 +1165,6 @@ class cas_mutation : public mutation_holder {
};
};

-using retirable = bool_class<struct retirable_tag>;

class abstract_write_response_handler : public seastar::enable_shared_from_this<abstract_write_response_handler>, public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
protected:
using error = storage_proxy::error;
@@ -2302,7 +2300,7 @@ ::shared_ptr<abstract_write_response_handler>& storage_proxy::get_write_response
result<storage_proxy::response_id_type> storage_proxy::create_write_response_handler(locator::effective_replication_map_ptr ermp,
db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
-storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info)
+storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, retirable retirable)
{
shared_ptr<abstract_write_response_handler> h;
auto& rs = ermp->get_replication_strategy();
@@ -2312,8 +2310,7 @@ result<storage_proxy::response_id_type> storage_proxy::create_write_response_han
} else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){
h = ::make_shared<datacenter_sync_write_response_handler>(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
} else {
-retirable r{type == db::write_type::VIEW};
-h = ::make_shared<write_response_handler>(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info, r);
+h = ::make_shared<write_response_handler>(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info, retirable);
}
return bo::success(register_response_handler(std::move(h)));
}
@@ -2838,7 +2835,7 @@ storage_proxy::mutate_counter_on_leader_and_replicate(const schema_ptr& s, froze

result<storage_proxy::response_id_type>
storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr<mutation_holder> mh,
-db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
+db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, retirable retirable) {
replica::table& table = _db.local().find_column_family(s->id());
auto erm = table.get_effective_replication_map();
inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints_without_node_being_replaced(token);
@@ -2902,7 +2899,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
db::assure_sufficient_live_nodes(cl, *erm, live_endpoints, pending_endpoints);

return create_write_response_handler(std::move(erm), cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints,
-std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info);
+std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info, retirable);
}

/**
@@ -2915,13 +2912,13 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
result<storage_proxy::response_id_type>
storage_proxy::create_write_response_handler(const mutation& m, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
return create_write_response_handler_helper(m.schema(), m.token(), std::make_unique<shared_mutation>(m), cl, type, tr_state,
-std::move(permit), allow_limit);
+std::move(permit), allow_limit, retirable::no);
}

result<storage_proxy::response_id_type>
storage_proxy::create_write_response_handler(const hint_wrapper& h, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
return create_write_response_handler_helper(h.mut.schema(), h.mut.token(), std::make_unique<hint_mutation>(h.mut), cl, type, tr_state,
-std::move(permit), allow_limit);
+std::move(permit), allow_limit, retirable::yes);
}

result<storage_proxy::response_id_type>
@@ -2936,7 +2933,7 @@ storage_proxy::create_write_response_handler(const read_repair_mutation& mut, db
tracing::trace(tr_state, "Creating write handler for read repair token: {} endpoint: {}", mh->token(), endpoints);

// No rate limiting for read repair
-return create_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
+return create_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), retirable::no);
}

result<storage_proxy::response_id_type>
@@ -2945,7 +2942,7 @@ storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxo
auto& [commit, s, h, t] = meta;

return create_write_response_handler_helper(s, t, std::make_unique<cas_mutation>(std::move(commit), s, std::move(h)), cl,
-db::write_type::CAS, tr_state, std::move(permit), allow_limit);
+db::write_type::CAS, tr_state, std::move(permit), allow_limit, retirable::no);
}

result<storage_proxy::response_id_type>
@@ -2962,7 +2959,7 @@ storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxo

// No rate limiting for paxos (yet)
return create_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
-inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
+inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), retirable::no);
}

void storage_proxy::register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker) {
@@ -3490,7 +3487,7 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
return _p.mutate_prepare<>(std::array<mutation, 1>{std::move(m)}, cl, db::write_type::BATCH_LOG, _permit, [this] (const mutation& m, db::consistency_level cl, db::write_type type, service_permit permit) {
auto& table = _p._db.local().find_column_family(m.schema()->id());
auto ermp = table.get_effective_replication_map();
-return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate());
+return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate(), retirable::no);
}).then(utils::result_wrap([this, cl] (unique_response_handler_vector ids) {
_p.register_cdc_operation_result_tracker(ids, _cdc_tracker);
return _p.mutate_begin(std::move(ids), cl, _trace_state, _timeout);
@@ -3595,7 +3592,8 @@ future<> storage_proxy::send_to_endpoint(
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
-allow_hints allow_hints) {
+allow_hints allow_hints,
+retirable retirable) {
utils::latency_counter lc;
lc.start();

@@ -3607,7 +3605,7 @@ future<> storage_proxy::send_to_endpoint(
timeout = clock_type::now() + 5min;
}
return mutate_prepare(std::array{std::move(m)}, cl, type, /* does view building should hold a real permit */ empty_service_permit(),
-[this, tr_state, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats] (
+[this, tr_state, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats, retirable] (
std::unique_ptr<mutation_holder>& m,
db::consistency_level cl,
db::write_type type, service_permit permit) mutable {
@@ -3634,7 +3632,8 @@ future<> storage_proxy::send_to_endpoint(
tr_state,
stats,
std::move(permit),
-std::monostate()); // TODO: Pass the correct enforcement type
+std::monostate(), // TODO: Pass the correct enforcement type
+retirable);
}).then(utils::result_wrap([this, cl, tr_state = std::move(tr_state), timeout = std::move(timeout)] (unique_response_handler_vector ids) mutable {
return mutate_begin(std::move(ids), cl, std::move(tr_state), std::move(timeout));
})).then_wrapped([p = shared_from_this(), lc, &stats] (future<result<>> f) {
@@ -3648,15 +3647,17 @@ future<> storage_proxy::send_to_endpoint(
inet_address_vector_topology_change pending_endpoints,
db::write_type type,
tracing::trace_state_ptr tr_state,
-allow_hints allow_hints) {
+allow_hints allow_hints,
+retirable retirable) {
return send_to_endpoint(
std::make_unique<shared_mutation>(std::move(fm_a_s)),
std::move(target),
std::move(pending_endpoints),
type,
std::move(tr_state),
get_stats(),
-allow_hints);
+allow_hints,
+retirable);
}

future<> storage_proxy::send_to_endpoint(
@@ -3666,15 +3667,17 @@ future<> storage_proxy::send_to_endpoint(
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
-allow_hints allow_hints) {
+allow_hints allow_hints,
+retirable retirable) {
return send_to_endpoint(
std::make_unique<shared_mutation>(std::move(fm_a_s)),
std::move(target),
std::move(pending_endpoints),
type,
std::move(tr_state),
stats,
-allow_hints);
+allow_hints,
+retirable);
}

future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target) {
@@ -3686,7 +3689,8 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
db::write_type::SIMPLE,
tracing::trace_state_ptr(),
get_stats(),
-allow_hints::no);
+allow_hints::no,
+retirable::yes);
}

return send_to_endpoint(
@@ -3696,7 +3700,8 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
db::write_type::SIMPLE,
tracing::trace_state_ptr(),
get_stats(),
-allow_hints::no);
+allow_hints::no,
+retirable::yes);
}

future<> storage_proxy::send_hint_to_all_replicas(frozen_mutation_and_schema fm_a_s) {
13 changes: 8 additions & 5 deletions service/storage_proxy.hh
@@ -99,6 +99,8 @@ struct view_update_backlog_timestamped {
struct allow_hints_tag {};
using allow_hints = bool_class<allow_hints_tag>;

+using retirable = bool_class<struct retirable_tag>;

struct storage_proxy_coordinator_query_result {
foreign_ptr<lw_shared_ptr<query::result>> query_result;
replicas_per_token_range last_replicas;
@@ -307,9 +309,9 @@ private:
::shared_ptr<abstract_write_response_handler>& get_write_response_handler(storage_proxy::response_id_type id);
result<response_id_type> create_write_response_handler_helper(schema_ptr s, const dht::token& token,
std::unique_ptr<mutation_holder> mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state,
-service_permit permit, db::allow_per_partition_rate_limit allow_limit);
+service_permit permit, db::allow_per_partition_rate_limit allow_limit, retirable);
result<response_id_type> create_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, inet_address_vector_replica_set targets,
-const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info);
+const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, retirable);
result<response_id_type> create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const read_repair_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
@@ -416,7 +418,8 @@ private:
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
-allow_hints allow_hints = allow_hints::yes);
+allow_hints,
+retirable);

db::view::update_backlog get_view_update_backlog() const;

@@ -565,9 +568,9 @@ public:
// hinted handoff support, and just one target. See also
// send_to_live_endpoints() - another take on the same original function.
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type,
-tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints allow_hints = allow_hints::yes);
+tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints, retirable);
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type,
-tracing::trace_state_ptr tr_state, allow_hints allow_hints = allow_hints::yes);
+tracing::trace_state_ptr tr_state, allow_hints, retirable);

// Send a mutation to a specific remote target as a hint.
// Unlike regular mutations during write operations, hints are sent on the streaming connection
