Skip to content

Commit

Permalink
Merge pull request #16436 from vbotbuildovich/backport-pr-16301-v23.3…
Browse files Browse the repository at this point in the history
….x-934

[v23.3.x] Introduce `transform::logging::manager`
  • Loading branch information
rockwotj committed Feb 2, 2024
2 parents 47d0603 + d300db8 commit c7dfc19
Show file tree
Hide file tree
Showing 15 changed files with 965 additions and 0 deletions.
23 changes: 23 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,29 @@ configuration::configuration()
},
10_MiB,
{.min = 1_MiB, .max = 128_MiB})
, data_transforms_logging_buffer_capacity_bytes(
*this,
"data_transforms_logging_buffer_capacity_bytes",
"Buffer capacity for transform logs, per shard. Buffer occupancy is "
"calculated as the total size of buffered (i.e. emitted but not yet "
"produced) log messages.",
{.needs_restart = needs_restart::yes, .visibility = visibility::tunable},
100_KiB,
{.min = 100_KiB, .max = 2_MiB})
, data_transforms_logging_flush_interval_ms(
*this,
"data_transforms_logging_flush_interval_ms",
"Flush interval for transform logs. When a timer expires, pending logs "
"are collected and published to the transform_logs topic.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
500ms)
// Maximum length of a single transform log line. Tunable, and marked as not
// requiring a restart to take effect.
, data_transforms_logging_line_max_bytes(
*this,
"data_transforms_logging_line_max_bytes",
"Transform log lines will be truncated to this length. Truncation occurs "
"after any character escaping.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
1_KiB)
, topic_memory_per_partition(
*this,
"topic_memory_per_partition",
Expand Down
4 changes: 4 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ struct configuration final : public config_store {
bounded_property<size_t> data_transforms_per_function_memory_limit;
property<std::chrono::milliseconds> data_transforms_runtime_limit_ms;
bounded_property<size_t> data_transforms_binary_max_size;
bounded_property<size_t> data_transforms_logging_buffer_capacity_bytes;
property<std::chrono::milliseconds>
data_transforms_logging_flush_interval_ms;
property<size_t> data_transforms_logging_line_max_bytes;

// Controller
bounded_property<std::optional<std::size_t>> topic_memory_per_partition;
Expand Down
6 changes: 6 additions & 0 deletions src/v/config/mock_property.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ class mock_property {

// Returns the binding produced by the wrapped property's bind().
binding<T> bind() { return _property.bind(); }

// Non-const accessor: forwards to the wrapped property's call operator and
// hands back its result by const reference.
const T& operator()() { return _property(); }

// Const accessor: same forwarding as the non-const overload, usable on a
// const mock_property.
const T& operator()() const { return _property(); }

// Implicit conversion to T (by value) using the wrapped property's call
// operator; intentionally non-explicit, hence the lint suppression.
operator T() const { return _property(); } // NOLINT

template<typename U>
auto bind(std::function<U(const T&)> conv) -> conversion_binding<U, T> {
return _property.template bind<U>(std::move(conv));
Expand Down
1 change: 1 addition & 0 deletions src/v/transform/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ template<typename ClockType>
class commit_batcher;
class processor;
class probe;
class log_manager;
namespace rpc {
class client;
class local_service;
Expand Down
6 changes: 6 additions & 0 deletions src/v/transform/logging/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ v_cc_library(
NAME transform_logging
HDRS
event.h
io.h
log_manager.h
logger.h
SRCS
event.cc
log_manager.cc
logger.cc
DEPS
v::config
v::model
)

Expand Down
71 changes: 71 additions & 0 deletions src/v/transform/logging/io.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "cluster/errc.h"
#include "model/record.h"
#include "ssx/semaphore.h"
#include "transform/logging/event.h"

#pragma once

namespace transform::logging {

namespace io {
struct json_batch {
json_batch(
model::transform_name n,
ss::chunked_fifo<iobuf> e,
ssx::semaphore_units units)
: name(std::move(n))
, events(std::move(e))
, _units(std::move(units)) {}

model::transform_name name;
ss::chunked_fifo<iobuf> events;

private:
ssx::semaphore_units _units;
};
using json_batches = ss::chunked_fifo<json_batch>;

} // namespace io

/**
 * Abstract interface that gives a single instance of the log manager access
 * to the cluster and to the transform subsystem.
 *
 * Responsibilities may include:
 * - Creating the transform_logs topic
 * - Batching collections of log data and publishing them to the logs topic
 * - Working out which output partition a given transform's logs go to
 *
 * Instances can be neither copied nor moved.
 */
class client {
public:
    client() = default;
    virtual ~client() = default;

    client(const client&) = delete;
    client(client&&) = delete;
    client& operator=(const client&) = delete;
    client& operator=(client&&) = delete;

    /*
     * Gather the given io::json_batches into model::record_batch(es) and
     * publish them to the specified partition.
     *
     * Implementations should pack outgoing records into as few batches as
     * possible while still honouring cluster-wide batch size limits.
     */
    virtual ss::future<> write(model::partition_id, io::json_batches) = 0;

    /*
     * Compute the output partition for the logs of the named transform.
     */
    virtual model::partition_id
    compute_output_partition(model::transform_name_view name) = 0;
};
} // namespace transform::logging
Loading

0 comments on commit c7dfc19

Please sign in to comment.