Skip to content

Commit

Permalink
Merge branch 'zvo/get-tokens'
Browse files Browse the repository at this point in the history
  • Loading branch information
Bj0rnen committed Dec 1, 2014
2 parents 5dc93ab + 1b88d4f commit 0c50b4e
Show file tree
Hide file tree
Showing 8 changed files with 360 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<dropwizard.version>0.7.0</dropwizard.version>
<cassandra.version>2.0.11</cassandra.version>
</properties>

<dependencies>
Expand All @@ -21,6 +22,11 @@
<artifactId>dropwizard-core</artifactId>
<version>${dropwizard.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>${cassandra.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand All @@ -30,6 +36,7 @@

<build>
<plugins>
<!--
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
Expand All @@ -48,6 +55,7 @@
</execution>
</executions>
</plugin>
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/spotify/reaper/ReaperException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.spotify.reaper;

public class ReaperException extends Exception {

public ReaperException(String s) {
super(s);
}

public ReaperException(Exception e) {
super(e);
}

public ReaperException(String s, Exception e) {
super(s,e);
}
}
30 changes: 30 additions & 0 deletions src/main/java/com/spotify/reaper/cassandra/ClusterInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.spotify.reaper.cassandra;

import com.spotify.reaper.ReaperException;

import java.util.List;

public class ClusterInfo implements IClusterInfo {

private String seedHost;
private int seedPort = 0;

private List<String> tokens;

public ClusterInfo(String seedHost, int seedPort) {
this.seedHost = seedHost;
this.seedPort = seedPort;
}

public void init() throws ReaperException {
JMXProxy jmx = seedPort == 0 ? JMXProxy.connect(seedHost) : JMXProxy.connect(seedHost, seedPort);
tokens = jmx.getTokens();
jmx.close();
}

@Override
public List<String> getTokens() {
return tokens;
}

}
9 changes: 9 additions & 0 deletions src/main/java/com/spotify/reaper/cassandra/IClusterInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.spotify.reaper.cassandra;

import java.util.List;

public interface IClusterInfo {

public List<String> getTokens();

}
67 changes: 67 additions & 0 deletions src/main/java/com/spotify/reaper/cassandra/JMXProxy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.spotify.reaper.cassandra;

import com.google.common.collect.Lists;
import com.spotify.reaper.ReaperException;
import org.apache.cassandra.service.StorageServiceMBean;

import java.io.IOException;
import java.net.MalformedURLException;
import java.util.List;

import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JMXProxy {

private static final int DEFAULT_JMX_PORT = 7199;

private JMXConnector jmxc = null;
private StorageServiceMBean ssProxy;

public JMXProxy(JMXConnector jmxc, StorageServiceMBean ssProxy) {
this.jmxc = jmxc;
this.ssProxy = ssProxy;
}

public static JMXProxy connect(String host) throws ReaperException {
return connect(host, DEFAULT_JMX_PORT);
}

public static JMXProxy connect(String host, int port) throws ReaperException {
JMXServiceURL jmxUrl;
ObjectName name;
try {
jmxUrl = new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi",
host, port));
name = new ObjectName("org.apache.cassandra.db:type=StorageService");
} catch (MalformedURLException | MalformedObjectNameException e) {
throw new ReaperException("Failure during preparations for JMX connection", e);
}
try {
JMXConnector jmxc = JMXConnectorFactory.connect(jmxUrl);
MBeanServerConnection mbeanServerConn = jmxc.getMBeanServerConnection();
StorageServiceMBean
ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
return new JMXProxy(jmxc, ssProxy);
} catch (IOException e) {
throw new ReaperException("Failure when establishing JMX connection", e);
}
}

public List<String> getTokens() {
return Lists.newArrayList(ssProxy.getTokenToEndpointMap().keySet());
}

public void close() throws ReaperException {
try {
jmxc.close();
} catch (IOException e) {
throw new ReaperException(e);
}
}
}
6 changes: 6 additions & 0 deletions src/main/java/com/spotify/reaper/core/RepairSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,10 @@ public RepairSegment build() {
return new RepairSegment(this);
}
}


