Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
[BZ 1146138] Add metrics from the Datastax driver, and remove some du…
Browse files Browse the repository at this point in the history
…plicate internal ones.

(cherry picked from commit b73f8e5)
  • Loading branch information
Michael Burman committed Sep 26, 2014
1 parent e9498be commit f709481
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 23 deletions.
Expand Up @@ -125,7 +125,7 @@ public Cluster build() {
// if (compression == null && !isIBMJRE()) {
// builder.withCompression(ProtocolOptions.Compression.SNAPPY);
// }
builder.withoutMetrics();
// builder.withoutMetrics();
builder.withCompression(ProtocolOptions.Compression.NONE);
return builder.build();
}
Expand Down
Expand Up @@ -48,6 +48,7 @@

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.Metrics;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.Session;
Expand Down Expand Up @@ -122,6 +123,8 @@ public class StorageClientManager implements StorageClientManagerMBean{
private String cachedStorageUsername;
private String cachedStoragePassword;

private Metrics driverMetrics;

public void scheduleStorageSessionMaintenance() {
// each time the webapp is reloaded, we don't want to create duplicate jobs
Collection<Timer> timers = timerService.getTimers();
Expand Down Expand Up @@ -183,6 +186,7 @@ public synchronized boolean init() {
metricsConfiguration = new MetricsConfiguration();
metricsDAO = new MetricsDAO(session, metricsConfiguration);


initMetricsServer();
JMXUtil.registerMBean(this, OBJECT_NAME);
initialized = true;
Expand Down Expand Up @@ -443,14 +447,30 @@ public void setRequestTimeoutDampening(long requestTimeoutDampening) {

@Override
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public long getRequestTimeouts() {
return session.getTimeouts();
public long getReadRequestTimeouts() {
return driverMetrics.getErrorMetrics().getReadTimeouts().count();
}

@Override
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public long getWriteRequestTimeouts() {
return driverMetrics.getErrorMetrics().getWriteTimeouts().count();
}

@Override
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public long getTotalRequests() {
return session.getTimeouts();
return driverMetrics.getRequestsTimer().count();
}

@Override
public long getRetries() {
return driverMetrics.getErrorMetrics().getRetries().count();
}

@Override
public long getConnectionErrors() {
return driverMetrics.getErrorMetrics().getConnectionErrors().count();
}

@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
Expand Down Expand Up @@ -520,6 +540,8 @@ private Session createSession() {
.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE)).withCompression(
ProtocolOptions.Compression.NONE).build();

driverMetrics = cluster.getMetrics();

PoolingOptions poolingOptions = cluster.getConfiguration().getPoolingOptions();
poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(
System.getProperty("rhq.storage.client.local-connections", "24")));
Expand Down Expand Up @@ -567,4 +589,44 @@ private void initMetricsServer() {
metricsServer.init();
}

@Override
public int getConnectedToHosts() {
return driverMetrics.getConnectedToHosts().value();
}

@Override
public int getKnownHosts() {
return driverMetrics.getKnownHosts().value();
}

@Override
public int getOpenConnections() {
return driverMetrics.getOpenConnections().value();
}

@Override
public double oneMinuteAvgRate() {
return driverMetrics.getRequestsTimer().oneMinuteRate();
}

@Override
public double fiveMinuteAvgRate() {
return driverMetrics.getRequestsTimer().fiveMinuteRate();
}

@Override
public double fifteenMinuteAvgRate() {
return driverMetrics.getRequestsTimer().fifteenMinuteRate();
}

@Override
public double meanRate() {
return driverMetrics.getRequestsTimer().meanRate();
}

@Override
public double meanLatency() {
return driverMetrics.getRequestsTimer().mean();
}

}
Expand Up @@ -25,8 +25,22 @@ public interface StorageClientManagerMBean {

void setRequestTimeoutDampening(long requestTimeoutDampening);

long getRequestTimeouts();

// Cassandra driver's exposed methods
int getConnectedToHosts();
int getKnownHosts();
int getOpenConnections();
long getReadRequestTimeouts();
long getWriteRequestTimeouts();
long getTotalRequests();

// Metrics.Errors, not exposing RetryPolicy statistics
long getRetries();
long getConnectionErrors();

// Timers
double oneMinuteAvgRate();
double fiveMinuteAvgRate();
double fifteenMinuteAvgRate();
double meanRate();
double meanLatency();
}
Expand Up @@ -13,7 +13,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
Expand Down Expand Up @@ -55,10 +54,6 @@ public class StorageSession implements Host.StateListener {

private double topologyDelta = Double.parseDouble(System.getProperty(REQUEST_TOPOLOGY_CHANGE_DELTA, "30000"));

private long timeouts;

private AtomicLong totalRequests = new AtomicLong();

public StorageSession(Session wrappedSession) {
this.wrappedSession = wrappedSession;
this.wrappedSession.getCluster().register(this);
Expand Down Expand Up @@ -136,13 +131,8 @@ public void addStorageStateListener(StorageStateListener listener) {
listeners.add(listener);
}

public long getTimeouts() {
return timeouts;
}

public ResultSet execute(String query) {
try {
totalRequests.incrementAndGet();
permits.acquire();
return wrappedSession.execute(query);
} catch (QueryTimeoutException e) {
Expand All @@ -156,7 +146,6 @@ public ResultSet execute(String query) {

public ResultSet execute(Query query) {
try {
totalRequests.incrementAndGet();
permits.acquire();
return wrappedSession.execute(query);
} catch(QueryTimeoutException e) {
Expand All @@ -169,21 +158,18 @@ public ResultSet execute(Query query) {
}

public StorageResultSetFuture executeAsync(String query) {
totalRequests.incrementAndGet();
permits.acquire();
ResultSetFuture future = wrappedSession.executeAsync(query);
return new StorageResultSetFuture(future, this);
}

public StorageResultSetFuture executeAsync(Query query) {
totalRequests.incrementAndGet();
permits.acquire();
ResultSetFuture future = wrappedSession.executeAsync(query);
return new StorageResultSetFuture(future, this);
}

public PreparedStatement prepare(String query) {
totalRequests.incrementAndGet();
permits.acquire();
return wrappedSession.prepare(query);
}
Expand Down Expand Up @@ -255,7 +241,6 @@ void handleNoHostAvailable(NoHostAvailableException e) {
}

void handleTimeout() {
++timeouts;
if (System.currentTimeMillis() - permitsLastChanged > timeoutDampening) {
decreaseRequestThroughput((int) (getRequestLimit() * timeoutDelta));
}
Expand Down
Expand Up @@ -406,14 +406,41 @@
</plugin-configuration>

<metric
property="RequestTimeouts"
property="ReadRequestTimeouts"
measurementType="trendsup"
description="The number of request timeouts (client-side) this RHQ Server since it was started"/>
description="The number of read request timeouts (client-side) this RHQ Server since it was started"/>
<metric
property="WriteRequestTimeouts"
measurementType="trendsup"
description="The number of read request timeouts (client-side) this RHQ Server since it was started"/>
<metric
property="TotalRequests"
measurementType="trendsup"
description="The total number of requests this RHQ Server has made since it was started"/>

<metric property="OneMinuteAvgRate"
measurementType="dynamic" displayType="summary" description="The request rate per second during one minute interval" defaultInterval="60000" />

<metric property="FiveMinuteAvgRate"
measurementType="dynamic" displayType="summary" description="The request rate per second during five minute interval" defaultInterval="300000"/>

<metric property="FifteenMinuteAvgRate"
measurementType="dynamic" displayType="summary" description="The request rate per second during fifteen minute interval" defaultInterval="900000" />

<metric property="MeanRate"
measurementType="dynamic" displayType="summary" description="The mean request rate per second since server was started" />

<metric property="MeanLatency"
measurementType="dynamic" displayType="summary" description="The mean latency" />

<metric property="ConnectedToHosts" measurementType="dynamic" displayType="summary" description="Number of Storage nodes the driver is currently connected to (that is have at least one connection opened to)." />

<metric property="OpenConnections" measurementType="dynamic" displayType="summary" description="Total number of currently opened connections to Storage nodes." />

<metric property="Retries" measurementType="trendsup" displayType="summary" description="Number of times the request had to retried to storage node." />

<metric property="ConnectionErrors" measurementType="trendsup" displayType="summary" description="Number of times the connection failed to the storage node." />

<resource-configuration>
<c:simple-property name="RequestLimit" type="double" required="false" description="Sets throttling in terms of
requests per second. Defaults to 30,000 if undefined. Note that this setting is automatically
Expand Down

0 comments on commit f709481

Please sign in to comment.