Skip to content

Commit

Permalink
Ensure compaction strategies do not loop indefinitely when not able t…
Browse files Browse the repository at this point in the history
…o acquire Tracker lock

Patch by Paulo Motta; Reviewed by Marcus Eriksson for CASSANDRA-13948
  • Loading branch information
pauloricardomg committed Oct 31, 2017
1 parent ea443df commit 9fdb8f0
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 8 deletions.
Expand Up @@ -73,16 +73,28 @@ public DateTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> o
@SuppressWarnings("resource")
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
List<SSTableReader> previousCandidate = null;
while (true)
{
List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);

if (latestBucket.isEmpty())
return null;

// Already tried acquiring references without success. It means there is a race with
// the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
if (latestBucket.equals(previousCandidate))
{
logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
"unless it happens frequently, in which case it must be reported. Will retry later.",
latestBucket);
return null;
}

LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
if (modifier != null)
return new CompactionTask(cfs, modifier, gcBefore);
previousCandidate = latestBucket;
}
}

Expand Down Expand Up @@ -170,6 +182,8 @@ private long getNow()
// no need to convert to collection if had an Iterables.max(), but not present in standard toolkit, and not worth adding
List<SSTableReader> list = new ArrayList<>();
Iterables.addAll(list, cfs.getSSTables(SSTableSet.LIVE));
if (list.isEmpty())
return 0;
return Collections.max(list, (o1, o2) -> Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp()))
.getMaxTimestamp();
}
Expand Down Expand Up @@ -462,7 +476,7 @@ public static Map<String, String> validateOptions(Map<String, String> options) t
return uncheckedOptions;
}

public CompactionLogger.Strategy strategyLogger()
public CompactionLogger.Strategy strategyLogger()
{
return new CompactionLogger.Strategy()
{
Expand Down
Expand Up @@ -62,12 +62,12 @@ public LeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> opti
int configuredLevelFanoutSize = DEFAULT_LEVEL_FANOUT_SIZE;
SizeTieredCompactionStrategyOptions localOptions = new SizeTieredCompactionStrategyOptions(options);
if (options != null)
{
if (options.containsKey(SSTABLE_SIZE_OPTION))
{
configuredMaxSSTableSize = Integer.parseInt(options.get(SSTABLE_SIZE_OPTION));
{
if (options.containsKey(SSTABLE_SIZE_OPTION))
{
configuredMaxSSTableSize = Integer.parseInt(options.get(SSTABLE_SIZE_OPTION));
if (!tolerateSstableSize)
{
{
if (configuredMaxSSTableSize >= 1000)
logger.warn("Max sstable size of {}MB is configured for {}.{}; having a unit of compaction this large is probably a bad idea",
configuredMaxSSTableSize, cfs.name, cfs.getColumnFamilyName());
Expand Down Expand Up @@ -113,6 +113,7 @@ public void startup()
@SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
Collection<SSTableReader> previousCandidate = null;
while (true)
{
OperationType op;
Expand All @@ -136,13 +137,24 @@ public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
op = OperationType.COMPACTION;
}

// Already tried acquiring references without success. It means there is a race with
// the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
if (candidate.sstables.equals(previousCandidate))
{
logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
"unless it happens frequently, in which case it must be reported. Will retry later.",
candidate.sstables);
return null;
}

LifecycleTransaction txn = cfs.getTracker().tryModify(candidate.sstables, OperationType.COMPACTION);
if (txn != null)
{
LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, txn, candidate.level, gcBefore, candidate.maxSSTableBytes, false);
newTask.setCompactionType(op);
return newTask;
}
previousCandidate = candidate.sstables;
}
}

Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
Expand All @@ -43,6 +44,8 @@
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;

import static com.google.common.collect.Iterables.filter;

public class LeveledManifest
{
private static final Logger logger = LoggerFactory.getLogger(LeveledManifest.class);
Expand Down Expand Up @@ -351,7 +354,7 @@ public synchronized CompactionCandidate getCompactionCandidates()
continue; // mostly this just avoids polluting the debug log with zero scores
// we want to calculate score excluding compacting ones
Set<SSTableReader> sstablesInLevel = Sets.newHashSet(sstables);
Set<SSTableReader> remaining = Sets.difference(sstablesInLevel, cfs.getTracker().getCompacting());
Set<SSTableReader> remaining = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstablesInLevel::contains));
double score = (double) SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i, maxSSTableSizeInBytes);
logger.trace("Compaction score for level {} is {}", i, score);

Expand Down
Expand Up @@ -65,7 +65,8 @@ private long avgSize(List<SSTableReader> sstables)

protected SizeTieredCompactionStrategyOptions sizeTieredOptions;
protected volatile int estimatedRemainingTasks;
private final Set<SSTableReader> sstables = new HashSet<>();
@VisibleForTesting
protected final Set<SSTableReader> sstables = new HashSet<>();

public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
Expand Down Expand Up @@ -176,16 +177,28 @@ private static double hotness(SSTableReader sstr)
@SuppressWarnings("resource")
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
List<SSTableReader> previousCandidate = null;
while (true)
{
List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);

if (hottestBucket.isEmpty())
return null;

// Already tried acquiring references without success. It means there is a race with
// the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
if (hottestBucket.equals(previousCandidate))
{
logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
"unless it happens frequently, in which case it must be reported. Will retry later.",
hottestBucket);
return null;
}

LifecycleTransaction transaction = cfs.getTracker().tryModify(hottestBucket, OperationType.COMPACTION);
if (transaction != null)
return new CompactionTask(cfs, transaction, gcBefore);
previousCandidate = hottestBucket;
}
}

