Skip to content

Commit

Permalink
CASSANDRA-8460 - enable DTCS to archive non-compactable tables to slo…
Browse files Browse the repository at this point in the history
…w disk tier
  • Loading branch information
jeffjirsa committed Aug 15, 2015
1 parent 793bf45 commit cc0ab8f
Show file tree
Hide file tree
Showing 17 changed files with 508 additions and 28 deletions.
1 change: 1 addition & 0 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,7 @@
</junit>
<delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/commitlog:@{poffset}"/>
<delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/data:@{poffset}"/>
<delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/slowdata:@{poffset}"/>
<delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/saved_caches:@{poffset}"/>
</sequential>
</macrodef>
Expand Down
7 changes: 7 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
# data_file_directories:
# - /var/lib/cassandra/data

# Compaction strategies (such as date-tiered compaction) may desire
# the ability to archive older, non-compactable sstables to a slower
# tier of storage.
# If not set, archive tiered storage is disabled.
# archive_data_file_directories:
#     - /var/lib/cassandra/slow-data

# commit log. when running on magnetic HDD, this should be a
# separate spindle than the data directories.
# If not set, the default directory is $CASSANDRA_HOME/data/commitlog.
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public class Config
public volatile Integer inter_dc_stream_throughput_outbound_megabits_per_sec = 0;

public String[] data_file_directories = new String[0];
public String[] archive_data_file_directories = new String[0];

public String saved_caches_directory;

Expand Down
27 changes: 27 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Longs;

import org.apache.commons.lang3.ArrayUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -508,6 +510,17 @@ public int compare(InetAddress endpoint1, InetAddress endpoint2)
throw new ConfigurationException("saved_caches_directory must not be the same as any data_file_directories", false);
}

if (conf.archive_data_file_directories != null)
{
for (String datadir : conf.archive_data_file_directories)
{
if (datadir.equals(conf.commitlog_directory))
throw new ConfigurationException("commitlog_directory must not be the same as any archive_data_file_directories", false);
if (datadir.equals(conf.saved_caches_directory))
throw new ConfigurationException("saved_caches_directory must not be the same as any archive_data_file_directories", false);
}
}

if (conf.commitlog_directory.equals(conf.saved_caches_directory))
throw new ConfigurationException("saved_caches_directory must not be the same as the commitlog_directory", false);

Expand Down Expand Up @@ -718,6 +731,10 @@ public static void createAllDirectories()
FileUtils.createDirectory(dataFileDirectory);
}

if (conf.archive_data_file_directories != null && conf.archive_data_file_directories.length > 0)
for (String dataFileDirectory : conf.archive_data_file_directories)
FileUtils.createDirectory(dataFileDirectory);

if (conf.commitlog_directory == null)
throw new ConfigurationException("commitlog_directory must be specified", false);

Expand Down Expand Up @@ -1101,10 +1118,20 @@ public static void setInterDCStreamThroughputOutboundMegabitsPerSec(int value)
}

/**
 * Returns every configured data file location: the standard data_file_directories
 * followed by any archive_data_file_directories (concatenated via ArrayUtils.addAll).
 */
public static String[] getAllDataFileLocations()
{
return ArrayUtils.addAll(conf.data_file_directories, conf.archive_data_file_directories);
}

/**
 * Returns only the standard (non-archive) data file locations from cassandra.yaml.
 */
public static String[] getStandardDataFileLocations()
{
return conf.data_file_directories;
}

/**
 * Returns the archive (slow-tier) data file locations, if any.
 * Defaults to an empty array in Config; callers still null-check defensively.
 */
public static String[] getArchiveDataFileLocations()
{
return conf.archive_data_file_directories;
}

