Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ST] Refactor CustomAuthorizerST #9135

Merged
merged 2 commits into from
Sep 23, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.strimzi.systemtest.AbstractST;
import io.strimzi.systemtest.Constants;
import io.strimzi.systemtest.Environment;
import io.strimzi.systemtest.annotations.KRaftNotSupported;
import io.strimzi.systemtest.annotations.ParallelTest;
import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients;
import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClientsBuilder;
Expand All @@ -33,12 +32,31 @@
import static io.strimzi.systemtest.Constants.INTERNAL_CLIENTS_USED;

@Tag(REGRESSION)
@KRaftNotSupported("Custom Authorizer is not supported by KRaft mode and is used in this test case")
public class CustomAuthorizerST extends AbstractST {
static final String CLUSTER_NAME = "custom-authorizer";
static final String ADMIN = "sre-admin";
private static final Logger LOGGER = LogManager.getLogger(CustomAuthorizerST.class);

/**
* @description This test case verifies Access Control Lists with simple authorization and tls listener.
*
* @steps
* 1. - Kafka with simple authorization and tls listener is deployed even before the test itself start
* - Kafka with desired authorization and listener is ready
* 2. - Create first KafkaUser, with ACLs to write and describe specific topic
* - KafkaUser authorized to produce into specific topic is ready
* 3. - Create second KafkaUser, with ACLs to read and describe specific topic
* - KafkaUser authorized to consume from specific topic is ready
* 4. - Deploy Kafka clients using first KafkaUser authorized to produce data into specific topic
* - Producer completes successfully whereas consumer timeouts
* 5. - Deploy Kafka clients using second KafkaUser authorized to consume data into specific topic
* - Producer timeouts whereas consumer timeouts
*
* @usecase
* - custom-authorization
* - acls
* - kafka-user
*/
@ParallelTest
@Tag(INTERNAL_CLIENTS_USED)
void testAclRuleReadAndWrite(ExtensionContext extensionContext) {
Expand All @@ -56,7 +74,7 @@ void testAclRuleReadAndWrite(ExtensionContext extensionContext) {
.withNewAclRuleTopicResource()
.withName(testStorage.getTopicName())
.endAclRuleTopicResource()
.withOperations(AclOperation.WRITE, AclOperation.DESCRIBE)
.withOperations(AclOperation.WRITE, AclOperation.DESCRIBE, AclOperation.CREATE) // create is necessary if topic does not exist prior to data production
.endAcl()
.endKafkaUserAuthorizationSimple()
.endSpec()
Expand Down Expand Up @@ -116,29 +134,29 @@ void testAclRuleReadAndWrite(ExtensionContext extensionContext) {
ClientUtils.waitForProducerClientTimeout(testStorage);
}

/**
* @description This test case verifies Access Control Lists with simple authorization and tls listener.
*
* @steps
* 1. - Kafka with simple authorization and specified superuser is deployed even before the test itself start
* - Kafka with desired authorization is ready
* 2. - Create explicit KafkaUser, with no other properties except necessary metadata and specific name referencing pre-created superuser
* - Admin KafkaUser is ready
* 3. - Deploy Kafka clients using admin KafkaUser
* - Producer and consumer complete successfully
*
* @usecase
* - custom-authorization
* - acls
* - kafka-user
*/
@ParallelTest
@Tag(INTERNAL_CLIENTS_USED)
void testAclWithSuperUser(ExtensionContext extensionContext) {
final TestStorage testStorage = new TestStorage(extensionContext, Environment.TEST_SUITE_NAMESPACE);

resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(CLUSTER_NAME, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build());

KafkaUser adminUser = KafkaUserTemplates.tlsUser(Environment.TEST_SUITE_NAMESPACE, CLUSTER_NAME, ADMIN)
.editSpec()
.withNewKafkaUserAuthorizationSimple()
.addNewAcl()
.withNewAclRuleTopicResource()
.withName(testStorage.getTopicName())
.endAclRuleTopicResource()
.withOperations(AclOperation.WRITE, AclOperation.DESCRIBE)
.endAcl()
.endKafkaUserAuthorizationSimple()
.endSpec()
.build();

resourceManager.createResourceWithWait(extensionContext, adminUser);

LOGGER.info("Checking Kafka Super User: {}/{} that is able to send messages to Topic: {}", Environment.TEST_SUITE_NAMESPACE, ADMIN, testStorage.getTopicName());
resourceManager.createResourceWithWait(extensionContext, KafkaUserTemplates.tlsUser(Environment.TEST_SUITE_NAMESPACE, CLUSTER_NAME, ADMIN).build());

KafkaClients kafkaClients = new KafkaClientsBuilder()
.withProducerName(testStorage.getProducerName())
Expand All @@ -150,14 +168,9 @@ void testAclWithSuperUser(ExtensionContext extensionContext) {
.withUsername(ADMIN)
.build();

resourceManager.createResourceWithWait(extensionContext, kafkaClients.producerTlsStrimzi(CLUSTER_NAME));
ClientUtils.waitForProducerClientSuccess(testStorage);

LOGGER.info("Checking Kafka Super User: {}/{} that is able to read messages from Topic: {} regardless that " +
"we configured Acls with only write operation", Environment.TEST_SUITE_NAMESPACE, ADMIN, TOPIC_NAME);

resourceManager.createResourceWithWait(extensionContext, kafkaClients.consumerTlsStrimzi(CLUSTER_NAME));
ClientUtils.waitForConsumerClientSuccess(testStorage);
LOGGER.info("Checking Kafka Super User: {}/{} is able to produce/consume despite having no explicit rights in KafkaUser", Environment.TEST_SUITE_NAMESPACE, ADMIN);
resourceManager.createResourceWithWait(extensionContext, kafkaClients.producerTlsStrimzi(CLUSTER_NAME), kafkaClients.consumerTlsStrimzi(CLUSTER_NAME));
ClientUtils.waitForClientsSuccess(testStorage);
}

@BeforeAll
Expand All @@ -173,8 +186,9 @@ public void setup(ExtensionContext extensionContext) {
.endMetadata()
.editSpec()
.editKafka()
.addToConfig("auto.create.topics.enable", "true")
henryZrncik marked this conversation as resolved.
Show resolved Hide resolved
.withNewKafkaAuthorizationCustom()
.withAuthorizerClass(KafkaAuthorizationSimple.AUTHORIZER_CLASS_NAME)
.withAuthorizerClass(Environment.isKRaftModeEnabled() ? KafkaAuthorizationSimple.KRAFT_AUTHORIZER_CLASS_NAME : KafkaAuthorizationSimple.AUTHORIZER_CLASS_NAME)
.withSupportsAdminApi(true)
.withSuperUsers("CN=" + ADMIN)
.endKafkaAuthorizationCustom()
Expand Down
Loading