wasm: reuse instances for wasm UDFs #10306
// auxiliary variables for reusing instances of wasm UDFs
std::optional<wasmtime::Store> _store;
std::optional<wasmtime::Instance> _instance;
std::optional<wasmtime::Func> _func;
I now think this is the wrong place. First, it entangles wasm into a more generic context. Second, it doesn't allow reuse in single-row queries.
We should have a cache of instances, with eviction based on idle time or memory use. Execution should remove an entry from the cache, execute the function, and insert it into the cache again.
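The take/execute/put-back flow proposed here can be sketched roughly as follows. This is only an illustration under stated assumptions: `fake_instance` and `instance_pool` are hypothetical names, the instance is a plain struct rather than a wasmtime object, and eviction uses a simple byte budget plus an idle timeout.

```cpp
#include <chrono>
#include <cstddef>
#include <list>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for a wasm instance; not the wasmtime API.
struct fake_instance {
    std::string function_name;
    std::size_t memory_bytes;
};

// Illustrative pool: an instance is removed while it is in use and put back
// afterwards, so concurrent callers never share one instance.
class instance_pool {
    using clock = std::chrono::steady_clock;
    struct entry {
        fake_instance inst;
        clock::time_point last_used;
    };
    std::list<entry> _idle;       // most recently returned entry at the front
    std::size_t _idle_bytes = 0;  // total size of idle instances
    std::size_t _max_bytes;
public:
    explicit instance_pool(std::size_t max_bytes) : _max_bytes(max_bytes) {}

    // Remove and return an idle instance of `name`, if there is one.
    std::optional<fake_instance> take(const std::string& name) {
        for (auto it = _idle.begin(); it != _idle.end(); ++it) {
            if (it->inst.function_name == name) {
                fake_instance inst = std::move(it->inst);
                _idle_bytes -= inst.memory_bytes;
                _idle.erase(it);
                return inst;
            }
        }
        return std::nullopt;
    }

    // Put an instance back after execution; evict the least recently
    // returned entries while the pool is over its memory budget.
    void put(fake_instance inst, clock::time_point now = clock::now()) {
        _idle_bytes += inst.memory_bytes;
        _idle.push_front(entry{std::move(inst), now});
        while (_idle_bytes > _max_bytes && !_idle.empty()) {
            _idle_bytes -= _idle.back().inst.memory_bytes;
            _idle.pop_back();
        }
    }

    // Drop entries that have been idle for longer than `max_idle`.
    void evict_idle(clock::duration max_idle, clock::time_point now = clock::now()) {
        while (!_idle.empty() && now - _idle.back().last_used > max_idle) {
            _idle_bytes -= _idle.back().inst.memory_bytes;
            _idle.pop_back();
        }
    }

    std::size_t idle_count() const { return _idle.size(); }
    std::size_t idle_bytes() const { return _idle_bytes; }
};
```

A caller would `take()` an instance (creating a new one when nothing comes back), run the function, and `put()` the instance back afterwards. Because the pool is keyed by function name and not tied to a query, this also covers single-row queries.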
cql3/functions/aggregate_fcts.cc
Outdated
// auxiliary variables for reusing instances of wasm UDFs for the scalar function
std::optional<wasmtime::Store> _store;
std::optional<wasmtime::Instance> _instance;
std::optional<wasmtime::Func> _func;
Having a cache at the wasm level means we don't need to repeat the code for UDF/UDA.
I've added a simple cache of wasm instances to the |
Looks good, but please collapse the queue trio into a single queue holding a struct
cql3/functions/user_function.hh
Outdated
// wasm UDF instance cache, allowing maximum concurrency
std::queue<wasmtime::Store> _store;
std::queue<wasmtime::Instance> _instance;
std::queue<wasmtime::Func> _func;
I think that instead of 3 queues we just need a single queue which keeps a struct. And this struct should wrap _store, _instance and _func. Am I correct that we always use all three? In this case it makes perfect sense to only use a single queue. That would make the call site look nicer as well.
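A minimal sketch of the single-queue layout suggested here, assuming all three objects are always used together. `Store`, `Instance` and `Func` below are empty stand-ins so the example compiles without wasmtime, and `wasm_instance` and `instance_queue` are hypothetical names.

```cpp
#include <cstddef>
#include <optional>
#include <queue>
#include <utility>

// Stand-ins so the sketch compiles on its own; in the patch these would be
// wasmtime::Store, wasmtime::Instance and wasmtime::Func.
struct Store { int id; };
struct Instance { int id; };
struct Func { int id; };

// Hypothetical struct bundling the three objects that are always used together.
struct wasm_instance {
    Store store;
    Instance instance;
    Func func;
};

// One queue of bundles instead of three parallel queues.
class instance_queue {
    std::queue<wasm_instance> _instances;
public:
    void push(wasm_instance inst) { _instances.push(std::move(inst)); }

    // Return the oldest bundle, or nullopt when the queue is empty.
    std::optional<wasm_instance> pop() {
        if (_instances.empty()) {
            return std::nullopt;
        }
        wasm_instance inst = std::move(_instances.front());
        _instances.pop();
        return inst;
    }

    std::size_t size() const { return _instances.size(); }
};
```

With one queue, the call site pops and pushes a single object, and the three members cannot get out of step with each other.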
The cache has to be shard-global, otherwise many distinct functions can eat all shard memory.
cql3/functions/user_function.cc
Outdated
auto memory_export = instance.get(store, "memory");
if (!memory_export) {
throw wasm::exception("memory export not found - please export `memory` in the wasm module");
}
We shouldn't check this every time.
cql3/functions/user_function.cc
Outdated
throw wasm::exception(format("Exported object {} is not a function", ctx.function_name));
}
_func.push(std::move(*fnc));
}
Better to move this to a separate function for clarity.
cql3/functions/user_function.cc
Outdated
auto memory = std::get<wasmtime::Memory>(*memory_export);
if (memory.size(store) < 256 && _store.size() < 8) {
// reuse the instance if the memory used is less than 256 pages (16MB)
// TODO: also evict the instances if not used for a while
See loading_cache.hh
test/cql-pytest/test_wasm.py
Outdated
@@ -948,3 +965,6 @@ def test_word_double(cql, test_keyspace, table1, scylla_with_wasm_only):
cql.execute(f"INSERT INTO {table} (p, txt) VALUES (1001, 'cat42')")
res = [row for row in cql.execute(f"SELECT {test_keyspace}.{dbl_name}(txt) AS result FROM {table} WHERE p = 1001")]
assert len(res) == 1 and res[0].result == 'cat42cat42'
Need tests to verify reuse. You can add a metric for function creations.
Metrics are a great idea, for the sake of future benchmarking too - e.g. we could verify how many times functions were compiled vs run, how many wasm-induced allocations were performed, and so on.
It's the same idea behind prepared statements and their metrics.
One frustrating problem with tests that use global metrics is that they cannot be run reliably in parallel against the same instance of Scylla. It's not a real problem currently (we don't run tests in parallel), so maybe there's no reason to be apprehensive about it. However, note that there is a reliable way to have metrics in parallel tests - it is to use per-table metrics. It turns out (!) we also have a way to extract per-table metrics via the rest api - see rest_api.py::get_column_family_metric. We don't use this trick enough in tests. We should.
What's the status on this one? Is another iteration expected?
The new version depends on #10541, please comment on the changes to the loading_cache there.
Does it work across different queries? I expect so but it isn't clear from the message.
That actually agrees with the comment before and disagrees with my expectation. But why not reuse across queries? Note that if you use a UDA on three-row partitions, then you'll have to create an instance every three rows.
cql3/functions/function_name.hh
Outdated
bool operator<(const function_name& x) const {
return keyspace < x.keyspace || ( keyspace == x.keyspace && name < x.name);
}
How about: std::strong_ordering operator<=>(const function_name&) = default;
Looks good, I'll add it
cql3/functions/user_function.cc
Outdated
@@ -7,6 +7,8 @@
*/

#include "user_function.hh"
#include "lang/udf_cache.hh"
#include "lang/wasmtime.hh"
Let's work to have an interface to engines and so only include the abstract engine, not every runtime here. Not high priority but keep in mind.
cql3/functions/user_function.cc
Outdated
@@ -58,7 +60,40 @@ bytes_opt user_function::execute(cql_serialization_format sf, const std::vector<
},
[&] (wasm::context& ctx) {
try {
return wasm::run_script(ctx, arg_types(), parameters, return_type(), _called_on_null_input).get0();
auto func_cache = ctx.cache;
auto [func_inst, return_promise] = func_cache->get(name(), [this, &ctx] () {
Can we use the get() variant that doesn't accept a function (and uses the function from the constructor instead?) I want to get rid of the lambda variant.
It will also be nice to extract this to a separate function regardless.
I'll try doing that
lang/udf_cache.hh
Outdated
@@ -0,0 +1,154 @@
/*
*/
?
Seems like I've copied a corrupted copyright clause (I see it appears in other files too), but I'll fix it here
lang/udf_cache.hh
Outdated
#include "utils/overloaded_functor.hh"
#include "wasmtime.hh"
#include "seastar/core/shared_ptr.hh"
#include "seastar/core/scheduling.hh"
<seastar/
Ok
lang/udf_cache.hh
Outdated
template <typename LoadFunc>
requires std::is_invocable_r_v<value_type, LoadFunc>
std::pair<value_type, seastar::promise<value_type>> get(const key_type& key, LoadFunc&& load) {
I see you have a specialized wrapper here, so regardless of loading_cache issues, you can move LoadFunc here and absolve the caller from having to supply it. It can supply the extra parameters LoadFunc needs as regular parameters.
Why does it return both a value_type and a promise<value_type>?
Ah, is that for returning the instance? I think it should just call put(), future/promise isn't needed for synchronous communication.
> I see you have a specialized wrapper here, so regardless of loading_cache issues, you can move LoadFunc here and absolve the caller from having to supply it. It can supply the extra parameters LoadFunc needs as regular parameters.
I'll do that
> Ah, is that for returning the instance? I think it should just call put(), future/promise isn't needed for synchronous communication.
Yes, it's for returning. We don't have to use the futures now, but when we add pre-empting to udf calls, I think we will need them
lang/udf_cache.hh
Outdated
size_t _max_size;

public:
udf_cache(size_t size)
explicit
Ok
@@ -336,6 +339,7 @@ def test_f64_param(cql, test_keyspace, table1, scylla_with_wasm_only):
(table (;0;) 1 1 funcref)
(table (;1;) 32 externref)
(memory (;0;) 17)
(export "memory" (memory 0))
Why these changes?
We need to export memory to be able to check the instance size (we're using the size of the memory as the size of the instance in the cache)
What happens if the instance doesn't export memory? Does it have no memory, or is it just not reachable?
As we've discussed on the sync, the instance may have no memory and store everything on the stack
but why did this become important when we started to reuse instances?
Ah, for sizing. Maybe it's better for the sizer to assume a fixed size in that case (for all the overhead).
Do you mean a fixed size for instances that do not export memory? That could work, but it's hard to come up with a reasonable default size. Maybe it will be easier to decide after we do some testing
ok
lang/udf_cache.hh
Outdated
return _stats;
}

void setup_metrics() {
namespace sm = seastar::metrics;
_metrics.add_group("user functions", {
No spaces in metric names.
Ok
lang/udf_cache.hh
Outdated
void setup_metrics() {
namespace sm = seastar::metrics;
_metrics.add_group("user functions", {
sm::make_derive("udf_hits", wasm::udf_cache::shard_stats().cache_hits,
Repeats "udf" and forgets to mention "cache"
Ok
CI state |
These comments referred to the first version and are no longer here. The instances can be reused across queries, I've added a check to the tests to confirm that (although it's not optimal to test this using only metrics)
The last rebase adds the new strategy of evicting entries from the cache. An entry that has been created is only completely removed when the corresponding UDF is dropped. Until then, it stores a wasm instance only when it is in use by some queries, but it stores the function name, function signature and a seastar::shared_mutex the entire time.
CI state |
lang/wasm_instance_cache.hh
Outdated
#include "wasmtime.hh"
#include "lang/wasm.hh"
#include <exception>
#include <seastar/core/metrics.hh>
It's enough to use metrics_registration.
Please see if you can trim the #include list here. .hh files should try to reduce their dependencies.
It should be shorter now
lang/wasm_instance_cache.hh
Outdated
#include <seastar/core/timer.hh>
#include <seastar/util/defer.hh>
#include "utils/hash.hh"
#include "utils/overloaded_functor.hh"
and deduplicate it.
ok
lang/wasm_instance_cache.hh
Outdated
#include "cql3/functions/function_name.hh"
#include "utils/overloaded_functor.hh"
#include "cql3/prepared_statements_cache.hh"
Do we need this here?
Looks like we don't, I deleted this
// sizes in wasm pages (16KiB)
size_t _total_size = 0;
size_t _max_size;
Better to count size in bytes to avoid confusion later on.
Good idea, especially because the page size was incorrect in the comment
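For reference, a WebAssembly page is 64 KiB, so the 256-page limit mentioned earlier corresponds to 16 MiB. A byte-based sizer, combined with the fixed fallback floated above for instances that do not export memory, might look like the sketch below. The function name and the fallback value are hypothetical; the thread leaves the real default undecided.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>

// A WebAssembly page is 64 KiB.
constexpr std::size_t wasm_page_size = 64 * 1024;

// Arbitrary placeholder for instances without an exported memory;
// not a value taken from the patch.
constexpr std::size_t assumed_size_without_memory = 1024 * 1024;

// `memory_pages` is the page count of the exported memory, or nullopt
// when the module does not export one. The result is in bytes.
constexpr std::size_t instance_size_bytes(std::optional<std::uint64_t> memory_pages) {
    if (!memory_pages) {
        return assumed_size_without_memory;
    }
    return static_cast<std::size_t>(*memory_pages) * wasm_page_size;
}
```

Keeping every size in bytes means the cache limit and the per-instance estimate share one unit, and the page size appears in exactly one place.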
Looks good, I'll go review that first commit.
I see #10234 is merged, so I don't understand where the first commit belongs.
The last rebase has only minor cleanup changes, but one of them is a rewrite of the
I don't really see the correlation to #10234 in the first commit. This commit, while beneficial on its own, is not necessary for this patch (still, it is useful here). Perhaps it would be better if I made it into another PR, but I'd prefer not to make another patch that needs to be merged before this one
CI state |
The cover letter says
Since there are three commits, I assumed the first one is related to #10234.
696403a
to
b947853
Compare
db/schema_tables.cc
Outdated
row.get_nonnull<sstring>("keyspace_name"), row.get_nonnull<sstring>("function_name")};
auto arg_types = read_arg_types(db, row, name.keyspace);
return std::make_pair(std::move(name), std::move(arg_types));
}
The function is called drop, but it doesn't drop anything.
The drop function is dropped in the rebase
db/schema_tables.cc
Outdated
@@ -1697,6 +1697,13 @@ static std::vector<data_value> read_arg_values(const query::result_set_row& row)
}
#endif

static std::pair<cql3::functions::function_name, std::vector<data_type>> drop_func(replica::database& db, const query::result_set_row& row) {
struct function_signature { ... }
This should not be necessary anymore in the last rebase.
db/schema_tables.cc
Outdated
auto arg_types = read_arg_types(db, row, name.keyspace);
return std::make_pair(std::move(name), std::move(arg_types));
}

static shared_ptr<cql3::functions::user_function> create_func(replica::database& db, const query::result_set_row& row) {
This one actually does create something. So better rename drop_func() to reflect what it does.
Or change it to actually drop.
I've changed it so the drop function is actually added in the same patch as the cache, and it does what it says
Ok, so I reviewed it too.
Currently, we have 2 merge_functions methods, where one only calls the other. We can replace them with a single one. The merge_functions method compiles a UDF (using create_func) only to read its signature. We can avoid that by reading it from the row ourselves. Signed-off-by: Wojciech Mitros <wojciech.mitros@scylladb.com>
When executing a wasm UDF, most of the time is spent on setting up the instance. To minimize its cost, we reuse the instance using wasm::instance_cache. This patch adds a wasm instance cache that stores a wasmtime instance for each UDF and scheduling group. The instances are evicted using an LRU strategy. The cache may store some entries for the UDF after evicting the instance, but they are evicted when the corresponding UDF is dropped, which greatly limits their number. The size of stored instances is estimated using the size of their WASM memories. In order to be able to read the size of memory, we require that the memory is exported by the client. Signed-off-by: Wojciech Mitros <wojciech.mitros@scylladb.com>
Add a test for a wasm aggregate function which uses the new metrics to check if the cache has been hit at least once. Also check that the cache can get reused on different queries, by testing that the number of queries is higher than the number of cache misses. Signed-off-by: Wojciech Mitros <wojciech.mitros@scylladb.com>
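The invariant the test relies on (more queries than cache misses means instances were reused) can be illustrated with a toy model. This is only a sketch: `cache_stats` and `counting_cache` are hypothetical names, an `int` id stands in for a wasm instance, and the real counters are exposed through seastar metrics instead of an accessor.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical counters, mirroring the hit/miss metrics the test reads.
struct cache_stats {
    std::uint64_t hits = 0;
    std::uint64_t misses = 0;
};

// Toy cache: an int id stands in for a wasm instance.
class counting_cache {
    std::unordered_map<std::string, int> _instances;
    cache_stats _stats;
    int _next_id = 0;
public:
    // Return the cached instance for `udf`, creating one on a miss.
    int get(const std::string& udf) {
        auto it = _instances.find(udf);
        if (it != _instances.end()) {
            ++_stats.hits;
            return it->second;
        }
        ++_stats.misses;
        return _instances.emplace(udf, _next_id++).first->second;
    }

    // Dropping the UDF removes its cached instance.
    void drop(const std::string& udf) { _instances.erase(udf); }

    const cache_stats& stats() const { return _stats; }
};
```

Running three queries against the same UDF gives one miss and two hits in this model, so the number of queries exceeds the number of misses. After `drop()`, the next call misses again.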
Thanks, the patch didn't look that good after review, so I tried a different approach, made possible by the fact that after 5a30f9b from yesterday the paths of creating/dropping UDFs and UDAs are now separate (we only wanted to change the UDF path). The same result is achieved but now it's more straightforward
CI state |
There weren't many issues with the version before the last rebase, so if the last changes look good maybe we can merge this @avikivity @psarna
Actually, this does not compile if we don't have wasmtime (it probably didn't since the start of this PR)
Calling WebAssembly UDFs requires a wasmtime instance. Creating such an instance is expensive,
but these instances can be reused for subsequent calls of the same UDF on various inputs.
This patch introduces a way of reusing wasmtime instances: a wasm instance cache.
The cache stores a wasmtime instance for each UDF and scheduling group. The instances are
evicted using an LRU strategy and their size is based on the size of their wasm memories.
The instances stored in the cache are also dropped when the UDF itself is dropped. For that reason,
the first patch modifies the current implementation of UDF dropping, so that the instance dropping may be added
later. The patch also removes the need to compile the UDF again when dropping it.
The second patch contains the implementation and use of the new cache. The cache is implemented
in lang/wasm_instance_cache.hh and the main ways of using it are the run_script methods from wasm.hh.
The third patch adds tests to test_wasm.py that check the correctness and performance of the new cache. The tests confirm the instance reuse, size limits, instance eviction after timeout and after dropping the UDF.