Skip to content

Commit

Permalink
Fix acceptance of window with zero valid entity as valid window in Me…
Browse files Browse the repository at this point in the history
…tricSampleAggregator (linkedin#307)
  • Loading branch information
kun du authored and Adem Efe Gencer committed Aug 27, 2018
1 parent 17f7196 commit 670bc64
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private boolean meetValidEntityRatioAfterMerge(MetricSampleCompleteness<G, E> co
int totalNumEntities = options.interestedEntities().size();
int numValidEntitiesAfterMerge =
numValidElementsAfterMerge(completeness.validEntities(), validEntitiesForWindow);
return (float) numValidEntitiesAfterMerge / totalNumEntities >= options.minValidEntityRatio();
return numValidEntitiesAfterMerge > 0 && (float) numValidEntitiesAfterMerge / totalNumEntities >= options.minValidEntityRatio();
}

private boolean meetValidEntityGroupRatioAfterMerge(MetricSampleCompleteness<G, E> completeness,
Expand All @@ -76,7 +76,7 @@ private boolean meetValidEntityGroupRatioAfterMerge(MetricSampleCompleteness<G,
int totalNumEntityGroups = options.interestedEntityGroups().size();
int numValidEntityGroupsAfterMerge =
numValidElementsAfterMerge(completeness.validEntityGroups(), validEntityGroupForWindow);
return (float) numValidEntityGroupsAfterMerge / totalNumEntityGroups >= options.minValidEntityGroupRatio();
return numValidEntityGroupsAfterMerge > 0 && (float) numValidEntityGroupsAfterMerge / totalNumEntityGroups >= options.minValidEntityGroupRatio();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,11 @@ public void testAggregationOption4() {
AggregationOptions.Granularity.ENTITY, true);

MetricSampleCompleteness<String, IntegerEntity> completeness = aggregator.completeness(-1, Long.MAX_VALUE, options);
assertEquals(20, completeness.validWindowIndexes().size());
assertEquals(1, completeness.validEntities().size());
assertEquals(17, completeness.validWindowIndexes().size());
assertEquals(2, completeness.validEntities().size());
assertTrue(completeness.validEntities().contains(ENTITY1));
assertTrue(completeness.validEntityGroups().isEmpty());
assertTrue(completeness.validEntities().contains(ENTITY3));
assertTrue(completeness.validEntityGroups().contains(ENTITY3.group()));
assertCompletenessByWindowIndex(completeness);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,46 @@ public void testClusterModelWithPartlyInvalidPartitions() throws NotEnoughValidW
assertEquals(13, clusterModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.NW_OUT), 0.0);
}

@Test
public void testClusterModelWithInvalidSnapshotWindows() throws NotEnoughValidWindowsException {
TestContext context = prepareContext(4);
LoadMonitor loadMonitor = context.loadmonitor();
KafkaPartitionMetricSampleAggregator aggregator = context.aggregator();

CruiseControlUnitTestUtils.populateSampleAggregator(1, 4, aggregator, PE_T0P0, 0, WINDOW_MS, METRIC_DEF);
CruiseControlUnitTestUtils.populateSampleAggregator(1, 4, aggregator, PE_T0P1, 0, WINDOW_MS, METRIC_DEF);
CruiseControlUnitTestUtils.populateSampleAggregator(1, 4, aggregator, PE_T1P0, 0, WINDOW_MS, METRIC_DEF);
CruiseControlUnitTestUtils.populateSampleAggregator(1, 4, aggregator, PE_T1P1, 0, WINDOW_MS, METRIC_DEF);

CruiseControlUnitTestUtils.populateSampleAggregator(1, 4, aggregator, PE_T0P0, 3, WINDOW_MS, METRIC_DEF);
CruiseControlUnitTestUtils.populateSampleAggregator(1, 4, aggregator, PE_T0P1, 3, WINDOW_MS, METRIC_DEF);
CruiseControlUnitTestUtils.populateSampleAggregator(1, 4, aggregator, PE_T1P0, 3, WINDOW_MS, METRIC_DEF);
CruiseControlUnitTestUtils.populateSampleAggregator(1, 4, aggregator, PE_T1P1, 3, WINDOW_MS, METRIC_DEF);

CruiseControlUnitTestUtils.populateSampleAggregator(1, 4, aggregator, PE_T0P0, 4, WINDOW_MS, METRIC_DEF);
CruiseControlUnitTestUtils.populateSampleAggregator(1, 4, aggregator, PE_T0P1, 4, WINDOW_MS, METRIC_DEF);
CruiseControlUnitTestUtils.populateSampleAggregator(1, 4, aggregator, PE_T1P0, 4, WINDOW_MS, METRIC_DEF);
CruiseControlUnitTestUtils.populateSampleAggregator(1, 4, aggregator, PE_T1P1, 4, WINDOW_MS, METRIC_DEF);

ClusterModel clusterModel = loadMonitor.clusterModel(-1, Long.MAX_VALUE,
new ModelCompletenessRequirements(2, 0, false),
new OperationProgress());

assertEquals(2, clusterModel.partition(T0P0).leader().load().numWindows());
assertEquals(16.5, clusterModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.CPU), 0.0);
assertEquals(2, clusterModel.partition(T0P1).leader().load().numWindows());
assertEquals(33, clusterModel.partition(T0P1).leader().load().expectedUtilizationFor(Resource.DISK), 0.0);
assertEquals(2, clusterModel.partition(T1P0).leader().load().numWindows());
assertEquals(33, clusterModel.partition(T1P0).leader().load().expectedUtilizationFor(Resource.NW_IN), 0.0);
assertEquals(2, clusterModel.partition(T1P1).leader().load().numWindows());
assertEquals(33, clusterModel.partition(T1P1).leader().load().expectedUtilizationFor(Resource.NW_OUT), 0.0);
}

