Skip to content

Commit

Permalink
Merge 'Make replica::database and cql3::query_processor share wasm ma…
Browse files Browse the repository at this point in the history
…nager' from Pavel Emelyanov

This makes it possible to remove remaining users of the global qctx.

The thing is that db::schema_tables code needs to get wasm's engine, alien runner and instance cache to build wasm context for the merged function or to drop it from cache in the opposite case. To get the wasm stuff, this code uses global qctx -> query_processor -> wasm chain. However, the functions (un)merging code already has the database reference at hand, and its natural to get wasm stuff from it, not from the q.p. which is not available

So this PR packs the wasm engine, runner and cache on sharded<wasm::manager> instance, makes the manager be referenced by both q.p. and database and removes the qctx from schema tables code

Closes #14933

* github.com:scylladb/scylladb:
  schema_tables: Stop using qctx
  database: Add wasm::manager& dependency
  main, cql_test_env, wasm: Start wasm::manager earlier
  wasm: Shuffle context::context()
  wasm: Add manager::remove()
  wasm: Add manager::precompile()
  wasm: Move stop() out of query_processor
  wasm: Make wasm sharded<manager>
  query_processor: Wrap wasm stuff in a struct
  • Loading branch information
avikivity committed Aug 6, 2023
2 parents 412629a + fd50ba8 commit 6c1e44e
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 45 deletions.
8 changes: 2 additions & 6 deletions cql3/query_processor.cc
Expand Up @@ -58,7 +58,7 @@ static service::query_state query_state_for_internal_call() {
return {service::client_state::for_internal_calls(), empty_service_permit()};
}

query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, std::optional<wasm::startup_context> wasm_ctx)
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, wasm::manager& wasm)
: _migration_subscriber{std::make_unique<migration_subscriber>(this)}
, _proxy(proxy)
, _db(db)
Expand All @@ -71,9 +71,7 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
, _authorized_prepared_cache_config_action([this] { update_authorized_prepared_cache_config(); return make_ready_future<>(); })
, _authorized_prepared_cache_update_interval_in_ms_observer(_db.get_config().permissions_update_interval_in_ms.observe(_auth_prepared_cache_cfg_cb))
, _authorized_prepared_cache_validity_in_ms_observer(_db.get_config().permissions_validity_in_ms.observe(_auth_prepared_cache_cfg_cb))
, _wasm_engine(wasm_ctx ? std::move(wasm_ctx->engine) : nullptr)
, _wasm_instance_cache(wasm_ctx ? std::make_optional<wasm::instance_cache>(wasm_ctx->cache_size, wasm_ctx->instance_size, wasm_ctx->timer_period) : std::nullopt)
, _alien_runner(wasm_ctx ? std::move(wasm_ctx->alien_runner) : nullptr)
, _wasm(wasm)
{
namespace sm = seastar::metrics;
namespace stm = statements;
Expand Down Expand Up @@ -483,8 +481,6 @@ future<> query_processor::stop_remote() {
future<> query_processor::stop() {
return _mnotifier.unregister_listener(_migration_subscriber.get()).then([this] {
return _authorized_prepared_cache.stop().finally([this] { return _prepared_cache.stop(); });
}).then([this] {
return _wasm_instance_cache ? _wasm_instance_cache->stop() : make_ready_future<>();
});
}

Expand Down
21 changes: 4 additions & 17 deletions cql3/query_processor.hh
Expand Up @@ -21,14 +21,13 @@
#include "cql3/authorized_prepared_statements_cache.hh"
#include "cql3/statements/prepared_statement.hh"
#include "exceptions/exceptions.hh"
#include "lang/wasm_instance_cache.hh"
#include "service/migration_listener.hh"
#include "transport/messages/result_message.hh"
#include "service/qos/service_level_controller.hh"
#include "service/client_state.hh"
#include "service/broadcast_tables/experimental/query_result.hh"
#include "utils/observable.hh"
#include "lang/wasm_alien_thread_runner.hh"
#include "lang/wasm.hh"


namespace service {
Expand Down Expand Up @@ -129,9 +128,7 @@ private:
// don't bother with expiration on those.
std::unordered_map<sstring, std::unique_ptr<statements::prepared_statement>> _internal_statements;

std::shared_ptr<rust::Box<wasmtime::Engine>> _wasm_engine;
std::optional<wasm::instance_cache> _wasm_instance_cache;
std::shared_ptr<wasm::alien_thread_runner> _alien_runner;
wasm::manager& _wasm;
public:
static const sstring CQL_VERSION;

Expand All @@ -146,7 +143,7 @@ public:
static std::unique_ptr<statements::raw::parsed_statement> parse_statement(const std::string_view& query);
static std::vector<std::unique_ptr<statements::raw::parsed_statement>> parse_statements(std::string_view queries);

query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, std::optional<wasm::startup_context> wasm_ctx);
query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, wasm::manager& wasm);

~query_processor();

Expand All @@ -169,17 +166,7 @@ public:
return _cql_stats;
}

