Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
tjake committed Jul 27, 2016
1 parent dc9ed46 commit 32bed3c
Show file tree
Hide file tree
Showing 20 changed files with 752 additions and 23 deletions.
6 changes: 4 additions & 2 deletions build.xml
Expand Up @@ -449,7 +449,8 @@
<dependency groupId="net.mintern" artifactId="primitive" version="1.0" />
<dependency groupId="com.github.rholder" artifactId="snowball-stemmer" version="1.3.0.581.1" />
<dependency groupId="com.googlecode.concurrent-trees" artifactId="concurrent-trees" version="2.4.0" />
<dependency groupId="com.github.ben-manes.caffeine" artifactId="caffeine" version="2.2.6" />
<dependency groupId="com.github.ben-manes.caffeine" artifactId="caffeine" version="2.2.6" />
<dependency groupId="net.openhft" artifactId="affinity" version="2.2" />
</dependencyManagement>
<developer id="alakshman" name="Avinash Lakshman"/>
<developer id="aleksey" name="Aleksey Yeschenko"/>
Expand Down Expand Up @@ -614,7 +615,8 @@
<dependency groupId="org.fusesource" artifactId="sigar"/>
<dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
<dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
<dependency groupId="com.github.ben-manes.caffeine" artifactId="caffeine" />
<dependency groupId="com.github.ben-manes.caffeine" artifactId="caffeine" />
<dependency groupId="net.openhft" artifactId="affinity"/>
</artifact:pom>
<artifact:pom id="thrift-pom"
artifactId="cassandra-thrift"
Expand Down
Binary file added lib/affinity-2.2.jar
Binary file not shown.
14 changes: 11 additions & 3 deletions src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
Expand Up @@ -20,7 +20,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.utils.AffinityThreadGroup;
import org.apache.cassandra.utils.AffinityThread;

