Skip to content

Commit

Permalink
Cache disk boundaries on CompactionStrategyManager and reload strateg…
Browse files Browse the repository at this point in the history
…ies when boundary changes

Patch by Paulo Motta; Reviewed by Marcus Eriksson for CASSANDRA-13948
  • Loading branch information
pauloricardomg committed Nov 1, 2017
1 parent 9fdb8f0 commit efb2afb
Show file tree
Hide file tree
Showing 11 changed files with 413 additions and 142 deletions.
8 changes: 1 addition & 7 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Expand Up @@ -272,7 +272,6 @@ public void reload()
cfs.crcCheckChance = new DefaultValue(metadata.params.crcCheckChance);

compactionStrategyManager.maybeReload(metadata);
directories = compactionStrategyManager.getDirectories();

scheduleFlush();

Expand Down Expand Up @@ -430,13 +429,9 @@ public ColumnFamilyStore(Keyspace keyspace,
else
this.directories = new Directories(metadata, Directories.dataDirectories);


// compaction strategy should be created after the CFS has been prepared
compactionStrategyManager = new CompactionStrategyManager(this);

// Since compaction can re-define data dir we need to reinit directories
this.directories = compactionStrategyManager.getDirectories();

if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0)
{
logger.warn("Disabling compaction strategy by setting compaction thresholds to 0 is deprecated, set the compaction option 'enabled' to 'false' instead.");
Expand Down Expand Up @@ -1444,7 +1439,6 @@ public void addSSTable(SSTableReader sstable)
public void addSSTables(Collection<SSTableReader> sstables)
{
data.addSSTables(sstables);
CompactionManager.instance.submitBackground(this);
}

/**
Expand Down Expand Up @@ -1584,7 +1578,7 @@ public void markObsolete(Collection<SSTableReader> sstables, OperationType compa

void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
compactionStrategyManager.replaceFlushed(memtable, sstables);
data.replaceFlushed(memtable, sstables);
}

public boolean isValid()
Expand Down
10 changes: 9 additions & 1 deletion src/java/org/apache/cassandra/db/Directories.java
Expand Up @@ -90,7 +90,8 @@ public class Directories
public static final String TMP_SUBDIR = "tmp";
public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";

public static final DataDirectory[] dataDirectories;
@VisibleForTesting
public static DataDirectory[] dataDirectories;

static
{
Expand Down Expand Up @@ -579,6 +580,13 @@ public int hashCode()
{
return location.hashCode();
}

public String toString()
{
return "DataDirectory{" +
"location=" + location +
'}';
}
}

static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>
Expand Down
Expand Up @@ -81,8 +81,6 @@ public abstract class AbstractCompactionStrategy
protected boolean disableTombstoneCompactions = false;
protected boolean logAll = true;

private final Directories directories;

/**
* pause/resume/getNextBackgroundTask must synchronize. This guarantees that after pause completes,
* no new tasks will be generated; or put another way, pause can't run until in-progress tasks are
Expand Down Expand Up @@ -124,13 +122,6 @@ protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String>
tombstoneCompactionInterval = DEFAULT_TOMBSTONE_COMPACTION_INTERVAL;
uncheckedTombstoneCompaction = DEFAULT_UNCHECKED_TOMBSTONE_COMPACTION_OPTION;
}

directories = cfs.getDirectories();
}

public Directories getDirectories()
{
return directories;
}

/**
Expand Down Expand Up @@ -238,19 +229,6 @@ public long getMemtableReservedSize()
return 0;
}

/**
* Handle a flushed memtable.
*
* @param memtable the flushed memtable
* @param sstables the written sstables. can be null or empty if the memtable was clean.
*/
public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
cfs.getTracker().replaceFlushed(memtable, sstables);
if (sstables != null && !sstables.isEmpty())
CompactionManager.instance.submitBackground(cfs);
}

/**
* Filters SSTables that are to be blacklisted from the given collection
*
Expand Down
Expand Up @@ -271,7 +271,6 @@ public void run()
{
compactingCF.remove(cfs);
}
submitBackground(cfs);
}
}

Expand Down Expand Up @@ -530,8 +529,7 @@ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
Set<SSTableReader> needsRelocation = originals.stream().filter(s -> !inCorrectLocation(s)).collect(Collectors.toSet());
transaction.cancel(Sets.difference(originals, needsRelocation));

Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), s)));
Map<Integer, List<SSTableReader>> groupedByDisk = cfs.getCompactionStrategyManager().groupByDiskIndex(needsRelocation);

int maxSize = 0;
for (List<SSTableReader> diskSSTables : groupedByDisk.values())
Expand All @@ -551,7 +549,8 @@ private boolean inCorrectLocation(SSTableReader sstable)
{
if (!cfs.getPartitioner().splitter().isPresent())
return true;
int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
cfs.getCompactionStrategyManager().maybeReload(cfs.metadata); //maybe reload boundaries
int directoryIndex = cfs.getCompactionStrategyManager().getCompactionStrategyIndex(sstable);
Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();

Directories.DataDirectory location = locations[directoryIndex];
Expand Down

0 comments on commit efb2afb

Please sign in to comment.