wasmtime::Engine& wasm_engine() {
return **_wasm_engine;
}

wasm::instance_cache& wasm_instance_cache() {
return *_wasm_instance_cache;
}

wasm::alien_thread_runner& alien_runner() {
return *_alien_runner;
}
wasm::manager& wasm() { return _wasm; }

statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
if (user) {
Expand Down
4 changes: 2 additions & 2 deletions cql3/statements/create_function_statement.cc
Expand Up @@ -49,9 +49,9 @@ seastar::future<shared_ptr<functions::function>> create_function_statement::crea
std::move(return_type), _called_on_null_input, std::move(ctx));
} else if (_language == "wasm") {
// FIXME: need better way to test wasm compilation without real_database()
wasm::context ctx{qp.wasm_engine(), _name.name, qp.wasm_instance_cache(), db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel()};
wasm::context ctx(qp.wasm(), _name.name, db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel());
try {
co_await wasm::precompile(qp.alien_runner(), ctx, arg_names, _body);
co_await qp.wasm().precompile(ctx, arg_names, _body);
co_return ::make_shared<functions::user_function>(_name, _arg_types, std::move(arg_names), _body, _language,
std::move(return_type), _called_on_null_input, std::move(ctx));
} catch (const wasm::exception& we) {
Expand Down
6 changes: 3 additions & 3 deletions db/schema_tables.cc
Expand Up @@ -1917,8 +1917,8 @@ static seastar::future<shared_ptr<cql3::functions::user_function>> create_func(r
std::move(body), language, std::move(return_type),
row.get_nonnull<bool>("called_on_null_input"), std::move(ctx));
} else if (language == "wasm") {
wasm::context ctx{qctx->qp().wasm_engine(), name.name, qctx->qp().wasm_instance_cache(), db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel()};
co_await wasm::precompile(qctx->qp().alien_runner(), ctx, arg_names, body);
wasm::context ctx(db.wasm(), name.name, db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel());
co_await db.wasm().precompile(ctx, arg_names, body);
co_return ::make_shared<cql3::functions::user_function>(std::move(name), std::move(arg_types), std::move(arg_names),
std::move(body), language, std::move(return_type),
row.get_nonnull<bool>("called_on_null_input"), std::move(ctx));
Expand Down Expand Up @@ -1986,7 +1986,7 @@ static void drop_cached_func(replica::database& db, const query::result_set_row&
cql3::functions::function_name name{
row.get_nonnull<sstring>("keyspace_name"), row.get_nonnull<sstring>("function_name")};
auto arg_types = read_arg_types(db, row, name.keyspace);
qctx->qp().wasm_instance_cache().remove(name, arg_types);
db.wasm().remove(name, arg_types);
}
}

Expand Down
20 changes: 20 additions & 0 deletions lang/wasm.cc
Expand Up @@ -35,6 +35,18 @@ startup_context::startup_context(db::config& cfg, replica::database_config& dbcf
, timer_period(std::chrono::milliseconds(cfg.wasm_cache_timeout_in_ms())) {
}

manager::manager(const std::optional<wasm::startup_context>& ctx)
: _engine(ctx ? ctx->engine : nullptr)
, _instance_cache(ctx ? std::make_optional<wasm::instance_cache>(ctx->cache_size, ctx->instance_size, ctx->timer_period) : std::nullopt)
, _alien_runner(ctx ? ctx->alien_runner : nullptr)
{}

future<> manager::stop() {
if (_instance_cache) {
co_await _instance_cache->stop();
}
}

context::context(wasmtime::Engine& engine_ptr, std::string name, instance_cache& cache, uint64_t yield_fuel, uint64_t total_fuel)
: engine_ptr(engine_ptr)
, function_name(name)
Expand All @@ -43,6 +55,10 @@ context::context(wasmtime::Engine& engine_ptr, std::string name, instance_cache&
, total_fuel(total_fuel) {
}

context::context(manager& manager, std::string name, uint64_t yield_fuel, uint64_t total_fuel)
: context(**manager._engine, std::move(name), *manager._instance_cache, yield_fuel, total_fuel)
{ }

static constexpr size_t WASM_PAGE_SIZE = 64 * 1024;

static void init_abstract_arg(const abstract_type& t, const bytes_opt& param, wasmtime::ValVec& argv, wasmtime::Store& store, wasmtime::Instance& instance) {
Expand Down Expand Up @@ -227,6 +243,10 @@ struct from_val_visitor {
}
};

seastar::future<> manager::precompile(context& ctx, const std::vector<sstring>& arg_names, std::string script) {
return ::wasm::precompile(*_alien_runner, ctx, arg_names, std::move(script));
}

seastar::future<> precompile(alien_thread_runner& alien_runner, context& ctx, const std::vector<sstring>& arg_names, std::string script) {
seastar::promise<rust::Box<wasmtime::Module>> done;
alien_runner.submit(done, [&engine_ptr = ctx.engine_ptr, script = std::move(script)] {
Expand Down
17 changes: 17 additions & 0 deletions lang/wasm.hh
Expand Up @@ -13,6 +13,7 @@
#include <seastar/core/future.hh>
#include "db/functions/function_name.hh"
#include "rust/wasmtime_bindings.hh"
#include "lang/wasm_instance_cache.hh"
#include "lang/wasm_alien_thread_runner.hh"
#include "db/config.hh"
#include "replica/database.hh"
Expand Down Expand Up @@ -44,6 +45,21 @@ struct startup_context {
startup_context(db::config& cfg, replica::database_config& dbcfg);
};

class manager {
std::shared_ptr<rust::Box<wasmtime::Engine>> _engine;
std::optional<wasm::instance_cache> _instance_cache;
std::shared_ptr<wasm::alien_thread_runner> _alien_runner;

public:
manager(const std::optional<wasm::startup_context>&);
friend context;
future<> stop();
seastar::future<> precompile(context& ctx, const std::vector<sstring>& arg_names, std::string script);
void remove(const db::functions::function_name& name, const std::vector<data_type>& arg_types) noexcept {
_instance_cache->remove(name, arg_types);
}
};

struct context {
wasmtime::Engine& engine_ptr;
std::optional<rust::Box<wasmtime::Module>> module;
Expand All @@ -53,6 +69,7 @@ struct context {
uint64_t total_fuel;

context(wasmtime::Engine& engine_ptr, std::string name, instance_cache& cache, uint64_t yield_fuel, uint64_t total_fuel);
context(manager&, std::string name, uint64_t yield_fuel, uint64_t total_fuel);
};

seastar::future<> precompile(alien_thread_runner& alien_runner, context& ctx, const std::vector<sstring>& arg_names, std::string script);
Expand Down
5 changes: 4 additions & 1 deletion lang/wasm_instance_cache.hh
Expand Up @@ -16,12 +16,15 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/timer.hh>
#include <unordered_map>
#include "lang/wasm.hh"
#include "rust/cxx.h"
#include "rust/wasmtime_bindings.hh"
#include "types/types.hh"

namespace wasm {

class instance_cache;
struct context;

class module_handle {
wasmtime::Module& _module;
instance_cache& _cache;
Expand Down
19 changes: 12 additions & 7 deletions main.cc
Expand Up @@ -961,10 +961,20 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
sstm.stop().get();
});

std::optional<wasm::startup_context> wasm_ctx;
if (cfg->enable_user_defined_functions() && cfg->check_experimental(db::experimental_features_t::feature::UDF)) {
wasm_ctx.emplace(*cfg, dbcfg);
}

static sharded<wasm::manager> wasm;
wasm.start(std::ref(wasm_ctx)).get();
// don't stop for real until query_processor stops
auto stop_wasm = defer_verbose_shutdown("wasm", [] { wasm.invoke_on_all(&wasm::manager::stop).get(); });

supervisor::notify("starting database");
debug::the_database = &db;
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata),
std::ref(cm), std::ref(sstm), std::ref(sst_dir_semaphore), utils::cross_shard_barrier()).get();
std::ref(cm), std::ref(sstm), std::ref(wasm), std::ref(sst_dir_semaphore), utils::cross_shard_barrier()).get();
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
// #293 - do not stop anything - not even db (for real)
//return db.stop();
Expand Down Expand Up @@ -1029,12 +1039,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
std::chrono::duration_cast<std::chrono::milliseconds>(cql3::prepared_statements_cache::entry_expiry));
auth_prep_cache_config.refresh = std::chrono::milliseconds(cfg->permissions_update_interval_in_ms());

std::optional<wasm::startup_context> wasm_ctx;
if (cfg->enable_user_defined_functions() && cfg->check_experimental(db::experimental_features_t::feature::UDF)) {
wasm_ctx.emplace(*cfg, dbcfg);
}

qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notifier), qp_mcfg, std::ref(cql_config), std::move(auth_prep_cache_config), std::move(wasm_ctx)).get();
qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notifier), qp_mcfg, std::ref(cql_config), std::move(auth_prep_cache_config), std::ref(wasm)).get();

