Skip to content

Commit

Permalink
KAFKA-15083: add config with "remote.log.metadata" prefix (apache#14151)
Browse files Browse the repository at this point in the history
When configuring RLMM, the configs passed into configure method is the RemoteLogManagerConfig. But in RemoteLogManagerConfig, there's no configs related to remote.log.metadata.*, ex: remote.log.metadata.topic.replication.factor. So, even if users have set the config in broker, it'll never be applied.

This PR fixed the issue to allow users setting RLMM prefix: remote.log.metadata.manager.impl.prefix (default is rlmm.config.), and then, appending the desired remote.log.metadata.* configs, it'll pass into RLMM, including remote.log.metadata.common.client./remote.log.metadata.producer./ remote.log.metadata.consumer. prefixes.

Ex:

# default value
# remote.log.storage.manager.impl.prefix=rsm.config.
# remote.log.metadata.manager.impl.prefix=rlmm.config.

rlmm.config.remote.log.metadata.topic.num.partitions=50
rlmm.config.remote.log.metadata.topic.replication.factor=4

rsm.config.test=value

Reviewers: Christo Lolov <christololov@gmail.com>, Kamal Chandraprakash <kchandraprakash@uber.com>, Divij Vaidya <diviv@amazon.com>
  • Loading branch information
showuon authored and jeqo committed Aug 15, 2023
1 parent f3bbb9d commit 34df434
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 11 deletions.
12 changes: 7 additions & 5 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,17 @@ public void onEndPointCreated(EndPoint endpoint) {
}

private void configureRLMM() {
final Map<String, Object> rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());

rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
rlmmProps.put("cluster.id", clusterId);
final Map<String, Object> rlmmProps = new HashMap<>();
endpoint.ifPresent(e -> {
rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "bootstrap.servers", e.host() + ":" + e.port());
rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", e.securityProtocol().name);
});
// update the remoteLogMetadataProps here to override endpoint config if any
rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());

rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
rlmmProps.put("cluster.id", clusterId);

remoteLogMetadataManager.configure(rlmmProps);
}
Expand Down
73 changes: 71 additions & 2 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@
import java.util.concurrent.CountDownLatch;

import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
Expand Down Expand Up @@ -122,6 +127,17 @@ public class RemoteLogManagerTest {
int brokerId = 0;
String logDir = TestUtils.tempDirectory("kafka-").toString();
String clusterId = "dummyId";
String remoteLogStorageTestProp = "remote.log.storage.test";
String remoteLogStorageTestVal = "storage.test";
String remoteLogMetadataTestProp = "remote.log.metadata.test";
String remoteLogMetadataTestVal = "metadata.test";
String remoteLogMetadataCommonClientTestProp = REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "common.client.test";
String remoteLogMetadataCommonClientTestVal = "common.test";
String remoteLogMetadataProducerTestProp = REMOTE_LOG_METADATA_PRODUCER_PREFIX + "producer.test";
String remoteLogMetadataProducerTestVal = "producer.test";
String remoteLogMetadataConsumerTestProp = REMOTE_LOG_METADATA_CONSUMER_PREFIX + "consumer.test";
String remoteLogMetadataConsumerTestVal = "consumer.test";
String remoteLogMetadataTopicPartitionsNum = "1";

RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
Expand Down Expand Up @@ -248,16 +264,59 @@ void testRemoteLogMetadataManagerWithEndpointConfig() {
assertEquals(brokerId, capture.getValue().get(KafkaConfig.BrokerIdProp()));
}

@Test
void testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws IOException {
Properties props = new Properties();
// override common security.protocol by adding "RLMM prefix" and "remote log metadata common client prefix"
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL");
try (RemoteLogManager remoteLogManager = new RemoteLogManager(createRLMConfig(props), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return remoteStorageManager;
}
public RemoteLogMetadataManager createRemoteLogMetadataManager() {
return remoteLogMetadataManager;
}
}) {

String host = "localhost";
String port = "1234";
String securityProtocol = "PLAINTEXT";
EndPoint endPoint = new EndPoint(host, Integer.parseInt(port), new ListenerName(securityProtocol),
SecurityProtocol.PLAINTEXT);
remoteLogManager.onEndPointCreated(endPoint);
remoteLogManager.startup();

ArgumentCaptor<Map<String, Object>> capture = ArgumentCaptor.forClass(Map.class);
verify(remoteLogMetadataManager, times(1)).configure(capture.capture());
assertEquals(host + ":" + port, capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "bootstrap.servers"));
// should be overridden as SSL
assertEquals("SSL", capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol"));
assertEquals(clusterId, capture.getValue().get("cluster.id"));
assertEquals(brokerId, capture.getValue().get(KafkaConfig.BrokerIdProp()));
}
}



