Skip to content

Commit

Permalink
Bridge networkpolicies fix
Browse files Browse the repository at this point in the history
Signed-off-by: jkalinic <jkalinic@redhat.com>
  • Loading branch information
jankalinic committed Mar 1, 2024
1 parent 18c90f5 commit 78df26a
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 1 deletion.
Expand Up @@ -4,12 +4,15 @@
*/
package io.strimzi.systemtest.kafkaclients.internalClients;

import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.PodSpecBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.strimzi.systemtest.Environment;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.resources.kubernetes.NetworkPolicyResource;
import io.sundr.builder.annotations.Buildable;

import java.util.Collections;
Expand All @@ -19,8 +22,18 @@

@Buildable(editableEnabled = false)
public class BridgeClients extends KafkaClients {
private int port;
private String componentName;
private int pollInterval;
private int port;

public String getComponentName() {
return componentName;
}

public void setComponentName(String componentName) {
this.componentName = componentName;
}

public int getPollInterval() {
return pollInterval;
}
Expand All @@ -37,6 +50,17 @@ public void setPort(int port) {
this.port = port;
}

private void createNetworkPoliciesIfNeeded(String clientName, Map<String, String> clientLabels) {
// We need to create network policies if default policy is to deny traffic
if (Environment.DEFAULT_TO_DENY_NETWORK_POLICIES) {
LabelSelector producerLabelSelector = new LabelSelectorBuilder()
.addToMatchLabels(clientLabels)
.build();

NetworkPolicyResource.allowNetworkPolicySettingsForBridgeClients(this.getNamespaceName(), clientName, producerLabelSelector, this.getComponentName());
}
}

public JobBuilder defaultProducerStrimziBridge() {
Map<String, String> producerLabels = new HashMap<>();
producerLabels.put("app", this.getProducerName());
Expand All @@ -49,6 +73,8 @@ public JobBuilder defaultProducerStrimziBridge() {
podSpecBuilder.withImagePullSecrets(imagePullSecrets);
}

createNetworkPoliciesIfNeeded(this.getProducerName(), producerLabels);

return new JobBuilder()
.withNewMetadata()
.withNamespace(this.getNamespaceName())
Expand Down Expand Up @@ -114,6 +140,8 @@ public JobBuilder defaultConsumerStrimziBridge() {
podSpecBuilder.withImagePullSecrets(imagePullSecrets);
}

createNetworkPoliciesIfNeeded(this.getConsumerName(), consumerLabels);

return new JobBuilder()
.withNewMetadata()
.withNamespace(this.getNamespaceName())
Expand Down
Expand Up @@ -10,6 +10,7 @@
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyBuilder;
import io.fabric8.kubernetes.client.CustomResource;
import io.strimzi.api.kafka.model.bridge.KafkaBridge;
import io.strimzi.api.kafka.model.common.Spec;
import io.strimzi.api.kafka.model.kafka.Status;
import io.strimzi.systemtest.Environment;
Expand All @@ -25,6 +26,8 @@

import java.util.List;

import static io.strimzi.api.ResourceLabels.STRIMZI_KIND_LABEL;
import static io.strimzi.api.ResourceLabels.STRIMZI_NAME_LABEL;
import static io.strimzi.test.k8s.KubeClusterResource.kubeClient;

public class NetworkPolicyResource implements ResourceType<NetworkPolicy> {
Expand Down Expand Up @@ -89,6 +92,44 @@ public static void allowNetworkPolicySettingsForClusterOperator(String namespace
LOGGER.info("Network policy for LabelSelector {} successfully created", labelSelector);
}

public static void allowNetworkPolicySettingsForBridgeClients(String namespace, String clientName, LabelSelector clientLabelSelector, String componentName) {
LOGGER.info("Apply NetworkPolicy access to Kafka Bridge {} from client Pods with LabelSelector {}", componentName, clientLabelSelector);

NetworkPolicy networkPolicy = NetworkPolicyTemplates.networkPolicyBuilder(namespace, clientName, clientLabelSelector)
.editSpec()
.withNewPodSelector()
.addToMatchLabels(STRIMZI_KIND_LABEL, KafkaBridge.RESOURCE_KIND)
.addToMatchLabels(STRIMZI_NAME_LABEL, componentName)
.endPodSelector()
.endSpec()
.build();

LOGGER.debug("Creating NetworkPolicy: {}", networkPolicy.toString());
ResourceManager.getInstance().createResourceWithWait(networkPolicy);
LOGGER.info("Network policy for LabelSelector {} successfully created", clientLabelSelector);
}

public static void allowNetworkPolicySettingsForBridgeScraper(String namespace, String scraperPodName, String componentName) {
LabelSelector scraperLabelSelector = new LabelSelectorBuilder()
.addToMatchLabels(TestConstants.SCRAPER_LABEL_KEY, TestConstants.SCRAPER_LABEL_VALUE)
.build();

LOGGER.info("Apply NetworkPolicy access to Kafka Bridge {} from scraper Pods with LabelSelector {}", componentName, scraperLabelSelector);

NetworkPolicy networkPolicy = NetworkPolicyTemplates.networkPolicyBuilder(namespace, scraperPodName, scraperLabelSelector)
.editSpec()
.withNewPodSelector()
.addToMatchLabels(STRIMZI_KIND_LABEL, KafkaBridge.RESOURCE_KIND)
.addToMatchLabels(STRIMZI_NAME_LABEL, componentName)
.endPodSelector()
.endSpec()
.build();

LOGGER.debug("Creating NetworkPolicy: {}", networkPolicy.toString());
ResourceManager.getInstance().createResourceWithWait(networkPolicy);
LOGGER.info("Network policy for LabelSelector {} successfully created", scraperLabelSelector);
}

/**
* Method for allowing network policies for Connect
* @param resource mean Connect resource
Expand Down
Expand Up @@ -151,6 +151,7 @@ private void testWeirdUsername(String weirdUserName, KafkaListenerAuthentication
.withProducerName(testStorage.getClusterName() + "-" + producerName)
.withConsumerName(testStorage.getClusterName() + "-" + consumerName)
.withBootstrapAddress(KafkaBridgeResources.serviceName(testStorage.getClusterName()))
.withComponentName(KafkaBridgeResources.componentName(testStorage.getClusterName()))
.withTopicName(testStorage.getTopicName())
.withMessageCount(testStorage.getMessageCount())
.withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT)
Expand Down
Expand Up @@ -76,6 +76,7 @@ void testSendSimpleMessage() {
final BridgeClients kafkaBridgeClientJob = new BridgeClientsBuilder()
.withProducerName(testStorage.getProducerName())
.withBootstrapAddress(KafkaBridgeResources.serviceName(suiteTestStorage.getClusterName()))
.withComponentName(KafkaBridgeResources.componentName(suiteTestStorage.getClusterName()))
.withTopicName(testStorage.getTopicName())
.withMessageCount(testStorage.getMessageCount())
.withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT)
Expand Down Expand Up @@ -117,6 +118,7 @@ void testReceiveSimpleMessage() {
final BridgeClients kafkaBridgeClientJob = new BridgeClientsBuilder()
.withConsumerName(testStorage.getConsumerName())
.withBootstrapAddress(KafkaBridgeResources.serviceName(suiteTestStorage.getClusterName()))
.withComponentName(KafkaBridgeResources.componentName(suiteTestStorage.getClusterName()))
.withTopicName(testStorage.getTopicName())
.withMessageCount(testStorage.getMessageCount())
.withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT)
Expand Down
Expand Up @@ -183,6 +183,7 @@ void setUp() {

kafkaBridgeClientJob = new BridgeClientsBuilder()
.withBootstrapAddress(KafkaBridgeResources.serviceName(suiteTestStorage.getClusterName()))
.withComponentName(KafkaBridgeResources.componentName(suiteTestStorage.getClusterName()))
.withTopicName(suiteTestStorage.getTopicName())
.withMessageCount(suiteTestStorage.getMessageCount())
.withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT)
Expand Down
Expand Up @@ -176,6 +176,7 @@ void setUp() {

kafkaBridgeClientJob = new BridgeClientsBuilder()
.withBootstrapAddress(KafkaBridgeResources.serviceName(suiteTestStorage.getClusterName()))
.withComponentName(KafkaBridgeResources.componentName(suiteTestStorage.getClusterName()))
.withTopicName(suiteTestStorage.getTopicName())
.withMessageCount(suiteTestStorage.getMessageCount())
.withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT)
Expand Down
Expand Up @@ -550,6 +550,9 @@ void testKafkaBridgeMetrics() {
.endMetadata()
.build());

// Allow connections from scraper to Bridge pods when NetworkPolicies are set to denied by default
NetworkPolicyResource.allowNetworkPolicySettingsForBridgeScraper(namespaceFirst, scraperPodName, KafkaBridgeResources.componentName(bridgeClusterName));

MetricsCollector bridgeCollector = kafkaCollector.toBuilder()
.withComponentName(bridgeClusterName)
.withComponentType(ComponentType.KafkaBridge)
Expand All @@ -561,6 +564,7 @@ void testKafkaBridgeMetrics() {
.withProducerName(testStorage.getProducerName())
.withConsumerName(testStorage.getConsumerName())
.withBootstrapAddress(KafkaBridgeResources.serviceName(bridgeClusterName))
.withComponentName(KafkaBridgeResources.componentName(bridgeClusterName))
.withTopicName(bridgeTopicName)
.withMessageCount(testStorage.getMessageCount())
.withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT)
Expand Down
Expand Up @@ -483,6 +483,7 @@ void testPasswordGrantsKafkaBridge() {
BridgeClients kafkaBridgeClientJob = new BridgeClientsBuilder()
.withProducerName(producerName)
.withBootstrapAddress(KafkaBridgeResources.serviceName(oauthClusterName))
.withComponentName(KafkaBridgeResources.componentName(oauthClusterName))
.withTopicName(testStorage.getTopicName())
.withMessageCount(testStorage.getMessageCount())
.withPort(HTTP_BRIDGE_DEFAULT_PORT)
Expand Down
Expand Up @@ -748,6 +748,7 @@ void testProducerConsumerBridgeWithOauthMetrics() {
.withNamespaceName(Environment.TEST_SUITE_NAMESPACE)
.withProducerName(bridgeProducerName)
.withBootstrapAddress(KafkaBridgeResources.serviceName(oauthClusterName))
.withComponentName(KafkaBridgeResources.componentName(oauthClusterName))
.withTopicName(testStorage.getTopicName())
.withMessageCount(testStorage.getMessageCount())
.withPort(HTTP_BRIDGE_DEFAULT_PORT)
Expand Down
Expand Up @@ -239,6 +239,7 @@ void testProducerConsumerBridge() {
BridgeClients kafkaBridgeClientJob = new BridgeClientsBuilder()
.withProducerName(producerName)
.withBootstrapAddress(KafkaBridgeResources.serviceName(oauthClusterName))
.withComponentName(KafkaBridgeResources.componentName(oauthClusterName))
.withTopicName(testStorage.getTopicName())
.withMessageCount(10)
.withPort(HTTP_BRIDGE_DEFAULT_PORT)
Expand Down

0 comments on commit 78df26a

Please sign in to comment.