Merge 'schema_tables: limit concurrency' from Benny Halevy
To prevent stalls due to a large number of tables.

Fixes #11574

Closes #11689

* github.com:scylladb/scylladb:
  schema_tables: merge_tables_and_views reindent
  schema_tables: limit parallelism
avikivity committed Oct 19, 2022
2 parents a979bbf + ce22dd4 commit 69199db
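
For readers unfamiliar with the helper: max_concurrent_for_each lives in <seastar/core/loop.hh> (hence the new include below) and behaves like parallel_for_each, except that at most the given number of body invocations are in flight at once; the diff uses both its range overload and its iterator-pair overload. A minimal usage sketch, assuming a Seastar build environment; process_all and its sleeping body are hypothetical stand-ins for real per-table work:

#include <seastar/core/app-template.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/sleep.hh>

#include <chrono>
#include <vector>

using namespace std::chrono_literals;

// Run the body for every item, keeping at most 8 invocations in flight;
// the rest of the items wait until a slot frees up.
seastar::future<> process_all(std::vector<int> items) {
    co_await seastar::max_concurrent_for_each(items, 8, [] (int) -> seastar::future<> {
        co_await seastar::sleep(10ms); // stand-in for per-table work, e.g. a flush
    });
}

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] () -> seastar::future<> {
        co_await process_all(std::vector<int>(100));
    });
}
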
Showing 1 changed file with 30 additions and 23 deletions.

db/schema_tables.cc
@@ -45,6 +45,7 @@
#include <seastar/rpc/rpc_types.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/core/loop.hh>

#include <boost/algorithm/string/predicate.hpp>
#include <boost/range/algorithm/copy.hpp>
@@ -1087,6 +1088,13 @@ future<> store_column_mapping(distributed<service::storage_proxy>& proxy, schema
co_await proxy.local().mutate_locally(std::move(muts), tracing::trace_state_ptr());
}

// Limit concurrency of user tables to prevent stalls.
// See https://github.com/scylladb/scylladb/issues/11574
// Note: we aim at providing enough concurrency to utilize
// the cpu while operations are blocked on disk I/O
// and/or filesystem calls, e.g. fsync.
constexpr size_t max_concurrent = 8;

static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
{
slogger.trace("do_merge_schema: {}", mutations);
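
The new constant reflects the tradeoff spelled out in the comment above: enough concurrency to keep the CPU busy while some operations wait on disk I/O, but not so much that launching work for every table at once stalls the reactor. Conceptually, the same bound can be built from parallel_for_each plus a semaphore. A rough sketch of the idea, not Seastar's actual implementation; bounded_for_each is a hypothetical name, and unlike the real helper this version still creates one waiting coroutine per element up front, whereas max_concurrent_for_each draws items from the range only as slots free up:

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/coroutine/parallel_for_each.hh>

// Run func on every element, but let at most `max` bodies run concurrently.
template <typename Range, typename Func>
seastar::future<> bounded_for_each(Range& range, size_t max, Func func) {
    seastar::semaphore limit(max);
    co_await seastar::coroutine::parallel_for_each(range, [&] (auto& item) -> seastar::future<> {
        auto units = co_await seastar::get_units(limit, 1); // wait for a free slot
        co_await func(item); // slot is returned when `units` goes out of scope
    });
}
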
@@ -1118,7 +1126,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:

if (do_flush) {
auto& db = proxy.local().get_db();
co_await coroutine::parallel_for_each(column_families, [&db] (const table_id& id) -> future<> {
co_await max_concurrent_for_each(column_families, max_concurrent, [&db] (const table_id& id) -> future<> {
return replica::database::flush_table_on_all_shards(db, id);
});
}
@@ -1327,37 +1335,38 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
// was already dropped (see https://github.com/scylladb/scylla/issues/5614)
auto& db = proxy.local().get_db();
auto ts = db_clock::now();
co_await coroutine::parallel_for_each(views_diff.dropped, [&db, ts] (schema_diff::dropped_schema& dt) {
co_await max_concurrent_for_each(views_diff.dropped, max_concurrent, [&db, ts] (schema_diff::dropped_schema& dt) {
auto& s = *dt.schema.get();
return replica::database::drop_table_on_all_shards(db, s.ks_name(), s.cf_name());
});
co_await coroutine::parallel_for_each(tables_diff.dropped, [&db, ts] (schema_diff::dropped_schema& dt) -> future<> {
co_await max_concurrent_for_each(tables_diff.dropped, max_concurrent, [&db, ts] (schema_diff::dropped_schema& dt) -> future<> {
auto& s = *dt.schema.get();
return replica::database::drop_table_on_all_shards(db, s.ks_name(), s.cf_name());
});

co_await proxy.local().get_db().invoke_on_all([&] (replica::database& db) -> future<> {
// In order to avoid possible races we first create the tables and only then the views.
// That way if a view seeks information about its base table it's guarantied to find it.
co_await coroutine::parallel_for_each(tables_diff.created, [&] (global_schema_ptr& gs) -> future<> {
co_await max_concurrent_for_each(tables_diff.created, max_concurrent, [&] (global_schema_ptr& gs) -> future<> {
co_await db.add_column_family_and_make_directory(gs);
});
co_await coroutine::parallel_for_each(views_diff.created, [&] (global_schema_ptr& gs) -> future<> {
co_await max_concurrent_for_each(views_diff.created, max_concurrent, [&] (global_schema_ptr& gs) -> future<> {
co_await db.add_column_family_and_make_directory(gs);
});
for (auto&& gs : boost::range::join(tables_diff.created, views_diff.created)) {
db.find_column_family(gs).mark_ready_for_writes();
co_await coroutine::maybe_yield();
}
std::vector<bool> columns_changed;
columns_changed.reserve(tables_diff.altered.size() + views_diff.altered.size());
for (auto&& altered : boost::range::join(tables_diff.altered, views_diff.altered)) {
columns_changed.push_back(db.update_column_family(altered.new_schema));
co_await coroutine::maybe_yield();
}

auto it = columns_changed.begin();
auto notify = [&] (auto& r, auto&& f) -> future<> {
auto notifications = r | boost::adaptors::transformed(f);
co_await when_all(notifications.begin(), notifications.end());
co_await max_concurrent_for_each(r, max_concurrent, std::move(f));
};
// View drops are notified first, because a table can only be dropped if its views are already deleted
co_await notify(views_diff.dropped, [&] (auto&& dt) { return db.get_notifier().drop_view(view_ptr(dt.schema)); });
@@ -1380,20 +1389,18 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
//
// Drop column mapping entries for dropped tables since these will not be TTLed automatically
// and will stay there forever if we don't clean them up manually
co_await when_all_succeed(
parallel_for_each(tables_diff.created, [&proxy] (global_schema_ptr& gs) -> future<> {
co_await store_column_mapping(proxy, gs.get(), false);
}),
parallel_for_each(tables_diff.altered, [&proxy] (schema_diff::altered_schema& altered) -> future<> {
co_await when_all_succeed(
store_column_mapping(proxy, altered.old_schema.get(), true),
store_column_mapping(proxy, altered.new_schema.get(), false));
}),
parallel_for_each(tables_diff.dropped, [&proxy] (schema_diff::dropped_schema& dropped) -> future<> {
schema_ptr s = dropped.schema.get();
co_await drop_column_mapping(s->id(), s->version());
})
);
co_await max_concurrent_for_each(tables_diff.created, max_concurrent, [&proxy] (global_schema_ptr& gs) -> future<> {
co_await store_column_mapping(proxy, gs.get(), false);
});
co_await max_concurrent_for_each(tables_diff.altered, max_concurrent, [&proxy] (schema_diff::altered_schema& altered) -> future<> {
co_await when_all_succeed(
store_column_mapping(proxy, altered.old_schema.get(), true),
store_column_mapping(proxy, altered.new_schema.get(), false));
});
co_await max_concurrent_for_each(tables_diff.dropped, max_concurrent, [&proxy] (schema_diff::dropped_schema& dropped) -> future<> {
schema_ptr s = dropped.schema.get();
co_await drop_column_mapping(s->id(), s->version());
});
}

static std::vector<const query::result_set_row*> collect_rows(const std::set<sstring>& keys, const schema_result& result) {
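
One behavioral detail of this hunk worth noting: the old code ran the three column-mapping passes (created, altered, dropped) concurrently under when_all_succeed, while the new code awaits them one after another, each internally bounded by max_concurrent. The two shapes differ as in the sketch below; pass() is a hypothetical stand-in for one pass:

#include <seastar/core/app-template.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/when_all.hh>

#include <chrono>

using namespace std::chrono_literals;

// Hypothetical stand-in for one of the column-mapping passes.
static seastar::future<> pass() { return seastar::sleep(1ms); }

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] () -> seastar::future<> {
        // Old shape: all three passes start at once and are awaited together.
        co_await seastar::when_all_succeed(pass(), pass(), pass());
        // New shape: each pass completes before the next one starts.
        co_await pass();
        co_await pass();
        co_await pass();
    });
}
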
@@ -2691,7 +2698,7 @@ future<schema_ptr> create_table_from_name(distributed<service::storage_proxy>& p
future<std::map<sstring, schema_ptr>> create_tables_from_tables_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result)
{
auto tables = std::map<sstring, schema_ptr>();
co_await coroutine::parallel_for_each(result->rows().begin(), result->rows().end(), [&] (const query::result_set_row& row) -> future<> {
co_await max_concurrent_for_each(result->rows().begin(), result->rows().end(), max_concurrent, [&] (const query::result_set_row& row) -> future<> {
schema_ptr cfm = co_await create_table_from_table_row(proxy, row);
tables.emplace(cfm->cf_name(), std::move(cfm));
});
@@ -3190,7 +3197,7 @@ static future<view_ptr> create_view_from_table_row(distributed<service::storage_
future<std::vector<view_ptr>> create_views_from_schema_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result)
{
std::vector<view_ptr> views;
co_await coroutine::parallel_for_each(result->rows().begin(), result->rows().end(), [&] (auto&& row) -> future<> {
co_await max_concurrent_for_each(result->rows().begin(), result->rows().end(), max_concurrent, [&] (auto&& row) -> future<> {
auto v = co_await create_view_from_table_row(proxy, row);
views.push_back(std::move(v));
});
