Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage_proxy: place unique_response_handler:s in small_vector instead of std::vector #8606

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 19 additions & 11 deletions service/storage_proxy.cc
Expand Up @@ -1811,7 +1811,15 @@ storage_proxy::storage_proxy(distributed<database>& db, storage_proxy::config cf
}

storage_proxy::unique_response_handler::unique_response_handler(storage_proxy& p_, response_id_type id_) : id(id_), p(p_) {}
storage_proxy::unique_response_handler::unique_response_handler(unique_response_handler&& x) : id(x.id), p(x.p) { x.id = 0; };
storage_proxy::unique_response_handler::unique_response_handler(unique_response_handler&& x) noexcept : id(x.id), p(x.p) { x.id = 0; };

storage_proxy::unique_response_handler&
storage_proxy::unique_response_handler::operator=(unique_response_handler&& x) noexcept {
// this->p must equal x.p
nyh marked this conversation as resolved.
Show resolved Hide resolved
id = std::exchange(x.id, 0);
return *this;
}

storage_proxy::unique_response_handler::~unique_response_handler() {
if (id) {
p.remove_response_handler(id);
Expand Down Expand Up @@ -2012,7 +2020,7 @@ storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxo
inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit));
}

void storage_proxy::register_cdc_operation_result_tracker(const std::vector<storage_proxy::unique_response_handler>& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker) {
void storage_proxy::register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker) {
if (!tracker) {
return;
}
Expand All @@ -2038,26 +2046,26 @@ storage_proxy::hint_to_dead_endpoints(response_id_type id, db::consistency_level
}

template<typename Range, typename CreateWriteHandler>
future<std::vector<storage_proxy::unique_response_handler>> storage_proxy::mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler create_handler) {
future<storage_proxy::unique_response_handler_vector> storage_proxy::mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler create_handler) {
// apply is used to convert exceptions to exceptional future
return futurize_invoke([this] (Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler create_handler) {
std::vector<unique_response_handler> ids;
unique_response_handler_vector ids;
ids.reserve(std::distance(std::begin(mutations), std::end(mutations)));
for (auto& m : mutations) {
ids.emplace_back(*this, create_handler(m, cl, type, permit));
}
return make_ready_future<std::vector<unique_response_handler>>(std::move(ids));
return make_ready_future<unique_response_handler_vector>(std::move(ids));
}, std::forward<Range>(mutations), cl, type, std::move(permit), std::move(create_handler));
}

template<typename Range>
future<std::vector<storage_proxy::unique_response_handler>> storage_proxy::mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) {
future<storage_proxy::unique_response_handler_vector> storage_proxy::mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) {
return mutate_prepare<>(std::forward<Range>(mutations), cl, type, std::move(permit), [this, tr_state = std::move(tr_state)] (const typename std::decay_t<Range>::value_type& m, db::consistency_level cl, db::write_type type, service_permit permit) mutable {
return create_write_response_handler(m, cl, type, tr_state, std::move(permit));
});
}

