Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.3.x] audit: Reduce the number of memory allocations in the hotpath #16147

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/v/hashing/utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include <absl/hash/hash.h>
#include <boost/functional/hash.hpp>

#include <functional>

namespace hash {
namespace impl {
/// Pulled from utils/functional.h to prevent making v::utils a dependency of
/// v::hashing
template<typename>
inline constexpr bool always_false_v = false;

template<typename T>
concept is_absl_hashable = requires(T const& t) {
{ absl::Hash<T>{}(t) } -> std::convertible_to<std::size_t>;
};

template<typename T>
concept is_std_hashable = requires(T const& t) {
{ std::hash<T>{}(t) } -> std::convertible_to<std::size_t>;
};

template<typename T>
concept is_boost_hashable = requires(T const& t) {
{ boost::hash<T>{}(t) } -> std::convertible_to<std::size_t>;
};

template<typename T>
size_t combine(size_t& seed, T const& t) {
if constexpr (is_std_hashable<T>) {
boost::hash_combine(seed, std::hash<T>{}(t));
} else if constexpr (is_absl_hashable<T>) {
boost::hash_combine(seed, absl::Hash<T>{}(t));
} else if constexpr (is_boost_hashable<T>) {
boost::hash_combine(seed, boost::hash<T>{}(t));
} else {
static_assert(always_false_v<T>, "No hasher found for T");
}
return seed;
}

} // namespace impl

template<typename... Args>
size_t combine(size_t& seed, Args&&... args) {
return (impl::combine(seed, std::forward<Args>(args)), ...);
}

} // namespace hash
20 changes: 20 additions & 0 deletions src/v/security/acl.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,15 @@ class resource_pattern_filter
const std::optional<ss::sstring>& name() const { return _name; }
std::optional<pattern_filter_type> pattern() const { return _pattern; }

template<typename H>
friend H AbslHashValue(H h, const pattern_match&) {
return H::combine(std::move(h), 0x1B3A5CD7); // random number
}
template<typename H>
friend H AbslHashValue(H h, const resource_pattern_filter& f) {
return H::combine(std::move(h), f._resource, f._name, f._pattern);
}

friend void read_nested(
iobuf_parser& in,
resource_pattern_filter& filter,
Expand Down Expand Up @@ -444,6 +453,12 @@ class acl_entry_filter
return std::tie(_principal, _host, _operation, _permission);
}

template<typename H>
friend H AbslHashValue(H h, const acl_entry_filter& f) {
return H::combine(
std::move(h), f._principal, f._host, f._operation, f._permission);
}

friend bool operator==(const acl_entry_filter&, const acl_entry_filter&)
= default;

Expand All @@ -470,6 +485,11 @@ class acl_binding_filter
: _pattern(std::move(pattern))
, _acl(std::move(acl)) {}

template<typename H>
friend H AbslHashValue(H h, const acl_binding_filter& f) {
return H::combine(std::move(h), f._pattern, f._acl);
}

/*
* A filter that matches any ACL binding.
*/
Expand Down
1 change: 1 addition & 0 deletions src/v/security/audit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ v_cc_library(
probes.cc
logger.cc
schemas/utils.cc
schemas/hashing_utils.cc
DEPS
v::kafka_client
v::bytes
Expand Down
37 changes: 1 addition & 36 deletions src/v/security/audit/audit_log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ ss::future<> audit_sink::publish_app_lifecycle_event(
/// events. This ensures that the event won't get discarded in the case
/// audit is disabled.
auto lifecycle_event = std::make_unique<application_lifecycle>(
make_application_lifecycle(event, ss::sstring{subsystem_name}));
application_lifecycle::construct(event, ss::sstring{subsystem_name}));
auto as_json = lifecycle_event->to_json();
iobuf b;
b.append(as_json.c_str(), as_json.size());
Expand Down Expand Up @@ -834,41 +834,6 @@ bool audit_log_manager::report_redpanda_app_event(is_started app_started) {
: application_lifecycle::activity_id::stop);
}

bool audit_log_manager::do_enqueue_audit_event(
std::unique_ptr<security::audit::ocsf_base_impl> msg) {
auto& map = _queue.get<underlying_unordered_map>();
auto it = map.find(msg->key());
if (it == map.end()) {
const auto msg_size = msg->estimated_size();
auto units = ss::try_get_units(_queue_bytes_sem, msg_size);
if (!units) {
vlog(
adtlog.warn,
"Unable to enqueue audit event {}, msg size: {}, avail units: {}",
*msg,
msg_size,
_queue_bytes_sem.available_units());
probe().audit_error();
return false;
}
auto& list = _queue.get<underlying_list>();
vlog(
adtlog.trace,
"Successfully enqueued audit event {}, semaphore contains {} units",
*msg,
_queue_bytes_sem.available_units());
list.push_back(audit_msg(std::move(msg), std::move(*units)));
} else {
vlog(adtlog.trace, "Incrementing count of event {}", *msg);
auto now = security::audit::timestamp_t{
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count()};
it->increment(now);
}
return true;
}

ss::future<> audit_log_manager::drain() {
if (_queue.empty()) {
co_return;
Expand Down
111 changes: 68 additions & 43 deletions src/v/security/audit/audit_log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "model/timeout_clock.h"
#include "net/types.h"
#include "security/acl.h"
#include "security/audit/logger.h"
#include "security/audit/probe.h"
#include "security/audit/schemas/application_activity.h"
#include "security/audit/schemas/iam.h"
Expand Down Expand Up @@ -76,20 +77,6 @@ class audit_log_manager
/// Shuts down the internal kafka client and stops all pending bg work
ss::future<> stop();

/// Enqueue an event to be produced onto an audit log partition
///
/// Returns: bool representing if the audit msg was successfully moved into
/// the queue or not. If unsuccessful this means the audit subsystem cannot
/// publish messages. Consumers of this API should react accordingly, i.e.
/// return an error to the client.
template<InheritsFromOCSFBase T>
bool enqueue_audit_event(event_type type, T&& t) {
if (auto val = should_enqueue_audit_event(type); val.has_value()) {
return (bool)*val;
}
return do_enqueue_audit_event(std::make_unique<T>(std::forward<T>(t)));
}

template<
typename T,
security::audit::returns_auditable_resource_vector Func,
Expand All @@ -114,12 +101,11 @@ class audit_log_manager
}
}

