rptest: test spilled over topic deletion
This commit extends the cloud storage topic deletion tests to act on
partitions with spillover manifests.

`SISettings` has also been updated to accommodate the spillover cluster
config.

Vlad Lazar committed May 30, 2023
1 parent 18f987a commit fbc00d8
Showing 2 changed files with 93 additions and 36 deletions.
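
The `SISettings` change boils down to one new optional argument, `cloud_storage_spillover_manifest_size`. Below is a minimal sketch of how a test might opt in; the example class name and the `si_settings` keyword passed to `RedpandaTest` are assumptions based on common rptest patterns, while the parameter name and the 4 KiB value are taken from the diff below.

from rptest.services.redpanda import SISettings
from rptest.tests.redpanda_test import RedpandaTest


class SpilloverTopicDeleteExample(RedpandaTest):
    # Hypothetical test class, not part of this commit: it only illustrates
    # how the new SISettings argument reaches the cluster config.
    def __init__(self, test_context):
        si_settings = SISettings(
            test_context,
            log_segment_size=1024 * 1024,
            # Small spillover manifests make partitions spill over quickly
            # under test-sized workloads (value mirrors topic_delete_test.py).
            cloud_storage_spillover_manifest_size=4 * 1024,
            fast_uploads=True)
        super().__init__(test_context=test_context, si_settings=si_settings)

As the `update_rp_conf` hunk shows, the value is only forwarded to the `cloud_storage_spillover_manifest_size` cluster config when it is set, so tests that do not pass the argument are unaffected.
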
57 changes: 33 additions & 24 deletions tests/rptest/services/redpanda.py
@@ -346,30 +346,34 @@ class SISettings:
ABS_AZURITE_ACCOUNT = "devstoreaccount1"
ABS_AZURITE_KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="

def __init__(self,
test_context,
*,
log_segment_size: int = 16 * 1000000,
cloud_storage_credentials_source: str = 'config_file',
cloud_storage_access_key: str = 'panda-user',
cloud_storage_secret_key: str = 'panda-secret',
cloud_storage_region: str = 'panda-region',
cloud_storage_api_endpoint: str = 'minio-s3',
cloud_storage_api_endpoint_port: int = 9000,
cloud_storage_cache_size: int = 160 * 1000000,
cloud_storage_enable_remote_read: bool = True,
cloud_storage_enable_remote_write: bool = True,
cloud_storage_max_connections: Optional[int] = None,
cloud_storage_disable_tls: bool = True,
cloud_storage_segment_max_upload_interval_sec: Optional[
int] = None,
cloud_storage_manifest_max_upload_interval_sec: Optional[
int] = None,
cloud_storage_readreplica_manifest_sync_timeout_ms: Optional[
int] = None,
bypass_bucket_creation: bool = False,
cloud_storage_housekeeping_interval_ms: Optional[int] = None,
fast_uploads=False):
def __init__(
self,
test_context,
*,
log_segment_size: int = 16 * 1000000,
cloud_storage_credentials_source: str = 'config_file',
cloud_storage_access_key: str = 'panda-user',
cloud_storage_secret_key: str = 'panda-secret',
cloud_storage_region: str = 'panda-region',
cloud_storage_api_endpoint: str = 'minio-s3',
cloud_storage_api_endpoint_port: int = 9000,
cloud_storage_cache_size: int = 160 * 1000000,
cloud_storage_enable_remote_read: bool = True,
cloud_storage_enable_remote_write: bool = True,
cloud_storage_max_connections: Optional[int] = None,
cloud_storage_disable_tls: bool = True,
cloud_storage_segment_max_upload_interval_sec: Optional[
int] = None,
cloud_storage_manifest_max_upload_interval_sec: Optional[
int] = None,
cloud_storage_readreplica_manifest_sync_timeout_ms: Optional[
int] = None,
bypass_bucket_creation: bool = False,
cloud_storage_housekeeping_interval_ms: Optional[int] = None,
# TODO: update spillover size to 4KiB to exercise behaviour
# in tests once the read path becomes spillover aware
cloud_storage_spillover_manifest_size: Optional[int] = None,
fast_uploads=False):
"""
:param fast_uploads: if true, set low upload intervals to help tests run
quickly when they wait for uploads to complete.
@@ -416,6 +420,7 @@ def __init__(self,
self.endpoint_url = f'http://{self.cloud_storage_api_endpoint}:{self.cloud_storage_api_endpoint_port}'
self.bypass_bucket_creation = bypass_bucket_creation
self.cloud_storage_housekeeping_interval_ms = cloud_storage_housekeeping_interval_ms
self.cloud_storage_spillover_manifest_size = cloud_storage_spillover_manifest_size

if fast_uploads:
self.cloud_storage_segment_max_upload_interval_sec = 10
@@ -539,6 +544,10 @@ def update_rp_conf(self, conf) -> dict[str, Any]:
if self.cloud_storage_housekeeping_interval_ms:
conf[
'cloud_storage_housekeeping_interval_ms'] = self.cloud_storage_housekeeping_interval_ms

if self.cloud_storage_spillover_manifest_size:
conf[
'cloud_storage_spillover_manifest_size'] = self.cloud_storage_spillover_manifest_size
return conf

def set_expected_damage(self, damage_types: set[str]):
72 changes: 60 additions & 12 deletions tests/rptest/tests/topic_delete_test.py
@@ -25,10 +25,11 @@
from rptest.services.rpk_producer import RpkProducer
from rptest.services.metrics_check import MetricCheck
from rptest.services.redpanda import CloudStorageType, SISettings, get_cloud_storage_type
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.util import wait_for_local_storage_truncate, firewall_blocked
from rptest.services.admin import Admin
from rptest.tests.partition_movement import PartitionMovementMixin
from rptest.utils.si_utils import BucketView
from rptest.utils.si_utils import BucketView, NTP, NT


def get_kvstore_topic_key_counts(redpanda):
@@ -363,12 +364,14 @@ class TopicDeleteCloudStorageTest(RedpandaTest):
def __init__(self, test_context):
self.si_settings = SISettings(test_context,
log_segment_size=1024 * 1024,
cloud_storage_spillover_manifest_size=4 *
1024,
fast_uploads=True)
super().__init__(
test_context=test_context,
# Use all nodes as brokers: enables each test to set num_nodes
# and get a cluster of that size
num_brokers=test_context.cluster.available().size(),
num_brokers=min(test_context.cluster.available().size(), 4),
extra_rp_conf={
# Tests validating deletion _not_ happening can be failed if
# segments are deleted in the background by adjacent segment
@@ -388,7 +391,49 @@ def __init__(self, test_context):

self.kafka_tools = KafkaCliTools(self.redpanda)

def _populate_topic(self, topic_name, nodes: Optional[list] = None):
def _produce_until_spillover(self, topic_name):
def topic_manifest_exists() -> bool:
try:
view = BucketView(self.redpanda)
view.get_topic_manifest(NT(ns="kafka", topic=topic_name))
return True
except KeyError:
return False

wait_until(topic_manifest_exists, timeout_sec=15, backoff_sec=3)

producer = KgoVerifierProducer(self.test_context,
self.redpanda,
self.topic,
msg_size=1024 * 512,
msg_count=20480)

producer.start()

def spilled():
view = BucketView(self.redpanda)

for pid in range(0, self.partition_count):
ntp = NTP(ns="kafka", topic=self.topic, partition=pid)
spillovers = view.get_spillover_metadata(ntp)

self.logger.debug(f"Found {spillovers=} for {ntp=}")

if len(spillovers) == 0:
return False

return True

try:
wait_until(spilled, timeout_sec=180, backoff_sec=5)
finally:
producer.stop()
producer.free()

def _populate_topic(self,
topic_name,
nodes: Optional[list] = None,
spillover=True):
"""
Get system into state where there is data in both local
and remote storage for the topic.
@@ -398,10 +443,13 @@ def _populate_topic(self, topic_name, nodes: Optional[list] = None):
self.kafka_tools.alter_topic_config(
topic_name, {'retention.local.target.bytes': local_retention})

# Write out 10MB per partition
self.kafka_tools.produce(topic_name,
record_size=4096,
num_records=2560 * self.partition_count)
if not spillover:
# Write out 10MB per partition
self.kafka_tools.produce(topic_name,
record_size=4096,
num_records=2560 * self.partition_count)
else:
self._produce_until_spillover(topic_name)

# Wait for segments evicted from local storage
for i in range(0, self.partition_count):
@@ -437,7 +485,7 @@ def topic_delete_installed_snapshots_test(self):
self.logger.info(
f"Populating topic and waiting for nodes {[n.name for n in other_nodes]}"
)
self._populate_topic(self.topic, nodes=other_nodes)
self._populate_topic(self.topic, nodes=other_nodes, spillover=False)

self.logger.info(f"Starting victim node {victim_node.name}")
self.redpanda.start_node(victim_node)
@@ -447,7 +495,7 @@

# Write more: this should prompt the victim node to do some prefix truncations
# after having installed a snapshot
self._populate_topic(self.topic)
self._populate_topic(self.topic, spillover=False)

self.kafka_tools.delete_topic(self.topic)
wait_until(lambda: topic_storage_purged(self.redpanda, self.topic),
@@ -460,7 +508,7 @@ def topic_delete_installed_snapshots_test(self):

@skip_debug_mode # Rely on timely uploads during leader transfers
@cluster(
num_nodes=3,
num_nodes=5,
log_allow_list=[
'exception while executing partition operation: {type: deletion'
])
@@ -540,7 +588,7 @@ def _topic_remote_deleted(self, topic_name: str):
return empty

@skip_debug_mode # Rely on timely uploads during leader transfers
@cluster(num_nodes=3)
@cluster(num_nodes=5)
@matrix(disable_delete=[False, True],
cloud_storage_type=get_cloud_storage_type())
def topic_delete_cloud_storage_test(self, disable_delete,
@@ -608,7 +656,7 @@ def partition_movement_test(self, cloud_storage_type):

admin = Admin(self.redpanda)

self._populate_topic(self.topic)
self._populate_topic(self.topic, spillover=False)

# We do not check that literally no keys are deleted, because adjacent segment
# compaction may delete segments (which are replaced by merged segments) at any time.
