Limit the number of L0 sstables by waiting for compaction.

Summary:
With a large key space (billions of keys), Cassandra repair creates over 4000 L0 sstables and causes a regression in read performance. See https://issues.apache.org/jira/browse/CASSANDRA-11432 for the detailed symptoms.

As discussed with the community in https://issues.apache.org/jira/browse/CASSANDRA-10862, one suggested approach is to run the compaction before making the repaired table available for reads.

However, my investigation turned up several problems with this approach:

  - The compaction code is tightly coupled with CompactionManager, which has limited APIs for scheduling compactions the way we want, and a requested compaction is not guaranteed to be executed.
  - There is no easy way to manage sstables without making them available for reads.

So, instead, this change introduces a different approach: don't add the sstables for reading until the number of L0 sstables is below a configurable threshold. Since compaction is always running, the number of sstables in L0 is guaranteed to drop after some time; a condensed sketch of the gating logic is shown below.
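For orientation, here is a condensed, hypothetical sketch of the gating logic that the StreamReceiveTask change below implements. The class and method names are illustrative only; the actual patch keeps one lock/condition pair per ColumnFamilyStore in a static map and signals it from a notification consumer.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;

// Illustrative sketch only (not part of the patch): block a stream-receive
// completion until compaction has shrunk L0 below the configured threshold.
public final class L0Gate
{
    private final Lock lock = new ReentrantLock();
    private final Condition sstablesChanged = lock.newCondition();

    // Called whenever the sstable list changes; in the real patch this is
    // wired up through cfs.subscribe(...) and SSTableListChangedNotification.
    public void onSSTableListChanged()
    {
        lock.lock();
        try { sstablesChanged.signalAll(); }
        finally { lock.unlock(); }
    }

    // Wait until the L0 sstable count drops to or below the threshold.
    public void awaitL0BelowThreshold(ColumnFamilyStore cfs) throws InterruptedException
    {
        int threshold = DatabaseDescriptor.getCompactionMaxL0SStableCount();
        if (threshold == 0)
            return; // 0 (the default) disables the gating, as in the patch

        lock.lock();
        try
        {
            while (l0Count(cfs) > threshold)
                sstablesChanged.await();
        }
        finally { lock.unlock(); }
    }

    private static int l0Count(ColumnFamilyStore cfs)
    {
        int[] perLevel = cfs.getSSTableCountPerLevel();
        return (perLevel != null && perLevel.length > 0) ? perLevel[0] : 0;
    }
}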
scv119 committed Jun 9, 2016
1 parent 360541f commit 149d127
Showing 4 changed files with 84 additions and 7 deletions.
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
@@ -171,6 +171,8 @@ public class Config
public volatile Integer compaction_throughput_mb_per_sec = 16;
public volatile Integer compaction_large_partition_warning_threshold_mb = 100;

public Integer compaction_max_l0_sstable_count = 0;

public Integer max_streaming_retries = 3;

public volatile Integer stream_throughput_outbound_megabits_per_sec = 200;
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1141,6 +1141,11 @@ public static boolean getDisableSTCSInL0()
return Boolean.getBoolean("cassandra.disable_stcs_in_l0");
}

public static int getCompactionMaxL0SStableCount()
{
return conf.compaction_max_l0_sstable_count;
}

public static int getStreamThroughputOutboundMegabitsPerSec()
{
return conf.stream_throughput_outbound_megabits_per_sec;
11 changes: 11 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -41,6 +41,7 @@
import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.notifications.INotificationConsumer;
import org.json.simple.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -3197,4 +3198,14 @@ public static ColumnFamilyStore getIfExists(String ksName, String cfName)

return keyspace.getColumnFamilyStore(id);
}

public void subscribe(INotificationConsumer consumer)
{
data.subscribe(consumer);
}

public void unsubscribe(INotificationConsumer consumer)
{
data.unsubscribe(consumer);
}
}
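The two pass-throughs above expose the Tracker's subscribe/unsubscribe hooks to code outside this class. As a hypothetical usage sketch (the class name below is illustrative only; the real subscriber added to StreamReceiveTask in this commit has the same shape):

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.notifications.INotification;
import org.apache.cassandra.notifications.INotificationConsumer;
import org.apache.cassandra.notifications.SSTableListChangedNotification;

// Illustrative only, not part of the patch.
public final class SSTableChangeListenerExample
{
    public static INotificationConsumer listen(ColumnFamilyStore cfs)
    {
        INotificationConsumer consumer = new INotificationConsumer()
        {
            @Override
            public void handleNotification(INotification notification, Object sender)
            {
                if (notification instanceof SSTableListChangedNotification)
                {
                    // The live sstable set changed (e.g. a compaction finished);
                    // the StreamReceiveTask change below re-checks the L0 count here.
                }
            }
        };
        cfs.subscribe(consumer);
        return consumer; // callers can later hand this back to cfs.unsubscribe(...)
    }
}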
73 changes: 66 additions & 7 deletions src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -20,14 +20,19 @@
import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.notifications.INotification;
import org.apache.cassandra.notifications.INotificationConsumer;
import org.apache.cassandra.notifications.SSTableListChangedNotification;
import org.apache.cassandra.transport.Connection;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -40,7 +45,6 @@
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;

import org.apache.cassandra.utils.concurrent.Refs;

@@ -104,18 +108,71 @@ public long getTotalSize()

private static class OnCompletionRunnable implements Runnable
{
private final static Map<ColumnFamilyStore, Pair<Lock, Condition>> SSTABLE_CHANGED_LOCK_BY_CFS = new HashMap<>();
private final StreamReceiveTask task;

public OnCompletionRunnable(StreamReceiveTask task)
{
this.task = task;
}

private void maybeWaitForCompaction(ColumnFamilyStore cfs) {
int compactionMaxL0SStableNumber = DatabaseDescriptor.getCompactionMaxL0SStableCount();
if (compactionMaxL0SStableNumber == 0)
return;

if (!SSTABLE_CHANGED_LOCK_BY_CFS.containsKey(cfs)) {
synchronized (cfs) {
if (!SSTABLE_CHANGED_LOCK_BY_CFS.containsKey(cfs)) {
final Lock sstableChangedLock = new ReentrantLock();
final Condition sstableChangedCondition = sstableChangedLock.newCondition();
SSTABLE_CHANGED_LOCK_BY_CFS.put(cfs, Pair.of(sstableChangedLock, sstableChangedCondition));

cfs.subscribe(new INotificationConsumer() {
@Override
public void handleNotification(INotification notification, Object sender) {
if (notification instanceof SSTableListChangedNotification) {
sstableChangedLock.lock();
sstableChangedCondition.signal();
sstableChangedLock.unlock();
}
}
});
}
}
}

try {
Pair<Lock, Condition> pairs = SSTABLE_CHANGED_LOCK_BY_CFS.get(cfs);
final Lock sstableChangedLock = pairs.getLeft();
final Condition sstableChangedCondition = pairs.getRight();

while (getL0SStableCount(cfs) > compactionMaxL0SStableNumber) {
sstableChangedLock.lock();
try {
sstableChangedCondition.await();
} finally {
sstableChangedLock.unlock();
}
}
} catch (InterruptedException e) {
logger.warn("Receive task interrupted while waiting for compaction, {}", e);
}
}

private int getL0SStableCount(ColumnFamilyStore cfs) {
int[] leveledSStables = cfs.getSSTableCountPerLevel();
if (leveledSStables != null && leveledSStables.length > 0) {
return leveledSStables[0];
}
return 0;
}

public void run()
{
try
{
Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
org.apache.cassandra.utils.Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
if (kscf == null)
{
// schema was dropped during streaming
@@ -126,6 +183,8 @@ public void run()
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);

maybeWaitForCompaction(cfs);

File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
lockfile.create(task.sstables);