@Test
void testStartup() {
remoteLogManager.startup();
ArgumentCaptor<Map<String, Object>> capture = ArgumentCaptor.forClass(Map.class);
verify(remoteStorageManager, times(1)).configure(capture.capture());
assertEquals(brokerId, capture.getValue().get("broker.id"));
assertEquals(remoteLogStorageTestVal, capture.getValue().get(remoteLogStorageTestProp));

verify(remoteLogMetadataManager, times(1)).configure(capture.capture());
assertEquals(brokerId, capture.getValue().get("broker.id"));
assertEquals(logDir, capture.getValue().get("log.dir"));

// verify the configs starting with "remote.log.metadata", "remote.log.metadata.common.client."
// "remote.log.metadata.producer.", and "remote.log.metadata.consumer." are correctly passed in
assertEquals(remoteLogMetadataTopicPartitionsNum, capture.getValue().get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP));
assertEquals(remoteLogMetadataTestVal, capture.getValue().get(remoteLogMetadataTestProp));
assertEquals(remoteLogMetadataConsumerTestVal, capture.getValue().get(remoteLogMetadataConsumerTestProp));
assertEquals(remoteLogMetadataProducerTestVal, capture.getValue().get(remoteLogMetadataProducerTestProp));
assertEquals(remoteLogMetadataCommonClientTestVal, capture.getValue().get(remoteLogMetadataCommonClientTestProp));
}

// This test creates 2 log segments, 1st one has start offset of 0, 2nd one (and active one) has start offset of 150.
Expand Down Expand Up @@ -726,13 +785,15 @@ private void verifyLogSegmentData(LogSegmentData logSegmentData,
@Test
void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
RemoteLogManager remoteLogManager =
try (RemoteLogManager remoteLogManager =
new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, t -> Optional.empty(), brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return rsmManager;
}
};
assertEquals(rsmManager, remoteLogManager.storageManager());
) {
assertEquals(rsmManager, remoteLogManager.storageManager());
}
}

private void verifyInCache(TopicIdPartition... topicIdPartitions) {
Expand Down Expand Up @@ -1007,6 +1068,14 @@ private RemoteLogManagerConfig createRLMConfig(Properties props) {
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName());
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName());
props.put(DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX + remoteLogStorageTestProp, remoteLogStorageTestVal);
// adding configs with "remote log metadata manager config prefix"
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, remoteLogMetadataTopicPartitionsNum);
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataTestProp, remoteLogMetadataTestVal);
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataCommonClientTestProp, remoteLogMetadataCommonClientTestVal);
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal);
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal);

AbstractConfig config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props);
return new RemoteLogManagerConfig(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,18 @@ public final class RemoteLogManagerConfig {
*/
public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP = "remote.log.storage.manager.impl.prefix";
public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC = "Prefix used for properties to be passed to RemoteStorageManager " +
"implementation. For example this value can be `rsm.s3.`.";
"implementation. For example this value can be `rsm.config.`.";
public static final String DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX = "rsm.config.";

/**
* Prefix used for properties to be passed to {@link RemoteLogMetadataManager} implementation. Remote log subsystem collects all the properties having
* this prefix and passed to {@code RemoteLogMetadataManager} using {@link RemoteLogMetadataManager#configure(Map)}.
*/
public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP = "remote.log.metadata.manager.impl.prefix";
public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC = "Prefix used for properties to be passed to RemoteLogMetadataManager " +
"implementation. For example this value can be `rlmm.s3.`.";
"implementation. For example this value can be `rlmm.config.`.";
public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX = "rlmm.config.";


public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = "remote.log.storage.system.enable";
public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether to enable tier storage functionality in a broker or not. Valid values " +
Expand Down Expand Up @@ -152,13 +155,13 @@ public final class RemoteLogManagerConfig {
REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC)
.defineInternal(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
STRING,
null,
DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC)
.defineInternal(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
STRING,
null,
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC)
Expand Down

0 comments on commit 34df434

Please sign in to comment.