Skip to content

Commit

Permalink
Configure OAuth principal.builder.class also for controllers (#9682)
Browse files Browse the repository at this point in the history
Signed-off-by: Lukas Kral <lukywill16@gmail.com>
  • Loading branch information
im-konge committed Feb 14, 2024
1 parent 3e7e30d commit 891624a
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,11 @@ public KafkaBrokerConfigurationBuilder withListeners(

writer.println();
}

configureOAuthPrincipalBuilderIfNeeded(writer, kafkaListeners);
}

// configure OAuth principal builder for all the nodes - brokers, controllers, and mixed
configureOAuthPrincipalBuilderIfNeeded(writer, kafkaListeners);

printSectionHeader("Common listener configuration");
writer.println("listener.security.protocol.map=" + String.join(",", securityProtocol));
writer.println("listeners=" + String.join(",", listeners));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,143 @@ public void testKraftListenersBrokerAndControllerNodes() {
"ssl.endpoint.identification.algorithm=HTTPS"));
}

@ParallelTest
public void testKraftOauthBrokerControllerAndMixedNodes() {
Set<NodeRef> nodes = Set.of(
new NodeRef("my-cluster-controllers-0", 0, "controllers", true, false),
new NodeRef("my-cluster-controllers-1", 1, "controllers", true, false),
new NodeRef("my-cluster-controllers-2", 2, "controllers", true, false),
new NodeRef("my-cluster-brokers-10", 10, "brokers", false, true),
new NodeRef("my-cluster-brokers-11", 11, "brokers", false, true),
new NodeRef("my-cluster-brokers-12", 12, "brokers", false, true),
new NodeRef("my-cluster-kafka-13", 13, "kafka", true, true),
new NodeRef("my-cluster-kafka-14", 14, "kafka", true, true),
new NodeRef("my-cluster-kafka-15", 15, "kafka", true, true)
);

GenericKafkaListener listener = new GenericKafkaListenerBuilder()
.withName("plain")
.withPort(9092)
.withType(KafkaListenerType.INTERNAL)
.withTls(false)
.withNewKafkaListenerAuthenticationOAuth()
.withValidIssuerUri("http://valid-issuer")
.withJwksEndpointUri("http://jwks")
.withEnableECDSA(true)
.withUserNameClaim("preferred_username")
.withGroupsClaim("$.groups")
.withGroupsClaimDelimiter(";")
.withMaxSecondsWithoutReauthentication(3600)
.withJwksMinRefreshPauseSeconds(5)
.withEnablePlain(true)
.withTokenEndpointUri("http://token")
.withConnectTimeoutSeconds(30)
.withReadTimeoutSeconds(30)
.withEnableMetrics(true)
.withIncludeAcceptHeader(false)
.endKafkaListenerAuthenticationOAuth()
.build();

// Controller-only node
String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, "2", true)
.withKRaft("my-cluster", "my-namespace", Set.of(ProcessRoles.CONTROLLER), nodes)
.withListeners("my-cluster", "my-namespace", new NodeRef("my-cluster-controllers-2", 2, "controllers", true, false), singletonList(listener), listenerId -> "my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092")
.build();

assertThat(configuration, isEquivalent("node.id=2",
"process.roles=controller",
"controller.listener.names=CONTROLPLANE-9090",
"controller.quorum.voters=0@my-cluster-controllers-0.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,1@my-cluster-controllers-1.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,2@my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,13@my-cluster-kafka-13.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,14@my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,15@my-cluster-kafka-15.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090",
"listener.name.controlplane-9090.ssl.client.auth=required",
"listener.name.controlplane-9090.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12",
"listener.name.controlplane-9090.ssl.keystore.password=${CERTS_STORE_PASSWORD}",
"listener.name.controlplane-9090.ssl.keystore.type=PKCS12",
"listener.name.controlplane-9090.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12",
"listener.name.controlplane-9090.ssl.truststore.password=${CERTS_STORE_PASSWORD}",
"listener.name.controlplane-9090.ssl.truststore.type=PKCS12",
"listeners=CONTROLPLANE-9090://0.0.0.0:9090",
"listener.security.protocol.map=CONTROLPLANE-9090:SSL",
"sasl.enabled.mechanisms=",
"ssl.endpoint.identification.algorithm=HTTPS",
"principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder"));

// Broker-only node
configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, "11", true)
.withKRaft("my-cluster", "my-namespace", Set.of(ProcessRoles.BROKER), nodes)
.withListeners("my-cluster", "my-namespace", new NodeRef("my-cluster-brokers-11", 11, "brokers", false, true), singletonList(listener), listenerId -> "my-cluster-brokers-11.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092")
.build();

