cut down on the number of sstables compared for version and purge checks
jbellis committed Mar 26, 2012
1 parent 18b5564 commit 45af95a
Showing 3 changed files with 46 additions and 43 deletions.
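The gist of the change: instead of probing the bloom filter of every other sstable in the ColumnFamilyStore for each key, a compaction now computes, once up front, the set of sstables whose key ranges overlap the sstables being compacted, and the per-key purge check consults only that set. Below is a minimal sketch of that flow with simplified stand-in types (a plain long key and a HashSet standing in for the bloom filter; none of these are Cassandra's actual classes):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OverlapPurgeSketch
{
    // Simplified stand-in for an SSTableReader: a covered key range plus a "may contain" probe.
    static class Table
    {
        final long first, last;
        final Set<Long> keys; // stands in for the bloom filter

        Table(long first, long last, Long... keys)
        {
            this.first = first;
            this.last = last;
            this.keys = new HashSet<Long>(Arrays.asList(keys));
        }

        boolean mayContain(long key) // analogous to getBloomFilter().isPresent(key)
        {
            return keys.contains(key);
        }
    }

    // Computed once per compaction: sstables whose range overlaps any compacting sstable,
    // excluding the compacting ones themselves (the role of getOverlappingSSTables).
    static List<Table> overlapping(Collection<Table> all, Collection<Table> compacting)
    {
        List<Table> result = new ArrayList<Table>();
        for (Table candidate : all)
        {
            if (compacting.contains(candidate))
                continue;
            for (Table c : compacting)
            {
                if (candidate.first <= c.last && c.first <= candidate.last)
                {
                    result.add(candidate);
                    break;
                }
            }
        }
        return result;
    }

    // Per-key check: purge only if no overlapping sstable might still hold the key.
    static boolean shouldPurge(long key, List<Table> overlapping)
    {
        for (Table t : overlapping)
            if (t.first <= key && key <= t.last && t.mayContain(key))
                return false;
        return true;
    }

    public static void main(String[] args)
    {
        Table a = new Table(0, 100, 10L, 50L);
        Table b = new Table(80, 200, 90L);
        Table c = new Table(500, 600, 550L);

        List<Table> compacting = Arrays.asList(a);
        List<Table> others = overlapping(Arrays.asList(a, b, c), compacting);
        System.out.println(others.size());           // 1: only b overlaps [0,100]; c is never consulted
        System.out.println(shouldPurge(90, others)); // false: b may still contain key 90
        System.out.println(shouldPurge(50, others)); // true: no overlapping sstable may contain 50
    }
}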
34 changes: 17 additions & 17 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,10 +30,7 @@
import java.util.regex.Pattern;
import javax.management.*;

import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -69,6 +66,7 @@
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.IntervalTree.Interval;
import org.apache.cassandra.utils.IntervalTree.IntervalTree;
import org.cliffc.high_scale_lib.NonBlockingHashMap;

import static org.apache.cassandra.config.CFMetaData.Caching;
@@ -844,23 +842,25 @@ private static void removeDeletedSuper(ColumnFamily cf, int gcBefore)
}

/**
* Uses bloom filters to check if key may be present in any sstable in this
* ColumnFamilyStore, minus a set of provided ones.
*
* Because BFs are checked, negative returns ensure that the key is not
* present in the checked SSTables, but positive ones don't ensure key
* presence.
* @param sstables
* @return sstables whose key range overlaps with that of the given sstables, not including the given sstables themselves.
* (The given sstables may or may not overlap with each other.)
*/
public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<? extends SSTable> sstablesToIgnore)
public Set<SSTableReader> getOverlappingSSTables(Collection<SSTableReader> sstables)
{
// we don't need to acquire references here, since the bloom filter is safe to use even post-compaction
List<SSTableReader> filteredSSTables = data.getView().intervalTree.search(new Interval(key, key));
for (SSTableReader sstable : filteredSSTables)
assert !sstables.isEmpty();
IntervalTree<SSTableReader> tree = data.getView().intervalTree;

Set<SSTableReader> results = null;
for (SSTableReader sstable : sstables)
{
if (!sstablesToIgnore.contains(sstable) && sstable.getBloomFilter().isPresent(key.key))
return true;
Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(new Interval<SSTableReader>(sstable.first, sstable.last)));
assert overlaps.contains(sstable);
results = results == null ? overlaps : Sets.union(results, overlaps);
}
return false;
results = Sets.difference(results, ImmutableSet.copyOf(sstables));

return results;
}
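A note on the Guava calls above: Sets.union and Sets.difference return unmodifiable views over their arguments rather than copied sets, so the loop accumulates overlap sets without materializing a new collection per iteration; only the final difference filters out the compacting sstables. A small self-contained illustration of those view semantics (illustrative names, not Cassandra code):

import java.util.Set;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

public class SetViewDemo
{
    public static void main(String[] args)
    {
        Set<String> compacting = ImmutableSet.of("sstable-1", "sstable-2");
        Set<String> overlapsOfFirst = ImmutableSet.of("sstable-1", "sstable-3");
        Set<String> overlapsOfSecond = ImmutableSet.of("sstable-2", "sstable-3", "sstable-4");

        // union and difference are unmodifiable views over their arguments, not copies
        Set<String> allOverlaps = Sets.union(overlapsOfFirst, overlapsOfSecond);
        Set<String> others = Sets.difference(allOverlaps, compacting);

        System.out.println(allOverlaps); // [sstable-1, sstable-3, sstable-2, sstable-4]
        System.out.println(others);      // [sstable-3, sstable-4]
    }
}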

/*
16 changes: 8 additions & 8 deletions src/java/org/apache/cassandra/db/DataTracker.java
@@ -552,6 +552,14 @@ public void unsubscribe(INotificationConsumer consumer)
assert found : consumer + " not subscribed";
}

public static IntervalTree<SSTableReader> buildIntervalTree(Iterable<SSTableReader> sstables)
{
List<Interval> intervals = new ArrayList<Interval>(Iterables.size(sstables));
for (SSTableReader sstable : sstables)
intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable));
return new IntervalTree<SSTableReader>(intervals);
}

/**
* An immutable structure holding the current memtable, the memtables pending
* flush, the sstables for a column family, and the sstables that are active
@@ -584,14 +592,6 @@ public Sets.SetView<SSTableReader> nonCompactingSStables()
return Sets.difference(ImmutableSet.copyOf(sstables), compacting);
}

private IntervalTree buildIntervalTree(List<SSTableReader> sstables)
{
List<Interval> intervals = new ArrayList<Interval>(sstables.size());
for (SSTableReader sstable : sstables)
intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable));
return new IntervalTree<SSTableReader>(intervals);
}

public View switchMemtable(Memtable newMemtable)
{
Set<Memtable> newPending = ImmutableSet.<Memtable>builder().addAll(memtablesPendingFlush).add(memtable).build();
39 changes: 21 additions & 18 deletions src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -21,17 +21,17 @@

import java.util.*;

import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.EchoedRow;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.IntervalTree.Interval;
import org.apache.cassandra.utils.IntervalTree.IntervalTree;

/**
* Manage compaction options.
@@ -41,8 +41,8 @@ public class CompactionController
private static Logger logger = LoggerFactory.getLogger(CompactionController.class);

private final ColumnFamilyStore cfs;
private final Set<SSTableReader> sstables;
private final boolean forceDeserialize;
private final boolean deserializeRequired;
private final IntervalTree<SSTableReader> overlappingTree;

public final int gcBefore;
public boolean keyExistenceIsExpensive;
@@ -52,15 +52,16 @@ public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sst
{
assert cfs != null;
this.cfs = cfs;
this.sstables = new HashSet<SSTableReader>(sstables);
this.gcBefore = gcBefore;
// If we merge an old NodeId id, we must make sure that no further increments for that id are in an active memtable.
// For that, we must make sure that this id was renewed before the creation of the oldest unflushed memtable. We
// add 5 minutes to be sure we're on the safe side in terms of thread safety (though we should be fine in our
// current 'stop all write during memtable switch' situation).
this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000);
this.forceDeserialize = forceDeserialize;
keyExistenceIsExpensive = cfs.getCompactionStrategy().isKeyExistenceExpensive(this.sstables);
deserializeRequired = forceDeserialize || !allLatestVersion(sstables);
Set<SSTableReader> overlappingSSTables = cfs.getOverlappingSSTables(sstables);
overlappingTree = DataTracker.buildIntervalTree(overlappingSSTables);
keyExistenceIsExpensive = cfs.getCompactionStrategy().isKeyExistenceExpensive(ImmutableSet.copyOf(sstables));
}

public String getKeyspace()
@@ -79,19 +80,21 @@ public String getColumnFamily()
*/
public boolean shouldPurge(DecoratedKey key)
{
return !cfs.isKeyInRemainingSSTables(key, sstables);
List<SSTableReader> filteredSSTables = overlappingTree.search(new Interval(key, key));
for (SSTableReader sstable : filteredSSTables)
{
if (sstable.getBloomFilter().isPresent(key.key))
return false;
}
return true;
}
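The per-key decision above still rests on bloom filter semantics: a filter answers "definitely not present" or "maybe present", never "definitely present", so a positive answer from any overlapping sstable keeps the data, and purging happens only when every overlapping filter rules the key out. A small illustration of that asymmetry using Guava's BloomFilter (an analogy only, not Cassandra's own filter implementation):

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

public class BloomAsymmetryDemo
{
    public static void main(String[] args)
    {
        BloomFilter<Integer> filter = BloomFilter.create(Funnels.integerFunnel(), 1000);
        filter.put(42);

        System.out.println(filter.mightContain(42)); // always true: no false negatives
        System.out.println(filter.mightContain(7));  // usually false, occasionally a false positive

        // For shouldPurge the asymmetry is what matters: "maybe present" anywhere means the
        // tombstoned data must be kept; only a definite "not present" from every overlapping
        // sstable makes the purge safe.
    }
}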

public boolean needDeserialize()
private static boolean allLatestVersion(Iterable<SSTableReader> sstables)
{
if (forceDeserialize)
return true;

for (SSTableReader sstable : sstables)
if (!sstable.descriptor.isLatestVersion)
return true;

return false;
return false;
return true;
}

public void invalidateCachedRow(DecoratedKey key)
@@ -128,7 +131,7 @@ public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)

// in-memory echoedrow is only enabled if we think checking for the key's existence in the other sstables
// is going to be less expensive than simply de/serializing the row again
if (rows.size() == 1 && !needDeserialize()
if (rows.size() == 1 && !deserializeRequired
&& (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit() || !keyExistenceIsExpensive)
&& !shouldPurge(rows.get(0).getKey()))
{
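For reference, the condition above can be read as a single predicate. A simplified restatement with explicit parameters (illustrative only; in the patch the values come from DatabaseDescriptor, the compaction strategy, and shouldPurge):

public class EchoedRowDecisionSketch
{
    // Restates the condition in getCompactedRow; names mirror the patch, but this class
    // is a sketch and not part of Cassandra.
    static boolean useEchoedRow(int rowCount,
                                boolean deserializeRequired,
                                long rowSize,
                                long inMemoryCompactionLimit,
                                boolean keyExistenceIsExpensive,
                                boolean shouldPurge)
    {
        return rowCount == 1                          // a single version of the row: nothing to merge
            && !deserializeRequired                   // latest sstable format and no forced deserialization
            && (rowSize > inMemoryCompactionLimit     // the row would not fit in-memory compaction anyway,
                || !keyExistenceIsExpensive)          // or the key-existence check is cheap enough to try
            && !shouldPurge;                          // and no purge is possible, so echoing the row loses nothing
    }

    public static void main(String[] args)
    {
        long limit = 64L * 1024 * 1024;
        System.out.println(useEchoedRow(1, false, 1024, limit, false, false)); // true
        System.out.println(useEchoedRow(2, false, 1024, limit, false, false)); // false: rows must be merged
    }
}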
