Skip to content

Commit

Permalink
删除tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Apr 23, 2015
1 parent 5946959 commit 4f3e600
Show file tree
Hide file tree
Showing 15 changed files with 16 additions and 839 deletions.
Expand Up @@ -17,8 +17,6 @@
*/ */
package org.lealone.cluster.concurrent; package org.lealone.cluster.concurrent;


import static org.lealone.cluster.tracing.Tracing.isTracing;

import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
Expand All @@ -28,15 +26,13 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;


import org.lealone.cluster.tracing.TraceState;
import org.lealone.cluster.tracing.Tracing;
import org.lealone.cluster.utils.JVMStabilityInspector; import org.lealone.cluster.utils.JVMStabilityInspector;
import org.lealone.cluster.utils.concurrent.SimpleCondition; import org.lealone.cluster.utils.concurrent.SimpleCondition;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


public abstract class AbstractTracingAwareExecutorService implements TracingAwareExecutorService { public abstract class AbstractLealoneExecutorService implements LealoneExecutorService {
private static final Logger logger = LoggerFactory.getLogger(AbstractTracingAwareExecutorService.class); private static final Logger logger = LoggerFactory.getLogger(AbstractLealoneExecutorService.class);


protected abstract void addTask(FutureTask<?> futureTask); protected abstract void addTask(FutureTask<?> futureTask);


Expand Down Expand Up @@ -81,59 +77,20 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, Ti
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result) {
return newTaskFor(runnable, result, Tracing.instance.get());
}

@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, TraceState traceState) { protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result) {
if (traceState != null) {
if (runnable instanceof TraceSessionFutureTask)
return (TraceSessionFutureTask<T>) runnable;
return new TraceSessionFutureTask<T>(runnable, result, traceState);
}
if (runnable instanceof FutureTask) if (runnable instanceof FutureTask)
return (FutureTask<T>) runnable; return (FutureTask<T>) runnable;
return new FutureTask<>(runnable, result); return new FutureTask<>(runnable, result);
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected <T> FutureTask<T> newTaskFor(Callable<T> callable) { protected <T> FutureTask<T> newTaskFor(Callable<T> callable) {
if (isTracing()) {
if (callable instanceof TraceSessionFutureTask)
return (TraceSessionFutureTask<T>) callable;
return new TraceSessionFutureTask<T>(callable, Tracing.instance.get());
}
if (callable instanceof FutureTask) if (callable instanceof FutureTask)
return (FutureTask<T>) callable; return (FutureTask<T>) callable;
return new FutureTask<>(callable); return new FutureTask<>(callable);
} }


private class TraceSessionFutureTask<T> extends FutureTask<T> {
private final TraceState state;

public TraceSessionFutureTask(Callable<T> callable, TraceState state) {
super(callable);
this.state = state;
}

public TraceSessionFutureTask(Runnable runnable, T result, TraceState state) {
super(runnable, result);
this.state = state;
}

@Override
public void run() {
TraceState oldState = Tracing.instance.get();
Tracing.instance.set(state);
try {
super.run();
} finally {
Tracing.instance.set(oldState);
}
}
}

@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
class FutureTask<T> extends SimpleCondition implements Future<T>, Runnable { class FutureTask<T> extends SimpleCondition implements Future<T>, Runnable {
private boolean failure; private boolean failure;
Expand Down Expand Up @@ -206,9 +163,4 @@ private <T> FutureTask<T> submit(FutureTask<T> task) {
public void execute(Runnable command) { public void execute(Runnable command) {
addTask(newTaskFor(command, null)); addTask(newTaskFor(command, null));
} }

@Override
public void execute(Runnable command, TraceState state) {
addTask(newTaskFor(command, null, state));
}
} }
Expand Up @@ -17,26 +17,18 @@
*/ */
package org.lealone.cluster.concurrent; package org.lealone.cluster.concurrent;


import static org.lealone.cluster.tracing.Tracing.isTracing;

import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.lealone.cluster.tracing.TraceState;
import org.lealone.cluster.tracing.Tracing;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -57,7 +49,7 @@
* threads and the queue is full, we want the enqueuer to block. But to allow the number of threads to drop if a * threads and the queue is full, we want the enqueuer to block. But to allow the number of threads to drop if a
* stage is less busy, core thread timeout is enabled. * stage is less busy, core thread timeout is enabled.
*/ */
public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService { public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements LealoneExecutorService {
protected static final Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class); protected static final Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
public static final RejectedExecutionHandler blockingExecutionHandler = new RejectedExecutionHandler() { public static final RejectedExecutionHandler blockingExecutionHandler = new RejectedExecutionHandler() {
@Override @Override
Expand Down Expand Up @@ -153,65 +145,18 @@ protected void onFinalAccept(Runnable task) {
protected void onFinalRejection(Runnable task) { protected void onFinalRejection(Runnable task) {
} }


@Override
public void execute(Runnable command, TraceState state) {
super.execute(state == null || command instanceof TraceSessionWrapper ? command
: new TraceSessionWrapper<Object>(command, state));
}

@Override @Override
public void maybeExecuteImmediately(Runnable command) { public void maybeExecuteImmediately(Runnable command) {
execute(command); execute(command);
} }


// execute does not call newTaskFor
@Override
public void execute(Runnable command) {
super.execute(isTracing() && !(command instanceof TraceSessionWrapper) ? new TraceSessionWrapper<Object>(
Executors.callable(command, null)) : command);
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T result) {
if (isTracing() && !(runnable instanceof TraceSessionWrapper)) {
return new TraceSessionWrapper<T>(Executors.callable(runnable, result));
}
return super.newTaskFor(runnable, result);
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (isTracing() && !(callable instanceof TraceSessionWrapper)) {
return new TraceSessionWrapper<T>(callable);
}
return super.newTaskFor(callable);
}

@Override @Override
protected void afterExecute(Runnable r, Throwable t) { protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t); super.afterExecute(r, t);


maybeResetTraceSessionWrapper(r);
logExceptionsAfterExecute(r, t); logExceptionsAfterExecute(r, t);
} }


protected static void maybeResetTraceSessionWrapper(Runnable r) {
if (r instanceof TraceSessionWrapper) {
TraceSessionWrapper<?> tsw = (TraceSessionWrapper<?>) r;
// we have to reset trace state as its presence is what denotes the current thread is tracing
// and if left this thread might start tracing unrelated tasks
tsw.reset();
}
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
if (r instanceof TraceSessionWrapper)
((TraceSessionWrapper<?>) r).setupContext();

super.beforeExecute(t, r);
}

/** /**
* Send @param t and any exception wrapped by @param r to the default uncaught exception handler, * Send @param t and any exception wrapped by @param r to the default uncaught exception handler,
* or log them if none such is set up * or log them if none such is set up
Expand Down Expand Up @@ -261,32 +206,4 @@ public static Throwable extractThrowable(Runnable runnable) {


return null; return null;
} }

/**
* Used to wrap a Runnable or Callable passed to submit or execute so we can clone the TraceSessionContext and move
* it into the worker thread.
*
* @param <T>
*/
private static class TraceSessionWrapper<T> extends FutureTask<T> {
private final TraceState state;

public TraceSessionWrapper(Callable<T> callable) {
super(callable);
state = Tracing.instance.get();
}

public TraceSessionWrapper(Runnable command, TraceState state) {
super(command, null);
this.state = state;
}

private void setupContext() {
Tracing.instance.set(state);
}

private void reset() {
Tracing.instance.set(null);
}
}
} }
Expand Up @@ -22,13 +22,7 @@


import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;


import org.lealone.cluster.tracing.TraceState; public interface LealoneExecutorService extends ExecutorService {

public interface TracingAwareExecutorService extends ExecutorService {
// we need a way to inject a TraceState directly into the Executor context without going through
// the global Tracing sessions; see lealone-5668
public void execute(Runnable command, TraceState state);

// permits executing in the context of the submitting thread // permits executing in the context of the submitting thread
public void maybeExecuteImmediately(Runnable command); public void maybeExecuteImmediately(Runnable command);
} }
Expand Up @@ -52,7 +52,7 @@ public synchronized void shutdown() {
} }
} }


public TracingAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String name, String jmxPath) { public LealoneExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String name, String jmxPath) {
MetricsEnabledSEPExecutor executor = new MetricsEnabledSEPExecutor(maxConcurrency, maxQueuedTasks, name, MetricsEnabledSEPExecutor executor = new MetricsEnabledSEPExecutor(maxConcurrency, maxQueuedTasks, name,
jmxPath); jmxPath);
executors.add(executor); executors.add(executor);
Expand Down
Expand Up @@ -29,7 +29,7 @@


import static org.lealone.cluster.concurrent.SEPWorker.Work; import static org.lealone.cluster.concurrent.SEPWorker.Work;


public class SEPExecutor extends AbstractTracingAwareExecutorService { public class SEPExecutor extends AbstractLealoneExecutorService {
private final SharedExecutorPool pool; private final SharedExecutorPool pool;


private final int maxWorkers; private final int maxWorkers;
Expand Down
Expand Up @@ -24,7 +24,6 @@ public enum Stage {
REQUEST_RESPONSE, REQUEST_RESPONSE,
MIGRATION, MIGRATION,
MISC, MISC,
TRACING,
INTERNAL_RESPONSE, INTERNAL_RESPONSE,
READ_REPAIR; READ_REPAIR;


Expand All @@ -33,7 +32,6 @@ public String getJmxType() {
case GOSSIP: case GOSSIP:
case MIGRATION: case MIGRATION:
case MISC: case MISC:
case TRACING:
case INTERNAL_RESPONSE: case INTERNAL_RESPONSE:
return "internal"; return "internal";
case MUTATION: case MUTATION:
Expand Down
Expand Up @@ -21,18 +21,9 @@
import static org.lealone.cluster.config.DatabaseDescriptor.getConcurrentWriters; import static org.lealone.cluster.config.DatabaseDescriptor.getConcurrentWriters;


import java.util.EnumMap; import java.util.EnumMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.lealone.cluster.net.MessagingService;
import org.lealone.cluster.tracing.TraceState;
import org.lealone.cluster.utils.FBUtilities; import org.lealone.cluster.utils.FBUtilities;


/** /**
Expand All @@ -41,7 +32,7 @@
* even though stages (executors) are not created dynamically. * even though stages (executors) are not created dynamically.
*/ */
public class StageManager { public class StageManager {
private static final EnumMap<Stage, TracingAwareExecutorService> stages = new EnumMap<Stage, TracingAwareExecutorService>( private static final EnumMap<Stage, LealoneExecutorService> stages = new EnumMap<Stage, LealoneExecutorService>(
Stage.class); Stage.class);


public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
Expand All @@ -57,26 +48,14 @@ public class StageManager {
stages.put(Stage.GOSSIP, new MetricsEnabledThreadPoolExecutor(Stage.GOSSIP)); stages.put(Stage.GOSSIP, new MetricsEnabledThreadPoolExecutor(Stage.GOSSIP));
stages.put(Stage.MIGRATION, new MetricsEnabledThreadPoolExecutor(Stage.MIGRATION)); stages.put(Stage.MIGRATION, new MetricsEnabledThreadPoolExecutor(Stage.MIGRATION));
stages.put(Stage.MISC, new MetricsEnabledThreadPoolExecutor(Stage.MISC)); stages.put(Stage.MISC, new MetricsEnabledThreadPoolExecutor(Stage.MISC));
stages.put(Stage.TRACING, tracingExecutor());
}

private static ExecuteOnlyExecutor tracingExecutor() {
RejectedExecutionHandler reh = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
MessagingService.instance().incrementDroppedMessages(MessagingService.Verb._TRACE);
}
};
return new ExecuteOnlyExecutor(1, 1, KEEPALIVE, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
new NamedThreadFactory(Stage.TRACING.getJmxName()), reh);
} }


private static MetricsEnabledThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads) { private static MetricsEnabledThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads) {
return new MetricsEnabledThreadPoolExecutor(numThreads, KEEPALIVE, TimeUnit.SECONDS, return new MetricsEnabledThreadPoolExecutor(numThreads, KEEPALIVE, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(stage.getJmxName()), stage.getJmxType()); new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(stage.getJmxName()), stage.getJmxType());
} }


private static TracingAwareExecutorService multiThreadedLowSignalStage(Stage stage, int numThreads) { private static LealoneExecutorService multiThreadedLowSignalStage(Stage stage, int numThreads) {
return MetricsEnabledSharedExecutorPool.SHARED.newExecutor(numThreads, Integer.MAX_VALUE, stage.getJmxName(), return MetricsEnabledSharedExecutorPool.SHARED.newExecutor(numThreads, Integer.MAX_VALUE, stage.getJmxName(),
stage.getJmxType()); stage.getJmxType());
} }
Expand All @@ -85,7 +64,7 @@ private static TracingAwareExecutorService multiThreadedLowSignalStage(Stage sta
* Retrieve a stage from the StageManager * Retrieve a stage from the StageManager
* @param stage name of the stage to be retrieved. * @param stage name of the stage to be retrieved.
*/ */
public static TracingAwareExecutorService getStage(Stage stage) { public static LealoneExecutorService getStage(Stage stage) {
return stages.get(stage); return stages.get(stage);
} }


Expand All @@ -97,41 +76,4 @@ public static void shutdownNow() {
StageManager.stages.get(stage).shutdownNow(); StageManager.stages.get(stage).shutdownNow();
} }
} }

/**
* A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
* tracing stage. See lealone-1123 for background.
*/
private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService {
public ExecuteOnlyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

@Override
public void execute(Runnable command, TraceState state) {
assert state == null;
super.execute(command);
}

@Override
public void maybeExecuteImmediately(Runnable command) {
execute(command);
}

@Override
public Future<?> submit(Runnable task) {
throw new UnsupportedOperationException();
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
throw new UnsupportedOperationException();
}

@Override
public <T> Future<T> submit(Callable<T> task) {
throw new UnsupportedOperationException();
}
}
} }

0 comments on commit 4f3e600

Please sign in to comment.