return do_enqueue_audit_event(
std::make_unique<api_activity>(make_api_activity_event(
operation_name,
std::move(result),
std::forward<Args>(args)...,
create_resource_details(restrict_topics(std::move(func))))));
return do_enqueue_audit_event<api_activity>(
operation_name,
std::move(result),
std::forward<Args>(args)...,
restrict_topics(std::move(func)));
}

template<typename T, typename... Args>
Expand All @@ -141,12 +127,11 @@ class audit_log_manager
return (bool)*val;
}
}
return do_enqueue_audit_event(
std::make_unique<api_activity>(make_api_activity_event(
operation_name,
std::move(result),
std::forward<Args>(args)...,
{})));
return do_enqueue_audit_event<api_activity>(
operation_name,
std::move(result),
std::forward<Args>(args)...,
std::vector<model::topic>());
}

bool enqueue_authn_event(authentication_event_options options) {
Expand All @@ -155,18 +140,16 @@ class audit_log_manager
val.has_value()) {
return (bool)*val;
}
return do_enqueue_audit_event(std::make_unique<authentication>(
make_authentication_event(std::move(options))));
return do_enqueue_audit_event<authentication>(std::move(options));
}

template<typename... Args>
bool enqueue_app_lifecycle_event(Args&&... args) {
if (auto val = should_enqueue_audit_event(); val.has_value()) {
return (bool)*val;
}

return do_enqueue_audit_event(std::make_unique<application_lifecycle>(
make_application_lifecycle(std::forward<Args>(args)...)));
return do_enqueue_audit_event<application_lifecycle>(
std::forward<Args>(args)...);
}

bool enqueue_api_activity_event(
Expand All @@ -181,10 +164,8 @@ class audit_log_manager
val.has_value()) {
return (bool)*val;
}

return do_enqueue_audit_event(
std::make_unique<api_activity>(make_api_activity_event(
req, auth_result, svc_name, authorized, reason)));
return do_enqueue_audit_event<api_activity>(
req, auth_result, svc_name, authorized, reason);
}

bool enqueue_api_activity_event(
Expand All @@ -196,9 +177,7 @@ class audit_log_manager
val.has_value()) {
return (bool)*val;
}

return do_enqueue_audit_event(std::make_unique<api_activity>(
make_api_activity_event(req, user, svc_name)));
return do_enqueue_audit_event<api_activity>(req, user, svc_name);
}

/// Returns the number of items pending to be written to auditing log
Expand Down Expand Up @@ -250,10 +229,50 @@ class audit_log_manager
ss::future<> resume();

bool is_audit_event_enabled(event_type) const;
bool do_enqueue_audit_event(
std::unique_ptr<security::audit::ocsf_base_impl> msg);
void set_enabled_events();

template<InheritsFromOCSFBase T, typename... Args>
bool do_enqueue_audit_event(Args&&... args) {
auto& map = _queue.get<underlying_unordered_map>();
const auto hash_key = T::hash(args...);
auto it = map.find(hash_key);
if (it == map.end()) {
auto msg = std::make_unique<T>(
T::construct(std::forward<Args>(args)...));
const auto msg_size = msg->estimated_size();
auto units = ss::try_get_units(_queue_bytes_sem, msg_size);
if (!units) {
vlog(
adtlog.warn,
"Unable to enqueue audit event {}, msg size: {}, avail "
"units: {}",
*msg,
msg_size,
_queue_bytes_sem.available_units());
probe().audit_error();
return false;
}
auto& list = _queue.get<underlying_list>();
vlog(
adtlog.trace,
"Successfully enqueued audit event {}, semaphore contains {} "
"units",
*msg,
_queue_bytes_sem.available_units());
list.push_back(
audit_msg(hash_key, std::move(msg), std::move(*units)));
} else {
vlog(
adtlog.trace, "Incrementing count of event {}", it->ocsf_msg());
auto now = security::audit::timestamp_t{
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count()};
it->increment(now);
}
return true;
}

audit_probe& probe() { return *_probe; }

template<security::audit::returns_auditable_resource_vector Func>
Expand All @@ -280,22 +299,28 @@ class audit_log_manager
/// Main benefit is to tie the lifetime of semaphore units with the
/// underlying ocsf event itself
audit_msg(
std::unique_ptr<ocsf_base_impl> msg, ssx::semaphore_units&& units)
: _msg(std::move(msg))
size_t hash_key,
std::unique_ptr<ocsf_base_impl> msg,
ssx::semaphore_units&& units)
: _hash_key(hash_key)
, _msg(std::move(msg))
, _units(std::move(units)) {
vassert(_msg != nullptr, "Audit record cannot be null");
}

size_t key() const { return _msg->key(); }
size_t key() const { return _hash_key; }

void increment(timestamp_t t) const { _msg->increment(t); }

const std::unique_ptr<ocsf_base_impl>& ocsf_msg() const { return _msg; }

std::unique_ptr<ocsf_base_impl> release() && {
_units.return_all();
return std::move(_msg);
}

private:
size_t _hash_key;
std::unique_ptr<ocsf_base_impl> _msg;
ssx::semaphore_units _units;
};
Expand Down
Loading