@Override
public String toString() {
return String.format("(%s,%s)", startToken.toString(), endToken.toString());
}
}
107 changes: 107 additions & 0 deletions src/main/java/com/spotify/reaper/service/SegmentGenerator.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,117 @@
package com.spotify.reaper.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.core.RepairSegment;

import java.math.BigInteger;
import java.util.List;

/**
* Splits given Cassandra table's (column family's) token range into RepairSegments.
*
* The run order of RepairSegments in RepairRun defines the RepairStrategy.
*/
public class SegmentGenerator {

private final String partitioner;
private final BigInteger MIN_SEGMENT_SIZE = new BigInteger("100");
private BigInteger RANGE_MIN;
private BigInteger RANGE_MAX;
private BigInteger RANGE_SIZE;

public SegmentGenerator(String partitioner) throws ReaperException {
if (partitioner.endsWith("RandomPartitioner")) {
RANGE_MIN = BigInteger.ZERO;
RANGE_MAX = new BigInteger("2").pow(127).subtract(BigInteger.ONE);
} else if (partitioner.endsWith("Murmur3Partitioner")) {
RANGE_MIN = new BigInteger("2").pow(63).negate();
RANGE_MAX = new BigInteger("2").pow(63).subtract(BigInteger.ONE);
} else {
throw new ReaperException("Unsupported partitioner " + partitioner);
}
RANGE_SIZE = RANGE_MAX.subtract(RANGE_MIN).add(BigInteger.ONE);
this.partitioner = partitioner;
}

public List<RepairSegment> generateSegments(int cnt, List<String> ring) throws ReaperException {
List<RepairSegment> repairSegments = Lists.newArrayList();
int ringSize = ring.size();
BigInteger count = new BigInteger(String.valueOf(cnt));
BigInteger start;
BigInteger stop;
BigInteger next;
BigInteger cur;
BigInteger total = BigInteger.ZERO;

for (int i = 0; i < ringSize; i++) {
start = new BigInteger(ring.get(i));
stop = new BigInteger(ring.get((i+1) % ringSize));
if (!inRange(start) || !inRange(stop)) {
throw new ReaperException(String.format("Tokens (%s,%s) not in range of %s",
start, stop, partitioner));
}
if (lowerThan(stop, start)) {
// wrap around case
stop = stop.add(RANGE_SIZE);
}
BigInteger segmentSize = stop.subtract(start).divide(count).add(BigInteger.ONE);
BigInteger rangeLength = max(MIN_SEGMENT_SIZE, segmentSize);
cur = start;
while (lowerThan(cur, stop)) {
next = min(stop, cur.add(rangeLength));
BigInteger ocur = cur;
BigInteger onext = next;
if (greaterThan(onext, RANGE_MAX)) {
onext = onext.subtract(RANGE_SIZE);
}
if (greaterThan(ocur, RANGE_MAX)) {
ocur = ocur.subtract(RANGE_SIZE);
}
//repairSegments.add(new RepairSegment(ocur, onext));
repairSegments.add(new RepairSegment.RepairSegmentBuilder()
.startToken(ocur)
.endToken(onext)
.build());
total = total.add(next).subtract(cur);
cur = next;
}
}

if (!total.equals(RANGE_SIZE)) {
throw new ReaperException("Not entire ring would get repaired");
}
return repairSegments;
}

protected boolean inRange(BigInteger token) {
if (lowerThan(token, RANGE_MIN) || greaterThan(token, RANGE_MAX)) {
return false;
} else {
return true;
}
}

@VisibleForTesting
protected static BigInteger max(BigInteger a, BigInteger b) {
return greaterThan(a, b) ? a : b;
}

@VisibleForTesting
protected static BigInteger min(BigInteger a, BigInteger b) {
return lowerThan(a, b) ? a : b;
}

@VisibleForTesting
protected static boolean lowerThan(BigInteger a, BigInteger b) {
return a.compareTo(b) == -1;
}

@VisibleForTesting
protected static boolean greaterThan(BigInteger a, BigInteger b) {
return a.compareTo(b) == 1;
}


}
Loading

0 comments on commit 0c50b4e

Please sign in to comment.