cst/ducktape: Make sure background thread stops
Even if the test code fails, the background thread must be stopped to
avoid a hung test.
abhijat committed May 13, 2024
1 parent 0fda0be commit fc99549
Showing 1 changed file with 50 additions and 26 deletions.
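The fix follows a common shape for tests that run a helper thread alongside the consumers: start the thread, run the test body inside try/except, and on any failure stop and join the thread before re-raising, so a failed assertion cannot leave a live background thread hanging the test run. A minimal, self-contained sketch of that shape follows; BackgroundWorker and run_test_body are hypothetical stand-ins for illustration, not the DeleteRandomChunks or ObserveCacheDir helpers changed in this commit.

import threading
import time


class BackgroundWorker(threading.Thread):
    """Hypothetical stand-in for a helper such as DeleteRandomChunks."""

    def __init__(self):
        super().__init__()
        self.stop_requested = False

    def run(self):
        # Do periodic background work until asked to stop.
        while not self.stop_requested:
            time.sleep(0.1)

    def stop(self):
        self.stop_requested = True


def run_test_body(worker):
    # Stand-in for the real consumers and assertions; raise here to
    # simulate a failing test step.
    raise AssertionError("simulated test failure")


worker = BackgroundWorker()
worker.start()
try:
    run_test_body(worker)
    worker.stop()
    worker.join(timeout=10)
except Exception as ex:
    # Even though the test body failed, stop the worker anyway,
    # otherwise the live thread can keep the test from ever finishing.
    try:
        if not worker.stop_requested:
            worker.stop()
        worker.join(timeout=10)
    except Exception as stop_err:
        print(f"failed to stop worker: {stop_err}")
    raise ex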
76 changes: 50 additions & 26 deletions tests/rptest/tests/cloud_storage_chunk_read_path_test.py
@@ -218,33 +218,47 @@ def test_read_chunks(self):
         # delete chunks more aggressively to make sure some materialization ops fail.
         rm_chunks = DeleteRandomChunks(self.redpanda, self.topic)
         rm_chunks.start()
-        rand_cons.start()
-        rand_cons.wait(timeout_sec=300)
-        m.expect([(metric, lambda a, b: a < b <= 2 * self.default_chunk_size)])
-
-        # There should be no log files in cache
-        self._assert_not_in_cache(fr'.*kafka/{self.topic}/.*\.log\.[0-9]+$')
+        try:
+            rand_cons.start()
+            rand_cons.wait(timeout_sec=300)
+            m.expect([(metric,
+                       lambda a, b: a < b <= 2 * self.default_chunk_size)])
+
+            # We cannot assert that there are chunks in cache because the deleting thread
+            # could delete them all before we run the assertion, causing it to fail.
+            # We can check that the thread deleted some chunks.
+            assert rm_chunks.deleted_chunks > 0, "Expected to delete some chunk files during rand-cons, none deleted"
+            rm_chunks.deleted_chunks = 0
+            # There should be no log files in cache
+            self._assert_not_in_cache(
+                fr'.*kafka/{self.topic}/.*\.log\.[0-9]+$')

-        consumer = KgoVerifierSeqConsumer(self.test_context,
-                                          self.redpanda,
-                                          self.topic,
-                                          0,
-                                          nodes=self.preallocated_nodes)
-        consumer.start()
-        consumer.wait(timeout_sec=120)
-        rm_chunks.stop()
-        rm_chunks.join(timeout=10)
-        assert rm_chunks.deleted_chunks > 0, "Expected to delete some chunk files during seq-cons, none deleted"
-        # We cannot assert that there are chunks in cache because the deleting thread
-        # could delete them all before we run the assertion, causing it to fail.
-        # We can check that the thread deleted some chunks.
-        assert rm_chunks.deleted_chunks > 0, "Expected to delete some chunk files during rand-cons, none deleted"
-        rm_chunks.deleted_chunks = 0
-
-        self._assert_not_in_cache(fr'.*kafka/{self.topic}/.*\.log\.[0-9]+$')
-
-        self._trim_and_verify()
+            consumer = KgoVerifierSeqConsumer(self.test_context,
+                                              self.redpanda,
+                                              self.topic,
+                                              0,
+                                              nodes=self.preallocated_nodes)
+            consumer.start()
+            consumer.wait(timeout_sec=120)
+            rm_chunks.stop()
+            rm_chunks.join(timeout=10)
+            assert rm_chunks.deleted_chunks > 0, "Expected to delete some chunk files during seq-cons, none deleted"
+
+            self._assert_not_in_cache(
+                fr'.*kafka/{self.topic}/.*\.log\.[0-9]+$')
+
+            self._trim_and_verify()
+        except Exception as ex:
+            try:
+                if not rm_chunks.stop_requested:
+                    rm_chunks.stop()
+                rm_chunks.join(timeout=10)
+            except Exception as timed_out:
+                self.redpanda.logger.error(
+                    f'failed to stop rm_chunks: {timed_out}')
+            raise ex

     @cluster(num_nodes=4)
     @parametrize(prefetch=0)
@@ -388,9 +402,19 @@ def test_read_when_cache_smaller_than_segment_size(self):
         observe_cache_dir = ObserveCacheDir(self.redpanda, self.topic)
         observe_cache_dir.start()

-        self._consume_baseline(timeout=180, max_msgs=read_count)
-        observe_cache_dir.stop()
-        observe_cache_dir.join(timeout=10)
+        try:
+            self._consume_baseline(timeout=180, max_msgs=read_count)
+            observe_cache_dir.stop()
+            observe_cache_dir.join(timeout=10)
+        except Exception as ex:
+            try:
+                if not observe_cache_dir.stop_requested:
+                    observe_cache_dir.stop()
+                observe_cache_dir.join(timeout=10)
+            except Exception as timed_out:
+                self.redpanda.logger.error(
+                    f'failed to stop observe_cache_dir: {timed_out}')
+            raise ex
         assert not observe_cache_dir.is_alive(
         ), 'cache observer is unexpectedly alive'

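Both hunks apply the same guard, and the original test failure is always re-raised at the end of the handler. A plain try/finally would also run the cleanup on failure, but an exception raised while stopping the thread in finally would propagate instead, demoting the original failure to its __context__; the except-based form used in the commit catches and logs cleanup errors and still re-raises the test error. For comparison only, a try/finally variant of the earlier sketch, reusing the hypothetical BackgroundWorker and run_test_body defined above:

worker = BackgroundWorker()
worker.start()
try:
    run_test_body(worker)
finally:
    # Runs on success and failure alike; an exception raised in this
    # block would take over from the one raised by run_test_body.
    if not worker.stop_requested:
        worker.stop()
    worker.join(timeout=10)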
