rptest: test spilled over topic deletion
This commit extends the cloud storage topic deletion tests to act on
partitions with spillover manifests.

`SISettings` has also been updated to accommodate the spillover cluster
config.
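
For readers skimming the diff: the new flow lowers the spillover threshold, produces enough data to roll segment metadata out of the partition manifest into spillover manifests, and then checks the bucket before exercising deletion. A condensed sketch, assuming a running RedpandaTest fixture (the standalone `trigger_spillover` function and its parameters are illustrative; the other identifiers appear in the diff below):

from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.utils.si_utils import BucketView, NTP


def trigger_spillover(test, topic_name: str, partition_count: int):
    # Lower the per-manifest segment limit so a modest write load is
    # enough to spill metadata into spillover manifests.
    test.redpanda.set_cluster_config(
        {"cloud_storage_spillover_manifest_max_segments": 10})

    # Produce ~100 MiB across the topic (512 KiB * 200 messages).
    producer = KgoVerifierProducer(test.test_context, test.redpanda,
                                   topic_name, msg_size=1024 * 512,
                                   msg_count=200)
    producer.start()
    producer.wait()
    producer.free()

    # Every partition should end up with at least one spillover manifest.
    # (The test itself polls for this with wait_until, since uploads are
    # asynchronous.)
    view = BucketView(test.redpanda)
    for pid in range(partition_count):
        ntp = NTP(ns="kafka", topic=topic_name, partition=pid)
        assert len(view.get_spillover_metadata(ntp)) > 0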
Vlad Lazar committed Jun 19, 2023
1 parent 55ec6b9 commit ae09a14
Showing 1 changed file with 149 additions and 59 deletions.
208 changes: 149 additions & 59 deletions tests/rptest/tests/topic_delete_test.py
@@ -7,6 +7,7 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import datetime
import time
import json
from typing import Optional
@@ -25,10 +26,11 @@
from rptest.services.rpk_producer import RpkProducer
from rptest.services.metrics_check import MetricCheck
from rptest.services.redpanda import CloudStorageType, SISettings, get_cloud_storage_type
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.util import wait_for_local_storage_truncate, firewall_blocked
from rptest.services.admin import Admin
from rptest.tests.partition_movement import PartitionMovementMixin
from rptest.utils.si_utils import BucketView, NT, LifecycleMarkerStatus
from rptest.utils.si_utils import BucketView, NTP, NT, LifecycleMarkerStatus


def get_kvstore_topic_key_counts(redpanda):
@@ -359,6 +361,7 @@ class TopicDeleteCloudStorageTest(RedpandaTest):
partition_count = 3
topics = (TopicSpec(partition_count=partition_count,
cleanup_policy=TopicSpec.CLEANUP_DELETE), )
housekeeping_interval_ms = 1000 * 60 * 30

def __init__(self, test_context):
self.si_settings = SISettings(test_context,
@@ -368,7 +371,7 @@ def __init__(self, test_context):
test_context=test_context,
# Use all nodes as brokers: enables each test to set num_nodes
# and get a cluster of that size
num_brokers=test_context.cluster.available().size(),
num_brokers=min(test_context.cluster.available().size(), 4),
extra_rp_conf={
# Tests validating deletion _not_ happening can be failed if
# segments are deleted in the background by adjacent segment
@@ -377,18 +380,60 @@ def __init__(self, test_context):
# We rely on the scrubber to delete topic manifests, and to eventually
# delete data if cloud storage was unavailable during initial delete. To
# control test runtimes, set a short interval.
'cloud_storage_housekeeping_interval_ms': 5000,

# Segment merging confuses these tests, because it makes it look like segments are being deleted
'cloud_storage_enable_segment_merging': False,
'cloud_storage_idle_timeout_ms': 3000,
'cloud_storage_housekeeping_interval_ms':
self.housekeeping_interval_ms,
"cloud_storage_topic_purge_grace_period_ms": 5
},
si_settings=self.si_settings)

self._s3_port = self.si_settings.cloud_storage_api_endpoint_port

self.kafka_tools = KafkaCliTools(self.redpanda)

def _populate_topic(self, topic_name, nodes: Optional[list] = None):
def _produce_until_spillover(self, topic_name: str, local_retention: int):
self.redpanda.set_cluster_config({
"cloud_storage_housekeeping_interval_ms":
1000,
"cloud_storage_spillover_manifest_max_segments":
10
})

# Write more than 20MiB per partition to trigger spillover
producer = KgoVerifierProducer(self.test_context,
self.redpanda,
topic_name,
msg_size=1024 * 512,
msg_count=200)

producer.start()
producer.wait()
producer.free()

view = BucketView(self.redpanda)

def all_partitions_spilled():
for pid in range(0, self.partition_count):
ntp = NTP(ns="kafka", topic=topic_name, partition=pid)
spillovers = view.get_spillover_metadata(ntp)

self.logger.debug(f"Found {spillovers=} for {ntp=}")
if len(spillovers) == 0:
return False

return True

wait_until(all_partitions_spilled, timeout_sec=180, backoff_sec=30)

self.redpanda.set_cluster_config({
"cloud_storage_housekeeping_interval_ms":
self.housekeeping_interval_ms
})

def _populate_topic(self,
topic_name,
nodes: Optional[list] = None,
spillover=True):
"""
Get system into state where there is data in both local
and remote storage for the topic.
@@ -398,27 +443,41 @@ def _populate_topic(self, topic_name, nodes: Optional[list] = None):
self.kafka_tools.alter_topic_config(
topic_name, {'retention.local.target.bytes': local_retention})

# Write out 10MB per partition
self.kafka_tools.produce(topic_name,
record_size=4096,
num_records=2560 * self.partition_count)

# Wait for segments evicted from local storage
for i in range(0, self.partition_count):
wait_for_local_storage_truncate(self.redpanda,
topic_name,
partition_idx=i,
target_bytes=local_retention,
timeout_sec=30,
nodes=nodes)
if not spillover:
# Write out 10MB per partition
self.kafka_tools.produce(topic_name,
record_size=4096,
num_records=2560 * self.partition_count)

# Wait for segments evicted from local storage
timeout = 120
deadline = datetime.datetime.now() + datetime.timedelta(
seconds=timeout)
for i in range(0, self.partition_count):
self.logger.debug(
f"Waiting for truncation of {topic_name}/{i} for {timeout=} seconds"
)
wait_for_local_storage_truncate(self.redpanda,
topic_name,
partition_idx=i,
target_bytes=local_retention,
timeout_sec=timeout,
nodes=nodes)

timeout = (deadline - datetime.datetime.now()).total_seconds()
if timeout < 10:
timeout = 10
else:
self._produce_until_spillover(topic_name, local_retention)

# Confirm objects in remote storage
objects = self.cloud_storage_client.list_objects(
self.si_settings.cloud_storage_bucket, topic=topic_name)
assert sum(1 for _ in objects) > 0

@skip_debug_mode # Rely on timely uploads during leader transfers
@cluster(num_nodes=3)
@cluster(num_nodes=3,
log_allow_list=['Failed to fetch manifest during finalize()'])
def topic_delete_installed_snapshots_test(self):
"""
Test the case where a partition had remote snapshots installed prior
@@ -437,7 +496,7 @@ def topic_delete_installed_snapshots_test(self):
self.logger.info(
f"Populating topic and waiting for nodes {[n.name for n in other_nodes]}"
)
self._populate_topic(self.topic, nodes=other_nodes)
self._populate_topic(self.topic, nodes=other_nodes, spillover=False)

self.logger.info(f"Starting victim node {victim_node.name}")
self.redpanda.start_node(victim_node)
@@ -447,19 +506,14 @@ def topic_delete_installed_snapshots_test(self):

# Write more: this should prompt the victim node to do some prefix truncations
# after having installed a snapshot
self._populate_topic(self.topic)
self._populate_topic(self.topic, spillover=False)

self.kafka_tools.delete_topic(self.topic)
wait_until(lambda: topic_storage_purged(self.redpanda, self.topic),
timeout_sec=30,
backoff_sec=1)

wait_until(lambda: self._topic_remote_deleted(self.topic),
timeout_sec=30,
backoff_sec=1)

self._assert_topic_lifecycle_marker_status(
self.topic, LifecycleMarkerStatus.PURGED)
self._validate_topic_deletion(self.topic, CloudStorageType.S3)

@ok_to_fail
@cluster(
@@ -505,9 +559,10 @@ def drop_lifecycle_marker_test(self, cloud_storage_type):

@skip_debug_mode # Rely on timely uploads during leader transfers
@cluster(
num_nodes=3,
num_nodes=5,
log_allow_list=[
'exception while executing partition operation: {type: deletion'
'exception while executing partition operation: {type: deletion',
'Failed to fetch manifest during finalize()'
])
@matrix(cloud_storage_type=get_cloud_storage_type())
def topic_delete_unavailable_test(self, cloud_storage_type):
@@ -551,7 +606,8 @@ def topic_delete_unavailable_test(self, cloud_storage_type):
TopicSpec(name=next_topic,
partition_count=self.partition_count,
cleanup_policy=TopicSpec.CLEANUP_DELETE))
self._populate_topic(next_topic)
self._populate_topic(next_topic, spillover=False)

after_keys = set(
o.key for o in self.redpanda.cloud_storage_client.list_objects(
self.si_settings.cloud_storage_bucket, topic=next_topic))
@@ -562,29 +618,64 @@ def topic_delete_unavailable_test(self, cloud_storage_type):
timeout_sec=30,
backoff_sec=1)

wait_until(lambda: self._topic_remote_deleted(next_topic),
timeout_sec=30,
backoff_sec=1)

self._assert_topic_lifecycle_marker_status(
next_topic, LifecycleMarkerStatus.PURGED)
self._validate_topic_deletion(next_topic, cloud_storage_type)

# Eventually, the original topic should be deleted: this is the tiered
# storage scrubber doing its thing.
wait_until(lambda: self._topic_remote_deleted(self.topic),
timeout_sec=30,
backoff_sec=1)
self._validate_topic_deletion(self.topic, cloud_storage_type)

def _validate_topic_deletion(self, topic_name: str,
cloud_storage_type: CloudStorageType):
if cloud_storage_type == CloudStorageType.ABS:
# ABS does not have a plural delete implementation, which
# makes deletion of large topics slow in the CI env.
# Instead of requiring the bucket to be empty, we check
# that deletion is progressing in this case.
crnt = datetime.datetime.now()
finish = crnt + datetime.timedelta(seconds=130)
interval = 40

sizes = []
while crnt <= finish:
size = sum(1 for _ in self._blobs_for_topic(topic_name))

if len(sizes) > 0 and size > 0:
assert size <= sizes[-1]

sizes.append(size)
time.sleep(interval)
crnt = datetime.datetime.now()

assert len(sizes) > 1 and (
sizes[0] > sizes[-1] or sizes[-1] == 0), \
f"Count of blobs for topic did not decrease {sizes=}"

if sizes[-1] > 0:
self.logger.warn(
"Some blobs are still in the container"
f"but deletion is progressing: history={sizes}")

self.redpanda.si_settings.set_expected_damage({
"unknown_keys", "missing_segments", "ntpr_no_manifest",
"ntr_no_topic_manifest", "archive_manifests_outside_manifest"
})
else:
wait_until(lambda: self._topic_remote_deleted_entirely(topic_name),
timeout_sec=60,
backoff_sec=10)

self._assert_topic_lifecycle_marker_status(
self.topic, LifecycleMarkerStatus.PURGED)
self._assert_topic_lifecycle_marker_status(
self.topic, LifecycleMarkerStatus.PURGED)

def _topic_remote_deleted(self, topic_name: str):
"""Return true if all objects removed from cloud storage"""
after_objects = self.cloud_storage_client.list_objects(
def _blobs_for_topic(self, topic_name: str):
return self.cloud_storage_client.list_objects(
self.si_settings.cloud_storage_bucket, topic=topic_name)

def _topic_remote_deleted_entirely(self, topic_name: str):
"""Return true if all objects removed from cloud storage"""
self.logger.debug(f"Objects after topic {topic_name} deletion:")
empty = True
for i in after_objects:
for i in self._blobs_for_topic(topic_name):
self.logger.debug(f" {i}")
empty = False

@@ -596,11 +687,14 @@ def _assert_topic_lifecycle_marker_status(self, topic_name: str,
throw if the lifecycle marker for the topic doesn't exist, or
has a status != `status`
"""
view = BucketView(self.redpanda)
marker = view.get_lifecycle_marker(NT('kafka', topic_name))
def has_purged_marker():
view = BucketView(self.redpanda)
marker = view.get_lifecycle_marker(NT('kafka', topic_name))

# The JSON value is an integer, use the underlying value of the Python enum
assert marker['status'] == status.value
# The JSON value is an integer, use the underlying value of the Python enum
return marker['status'] == status.value

wait_until(has_purged_marker, timeout_sec=10, backoff_sec=1)

if status == LifecycleMarkerStatus.PURGED:
# Shortly after the remote status goes to PURGED, we should see the local
@@ -632,7 +726,8 @@ def _assert_topic_lifecycle_marker_absent(self, topic_name: str):
raise AssertionError(f"Found unexpected lifecycle marker {marker}")

@skip_debug_mode # Rely on timely uploads during leader transfers
@cluster(num_nodes=3)
@cluster(num_nodes=5,
log_allow_list=['Failed to fetch manifest during finalize()'])
@matrix(disable_delete=[False, True],
cloud_storage_type=get_cloud_storage_type())
def topic_delete_cloud_storage_test(self, disable_delete,
@@ -673,12 +768,7 @@ def topic_delete_cloud_storage_test(self, disable_delete,
else:
# The counter-test that deletion _doesn't_ happen in read replicas
# is done as part of read_replica_e2e_test
wait_until(lambda: self._topic_remote_deleted(self.topic),
timeout_sec=30,
backoff_sec=1)

self._assert_topic_lifecycle_marker_status(
self.topic, LifecycleMarkerStatus.PURGED)
self._validate_topic_deletion(self.topic, cloud_storage_type)

# TODO: include transactional data so that we verify that .txrange
# objects are deleted.
@@ -703,7 +793,7 @@ def partition_movement_test(self, cloud_storage_type):

admin = Admin(self.redpanda)

self._populate_topic(self.topic)
self._populate_topic(self.topic, spillover=False)

# We do not check that literally no keys are deleted, because adjacent segment
# compaction may delete segments (which are replaced by merged segments) at any time.