future<> storage_proxy::mutate_begin(std::vector<unique_response_handler> ids, db::consistency_level cl,
future<> storage_proxy::mutate_begin(unique_response_handler_vector ids, db::consistency_level cl,
tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt) {
return parallel_for_each(ids, [this, cl, timeout_opt] (unique_response_handler& protected_response) {
auto response_id = protected_response.id;
Expand Down Expand Up @@ -2342,7 +2350,7 @@ storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, bool c
lc.start();

return mutate_prepare(mutations, cl, type, tr_state, std::move(permit)).then([this, cl, timeout_opt, tracker = std::move(cdc_tracker),
tr_state] (std::vector<storage_proxy::unique_response_handler> ids) mutable {
tr_state] (storage_proxy::unique_response_handler_vector ids) mutable {
register_cdc_operation_result_tracker(ids, tracker);
return mutate_begin(std::move(ids), cl, tr_state, timeout_opt);
}).then_wrapped([this, p = shared_from_this(), lc, tr_state] (future<> f) mutable {
Expand Down Expand Up @@ -2427,7 +2435,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
return _p.mutate_prepare<>(std::array<mutation, 1>{std::move(m)}, cl, db::write_type::BATCH_LOG, _permit, [this] (const mutation& m, db::consistency_level cl, db::write_type type, service_permit permit) {
auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name());
return _p.create_write_response_handler(ks, cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit));
}).then([this, cl] (std::vector<unique_response_handler> ids) {
}).then([this, cl] (unique_response_handler_vector ids) {
_p.register_cdc_operation_result_tracker(ids, _cdc_tracker);
return _p.mutate_begin(std::move(ids), cl, _trace_state, _timeout);
});
Expand All @@ -2452,7 +2460,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
};

future<> run() {
return _p.mutate_prepare(_mutations, _cl, db::write_type::BATCH, _trace_state, _permit).then([this] (std::vector<unique_response_handler> ids) {
return _p.mutate_prepare(_mutations, _cl, db::write_type::BATCH, _trace_state, _permit).then([this] (unique_response_handler_vector ids) {
return sync_write_to_batchlog().then([this, ids = std::move(ids)] () mutable {
tracing::trace(_trace_state, "Sending batch mutations");
_p.register_cdc_operation_result_tracker(ids, _cdc_tracker);
Expand Down Expand Up @@ -2539,7 +2547,7 @@ future<> storage_proxy::send_to_endpoint(
tr_state,
stats,
std::move(permit));
}).then([this, cl, tr_state = std::move(tr_state), timeout = std::move(timeout)] (std::vector<unique_response_handler> ids) mutable {
}).then([this, cl, tr_state = std::move(tr_state), timeout = std::move(timeout)] (unique_response_handler_vector ids) mutable {
return mutate_begin(std::move(ids), cl, std::move(tr_state), std::move(timeout));
}).then_wrapped([p = shared_from_this(), lc, &stats] (future<>&& f) {
return p->mutate_end(std::move(f), lc, stats, nullptr);
Expand Down
13 changes: 8 additions & 5 deletions service/storage_proxy.hh
Expand Up @@ -66,6 +66,7 @@
#include "cdc/stats.hh"
#include "locator/token_metadata.hh"
#include "db/hints/host_filter.hh"
#include "utils/small_vector.hh"

class reconcilable_result;
class frozen_mutation_and_schema;
Expand Down Expand Up @@ -194,10 +195,12 @@ private:
unique_response_handler(storage_proxy& p_, response_id_type id_);
unique_response_handler(const unique_response_handler&) = delete;
unique_response_handler& operator=(const unique_response_handler&) = delete;
unique_response_handler(unique_response_handler&& x);
unique_response_handler(unique_response_handler&& x) noexcept;
unique_response_handler& operator=(unique_response_handler&&) noexcept;
~unique_response_handler();
response_id_type release();
};
using unique_response_handler_vector = utils::small_vector<unique_response_handler, 1>;
using response_handlers_map = std::unordered_map<response_id_type, ::shared_ptr<abstract_write_response_handler>>;

public:
Expand Down Expand Up @@ -346,7 +349,7 @@ private:
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>& meta,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
void register_cdc_operation_result_tracker(const std::vector<storage_proxy::unique_response_handler>& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker);
void register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker);
void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout);
template<typename Range>
size_t hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& mh, const Range& targets, db::write_type type, tracing::trace_state_ptr tr_state) noexcept;
Expand Down Expand Up @@ -404,10 +407,10 @@ private:
db::consistency_level cl,
coordinator_query_options optional_params);
template<typename Range, typename CreateWriteHandler>
future<std::vector<unique_response_handler>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler handler);
future<unique_response_handler_vector> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler handler);
template<typename Range>
future<std::vector<unique_response_handler>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
future<> mutate_begin(std::vector<unique_response_handler> ids, db::consistency_level cl, tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt = { });
future<unique_response_handler_vector> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
future<> mutate_begin(unique_response_handler_vector ids, db::consistency_level cl, tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt = { });
future<> mutate_end(future<> mutate_result, utils::latency_counter, write_stats& stats, tracing::trace_state_ptr trace_state);
future<> schedule_repair(std::unordered_map<dht::token, std::unordered_map<gms::inet_address, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
bool need_throttle_writes() const;
Expand Down