Skip to content

Commit

Permalink
service: storage_proxy: make hint write handlers cancellable
Browse files Browse the repository at this point in the history
Whether a write handler should be cancellable is now controlled by a
parameter passed to `create_write_response_handler`. We plumb it down
from `send_to_endpoint`, which is called by the hints manager.

This will cause hint write handlers to time out immediately when we
shut down or when a destination node is marked as dead.

Fixes scylladb#8079
  • Loading branch information
kbr-scylla authored and margdoc committed Jun 6, 2023
1 parent 1b7765e commit 3a88c1c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 27 deletions.
3 changes: 2 additions & 1 deletion db/view/view.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1605,7 +1605,8 @@ static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, gms::in
std::move(pending_endpoints),
db::write_type::VIEW,
std::move(tr_state),
allow_hints);
allow_hints,
service::is_cancellable::yes);
}

static bool should_update_synchronously(const schema& s) {
Expand Down
47 changes: 26 additions & 21 deletions service/storage_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1165,8 +1165,6 @@ class cas_mutation : public mutation_holder {
};
};

using is_cancellable = bool_class<struct cancellable_tag>;

class abstract_write_response_handler : public seastar::enable_shared_from_this<abstract_write_response_handler>, public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
protected:
using error = storage_proxy::error;
Expand Down Expand Up @@ -2307,7 +2305,7 @@ ::shared_ptr<abstract_write_response_handler>& storage_proxy::get_write_response
result<storage_proxy::response_id_type> storage_proxy::create_write_response_handler(locator::effective_replication_map_ptr ermp,
db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info)
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable cancellable)
{
shared_ptr<abstract_write_response_handler> h;
auto& rs = ermp->get_replication_strategy();
Expand All @@ -2317,7 +2315,6 @@ result<storage_proxy::response_id_type> storage_proxy::create_write_response_han
} else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){
h = ::make_shared<datacenter_sync_write_response_handler>(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
} else {
is_cancellable cancellable{type == db::write_type::VIEW};
h = ::make_shared<write_response_handler>(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info, cancellable);
}
return bo::success(register_response_handler(std::move(h)));
Expand Down Expand Up @@ -2843,7 +2840,7 @@ storage_proxy::mutate_counter_on_leader_and_replicate(const schema_ptr& s, froze

result<storage_proxy::response_id_type>
storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr<mutation_holder> mh,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable cancellable) {
replica::table& table = _db.local().find_column_family(s->id());
auto erm = table.get_effective_replication_map();
inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints_without_node_being_replaced(token);
Expand Down Expand Up @@ -2907,7 +2904,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
db::assure_sufficient_live_nodes(cl, *erm, live_endpoints, pending_endpoints);

return create_write_response_handler(std::move(erm), cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints,
std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info);
std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info, cancellable);
}