/**
* This class is an implementation of the <i>ThreadFactory</i> interface. This
Expand All @@ -30,6 +31,8 @@

public class NamedThreadFactory implements ThreadFactory
{
public static final AffinityThreadGroup cassandraThreadGroup = new AffinityThreadGroup("Cassandra-Thread-Group");

public final String id;
private final int priority;
private final ClassLoader contextClassLoader;
Expand All @@ -41,9 +44,14 @@ public NamedThreadFactory(String id)
this(id, Thread.NORM_PRIORITY);
}

public NamedThreadFactory(String id, ThreadGroup threadGroup)
{
this(id, Thread.NORM_PRIORITY, null, threadGroup);
}

public NamedThreadFactory(String id, int priority)
{
this(id, priority, null, null);
this(id, priority, null, cassandraThreadGroup);
}

public NamedThreadFactory(String id, int priority, ClassLoader contextClassLoader, ThreadGroup threadGroup)
Expand All @@ -57,7 +65,7 @@ public NamedThreadFactory(String id, int priority, ClassLoader contextClassLoade
public Thread newThread(Runnable runnable)
{
String name = id + ":" + n.getAndIncrement();
Thread thread = new FastThreadLocalThread(threadGroup, runnable, name);
Thread thread = new AffinityThread(threadGroup, runnable, name);
thread.setPriority(priority);
thread.setDaemon(true);
if (contextClassLoader != null)
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/concurrent/SEPWorker.java
Expand Up @@ -47,8 +47,8 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
{
this.pool = pool;
this.workerId = workerId;
thread = new FastThreadLocalThread(this, pool.poolName + "-Worker-" + workerId);
thread.setDaemon(true);
thread = pool.threadFactory.newThread(this);
thread.setName(pool.poolName + "-Worker-" + workerId);
set(initialState);
thread.start();
}
Expand Down
Expand Up @@ -74,9 +74,12 @@ public class SharedExecutorPool
// the collection of threads that have been asked to stop/deschedule - new workers are scheduled from here last
final ConcurrentSkipListMap<Long, SEPWorker> descheduled = new ConcurrentSkipListMap<>();

final NamedThreadFactory threadFactory;

public SharedExecutorPool(String poolName)
{
this.poolName = poolName;
threadFactory = new NamedThreadFactory(poolName);
}

void schedule(Work work)
Expand Down
72 changes: 69 additions & 3 deletions src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Expand Up @@ -29,6 +29,7 @@
import javax.management.openmbean.TabularData;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
Expand All @@ -38,6 +39,7 @@
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class CompactionManager implements CompactionManagerMBean
public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.db:type=CompactionManager";
private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class);
public static final CompactionManager instance;
private static final AffinityThreadGroup compactionThreadGroup = new AffinityThreadGroup("Compaction-Thread-Group");

public static final int NO_GC = Integer.MIN_VALUE;
public static final int GC_ALL = Integer.MAX_VALUE;
Expand Down Expand Up @@ -110,6 +113,59 @@ protected Boolean initialValue()
{
throw new RuntimeException(e);
}


if (!FBUtilities.isWindows())
{
//Start checking the isolationRatio
ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(() -> {

double ratio = instance.isolationRatio;
int numCores = FBUtilities.getAvailableProcessors();
int activeCompactions = instance.getActiveCompactions();

// What's the point?
if (numCores <= 4)
return;

// Default case
if (compactionThreadGroup.getAffinityMask() == AffinityThreadGroup.ALL_CORES &&
NamedThreadFactory.cassandraThreadGroup.getAffinityMask() == AffinityThreadGroup.ALL_CORES
&& ratio == 0.0)
{
return;
}

// Remove all isolation
if (ratio == 0.0 || activeCompactions == 0)
{
compactionThreadGroup.setAffinityMask(AffinityThreadGroup.ALL_CORES);
NamedThreadFactory.cassandraThreadGroup.setAffinityMask(AffinityThreadGroup.ALL_CORES);
return;
}

long compactionMask = 0L;
long otherMask = 0L;

long compactionCores = (long) (numCores * ratio);
//compactionCores = Math.min(activeCompactions, compactionCores);

logger.debug("Compaction cores = {}", compactionCores);

//TODO: do we need to select a cpu offset since you can
//specify the num cores?
for (int i = numCores - 1; i >= 0 ; i--)
{
if (i >= numCores - compactionCores)
compactionMask |= 1L << i;
else
otherMask |= 1L << i;
}

NamedThreadFactory.cassandraThreadGroup.setAffinityMask(otherMask);
compactionThreadGroup.setAffinityMask(compactionMask);
}, 10, 5, TimeUnit.SECONDS);
}
}

private final CompactionExecutor executor = new CompactionExecutor();
Expand All @@ -121,6 +177,9 @@ protected Boolean initialValue()

private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE);

//Represents the maximum number of cores to isolate compactions on
private double isolationRatio = 0.0d;

/**
* Gets compaction rate limiter.
* Rate unit is bytes per sec.
Expand Down Expand Up @@ -1189,7 +1248,6 @@ private void doValidationCompaction(ColumnFamilyStore cfs, Validator validator)
Refs<SSTableReader> sstables = null;
try
{

int gcBefore;
int nowInSec = FBUtilities.nowInSeconds();
UUID parentRepairSessionId = validator.desc.parentSessionId;
Expand Down Expand Up @@ -1604,21 +1662,29 @@ public void run()

return executor.submit(runnable);
}

public int getActiveCompactions()
{
return CompactionMetrics.getCompactions().size();
}

public void setIsolationRatio(double ratio)
{
Preconditions.checkArgument(ratio >= 0.0 && ratio < 1.0);
logger.info("Setting compaction isolation ratio to {}", ratio);
this.isolationRatio = ratio;
}

private static class CompactionExecutor extends JMXEnabledThreadPoolExecutor
{
protected CompactionExecutor(int minThreads, int maxThreads, String name, BlockingQueue<Runnable> queue)
{
super(minThreads, maxThreads, 60, TimeUnit.SECONDS, queue, new NamedThreadFactory(name, Thread.MIN_PRIORITY), "internal");
super(minThreads, maxThreads, 60, TimeUnit.SECONDS, queue, new NamedThreadFactory(name, compactionThreadGroup), "internal");
}

private CompactionExecutor(int threadCount, String name)
{
this(threadCount, threadCount, name, new LinkedBlockingQueue<Runnable>());
this(threadCount, threadCount, name, new LinkedBlockingQueue<>());
}

public CompactionExecutor()
Expand Down
Expand Up @@ -116,4 +116,10 @@ public interface CompactionManagerMBean
* @param number New maximum of validator threads
*/
public void setMaximumValidatorThreads(int number);

/**
* Allows user to isolate compaction threads to their own set of cores
* @param ratio
*/
public void setIsolationRatio(double ratio);
}
8 changes: 6 additions & 2 deletions src/java/org/apache/cassandra/net/IncomingTcpConnection.java
Expand Up @@ -35,15 +35,17 @@
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;

import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.utils.AffinityThread;
import org.xerial.snappy.SnappyInputStream;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.NIODataInputStream;

public class IncomingTcpConnection extends FastThreadLocalThread implements Closeable
public class IncomingTcpConnection extends AffinityThread implements Closeable
{
private static final Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class);

