Skip to content

Commit

Permalink
Merge pull request #16839 from vbotbuildovich/backport-pr-16724-v23.3.x-923
Browse files Browse the repository at this point in the history

[v23.3.x] producer_state_manager: add a metric for eviction
  • Loading branch information
piyushredpanda committed Mar 2, 2024
2 parents ca477c6 + d7eb561 commit f1914fa
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 10 deletions.
13 changes: 9 additions & 4 deletions src/v/cluster/producer_state_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ void producer_state_manager::setup_metrics() {
_metrics.add_group(
prometheus_sanitize::metrics_name("cluster:producer_state_manager"),
{sm::make_gauge(
"producer_manager_total_active_producers",
[this] { return _num_producers; },
sm::description(
"Total number of active idempotent and transactional producers."))});
"producer_manager_total_active_producers",
[this] { return _num_producers; },
sm::description(
"Total number of active idempotent and transactional producers.")),
sm::make_counter(
"evicted_producers",
[this] { return _eviction_counter; },
sm::description("Number of evicted producers so far."))});
}

void producer_state_manager::register_producer(producer_state& state) {
Expand Down Expand Up @@ -109,6 +113,7 @@ void producer_state_manager::do_evict_excess_producers() {
// producers are in the list. This makes the whole logic lock free.
ssx::spawn_with_gate(_gate, [&state] { return state.evict(); });
--_num_producers;
++_eviction_counter;
}
}

Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/producer_state_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class producer_state_manager
bool can_evict_producer(const producer_state&) const;

size_t _num_producers = 0;
size_t _eviction_counter = 0;
// if a producer is inactive for this long, it will be gc-ed
std::chrono::milliseconds _producer_expiration_ms;
// maximum # of active producers allowed on this shard across
Expand Down
30 changes: 24 additions & 6 deletions tests/rptest/tests/transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,24 +754,42 @@ def check_pids_overflow_test(self):
p.flush()
producers.append(p)

evicted_count = max_producers - max_concurrent_producer_ids

# Wait until eviction kicks in.
def wait_for_eviction():
samples = [
"idempotency_pid_cache_size",
"producer_state_manager_evicted_producers"
]
brokers = self.redpanda.started_nodes()
metrics = self.redpanda.metrics_sample(
"idempotency_pid_cache_size", brokers)
metrics = self.redpanda.metrics_samples(samples, brokers)
producers_per_node = defaultdict(int)
for m in metrics.samples:
id = self.redpanda.node_id(m.node)
producers_per_node[id] += int(m.value)
evicted_per_node = defaultdict(int)
for pattern, metric in metrics.items():
for m in metric.samples:
id = self.redpanda.node_id(m.node)
if pattern == "idempotency_pid_cache_size":
producers_per_node[id] += int(m.value)
elif pattern == "producer_state_manager_evicted_producers":
evicted_per_node[id] += int(m.value)

self.redpanda.logger.debug(
f"active producers: {producers_per_node}")
self.redpanda.logger.debug(
f"evicted producers: {evicted_per_node}")

return len(producers_per_node) == len(brokers) and all([
remaining_match = all([
num == max_concurrent_producer_ids
for num in producers_per_node.values()
])

evicted_match = all(
[val == evicted_count for val in evicted_per_node.values()])

return len(producers_per_node) == len(
brokers) and remaining_match and evicted_match

wait_until(wait_for_eviction,
timeout_sec=30,
backoff_sec=2,
Expand Down

0 comments on commit f1914fa

Please sign in to comment.