Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jbellis committed Oct 8, 2012
1 parent 5160de5 commit a2e6e1a
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 228 deletions.
2 changes: 0 additions & 2 deletions conf/cassandra.yaml
Expand Up @@ -417,8 +417,6 @@ in_memory_compaction_limit_in_mb: 64
# slowly or too fast, you should look at
# compaction_throughput_mb_per_sec first.
#
# This setting has no effect on LeveledCompactionStrategy.
#
# concurrent_compactors defaults to the number of cores.
# Uncomment to make compaction mono-threaded, the pre-0.8 default.
#concurrent_compactors: 1
Expand Down
46 changes: 14 additions & 32 deletions src/java/org/apache/cassandra/db/DataTracker.java
Expand Up @@ -172,46 +172,23 @@ public void run()
}

/**
* @return A subset of the given active sstables that have been marked compacting,
* or null if the thresholds cannot be met: files that are marked compacting must
* later be unmarked using unmarkCompacting.
* @return true if we are able to mark the given @param sstables as compacted, before anyone else
*
* Note that we could acquire references on the marked sstables and release them in
* unmarkCompacting, but since we will never call markCompacted on a sstable marked
* as compacting (unless there is a serious bug), we can skip this.
*/
public Set<SSTableReader> markCompacting(Collection<SSTableReader> tomark, int min, int max)
public boolean markCompacting(Collection<SSTableReader> sstables)
{
if (max < min || max < 1)
return null;
if (tomark == null || tomark.isEmpty())
return null;
assert sstables != null && !sstables.isEmpty();

View currentView, newView;
Set<SSTableReader> subset = null;
// order preserving set copy of the input
Set<SSTableReader> remaining = new LinkedHashSet<SSTableReader>(tomark);
do
{
currentView = view.get();

// find the subset that is active and not already compacting
remaining.removeAll(currentView.compacting);
remaining.retainAll(currentView.sstables);
if (remaining.size() < min)
// cannot meet the min threshold
return null;

// cap the newly compacting items into a subset set
subset = new HashSet<SSTableReader>();
Iterator<SSTableReader> iter = remaining.iterator();
for (int added = 0; added < max && iter.hasNext(); added++)
subset.add(iter.next());
View currentView = view.get();
Set<SSTableReader> inactive = Sets.difference(ImmutableSet.copyOf(sstables), currentView.compacting);
if (inactive.size() < sstables.size())
return false;

newView = currentView.markCompacting(subset);
}
while (!view.compareAndSet(currentView, newView));
return subset;
View newView = currentView.markCompacting(inactive);
return view.compareAndSet(currentView, newView);
}

/**
Expand Down Expand Up @@ -449,6 +426,11 @@ public static SSTableIntervalTree buildIntervalTree(Iterable<SSTableReader> ssta
return new SSTableIntervalTree(intervals);
}

public Set<SSTableReader> getCompacting()
{
return getView().compacting;
}

public static class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader, Interval<RowPosition, SSTableReader>>
{
private static final SSTableIntervalTree EMPTY = new SSTableIntervalTree(null);
Expand Down
Expand Up @@ -66,22 +66,24 @@ public Map<String, String> getOptions()
public void shutdown() { }

/**
* @return the next background/minor compaction task to run; null if nothing to do.
* @param gcBefore throw away tombstones older than this
* @return the next background/minor compaction task to run; null if nothing to do.
* Is responsible for marking its sstables as compaction-pending.
*/
public abstract AbstractCompactionTask getNextBackgroundTask(final int gcBefore);

/**
* @param gcBefore throw away tombstones older than this
* @return a compaction task that should be run to compact this columnfamilystore
* as much as possible. Null if nothing to do.
* @param gcBefore throw away tombstones older than this
*/
public abstract AbstractCompactionTask getMaximalTask(final int gcBefore);

/**
* @param sstables SSTables to compact. Must be marked as compacting.
* @param gcBefore throw away tombstones older than this
* @return a compaction task corresponding to the requested sstables.
* Will not be null. (Will throw if user requests an invalid compaction.)
* @param gcBefore throw away tombstones older than this
*/
public abstract AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore);

Expand All @@ -95,17 +97,10 @@ public void shutdown() { }
*/
public abstract long getMaxSSTableSize();

/**
* @return true if checking for whether a key exists, ignoring @param sstablesToIgnore,
* is going to be expensive
*/
public abstract boolean isKeyExistenceExpensive(Set<? extends SSTable> sstablesToIgnore);

/**
* Filters SSTables that are to be blacklisted from the given collection
*
* @param originalCandidates The collection to check for blacklisted SSTables
*
* @return list of the SSTables with blacklisted ones filtered out
*/
public static List<SSTableReader> filterSuspectSSTables(Collection<SSTableReader> originalCandidates)
Expand Down
Expand Up @@ -18,7 +18,6 @@
package org.apache.cassandra.db.compaction;

import java.util.Collection;
import java.util.Set;

import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.db.ColumnFamilyStore;
Expand All @@ -42,54 +41,11 @@ public AbstractCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> s

public abstract int execute(CompactionExecutorStatsCollector collector);

public ColumnFamilyStore getColumnFamilyStore()
{
return cfs;
}

public Collection<SSTableReader> getSSTables()
{
return sstables;
}

/**
* Try to mark the sstable to compact as compacting.
* It returns true if some sstables have been marked for compaction, false
* otherwise.
* This *must* be called before calling execute(). Moreover,
* unmarkSSTables *must* always be called after execute() if this
* method returns true.
*/
public boolean markSSTablesForCompaction()
{
int min = isUserDefined ? 1 : cfs.getMinimumCompactionThreshold();
int max = isUserDefined ? Integer.MAX_VALUE : cfs.getMaximumCompactionThreshold();
return markSSTablesForCompaction(min, max);
}

public boolean markSSTablesForCompaction(int min, int max)
{
Set<SSTableReader> marked = cfs.getDataTracker().markCompacting(sstables, min, max);

if (marked == null || marked.isEmpty())
{
cancel();
return false;
}

this.sstables = marked;
return true;
}

public void unmarkSSTables()
{
cfs.getDataTracker().unmarkCompacting(sstables);
}

// Can be overriden for action that need to be performed if the task won't
// execute (if sstable can't be marked successfully)
protected void cancel() {}

public AbstractCompactionTask isUserDefined(boolean isUserDefined)
{
this.isUserDefined = isUserDefined;
Expand Down

0 comments on commit a2e6e1a

Please sign in to comment.