Skip to content

Commit

Permalink
chore: upgrade Fabric8 Kubernetes Client to 4.13.3
Browse files Browse the repository at this point in the history
Also addresses some quirks of the upgrade and, in the Kafka broker/topic
auto discovery, allows for both the `apiextensions.k8s.io/v1beta1` and
`apiextensions.k8s.io/v1` API versions.
  • Loading branch information
zregvart committed Oct 29, 2021
1 parent 75292fd commit e3407a7
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 159 deletions.
6 changes: 5 additions & 1 deletion app/connector/kafka/pom.xml
Expand Up @@ -74,7 +74,11 @@
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model</artifactId>
<artifactId>kubernetes-model-core</artifactId>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-apiextensions</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
Expand Up @@ -21,13 +21,17 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinition;
import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinitionBuilder;
import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinitionStatusBuilder;
import io.fabric8.kubernetes.api.model.apiextensions.v1beta1.CustomResourceDefinition;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.V1ApiextensionAPIGroupDSL;
import io.fabric8.kubernetes.client.V1beta1ApiextensionAPIGroupDSL;
import io.fabric8.kubernetes.client.dsl.ApiextensionsAPIGroupDSL;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.syndesis.connector.kafka.model.crd.Address;
import io.syndesis.connector.kafka.model.crd.Kafka;
import io.syndesis.connector.kafka.model.crd.KafkaResourceDoneable;
Expand All @@ -37,82 +41,62 @@
import io.syndesis.connector.support.verifier.api.PropertyPair;
import io.syndesis.connector.support.verifier.api.SyndesisMetadata;
import io.syndesis.connector.support.verifier.api.SyndesisMetadataProperties;

