Skip to content

Commit

Permalink
Refactor SegmentRunner, moving logic out of constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
Bj0rnen committed Jan 20, 2015
1 parent 0749e6f commit 75d6924
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 104 deletions.
10 changes: 5 additions & 5 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

public class JmxProxy implements NotificationListener, Serializable {
public class JmxProxy implements NotificationListener, Serializable, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(JmxProxy.class);

Expand Down Expand Up @@ -97,8 +97,7 @@ public static JmxProxy connect(Optional<RepairStatusHandler> handler, String hos
String[] parts = host.split(":");
if (parts.length == 2) {
return connect(handler, parts[0], Integer.valueOf(parts[1]));
}
else {
} else {
return connect(handler, host, JMX_PORT);
}
}
Expand Down Expand Up @@ -221,6 +220,7 @@ public void cancelAllRepairs() {

/**
* Checks if table exists in the cluster by instantiating a MBean for that table.
*
* @throws ReaperException if the query fails, not when the table doesn't exist
*/
public boolean tableExists(String ks, String cf) throws ReaperException {
Expand All @@ -234,8 +234,7 @@ public boolean tableExists(String ks, String cf) throws ReaperException {
}
ObjectName bean = beans.iterator().next();
JMX.newMBeanProxy(mbeanServer, bean, ColumnFamilyStoreMBean.class);
}
catch (MalformedObjectNameException | IOException e) {
} catch (MalformedObjectNameException | IOException e) {
String errMsg = String.format("ColumnFamilyStore for %s/%s not found: %s", ks, cf,
e.getMessage());
LOG.warn(errMsg);
Expand Down Expand Up @@ -311,6 +310,7 @@ public boolean isConnectionAlive() {
/**
* Cleanly shut down by un-registering the listener and closing the JMX connection.
*/
@Override
public void close() throws ReaperException {
try {
mbeanServer.removeNotificationListener(ssMbeanName, this);
Expand Down
10 changes: 2 additions & 8 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,8 @@ private void doRepairSegment(long segmentId, RingRange tokenRange) {
return;
}

try {
SegmentRunner.triggerRepair(storage, segmentId, potentialCoordinators, repairTimeoutMillis,
jmxConnectionFactory);
} catch (ReaperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
SegmentRunner.triggerRepair(storage, segmentId, potentialCoordinators, repairTimeoutMillis,
jmxConnectionFactory);

handleResult(segmentId);
}
Expand Down
177 changes: 88 additions & 89 deletions src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,88 +23,86 @@
import com.spotify.reaper.storage.IStorage;

import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.SimpleCondition;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

public final class SegmentRunner implements RepairStatusHandler {

private static final Logger LOG = LoggerFactory.getLogger(SegmentRunner.class);

private final IStorage storage;
private final long segmentId;
private final int commandId;
private final JmxProxy jmxConnection;
private int commandId;

private final Condition condition = new SimpleCondition();


public static void triggerRepair(IStorage storage, long segmentId,
Collection<String> potentialCoordinators, long timeoutMillis,
JmxConnectionFactory jmxConnectionFactory)
throws ReaperException, InterruptedException {
new SegmentRunner(storage, segmentId, potentialCoordinators, jmxConnectionFactory)
.awaitOutcome(timeoutMillis);
Collection<String> potentialCoordinators, long timeoutMillis,
JmxConnectionFactory jmxConnectionFactory) {
new SegmentRunner(storage, segmentId)
.runRepair(potentialCoordinators, jmxConnectionFactory, timeoutMillis);
}

private SegmentRunner(IStorage storage, long segmentId, Collection<String> potentialCoordinators,
JmxConnectionFactory jmxConnectionFactory)
throws ReaperException {
private SegmentRunner(IStorage storage, long segmentId) {
this.storage = storage;
this.segmentId = segmentId;
}

// TODO: don't trigger the repair in the constructor. The change will force commandId to be
// TODO: mutable, but that's better than this.
synchronized (this) {
jmxConnection = jmxConnectionFactory
.connectAny(Optional.<RepairStatusHandler>of(this), potentialCoordinators);

RepairSegment segment = storage.getRepairSegment(segmentId);
private void runRepair(Collection<String> potentialCoordinators,
JmxConnectionFactory jmxConnectionFactory, long timeoutMillis) {
try (JmxProxy jmxConnection = jmxConnectionFactory
.connectAny(Optional.<RepairStatusHandler>of(this), potentialCoordinators)) {
final RepairSegment segment = storage.getRepairSegment(segmentId);
ColumnFamily columnFamily =
storage.getColumnFamily(segment.getColumnFamilyId());
String keyspace = columnFamily.getKeyspaceName();

assert !segment.getState().equals(RepairSegment.State.RUNNING);
commandId = jmxConnection
.triggerRepair(segment.getStartToken(), segment.getEndToken(), keyspace,
columnFamily.getName());
LOG.debug("Triggered repair with command id {}", commandId);
LOG.info("Repair for segment {} started", segmentId);
storage.updateRepairSegment(segment.with()
.state(RepairSegment.State.RUNNING)
.repairCommandId(commandId)
.build(segmentId));
}
}

private synchronized void awaitOutcome(long timeoutMillis)
throws InterruptedException, ReaperException {
long abortTime = (System.nanoTime() / 1000000) + timeoutMillis;
while (true) {
RepairSegment segment = storage.getRepairSegment(segmentId);
if (!segment.getState().equals(RepairSegment.State.RUNNING)) {
LOG.info("Repair command {} on segment {} finished", commandId, segmentId);
break;
}
long milliTime = System.nanoTime() / 1000000;
if (milliTime < abortTime) {
wait(abortTime - milliTime);
} else {
LOG.warn("Repair command {} on segment {} timed out", commandId, segmentId);
abort(segment);
break;
synchronized (condition) {
commandId = jmxConnection
.triggerRepair(segment.getStartToken(), segment.getEndToken(), keyspace,
columnFamily.getName());
LOG.debug("Triggered repair with command id {}", commandId);
storage.updateRepairSegment(segment.with()
.state(RepairSegment.State.RUNNING)
.repairCommandId(commandId)
.build(segmentId));
LOG.info("Repair for segment {} started", segmentId);

try {
condition.await(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("Repair command {} on segment {} interrupted", commandId, segmentId);
} finally {
RepairSegment resultingSegment = storage.getRepairSegment(segmentId);
LOG.info("Repair command {} on segment {} exited with state {}", commandId, segmentId,
resultingSegment.getState());
if (resultingSegment.getState().equals(RepairSegment.State.RUNNING)) {
abort(resultingSegment, jmxConnection);
LOG.info("Repair command {} on segment {} has been cancelled", commandId, segmentId);
}
}
}
} catch (ReaperException e) {
LOG.warn("Failed to connect to a coordinator node for segment {}. Aborting repair", segmentId);
}
jmxConnection.close();
}

private synchronized void abort(RepairSegment segment) {
private void abort(RepairSegment segment, JmxProxy jmxConnection) {
LOG.warn("Aborting command {} on segment {}", commandId, segmentId);
jmxConnection.cancelAllRepairs();
storage.updateRepairSegment(segment.with()
.startTime(null)
.repairCommandId(null)
.state(RepairSegment.State.NOT_STARTED)
.build(segmentId));
.startTime(null)
.repairCommandId(null)
.state(RepairSegment.State.NOT_STARTED)
.build(segmentId));
}


Expand All @@ -117,45 +115,46 @@ private synchronized void abort(RepairSegment segment) {
* @param message additional information about the repair
*/
@Override
public synchronized void handle(int repairNumber, ActiveRepairService.Status status,
String message) {
LOG.debug(
"handleRepairOutcome called for repairCommandId {}, outcome {} and message: {}",
repairNumber, status, message);
if (repairNumber != commandId) {
LOG.debug("Handler for command id {} not handling message with number {}",
commandId, repairNumber);
return;
}
public void handle(int repairNumber, ActiveRepairService.Status status, String message) {
synchronized (condition) {
LOG.debug(
"handle called for repairCommandId {}, outcome {} and message: {}",
repairNumber, status, message);
if (repairNumber != commandId) {
LOG.debug("Handler for command id {} not handling message with number {}",
commandId, repairNumber);
return;
}

RepairSegment currentSegment = storage.getRepairSegment(segmentId);
// See status explanations from: https://wiki.apache.org/cassandra/RepairAsyncAPI
switch (status) {
case STARTED:
DateTime now = DateTime.now();
storage.updateRepairSegment(currentSegment.with()
.startTime(now)
.build(segmentId));
// We already set the state of the segment to RUNNING.
break;
case SESSION_FAILED:
// TODO: Bj0rn: How should we handle this? Here, it's almost treated like a success.
storage.updateRepairSegment(currentSegment.with()
.state(RepairSegment.State.ERROR)
.endTime(DateTime.now())
.build(segmentId));
notify();
break;
case SESSION_SUCCESS:
// Do nothing, wait for FINISHED.
break;
case FINISHED:
storage.updateRepairSegment(currentSegment.with()
.state(RepairSegment.State.DONE)
.endTime(DateTime.now())
.build(segmentId));
notify();
break;
RepairSegment currentSegment = storage.getRepairSegment(segmentId);
// See status explanations from: https://wiki.apache.org/cassandra/RepairAsyncAPI
switch (status) {
case STARTED:
DateTime now = DateTime.now();
storage.updateRepairSegment(currentSegment.with()
.startTime(now)
.build(segmentId));
// We already set the state of the segment to RUNNING.
break;
case SESSION_FAILED:
// TODO: Bj0rn: How should we handle this? Here, it's almost treated like a success.
storage.updateRepairSegment(currentSegment.with()
.state(RepairSegment.State.ERROR)
.endTime(DateTime.now())
.build(segmentId));
condition.signalAll();
break;
case SESSION_SUCCESS:
// Do nothing, wait for FINISHED.
break;
case FINISHED:
storage.updateRepairSegment(currentSegment.with()
.state(RepairSegment.State.DONE)
.endTime(DateTime.now())
.build(segmentId));
condition.signalAll();
break;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void successTest() throws InterruptedException, ReaperException, Executio
final MutableObject<Future<?>> future = new MutableObject<>();

SegmentRunner.triggerRepair(storage, segmentId,
Collections.singleton(""), 500, new JmxConnectionFactory() {
Collections.singleton(""), 1000, new JmxConnectionFactory() {
@Override
public JmxProxy create(final Optional<RepairStatusHandler> handler, String host)
throws ReaperException {
Expand Down Expand Up @@ -186,7 +186,7 @@ public void failureTest() throws InterruptedException, ReaperException, Executio
final MutableObject<Future<?>> future = new MutableObject<>();

SegmentRunner.triggerRepair(storage, segmentId,
Collections.singleton(""), 500, new JmxConnectionFactory() {
Collections.singleton(""), 1000, new JmxConnectionFactory() {
@Override
public JmxProxy create(final Optional<RepairStatusHandler> handler, String host)
throws ReaperException {
Expand Down

0 comments on commit 75d6924

Please sign in to comment.