Skip to content

Commit

Permalink
Add initial Wasm subsystem
Browse files Browse the repository at this point in the history
This commit defines a ABI for wasm modules to perform transforms within
redpanda. There are also helpers for custom host bindings that are
defined in an engine agnostic way. See redpanda-data#12322 for the corresponding guest
side bindings as well as a video overview of the ABI contract.

Additionally, a WASI module is defined, which supports a subset of the
WASI standard. Namely, environment variables and writing to std{out,err}
is supported.

There are currently bindings to both WasmEdge and wasmtime, although the
wasmtime integration is mostly experimental and can easily crash the
redpanda process (see scylladb/scylladb#14163).

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
  • Loading branch information
rockwotj committed Aug 9, 2023
1 parent 01bbfcd commit ed31656
Show file tree
Hide file tree
Showing 30 changed files with 3,304 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/v/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ add_subdirectory(compat)
add_subdirectory(rp_util)
add_subdirectory(resource_mgmt)
add_subdirectory(migrations)
add_subdirectory(wasm)

option(ENABLE_GIT_VERSION "Build with Git metadata" OFF)

Expand Down
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class Node;
namespace cluster {
class controller;
}
namespace wasm {
class schema_registry;
}

namespace pandaproxy::schema_registry {

Expand All @@ -46,6 +49,7 @@ class api {

private:
friend class schema_id_validator;
friend class wasm::schema_registry;
model::node_id _node_id;
ss::smp_service_group _sg;
size_t _max_memory;
Expand Down
24 changes: 24 additions & 0 deletions src/v/wasm/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
v_cc_library(
NAME wasm
HDRS
api.h
fwd.h
SRCS
"api.cc"
"ffi.cc"
"logger.cc"
"probe.cc"
"schema_registry.cc"
"schema_registry_module.cc"
"transform_module.cc"
"wasi.cc"
"wasmedge.cc"
DEPS
wasmedge
v::storage
v::model
v::pandaproxy_schema_registry
Seastar::seastar
)

add_subdirectory(tests)
23 changes: 23 additions & 0 deletions src/v/wasm/api.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2023 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "wasm/api.h"

#include "wasm/schema_registry.h"
#include "wasm/wasmedge.h"

namespace wasm {
std::unique_ptr<runtime>
runtime::create_default(pandaproxy::schema_registry::api* schema_reg) {
return wasmedge::create_runtime(
wasm::schema_registry::make_default(schema_reg));
}
} // namespace wasm
94 changes: 94 additions & 0 deletions src/v/wasm/api.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2023 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "model/record.h"
#include "model/transform.h"
#include "pandaproxy/schema_registry/fwd.h"
#include "seastarx.h"
#include "wasm/fwd.h"

#include <memory>

namespace wasm {

/**
* A wasm engine is a running VM loaded with a user module and capable of
* transforming batches.
*/
class engine {
public:
virtual ss::future<model::record_batch>
transform(model::record_batch batch, transform_probe* probe) = 0;

virtual ss::future<> start() = 0;
virtual ss::future<> initialize() = 0;
virtual ss::future<> stop() = 0;

virtual std::string_view function_name() const = 0;
virtual uint64_t memory_usage_size_bytes() const = 0;

engine() = default;
virtual ~engine() = default;
engine(const engine&) = delete;
engine& operator=(const engine&) = delete;
engine(engine&&) = default;
engine& operator=(engine&&) = default;
};

/**
* A factory is a compilation service that can create many engines from a single
* transform metadata and wasm module.
*
* The idea is that factory has a cached version of the parsed module so the
* parsing/validation of a wasm module can be only done once and then the
* attaching to an engine becomes a very fast operation.
*/
class factory {
public:
factory() = default;
factory(const factory&) = delete;
factory& operator=(const factory&) = delete;
factory(factory&&) = delete;
factory& operator=(factory&&) = delete;
virtual ss::future<std::unique_ptr<engine>> make_engine() = 0;
virtual ~factory() = default;
};

/**
* A wasm runtime is capable of creating engines.
*
* There should only be a single runtime for a given process.
*/
class runtime {
public:
/**
* Create the default runtime.
*/
static std::unique_ptr<runtime>
create_default(pandaproxy::schema_registry::api*);

runtime() = default;
runtime(const runtime&) = delete;
runtime& operator=(const runtime&) = delete;
runtime(runtime&&) = delete;
runtime& operator=(runtime&&) = delete;
/**
* Create a factory for this transform and the corresponding source wasm
* module.
*/
virtual ss::future<std::unique_ptr<factory>>
make_factory(model::transform_metadata, iobuf, ss::logger*) = 0;
virtual ~runtime() = default;
};

} // namespace wasm
77 changes: 77 additions & 0 deletions src/v/wasm/errc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2023 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include <system_error>

namespace wasm {
enum class errc {
success = 0,
// When the user's code fails to be loaded
load_failure,
// When the engine is fails to be created
engine_creation_failure,
// When the user's supplied code errors
user_code_failure,
// Engine is not running
engine_not_running,
};

struct errc_category final : public std::error_category {
const char* name() const noexcept final { return "wasm::errc"; }

std::string message(int c) const final {
switch (static_cast<errc>(c)) {
case errc::success:
return "wasm::errc::success";
case errc::load_failure:
return "wasm::errc::load_failure";
case errc::engine_creation_failure:
return "wasm::errc::engine_creation_failure";
case errc::user_code_failure:
return "wasm::errc::user_code_failure";
case errc::engine_not_running:
return "wasm::errc::engine_not_running";
default:
return "wasm::errc::unknown(" + std::to_string(c) + ")";
}
}
};
inline const std::error_category& error_category() noexcept {
static errc_category e;
return e;
}
inline std::error_code make_error_code(errc e) noexcept {
return {static_cast<int>(e), error_category()};
}

class wasm_exception final : public std::exception {
public:
explicit wasm_exception(std::string msg, errc err_code) noexcept
: _msg(std::move(msg))
, _err_code(err_code) {}

const char* what() const noexcept final { return _msg.c_str(); }

errc error_code() const noexcept { return _err_code; }

private:
std::string _msg;
errc _err_code;
};

} // namespace wasm

namespace std {
template<>
struct is_error_code_enum<wasm::errc> : true_type {};
} // namespace std
Loading

0 comments on commit ed31656

Please sign in to comment.