import org.apache.camel.CamelContext;
import org.apache.camel.component.extension.MetaDataExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaMetaDataRetrieval extends ComponentMetadataRetrieval {

private static final Logger LOG = LoggerFactory.getLogger(KafkaMetaDataRetrieval.class);

static final CustomResourceDefinition KAFKA_CRD = new CustomResourceDefinitionBuilder()
.withApiVersion("apiextensions.k8s.io/v1")
.withKind("CustomResourceDefinition")
.withNewMetadata()
.withName("kafkas.kafka.strimzi.io")
.endMetadata()
.withNewSpec()
// sort versions, newest first
static final CustomResourceDefinitionContext[] KAFKA_CRDS = Stream.of("v1beta2", "v1beta1")
.map(v -> new CustomResourceDefinitionContext.Builder()
.withGroup("kafka.strimzi.io")
.withVersion(v)
.withScope("Namespaced")
.withVersion("v1beta2")
.withNewNames()
.withKind("Kafka")
.withListKind("KafkaList")
.withPlural("kafkas")
.withSingular("kafka")
.endNames()
.endSpec()
.withStatus(new CustomResourceDefinitionStatusBuilder().build())
.build();

/**
* TODO: use local extension, remove when switching to camel 2.22.x
*/
@Override
protected MetaDataExtension resolveMetaDataExtension(CamelContext context,
Class<? extends MetaDataExtension> metaDataExtensionClass,
String componentId, String actionId) {
return new KafkaMetaDataExtension(context);
}

@SuppressWarnings("unchecked")
@Override
protected SyndesisMetadata adapt(CamelContext context, String componentId, String actionId,
Map<String, Object> properties, MetaDataExtension.MetaData metadata) {
Set<String> topicsNames = (Set<String>) metadata.getPayload();

List<PropertyPair> topicsResult = new ArrayList<>();
topicsNames.forEach(
t -> topicsResult.add(new PropertyPair(t, t))
);
.withName("kafkas.kafka.strimzi.io")
.withPlural("kafkas")
.withKind("Kafka")
.build())
.toArray(CustomResourceDefinitionContext[]::new);

return SyndesisMetadata.of(
Collections.singletonMap("topic", topicsResult)
);
}
private static final Logger LOG = LoggerFactory.getLogger(KafkaMetaDataRetrieval.class);

/**
* Query the strimzi brokers available on this kubernetes environment
* to suggest auto discovered urls.
* Query the strimzi brokers available on this kubernetes environment to
* suggest auto discovered urls.
*/
@Override
public SyndesisMetadataProperties fetchProperties(CamelContext context, String componentId,
Map<String, Object> properties) {
List<PropertyPair> brokers = new ArrayList<>();
try (KubernetesClient client = createKubernetesClient()) {

CustomResourceDefinition crd = client.customResourceDefinitions().withName("kafkas.kafka.strimzi.io").get();
// verify if strimzi CRD is installed
if (crd != null) {
KafkaResourceList kafkaList = client.customResources(KAFKA_CRD, Kafka.class, KafkaResourceList.class, KafkaResourceDoneable.class)
.inAnyNamespace().list();
kafkaList.getItems().forEach(kafka -> processKafkaResource(brokers, kafka));
public SyndesisMetadataProperties fetchProperties(final CamelContext context, final String componentId, final Map<String, Object> properties) {
final List<PropertyPair> brokers = new ArrayList<>();

try (KubernetesClient client = createKubernetesClient();
ApiextensionsAPIGroupDSL apiextensions = client.apiextensions();
V1beta1ApiextensionAPIGroupDSL v1beta1 = apiextensions.v1beta1();
V1ApiextensionAPIGroupDSL v1 = apiextensions.v1()) {

final CustomResourceDefinition v1beta1CRD = v1beta1.customResourceDefinitions().withName("kafkas.kafka.strimzi.io").get();
final io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition v1CRD = v1.customResourceDefinitions()
.withName("kafkas.kafka.strimzi.io").get();

if (v1beta1CRD != null || v1CRD != null) {
for (final CustomResourceDefinitionContext crd : KAFKA_CRDS) {
try {
final KafkaResourceList kafkaList = client.customResources(crd, Kafka.class, KafkaResourceList.class, KafkaResourceDoneable.class)
.inAnyNamespace().list();
kafkaList.getItems().forEach(kafka -> processKafkaResource(brokers, kafka));
} catch (KubernetesClientException e) {
LOG.debug("Could not list: " + crd.getGroup() + "/" + crd.getName(), e);
}
}
}
} catch (Exception t) {

} catch (final Exception t) {
LOG.warn("Couldn't auto discover any kafka broker.", t);
}
Map<String, List<PropertyPair>> dynamicProperties = new HashMap<>();

final Map<String, List<PropertyPair>> dynamicProperties = new HashMap<>();
dynamicProperties.put("brokers", brokers);
return new SyndesisMetadataProperties(dynamicProperties);
}
Expand All @@ -121,35 +105,45 @@ KubernetesClient createKubernetesClient() {
return new DefaultKubernetesClient();
}

/**
 * Turns the set of topic names carried in the metadata payload into the
 * dynamic {@code topic} property suggestions.
 *
 * @param metadata metadata whose payload is expected to be a {@code Set<String>}
 *        of topic names (hence the unchecked cast)
 * @return metadata exposing one {@link PropertyPair} per discovered topic
 *         under the {@code topic} key
 */
@SuppressWarnings("unchecked")
@Override
protected SyndesisMetadata adapt(final CamelContext context, final String componentId, final String actionId,
    final Map<String, Object> properties, final MetaDataExtension.MetaData metadata) {
    final Set<String> discoveredTopics = (Set<String>) metadata.getPayload();

    final List<PropertyPair> topicPairs = new ArrayList<>(discoveredTopics.size());
    for (final String topic : discoveredTopics) {
        // display value and stored value are the same: the topic name itself
        topicPairs.add(new PropertyPair(topic, topic));
    }

    return SyndesisMetadata.of(Collections.singletonMap("topic", topicPairs));
}

/**
* For each Kafka resource found on Kubernetes, extract the listeners and
* add them to the brokers list.
* TODO: use local extension, remove when switching to camel 2.22.x
*/
private static void processKafkaResource(List<PropertyPair> brokers, Kafka item) {
//Extract an identifier of this broker
final ObjectMeta metadata = item.getMetadata();
List<Listener> listeners = item.getStatus().getListeners();
listeners.stream().filter(KafkaMetaDataRetrieval::typesAllowed).forEach(
listener -> getAddress(
listener,
brokers,
String.format("%s::%s (%s)", metadata.getNamespace(), metadata.getName(), listener.getType()))
);
/**
 * Supplies the {@link MetaDataExtension} used to fetch Kafka topic metadata.
 * Always returns the local {@link KafkaMetaDataExtension} bound to the given
 * Camel context; the {@code metaDataExtensionClass}, {@code componentId} and
 * {@code actionId} parameters are intentionally ignored.
 */
@Override
protected MetaDataExtension resolveMetaDataExtension(final CamelContext context,
final Class<? extends MetaDataExtension> metaDataExtensionClass,
final String componentId, final String actionId) {
// NOTE(review): overriding bypasses the default extension lookup entirely —
// presumably a workaround until a newer Camel provides this; confirm.
return new KafkaMetaDataExtension(context);
}

/**
* Get the list of addresses for this connection and add it to the brokers list.
* Get the list of addresses for this connection and add it to the brokers
* list.
*
* @param listener metadata for this broker
* @param brokers list where all brokers are going to be added
* @param name identifier of this broker
* @param brokers list where all brokers are going to be added
* @param name identifier of this broker
*/
private static void getAddress(final Listener listener,
final List<PropertyPair> brokers,
final String name) {
StringBuilder add = new StringBuilder();
final List<PropertyPair> brokers,
final String name) {
final StringBuilder add = new StringBuilder();

List<Address> addresses = listener.getAddresses();
for (Address a : addresses) {
final List<Address> addresses = listener.getAddresses();
for (final Address a : addresses) {
if (add.length() > 0) {
add.append(',');
}
Expand All @@ -162,7 +156,23 @@ private static void getAddress(final Listener listener,
}

/**
* Used to filter which types of connections are we interested in. Right now, only plain connections.
* For each Kafka resource found on Kubernetes, extract the listeners and
* add them to the brokers list.
*/
/**
 * For each Kafka resource found on Kubernetes, extract its listeners and
 * add the allowed ones to the brokers list.
 *
 * @param brokers list that collected broker property pairs are appended to
 * @param item a Kafka custom resource read from the cluster
 */
private static void processKafkaResource(final List<PropertyPair> brokers, final Kafka item) {
    // Extract an identifier of this broker
    final ObjectMeta meta = item.getMetadata();

    for (final Listener listener : item.getStatus().getListeners()) {
        if (!typesAllowed(listener)) {
            continue;
        }

        // label shown to the user: namespace::name (listener type)
        final String brokerName = String.format("%s::%s (%s)", meta.getNamespace(), meta.getName(), listener.getType());
        getAddress(listener, brokers, brokerName);
    }
}

/**
* Used to filter which types of connections are we interested in. Right
* now, only plain connections.
*/
private static boolean typesAllowed(final Listener listener) {
return "plain".equalsIgnoreCase(listener.getType()) ||
Expand Down

0 comments on commit e3407a7

Please sign in to comment.