[HUDI-6313] Add metrics counters for compaction requested/completed events (apache#8759)

- Add metrics counters for compaction requested/completed events so that we can keep track of how many compactions were requested, how many finished, and how many failed (inferred as number requested minus number completed).
amrishlal committed Jun 20, 2023
1 parent 6ee6a52 commit 35897ba
Showing 4 changed files with 84 additions and 9 deletions.
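The failed-compaction count is never emitted directly; whatever consumes the metrics has to derive it. A minimal sketch of that arithmetic against the Dropwizard MetricRegistry backing these counters (the "hoodie_table" prefix is a placeholder for config.getMetricReporterMetricsNamePrefix(), not a value from this commit; the "counter" segment and the two suffixes follow getMetricsName() in the first file below):

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

public class CompactionFailureCheck {
  // Names follow getMetricsName("counter", ...): <prefix>.counter.compaction.<event>.
  static long inferredFailures(MetricRegistry registry) {
    Counter requested = registry.counter("hoodie_table.counter.compaction.requested");
    Counter completed = registry.counter("hoodie_table.counter.compaction.completed");
    // Compactions that were requested but never finished (failed or still running).
    return requested.getCount() - completed.getCount();
  }
}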
HoodieMetrics.java
@@ -21,6 +21,7 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

@@ -50,6 +51,8 @@ public class HoodieMetrics {
private String conflictResolutionTimerName = null;
private String conflictResolutionSuccessCounterName = null;
private String conflictResolutionFailureCounterName = null;
private String compactionRequestedCounterName = null;
private String compactionCompletedCounterName = null;
private HoodieWriteConfig config;
private String tableName;
private Timer rollbackTimer = null;
@@ -64,6 +67,8 @@ public class HoodieMetrics {
private Timer conflictResolutionTimer = null;
private Counter conflictResolutionSuccessCounter = null;
private Counter conflictResolutionFailureCounter = null;
private Counter compactionRequestedCounter = null;
private Counter compactionCompletedCounter = null;

public HoodieMetrics(HoodieWriteConfig config) {
this.config = config;
@@ -82,6 +87,8 @@ public HoodieMetrics(HoodieWriteConfig config) {
this.conflictResolutionTimerName = getMetricsName("timer", "conflict_resolution");
this.conflictResolutionSuccessCounterName = getMetricsName("counter", "conflict_resolution.success");
this.conflictResolutionFailureCounterName = getMetricsName("counter", "conflict_resolution.failure");
this.compactionRequestedCounterName = getMetricsName("counter", "compaction.requested");
this.compactionCompletedCounterName = getMetricsName("counter", "compaction.completed");
}
}

@@ -270,7 +277,8 @@ public void updateIndexMetrics(final String action, final long durationInMs) {
}
}

String getMetricsName(String action, String metric) {
@VisibleForTesting
public String getMetricsName(String action, String metric) {
return config == null ? null : String.format("%s.%s.%s", config.getMetricReporterMetricsNamePrefix(), action, metric);
}

@@ -308,6 +316,20 @@ public void emitConflictResolutionFailed() {
}
}

public void emitCompactionRequested() {
if (config.isMetricsOn()) {
compactionRequestedCounter = getCounter(compactionRequestedCounter, compactionRequestedCounterName);
compactionRequestedCounter.inc();
}
}

public void emitCompactionCompleted() {
if (config.isMetricsOn()) {
compactionCompletedCounter = getCounter(compactionCompletedCounter, compactionCompletedCounterName);
compactionCompletedCounter.inc();
}
}

private Counter getCounter(Counter counter, String name) {
if (counter == null) {
return metrics.getRegistry().counter(name);
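getCounter() above registers a counter in the registry on first use, and each emit method caches the result back into its field so later emits reuse the same Counter instance. The same lazy-registration pattern in isolation (class and metric names here are illustrative, not from this commit):

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

class LazyCounter {
  private final MetricRegistry registry = new MetricRegistry();
  private Counter counter = null; // created on first emit, reused afterwards

  void emit() {
    if (counter == null) {
      // MetricRegistry.counter(name) registers the counter on first call and
      // returns the already-registered instance for the same name afterwards.
      counter = registry.counter("example.counter");
    }
    counter.inc();
  }
}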
RunCompactionActionExecutor.java
@@ -35,11 +35,15 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@@ -48,10 +52,14 @@
public class RunCompactionActionExecutor<T> extends
BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {

private static final Logger LOG = LoggerFactory.getLogger(RunCompactionActionExecutor.class);

private final HoodieCompactor compactor;
private final HoodieCompactionHandler compactionHandler;
private WriteOperationType operationType;

private final HoodieMetrics metrics;

public RunCompactionActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable table,
@@ -65,10 +73,14 @@ public RunCompactionActionExecutor(HoodieEngineContext context,
this.operationType = operationType;
checkArgument(operationType == WriteOperationType.COMPACT || operationType == WriteOperationType.LOG_COMPACT,
"Only COMPACT and LOG_COMPACT is supported");
metrics = new HoodieMetrics(config);
}

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
LOG.info("Compaction requested. Instant time: {}.", instantTime);
metrics.emitCompactionRequested();

HoodieTimeline pendingMajorOrMinorCompactionTimeline = WriteOperationType.COMPACT.equals(operationType)
? table.getActiveTimeline().filterPendingCompactionTimeline()
: table.getActiveTimeline().filterPendingLogCompactionTimeline();
@@ -117,6 +129,8 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
}

LOG.info("Compaction completed. Instant time: {}.", instantTime);
metrics.emitCompactionCompleted();
return compactionMetadata;
}
}
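Note how the two emits bracket execute(): emitCompactionRequested() fires before any work starts, while emitCompactionCompleted() is reached only when no HoodieCompactionException escapes. That asymmetry is what makes the requested-minus-completed inference from the commit message work, as this hypothetical wrapper illustrates (runOnce is not part of this commit):

// If compaction.run() throws, the completed counter is never incremented,
// so requested - completed grows by one for each failed run.
void runOnce(HoodieMetrics metrics, Runnable compaction) {
  metrics.emitCompactionRequested();
  compaction.run();                  // may throw
  metrics.emitCompactionCompleted(); // skipped on failure
}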
TestHoodieCompactor.java
@@ -40,14 +40,17 @@
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bloom.HoodieBloomIndex;
import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;

import com.codahale.metrics.Counter;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
@@ -56,6 +59,7 @@

import java.io.IOException;
import java.util.List;
import java.util.SortedMap;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -89,9 +93,22 @@ public void tearDown() throws Exception {
private HoodieWriteConfig getConfig() {
return getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withMetricsConfig(getMetricsConfig())
.build();
}

private static HoodieMetricsConfig getMetricsConfig() {
return HoodieMetricsConfig.newBuilder().on(true).withReporterType("INMEMORY").build();
}

private long getCompactionMetricCount(String metric) {
HoodieMetrics metrics = writeClient.getMetrics();
String metricName = metrics.getMetricsName("counter", metric);
SortedMap<String, Counter> counters = metrics.getMetrics().getRegistry().getCounters();

return counters.containsKey(metricName) ? counters.get(metricName).getCount() : 0;
}

private HoodieWriteConfig.Builder getConfigBuilder() {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
@@ -106,12 +123,18 @@ private HoodieWriteConfig.Builder getConfigBuilder() {
@Test
public void testCompactionOnCopyOnWriteFail() throws Exception {
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
assertThrows(HoodieNotSupportedException.class, () -> {
table.scheduleCompaction(context, compactionInstantTime, Option.empty());
table.compact(context, compactionInstantTime);
});
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(getConfig());) {
HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
assertThrows(HoodieNotSupportedException.class, () -> {
table.scheduleCompaction(context, compactionInstantTime, Option.empty());
table.compact(context, compactionInstantTime);
});

// Verify compaction.requested, compaction.completed metrics counts.
assertEquals(0, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
assertEquals(0, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
}
}

@Test
Expand All @@ -129,6 +152,10 @@ public void testCompactionEmpty() {
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
Option<HoodieCompactionPlan> plan = table.scheduleCompaction(context, compactionInstantTime, Option.empty());
assertFalse(plan.isPresent(), "If there is nothing to compact, result will be empty");

// Verify compaction.requested, compaction.completed metrics counts.
assertEquals(0, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
assertEquals(0, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
}
}

@@ -148,7 +175,7 @@ public void testScheduleCompactionWithInflightInstant() {
newCommitTime = "102";
writeClient.startCommitWithTime(newCommitTime);
metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());

// Create one compaction instant before the existing inflight instant.
String compactionTime = "101";
@@ -161,6 +188,7 @@ public void testWriteStatusContentsAfterCompaction() throws Exception {
// insert 100 records
HoodieWriteConfig config = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withMetricsConfig(getMetricsConfig())
.build();
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
String newCommitTime = "100";
@@ -180,6 +208,10 @@ public void testWriteStatusContentsAfterCompaction() throws Exception {
HoodieData<WriteStatus> result = compact(writeClient, compactionInstantTime);

verifyCompaction(result);

// Verify compaction.requested, compaction.completed metrics counts.
assertEquals(1, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
assertEquals(1, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
}
}

@@ -190,7 +222,9 @@ public void testSpillingWhenCompaction() throws Exception {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withMemoryConfig(HoodieMemoryConfig.newBuilder()
.withMaxMemoryMaxSize(1L, 1L).build()) // force spill
.withMetricsConfig(getMetricsConfig())
.build();

try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);
@@ -210,6 +244,10 @@ public void testSpillingWhenCompaction() throws Exception {
HoodieData<WriteStatus> result = compact(writeClient, "10" + (i + 1));

verifyCompaction(result);

// Verify compaction.requested, compaction.completed metrics counts.
assertEquals(i / 2 + 1, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
assertEquals(i / 2 + 1, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
}
}
}
HoodieTimeline.java
@@ -56,11 +56,11 @@ public interface HoodieTimeline extends Serializable {
String COMPACTION_ACTION = "compaction";
String LOG_COMPACTION_ACTION = "logcompaction";
String REQUESTED_EXTENSION = ".requested";
String COMPLETED_EXTENSION = ".completed";
String RESTORE_ACTION = "restore";
String INDEXING_ACTION = "indexing";
// only for schema save
String SCHEMA_COMMIT_ACTION = "schemacommit";

String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION,
CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION,
COMPACTION_ACTION, REPLACE_COMMIT_ACTION, INDEXING_ACTION};
@@ -81,6 +81,7 @@ public interface HoodieTimeline extends Serializable {
String REQUESTED_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + REQUESTED_EXTENSION;
String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION;
String REQUESTED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION);
String COMPLETED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, COMPLETED_EXTENSION);
String REQUESTED_COMPACTION_EXTENSION = StringUtils.join(".", REQUESTED_COMPACTION_SUFFIX);
String INFLIGHT_COMPACTION_EXTENSION = StringUtils.join(".", COMPACTION_ACTION, INFLIGHT_EXTENSION);
String REQUESTED_RESTORE_EXTENSION = "." + RESTORE_ACTION + REQUESTED_EXTENSION;
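Since StringUtils.join here concatenates its arguments, the two suffix constants resolve to exactly the metric names that HoodieMetrics registers, which is why the tests above can pass them straight to getCompactionMetricCount(). A quick sanity check of the concatenation:

// "compaction" + ".requested" / "compaction" + ".completed"
assert HoodieTimeline.REQUESTED_COMPACTION_SUFFIX.equals("compaction.requested");
assert HoodieTimeline.COMPLETED_COMPACTION_SUFFIX.equals("compaction.completed");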