/**
Expand All @@ -2920,13 +2917,13 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
result<storage_proxy::response_id_type>
storage_proxy::create_write_response_handler(const mutation& m, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
return create_write_response_handler_helper(m.schema(), m.token(), std::make_unique<shared_mutation>(m), cl, type, tr_state,
std::move(permit), allow_limit);
std::move(permit), allow_limit, is_cancellable::no);
}

result<storage_proxy::response_id_type>
storage_proxy::create_write_response_handler(const hint_wrapper& h, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
return create_write_response_handler_helper(h.mut.schema(), h.mut.token(), std::make_unique<hint_mutation>(h.mut), cl, type, tr_state,
std::move(permit), allow_limit);
std::move(permit), allow_limit, is_cancellable::yes);
}

result<storage_proxy::response_id_type>
Expand All @@ -2941,7 +2938,7 @@ storage_proxy::create_write_response_handler(const read_repair_mutation& mut, db
tracing::trace(tr_state, "Creating write handler for read repair token: {} endpoint: {}", mh->token(), endpoints);

// No rate limiting for read repair
return create_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
return create_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no);
}

result<storage_proxy::response_id_type>
Expand All @@ -2950,7 +2947,7 @@ storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxo
auto& [commit, s, h, t] = meta;

return create_write_response_handler_helper(s, t, std::make_unique<cas_mutation>(std::move(commit), s, std::move(h)), cl,
db::write_type::CAS, tr_state, std::move(permit), allow_limit);
db::write_type::CAS, tr_state, std::move(permit), allow_limit, is_cancellable::no);
}

result<storage_proxy::response_id_type>
Expand All @@ -2967,7 +2964,7 @@ storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxo

// No rate limiting for paxos (yet)
return create_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no);
}

void storage_proxy::register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker) {
Expand Down Expand Up @@ -3495,7 +3492,7 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
return _p.mutate_prepare<>(std::array<mutation, 1>{std::move(m)}, cl, db::write_type::BATCH_LOG, _permit, [this] (const mutation& m, db::consistency_level cl, db::write_type type, service_permit permit) {
auto& table = _p._db.local().find_column_family(m.schema()->id());
auto ermp = table.get_effective_replication_map();
return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate());
return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate(), is_cancellable::no);
}).then(utils::result_wrap([this, cl] (unique_response_handler_vector ids) {
_p.register_cdc_operation_result_tracker(ids, _cdc_tracker);
return _p.mutate_begin(std::move(ids), cl, _trace_state, _timeout);
Expand Down Expand Up @@ -3600,7 +3597,8 @@ future<> storage_proxy::send_to_endpoint(
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
allow_hints allow_hints) {
allow_hints allow_hints,
is_cancellable cancellable) {
utils::latency_counter lc;
lc.start();

Expand All @@ -3612,7 +3610,7 @@ future<> storage_proxy::send_to_endpoint(
timeout = clock_type::now() + 5min;
}
return mutate_prepare(std::array{std::move(m)}, cl, type, /* does view building should hold a real permit */ empty_service_permit(),
[this, tr_state, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats] (
[this, tr_state, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats, cancellable] (
std::unique_ptr<mutation_holder>& m,
db::consistency_level cl,
db::write_type type, service_permit permit) mutable {
Expand All @@ -3639,7 +3637,8 @@ future<> storage_proxy::send_to_endpoint(
tr_state,
stats,
std::move(permit),
std::monostate()); // TODO: Pass the correct enforcement type
std::monostate(), // TODO: Pass the correct enforcement type
cancellable);
}).then(utils::result_wrap([this, cl, tr_state = std::move(tr_state), timeout = std::move(timeout)] (unique_response_handler_vector ids) mutable {
return mutate_begin(std::move(ids), cl, std::move(tr_state), std::move(timeout));
})).then_wrapped([p = shared_from_this(), lc, &stats] (future<result<>> f) {
Expand All @@ -3653,15 +3652,17 @@ future<> storage_proxy::send_to_endpoint(
inet_address_vector_topology_change pending_endpoints,
db::write_type type,
tracing::trace_state_ptr tr_state,
allow_hints allow_hints) {
allow_hints allow_hints,
is_cancellable cancellable) {
return send_to_endpoint(
std::make_unique<shared_mutation>(std::move(fm_a_s)),
std::move(target),
std::move(pending_endpoints),
type,
std::move(tr_state),
get_stats(),
allow_hints);
allow_hints,
cancellable);
}

future<> storage_proxy::send_to_endpoint(
Expand All @@ -3671,15 +3672,17 @@ future<> storage_proxy::send_to_endpoint(
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
allow_hints allow_hints) {
allow_hints allow_hints,
is_cancellable cancellable) {
return send_to_endpoint(
std::make_unique<shared_mutation>(std::move(fm_a_s)),
std::move(target),
std::move(pending_endpoints),
type,
std::move(tr_state),
stats,
allow_hints);
allow_hints,
cancellable);
}

future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target) {
Expand All @@ -3691,7 +3694,8 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
db::write_type::SIMPLE,
tracing::trace_state_ptr(),
get_stats(),
allow_hints::no);
allow_hints::no,
is_cancellable::yes);
}

return send_to_endpoint(
Expand All @@ -3701,7 +3705,8 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
db::write_type::SIMPLE,
tracing::trace_state_ptr(),
get_stats(),
allow_hints::no);
allow_hints::no,
is_cancellable::yes);
}

future<> storage_proxy::send_hint_to_all_replicas(frozen_mutation_and_schema fm_a_s) {
Expand Down
13 changes: 8 additions & 5 deletions service/storage_proxy.hh
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ struct view_update_backlog_timestamped {
struct allow_hints_tag {};
using allow_hints = bool_class<allow_hints_tag>;

using is_cancellable = bool_class<struct cancellable_tag>;

struct storage_proxy_coordinator_query_result {
foreign_ptr<lw_shared_ptr<query::result>> query_result;
replicas_per_token_range last_replicas;
Expand Down Expand Up @@ -307,9 +309,9 @@ private:
::shared_ptr<abstract_write_response_handler>& get_write_response_handler(storage_proxy::response_id_type id);
result<response_id_type> create_write_response_handler_helper(schema_ptr s, const dht::token& token,
std::unique_ptr<mutation_holder> mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state,
service_permit permit, db::allow_per_partition_rate_limit allow_limit);
service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable);
result<response_id_type> create_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, inet_address_vector_replica_set targets,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info);
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable);
result<response_id_type> create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const read_repair_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
Expand Down Expand Up @@ -416,7 +418,8 @@ private:
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
allow_hints allow_hints = allow_hints::yes);
allow_hints,
is_cancellable);

db::view::update_backlog get_view_update_backlog() const;

Expand Down Expand Up @@ -565,9 +568,9 @@ public:
// hinted handoff support, and just one target. See also
// send_to_live_endpoints() - another take on the same original function.
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type,
tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints allow_hints = allow_hints::yes);
tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints, is_cancellable);
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type,
tracing::trace_state_ptr tr_state, allow_hints allow_hints = allow_hints::yes);
tracing::trace_state_ptr tr_state, allow_hints, is_cancellable);

// Send a mutation to a specific remote target as a hint.
// Unlike regular mutations during write operations, hints are sent on the streaming connection
Expand Down

0 comments on commit 3a88c1c

Please sign in to comment.