supervisor::notify("starting lifecycle notifier");
lifecycle_notifier.start().get();
Expand Down
3 changes: 2 additions & 1 deletion replica/database.cc
Expand Up @@ -310,7 +310,7 @@ class db_user_types_storage : public data_dictionary::dummy_user_types_storage {
};

database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm,
compaction_manager& cm, sstables::storage_manager& sstm, sharded<sstables::directory_semaphore>& sst_dir_sem, utils::cross_shard_barrier barrier)
compaction_manager& cm, sstables::storage_manager& sstm, wasm::manager& wasm, sharded<sstables::directory_semaphore>& sst_dir_sem, utils::cross_shard_barrier barrier)
: _stats(make_lw_shared<db_stats>())
, _user_types(std::make_shared<db_user_types_storage>(*this))
, _cl_stats(std::make_unique<cell_locker_stats>())
Expand Down Expand Up @@ -374,6 +374,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
, _mnotifier(mn)
, _feat(feat)
, _shared_token_metadata(stm)
, _wasm(wasm)
, _sst_dir_semaphore(sst_dir_sem)
, _stop_barrier(std::move(barrier))
, _update_memtable_flush_static_shares_action([this, &cfg] { return _memtable_controller.update_static_shares(cfg.memtable_flush_static_shares()); })
Expand Down
8 changes: 7 additions & 1 deletion replica/database.hh
Expand Up @@ -77,6 +77,8 @@ class reconcilable_result;
namespace tracing { class trace_state_ptr; }
namespace s3 { struct endpoint_config; }

