Skip to content

Commit

Permalink
Merge c320a46 into 289e442
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanw-bq committed Aug 1, 2018
2 parents 289e442 + c320a46 commit e2708b1
Show file tree
Hide file tree
Showing 25 changed files with 1,644 additions and 425 deletions.
7 changes: 7 additions & 0 deletions rubix-bookkeeper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
<version>${dep.metrics.version}</version>
</dependency>

<dependency>
<groupId>com.readytalk</groupId>
<artifactId>metrics3-statsd</artifactId>
Expand Down Expand Up @@ -128,6 +134,7 @@
<include>com.google.guava:guava</include>
<include>com.readytalk:metrics-statsd-common</include>
<include>com.readytalk:metrics3-statsd</include>
<include>io.dropwizard.metrics:metrics-jvm</include>
</includes>
</artifactSet>
<relocations>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.cache.Weigher;
import com.qubole.rubix.bookkeeper.metrics.BookKeeperMetrics;
import com.qubole.rubix.bookkeeper.utils.DiskUtils;
import com.qubole.rubix.core.ClusterManagerInitilizationException;
import com.qubole.rubix.core.ReadRequest;
Expand Down Expand Up @@ -73,19 +74,11 @@
*/
public abstract class BookKeeper implements BookKeeperService.Iface
{
public static final String METRIC_BOOKKEEPER_CACHE_EVICTION_COUNT = "rubix.bookkeeper.cache_eviction.count";
public static final String METRIC_BOOKKEEPER_CACHE_HIT_RATE_GAUGE = "rubix.bookkeeper.cache_hit_rate.gauge";
public static final String METRIC_BOOKKEEPER_CACHE_MISS_RATE_GAUGE = "rubix.bookkeeper.cache_miss_rate.gauge";
public static final String METRIC_BOOKKEEPER_CACHE_SIZE_GAUGE = "rubix.bookkeeper.cache_size_mb.gauge";
public static final String METRIC_BOOKKEEPER_TOTAL_REQUEST_COUNT = "rubix.bookkeeper.total_request.count";
public static final String METRIC_BOOKKEEPER_CACHE_REQUEST_COUNT = "rubix.bookkeeper.cache_request.count";
public static final String METRIC_BOOKKEEPER_NONLOCAL_REQUEST_COUNT = "rubix.bookkeeper.nonlocal_request.count";
public static final String METRIC_BOOKKEEPER_REMOTE_REQUEST_COUNT = "rubix.bookkeeper.remote_request.count";
private static Log log = LogFactory.getLog(BookKeeper.class);

protected static Cache<String, FileMetadata> fileMetadataCache;
private static LoadingCache<String, FileInfo> fileInfoCache;
protected static ClusterManager clusterManager;
private static Log log = LogFactory.getLog(BookKeeper.class.getName());
String nodeName;
static String nodeHostName;
static String nodeHostAddress;
Expand Down Expand Up @@ -129,29 +122,29 @@ public BookKeeper(Configuration conf, MetricRegistry metrics) throws FileNotFoun
*/
private void initializeMetrics()
{
cacheEvictionCount = metrics.counter(METRIC_BOOKKEEPER_CACHE_EVICTION_COUNT);
totalRequestCount = metrics.counter(METRIC_BOOKKEEPER_TOTAL_REQUEST_COUNT);
cacheRequestCount = metrics.counter(METRIC_BOOKKEEPER_CACHE_REQUEST_COUNT);
nonlocalRequestCount = metrics.counter(METRIC_BOOKKEEPER_NONLOCAL_REQUEST_COUNT);
remoteRequestCount = metrics.counter(METRIC_BOOKKEEPER_REMOTE_REQUEST_COUNT);
cacheEvictionCount = metrics.counter(BookKeeperMetrics.CacheMetric.METRIC_BOOKKEEPER_CACHE_EVICTION_COUNT.getMetricName());
totalRequestCount = metrics.counter(BookKeeperMetrics.CacheMetric.METRIC_BOOKKEEPER_TOTAL_REQUEST_COUNT.getMetricName());
cacheRequestCount = metrics.counter(BookKeeperMetrics.CacheMetric.METRIC_BOOKKEEPER_CACHE_REQUEST_COUNT.getMetricName());
nonlocalRequestCount = metrics.counter(BookKeeperMetrics.CacheMetric.METRIC_BOOKKEEPER_NONLOCAL_REQUEST_COUNT.getMetricName());
remoteRequestCount = metrics.counter(BookKeeperMetrics.CacheMetric.METRIC_BOOKKEEPER_REMOTE_REQUEST_COUNT.getMetricName());

metrics.register(METRIC_BOOKKEEPER_CACHE_HIT_RATE_GAUGE, new Gauge<Double>()
metrics.register(BookKeeperMetrics.CacheMetric.METRIC_BOOKKEEPER_CACHE_HIT_RATE_GAUGE.getMetricName(), new Gauge<Double>()
{
@Override
public Double getValue()
{
return ((double) cacheRequestCount.getCount() / (cacheRequestCount.getCount() + remoteRequestCount.getCount()));
}
});
metrics.register(METRIC_BOOKKEEPER_CACHE_MISS_RATE_GAUGE, new Gauge<Double>()
metrics.register(BookKeeperMetrics.CacheMetric.METRIC_BOOKKEEPER_CACHE_MISS_RATE_GAUGE.getMetricName(), new Gauge<Double>()
{
@Override
public Double getValue()
{
return ((double) remoteRequestCount.getCount() / (cacheRequestCount.getCount() + remoteRequestCount.getCount()));
}
});
metrics.register(METRIC_BOOKKEEPER_CACHE_SIZE_GAUGE, new Gauge<Integer>()
metrics.register(BookKeeperMetrics.CacheMetric.METRIC_BOOKKEEPER_CACHE_SIZE_GAUGE.getMetricName(), new Gauge<Integer>()
{
@Override
public Integer getValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jvm.CachedThreadStatesGaugeSet;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.qubole.rubix.core.utils.ClusterUtil;
import com.qubole.rubix.bookkeeper.metrics.BookKeeperMetrics;
import com.qubole.rubix.spi.CacheConfig;
import com.qubole.rubix.spi.thrift.BookKeeperService;
import com.readytalk.metrics.StatsDReporter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -33,6 +35,7 @@
import org.apache.thrift.shaded.transport.TTransportException;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static com.qubole.rubix.spi.CacheConfig.getServerMaxThreads;
Expand All @@ -54,6 +57,7 @@ public class BookKeeperServer extends Configured implements Tool
private static TServer server;

private static Log log = LogFactory.getLog(BookKeeperServer.class.getName());
private static BookKeeperMetrics bookKeeperMetrics;

protected BookKeeperServer()
{
Expand Down Expand Up @@ -121,21 +125,22 @@ public static void startServer(Configuration conf, MetricRegistry metricsRegistr
*/
protected static void registerMetrics(Configuration conf)
{
if ((CacheConfig.isOnMaster(conf) && CacheConfig.isReportStatsdMetricsOnMaster(conf))
|| (!CacheConfig.isOnMaster(conf) && CacheConfig.isReportStatsdMetricsOnWorker(conf))) {
log.info("Reporting metrics to StatsD");
if (!CacheConfig.isOnMaster(conf)) {
CacheConfig.setStatsDMetricsHost(conf, ClusterUtil.getMasterHostname(conf));
}
StatsDReporter.forRegistry(metrics)
.build(CacheConfig.getStatsDMetricsHost(conf), CacheConfig.getStatsDMetricsPort(conf))
.start(CacheConfig.getStatsDMetricsInterval(conf), TimeUnit.MILLISECONDS);
}
bookKeeperMetrics = new BookKeeperMetrics(conf, metrics);

metrics.register(BookKeeperMetrics.BookKeeperJvmMetric.METRIC_BOOKKEEPER_JVM_GC_PREFIX.getMetricName(), new GarbageCollectorMetricSet());
metrics.register(BookKeeperMetrics.BookKeeperJvmMetric.METRIC_BOOKKEEPER_JVM_THREADS_PREFIX.getMetricName(), new CachedThreadStatesGaugeSet(CacheConfig.getStatsDMetricsInterval(conf), TimeUnit.MILLISECONDS));
metrics.register(BookKeeperMetrics.BookKeeperJvmMetric.METRIC_BOOKKEEPER_JVM_MEMORY_PREFIX.getMetricName(), new MemoryUsageGaugeSet());
}

public static void stopServer()
{
removeMetrics();
try {
bookKeeperMetrics.close();
}
catch (IOException e) {
log.error("Metrics reporters could not be closed", e);
}
server.stop();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.qubole.rubix.bookkeeper.metrics.BookKeeperMetrics;
import com.qubole.rubix.spi.CacheConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -31,9 +32,6 @@ public class CoordinatorBookKeeper extends BookKeeper
{
private static Log log = LogFactory.getLog(CoordinatorBookKeeper.class.getName());

// Metric key for the number of live workers in the cluster.
public static final String METRIC_BOOKKEEPER_LIVE_WORKER_GAUGE = "rubix.bookkeeper.live_workers.gauge";

// Cache to store hostnames of live workers in the cluster.
protected Cache<String, Boolean> liveWorkerCache;

Expand Down Expand Up @@ -66,7 +64,7 @@ public void handleHeartbeat(String workerHostname)
*/
private void registerMetrics()
{
metrics.register(METRIC_BOOKKEEPER_LIVE_WORKER_GAUGE, new Gauge<Integer>()
metrics.register(BookKeeperMetrics.LivenessMetric.METRIC_BOOKKEEPER_LIVE_WORKER_GAUGE.getMetricName(), new Gauge<Integer>()
{
@Override
public Integer getValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@
*/
package com.qubole.rubix.bookkeeper;

import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jvm.CachedThreadStatesGaugeSet;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.qubole.rubix.bookkeeper.metrics.BookKeeperMetrics;
import com.qubole.rubix.spi.BookKeeperFactory;
import com.qubole.rubix.spi.CacheConfig;
import com.qubole.rubix.spi.CacheUtil;
Expand All @@ -37,6 +43,7 @@
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Created by sakshia on 26/10/16.
Expand All @@ -47,6 +54,8 @@ public class LocalDataTransferServer extends Configured implements Tool
private static Log log = LogFactory.getLog(LocalDataTransferServer.class.getName());
private static Configuration conf;
private static LocalServer localServer;
private static MetricRegistry metrics;
private static BookKeeperMetrics bookKeeperMetrics;

private LocalDataTransferServer()
{
Expand All @@ -61,23 +70,52 @@ public static void main(String[] args) throws Exception
public int run(String[] args) throws Exception
{
conf = this.getConf();
startServer(conf);
startServer(conf, new MetricRegistry());
return 0;
}

public static void startServer(Configuration conf)
public static void startServer(Configuration conf, MetricRegistry metricRegistry)
{
metrics = metricRegistry;
registerMetrics(conf);

localServer = new LocalServer(conf);
new Thread(localServer).run();
}

/**
* Register desired metrics.
*
* @param conf The current Hadoop configuration.
*/
private static void registerMetrics(Configuration conf)
{
bookKeeperMetrics = new BookKeeperMetrics(conf, metrics);

metrics.register(BookKeeperMetrics.LDTSJvmMetric.METRIC_LDTS_JVM_GC_PREFIX.getMetricName(), new GarbageCollectorMetricSet());
metrics.register(BookKeeperMetrics.LDTSJvmMetric.METRIC_LDTS_JVM_THREADS_PREFIX.getMetricName(), new CachedThreadStatesGaugeSet(CacheConfig.getStatsDMetricsInterval(conf), TimeUnit.MILLISECONDS));
metrics.register(BookKeeperMetrics.LDTSJvmMetric.METRIC_LDTS_JVM_MEMORY_PREFIX.getMetricName(), new MemoryUsageGaugeSet());
}

public static void stopServer()
{
removeMetrics();
if (localServer != null) {
try {
bookKeeperMetrics.close();
}
catch (IOException e) {
log.error("Metrics reporters could not be closed", e);
}
localServer.stop();
}
}

protected static void removeMetrics()
{
metrics.removeMatching(MetricFilter.ALL);
}

@VisibleForTesting
public static boolean isServerUp()
{
Expand Down

0 comments on commit e2708b1

Please sign in to comment.