Expand All @@ -57,7 +59,7 @@ public class IncomingTcpConnection extends FastThreadLocalThread implements Clos

public IncomingTcpConnection(int version, boolean compressed, Socket socket, Set<Closeable> group)
{
super("MessagingService-Incoming-" + socket.getInetAddress());
super(NamedThreadFactory.cassandraThreadGroup, null, "MessagingService-Incoming-" + socket.getInetAddress());
this.version = version;
this.compressed = compressed;
this.socket = socket;
Expand All @@ -83,6 +85,8 @@ public IncomingTcpConnection(int version, boolean compressed, Socket socket, Set
@Override
public void run()
{
super.run();

try
{
if (version < MessagingService.VERSION_20)
Expand Down
8 changes: 6 additions & 2 deletions src/java/org/apache/cassandra/net/OutboundTcpConnection.java
Expand Up @@ -46,11 +46,13 @@
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;

import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.AffinityThread;
import org.apache.cassandra.utils.CoalescingStrategies;
import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
Expand All @@ -64,7 +66,7 @@

import com.google.common.util.concurrent.Uninterruptibles;

public class OutboundTcpConnection extends FastThreadLocalThread
public class OutboundTcpConnection extends AffinityThread
{
private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class);

Expand Down Expand Up @@ -139,7 +141,7 @@ private static CoalescingStrategy newCoalescingStrategy(String displayName)

public OutboundTcpConnection(OutboundTcpConnectionPool pool)
{
super("MessagingService-Outgoing-" + pool.endPoint());
super(NamedThreadFactory.cassandraThreadGroup, null, "MessagingService-Outgoing-" + pool.endPoint());
this.poolReference = pool;
cs = newCoalescingStrategy(pool.endPoint().getHostAddress());

Expand Down Expand Up @@ -193,6 +195,8 @@ public int getTargetVersion()

public void run()
{
super.run();

final int drainedMessageSize = 128;
// keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
Expand Down
Expand Up @@ -32,6 +32,7 @@
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.AuthMetrics;
import org.apache.cassandra.metrics.ClientMetrics;
Expand Down Expand Up @@ -66,12 +67,12 @@ synchronized void initialize()

if (useEpoll())
{
workerGroup = new EpollEventLoopGroup();
workerGroup = new EpollEventLoopGroup(0, new NamedThreadFactory("epoll"));
logger.info("Netty using native Epoll event loop");
}
else
{
workerGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup(0, new NamedThreadFactory("nio"));
logger.info("Netty using Java NIO event loop");
}

Expand Down
Expand Up @@ -38,13 +38,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.apache.cassandra.net.IncomingStreamingConnection;
import org.apache.cassandra.streaming.messages.StreamInitMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.AffinityThread;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;

Expand Down Expand Up @@ -216,7 +217,7 @@ public void start(Socket socket, int protocolVersion)
this.socket = socket;
this.protocolVersion = protocolVersion;

new FastThreadLocalThread(this, name() + "-" + socket.getRemoteSocketAddress()).start();
new AffinityThread(NamedThreadFactory.cassandraThreadGroup, this, name() + "-" + socket.getRemoteSocketAddress()).start();
}

public ListenableFuture<?> close()
Expand Down
Expand Up @@ -31,8 +31,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.AffinityThread;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.WrappedRunnable;

Expand Down Expand Up @@ -81,7 +82,7 @@ public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumT
this.crcCheckChanceSupplier = crcCheckChanceSupplier;
this.checksumType = checksumType;

new FastThreadLocalThread(new Reader(source, info, dataBuffer)).start();
new AffinityThread(NamedThreadFactory.cassandraThreadGroup, new Reader(source, info, dataBuffer), "Compressed input reader").start();
}

private void decompressNextChunk() throws IOException
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/tools/NodeProbe.java
Expand Up @@ -1080,6 +1080,11 @@ public void setTimeout(String type, long value)
}
}

public void setCompactionIsolationRatio(double ratio)
{
compactionProxy.setIsolationRatio(ratio);
}

public void stopById(String compactionId)
{
compactionProxy.stopCompactionById(compactionId);
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/tools/NodeTool.java
Expand Up @@ -86,6 +86,7 @@ public static void main(String... args)
GetEndpoints.class,
GetSSTables.class,
GossipInfo.class,
IsolateCompactions.class,
InvalidateKeyCache.class,
InvalidateRowCache.class,
InvalidateCounterCache.class,
Expand Down

0 comments on commit 32bed3c

Please sign in to comment.