public static String getCommitLogLocation()
{
return conf.commitlog_directory;
Expand Down
106 changes: 91 additions & 15 deletions src/java/org/apache/cassandra/db/Directories.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Iterables;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -96,12 +97,31 @@ public class Directories
public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";

public static final DataDirectory[] dataDirectories;
public static final DataDirectory[] standardDataDirectories;
public static final DataDirectory[] archiveDataDirectories;

static
{
    // Wrap each configured standard data location in a DataDirectory.
    String[] standardLocations = DatabaseDescriptor.getStandardDataFileLocations();
    standardDataDirectories = new DataDirectory[standardLocations.length];
    for (int i = 0; i < standardLocations.length; ++i)
        standardDataDirectories[i] = new DataDirectory(new File(standardLocations[i]));

    String[] archiveLocations = DatabaseDescriptor.getArchiveDataFileLocations();
    if (archiveLocations == null)
    {
        // No archive tier configured: the combined view is just the standard directories.
        archiveDataDirectories = new DataDirectory[0];
        dataDirectories = standardDataDirectories;
    }
    else
    {
        archiveDataDirectories = new DataDirectory[archiveLocations.length];
        for (int i = 0; i < archiveLocations.length; ++i)
            archiveDataDirectories[i] = new DataDirectory(new File(archiveLocations[i]));

        // Combined view: standard directories first, then archive directories.
        dataDirectories = ArrayUtils.addAll(standardDataDirectories, archiveDataDirectories);
    }
}

/**
Expand Down Expand Up @@ -179,6 +199,8 @@ public static boolean hasPrivilege(File file, FileAction action)

private final CFMetaData metadata;
private final File[] dataPaths;
private final File[] standardDataPaths;
private final File[] archiveDataPaths;

/**
* Create Directories of given ColumnFamily.
Expand All @@ -195,14 +217,21 @@ public Directories(final CFMetaData metadata)
String cfName = idx >= 0 ? metadata.cfName.substring(0, idx) : metadata.cfName;
String indexNameWithDot = idx >= 0 ? metadata.cfName.substring(idx) : null;

this.dataPaths = new File[dataDirectories.length];
this.standardDataPaths = new File[standardDataDirectories.length];
this.archiveDataPaths = new File[archiveDataDirectories.length];
// If upgraded from version less than 2.1, use existing directories
String oldSSTableRelativePath = join(metadata.ksName, cfName);
for (int i = 0; i < dataDirectories.length; ++i)
for (int i = 0; i < standardDataDirectories.length; ++i)
{
// check if old SSTable directory exists
dataPaths[i] = new File(dataDirectories[i].location, oldSSTableRelativePath);
standardDataPaths[i] = new File(standardDataDirectories[i].location, oldSSTableRelativePath);
}
for (int i = 0 ; i < archiveDataDirectories.length; ++i)
{
archiveDataPaths[i] = new File(archiveDataDirectories[i].location, oldSSTableRelativePath);
}
this.dataPaths = ArrayUtils.addAll(standardDataPaths, archiveDataPaths);

boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>()
{
public boolean apply(File file)
Expand Down Expand Up @@ -315,19 +344,42 @@ public File getWriteableLocationAsFile(long writeSize)
}

/**
 * Returns a non-blacklisted, non-archive data directory that _currently_ has {@code writeSize} bytes as usable space.
 *
 * @param writeSize number of bytes the caller intends to write
 * @return a standard-tier data directory with sufficient usable space
 * @throws IOError if all directories are blacklisted.
 */
public DataDirectory getWriteableLocation(long writeSize)
{
    // getArchiveDirectory == false: normal writes never land on the archive tier.
    return getWriteableLocation(writeSize, false);
}

/**
* Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes as usable space.
*
* @throws IOError if all directories are blacklisted.
*/
public DataDirectory getWriteableLocation(long writeSize, boolean getArchiveDirectory)
{
List<DataDirectoryCandidate> candidates = new ArrayList<>();

long totalAvailable = 0L;

DataDirectory[] dataDirs;
if(getArchiveDirectory)
{
// data directories are guaranteed to be defined
// but archive data directories are not
if (archiveDataDirectories == null)
throw new IOError(new IOException("No archive data directories are configured"));

dataDirs = archiveDataDirectories;
}
else
dataDirs = standardDataDirectories;

// pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes.
boolean tooBig = false;
for (DataDirectory dataDir : dataDirectories)
for (DataDirectory dataDir : dataDirs)
{
if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
{
Expand Down Expand Up @@ -893,18 +945,42 @@ private static String join(String... s)
}

@VisibleForTesting
static void overrideDataDirectoriesForTest(String loc)
static void overrideDataDirectoriesForTest(String standardLoc, String archiveLoc)
{
for (int i = 0; i < dataDirectories.length; ++i)
dataDirectories[i] = new DataDirectory(new File(loc));
for (int i = 0 ; i < standardDataDirectories.length; ++i)
standardDataDirectories[i] = new DataDirectory(new File(standardLoc));

for (int i = 0; i < archiveDataDirectories.length; ++i)
archiveDataDirectories[i] = new DataDirectory(new File(archiveLoc));

for (int i = 0; i < standardDataDirectories.length; ++i)
dataDirectories[i] = standardDataDirectories[i];

for (int j = 0; j < archiveDataDirectories.length; ++j)
dataDirectories[j + standardDataDirectories.length] = archiveDataDirectories[j];
}

@VisibleForTesting
static void resetDataDirectoriesAfterTest()
{
    // Rebuild both tiers from DatabaseDescriptor, tracking the write position
    // into the combined dataDirectories array as we go.
    int combined = 0;

    String[] standardLocations = DatabaseDescriptor.getStandardDataFileLocations();
    for (int i = 0; i < standardLocations.length; ++i)
    {
        standardDataDirectories[i] = new DataDirectory(new File(standardLocations[i]));
        dataDirectories[combined++] = standardDataDirectories[i];
    }

    String[] archiveLocations = DatabaseDescriptor.getArchiveDataFileLocations();
    if (archiveLocations != null)
    {
        for (int i = 0; i < archiveLocations.length; ++i)
        {
            archiveDataDirectories[i] = new DataDirectory(new File(archiveLocations[i]));
            dataDirectories[combined++] = archiveDataDirectories[i];
        }
    }
}

private class TrueFilesSizeVisitor extends SimpleFileVisitor<Path>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
Expand Down Expand Up @@ -66,12 +67,24 @@ public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);

if (latestBucket.isEmpty())
return null;

LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
if (modifier != null)
return new CompactionTask(cfs, modifier, gcBefore);
// Normal compaction candidates
if ( !latestBucket.isEmpty() )
{
LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
if (modifier != null)
return new DateTieredCompactionTask(cfs, modifier, gcBefore, false);
}
// Archive compaction candidates
else
{
List<SSTableReader> archiveCandidates = getArchiveCandidates(gcBefore);
if (archiveCandidates.isEmpty())
return null;
LifecycleTransaction modifier = cfs.getTracker().tryModify(archiveCandidates, OperationType.COMPACTION);
if (modifier != null)
return new DateTieredCompactionTask(cfs, modifier, gcBefore, true);
}
return null;
}
}

