Skip to content

Commit

Permalink
WIP done fixing MemoryStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
Bj0rnen committed Jan 22, 2015
1 parent 4bc7880 commit b61a986
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 59 deletions.
14 changes: 10 additions & 4 deletions src/main/java/com/spotify/reaper/resources/RepairRunResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -131,10 +132,16 @@ public Response addRepairRun(
jmxProxy.close();

Set<String> tableNames;
if (tableNamesParam.isPresent()) {
if (tableNamesParam.isPresent() && !tableNamesParam.get().isEmpty()) {
tableNames = Sets.newHashSet(COMMA_SEPARATED_LIST_SPLITTER.split(tableNamesParam.get()));
for (String name : tableNames) {
if (!knownTables.contains(name)) {
return Response.status(Response.Status.NOT_FOUND).entity(
"keyspace doesn't contain a table named \"" + name + "\"").build();
}
}
} else {
tableNames = knownTables;
tableNames = Collections.emptySet();
}

Optional<RepairUnit> storedRepairUnit =
Expand Down Expand Up @@ -254,7 +261,7 @@ private Response pauseRun(RepairRun repairRun, RepairUnit repairUnit) {
LOG.info("Pausing run {}", repairRun.getId());
RepairRun updatedRun = repairRun.with()
.runState(RepairRun.RunState.PAUSED)
.pauseTime(DateTime.now())
.pausedTime(DateTime.now())
.build(repairRun.getId());
storage.updateRepairRun(updatedRun);
return Response.ok().entity(new RepairRunStatus(repairRun, repairUnit)).build();
Expand All @@ -264,7 +271,6 @@ private Response resumeRun(RepairRun repairRun, RepairUnit repairUnit) {
LOG.info("Resuming run {}", repairRun.getId());
RepairRun updatedRun = repairRun.with()
.runState(RepairRun.RunState.RUNNING)
.pauseTime(null)
.build(repairRun.getId());
storage.updateRepairRun(updatedRun);
return Response.ok().entity(new RepairRunStatus(repairRun, repairUnit)).build();
Expand Down
9 changes: 4 additions & 5 deletions src/main/java/com/spotify/reaper/storage/IStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.spotify.reaper.service.RingRange;

import java.util.Collection;
import java.util.Set;

/**
* API definition for cassandra-reaper.
Expand Down Expand Up @@ -53,18 +54,16 @@ public interface IStorage {

/**
* Get a stored RepairUnit targeting the given tables in the given keyspace.
* Tables must be always defined, so targeting the whole keyspace requires
* first getting all the column family names from the keyspace.
*
* @param cluster Cluster name for the RepairUnit.
* @param keyspace Keyspace name for the RepairUnit.
* @param columnFamilyNames List of column families targeted by the RepairUnit.
* @param columnFamilyNames Set of column families targeted by the RepairUnit.
* @return Instance of a RepairUnit matching the parameters, or null if not found.
*/
Optional<RepairUnit> getRepairUnit(String cluster, String keyspace,
Collection<String> columnFamilyNames);
Set<String> columnFamilyNames);

boolean addRepairSegments(Collection<RepairSegment.Builder> newSegments, long runId);
void addRepairSegments(Collection<RepairSegment.Builder> newSegments, long runId);

boolean updateRepairSegment(RepairSegment newRepairSegment);

Expand Down
100 changes: 50 additions & 50 deletions src/main/java/com/spotify/reaper/storage/MemoryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.spotify.reaper.storage;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import com.spotify.reaper.core.Cluster;
Expand All @@ -27,51 +29,50 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.Nullable;

/**
* Implements the StorageAPI using transient Java classes.
*/
public class MemoryStorage implements IStorage {

private final AtomicInteger REPAIR_RUN_ID = new AtomicInteger(0);
private final AtomicInteger COLUMN_FAMILY_ID = new AtomicInteger(0);
private final AtomicInteger REPAIR_UNIT_ID = new AtomicInteger(0);
private final AtomicInteger SEGMENT_ID = new AtomicInteger(0);

private ConcurrentMap<String, Cluster> clusters = Maps.newConcurrentMap();
private ConcurrentMap<Long, RepairRun> repairRuns = Maps.newConcurrentMap();
private ConcurrentMap<Long, RepairUnit> columnFamilies = Maps.newConcurrentMap();
private ConcurrentMap<TableName, RepairUnit> columnFamiliesByName = Maps.newConcurrentMap();
private ConcurrentMap<Long, RepairUnit> repairUnits = Maps.newConcurrentMap();
private ConcurrentMap<RepairUnitKey, RepairUnit> repairUnitsByKey = Maps.newConcurrentMap();
private ConcurrentMap<Long, RepairSegment> repairSegments = Maps.newConcurrentMap();
private ConcurrentMap<Long, LinkedHashMap<Long, RepairSegment>> repairSegmentsByRunId =
Maps.newConcurrentMap();

public static class TableName {
public static class RepairUnitKey {

public final String cluster;
public final String keyspace;
public final String table;
public final Set<String> tables;

public TableName(String cluster, String keyspace, String table) {
public RepairUnitKey(String cluster, String keyspace, Set<String> tables) {
this.cluster = cluster;
this.keyspace = keyspace;
this.table = table;
this.tables = tables;
}

@Override
public boolean equals(Object other) {
return other instanceof TableName &&
cluster.equals(((TableName) other).cluster) &&
keyspace.equals(((TableName) other).keyspace) &&
table.equals(((TableName) other).table);
return other instanceof RepairUnitKey &&
cluster.equals(((RepairUnitKey) other).cluster) &&
keyspace.equals(((RepairUnitKey) other).keyspace) &&
tables.equals(((RepairUnitKey) other).tables);
}

@Override
public int hashCode() {
return (cluster + keyspace + table).hashCode();
return cluster.hashCode() ^ keyspace.hashCode() ^ tables.hashCode();
}
}

Expand All @@ -87,14 +88,14 @@ public Collection<Cluster> getClusters() {
}

@Override
public Cluster addCluster(Cluster cluster) {
Cluster existing = clusters.put(cluster.getName(), cluster);
return existing == null ? cluster : null;
public boolean addCluster(Cluster cluster) {
Cluster existing = clusters.putIfAbsent(cluster.getName(), cluster);
return existing == null;
}

@Override
public boolean updateCluster(Cluster newCluster) {
if (getCluster(newCluster.getName()) == null) {
if (!getCluster(newCluster.getName()).isPresent()) {
return false;
} else {
clusters.put(newCluster.getName(), newCluster);
Expand All @@ -103,8 +104,8 @@ public boolean updateCluster(Cluster newCluster) {
}

@Override
public Cluster getCluster(String clusterName) {
return clusters.get(clusterName);
public Optional<Cluster> getCluster(String clusterName) {
return Optional.fromNullable(clusters.get(clusterName));
}

@Override
Expand All @@ -125,8 +126,8 @@ public boolean updateRepairRun(RepairRun repairRun) {
}

@Override
public RepairRun getRepairRun(long id) {
return repairRuns.get(id);
public Optional<RepairRun> getRepairRun(long id) {
return Optional.fromNullable(repairRuns.get(id));
}

@Override
Expand All @@ -152,29 +153,29 @@ public Collection<RepairRun> getAllRunningRepairRuns() {
}

@Override
public RepairUnit addColumnFamily(RepairUnit.Builder columnFamily) {
RepairUnit existing =
getColumnFamily(columnFamily.clusterName, columnFamily.keyspaceName, columnFamily.name);
if (existing == null) {
RepairUnit newRepairUnit = columnFamily.build(COLUMN_FAMILY_ID.incrementAndGet());
columnFamilies.put(newRepairUnit.getId(), newRepairUnit);
TableName tableName = new TableName(newRepairUnit.getClusterName(),
newRepairUnit.getKeyspaceName(), newRepairUnit.getName());
columnFamiliesByName.put(tableName, newRepairUnit);
return newRepairUnit;
public RepairUnit addRepairUnit(RepairUnit.Builder repairUnit) {
Optional<RepairUnit> existing =
getRepairUnit(repairUnit.clusterName, repairUnit.keyspaceName, repairUnit.columnFamilies);
if (existing.isPresent()) {
return existing.get();
} else {
return null;
RepairUnit newRepairUnit = repairUnit.build(REPAIR_UNIT_ID.incrementAndGet());
repairUnits.put(newRepairUnit.getId(), newRepairUnit);
RepairUnitKey unitTables = new RepairUnitKey(newRepairUnit.getClusterName(), newRepairUnit.getKeyspaceName(), newRepairUnit.getColumnFamilies());
repairUnitsByKey.put(unitTables, newRepairUnit);
return newRepairUnit;
}
}

@Override
public RepairUnit getColumnFamily(long id) {
return columnFamilies.get(id);
public Optional<RepairUnit> getRepairUnit(long id) {
return Optional.fromNullable(repairUnits.get(id));
}

@Override
public RepairUnit getColumnFamily(String cluster, String keyspace, String table) {
return columnFamiliesByName.get(new TableName(cluster, keyspace, table));
public Optional<RepairUnit> getRepairUnit(String cluster, String keyspace, Set<String> tables) {
return Optional
.fromNullable(repairUnitsByKey.get(new RepairUnitKey(cluster, keyspace, tables)));
}

@Override
Expand Down Expand Up @@ -202,42 +203,41 @@ public boolean updateRepairSegment(RepairSegment newRepairSegment) {
}

@Override
public RepairSegment getRepairSegment(long id) {
return repairSegments.get(id);
public Optional<RepairSegment> getRepairSegment(long id) {
return Optional.fromNullable(repairSegments.get(id));
}

@Override
public RepairSegment getNextFreeSegment(long runId) {
public Optional<RepairSegment> getNextFreeSegment(long runId) {
for (RepairSegment segment : repairSegmentsByRunId.get(runId).values()) {
if (segment.getState() == RepairSegment.State.NOT_STARTED) {
return segment;
return Optional.of(segment);
}
}
return null;
}

@Override
public RepairSegment getNextFreeSegmentInRange(long runId, RingRange range) {
public Optional<RepairSegment> getNextFreeSegmentInRange(long runId, RingRange range) {
for (RepairSegment segment : repairSegmentsByRunId.get(runId).values()) {
if (segment.getState() == RepairSegment.State.NOT_STARTED &&
range.encloses(segment.getTokenRange())) {
return segment;
return Optional.of(segment);
}
}
return null;
}

@Nullable
@Override
public RepairSegment getTheRunningSegment(long runId) {
RepairSegment theSegment = null;
public Collection<RepairSegment> getSegmentsWithStateForRun(long runId,
RepairSegment.State segmentState) {
List<RepairSegment> segments = Lists.newArrayList();
for (RepairSegment segment : repairSegmentsByRunId.get(runId).values()) {
if (segment.getState() == RepairSegment.State.RUNNING) {
assert null == theSegment : "there are more than one RUNNING segment on run: " + runId;
theSegment = segment;
if (segment.getState() == segmentState) {
segments.add(segment);
}
}
return theSegment;
return segments;
}

@Override
Expand Down

0 comments on commit b61a986

Please sign in to comment.