Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added pooling of MessageProcessor objects to avoid extra allocations per RawMessage. #1

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
113 changes: 91 additions & 22 deletions src/main/java/org/ice4j/stack/MessageProcessor.java
Expand Up @@ -17,19 +17,26 @@
*/
package org.ice4j.stack;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.*;

import org.ice4j.*;
import org.ice4j.message.*;

/**
* The class is used to parse and dispatch incoming messages by being
* executed by concurrent {@link java.util.concurrent.ExecutorService}
* executed by concurrent {@link java.util.concurrent.ForkJoinPool}.
* To reduce memory allocation this <tt>ForkJoinTask</tt> implementation
* designed to be suitable for usage with pooling, the instance of this type is
* mutable such that <tt>RawMessage</tt> can be updated and instance can be
* reused and scheduled with new <tt>RawMessage</tt>
*
* @author Emil Ivov
*/
class MessageProcessor
implements Runnable
extends RecursiveAction
{
/**
* Our class logger.
Expand All @@ -38,15 +45,16 @@ class MessageProcessor
= Logger.getLogger(MessageProcessor.class.getName());

/**
* The <tt>NetAccessManager</tt> which has created this instance and which
* is its owner.
* Indicates that <tt>MessageProcessor</tt> is cancelled and should not
* process <tt>RawMessage</tt> anymore.
*/
private final NetAccessManager netAccessManager;
private final AtomicBoolean cancelled = new AtomicBoolean(false);

/**
* Ram message which is being processed
* The <tt>NetAccessManager</tt> which has created this instance and which
* is its owner.
*/
private final RawMessage rawMessage;
private final NetAccessManager netAccessManager;

/**
* The listener that will be collecting error notifications.
Expand All @@ -58,33 +66,36 @@ class MessageProcessor
*/
private final MessageEventHandler messageEventHandler;

/**
* Raw message which is being processed
*/
private RawMessage rawMessage;

/**
* Callback which is invoked when this <tt>MessageProcessor</tt>
* processed it's {@link #rawMessage}
*/
private Consumer<MessageProcessor> rawMessageProcessedHandler;

/**
* Creates a Message processor.
*
* @param netAccessManager the <tt>NetAccessManager</tt> which is creating
* the new instance, is going to be its owner, specifies the
* <tt>MessageEventHandler</tt> and represents the <tt>ErrorHandler</tt> to
* handle exceptions in the new instance
* @param message the <tt>RawMessage</tt> to be asynchronously processed by
* this MessageProcessor
* @throws IllegalArgumentException if any of the mentioned properties of
* <tt>netAccessManager</tt> are <tt>null</tt>
*/
MessageProcessor(
NetAccessManager netAccessManager,
RawMessage message)
NetAccessManager netAccessManager)
throws IllegalArgumentException
{
if (netAccessManager == null)
{
throw new NullPointerException("netAccessManager");
}

if (message == null)
{
throw new IllegalArgumentException("The message may not be null");
}

MessageEventHandler messageEventHandler
= netAccessManager.getMessageEventHandler();

Expand All @@ -97,26 +108,71 @@ class MessageProcessor
this.netAccessManager = netAccessManager;
this.messageEventHandler = messageEventHandler;
this.errorHandler = netAccessManager;
}

/**
* Assigns the <tt>RawMessage</tt> that will be processed
* by this <tt>MessageProcessor</tt> on separate thread.
* @param message RawMessage to be processed
* @param onProcessed callback which will be invoked when processing
* of {@link #rawMessage} is completed
*/
void setMessage(RawMessage message, Consumer<MessageProcessor> onProcessed)
{
if (message == null)
{
throw new IllegalArgumentException("The message may not be null");
}
this.rawMessage = message;
this.rawMessageProcessedHandler = onProcessed;
}

/**
* Does the message parsing.
* Performs proper reset of internal state of pooled instance.
*/
public void run()
public void resetState()
{
this.reinitialize();
this.cancelled.set(false);
}

/**
* Attempts to cancel processing of {@link #rawMessage}
*/
public void cancel()
{
this.cancelled.set(true);
}

@Override
protected void compute()
{
final Consumer<MessageProcessor> onProcessed = rawMessageProcessedHandler;
final RawMessage message = rawMessage;
//add an extra try/catch block that handles uncatched errors
try
{
if (message == null)
{
return;
}
rawMessage = null;
rawMessageProcessedHandler = null;

if (this.cancelled.get())
{
return;
}

StunStack stunStack = netAccessManager.getStunStack();

Message stunMessage;
try
{
stunMessage
= Message.decode(rawMessage.getBytes(),
= Message.decode(message.getBytes(),
(char) 0,
(char) rawMessage.getMessageLength());
(char) message.getMessageLength());
}
catch (StunException ex)
{
Expand All @@ -129,13 +185,26 @@ public void run()
logger.finest("Dispatching a StunMessageEvent.");

StunMessageEvent stunMessageEvent
= new StunMessageEvent(stunStack, rawMessage, stunMessage);
= new StunMessageEvent(stunStack, message, stunMessage);

messageEventHandler.handleMessageEvent(stunMessageEvent);
}
catch(Throwable err)
{
errorHandler.handleFatalError(this, "Unexpected Error!", err);
errorHandler.handleFatalError(
Thread.currentThread(),
"Unexpected Error!", err);
}
finally
{
// On processed callback must be invoked in all cases, even when
// cancellation or early exist happen, otherwise
// NetAccessManager internal tracking of pooled and active
// message processors will missbehave.
if (onProcessed != null)
{
onProcessed.accept(this);
}
}
}
}
75 changes: 50 additions & 25 deletions src/main/java/org/ice4j/stack/NetAccessManager.java
Expand Up @@ -22,6 +22,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.*;

