diff --git a/src/main/java/org/ice4j/stack/MessageProcessor.java b/src/main/java/org/ice4j/stack/MessageProcessor.java index fb073452..1b071765 100644 --- a/src/main/java/org/ice4j/stack/MessageProcessor.java +++ b/src/main/java/org/ice4j/stack/MessageProcessor.java @@ -17,6 +17,9 @@ */ package org.ice4j.stack; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.function.*; import java.util.logging.*; import org.ice4j.*; @@ -24,12 +27,16 @@ /** * The class is used to parse and dispatch incoming messages by being - * executed by concurrent {@link java.util.concurrent.ExecutorService} + * executed by concurrent {@link java.util.concurrent.ForkJoinPool}. + * To reduce memory allocation this ForkJoinTask implementation + * designed to be suitable for usage with pooling, the instance of this type is + * mutable such that RawMessage can be updated and instance can be + * reused and scheduled with new RawMessage * * @author Emil Ivov */ class MessageProcessor - implements Runnable + extends RecursiveAction { /** * Our class logger. @@ -38,15 +45,16 @@ class MessageProcessor = Logger.getLogger(MessageProcessor.class.getName()); /** - * The NetAccessManager which has created this instance and which - * is its owner. + * Indicates that MessageProcessor is cancelled and should not + * process RawMessage anymore. */ - private final NetAccessManager netAccessManager; + private final AtomicBoolean cancelled = new AtomicBoolean(false); /** - * Ram message which is being processed + * The NetAccessManager which has created this instance and which + * is its owner. */ - private final RawMessage rawMessage; + private final NetAccessManager netAccessManager; /** * The listener that will be collecting error notifications. @@ -58,6 +66,17 @@ class MessageProcessor */ private final MessageEventHandler messageEventHandler; + /** + * Raw message which is being processed + */ + private RawMessage rawMessage; + + /** + * Callback which is invoked when this MessageProcessor + * processed it's {@link #rawMessage} + */ + private Consumer rawMessageProcessedHandler; + /** * Creates a Message processor. * @@ -65,14 +84,11 @@ class MessageProcessor * the new instance, is going to be its owner, specifies the * MessageEventHandler and represents the ErrorHandler to * handle exceptions in the new instance - * @param message the RawMessage to be asynchronously processed by - * this MessageProcessor * @throws IllegalArgumentException if any of the mentioned properties of * netAccessManager are null */ MessageProcessor( - NetAccessManager netAccessManager, - RawMessage message) + NetAccessManager netAccessManager) throws IllegalArgumentException { if (netAccessManager == null) @@ -80,11 +96,6 @@ class MessageProcessor throw new NullPointerException("netAccessManager"); } - if (message == null) - { - throw new IllegalArgumentException("The message may not be null"); - } - MessageEventHandler messageEventHandler = netAccessManager.getMessageEventHandler(); @@ -97,26 +108,71 @@ class MessageProcessor this.netAccessManager = netAccessManager; this.messageEventHandler = messageEventHandler; this.errorHandler = netAccessManager; + } + + /** + * Assigns the RawMessage that will be processed + * by this MessageProcessor on separate thread. + * @param message RawMessage to be processed + * @param onProcessed callback which will be invoked when processing + * of {@link #rawMessage} is completed + */ + void setMessage(RawMessage message, Consumer onProcessed) + { + if (message == null) + { + throw new IllegalArgumentException("The message may not be null"); + } this.rawMessage = message; + this.rawMessageProcessedHandler = onProcessed; } /** - * Does the message parsing. + * Performs proper reset of internal state of pooled instance. */ - public void run() + public void resetState() + { + this.reinitialize(); + this.cancelled.set(false); + } + + /** + * Attempts to cancel processing of {@link #rawMessage} + */ + public void cancel() + { + this.cancelled.set(true); + } + + @Override + protected void compute() { + final Consumer onProcessed = rawMessageProcessedHandler; + final RawMessage message = rawMessage; //add an extra try/catch block that handles uncatched errors try { + if (message == null) + { + return; + } + rawMessage = null; + rawMessageProcessedHandler = null; + + if (this.cancelled.get()) + { + return; + } + StunStack stunStack = netAccessManager.getStunStack(); Message stunMessage; try { stunMessage - = Message.decode(rawMessage.getBytes(), + = Message.decode(message.getBytes(), (char) 0, - (char) rawMessage.getMessageLength()); + (char) message.getMessageLength()); } catch (StunException ex) { @@ -129,13 +185,26 @@ public void run() logger.finest("Dispatching a StunMessageEvent."); StunMessageEvent stunMessageEvent - = new StunMessageEvent(stunStack, rawMessage, stunMessage); + = new StunMessageEvent(stunStack, message, stunMessage); messageEventHandler.handleMessageEvent(stunMessageEvent); } catch(Throwable err) { - errorHandler.handleFatalError(this, "Unexpected Error!", err); + errorHandler.handleFatalError( + Thread.currentThread(), + "Unexpected Error!", err); + } + finally + { + // On processed callback must be invoked in all cases, even when + // cancellation or early exist happen, otherwise + // NetAccessManager internal tracking of pooled and active + // message processors will missbehave. + if (onProcessed != null) + { + onProcessed.accept(this); + } } } } diff --git a/src/main/java/org/ice4j/stack/NetAccessManager.java b/src/main/java/org/ice4j/stack/NetAccessManager.java index 9a4dd932..fad34b79 100644 --- a/src/main/java/org/ice4j/stack/NetAccessManager.java +++ b/src/main/java/org/ice4j/stack/NetAccessManager.java @@ -22,6 +22,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.function.*; import java.util.logging.*; import org.ice4j.*; @@ -51,20 +52,28 @@ class NetAccessManager /** * Thread pool to execute {@link MessageProcessor}s across all * {@link NetAccessManager}s. - * The work-stealing thread pool tries to utilizes all available processors - * in parallel, but does not have guarantee about the order of execution, - * but underlying transport (UDP) also does not have such guarantee, - * so it is ok when some of the messages will be processed out of - * arrival (enqueuing) order, but faster. + * The work-stealing thread pool {@link java.util.concurrent.ForkJoinPool} + * tries to utilizes all available processors in parallel, but does not + * have guarantee about the order of execution, but underlying transport + * (UDP) also does not have such guarantee, so it is ok when some of the + * messages will be processed out of arrival (enqueuing) order, but faster. */ - private static ExecutorService messageProcessorExecutor - = Executors.newWorkStealingPool(); + private static ForkJoinPool messageProcessorExecutor = new ForkJoinPool(); /** - * The set of Future's of not yet processed RawMessages + * Pool of MessageProcessor objects to avoid extra-allocations of + * processor object per RawMessage needed to process. */ - private final ConcurrentHashMap.KeySetView, Boolean> - unprocessedMessageFutures = ConcurrentHashMap.newKeySet(); + private final ArrayBlockingQueue messageProcessorsPool + = new ArrayBlockingQueue<>(8); + + /** + * The set of Future's of not yet processed RawMessages, + * this tracking is necessary to properly cancel pending tasks in case + * {@link #stop()} is called. + */ + private final ConcurrentHashMap.KeySetView + activeMessageProcessors = ConcurrentHashMap.newKeySet(); /** * All Connectors currently in use with UDP. The table maps a local @@ -125,6 +134,16 @@ class NetAccessManager */ private final AtomicBoolean isStopped = new AtomicBoolean(false); + /** + * Callback to be called when scheduled MessageProcessor completes + * processing it's RawMessage. + */ + private final Consumer + onMessageProcessorProcessedRawMessage = messageProcessor -> { + activeMessageProcessors.remove(messageProcessor); + messageProcessorsPool.offer(messageProcessor); + }; + /** * Constructs a NetAccessManager. * @@ -179,7 +198,7 @@ MessageEventHandler getMessageEventHandler() * Gets the PeerUdpMessageEventHandler of this * NetAccessManager which is to be notified when incoming UDP * messages have been processed and are ready for delivery. - * + * * @return the PeerUdpMessageEventHandler of this * NetAccessManager which is to be notified when incoming * UDP messages have been processed and are ready for delivery @@ -193,7 +212,7 @@ public PeerUdpMessageEventHandler getUdpMessageEventHandler() * Gets the ChannelDataEventHandler of this * NetAccessManager which is to be notified when incoming * ChannelData messages have been processed and are ready for delivery. - * + * * @return the ChannelDataEventHandler of this * NetAccessManager which is to be notified when incoming * ChannelData messages have been processed and are ready for @@ -416,11 +435,11 @@ public void stop() // no item can be added to {@link #unprocessedMessageFutures} when // NetAccessManager is stopped, so it is safe to iterate without // removing item in-place. - for (Future future: unprocessedMessageFutures) + for (MessageProcessor messageProcessor: activeMessageProcessors) { - future.cancel(true); + messageProcessor.cancel(); } - unprocessedMessageFutures.clear(); + activeMessageProcessors.clear(); for (Object o : new Object[]{udpConnectors, tcpConnectors}) { @@ -494,19 +513,25 @@ private void onIncomingRawMessage(final RawMessage message) return; } - final MessageProcessor messageProcessor - = new MessageProcessor(this, message); - final CompletableFuture completableFuture = CompletableFuture - .runAsync(messageProcessor, messageProcessorExecutor); + MessageProcessor messageProcessor = messageProcessorsPool.poll(); + if (messageProcessor == null) + { + messageProcessor = new MessageProcessor(this); + } + else + { + messageProcessor.resetState(); + } + + messageProcessor.setMessage( + message, + onMessageProcessorProcessedRawMessage); - unprocessedMessageFutures.add(completableFuture); - completableFuture.handle((result, error) -> { - unprocessedMessageFutures.remove(completableFuture); - return null; - }); + // Because MessageProcessor is ForkJoinTask it is not re-wrapped + // inside ForkJoinPool and no hidden allocation is done inside pool. + messageProcessorExecutor.submit(messageProcessor); } - //--------------- SENDING MESSAGES ----------------------------------------- /** * Sends the specified stun message through the specified access point.