Skip to content
This repository has been archived by the owner on Mar 31, 2022. It is now read-only.

Commit

Permalink
Fix getting metrics for pending compactions and active/pending repair… (
Browse files Browse the repository at this point in the history
#53)

* Fix getting metrics for pending compactions and active/pending repair sessions
Add check for running validation compactions

* Bump version to 0.4.1-SNAPSHOT

* Fix validation compaction stats not being available for C* 1.2

* Remove C* 1.2 from the test since it doesn't support "IF NOT EXISTS"

* Remove C* 2.0.13 that doesn't support incremental repair

* Distinguish how to check for running repairs pre and post 2.2
Fall back to AntiEntropySessions when the Cassandra version is <= 2.1
Use 10x higher timeouts for incremental repair

* Fix SonarQube complaints
  • Loading branch information
adejanovski committed Mar 3, 2017
1 parent 9fb2b6e commit 9c25be0
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ env:
- CASSANDRA_VERSION=2.1.16
- CASSANDRA_VERSION=2.2.8
- CASSANDRA_VERSION=3.0.9
- CASSANDRA_VERSION=3.9
- CASSANDRA_VERSION=3.10
services:
- postgresql
before_script:
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<name>Cassandra Reaper</name>
<groupId>com.spotify</groupId>
<artifactId>cassandra-reaper</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
<packaging>jar</packaging>

<properties>
Expand Down
81 changes: 74 additions & 7 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,15 @@ public class JmxProxy implements NotificationListener, AutoCloseable {
// JMX object name "org.apache.cassandra.db:type=StorageService".
private static final String SS_OBJECT_NAME = "org.apache.cassandra.db:type=StorageService";
// JMX object name of the AntiEntropySessions MBean; isRepairRunningPre22() reads its
// "ActiveCount" and "PendingTasks" attributes.
private static final String AES_OBJECT_NAME =
"org.apache.cassandra.internal:type=AntiEntropySessions";
// JMX object names of the ValidationExecutor thread pool metrics (active and pending tasks).
// isValidationCompactionRunning() reads the "Value" attribute of each.
private static final String VALIDATION_ACTIVE_OBJECT_NAME =
"org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=ValidationExecutor,name=ActiveTasks";
private static final String VALIDATION_PENDING_OBJECT_NAME =
"org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=ValidationExecutor,name=PendingTasks";
private static final String COMP_OBJECT_NAME =
"org.apache.cassandra.metrics:type=Compaction";
"org.apache.cassandra.metrics:type=Compaction,name=PendingTasks";
private static final String VALUE_ATTRIBUTE = "Value";
private static final String FAILED_TO_CONNECT_TO_USING_JMX = "Failed to connect to {} using JMX";
private static final String ERROR_GETTING_ATTR_JMX = "Error getting attribute from JMX";

private final JMXConnector jmxConnector;
private final ObjectName ssMbeanName;
Expand Down Expand Up @@ -263,19 +270,19 @@ public int getPendingCompactions() {
checkNotNull(cmProxy, "Looks like the proxy is not connected");
try {
ObjectName name = new ObjectName(COMP_OBJECT_NAME);
int pendingCount = (int) mbeanServer.getAttribute(name, "PendingTasks");
int pendingCount = (int) mbeanServer.getAttribute(name, VALUE_ATTRIBUTE);
return pendingCount;
} catch (IOException ignored) {
LOG.warn("Failed to connect to " + host + " using JMX", ignored);
LOG.warn(FAILED_TO_CONNECT_TO_USING_JMX, host, ignored);
} catch (MalformedObjectNameException ignored) {
LOG.error("Internal error, malformed name", ignored);
} catch (InstanceNotFoundException e) {
// This happens if no repair has yet been run on the node
// The AntiEntropySessions object is created on the first repair
LOG.debug("No compaction has run yet on the node. Ignoring exception.", e);
LOG.error("Error getting pending compactions attribute from JMX", e);
return 0;
} catch (Exception e) {
LOG.error("Error getting attribute from JMX", e);
LOG.error(ERROR_GETTING_ATTR_JMX, e);
}
// If uncertain, assume it's running
return 0;
Expand All @@ -285,14 +292,22 @@ public int getPendingCompactions() {
* @return true if any repairs are running on the node.
*/
public boolean isRepairRunning() {
  // The checks are evaluated in this fixed order and stop at the first positive answer:
  // AntiEntropySessions MBean, then "Repair#" MBeans, then validation compaction metrics.
  if (isRepairRunningPre22()) {
    return true;
  }
  if (isRepairRunningPost22()) {
    return true;
  }
  return isValidationCompactionRunning();
}


/**
* @return true if any repairs are running on the node.
*/
public boolean isRepairRunningPre22() {
// Check if AntiEntropySession is actually running on the node
try {
ObjectName name = new ObjectName(AES_OBJECT_NAME);
int activeCount = (Integer) mbeanServer.getAttribute(name, "ActiveCount");
long pendingCount = (Long) mbeanServer.getAttribute(name, "PendingTasks");
return activeCount + pendingCount != 0;
} catch (IOException ignored) {
LOG.warn("Failed to connect to " + host + " using JMX", ignored);
LOG.warn(FAILED_TO_CONNECT_TO_USING_JMX, host, ignored);
} catch (MalformedObjectNameException ignored) {
LOG.error("Internal error, malformed name", ignored);
} catch (InstanceNotFoundException e) {
Expand All @@ -301,7 +316,59 @@ public boolean isRepairRunning() {
LOG.debug("No repair has run yet on the node. Ignoring exception.", e);
return false;
} catch (Exception e) {
LOG.error("Error getting attribute from JMX", e);
LOG.error(ERROR_GETTING_ATTR_JMX, e);
}
// If uncertain, assume it's running
return true;
}

/**
 * Checks whether the node's ValidationExecutor thread pool reports any active or pending tasks,
 * i.e. whether a validation compaction is currently running or queued.
 *
 * <p>Every failure to read the metrics (connection problem, malformed object name, missing
 * MBean, or any other error) is logged and treated as "not running".
 *
 * @return true if the active and pending validation task counts do not sum to zero;
 *         false otherwise, including when the metrics could not be read.
 */
public boolean isValidationCompactionRunning() {
  try {
    // Read both gauges through Number rather than casting to a specific box type: if a gauge
    // is exposed as Integer where Long was expected (or vice versa), a hard cast would throw
    // ClassCastException, which the generic handler below would turn into a silent "false".
    Number activeCount = (Number) mbeanServer.getAttribute(
        new ObjectName(VALIDATION_ACTIVE_OBJECT_NAME), VALUE_ATTRIBUTE);
    Number pendingCount = (Number) mbeanServer.getAttribute(
        new ObjectName(VALIDATION_PENDING_OBJECT_NAME), VALUE_ATTRIBUTE);

    return activeCount.longValue() + pendingCount.longValue() != 0;
  } catch (IOException ignored) {
    LOG.warn(FAILED_TO_CONNECT_TO_USING_JMX, host, ignored);
  } catch (MalformedObjectNameException ignored) {
    LOG.error("Internal error, malformed name", ignored);
  } catch (InstanceNotFoundException e) {
    // The metric MBeans are not registered on this node; report "not running".
    LOG.error("Error getting pending/active validation compaction attributes from JMX", e);
    return false;
  } catch (Exception e) {
    LOG.error(ERROR_GETTING_ATTR_JMX, e);
  }
  // If uncertain, assume it's not running
  return false;
}

/**
* New way of determining if a repair is running after C* 2.2
*
* @return true if any repairs are running on the node.
*/
public boolean isRepairRunningPost22() {
try {
// list all mbeans in search of one with the name Repair#??
// This is the replacement for AntiEntropySessions since Cassandra 2.2
Set beanSet = mbeanServer.queryNames(new ObjectName("org.apache.cassandra.internal:*"), null);
for(Object bean:beanSet) {
ObjectName objName = (ObjectName) bean;
if(objName.getCanonicalName().contains("Repair#")){
return true;
}
}
return false;
} catch (IOException ignored) {
LOG.warn(FAILED_TO_CONNECT_TO_USING_JMX, host, ignored);
} catch (MalformedObjectNameException ignored) {
LOG.error("Internal error, malformed name", ignored);
} catch (Exception e) {
LOG.error(ERROR_GETTING_ATTR_JMX, e);
}
// If uncertain, assume it's running
return true;
Expand Down
44 changes: 37 additions & 7 deletions src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public final class SegmentRunner implements RepairStatusHandler, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(SegmentRunner.class);

// NOTE(review): presumably the pending-compaction count above which a segment is postponed;
// the usage is not visible in this excerpt - confirm.
private static final int MAX_PENDING_COMPACTIONS = 20;
// Factor by which the segment timeout is multiplied when the repair unit uses incremental repair.
private static final int MAX_TIMEOUT_EXTENSIONS = 10;
// Regex for a lowercase hexadecimal UUID in 8-4-4-4-12 form.
private static final Pattern REPAIR_UUID_PATTERN =
Pattern.compile("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}");

Expand Down Expand Up @@ -196,6 +197,9 @@ protected Set<String> initialize() {
}

LOG.debug("Triggered repair with command id {}", commandId);

// incremental repair can take way more time for a segment so we're extending the timeout MAX_TIMEOUT_EXTENSIONS times
long timeout = repairUnit.getIncrementalRepair()?timeoutMillis*MAX_TIMEOUT_EXTENSIONS:timeoutMillis;
context.storage.updateRepairSegment(segment.with()
.coordinatorHost(coordinator.getHost())
.repairCommandId(commandId)
Expand All @@ -204,9 +208,9 @@ protected Set<String> initialize() {
segment.getId(), coordinator.getHost());
repairRunner.updateLastEvent(eventMsg);
LOG.info("Repair for segment {} started, status wait will timeout in {} millis", segmentId,
timeoutMillis);
timeout);
try {
condition.await(timeoutMillis, TimeUnit.MILLISECONDS);
condition.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("Repair command {} on segment {} interrupted", commandId, segmentId, e);
} finally {
Expand Down Expand Up @@ -236,19 +240,30 @@ protected Set<String> initialize() {
}
LOG.debug("Exiting synchronized section with segment ID {}", segmentId);
}

/**
 * Logs that this runner will not repair its segment because another segment is already
 * running, and records a "postponed" message as the repair runner's last event.
 */
private void declineRun() {
  final String postponedEvent = "Postponed due to already running segment";
  LOG.info(
      "SegmentRunner declined to repair segment {} because only one segment is allowed "
          + "at once for incremental repairs",
      segmentId);
  repairRunner.updateLastEvent(postponedEvent);
}

boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
LazyInitializer<Set<String>> busyHosts) {
Collection<String> allHosts;
if(repairUnit.getIncrementalRepair()){
// In incremental repairs, only one segment is allowed at once (one segment == the full primary range of one node)
if(repairHasSegmentRunning(segment.getRunId())) {
LOG.info("SegmentRunner declined to repair segment {} because only one segment is allowed "
+ "at once for incremental repairs", segmentId);
String msg = "Postponed due to already running segment";
repairRunner.updateLastEvent(msg);
return false;
declineRun();
return false;
}

if (IsRepairRunningOnOneNode(segment)) {
declineRun();
return false;
}


}
try {
// when hosts are coming up or going down, this method can throw an
Expand Down Expand Up @@ -318,6 +333,21 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
return true;
}

/**
 * Asks the coordinator host of every segment in the given segment's run whether a repair is
 * running there, stopping at the first host that answers yes.
 *
 * <p>A host whose connection attempt raises a ReaperException is logged and skipped.
 *
 * @param segment the segment whose run id selects the segments to inspect
 * @return true as soon as one coordinator host reports a running repair; false otherwise
 */
private boolean IsRepairRunningOnOneNode(RepairSegment segment) {
  final Collection<RepairSegment> segmentsOfRun =
      context.storage.getRepairSegmentsForRun(segment.getRunId());
  for (RepairSegment candidate : segmentsOfRun) {
    // try-with-resources closes the JMX connection on every path, including the early return.
    try (JmxProxy proxy = context.jmxConnectionFactory.connect(candidate.getCoordinatorHost())) {
      if (!proxy.isRepairRunning()) {
        continue;
      }
      return true;
    } catch (ReaperException e) {
      LOG.error("Unreachable node when trying to determine if repair is running on a node. Crossing fingers and continuing...", e);
    }
  }
  return false;
}

private boolean repairHasSegmentRunning(long repairRunId) {
Collection<RepairSegment> segments = context.storage.getRepairSegmentsForRun(repairRunId);
for(RepairSegment segment:segments) {
Expand Down

0 comments on commit 9c25be0

Please sign in to comment.