Skip to content

Commit

Permalink
Move from ExecutorService per ColumnFamily to ES per Memtable. This a…
Browse files Browse the repository at this point in the history
…llows us to

wait for the ES to quiesce completely before flushing, preventing the possibility
of ConcurrentModificationException when a get scheduled before the switch executes
concurrently with flush.  It also provides a simpler mental model (only one thread
touches memtable at a time, period) which is a valuable property.  Finally, it is
slightly more performant since it avoids hashing the CF name for each operation.

Patch by jbellis; reviewed by Todd Lipcon for #9
  • Loading branch information
Jonathan Ellis committed Apr 15, 2009
1 parent 98c808e commit d40b9b3
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 55 deletions.
Expand Up @@ -32,7 +32,7 @@
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/

public final class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
{
private static Logger logger_ = Logger.getLogger(DebuggableThreadPoolExecutor.class);

Expand Down
103 changes: 49 additions & 54 deletions src/org/apache/cassandra/db/Memtable.java
Expand Up @@ -18,23 +18,19 @@

package org.apache.cassandra.db;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -49,6 +45,7 @@
import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.service.IPartitioner;
import org.apache.cassandra.service.StorageService;
import org.cliffc.high_scale_lib.NonBlockingHashSet;

/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
Expand All @@ -57,18 +54,19 @@
public class Memtable implements Comparable<Memtable>
{
private static Logger logger_ = Logger.getLogger( Memtable.class );
private static Map<String, ExecutorService> apartments_ = new HashMap<String, ExecutorService>();
private static Set<ExecutorService> runningExecutorServices_ = new NonBlockingHashSet<ExecutorService>();
public static final String flushKey_ = "FlushKey";

public static void shutdown()
{
Set<String> names = apartments_.keySet();
for (String name : names)
{
apartments_.get(name).shutdownNow();
}
for (ExecutorService exs : runningExecutorServices_)
{
exs.shutdownNow();
}
}

private MemtableThreadPoolExecutor executor_;

private int threshold_ = DatabaseDescriptor.getMemtableSize()*1024*1024;
private int thresholdCount_ = DatabaseDescriptor.getMemtableObjectCount()*1024*1024;
private AtomicInteger currentSize_ = new AtomicInteger(0);
Expand All @@ -79,23 +77,15 @@ public static void shutdown()
private String cfName_;
/* Creation time of this Memtable */
private long creationTime_;
private boolean isFrozen_ = false;
private volatile boolean isFrozen_ = false;
private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
/* Lock and Condition for notifying new clients about Memtable switches */
Lock lock_ = new ReentrantLock();

Memtable(String table, String cfName)
{
if ( apartments_.get(cfName) == null )
{
apartments_.put(cfName, new DebuggableThreadPoolExecutor( 1,
1,
Integer.MAX_VALUE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadFactoryImpl("FAST-MEMTABLE-POOL")
));
}
executor_ = new MemtableThreadPoolExecutor();
runningExecutorServices_.add(executor_);

table_ = table;
cfName_ = cfName;
Expand Down Expand Up @@ -144,28 +134,6 @@ public ColumnFamily call()
}
}

/**
* Flushes the current memtable to disk.
*
* @author alakshman
*
*/
class Flusher implements Runnable
{
private CommitLog.CommitLogContext cLogCtx_;

Flusher(CommitLog.CommitLogContext cLogCtx)
{
cLogCtx_ = cLogCtx;
}

public void run()
{
ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
MemtableManager.instance().submit(cfName_, Memtable.this, cLogCtx_);
}
}

/**
* Compares two Memtable based on creation time.
* @param rhs Memtable to compare to.
Expand Down Expand Up @@ -222,8 +190,7 @@ String getColumnFamily()

void printExecutorStats()
{
DebuggableThreadPoolExecutor es = (DebuggableThreadPoolExecutor)apartments_.get(cfName_);
long taskCount = (es.getTaskCount() - es.getCompletedTaskCount());
long taskCount = (executor_.getTaskCount() - executor_.getCompletedTaskCount());
logger_.debug("MEMTABLE TASKS : " + taskCount);
}

Expand All @@ -232,25 +199,31 @@ void printExecutorStats()
* the memtable. This version will respect the threshold and flush
* the memtable to disk when the size exceeds the threshold.
*/
void put(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx) throws IOException
void put(String key, ColumnFamily columnFamily, final CommitLog.CommitLogContext cLogCtx) throws IOException
{
if (isThresholdViolated(key) )
{
lock_.lock();
try
{
ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
final ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
if (!isFrozen_)
{
isFrozen_ = true;
/* Submit this Memtable to be flushed. */
Runnable flusher = new Flusher(cLogCtx);
apartments_.get(cfName_).submit(flusher);
/* switch the memtable */
Runnable flushQueuer = new Runnable()
{
public void run()
{
MemtableManager.instance().submit(cfStore.getColumnFamilyName(), Memtable.this, cLogCtx);
}
};
cfStore.switchMemtable(key, columnFamily, cLogCtx);
executor_.runOnTermination(flushQueuer);
executor_.shutdown();
}
else
{
// retry the put on the new memtable
cfStore.apply(key, columnFamily, cLogCtx);
}
}
Expand All @@ -263,7 +236,7 @@ void put(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogC
{
printExecutorStats();
Runnable putter = new Putter(key, columnFamily);
apartments_.get(cfName_).submit(putter);
executor_.submit(putter);
}
}

Expand Down Expand Up @@ -375,7 +348,7 @@ ColumnFamily get(String key, String cfName, IFilter filter)
ColumnFamily cf = null;
try
{
cf = apartments_.get(cfName_).submit(call).get();
cf = executor_.submit(call).get();
}
catch ( ExecutionException ex )
{
Expand Down Expand Up @@ -440,4 +413,26 @@ public int compare(String o1, String o2)
columnFamilies_.clear();
}

private static class MemtableThreadPoolExecutor extends DebuggableThreadPoolExecutor
{
private ArrayList<Runnable> terminatedHooks = new ArrayList<Runnable>();

public MemtableThreadPoolExecutor()
{
super(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("FAST-MEMTABLE-POOL"));
}

protected void terminated()
{
super.terminated();
runningExecutorServices_.remove(this);
for (Runnable hook : terminatedHooks) {
hook.run();
}
}

public void runOnTermination(Runnable runnable) {
terminatedHooks.add(runnable);
}
}
}

0 comments on commit d40b9b3

Please sign in to comment.