Skip to content
This repository has been archived by the owner on Jul 15, 2019. It is now read-only.

Commit

Permalink
Merge pull request #111 from yahoo/omid-37
Browse files Browse the repository at this point in the history
[OMID-37] Improve creation of Disruptor components
  • Loading branch information
ikatkov committed May 26, 2016
2 parents 3236900 + 9722e40 commit a527447
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
Expand All @@ -39,6 +40,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
import static org.apache.omid.metrics.MetricsUtils.name;
import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.EVENT_FACTORY;
import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch;
Expand All @@ -47,8 +49,6 @@ class PersistenceProcessorImpl implements PersistenceProcessor {

private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorImpl.class);

private static final long INITIAL_LWM_VALUE = -1L;

private final RingBuffer<PersistBatchEvent> persistRing;

private final ObjectPool<Batch> batchPool;
Expand All @@ -73,29 +73,30 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
MetricsRegistry metrics)
throws Exception {

// ------------------------------------------------------------------------------------------------------------
// Disruptor initialization
// ------------------------------------------------------------------------------------------------------------

ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
ExecutorService requestExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(), threadFactory.build());

Disruptor<PersistBatchEvent> disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 20, requestExec , SINGLE, new BusySpinWaitStrategy());
disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
disruptor.handleEventsWithWorkerPool(handlers);
this.persistRing = disruptor.start();

// ------------------------------------------------------------------------------------------------------------
// Attribute initialization
// ------------------------------------------------------------------------------------------------------------

this.metrics = metrics;
this.lowWatermarkWriter = commitTable.getWriter();
this.batchSequence = 0L;
this.batchPool = batchPool;
this.currentBatch = batchPool.borrowObject();

// Low Watermark writer
ThreadFactoryBuilder lwmThreadFactory = new ThreadFactoryBuilder().setNameFormat("lwm-writer-%d");
lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build());

// Disruptor configuration
this.persistRing = RingBuffer.createSingleProducer(EVENT_FACTORY, 1 << 20, new BusySpinWaitStrategy());

ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
ExecutorService requestExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(),
threadFactory.build());

WorkerPool<PersistBatchEvent> persistProcessor = new WorkerPool<>(persistRing,
persistRing.newBarrier(),
new FatalExceptionHandler(panicker),
handlers);
this.persistRing.addGatingSequences(persistProcessor.getWorkerSequences());
persistProcessor.start(requestExec);
this.lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build());

// Metrics config
this.lwmWriteTimer = metrics.timer(name("tso", "lwmWriter", "latency"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.metrics.Meter;
import org.apache.omid.metrics.MetricsRegistry;
Expand All @@ -38,10 +37,10 @@
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

import static com.codahale.metrics.MetricRegistry.name;
import static com.lmax.disruptor.dsl.ProducerType.MULTI;

class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEvent>, ReplyProcessor {

Expand All @@ -65,28 +64,31 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
@Inject
ReplyProcessorImpl(MetricsRegistry metrics, Panicker panicker, ObjectPool<Batch> batchPool) {

this.batchPool = batchPool;

this.nextIDToHandle.set(0);
// ------------------------------------------------------------------------------------------------------------
// Disruptor initialization
// ------------------------------------------------------------------------------------------------------------

this.replyRing = RingBuffer.createMultiProducer(ReplyBatchEvent.EVENT_FACTORY, 1 << 12, new BusySpinWaitStrategy());
ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d");
ExecutorService requestExec = Executors.newSingleThreadExecutor(threadFactory.build());

SequenceBarrier replySequenceBarrier = replyRing.newBarrier();
BatchEventProcessor<ReplyBatchEvent> replyProcessor = new BatchEventProcessor<>(replyRing, replySequenceBarrier, this);
replyProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
Disruptor<ReplyProcessorImpl.ReplyBatchEvent> disruptor = new Disruptor<>(ReplyBatchEvent.EVENT_FACTORY, 1 << 12, requestExec, MULTI, new BusySpinWaitStrategy());
disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
disruptor.handleEventsWith(this);
this.replyRing = disruptor.start();

replyRing.addGatingSequences(replyProcessor.getSequence());

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d").build();
ExecutorService replyExec = Executors.newSingleThreadExecutor(threadFactory);
replyExec.submit(replyProcessor);
// ------------------------------------------------------------------------------------------------------------
// Attribute initialization
// ------------------------------------------------------------------------------------------------------------

this.batchPool = batchPool;
this.nextIDToHandle.set(0);
this.futureEvents = new PriorityQueue<>(10, new Comparator<ReplyBatchEvent>() {
public int compare(ReplyBatchEvent replyBatchEvent1, ReplyBatchEvent replyBatchEvent2) {
return Long.compare(replyBatchEvent1.getBatchSequence(), replyBatchEvent2.getBatchSequence());
}
});

// Metrics config
this.abortMeter = metrics.meter(name("tso", "aborts"));
this.commitMeter = metrics.meter(name("tso", "commits"));
this.timestampMeter = metrics.meter(name("tso", "timestampAllocation"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.TimeoutHandler;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.tso.TSOStateManager.TSOState;
import org.jboss.netty.channel.Channel;
Expand All @@ -38,8 +39,13 @@
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static com.lmax.disruptor.dsl.ProducerType.MULTI;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.omid.tso.RequestProcessorImpl.RequestEvent.EVENT_FACTORY;

class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {

private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class);
Expand All @@ -59,29 +65,29 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
TSOServerConfig config)
throws IOException {

this.metrics = metrics;
// ------------------------------------------------------------------------------------------------------------
// Disruptor initialization
// ------------------------------------------------------------------------------------------------------------

TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS);

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build();
ExecutorService requestExec = Executors.newSingleThreadExecutor(threadFactory);

Disruptor<RequestEvent> disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, requestExec, MULTI, timeoutStrategy);
disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This should be before the event handler
disruptor.handleEventsWith(this);
this.requestRing = disruptor.start();

// ------------------------------------------------------------------------------------------------------------
// Attribute initialization
// ------------------------------------------------------------------------------------------------------------

this.metrics = metrics;
this.persistProc = persistProc;
this.timestampOracle = timestampOracle;

this.hashmap = new CommitHashMap(config.getMaxItems());

final TimeoutBlockingWaitStrategy timeoutStrategy
= new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), TimeUnit.MILLISECONDS);

// Set up the disruptor thread
requestRing = RingBuffer.createMultiProducer(RequestEvent.EVENT_FACTORY, 1 << 12, timeoutStrategy);
SequenceBarrier requestSequenceBarrier = requestRing.newBarrier();
BatchEventProcessor<RequestEvent> requestProcessor =
new BatchEventProcessor<>(requestRing, requestSequenceBarrier, this);
requestRing.addGatingSequences(requestProcessor.getSequence());
requestProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));

