Permalink
Browse files

Eliminate "Stun4J Message Processor" by replacing it with thread pool. (

#155)

* Use shared thread pool to execute stun messages decoding and processing. Set minimum required Java version to 1.8.

* Added missing argument comment.

* Added pooling of MessageProcessor objects to avoid extra allocations per RawMessage.

* Made MessageProcessor to be ForkJoinTask<Void> to avoid hidden allocation inside ForkJoinPool.

* Fixed comments according to last changes.

* Added dedicated method to reset state of pooled processor. More comments.

* Fixed comment typo.

* ForkJoinPool by default uses all available processors.

* Rollback JDK 8 changes in pom file since it looks they were only necessary for local build due to misconfigured IDE.

* Avoid usage of Java 8 features (lambda expressions & method references)

* Revert "Rollback JDK 8 changes in pom file since it looks they were only necessary for local build due to misconfigured IDE."

This reverts commit 840f18a.

* Revert "Avoid usage of Java 8 features (lambda expressions & method references)"

This reverts commit b6ce70e.

* Renamed MessageProcessor into MessageProcessingTask to better reflect changed meaning of class.

* Use ForkJoinPool with prefixed threads to simplify debugging.

* Removed todo.

* Use well-known thread pool instead of ForkJoinPool. Could be easily changed back to ForkJoinPool.

* Fixed missing space.

* Fixes according to code review.

* Fixed active tasks tracking which was broken when pooling was added.

* Keep track of task processing throughput via QueueStatistics.

* Updated comment.

* Fixed comment.

* Updated comment.

* Updated comment.

* Updated comment.

* Renamed field to have better name.

* Fixed formatting.

* Restored old formatting corrupted by IDE.

* Fixed formatting.

* Fixed comment.

* Fixed comment.

* Updated comment.

* Updated log message.

* Code-style fixes to match rest of file.

* Fixed code style to match existing code.

* Fixed formatting.

* Restored accidentally removed comment.

* Added myself to list of authors.
  • Loading branch information...
mstyura authored and bbaldino committed Nov 29, 2018
1 parent 3239ed1 commit ee4e8c945221300baa6c0dc69dd56caf25cf243e
28 pom.xml
@@ -43,6 +43,14 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
@@ -126,26 +134,6 @@
</build>
<profiles>
<profile>
<id>skip-pre-jdk8</id>
<activation>
<jdk>(,1.8)</jdk>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/jdk8/**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>run-sample</id>
<build>
@@ -20,7 +20,7 @@
import java.io.*;
import java.net.*;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.logging.*;
import org.ice4j.*;
@@ -29,8 +29,9 @@
/**
* The Network Access Point is the most outward part of the stack. It is
* constructed around a datagram socket and takes care of forwarding incoming
* messages to the MessageProcessor as well as sending datagrams to the STUN
* server specified by the original NetAccessPointDescriptor.
* messages to the provided {@link #messageConsumer} as well as sending
* datagrams to the STUN server specified by the
* original NetAccessPointDescriptor.
*
* @author Emil Ivov
*/
@@ -44,9 +45,9 @@
= Logger.getLogger(Connector.class.getName());
/**
* The message queue is where incoming messages are added.
* The consumer of incoming <tt>RawMessage</tt>s
*/
private final BlockingQueue<RawMessage> messageQueue;
private final Consumer<RawMessage> messageConsumer;
/**
* The socket object that used by this access point to access the network.
@@ -87,16 +88,16 @@
* communication.
* @param remoteAddress the remote address of the socket of this
* {@link Connector} if it is a TCP socket, or null if it is UDP.
* @param messageQueue the Queue where incoming messages should be queued
* @param messageConsumer the incoming messages consumer
* @param errorHandler the instance to notify when errors occur.
*/
protected Connector(IceSocketWrapper socket,
TransportAddress remoteAddress,
BlockingQueue<RawMessage> messageQueue,
Consumer<RawMessage> messageConsumer,
ErrorHandler errorHandler)
{
this.sock = socket;
this.messageQueue = messageQueue;
this.messageConsumer = messageConsumer;
this.errorHandler = errorHandler;
this.remoteAddress = remoteAddress;
@@ -230,7 +231,7 @@ else if(localSock.getUDPSocket() != null)
listenAddress.getTransport()),
listenAddress);
messageQueue.add(rawMessage);
messageConsumer.accept(rawMessage);
}
catch (SocketException ex)
{
@@ -0,0 +1,216 @@
/*
* ice4j, the OpenSource Java Solution for NAT and Firewall Traversal.
*
* Copyright @ 2015 Atlassian Pty Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.ice4j.stack;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.*;
import org.ice4j.*;
import org.ice4j.message.*;
/**
* The class is used to parse and dispatch incoming messages by being
* executed by concurrent {@link java.util.concurrent.ExecutorService}.
* To reduce memory allocation this class is designed to be suitable for
* usage with pooling, the instance of this type is mutable such that
* <tt>RawMessage</tt> can be updated and instance can be reused and
* scheduled with new <tt>RawMessage</tt>
*
* @author Emil Ivov
* @author Yura Yaroshevich
*/
class MessageProcessingTask
implements Runnable
{
/**
* Our class logger.
*/
private static final Logger logger
= Logger.getLogger(MessageProcessingTask.class.getName());
/**
* Indicates that <tt>MessageProcessingTask</tt> is cancelled and should not
* process <tt>RawMessage</tt> anymore.
*/
private final AtomicBoolean cancelled = new AtomicBoolean(false);
/**
* The <tt>NetAccessManager</tt> which has created this instance and which
* is its owner.
*/
private final NetAccessManager netAccessManager;
/**
* The listener that will be collecting error notifications.
*/
private final ErrorHandler errorHandler;
/**
* The listener that will be retrieving <tt>MessageEvent</tt>s
*/
private final MessageEventHandler messageEventHandler;
/**
* Raw message which is being processed
*/
private RawMessage rawMessage;
/**
* Callback which is invoked when this <tt>MessageProcessingTask</tt>
* processed it's {@link #rawMessage}
*/
private Consumer<MessageProcessingTask> rawMessageProcessedHandler;
/**
* Creates a Message processor.
*
* @param netAccessManager the <tt>NetAccessManager</tt> which is creating
* the new instance, is going to be its owner, specifies the
* <tt>MessageEventHandler</tt> and represents the <tt>ErrorHandler</tt> to
* handle exceptions in the new instance
* @throws IllegalArgumentException if any of the mentioned properties of
* <tt>netAccessManager</tt> are <tt>null</tt>
*/
MessageProcessingTask(NetAccessManager netAccessManager)
throws IllegalArgumentException
{
if (netAccessManager == null)
{
throw new NullPointerException("netAccessManager");
}
MessageEventHandler messageEventHandler
= netAccessManager.getMessageEventHandler();
if (messageEventHandler == null)
{
throw new IllegalArgumentException(
"The message event handler may not be null");
}
this.netAccessManager = netAccessManager;
this.messageEventHandler = messageEventHandler;
this.errorHandler = netAccessManager;
}
/**
* Assigns the <tt>RawMessage</tt> that will be processed
* by this <tt>MessageProcessingTask</tt> on executor's thread.
* @param message RawMessage to be processed
* @param onProcessed callback which will be invoked when processing
* of {@link #rawMessage} is completed
*/
void setMessage(
RawMessage message,
Consumer<MessageProcessingTask> onProcessed)
{
if (message == null)
{
throw new IllegalArgumentException("The message may not be null");
}
rawMessage = message;
rawMessageProcessedHandler = onProcessed;
}
/**
* Performs proper reset of internal state of pooled instance.
*/
void resetState()
{
cancelled.set(false);
rawMessage = null;
rawMessageProcessedHandler = null;
}
/**
* Attempts to cancel processing of {@link #rawMessage}
*/
public void cancel()
{
cancelled.set(true);
}
/**
* Does the message parsing.
*/
@Override
public void run()
{
final Consumer<MessageProcessingTask> onProcessed
= rawMessageProcessedHandler;
final RawMessage message = rawMessage;
//add an extra try/catch block that handles uncatched errors
try
{
if (message == null)
{
return;
}
rawMessage = null;
rawMessageProcessedHandler = null;
if (cancelled.get())
{
return;
}
StunStack stunStack = netAccessManager.getStunStack();
Message stunMessage;
try
{
stunMessage
= Message.decode(message.getBytes(),
(char) 0,
(char) message.getMessageLength());
}
catch (StunException ex)
{
errorHandler.handleError(
"Failed to decode a stun message!",
ex);
return;
}
logger.finest("Dispatching a StunMessageEvent.");
StunMessageEvent stunMessageEvent
= new StunMessageEvent(stunStack, message, stunMessage);
messageEventHandler.handleMessageEvent(stunMessageEvent);
}
catch (Throwable err)
{
errorHandler.handleFatalError(
Thread.currentThread(),
"Unexpected Error!", err);
}
finally
{
// On processed callback must be invoked in all cases, even when
// cancellation or early exist happen, otherwise
// NetAccessManager internal tracking of pooled and active
// message processors will missbehave.
if (onProcessed != null)
{
onProcessed.accept(this);
}
}
}
}
Oops, something went wrong.

0 comments on commit ee4e8c9

Please sign in to comment.