Skip to content

Commit

Permalink
storage: Add client address to log reader config
Browse files Browse the repository at this point in the history
The log reader config is extended to contain the client address. This
allows the remote partition code paths to use the client address while
logging, enabling identification of consumers from the kafka layer.
  • Loading branch information
abhijat committed Nov 8, 2023
1 parent 4765c7a commit bd854a7
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
4 changes: 4 additions & 0 deletions src/v/storage/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ std::ostream& operator<<(std::ostream& o, const log_reader_config& cfg) {
<< (cfg.abort_source.has_value()
? cfg.abort_source.value().get().abort_requested()
: false);

if (cfg.client_address.has_value()) {
o << ", client_address:" << cfg.client_address.value();
}
return o << "}";
}

Expand Down
23 changes: 17 additions & 6 deletions src/v/storage/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,23 +222,28 @@ struct append_result {
using opt_abort_source_t
= std::optional<std::reference_wrapper<ss::abort_source>>;

using opt_client_address_t = std::optional<model::client_address_t>;

struct timequery_config {
timequery_config(
model::timestamp t,
model::offset o,
ss::io_priority_class iop,
std::optional<model::record_batch_type> type_filter,
opt_abort_source_t as = std::nullopt) noexcept
opt_abort_source_t as = std::nullopt,
opt_client_address_t client_addr = std::nullopt) noexcept
: time(t)
, max_offset(o)
, prio(iop)
, type_filter(type_filter)
, abort_source(as) {}
, abort_source(as)
, client_address(std::move(client_addr)) {}
model::timestamp time;
model::offset max_offset;
ss::io_priority_class prio;
std::optional<model::record_batch_type> type_filter;
opt_abort_source_t abort_source;
opt_client_address_t client_address;

friend std::ostream& operator<<(std::ostream& o, const timequery_config&);
};
Expand Down Expand Up @@ -326,6 +331,8 @@ struct log_reader_config {
// historical read-once workloads like compaction).
bool skip_batch_cache{false};

opt_client_address_t client_address;

log_reader_config(
model::offset start_offset,
model::offset max_offset,
Expand All @@ -334,15 +341,17 @@ struct log_reader_config {
ss::io_priority_class prio,
std::optional<model::record_batch_type> type_filter,
std::optional<model::timestamp> time,
opt_abort_source_t as)
opt_abort_source_t as,
opt_client_address_t client_addr = std::nullopt)
: start_offset(start_offset)
, max_offset(max_offset)
, min_bytes(min_bytes)
, max_bytes(max_bytes)
, prio(prio)
, type_filter(type_filter)
, first_timestamp(time)
, abort_source(as) {}
, abort_source(as)
, client_address(std::move(client_addr)) {}

/**
* Read offsets [start, end].
Expand All @@ -351,7 +360,8 @@ struct log_reader_config {
model::offset start_offset,
model::offset max_offset,
ss::io_priority_class prio,
opt_abort_source_t as = std::nullopt)
opt_abort_source_t as = std::nullopt,
opt_client_address_t client_addr = std::nullopt)
: log_reader_config(
start_offset,
max_offset,
Expand All @@ -360,7 +370,8 @@ struct log_reader_config {
prio,
std::nullopt,
std::nullopt,
as) {}
as,
std::move(client_addr)) {}

friend std::ostream& operator<<(std::ostream& o, const log_reader_config&);
};
Expand Down

0 comments on commit bd854a7

Please sign in to comment.