ExecutorService requestExec = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("request-%d").build());
// Each processor runs on a separate thread
requestExec.submit(requestProcessor);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,30 @@

import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.metrics.Meter;
import org.apache.omid.metrics.MetricsRegistry;

import org.jboss.netty.channel.Channel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.codahale.metrics.MetricRegistry.name;
import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
import static org.apache.omid.tso.RetryProcessorImpl.RetryEvent.EVENT_FACTORY;

/**
* Manages the disambiguation of the retry requests that clients send when they did not received a response in the
Expand Down Expand Up @@ -74,20 +72,26 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
ObjectPool<Batch> batchPool)
throws InterruptedException, ExecutionException, IOException {

// ------------------------------------------------------------------------------------------------------------
// Disruptor initialization
// ------------------------------------------------------------------------------------------------------------

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("retry-%d").build();
ExecutorService requestExec = Executors.newSingleThreadExecutor(threadFactory);

Disruptor<RetryProcessorImpl.RetryEvent> disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, requestExec, SINGLE, new YieldingWaitStrategy());
disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This should be before the event handler
disruptor.handleEventsWith(this);
this.retryRing = disruptor.start();

// ------------------------------------------------------------------------------------------------------------
// Attribute initialization
// ------------------------------------------------------------------------------------------------------------

this.commitTableClient = commitTable.getClient();
this.replyProc = replyProc;
this.batchPool = batchPool;

retryRing = RingBuffer.createSingleProducer(RetryEvent.EVENT_FACTORY, 1 << 12, new YieldingWaitStrategy());
SequenceBarrier retrySequenceBarrier = retryRing.newBarrier();
BatchEventProcessor<RetryEvent> retryProcessor = new BatchEventProcessor<>(retryRing, retrySequenceBarrier, this);
retryProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
retryRing.addGatingSequences(retryProcessor.getSequence());

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("retry-%d").build();
ExecutorService retryExec = Executors.newSingleThreadExecutor(threadFactory);
retryExec.submit(retryProcessor);

// Metrics configuration
this.txAlreadyCommittedMeter = metrics.meter(name("tso", "retries", "commits", "tx-already-committed"));
this.invalidTxMeter = metrics.meter(name("tso", "retries", "aborts", "tx-invalid"));
Expand Down Expand Up @@ -116,7 +120,7 @@ private void handleCommitRetry(RetryEvent event) {
long startTimestamp = event.getStartTimestamp();
try {
Optional<CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(startTimestamp).get();
if(commitTimestamp.isPresent()) {
if (commitTimestamp.isPresent()) {
if (commitTimestamp.get().isValid()) {
LOG.trace("Tx {}: Valid commit TS found in Commit Table. Sending Commit to client.", startTimestamp);
replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel());
Expand Down

0 comments on commit a527447

Please sign in to comment.