From f9e0016e5f3f805187f9ad49fec594293c5bcce9 Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Tue, 14 Nov 2023 08:05:01 +0530 Subject: [PATCH] cloud_storage: Log client address in batch parser The client address is added to the remote segment batch parser logs. The address cannot be set as a member because a remote segment reader may be cached and have its configuration reset. The client address is always read from the configuration to make sure we get the latest address. (cherry picked from commit 0996d4fa5c8516656aee6b2432704453e028356e) (cherry picked from commit e4de1ceed47ec0005226bba82dfdd65b589ede4f) --- src/v/cloud_storage/remote_segment.cc | 39 ++++++++++++++++++++------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/src/v/cloud_storage/remote_segment.cc b/src/v/cloud_storage/remote_segment.cc index a352b4c518a5..e64d1264180a 100644 --- a/src/v/cloud_storage/remote_segment.cc +++ b/src/v/cloud_storage/remote_segment.cc @@ -1134,14 +1134,17 @@ class remote_segment_batch_consumer : public storage::batch_consumer { const model::record_batch_header& header) const override { vlog( _ctxlog.trace, - "accept_batch_start {}, current delta: {}", + "[{}] accept_batch_start {}, current delta: {}", + _config.client_address, header, _parent._cur_delta); if (rp_to_kafka(header.base_offset) > _config.max_offset) { vlog( _ctxlog.debug, - "accept_batch_start stop parser because {} > {}(kafka offset)", + "[{}] accept_batch_start stop parser because {} > {}(kafka " + "offset)", + _config.client_address, header.base_offset(), _config.max_offset); return batch_consumer::consume_result::stop_parser; @@ -1153,7 +1156,8 @@ class remote_segment_batch_consumer : public storage::batch_consumer { if (model::record_batch_type::raft_data != header.type) { vlog( _ctxlog.debug, - "accept_batch_start skip because record batch type is {}", + "[{}] accept_batch_start skip because record batch type is {}", + _config.client_address, header.type); return batch_consumer::consume_result::skip_batch; } @@ -1164,9 +1168,10 @@ class remote_segment_batch_consumer : public storage::batch_consumer { rp_to_kafka(header.last_offset()) < _config.start_offset)) { vlog( _ctxlog.debug, - "accept_batch_start skip because " + "[{}] accept_batch_start skip because " "last_kafka_offset {} (last_rp_offset: {}) < " "config.start_offset: {}", + _config.client_address, rp_to_kafka(header.last_offset()), header.last_offset(), _config.start_offset); @@ -1176,7 +1181,10 @@ class remote_segment_batch_consumer : public storage::batch_consumer { if ( (_config.strict_max_bytes || _config.bytes_consumed) && (_config.bytes_consumed + header.size_bytes) > _config.max_bytes) { - vlog(_ctxlog.debug, "accept_batch_start stop because overbudget"); + vlog( + _ctxlog.debug, + "[{}] accept_batch_start stop because overbudget", + _config.client_address); _config.over_budget = true; return batch_consumer::consume_result::stop_parser; } @@ -1184,7 +1192,8 @@ class remote_segment_batch_consumer : public storage::batch_consumer { if (_config.first_timestamp > header.max_timestamp) { vlog( _ctxlog.debug, - "accept_batch_start skip because header timestamp is {}", + "[{}] accept_batch_start skip because header timestamp is {}", + _config.client_address, header.first_timestamp); return batch_consumer::consume_result::skip_batch; } @@ -1199,7 +1208,8 @@ class remote_segment_batch_consumer : public storage::batch_consumer { size_t /*size_on_disk*/) override { vlog( _ctxlog.trace, - "consume_batch_start called for {}", + "[{}] consume_batch_start called for {}", + _config.client_address, header.base_offset); _header = header; _header.ctx.term = _term; @@ -1214,7 +1224,10 @@ class remote_segment_batch_consumer : public storage::batch_consumer { // changing the _cur_delta. The _cur_delta that is be used for current // record batch can only account record batches in all previous batches. vlog( - _ctxlog.debug, "skip_batch_start called for {}", header.base_offset); + _ctxlog.debug, + "[{}] skip_batch_start called for {}", + _config.client_address, + header.base_offset); advance_config_offsets(header); if ( std::count( @@ -1407,10 +1420,16 @@ ss::future<> remote_segment_batch_reader::stop() { co_return; } - vlog(_ctxlog.debug, "remote_segment_batch_reader::stop"); + vlog( + _ctxlog.debug, + "[{}] remote_segment_batch_reader::stop", + _config.client_address); co_await _gate.close(); if (_parser) { - vlog(_ctxlog.debug, "remote_segment_batch_reader::stop - parser-close"); + vlog( + _ctxlog.debug, + "[{}] remote_segment_batch_reader::stop - parser-close", + _config.client_address); co_await _parser->close(); _parser.reset(); }