Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added produce to topic operation to ops fuzzer #11532

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions tests/rptest/services/admin_ops_fuzzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.clients.types import TopicSpec
from time import sleep
from confluent_kafka import Producer

from rptest.clients.rpk import RpkTool
from rptest.services.admin import Admin
Expand Down Expand Up @@ -63,6 +64,7 @@ class RedpandaAdminOperation(Enum):
DELETE_USER = auto()
CREATE_ACLS = auto()
UPDATE_CONFIG = auto()
PRODUCE_TO_TOPIC = auto()


def _random_choice(prefix, collection):
Expand Down Expand Up @@ -436,6 +438,63 @@ def describe(self):
}


class ProduceToTopic(Operation):
def __init__(self, prefix):
self.prefix = prefix
self.topic = None
self.msg_cnt = random.randint(50, 500)
self.delivery_offsets = {}
self.produce_err = None

def execute(self, ctx):
if self.topic is None:
self.topic = _choice_random_topic(ctx, prefix=self.prefix)

if self.topic is None:
return False

producer = Producer({'bootstrap.servers': ctx.redpanda.brokers()})
for i in range(self.msg_cnt):
producer.produce(self.topic,
f"admin-ops-fuzzer-{self.topic}-{i}",
callback=lambda err, metadata: self.
_records_delivered(err, metadata))
producer.flush()

return True

def _records_delivered(self, err, metadata):
self.produce_err = err
self.delivery_offsets[metadata.partition()] = metadata.offset()

def validate(self, ctx):
if self.produce_err:
ctx.redpanda.logger.info(
f"Produce operation returned an error: {self.produce_err}")
return True

partitions = ctx.rpk().describe_topic(self.topic)
for p in partitions:
if p.id in self.delivery_offsets:
batch_offset = self.delivery_offsets[p.id]
ctx.redpanda.logger.debug(
f"Last produced record offset: {self.delivery_offsets[p.id]}, topic high watermark: {p.high_watermark}"
)
if batch_offset > p.high_watermark:
return False

return True

def describe(self):
return {
"type": "produce_to_topic",
"properties": {
"topic": self.topic,
"msg_cnt": self.msg_cnt
}
}


class AdminOperationsFuzzer():
def __init__(self,
redpanda,
Expand Down Expand Up @@ -613,6 +672,8 @@ def make_random_operation(self) -> Operation:
lambda: CreateAclOperation(self.prefix),
RedpandaAdminOperation.UPDATE_CONFIG:
lambda: UpdateConfigOperation(),
RedpandaAdminOperation.PRODUCE_TO_TOPIC:
lambda: ProduceToTopic(self.prefix),
}
return (op, actions[op]())

Expand Down
55 changes: 55 additions & 0 deletions tests/rptest/tests/admin_operations_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2020 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import random
import threading
from rptest.services.admin import Admin
from rptest.tests.prealloc_nodes import PreallocNodesTest

from ducktape.mark import matrix
from ducktape.utils.util import wait_until
from rptest.services.admin_ops_fuzzer import AdminOperationsFuzzer, RedpandaAdminOperation
from rptest.services.cluster import cluster
from rptest.clients.types import TopicSpec
from rptest.clients.default import DefaultClient
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer
from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, PREV_VERSION_LOG_ALLOW_LIST
from rptest.services.redpanda_installer import RedpandaInstaller
from rptest.tests.redpanda_test import RedpandaTest
from rptest.utils.mode_checks import cleanup_on_early_exit, skip_debug_mode
from rptest.utils.node_operations import FailureInjectorBackgroundThread, NodeOpsExecutor, generate_random_workload

from rptest.clients.offline_log_viewer import OfflineLogViewer


class AdminOperationsTest(RedpandaTest):
def __init__(self, test_context, *args, **kwargs):
self.admin_fuzz = None

super().__init__(test_context=test_context,
num_brokers=3,
*args,
**kwargs)

def tearDown(self):
if self.admin_fuzz is not None:
self.admin_fuzz.stop()

return super().tearDown()

@cluster(num_nodes=3)
def test_admin_operations(self):

self.admin_fuzz = AdminOperationsFuzzer(self.redpanda,
min_replication=1,
operations_interval=1)

self.admin_fuzz.start()
self.admin_fuzz.wait(50, 360)
self.admin_fuzz.stop()
Loading