Expand Down
Expand Up @@ -72,16 +72,28 @@ public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> o
@SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
List<SSTableReader> previousCandidate = null;
while (true)
{
List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);

if (latestBucket.isEmpty())
return null;

// Already tried acquiring references without success. It means there is a race with
// the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
if (latestBucket.equals(previousCandidate))
{
logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
"unless it happens frequently, in which case it must be reported. Will retry later.",
latestBucket);
return null;
}

LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
if (modifier != null)
return new TimeWindowCompactionTask(cfs, modifier, gcBefore, options.ignoreOverlaps);
previousCandidate = latestBucket;
}
}

Expand Down
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db.compaction;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;

import junit.framework.Assert;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.FBUtilities;

public class AbstractCompactionStrategyTest
{
private static final String KEYSPACE1 = "Keyspace1";
private static final String LCS_TABLE = "LCS_TABLE";
private static final String STCS_TABLE = "STCS_TABLE";
private static final String DTCS_TABLE = "DTCS_TABLE";
private static final String TWCS_TABLE = "TWCS_TABLE";

@BeforeClass
public static void loadData() throws ConfigurationException
{
Map<String, String> stcsOptions = new HashMap<>();
stcsOptions.put("tombstone_compaction_interval", "1");

SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE1,
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(KEYSPACE1, LCS_TABLE)
.compaction(CompactionParams.lcs(Collections.emptyMap())),
SchemaLoader.standardCFMD(KEYSPACE1, STCS_TABLE)
.compaction(CompactionParams.scts(Collections.emptyMap())),
SchemaLoader.standardCFMD(KEYSPACE1, DTCS_TABLE)
.compaction(CompactionParams.create(DateTieredCompactionStrategy.class, Collections.emptyMap())),
SchemaLoader.standardCFMD(KEYSPACE1, TWCS_TABLE)
.compaction(CompactionParams.create(TimeWindowCompactionStrategy.class, Collections.emptyMap())));
Keyspace.open(KEYSPACE1).getColumnFamilyStore(LCS_TABLE).disableAutoCompaction();
Keyspace.open(KEYSPACE1).getColumnFamilyStore(STCS_TABLE).disableAutoCompaction();
Keyspace.open(KEYSPACE1).getColumnFamilyStore(DTCS_TABLE).disableAutoCompaction();
Keyspace.open(KEYSPACE1).getColumnFamilyStore(TWCS_TABLE).disableAutoCompaction();
}

@After
public void tearDown()
{

Keyspace.open(KEYSPACE1).getColumnFamilyStore(LCS_TABLE).truncateBlocking();
Keyspace.open(KEYSPACE1).getColumnFamilyStore(STCS_TABLE).truncateBlocking();
Keyspace.open(KEYSPACE1).getColumnFamilyStore(DTCS_TABLE).truncateBlocking();
Keyspace.open(KEYSPACE1).getColumnFamilyStore(TWCS_TABLE).truncateBlocking();
}

@Test(timeout=30000)
public void testGetNextBackgroundTaskDoesNotBlockLCS()
{
testGetNextBackgroundTaskDoesNotBlock(LCS_TABLE);
}

@Test(timeout=30000)
public void testGetNextBackgroundTaskDoesNotBlockSTCS()
{
testGetNextBackgroundTaskDoesNotBlock(STCS_TABLE);
}

@Test(timeout=30000)
public void testGetNextBackgroundTaskDoesNotBlockDTCS()
{
testGetNextBackgroundTaskDoesNotBlock(DTCS_TABLE);
}

@Test(timeout=30000)
public void testGetNextBackgroundTaskDoesNotBlockTWCS()
{
testGetNextBackgroundTaskDoesNotBlock(TWCS_TABLE);
}

public void testGetNextBackgroundTaskDoesNotBlock(String table)
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(table);
AbstractCompactionStrategy strategy = cfs.getCompactionStrategyManager().getStrategies().get(1).get(0);

// Add 4 sstables
for (int i = 1; i <= 4; i++)
{
insertKeyAndFlush(table, i);
}

// Check they are returned on the next background task
try (LifecycleTransaction txn = strategy.getNextBackgroundTask(FBUtilities.nowInSeconds()).transaction)
{
Assert.assertEquals(cfs.getLiveSSTables(), txn.originals());
}

// now remove sstables on the tracker, to simulate a concurrent transaction
cfs.getTracker().removeUnsafe(cfs.getLiveSSTables());

// verify the compaction strategy will return null
Assert.assertNull(strategy.getNextBackgroundTask(FBUtilities.nowInSeconds()));
}


private static void insertKeyAndFlush(String table, int key)
{
long timestamp = System.currentTimeMillis();
DecoratedKey dk = Util.dk(String.format("%03d", key));
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(table);
new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey())
.clustering(String.valueOf(key))
.add("val", "val")
.build()
.applyUnsafe();
cfs.forceBlockingFlush();
}
}

0 comments on commit 9fdb8f0

Please sign in to comment.