Skip to content

Commit

Permalink
Have the stage pass in the runtime Id for unique prefixing
Browse files Browse the repository at this point in the history
  • Loading branch information
FieldFlux authored and bwjefft committed Jun 5, 2015
1 parent 421df72 commit fa2beab
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 15 deletions.
Expand Up @@ -31,14 +31,11 @@
import com.ea.orbit.actors.metrics.annotations.ExportMetric;
import com.ea.orbit.actors.metrics.config.reporters.ReporterConfig;
import com.ea.orbit.annotation.Config;
import com.ea.orbit.container.Container;
import com.ea.orbit.exception.UncheckedException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.inject.Singleton;

import com.codahale.metrics.Gauge;
Expand All @@ -56,21 +53,22 @@ public class MetricsManager
{
static final MetricRegistry registry = new MetricRegistry();
private static final Logger logger = LoggerFactory.getLogger(MetricsManager.class);
private boolean isInitialized = false;

@Config("orbit.metrics.reporters")
private List<ReporterConfig> reporterConfigs;

@Inject
Container container;

@PostConstruct
protected void initializeMetrics()
public synchronized void initializeMetrics(String uniqueId)
{
for (ReporterConfig reporterConfig : reporterConfigs)
if (!isInitialized)
{
reporterConfig.enableReporter(registry);
for (ReporterConfig reporterConfig : reporterConfigs)
{
reporterConfig.enableReporter(registry, uniqueId);
}
isInitialized = true;
}
logger.warn(Integer.toString(reporterConfigs.size()));
}

public <T> void registerMetric(Class clazz, String name, Supplier<T> value)
Expand Down
Expand Up @@ -68,14 +68,20 @@ public void setPort(final int port)
}

@Override
public void enableReporter(MetricRegistry registry)
public void enableReporter(MetricRegistry registry, String uniqueId)
{
if (getPrefix() == null || getPrefix().isEmpty())
{
setPrefix(uniqueId);
}

try
{
final GMetric ganglia = new GMetric(host, port, GMetric.UDPAddressingMode.MULTICAST,1);
final GangliaReporter reporter = GangliaReporter.forRegistry(registry)
.convertRatesTo(getRateTimeUnit())
.convertDurationsTo(getDurationTimeUnit())
.prefixedWith(getPrefix())
.build(ganglia);

reporter.start(getPeriod(), getPeriodTimeUnit());
Expand Down
Expand Up @@ -61,12 +61,18 @@ public void setPort(final int port)
}

@Override
public void enableReporter(MetricRegistry registry)
public void enableReporter(MetricRegistry registry, String uniqueId)
{
if (getPrefix() == null || getPrefix().isEmpty())
{
setPrefix(uniqueId);
}

final Graphite graphite = new Graphite(new InetSocketAddress(host, port));
final GraphiteReporter reporter = GraphiteReporter.forRegistry(registry)
.convertRatesTo(getRateTimeUnit())
.convertDurationsTo(getDurationTimeUnit())
.prefixedWith(getPrefix())
.build(graphite);

reporter.start(getPeriod(), getPeriodTimeUnit());
Expand Down
Expand Up @@ -29,7 +29,6 @@
package com.ea.orbit.actors.metrics.config.reporters;

import com.codahale.metrics.MetricRegistry;

import java.util.concurrent.TimeUnit;

/**
Expand All @@ -42,6 +41,18 @@ public abstract class ReporterConfig
private String rateUnit = "SECONDS";
private String durationUnit = "MILLISECONDS";

private String prefix = "";

public String getPrefix()
{
return prefix;
}

public void setPrefix(final String prefix)
{
this.prefix = prefix;
}

public int getPeriod()
{
return period;
Expand Down Expand Up @@ -97,5 +108,5 @@ protected TimeUnit getPeriodTimeUnit()
return TimeUnit.valueOf(getRateUnit());
}

public void enableReporter(MetricRegistry registry) {};
public void enableReporter(MetricRegistry registry, String uniqueId) {};
}
Expand Up @@ -46,8 +46,12 @@ public void setLoggerName(final String loggerName)
}

@Override
public void enableReporter(MetricRegistry registry)
public void enableReporter(MetricRegistry registry, String uniqueId)
{
if (getPrefix() == null || getPrefix().isEmpty())
{
setPrefix(uniqueId);
}

final Slf4jReporter reporter = Slf4jReporter.forRegistry(registry)
.convertRatesTo(getRateTimeUnit())
Expand Down
3 changes: 3 additions & 0 deletions actors/stage/src/main/java/com/ea/orbit/actors/Stage.java
Expand Up @@ -247,7 +247,10 @@ public Task<?> start()
execution.start();

if (metricsManager != null)
{
metricsManager.initializeMetrics(runtimeIdentity());
metricsManager.registerExportedMetrics(execution);
}

Task<?> future = clusterPeer.join(clusterName, nodeName);
if (mode == StageMode.HOST)
Expand Down

0 comments on commit fa2beab

Please sign in to comment.