Expand Down Expand Up @@ -141,6 +154,26 @@ private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> cand
return null;
}

/**
 * If archive data directories are defined, and archiving is set in the DTCS options, find appropriate archival candidates.
 *
 * @param gcBefore gc grace threshold in seconds (currently unused here; kept for signature parity with other candidate methods)
 * @return List of SSTableReaders suitable for archive to slow disk (likely one, potentially multiple)
 */
private List<SSTableReader> getArchiveCandidates(int gcBefore)
{
    // Archiving must be enabled both per-table (DTCS option) and node-wide (yaml archive directories).
    if (!options.sstableArchive || DatabaseDescriptor.getArchiveDataFileLocations() == null)
        return Collections.emptyList();

    long now = getNow();

    // Only consider sstables that belong to this strategy and are not mid-compaction or suspect.
    Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
    Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting));

    Iterable<SSTableReader> iCandidates = filterNonArchivableSSTables(Lists.newArrayList(candidates), options.maxSSTableAge, now);
    return Lists.newArrayList(iCandidates);
}

/**
* Gets the timestamp that DateTieredCompactionStrategy considers to be the "current time".
* @return the maximum timestamp across all SSTables.
Expand Down Expand Up @@ -178,6 +211,33 @@ public boolean apply(SSTableReader sstable)
});
}

/**
 * Removes all sstables that aren't eligible to be archived.
 * An sstable is eligible iff its max timestamp is older than (now - archiveSSTableAge)
 * and the sstable is not already on an archive directory.
 *
 * @param sstables all sstables to consider
 * @param archiveSSTableAge the age (in the table's timestamp resolution) after which an SSTable can be moved to archive tier storage; 0 disables archiving
 * @param now current time. SSTables with max timestamp less than (now - archiveSSTableAge) pass the filter.
 * @return the sstables whose max timestamp is older than the archive cutoff
 */
