Skip to content

Commit

Permalink
Merge pull request #16913 from oleiman/vbotbuildovich/backport-16566-…
Browse files Browse the repository at this point in the history
…v23.3.x-28
  • Loading branch information
oleiman committed Mar 6, 2024
2 parents fb61474 + 3d2d4e3 commit ced9963
Show file tree
Hide file tree
Showing 7 changed files with 516 additions and 105 deletions.
2 changes: 2 additions & 0 deletions src/v/transform/logging/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ v_cc_library(
io.h
log_manager.h
logger.h
probes.h
rpc_client.h
SRCS
record_batcher.cc
event.cc
log_manager.cc
logger.cc
rpc_client.cc
probes.cc
DEPS
v::config
v::cluster
Expand Down
2 changes: 2 additions & 0 deletions src/v/transform/logging/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ namespace transform::logging {
template<typename ClockType>
class manager;
class client;
class logger_probe;
class manager_probe;
} // namespace transform::logging
40 changes: 36 additions & 4 deletions src/v/transform/logging/log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "transform/logging/errc.h"
#include "transform/logging/io.h"
#include "transform/logging/logger.h"
#include "transform/logging/probes.h"
#include "utils/utf8.h"
#include "vassert.h"

Expand Down Expand Up @@ -45,10 +46,12 @@ class flusher {
explicit flusher(
ss::abort_source* as,
transform::logging::client* c,
config::binding<std::chrono::milliseconds> fi)
config::binding<std::chrono::milliseconds> fi,
manager_probe* probe)
: _as(as)
, _client(c)
, _interval_ms(std::move(fi))
, _probe(probe)
, _jitter(_interval_ms(), jitter_amt) {
_interval_ms.watch([this]() {
_jitter = simple_time_jitter<ClockType>{_interval_ms(), jitter_amt};
Expand Down Expand Up @@ -184,12 +187,14 @@ class flusher {
// been produced.
std::error_code ec = co_await _client->write(pid, std::move(events));
if (ec != errc::success) {
_probe->write_error();
vlog(tlg_log.warn, "Flush failed: {}", ec.message());
}
}
ss::abort_source* _as = nullptr;
transform::logging::client* _client = nullptr;
config::binding<std::chrono::milliseconds> _interval_ms;
manager_probe* _probe;
simple_time_jitter<ClockType> _jitter;
ss::condition_variable _wakeup_signal{};
ss::gate _gate{};
Expand All @@ -213,21 +218,32 @@ manager<ClockType>::manager(
, _buffer_limit_bytes(bc)
, _buffer_low_water_mark(_buffer_limit_bytes / lwm_denom)
, _buffer_sem(_buffer_limit_bytes, "Log manager buffer semaphore")
, _flusher(std::make_unique<detail::flusher<ClockType>>(
&_as, _client.get(), std::move(fi))) {}
, _flush_interval(std::move(fi)) {}

template<typename ClockType>
manager<ClockType>::~manager() = default;

template<typename ClockType>
ss::future<> manager<ClockType>::start() {
_probe = std::make_unique<manager_probe>();
_probe->setup_metrics([this] {
return 1.0
- (static_cast<double>(_buffer_sem.available_units()) / static_cast<double>(_buffer_limit_bytes));
});
_flusher = std::make_unique<detail::flusher<ClockType>>(
&_as, _client.get(), _flush_interval, _probe.get());
co_return co_await _flusher->template start<>(&_log_buffers);
}

template<typename ClockType>
ss::future<> manager<ClockType>::stop() {
_as.request_abort();
co_await _flusher->stop();
if (_flusher != nullptr) {
co_await _flusher->stop();
}
_flusher.reset(nullptr);
_probe.reset(nullptr);
std::exchange(_logger_probes, probe_map_t{});
}

template<typename ClockType>
Expand Down Expand Up @@ -274,11 +290,26 @@ void manager<ClockType>::enqueue_log(
return res;
};

auto get_probe = [this](std::string_view name) -> logger_probe* {
auto res = _logger_probes.find(name);
if (res == _logger_probes.end()) {
auto [it, _] = _logger_probes.emplace(
ss::sstring{name.data(), name.size()},
std::make_unique<logger_probe>());
it->second->setup_metrics(model::transform_name_view{name});
return it->second.get();
}
return res->second.get();
};

auto it = get_queue(transform_name);
if (it == _log_buffers.end()) {
return;
}

auto probe = get_probe(transform_name);
probe->log_event();

// Unfortunately, we don't know how long an escaped string will be
// until we've allocated memory for it. So we optimistically grab
// units for the unmodified log message here, hoping that, in the
Expand All @@ -289,6 +320,7 @@ void manager<ClockType>::enqueue_log(
message = message.substr(0, msg_len(message));
auto units = ss::try_get_units(_buffer_sem, message.size());
if (!units) {
probe->dropped_log_event();
vlog(tlg_log.debug, "Failed to enqueue transform log: Buffer full");
return;
}
Expand Down
6 changes: 6 additions & 0 deletions src/v/transform/logging/log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "model/transform.h"
#include "ssx/semaphore.h"
#include "transform/logging/event.h"
#include "transform/logging/fwd.h"
#include "transform/logging/io.h"
#include "utils/absl_sstring_hash.h"
#include "wasm/logger.h"
Expand Down Expand Up @@ -116,6 +117,7 @@ class manager {
size_t _buffer_limit_bytes;
ssize_t _buffer_low_water_mark;
ssx::semaphore _buffer_sem;
config::binding<std::chrono::milliseconds> _flush_interval;

ss::abort_source _as{};

Expand All @@ -133,7 +135,11 @@ class manager {
// lands
using buffer_t = ss::chunked_fifo<buffer_entry>;
absl::btree_map<ss::sstring, buffer_t, sstring_less> _log_buffers;
using probe_map_t = absl::
btree_map<ss::sstring, std::unique_ptr<logger_probe>, sstring_less>;
probe_map_t _logger_probes;

std::unique_ptr<manager_probe> _probe{};
std::unique_ptr<detail::flusher<ClockType>> _flusher{};
};

Expand Down
93 changes: 93 additions & 0 deletions src/v/transform/logging/probes.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "transform/logging/probes.h"

#include "config/configuration.h"
#include "prometheus/prometheus_sanitize.h"
#include "seastarx.h"

#include <seastar/core/metrics_registration.hh>

namespace transform::logging {

void logger_probe::setup_metrics(model::transform_name_view transform_name) {
namespace sm = ss::metrics;

const auto name_label = sm::label("function_name");
const std::vector<sm::label_instance> labels = {
name_label(transform_name()),
};

auto setup_common = [this, &labels]() {
std::vector<sm::impl::metric_definition_impl> defs;
defs.emplace_back(sm::make_counter(
"events_total",
[this]() { return _total_log_events; },
sm::description("Running count of transform log events"),
labels));
defs.emplace_back(sm::make_counter(
"events_dropped_total",
[this]() { return _total_dropped_log_events; },
sm::description("Running count of dropped transform log events"),
labels));
return defs;
};

auto group_name = prometheus_sanitize::metrics_name(
"data_transforms_logger");

if (!config::shard_local_cfg().disable_metrics()) {
_metrics.add_group(
group_name, setup_common(), {}, {sm::shard_label, name_label});
}

if (!config::shard_local_cfg().disable_public_metrics()) {
auto defs_impl = setup_common();
std::vector<sm::metric_definition> defs;
defs.reserve(defs_impl.size());
std::transform(
std::make_move_iterator(defs_impl.begin()),
std::make_move_iterator(defs_impl.end()),
std::back_inserter(defs),
[](auto def) {
return sm::metric_definition{def.aggregate({sm::shard_label})};
});

_public_metrics.add_group(group_name, defs);
}
}

void manager_probe::setup_metrics(std::function<double()> get_usage_ratio) {
namespace sm = ss::metrics;

if (config::shard_local_cfg().disable_metrics()) {
return;
}

auto group_name = prometheus_sanitize::metrics_name(
"data_transforms_log_manager");

std::vector<ss::metrics::impl::metric_definition_impl> defs{
sm::make_gauge(
"buffer_usage_ratio",
[fn = std::move(get_usage_ratio)] { return fn(); },
sm::description("Transform log manager buffer usage ratio")),
sm::make_counter(
"write_errors_total",
[this] { return _total_write_errors; },
sm::description("Running count of errors while writing to the "
"transform logs topic")),
};

_metrics.add_group(group_name, std::move(defs), {}, {sm::shard_label});
}
} // namespace transform::logging
60 changes: 60 additions & 0 deletions src/v/transform/logging/probes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "metrics/metrics.h"
#include "model/transform.h"

#include <seastar/core/smp.hh>

namespace transform::logging {

class logger_probe {
public:
logger_probe() = default;
logger_probe(const logger_probe&) = delete;
logger_probe& operator=(const logger_probe&) = delete;
logger_probe(logger_probe&&) = delete;
logger_probe& operator=(logger_probe&&) = delete;
~logger_probe() {}

void setup_metrics(model::transform_name_view transform_name);

void log_event() { ++_total_log_events; }
void dropped_log_event() { ++_total_dropped_log_events; }

private:
uint64_t _total_log_events{0};
uint64_t _total_dropped_log_events{0};
metrics::internal_metric_groups _metrics;
metrics::public_metric_groups _public_metrics;
};

class manager_probe {
public:
manager_probe() = default;
manager_probe(const manager_probe&) = delete;
manager_probe& operator=(const manager_probe&) = delete;
manager_probe(manager_probe&&) = delete;
manager_probe& operator=(manager_probe&&) = delete;
~manager_probe() = default;

void setup_metrics(std::function<double()> get_usage_ratio);

void write_error() { ++_total_write_errors; }

private:
uint64_t _total_write_errors{0};
metrics::internal_metric_groups _metrics;
};

} // namespace transform::logging
Loading

0 comments on commit ced9963

Please sign in to comment.