Skip to content

Commit

Permalink
KAFKA-15189: only init remote topic metrics when enabled (apache#14133)
Browse files Browse the repository at this point in the history
Only initialize remote topic metrics when system-wise remote storage is enabled to avoid impacting performance for existing brokers. Also add tests.

Reviewers: Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
  • Loading branch information
showuon committed Aug 5, 2023
1 parent faf3635 commit 748175c
Show file tree
Hide file tree
Showing 24 changed files with 236 additions and 81 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Expand Up @@ -1638,6 +1638,8 @@ project(':storage:api') {

dependencies {
implementation project(':clients')
implementation project(':server-common')
implementation libs.metrics
implementation libs.slf4jApi

testImplementation project(':clients')
Expand Down
4 changes: 4 additions & 0 deletions checkstyle/import-control.xml
Expand Up @@ -261,6 +261,10 @@
<allow pkg="org.apache.kafka.storage"/>
<subpackage name="remote">
<allow pkg="scala.collection" />
<subpackage name="storage">
<allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>

</subpackage>
Expand Down
10 changes: 4 additions & 6 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Expand Up @@ -111,6 +111,7 @@
import java.util.stream.Stream;

import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;

/**
* This class is responsible for
Expand All @@ -123,8 +124,6 @@ public class RemoteLogManager implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
public static final String REMOTE_LOG_READER_METRICS_NAME_PREFIX = "RemoteLogReader";
public static final String REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT = "RemoteLogManagerTasksAvgIdlePercent";
private final RemoteLogManagerConfig rlmConfig;
private final int brokerId;
private final String logDir;
Expand Down Expand Up @@ -185,7 +184,7 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());

metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT, new Gauge<Double>() {
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName(), new Gauge<Double>() {
@Override
public Double value() {
return rlmScheduledThreadPool.getIdlePercent();
Expand All @@ -195,13 +194,12 @@ public Double value() {
remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
REMOTE_LOG_READER_THREAD_NAME_PREFIX,
rlmConfig.remoteLogReaderThreads(),
rlmConfig.remoteLogReaderMaxPendingTasks(),
REMOTE_LOG_READER_METRICS_NAME_PREFIX
rlmConfig.remoteLogReaderMaxPendingTasks()
);
}

private void removeMetrics() {
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
remoteStorageReaderThreadPool.removeMetrics();
}

Expand Down
Expand Up @@ -38,6 +38,7 @@

import java.util.Collections;
import java.util.Optional;

import scala.compat.java8.OptionConverters;


Expand Down Expand Up @@ -171,7 +172,7 @@ public KafkaApis build() {
if (metrics == null) throw new RuntimeException("You must set metrics");
if (quotas == null) throw new RuntimeException("You must set quotas");
if (fetchManager == null) throw new RuntimeException("You must set fetchManager");
if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats();
if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(Optional.of(config));
if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager");

return new KafkaApis(requestChannel,
Expand Down
Expand Up @@ -54,7 +54,7 @@ public class ReplicaManagerBuilder {
private MetadataCache metadataCache = null;
private LogDirFailureChannel logDirFailureChannel = null;
private AlterPartitionManager alterPartitionManager = null;
private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
private BrokerTopicStats brokerTopicStats = null;
private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
private Optional<RemoteLogManager> remoteLogManager = Optional.empty();
private Optional<KafkaZkClient> zkClient = Optional.empty();
Expand Down Expand Up @@ -179,6 +179,7 @@ public ReplicaManager build() {
if (metadataCache == null) throw new RuntimeException("You must set metadataCache");
if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel");
if (alterPartitionManager == null) throw new RuntimeException("You must set alterIsrManager");
if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(Optional.of(config));
return new ReplicaManager(config,
metrics,
time,
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Expand Up @@ -188,8 +188,7 @@ class BrokerServer(
kafkaScheduler.startup()

/* register broker metrics */
brokerTopicStats = new BrokerTopicStats

brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(config))

quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-")

Expand Down
61 changes: 30 additions & 31 deletions core/src/main/scala/kafka/server/KafkaRequestHandler.scala
Expand Up @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
import com.yammer.metrics.core.Meter
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.{KafkaThread, Time}
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics
import org.apache.kafka.server.metrics.KafkaMetricsGroup

import java.util.Collections
Expand Down Expand Up @@ -227,7 +228,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
}
}