private TestContext prepareContext() {
return prepareContext(NUM_WINDOWS);
}

private TestContext prepareContext(int numWindowToPreserve) {
// Create mock metadata client.
Metadata metadata = getMetadata(Arrays.asList(T0P0, T0P1, T1P0, T1P1));
MetadataClient mockMetadataClient = EasyMock.mock(MetadataClient.class);
Expand All @@ -426,7 +465,7 @@ private TestContext prepareContext() {

// create load monitor.
Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
props.put(KafkaCruiseControlConfig.NUM_PARTITION_METRICS_WINDOWS_CONFIG, Integer.toString(NUM_WINDOWS));
props.put(KafkaCruiseControlConfig.NUM_PARTITION_METRICS_WINDOWS_CONFIG, Integer.toString(numWindowToPreserve));
props.put(KafkaCruiseControlConfig.MIN_SAMPLES_PER_PARTITION_METRICS_WINDOW_CONFIG,
Integer.toString(MIN_SAMPLES_PER_WINDOW));
props.put(KafkaCruiseControlConfig.PARTITION_METRICS_WINDOW_MS_CONFIG, Long.toString(WINDOW_MS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,17 @@ public void testAggregateWithUpdatedCluster() throws NotEnoughValidWindowsExcept

populateSampleAggregator(NUM_WINDOWS + 1, MIN_SAMPLES_PER_WINDOW, metricSampleAggregator);

TopicPartition tp1 = new TopicPartition(TOPIC, 1);
TopicPartition tp1 = new TopicPartition(TOPIC + "1", 0);
Cluster cluster = getCluster(Arrays.asList(TP, tp1));
metadata.update(cluster, Collections.emptySet(), 1);

Map<PartitionEntity, ValuesAndExtrapolations> aggregateResult =
metricSampleAggregator.aggregate(clusterAndGeneration(cluster), Long.MAX_VALUE, new OperationProgress())
.valuesAndExtrapolations();
assertTrue("tp1 should not be included because recent metric window does not include all topics",
aggregateResult.isEmpty());
// Partition "topic-0" should be valid in all NUM_WINDOW windows and Partition "topic1-0" should not since
// there is no sample for it.
assertEquals(1, aggregateResult.size());
assertEquals(NUM_WINDOWS, aggregateResult.get(PE).windows().size());

ModelCompletenessRequirements requirements =
new ModelCompletenessRequirements(1, 0.0, true);
Expand Down Expand Up @@ -182,7 +184,8 @@ public void testFallbackToAvgAvailable() throws NotEnoughValidWindowsException {
metricSampleAggregator.aggregate(clusterAndGeneration(metadata.fetch()),
NUM_WINDOWS * WINDOW_MS,
new OperationProgress());
assertTrue(result.valuesAndExtrapolations().isEmpty());
// Partition "topic-0" is expected to be a valid partition in result with valid sample values for window [3, NUM_WINDOWS].
assertEquals(NUM_WINDOWS - 2, result.valuesAndExtrapolations().get(PE).windows().size());

populateSampleAggregator(2, MIN_SAMPLES_PER_WINDOW - 2, metricSampleAggregator);

Expand Down Expand Up @@ -257,7 +260,8 @@ public void testTooManyFlaws() throws NotEnoughValidWindowsException {
metricSampleAggregator.aggregate(clusterAndGeneration(metadata.fetch()),
NUM_WINDOWS * WINDOW_MS,
new OperationProgress());
assertTrue(result.valuesAndExtrapolations().isEmpty());
// Partition "topic-0" is expected to be a valid partition in result, with valid sample values collected for window [1, NUM_WINDOW - 3].
assertEquals(NUM_WINDOWS - 3, result.valuesAndExtrapolations().get(PE).windows().size());
}

@Test
Expand Down

0 comments on commit 670bc64

Please sign in to comment.