Skip to content

Commit

Permalink
txns: fix describe producers api to return correct last timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
rystsov committed Jul 20, 2023
1 parent 6a83d7d commit 2ec1f10
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 6 deletions.
16 changes: 10 additions & 6 deletions src/v/kafka/server/handlers/describe_producers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ namespace {

using ret_res = checked<cluster::rm_stm::transaction_set, kafka::error_code>;

int64_t
convert_rm_stm_time_to_milliseconds(cluster::rm_stm::time_point_type time) {
auto diff = cluster::rm_stm::clock_type::now() - time;
auto now = std::chrono::steady_clock::now();
return now.time_since_epoch() / 1ms - diff / 1ms;
static int64_t
convert_rm_stm_time_to_ts_ms(cluster::rm_stm::time_point_type time) {
auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(
cluster::rm_stm::clock_type::now() - time)
.count();
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
ss::lowres_system_clock::now().time_since_epoch())
.count();
return now - diff;
}

ss::future<checked<cluster::rm_stm::transaction_set, kafka::error_code>>
Expand Down Expand Up @@ -140,7 +144,7 @@ describe_producers_handler::handle(request_context ctx, ss::smp_service_group) {

if (tx.second.info) {
producer_info.last_timestamp
= convert_rm_stm_time_to_milliseconds(
= convert_rm_stm_time_to_ts_ms(
tx.second.info->last_update);
}

Expand Down
30 changes: 30 additions & 0 deletions tests/rptest/tests/transaction_kafka_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from rptest.tests.redpanda_test import RedpandaTest
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.clients.rpk import RpkTool
import time
import json


class TxKafkaApiTest(RedpandaTest):
Expand Down Expand Up @@ -77,6 +79,34 @@ def test_describe_producers(self):
assert (self.extract_producer(producer)
in expected_producers)

@cluster(num_nodes=3)
def test_last_timestamp_of_describe_producers(self):
producer1 = ck.Producer({
'bootstrap.servers': self.redpanda.brokers(),
'transactional.id': '0',
})
producer1.init_transactions()
producer1.begin_transaction()

for _ in range(2):
for topic in self.topics:
for partition in range(topic.partition_count):
producer1.produce(topic.name, '0', '0', partition)
producer1.flush()

now_ms = int(time.time() * 1000)

for topic in self.topics:
for partition in range(topic.partition_count):
producers = self.kafka_cli.describe_producers(
topic.name, partition)
self.redpanda.logger.debug(json.dumps(producers))
for producer in producers:
assert int(producer["LastSequence"]) > 0
# checking that the producer's info was recently updated
assert abs(now_ms -
int(producer["LastTimestamp"])) < 120 * 1000

@cluster(num_nodes=3)
def test_describe_transactions(self):
tx_id = "0"
Expand Down

0 comments on commit 2ec1f10

Please sign in to comment.