namespace wasm { class manager; }

namespace service {
class storage_proxy;
class storage_service;
Expand Down Expand Up @@ -1428,6 +1430,7 @@ private:
gms::feature_service& _feat;
std::vector<std::any> _listeners;
const locator::shared_token_metadata& _shared_token_metadata;
wasm::manager& _wasm;

sharded<sstables::directory_semaphore>& _sst_dir_semaphore;

Expand Down Expand Up @@ -1516,7 +1519,7 @@ public:
future<> parse_system_tables(distributed<service::storage_proxy>&, sharded<db::system_keyspace>&);

database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm,
compaction_manager& cm, sstables::storage_manager& sstm, sharded<sstables::directory_semaphore>& sst_dir_sem, utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */);
compaction_manager& cm, sstables::storage_manager& sstm, wasm::manager& wasm, sharded<sstables::directory_semaphore>& sst_dir_sem, utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */);
database(database&&) = delete;
~database();

Expand Down Expand Up @@ -1551,6 +1554,9 @@ public:
const locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; }
const locator::token_metadata& get_token_metadata() const { return *_shared_token_metadata.get(); }

wasm::manager& wasm() noexcept { return _wasm; }
const wasm::manager& wasm() const noexcept { return _wasm; }

service::migration_notifier& get_notifier() { return _mnotifier; }
const service::migration_notifier& get_notifier() const { return _mnotifier; }

Expand Down
19 changes: 12 additions & 7 deletions test/lib/cql_test_env.cc
Expand Up @@ -657,7 +657,17 @@ class single_node_cql_env : public cql_test_env {
sstm.start(std::ref(*cfg)).get();
auto stop_sstm = deferred_stop(sstm);

db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(cm), std::ref(sstm), std::ref(sst_dir_semaphore), utils::cross_shard_barrier()).get();
std::optional<wasm::startup_context> wasm_ctx;
if (cfg->enable_user_defined_functions() && cfg->check_experimental(db::experimental_features_t::feature::UDF)) {
wasm_ctx.emplace(*cfg, dbcfg);
}

sharded<wasm::manager> wasm;
wasm.start(std::ref(wasm_ctx)).get();
auto stop_wasm = defer([&wasm] { wasm.stop().get(); });


db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(cm), std::ref(sstm), std::ref(wasm), std::ref(sst_dir_semaphore), utils::cross_shard_barrier()).get();
auto stop_db = defer([&db] {
db.stop().get();
});
Expand Down Expand Up @@ -696,12 +706,7 @@ class single_node_cql_env : public cql_test_env {
std::chrono::duration_cast<std::chrono::milliseconds>(cql3::prepared_statements_cache::entry_expiry));
auth_prep_cache_config.refresh = std::chrono::milliseconds(cfg->permissions_update_interval_in_ms());

std::optional<wasm::startup_context> wasm_ctx;
if (cfg->enable_user_defined_functions() && cfg->check_experimental(db::experimental_features_t::feature::UDF)) {
wasm_ctx.emplace(*cfg, dbcfg);
}

qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notif), qp_mcfg, std::ref(cql_config), auth_prep_cache_config, wasm_ctx).get();
qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notif), qp_mcfg, std::ref(cql_config), auth_prep_cache_config, std::ref(wasm)).get();
auto stop_qp = defer([&qp] { qp.stop().get(); });

sharded<service::endpoint_lifecycle_notifier> elc_notif;
Expand Down

0 comments on commit 6c1e44e

Please sign in to comment.