@VisibleForTesting
static Iterable<SSTableReader> filterNonArchivableSSTables(List<SSTableReader> sstables, long archiveSSTableAge, long now)
{
    // An age of 0 means archiving is effectively disabled for this strategy.
    if (archiveSSTableAge == 0)
        return Collections.emptyList(); // typed empty list, not raw Collections.EMPTY_LIST

    final long archiveCutoff = now - archiveSSTableAge;

    return Iterables.filter(sstables, new Predicate<SSTableReader>()
    {
        @Override
        public boolean apply(SSTableReader sstable)
        {
            // Skip sstables already on the archive tier; keep only those entirely older than the cutoff.
            return !sstable.isArchivedDiskDirectory() && sstable.getMaxTimestamp() < archiveCutoff;
        }
    });
}

/**
*
* @param sstables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ public final class DateTieredCompactionStrategyOptions
protected static final String TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
protected static final String MAX_SSTABLE_AGE_KEY = "max_sstable_age_days";
protected static final String BASE_TIME_KEY = "base_time_seconds";
protected static final String ARCHIVE_SSTABLE_KEY = "archive_sstables";

protected final long maxSSTableAge;
protected final long baseTime;
protected final boolean sstableArchive;

public DateTieredCompactionStrategyOptions(Map<String, String> options)
{
Expand All @@ -43,12 +45,18 @@ public DateTieredCompactionStrategyOptions(Map<String, String> options)
maxSSTableAge = Math.round(fractionalDays * timestampResolution.convert(1, TimeUnit.DAYS));
optionValue = options.get(BASE_TIME_KEY);
baseTime = timestampResolution.convert(optionValue == null ? DEFAULT_BASE_TIME_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS);
optionValue = options.get(ARCHIVE_SSTABLE_KEY);
// Boolean.parseBoolean(null) is false, so archiving defaults to off when the option is absent.
sstableArchive = Boolean.parseBoolean(optionValue);
}

// No-options constructor: applies the class defaults for max sstable age and
// base time window, and leaves sstable archiving disabled.
public DateTieredCompactionStrategyOptions()
{
maxSSTableAge = Math.round(DEFAULT_MAX_SSTABLE_AGE_DAYS * DEFAULT_TIMESTAMP_RESOLUTION.convert(1, TimeUnit.DAYS));
baseTime = DEFAULT_TIMESTAMP_RESOLUTION.convert(DEFAULT_BASE_TIME_SECONDS, TimeUnit.SECONDS);
// archiving is opt-in via the archive_sstables option; off by default
sstableArchive = false;
}

public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws ConfigurationException
Expand Down Expand Up @@ -95,6 +103,7 @@ public static Map<String, String> validateOptions(Map<String, String> options, M
uncheckedOptions.remove(MAX_SSTABLE_AGE_KEY);
uncheckedOptions.remove(BASE_TIME_KEY);
uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY);
uncheckedOptions.remove(ARCHIVE_SSTABLE_KEY);

return uncheckedOptions;
}
Expand Down

0 comments on commit cc0ab8f

Please sign in to comment.