Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] tests: fix replaced segments accounting in TopicRecoveryTest #18620

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions tests/rptest/tests/topic_recovery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1329,12 +1329,13 @@ def _wipe_data(self):
self.redpanda.remove_local_data(node)

@staticmethod
def _normalize_lw_seg_key(key: str, lw_seg_meta: dict) -> str:
"""adds archiver_term to key, to match the actual object key in cloud storage"""
if not key.endswith(".log"):
# key already has archiver term set in the name
return key
return f"{key}.{lw_seg_meta['archiver_term']}" if "archiver_term" in lw_seg_meta else key
def _cloud_segment_key(lw_seg_meta: dict) -> str:
"""calculates the segment key in cloud storage from segment meta"""
assert lw_seg_meta.get("sname_format") == 3
sm = lw_seg_meta
return (
f"{sm['base_offset']}-{sm['committed_offset']}-{sm['size_bytes']}"
f"-{sm['segment_term']}-v1.log.{sm['archiver_term']}")

def _collect_replaced_segments(self, replaced: dict[NTPR, dict[str, dict]],
manifest_key: str):
Expand All @@ -1349,7 +1350,7 @@ def _collect_replaced_segments(self, replaced: dict[NTPR, dict[str, dict]],
assert manifest is not None, f"failed to load manifest from path {manifest_key}"
if replaced_segments := manifest.get('replaced'):
replaced[parse_s3_manifest_path(manifest_key)] = {
TopicRecoveryTest._normalize_lw_seg_key(k, v): v
TopicRecoveryTest._cloud_segment_key(v): v
for k, v in replaced_segments.items()
}

Expand Down Expand Up @@ -1419,8 +1420,7 @@ def verify():
tmp_size += size
size_on_disk = max(tmp_size, size_on_disk)

size_in_cloud = sum(obj.content_length for obj in segments
if obj.content_length > EMPTY_SEGMENT_SIZE)
size_in_cloud = sum(obj.content_length for obj in segments)
self.logger.debug(
f'segments in cloud: {pprint.pformat(segments, indent=2)}, '
f'size in cloud: {size_in_cloud}')
Expand Down
Loading