Skip to content

Commit

Permalink
minor cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
gregw committed Mar 17, 2017
1 parent c1a159b commit d2d6bc3
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 41 deletions.
Expand Up @@ -58,17 +58,9 @@ public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoin
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
this.strategy = new EatWhatYouKill(producer, executor,InvocationType.BLOCKING, InvocationType.BLOCKING, 0);
this.strategy = new EatWhatYouKill(producer, executor, 0);

// TODO clean this up.
try
{
((LifeCycle)this.strategy).start();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
LifeCycle.start(strategy);
}

@Override
Expand Down Expand Up @@ -113,16 +105,8 @@ public void onClose()
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Close {} ", this);
super.onClose();

// TODO clean this up.
try
{
((LifeCycle)this.strategy).stop();
}
catch (Exception e)
{
throw new RuntimeException(e);
}

LifeCycle.stop(strategy);
}

@Override
Expand Down
Expand Up @@ -122,4 +122,47 @@ public interface Listener extends EventListener
public void lifeCycleStopping(LifeCycle event);
public void lifeCycleStopped(LifeCycle event);
}


/**
* Utility to start an object if it is a LifeCycle and to convert
* any exception thrown to a {@link RuntimeException}
* @param object The instance to start.
* @throws RuntimeException if the call to start throws an exception.
*/
public static void start(Object object)
{
if (object instanceof LifeCycle)
{
try
{
((LifeCycle)object).start();
}
catch(Exception e)
{
throw new RuntimeException(e);
}
}
}

/**
* Utility to stop an object if it is a LifeCycle and to convert
* any exception thrown to a {@link RuntimeException}
* @param object The instance to stop.
* @throws RuntimeException if the call to stop throws an exception.
*/
public static void stop(Object object)
{
if (object instanceof LifeCycle)
{
try
{
((LifeCycle)object).stop();
}
catch(Exception e)
{
throw new RuntimeException(e);
}
}
}
}
Expand Up @@ -195,38 +195,38 @@ public static class InvocableExecutor implements Executor
private static final Logger LOG = Log.getLogger(InvocableExecutor.class);

private final Executor _executor;
private final InvocationType _preferredExecutionType;
private final InvocationType _preferredInvocationType;
private final InvocationType _preferredInvocationForExecute;
private final InvocationType _preferredInvocationForInvoke;

public InvocableExecutor(Executor executor,InvocationType preferred)
{
this(executor,preferred,preferred);
}

public InvocableExecutor(Executor executor,InvocationType preferredForExecution,InvocationType preferredForIvocation)
public InvocableExecutor(Executor executor,InvocationType preferredInvocationForExecute,InvocationType preferredInvocationForIvoke)
{
_executor=executor;
_preferredExecutionType=preferredForExecution;
_preferredInvocationType=preferredForIvocation;
_preferredInvocationForExecute=preferredInvocationForExecute;
_preferredInvocationForInvoke=preferredInvocationForIvoke;
}

public Invocable.InvocationType getPreferredInvocationType()
{
return _preferredInvocationType;
return _preferredInvocationForInvoke;
}

public void invoke(Runnable task)
{
if (LOG.isDebugEnabled())
LOG.debug("{} invoke {}", this, task);
Invocable.invokePreferred(task,_preferredInvocationType);
Invocable.invokePreferred(task,_preferredInvocationForInvoke);
if (LOG.isDebugEnabled())
LOG.debug("{} invoked {}", this, task);
}

public void execute(Runnable task)
{
tryExecute(task,_preferredExecutionType);
tryExecute(task,_preferredInvocationForExecute);
}

public void execute(Runnable task, InvocationType preferred)
Expand All @@ -236,7 +236,7 @@ public void execute(Runnable task, InvocationType preferred)

public boolean tryExecute(Runnable task)
{
return tryExecute(task,_preferredExecutionType);
return tryExecute(task,_preferredInvocationForExecute);
}

public boolean tryExecute(Runnable task, InvocationType preferred)
Expand Down
Expand Up @@ -66,16 +66,21 @@ public EatWhatYouKill(Producer producer, Executor executor)
this(producer,executor,InvocationType.NON_BLOCKING,InvocationType.BLOCKING);
}

public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredExecution, InvocationType preferredInvocation)
public EatWhatYouKill(Producer producer, Executor executor, int maxProducersPending )
{
this(producer,executor,preferredExecution,preferredInvocation,1);
this(producer,executor,InvocationType.NON_BLOCKING,InvocationType.BLOCKING);
}

public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredExecution, InvocationType preferredInvocation, int maxProducersPending )
public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredInvocationPEC, InvocationType preferredInvocationEPC)
{
this(producer,executor,preferredInvocationPEC,preferredInvocationEPC,1);
}

public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredInvocationPEC, InvocationType preferredInvocationEPC, int maxProducersPending )
{
_producer = producer;
_pendingProducersMax = maxProducersPending;
_executor = new InvocableExecutor(executor,preferredExecution,preferredInvocation);
_executor = new InvocableExecutor(executor,preferredInvocationPEC,preferredInvocationEPC);
}

@Override
Expand Down Expand Up @@ -167,12 +172,15 @@ private boolean pendingProducerWait()
// spurious wakeup!
_pendingProducers--;
}
else if (_state == State.IDLE)
else
{
_pendingProducersSignalled--;
_state = State.PRODUCING;
return true;
}
if (_state == State.IDLE)
{
_state = State.PRODUCING;
return true;
}
}
}
catch (InterruptedException e)
{
Expand Down
Expand Up @@ -67,8 +67,7 @@ public ExecutionStrategyTest(Class<? extends ExecutionStrategy> strategy)
void newExecutionStrategy(Producer producer, Executor executor) throws Exception
{
_strategy = _strategyClass.getConstructor(Producer.class,Executor.class).newInstance(producer,executor);
if (_strategy instanceof LifeCycle)
((LifeCycle)_strategy).start();
LifeCycle.start(_strategy);
}

@Before
Expand All @@ -80,8 +79,7 @@ public void before() throws Exception
@After
public void after() throws Exception
{
if (_strategy instanceof LifeCycle)
((LifeCycle)_strategy).stop();
LifeCycle.stop(_strategy);
threads.stop();
}

Expand Down

0 comments on commit d2d6bc3

Please sign in to comment.