class BrokerTopicMetrics(name: Option[String]) {
class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[KafkaConfig]) {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)

val tags: java.util.Map[String, String] = name match {
Expand Down Expand Up @@ -277,24 +278,31 @@ class BrokerTopicMetrics(name: Option[String]) {
BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
BrokerTopicStats.RemoteCopyBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),
BrokerTopicStats.RemoteFetchBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchBytesPerSec, "bytes"),
BrokerTopicStats.RemoteFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchRequestsPerSec, "requests"),
BrokerTopicStats.RemoteCopyRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyRequestsPerSec, "requests"),
BrokerTopicStats.FailedRemoteFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteFetchRequestsPerSec, "requests"),
BrokerTopicStats.FailedRemoteCopyRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteCopyRequestsPerSec, "requests"),
BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec -> MeterWrapper(BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMagicNumberRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMagicNumberRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMessageCrcRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMessageCrcRecordsPerSec, "requests"),
BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec, "requests")
).asJava)

if (name.isEmpty) {
metricTypeMap.put(BrokerTopicStats.ReplicationBytesInPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesInPerSec, "bytes"))
metricTypeMap.put(BrokerTopicStats.ReplicationBytesOutPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes"))
metricTypeMap.put(BrokerTopicStats.ReassignmentBytesInPerSec, MeterWrapper(BrokerTopicStats.ReassignmentBytesInPerSec, "bytes"))
metricTypeMap.put(BrokerTopicStats.ReassignmentBytesOutPerSec, MeterWrapper(BrokerTopicStats.ReassignmentBytesOutPerSec, "bytes"))
}

configOpt.ifPresent(config =>
if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
metricTypeMap.putAll(Map(
RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName, "bytes"),
RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName, "bytes"),
RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName, "requests")
).asJava)
})

// used for testing only
def metricMap: Map[String, MeterWrapper] = metricTypeMap.toMap

Expand Down Expand Up @@ -342,17 +350,17 @@ class BrokerTopicMetrics(name: Option[String]) {

def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()

def remoteCopyBytesRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteCopyBytesPerSec).meter()
def remoteCopyBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName).meter()

def remoteFetchBytesRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteFetchBytesPerSec).meter()
def remoteFetchBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName).meter()

def remoteFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteFetchRequestsPerSec).meter()
def remoteFetchRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName).meter()

def remoteCopyRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteCopyRequestsPerSec).meter()
def remoteCopyRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName).meter()

def failedRemoteFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteFetchRequestsPerSec).meter()
def failedRemoteFetchRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName).meter()

def failedRemoteCopyRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteCopyRequestsPerSec).meter()
def failedRemoteCopyRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName).meter()

def closeMetric(metricType: String): Unit = {
val meter = metricTypeMap.get(metricType)
Expand All @@ -378,27 +386,18 @@ object BrokerTopicStats {
val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec"
val ReassignmentBytesInPerSec = "ReassignmentBytesInPerSec"
val ReassignmentBytesOutPerSec = "ReassignmentBytesOutPerSec"
val RemoteCopyBytesPerSec = "RemoteCopyBytesPerSec"
val RemoteFetchBytesPerSec = "RemoteFetchBytesPerSec"
val RemoteFetchRequestsPerSec = "RemoteFetchRequestsPerSec"
val RemoteCopyRequestsPerSec = "RemoteCopyRequestsPerSec"
val FailedRemoteFetchRequestsPerSec = "RemoteFetchErrorsPerSec"
val FailedRemoteCopyRequestsPerSec = "RemoteCopyErrorsPerSec"

// These following topics are for LogValidator for better debugging on failed records
val NoKeyCompactedTopicRecordsPerSec = "NoKeyCompactedTopicRecordsPerSec"
val InvalidMagicNumberRecordsPerSec = "InvalidMagicNumberRecordsPerSec"
val InvalidMessageCrcRecordsPerSec = "InvalidMessageCrcRecordsPerSec"
val InvalidOffsetOrSequenceRecordsPerSec = "InvalidOffsetOrSequenceRecordsPerSec"

private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
}

class BrokerTopicStats extends Logging {
import BrokerTopicStats._
class BrokerTopicStats(configOpt: java.util.Optional[KafkaConfig] = java.util.Optional.empty()) extends Logging {

private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k), configOpt)
private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
val allTopicsStats = new BrokerTopicMetrics(None)
val allTopicsStats = new BrokerTopicMetrics(None, configOpt)

def topicStats(topic: String): BrokerTopicMetrics =
stats.getAndMaybePut(topic)
Expand Down Expand Up @@ -439,12 +438,12 @@ class BrokerTopicStats extends Logging {
topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesOutPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyBytesPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchBytesPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteFetchRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteCopyRequestsPerSec)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName)
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Expand Up @@ -261,7 +261,7 @@ class KafkaServer(
metrics = Server.initializeMetrics(config, time, clusterId)

/* register broker metrics */
_brokerTopicStats = new BrokerTopicStats
_brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(config))

quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
KafkaBroker.notifyClusterListeners(clusterId, kafkaMetricsReporters ++ metrics.reporters.asScala)
Expand Down
21 changes: 10 additions & 11 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Expand Up @@ -59,7 +59,6 @@
import org.apache.kafka.storage.internals.log.LazyIndex;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.test.TestUtils;
Expand Down Expand Up @@ -88,14 +87,14 @@
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import static kafka.log.remote.RemoteLogManager.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT;
import static kafka.log.remote.RemoteLogManager.REMOTE_LOG_READER_METRICS_NAME_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -127,7 +126,8 @@ public class RemoteLogManagerTest {
RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
RemoteLogManagerConfig remoteLogManagerConfig = null;
BrokerTopicStats brokerTopicStats = new BrokerTopicStats();

BrokerTopicStats brokerTopicStats = null;
RemoteLogManager remoteLogManager = null;

TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0));
Expand Down Expand Up @@ -157,8 +157,10 @@ public List<EpochEntry> read() {
void setUp() throws Exception {
topicIds.put(leaderTopicIdPartition.topicPartition().topic(), leaderTopicIdPartition.topicId());
topicIds.put(followerTopicIdPartition.topicPartition().topic(), followerTopicIdPartition.topicId());
Properties props = new Properties();
Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
remoteLogManagerConfig = createRLMConfig(props);
brokerTopicStats = new BrokerTopicStats(Optional.of(KafkaConfig.fromProps(props)));

kafka.utils.TestUtils.clearYammerMetrics();

Expand Down Expand Up @@ -922,11 +924,8 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
KafkaMetricsGroup mockRlmMetricsGroup = mockMetricsGroupCtor.constructed().get(0);
KafkaMetricsGroup mockThreadPoolMetricsGroup = mockMetricsGroupCtor.constructed().get(1);

List<String> remoteLogManagerMetricNames = Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
List<String> remoteStorageThreadPoolMetricNames = RemoteStorageThreadPool.METRIC_SUFFIXES
.stream()
.map(suffix -> REMOTE_LOG_READER_METRICS_NAME_PREFIX + suffix)
.collect(Collectors.toList());
List<String> remoteLogManagerMetricNames = Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
Set<String> remoteStorageThreadPoolMetricNames = REMOTE_STORAGE_THREAD_POOL_METRICS;

verify(mockRlmMetricsGroup, times(remoteLogManagerMetricNames.size())).newGauge(anyString(), any());
// Verify that the RemoteLogManager metrics are removed
Expand Down
9 changes: 8 additions & 1 deletion core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
Expand Up @@ -17,9 +17,11 @@
package kafka.log.remote;

import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
Expand All @@ -30,6 +32,8 @@
import org.mockito.ArgumentCaptor;

import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -44,13 +48,16 @@
public class RemoteLogReaderTest {
public static final String TOPIC = "test";
RemoteLogManager mockRLM = mock(RemoteLogManager.class);
BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
BrokerTopicStats brokerTopicStats = null;
LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
Records records = mock(Records.class);

@BeforeEach
public void setUp() {
TestUtils.clearYammerMetrics();
Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
brokerTopicStats = new BrokerTopicStats(Optional.of(KafkaConfig.fromProps(props)));
}

@Test
Expand Down

0 comments on commit 748175c

Please sign in to comment.