import org.ice4j.*;
Expand Down Expand Up @@ -51,20 +52,28 @@ class NetAccessManager
/**
* Thread pool to execute {@link MessageProcessor}s across all
* {@link NetAccessManager}s.
* The work-stealing thread pool tries to utilizes all available processors
* in parallel, but does not have guarantee about the order of execution,
* but underlying transport (UDP) also does not have such guarantee,
* so it is ok when some of the messages will be processed out of
* arrival (enqueuing) order, but faster.
* The work-stealing thread pool {@link java.util.concurrent.ForkJoinPool}
* tries to utilizes all available processors in parallel, but does not
* have guarantee about the order of execution, but underlying transport
* (UDP) also does not have such guarantee, so it is ok when some of the
* messages will be processed out of arrival (enqueuing) order, but faster.
*/
private static ExecutorService messageProcessorExecutor
= Executors.newWorkStealingPool();
private static ForkJoinPool messageProcessorExecutor = new ForkJoinPool();

/**
* The set of <tt>Future</tt>'s of not yet processed <tt>RawMessage</tt>s
* Pool of <tt>MessageProcessor</tt> objects to avoid extra-allocations of
* processor object per <tt>RawMessage</tt> needed to process.
*/
private final ConcurrentHashMap.KeySetView<Future<?>, Boolean>
unprocessedMessageFutures = ConcurrentHashMap.newKeySet();
private final ArrayBlockingQueue<MessageProcessor> messageProcessorsPool
= new ArrayBlockingQueue<>(8);

/**
* The set of <tt>Future</tt>'s of not yet processed <tt>RawMessage</tt>s,
* this tracking is necessary to properly cancel pending tasks in case
* {@link #stop()} is called.
*/
private final ConcurrentHashMap.KeySetView<MessageProcessor, Boolean>
activeMessageProcessors = ConcurrentHashMap.newKeySet();

/**
* All <tt>Connectors</tt> currently in use with UDP. The table maps a local
Expand Down Expand Up @@ -125,6 +134,16 @@ class NetAccessManager
*/
private final AtomicBoolean isStopped = new AtomicBoolean(false);

/**
* Callback to be called when scheduled <tt>MessageProcessor</tt> completes
* processing it's <tt>RawMessage</tt>.
*/
private final Consumer<MessageProcessor>
onMessageProcessorProcessedRawMessage = messageProcessor -> {
activeMessageProcessors.remove(messageProcessor);
messageProcessorsPool.offer(messageProcessor);
};

/**
* Constructs a NetAccessManager.
*
Expand Down Expand Up @@ -179,7 +198,7 @@ MessageEventHandler getMessageEventHandler()
* Gets the <tt>PeerUdpMessageEventHandler</tt> of this
* <tt>NetAccessManager</tt> which is to be notified when incoming UDP
* messages have been processed and are ready for delivery.
*
*
* @return the <tt>PeerUdpMessageEventHandler</tt> of this
* <tt>NetAccessManager</tt> which is to be notified when incoming
* UDP messages have been processed and are ready for delivery
Expand All @@ -193,7 +212,7 @@ public PeerUdpMessageEventHandler getUdpMessageEventHandler()
* Gets the <tt>ChannelDataEventHandler</tt> of this
* <tt>NetAccessManager</tt> which is to be notified when incoming
* ChannelData messages have been processed and are ready for delivery.
*
*
* @return the <tt>ChannelDataEventHandler</tt> of this
* <tt>NetAccessManager</tt> which is to be notified when incoming
* ChannelData messages have been processed and are ready for
Expand Down Expand Up @@ -416,11 +435,11 @@ public void stop()
// no item can be added to {@link #unprocessedMessageFutures} when
// NetAccessManager is stopped, so it is safe to iterate without
// removing item in-place.
for (Future<?> future: unprocessedMessageFutures)
for (MessageProcessor messageProcessor: activeMessageProcessors)
{
future.cancel(true);
messageProcessor.cancel();
}
unprocessedMessageFutures.clear();
activeMessageProcessors.clear();

for (Object o : new Object[]{udpConnectors, tcpConnectors})
{
Expand Down Expand Up @@ -494,19 +513,25 @@ private void onIncomingRawMessage(final RawMessage message)
return;
}

final MessageProcessor messageProcessor
= new MessageProcessor(this, message);
final CompletableFuture<Void> completableFuture = CompletableFuture
.runAsync(messageProcessor, messageProcessorExecutor);
MessageProcessor messageProcessor = messageProcessorsPool.poll();
if (messageProcessor == null)
{
messageProcessor = new MessageProcessor(this);
}
else
{
messageProcessor.resetState();
}

messageProcessor.setMessage(
message,
onMessageProcessorProcessedRawMessage);

unprocessedMessageFutures.add(completableFuture);
completableFuture.handle((result, error) -> {
unprocessedMessageFutures.remove(completableFuture);
return null;
});
// Because MessageProcessor is ForkJoinTask<Void> it is not re-wrapped
// inside ForkJoinPool and no hidden allocation is done inside pool.
messageProcessorExecutor.submit(messageProcessor);
}


//--------------- SENDING MESSAGES -----------------------------------------
/**
* Sends the specified stun message through the specified access point.
Expand Down