Skip to content

Commit

Permalink
Add latency metric components for WAGED rebalancer (apache#490)
Browse files Browse the repository at this point in the history
Add WAGED rebalancer metric framework and latency metric implementation

Changelist:
1. Add WAGED rebalancer metric interface
2. Implement latency-related metrics
3. Integrate latency metrics into WAGED rebalancer
4. Add tests
  • Loading branch information
narendly authored and jiajunwang committed Jan 6, 2020
1 parent 4b4b8c8 commit 0f72534
Show file tree
Hide file tree
Showing 13 changed files with 744 additions and 98 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ConstraintBasedAlgorithmFactory {
// enlarge the overall weight of the evenness constraints compared with the movement constraint.
// TODO: Tune or make the following factor configurable.
private static final int EVENNESS_PREFERENCE_NORMALIZE_FACTOR = 50;
private static final Map<String, Float> MODEL = new HashMap<>() {
private static final Map<String, Float> MODEL = new HashMap<String, Float>() {
{
// The default setting
put(PartitionMovementConstraint.class.getSimpleName(), 1f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import java.util.stream.Collectors;

import javax.management.JMException;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
Expand All @@ -42,6 +43,8 @@
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.monitoring.metrics.MetricCollector;
import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
Expand All @@ -64,6 +67,8 @@
public class BestPossibleStateCalcStage extends AbstractBaseStage {
private static final Logger logger =
LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
// Create a ThreadLocal of MetricCollector. Metrics could only be updated by the controller thread only.
private static final ThreadLocal<MetricCollector> METRIC_COLLECTOR_THREAD_LOCAL = new ThreadLocal<>();

@Override
public void process(ClusterEvent event) throws Exception {
Expand Down Expand Up @@ -253,20 +258,41 @@ private Map<String, Resource> computeResourceBestPossibleStateWithWagedRebalance
Map<String, IdealState> newIdealStates = new HashMap<>();

// Init rebalancer with the rebalance preferences.
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = cache.getClusterConfig()
.getGlobalRebalancePreference();
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences =
cache.getClusterConfig().getGlobalRebalancePreference();

if (METRIC_COLLECTOR_THREAD_LOCAL.get() == null) {
try {
// If HelixManager is null, we just pass in null for MetricCollector so that a
// non-functioning WagedRebalancerMetricCollector would be created in WagedRebalancer's
// constructor. This is to handle two cases: 1. HelixManager is null for non-testing cases -
// in this case, WagedRebalancer will not read/write to metadata store and just use
// CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for
// verifying whether the cluster has converged.
METRIC_COLLECTOR_THREAD_LOCAL.set(helixManager == null ? null
: new WagedRebalancerMetricCollector(helixManager.getClusterName()));
} catch (JMException e) {
LogUtil.logWarn(logger, _eventId, String.format(
"MetricCollector instantiation failed! WagedRebalancer will not emit metrics due to JMException %s",
e));
}
}

// TODO avoid creating the rebalancer on every rebalance call for performance enhancement
WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences);
WagedRebalancer wagedRebalancer =
new WagedRebalancer(helixManager, preferences, METRIC_COLLECTOR_THREAD_LOCAL.get());
try {
newIdealStates.putAll(wagedRebalancer
.computeNewIdealStates(cache, wagedRebalancedResourceMap, currentStateOutput));
newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, wagedRebalancedResourceMap,
currentStateOutput));
} catch (HelixRebalanceException ex) {
// Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result.
// Since it calculates for all the eligible resources globally, a partial result is invalid.
// TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring.
LogUtil.logError(logger, _eventId, String
.format("Failed to calculate the new Ideal States using the rebalancer %s due to %s",
wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex);
LogUtil.logError(logger, _eventId,
String.format(
"Failed to calculate the new Ideal States using the rebalancer %s due to %s",
wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()),
ex);
} finally {
wagedRebalancer.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ public enum MonitorDomainNames {
HelixThreadPoolExecutor,
HelixCallback,
RoutingTableProvider,
CLMParticipantReport
CLMParticipantReport,
Rebalancer
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.apache.helix.monitoring.metrics;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import javax.management.JMException;
import javax.management.ObjectName;
import org.apache.helix.HelixException;
import org.apache.helix.monitoring.metrics.model.Metric;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;

/**
* Collects and manages all metrics that implement the {@link Metric} interface.
*/
public abstract class MetricCollector extends DynamicMBeanProvider {
private static final String CLUSTER_NAME_KEY = "ClusterName";
private static final String ENTITY_NAME_KEY = "EntityName";
private final String _monitorDomainName;
private final String _clusterName;
private final String _entityName;
private Map<String, Metric> _metricMap;

public MetricCollector(String monitorDomainName, String clusterName, String entityName) {
_monitorDomainName = monitorDomainName;
_clusterName = clusterName;
_entityName = entityName;
_metricMap = new HashMap<>();
}

@Override
public DynamicMBeanProvider register() throws JMException {
// First cast all Metric objects to DynamicMetrics
Collection<DynamicMetric<?, ?>> dynamicMetrics = new HashSet<>();
_metricMap.values().forEach(metric -> dynamicMetrics.add(metric.getDynamicMetric()));

// Define MBeanName and ObjectName
// MBean name has two key-value pairs:
// ------ 1) ClusterName KV pair (first %s=%s)
// ------ 2) EntityName KV pair (second %s=%s)
String mbeanName =
String.format("%s=%s, %s=%s", CLUSTER_NAME_KEY, _clusterName, ENTITY_NAME_KEY, _entityName);

// ObjectName has one key-value pair:
// ------ 1) Monitor domain name KV pair where value is the MBean name
doRegister(dynamicMetrics,
new ObjectName(String.format("%s:%s", _monitorDomainName, mbeanName)));
return this;
}

@Override
public String getSensorName() {
return String.format("%s.%s.%s", MonitorDomainNames.Rebalancer.name(), _clusterName,
_entityName);
}

void addMetric(Metric metric) {
if (metric instanceof DynamicMetric) {
_metricMap.putIfAbsent(metric.getMetricName(), metric);
} else {
throw new HelixException("MetricCollector only supports Metrics that are DynamicMetric!");
}
}

/**
* Returns a desired type of the metric.
* @param metricName
* @param metricClass Desired type
* @param <T> Casted result of the metric
* @return
*/
public <T extends DynamicMetric> T getMetric(String metricName, Class<T> metricClass) {
return metricClass.cast(_metricMap.get(metricName));
}

public Map<String, Metric> getMetricMap() {
return _metricMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.apache.helix.monitoring.metrics;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import javax.management.JMException;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;

public class WagedRebalancerMetricCollector extends MetricCollector {
private static final String WAGED_REBALANCER_ENTITY_NAME = "WagedRebalancer";

/**
* This enum class contains all metric names defined for WagedRebalancer. Note that all enums are
* in camel case for readability.
*/
public enum WagedRebalancerMetricNames {
// Per-stage latency metrics
GlobalBaselineCalcLatencyGauge,
PartialRebalanceLatencyGauge,

// The following latency metrics are related to AssignmentMetadataStore
StateReadLatencyGauge,
StateWriteLatencyGauge
}

public WagedRebalancerMetricCollector(String clusterName) throws JMException {
super(MonitorDomainNames.Rebalancer.name(), clusterName, WAGED_REBALANCER_ENTITY_NAME);
createMetrics();
register();
}

/**
* This constructor will create but will not register metrics. This constructor will be used in
* case of JMException so that the rebalancer could proceed without registering and emitting
* metrics.
*/
public WagedRebalancerMetricCollector() {
super(MonitorDomainNames.Rebalancer.name(), null, null);
createMetrics();
}

/**
* Creates and registers all metrics in MetricCollector for WagedRebalancer.
*/
private void createMetrics() {
// Define all metrics
LatencyMetric globalBaselineCalcLatencyGauge = new RebalanceLatencyGauge(
WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), getResetIntervalInMs());
LatencyMetric partialRebalanceLatencyGauge = new RebalanceLatencyGauge(
WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), getResetIntervalInMs());
LatencyMetric stateReadLatencyGauge = new RebalanceLatencyGauge(
WagedRebalancerMetricNames.StateReadLatencyGauge.name(), getResetIntervalInMs());
LatencyMetric stateWriteLatencyGauge = new RebalanceLatencyGauge(
WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), getResetIntervalInMs());

// Add metrics to WagedRebalancerMetricCollector
addMetric(globalBaselineCalcLatencyGauge);
addMetric(partialRebalanceLatencyGauge);
addMetric(stateReadLatencyGauge);
addMetric(stateWriteLatencyGauge);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package org.apache.helix.monitoring.metrics.implementation;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import com.codahale.metrics.Histogram;
import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
import java.util.concurrent.TimeUnit;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RebalanceLatencyGauge extends LatencyMetric {
private static final Logger LOG = LoggerFactory.getLogger(RebalanceLatencyGauge.class);
private static final long VALUE_NOT_SET = -1;
private long _lastEmittedMetricValue = VALUE_NOT_SET;

/**
* Instantiates a new Histogram dynamic metric.
* @param metricName the metric name
*/
public RebalanceLatencyGauge(String metricName, long slidingTimeWindow) {
super(metricName, new Histogram(
new SlidingTimeWindowArrayReservoir(slidingTimeWindow, TimeUnit.MILLISECONDS)));
_metricName = metricName;
}

/**
* WARNING: this method is not thread-safe.
* Calling this method multiple times would simply overwrite the previous state. This is because
* the rebalancer could fail at any point, and we want it to recover gracefully by resetting the
* internal state of this metric.
*/
@Override
public void startMeasuringLatency() {
reset();
_startTime = System.currentTimeMillis();
}

/**
* WARNING: this method is not thread-safe.
*/
@Override
public void endMeasuringLatency() {
if (_startTime == VALUE_NOT_SET || _endTime != VALUE_NOT_SET) {
LOG.error(
"Needs to call startMeasuringLatency first! Ignoring and resetting the metric. Metric name: {}",
_metricName);
reset();
return;
}
_endTime = System.currentTimeMillis();
_lastEmittedMetricValue = _endTime - _startTime;
updateValue(_lastEmittedMetricValue);
reset();
}

@Override
public String getMetricName() {
return _metricName;
}

@Override
public void reset() {
_startTime = VALUE_NOT_SET;
_endTime = VALUE_NOT_SET;
}

@Override
public String toString() {
return String.format("Metric %s's latency is %d", _metricName, getLastEmittedMetricValue());
}

/**
* Returns the most recently emitted metric value at the time of the call.
* @return
*/
@Override
public long getLastEmittedMetricValue() {
return _lastEmittedMetricValue;
}

@Override
public DynamicMetric getDynamicMetric() {
return this;
}
}
Loading

0 comments on commit 0f72534

Please sign in to comment.