assertThat(configuration, isEquivalent("node.id=11",
"process.roles=broker",
"controller.listener.names=CONTROLPLANE-9090",
"controller.quorum.voters=0@my-cluster-controllers-0.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,1@my-cluster-controllers-1.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,2@my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,13@my-cluster-kafka-13.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,14@my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,15@my-cluster-kafka-15.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090",
"listener.name.controlplane-9090.ssl.client.auth=required",
"listener.name.controlplane-9090.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12",
"listener.name.controlplane-9090.ssl.keystore.password=${CERTS_STORE_PASSWORD}",
"listener.name.controlplane-9090.ssl.keystore.type=PKCS12",
"listener.name.controlplane-9090.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12",
"listener.name.controlplane-9090.ssl.truststore.password=${CERTS_STORE_PASSWORD}",
"listener.name.controlplane-9090.ssl.truststore.type=PKCS12",
"listener.name.replication-9091.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12",
"listener.name.replication-9091.ssl.keystore.password=${CERTS_STORE_PASSWORD}",
"listener.name.replication-9091.ssl.keystore.type=PKCS12",
"listener.name.replication-9091.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12",
"listener.name.replication-9091.ssl.truststore.password=${CERTS_STORE_PASSWORD}",
"listener.name.replication-9091.ssl.truststore.type=PKCS12",
"listener.name.replication-9091.ssl.client.auth=required",
"listeners=REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092",
"advertised.listeners=REPLICATION-9091://my-cluster-brokers-11.my-cluster-kafka-brokers.my-namespace.svc:9091,PLAIN-9092://my-cluster-brokers-11.my-cluster-kafka-brokers.my-namespace.svc:9092",
"listener.security.protocol.map=CONTROLPLANE-9090:SSL,REPLICATION-9091:SSL,PLAIN-9092:SASL_PLAINTEXT",
"inter.broker.listener.name=REPLICATION-9091",
"sasl.enabled.mechanisms=",
"ssl.endpoint.identification.algorithm=HTTPS",
"principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder",
"listener.name.plain-9092.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler",
"listener.name.plain-9092.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"thePrincipalName\" oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.username.claim=\"preferred_username\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\";",
"listener.name.plain-9092.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler",
"listener.name.plain-9092.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.username.claim=\"preferred_username\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\" oauth.token.endpoint.uri=\"http://token\";",
"listener.name.plain-9092.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN",
"listener.name.plain-9092.connections.max.reauth.ms=3600000"));

// Mixed node
configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, "14", true)
.withKRaft("my-cluster", "my-namespace", Set.of(ProcessRoles.BROKER, ProcessRoles.CONTROLLER), nodes)
.withListeners("my-cluster", "my-namespace", new NodeRef("my-cluster-kafka-14", 14, "kafka", false, true), singletonList(listener), listenerId -> "my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092")
.build();

assertThat(configuration, isEquivalent("node.id=14",
"process.roles=broker,controller",
"controller.listener.names=CONTROLPLANE-9090",
"controller.quorum.voters=0@my-cluster-controllers-0.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,1@my-cluster-controllers-1.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,2@my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,13@my-cluster-kafka-13.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,14@my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,15@my-cluster-kafka-15.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090",
"listener.name.controlplane-9090.ssl.client.auth=required",
"listener.name.controlplane-9090.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12",
"listener.name.controlplane-9090.ssl.keystore.password=${CERTS_STORE_PASSWORD}",
"listener.name.controlplane-9090.ssl.keystore.type=PKCS12",
"listener.name.controlplane-9090.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12",
"listener.name.controlplane-9090.ssl.truststore.password=${CERTS_STORE_PASSWORD}",
"listener.name.controlplane-9090.ssl.truststore.type=PKCS12",
"listener.name.replication-9091.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12",
"listener.name.replication-9091.ssl.keystore.password=${CERTS_STORE_PASSWORD}",
"listener.name.replication-9091.ssl.keystore.type=PKCS12",
"listener.name.replication-9091.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12",
"listener.name.replication-9091.ssl.truststore.password=${CERTS_STORE_PASSWORD}",
"listener.name.replication-9091.ssl.truststore.type=PKCS12",
"listener.name.replication-9091.ssl.client.auth=required",
"listeners=REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092",
"advertised.listeners=REPLICATION-9091://my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc:9091,PLAIN-9092://my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc:9092",
"listener.security.protocol.map=CONTROLPLANE-9090:SSL,REPLICATION-9091:SSL,PLAIN-9092:SASL_PLAINTEXT",
"inter.broker.listener.name=REPLICATION-9091",
"sasl.enabled.mechanisms=",
"ssl.endpoint.identification.algorithm=HTTPS",
"principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder",
"listener.name.plain-9092.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler",
"listener.name.plain-9092.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"thePrincipalName\" oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.username.claim=\"preferred_username\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\";",
"listener.name.plain-9092.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler",
"listener.name.plain-9092.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.username.claim=\"preferred_username\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\" oauth.token.endpoint.uri=\"http://token\";",
"listener.name.plain-9092.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN",
"listener.name.plain-9092.connections.max.reauth.ms=3600000"));
}

@ParallelTest
public void testWithPlainListenersWithSaslAuth() {
GenericKafkaListener listener = new GenericKafkaListenerBuilder()
Expand Down

0 comments on commit 891624a

Please sign in to comment.