Skip to content

Commit

Permalink
initial implementation for tiered storage support (#9727)
Browse files Browse the repository at this point in the history
Signed-off-by: Lixin Yao <lixin_yao@apple.com>
Signed-off-by: Jakub Scholz <www@scholzj.com>
Co-authored-by: Lixin Yao <lixin_yao@apple.com>
Co-authored-by: Jakub Scholz <www@scholzj.com>
  • Loading branch information
3 people committed Feb 29, 2024
1 parent eaf8950 commit 4ca2bc6
Show file tree
Hide file tree
Showing 12 changed files with 412 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Added support for Kafka Exporter `offset.show-all` parameter
* Prevent removal of the `broker` process role from KRaft mixed-nodes that have assigned partition-replicas
* Improve broker scale-down prevention to continue in reconciliation when scale-down cannot be executed
* Added support for Tiered Storage by enabling the configuration of custom storage plugins through the Kafka custom resource.

### Changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.strimzi.api.kafka.model.common.jmx.KafkaJmxOptions;
import io.strimzi.api.kafka.model.common.metrics.MetricsConfig;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener;
import io.strimzi.api.kafka.model.kafka.tieredstorage.TieredStorage;
import io.strimzi.crdgenerator.annotations.AddedIn;
import io.strimzi.crdgenerator.annotations.Description;
import io.strimzi.crdgenerator.annotations.DescriptionFile;
Expand All @@ -39,7 +40,7 @@
/**
* Representation of a Strimzi-managed Kafka "cluster".
*/
@DescriptionFile
@DescriptionFile
@Buildable(
editableEnabled = false,
builderPackage = Constants.FABRIC8_KUBERNETES_API
Expand Down Expand Up @@ -84,6 +85,7 @@ public class KafkaClusterSpec implements HasConfigurableMetrics, HasConfigurable
private List<GenericKafkaListener> listeners;
private KafkaAuthorization authorization;
private KafkaClusterTemplate template;
private TieredStorage tieredStorage;
private Map<String, Object> additionalProperties = new HashMap<>(0);

@Description("The Kafka broker version. Defaults to the latest version. " +
Expand Down Expand Up @@ -173,7 +175,7 @@ public void setReplicas(Integer replicas) {
}

@Description("The container image used for Kafka pods. "
+ "If the property is not set, the default Kafka image version is determined based on the `version` configuration. "
+ "If the property is not set, the default Kafka image version is determined based on the `version` configuration. "
+ "The image names are specifically mapped to corresponding versions in the Cluster Operator configuration. "
+ "Changing the Kafka image version does not automatically update the image versions for other components, such as Kafka Exporter. ")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
Expand Down Expand Up @@ -281,6 +283,16 @@ public void setTemplate(KafkaClusterTemplate template) {
this.template = template;
}

/**
 * Gets the tiered storage configuration, or null when tiered storage is not configured.
 */
@Description("Configure the tiered storage feature for Kafka brokers")
@JsonInclude(JsonInclude.Include.NON_NULL)
public TieredStorage getTieredStorage() {
    return tieredStorage;
}

/**
 * Sets the tiered storage configuration.
 */
public void setTieredStorage(TieredStorage tieredStorage) {
    this.tieredStorage = tieredStorage;
}

@Override
public Map<String, Object> getAdditionalProperties() {
return this.additionalProperties;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/

package io.strimzi.api.kafka.model.kafka.tieredstorage;

import com.fasterxml.jackson.annotation.JsonInclude;
import io.strimzi.api.kafka.model.common.Constants;
import io.strimzi.api.kafka.model.common.UnknownPropertyPreserving;
import io.strimzi.crdgenerator.annotations.Description;
import io.sundr.builder.annotations.Buildable;
import lombok.EqualsAndHashCode;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
 * Configures a RemoteStorageManager in the tiered storage setup.
 *
 * Holds the class name, class path and free-form configuration of the user-provided
 * `RemoteStorageManager` plugin that Kafka loads for tiered storage.
 */
@Buildable(
        editableEnabled = false,
        builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@EqualsAndHashCode
public class RemoteStorageManager implements UnknownPropertyPreserving, Serializable {
    private static final long serialVersionUID = 1L;

    // Eagerly initialized so that getAdditionalProperties() never returns null,
    // consistent with the other model classes in this change (e.g. TieredStorage,
    // KafkaClusterSpec), whose additionalProperties maps start as empty HashMaps.
    private final Map<String, Object> additionalProperties = new HashMap<>(0);
    private String className;
    private String classPath;
    private Map<String, String> config;

    @Override
    public Map<String, Object> getAdditionalProperties() {
        return this.additionalProperties;
    }

    @Override
    public void setAdditionalProperty(String name, Object value) {
        this.additionalProperties.put(name, value);
    }

    @Description("The class name for the 'RemoteStorageManager' implementation.")
    @JsonInclude(JsonInclude.Include.NON_NULL)
    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    @Description("The class path for the 'RemoteStorageManager' implementation.")
    @JsonInclude(JsonInclude.Include.NON_NULL)
    public String getClassPath() {
        return classPath;
    }

    public void setClassPath(String classPath) {
        this.classPath = classPath;
    }

    @Description("The additional configuration map for the 'RemoteStorageManager' implementation. " +
        "Keys will be automatically prefixed with `rsm.config.`, and added to Kafka broker configuration.")
    @JsonInclude(JsonInclude.Include.NON_NULL)
    public Map<String, String> getConfig() {
        return this.config;
    }

    public void setConfig(Map<String, String> config) {
        this.config = config;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/

package io.strimzi.api.kafka.model.kafka.tieredstorage;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.strimzi.api.kafka.model.common.UnknownPropertyPreserving;
import io.strimzi.crdgenerator.annotations.Description;
import lombok.EqualsAndHashCode;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
 * Abstract base class for the different representations of tiered storage,
 * discriminated by {@link #getType() type}.
 *
 * Jackson resolves the concrete subtype from the existing `type` property;
 * currently only {@link TieredStorageCustom} ("custom") is registered.
 */
@JsonTypeInfo(
    use = JsonTypeInfo.Id.NAME,
    include = JsonTypeInfo.As.EXISTING_PROPERTY,
    property = "type"
)
@JsonSubTypes({
    @JsonSubTypes.Type(value = TieredStorageCustom.class, name = TieredStorage.TYPE_CUSTOM)}
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@EqualsAndHashCode
public abstract class TieredStorage implements UnknownPropertyPreserving, Serializable {

    private static final long serialVersionUID = 1L;

    // Discriminator value for the custom tiered storage type (the only supported type).
    public static final String TYPE_CUSTOM = "custom";

    // Preserves properties not declared in the schema so round-tripping the CR does not lose them.
    private final Map<String, Object> additionalProperties = new HashMap<>(0);

    @Description("Storage type, only 'custom' is supported at the moment.")
    public abstract String getType();

    @Override
    public Map<String, Object> getAdditionalProperties() {
        return this.additionalProperties;
    }

    @Override
    public void setAdditionalProperty(String name, Object value) {
        this.additionalProperties.put(name, value);
    }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/

package io.strimzi.api.kafka.model.kafka.tieredstorage;

import com.fasterxml.jackson.annotation.JsonInclude;
import io.strimzi.api.kafka.model.common.Constants;
import io.strimzi.crdgenerator.annotations.Description;
import io.strimzi.crdgenerator.annotations.DescriptionFile;
import io.sundr.builder.annotations.Buildable;
import lombok.EqualsAndHashCode;

/**
 * Configures a tiered storage of the `custom` type.
 *
 * Carries the user-provided {@link RemoteStorageManager} plugin configuration
 * that is rendered into the Kafka broker configuration.
 */
@DescriptionFile
@Buildable(
    editableEnabled = false,
    builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@EqualsAndHashCode
public class TieredStorageCustom extends TieredStorage {
    private static final long serialVersionUID = 1L;

    private RemoteStorageManager remoteStorageManager;

    @Description("Must be `" + TYPE_CUSTOM + "`")
    @JsonInclude(JsonInclude.Include.NON_NULL)
    @Override
    public String getType() {
        return TYPE_CUSTOM;
    }

    @Description("Configuration for the Remote Storage Manager.")
    @JsonInclude(JsonInclude.Include.NON_NULL)
    public RemoteStorageManager getRemoteStorageManager() {
        return remoteStorageManager;
    }

    public void setRemoteStorageManager(RemoteStorageManager remoteStorageManager) {
        this.remoteStorageManager = remoteStorageManager;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuth;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationScramSha512;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationTls;
import io.strimzi.api.kafka.model.kafka.tieredstorage.RemoteStorageManager;
import io.strimzi.api.kafka.model.kafka.tieredstorage.TieredStorage;
import io.strimzi.api.kafka.model.kafka.tieredstorage.TieredStorageCustom;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.kafka.oauth.server.ServerConfig;
import io.strimzi.kafka.oauth.server.plain.ServerPlainConfig;
Expand Down Expand Up @@ -830,6 +833,54 @@ public KafkaBrokerConfigurationBuilder withLogDirs(List<VolumeMount> mounts) {
return this;
}

/**
 * Configure the tiered storage configuration for Kafka brokers.
 *
 * Writes the RLMM (RemoteLogMetadataManager) settings managed by the operator and,
 * for the `custom` type, the user-provided RSM (RemoteStorageManager) settings.
 *
 * @param clusterName Name of the cluster
 * @param tieredStorage TieredStorage configuration. May be null, in which case nothing is written.
 *
 * @return Returns the builder instance
 */
public KafkaBrokerConfigurationBuilder withTieredStorage(String clusterName, TieredStorage tieredStorage) {
    if (tieredStorage == null) {
        return this;
    }

    printSectionHeader("Kafka tiered storage configuration");
    writer.println("# RLMM configuration generated by Strimzi");

    writer.println("remote.log.storage.system.enable=true");
    writer.println("remote.log.metadata.manager.impl.prefix=rlmm.config.");
    writer.println("remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager");
    // The RLMM clients talk to the brokers over the internal replication listener using TLS client certs
    writer.println("remote.log.metadata.manager.listener.name=" + REPLICATION_LISTENER_NAME);
    writer.println("rlmm.config.remote.log.metadata.common.client.bootstrap.servers="
            + clusterName + "-kafka-brokers:9091");
    writer.println("rlmm.config.remote.log.metadata.common.client.security.protocol=SSL");
    writer.println("rlmm.config.remote.log.metadata.common.client.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12");
    writer.println("rlmm.config.remote.log.metadata.common.client.ssl.keystore.password=${CERTS_STORE_PASSWORD}");
    writer.println("rlmm.config.remote.log.metadata.common.client.ssl.keystore.type=PKCS12");
    writer.println("rlmm.config.remote.log.metadata.common.client.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12");
    writer.println("rlmm.config.remote.log.metadata.common.client.ssl.truststore.password=${CERTS_STORE_PASSWORD}");
    writer.println("rlmm.config.remote.log.metadata.common.client.ssl.truststore.type=PKCS12");

    writer.println("# RSM configs set by the operator and by the user");

    if (tieredStorage instanceof TieredStorageCustom customTieredStorage) {
        RemoteStorageManager rsm = customTieredStorage.getRemoteStorageManager();
        // Guard against a custom tiered storage without a RemoteStorageManager section
        // to avoid an NPE while generating the broker configuration
        if (rsm != null) {
            writer.println("remote.log.storage.manager.class.name=" + rsm.getClassName());
            writer.println("remote.log.storage.manager.class.path=" + rsm.getClassPath());
            writer.println("remote.log.storage.manager.impl.prefix=rsm.config.");

            // The config map is optional in the CR, so it may be null
            Map<String, String> rsmConfig = rsm.getConfig();
            if (rsmConfig != null) {
                for (Map.Entry<String, String> config : rsmConfig.entrySet()) {
                    writer.println(String.format("rsm.config.%s=%s", config.getKey(), config.getValue()));
                }
            }
        }
    }

    writer.println();

    return this;
}

/**
* Internal method which prints the section header into the configuration file. This makes it more human-readable
* when looking for issues in running pods etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationCustom;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuth;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
import io.strimzi.api.kafka.model.kafka.tieredstorage.TieredStorage;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolStatus;
import io.strimzi.api.kafka.model.podset.StrimziPodSet;
import io.strimzi.certs.CertAndKey;
Expand Down Expand Up @@ -237,6 +238,8 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
private ResourceTemplate templateBootstrapRoute;
private ResourceTemplate templateBootstrapIngress;

private TieredStorage tieredStorage;

private static final Map<String, String> DEFAULT_POD_LABELS = new HashMap<>();
static {
String value = System.getenv(CO_ENV_VAR_CUSTOM_KAFKA_POD_LABELS);
Expand Down Expand Up @@ -379,6 +382,9 @@ public static KafkaCluster fromCrd(Reconciliation reconciliation, Kafka kafka, L
result.templateServiceAccount = template.getServiceAccount();
}

if (kafkaClusterSpec.getTieredStorage() != null) {
result.tieredStorage = kafkaClusterSpec.getTieredStorage();
}
// Should run at the end when everything is set
KafkaSpecChecker specChecker = new KafkaSpecChecker(kafkaSpec, versions, result);
result.warningConditions.addAll(specChecker.run(useKRaft));
Expand Down Expand Up @@ -1674,6 +1680,7 @@ private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<
)
.withAuthorization(cluster, authorization)
.withCruiseControl(cluster, ccMetricsReporter, node.broker())
.withTieredStorage(cluster, tieredStorage)
.withUserConfiguration(configuration, node.broker() && ccMetricsReporter != null);

if (useKRaft) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuth;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuthBuilder;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
import io.strimzi.api.kafka.model.kafka.tieredstorage.RemoteStorageManager;
import io.strimzi.api.kafka.model.kafka.tieredstorage.TieredStorageCustom;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.kafka.oauth.server.ServerConfig;
import io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlMetricsReporter;
Expand Down Expand Up @@ -2261,6 +2263,41 @@ public void testOAuthDefaultOptions() {
assertThat(actualOptions, is(equalTo(Collections.emptyMap())));
}

@ParallelTest
public void testWithTieredStorage() {
    // Build a custom tiered storage spec with an example S3 RemoteStorageManager plugin
    Map<String, String> pluginConfig = new HashMap<>();
    pluginConfig.put("storage.bucket.name", "my-bucket");

    RemoteStorageManager storageManager = new RemoteStorageManager();
    storageManager.setClassName("com.example.kafka.tiered.storage.s3.S3RemoteStorageManager");
    storageManager.setClassPath("/opt/kafka/plugins/tiered-storage-s3/*");
    storageManager.setConfig(pluginConfig);

    TieredStorageCustom customStorage = new TieredStorageCustom();
    customStorage.setRemoteStorageManager(storageManager);

    String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, "2", false)
            .withTieredStorage("test-cluster-1", customStorage)
            .build();

    // Both the operator-managed RLMM settings and the user-provided RSM settings should be rendered
    assertThat(configuration, isEquivalent("broker.id=2",
            "node.id=2",
            "remote.log.storage.system.enable=true",
            "remote.log.metadata.manager.impl.prefix=rlmm.config.",
            "remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager",
            "remote.log.metadata.manager.listener.name=REPLICATION-9091",
            "rlmm.config.remote.log.metadata.common.client.bootstrap.servers=test-cluster-1-kafka-brokers:9091",
            "rlmm.config.remote.log.metadata.common.client.security.protocol=SSL",
            "rlmm.config.remote.log.metadata.common.client.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12",
            "rlmm.config.remote.log.metadata.common.client.ssl.keystore.password=${CERTS_STORE_PASSWORD}",
            "rlmm.config.remote.log.metadata.common.client.ssl.keystore.type=PKCS12",
            "rlmm.config.remote.log.metadata.common.client.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12",
            "rlmm.config.remote.log.metadata.common.client.ssl.truststore.password=${CERTS_STORE_PASSWORD}",
            "rlmm.config.remote.log.metadata.common.client.ssl.truststore.type=PKCS12",
            "remote.log.storage.manager.class.name=com.example.kafka.tiered.storage.s3.S3RemoteStorageManager",
            "remote.log.storage.manager.class.path=/opt/kafka/plugins/tiered-storage-s3/*",
            "remote.log.storage.manager.impl.prefix=rsm.config.",
            "rsm.config.storage.bucket.name=my-bucket"
    ));
}

static class IsEquivalent extends TypeSafeMatcher<String> {
private final List<String> expectedLines;

Expand Down

0 comments on commit 4ca2bc6

Please sign in to comment.