Skip to content

Commit

Permalink
Refactored the base networking imlpementation's "MessageProtocol" to …
Browse files Browse the repository at this point in the history
…be an

interface and strictly implement the to/from ByteBuffer protocol.  In the future
this will allow the message protocol to be swappable.  Also moved out the buffering
aspect since often buffers need to be created 'on the fly' and are separate from
the protocol.  This will allow me to play with fixing the issue related to messages
being deserialized before the serialization registry message has been processed by
swapping out the GreedyMessageBuffer implementation for a LazyMessageBuffer implementation.
  • Loading branch information
pspeed42 committed Sep 9, 2019
1 parent a9afcec commit 1c37d5a
Show file tree
Hide file tree
Showing 8 changed files with 372 additions and 172 deletions.
Expand Up @@ -65,6 +65,7 @@ public class ConnectorAdapter extends Thread
private MessageListener<Object> dispatcher;
private ErrorListener<Object> errorHandler;
private AtomicBoolean go = new AtomicBoolean(true);
private MessageProtocol protocol;

private BlockingQueue<ByteBuffer> outbound;

Expand All @@ -75,11 +76,13 @@ public class ConnectorAdapter extends Thread
// through this connector.
private boolean reliable;

public ConnectorAdapter( Connector connector, MessageListener<Object> dispatcher,
public ConnectorAdapter( Connector connector, MessageProtocol protocol,
MessageListener<Object> dispatcher,
ErrorListener<Object> errorHandler, boolean reliable )
{
super( String.valueOf(connector) );
this.connector = connector;
this.connector = connector;
this.protocol = protocol;
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
this.reliable = reliable;
Expand Down Expand Up @@ -151,7 +154,7 @@ protected void handleError( Exception e )

public void run()
{
MessageProtocol protocol = new MessageProtocol();
MessageBuffer messageBuffer = protocol.createBuffer();

try {
while( go.get() ) {
Expand All @@ -166,10 +169,10 @@ public void run()
}
}

protocol.addBuffer( buffer );
messageBuffer.addBytes(buffer);

Message m = null;
while( (m = protocol.getMessage()) != null ) {
while( (m = messageBuffer.pollMessage()) != null ) {
m.setReliable( reliable );
dispatch( m );
}
Expand Down
Expand Up @@ -33,6 +33,7 @@

import com.jme3.network.*;
import com.jme3.network.ClientStateListener.DisconnectInfo;
import com.jme3.network.base.protocol.SerializerMessageProtocol;
import com.jme3.network.kernel.Connector;
import com.jme3.network.message.ChannelInfoMessage;
import com.jme3.network.message.ClientRegistrationMessage;
Expand Down Expand Up @@ -83,6 +84,7 @@ public class DefaultClient implements Client
private ConnectorFactory connectorFactory;

private ClientServiceManager services;
private MessageProtocol protocol = new SerializerMessageProtocol();

public DefaultClient( String gameName, int version )
{
Expand Down Expand Up @@ -114,9 +116,9 @@ protected void setPrimaryConnectors( Connector reliable, Connector fast, Connect
throw new IllegalStateException( "Channels already exist." );

this.connectorFactory = connectorFactory;
channels.add(new ConnectorAdapter(reliable, dispatcher, dispatcher, true));
channels.add(new ConnectorAdapter(reliable, protocol, dispatcher, dispatcher, true));
if( fast != null ) {
channels.add(new ConnectorAdapter(fast, dispatcher, dispatcher, false));
channels.add(new ConnectorAdapter(fast, protocol, dispatcher, dispatcher, false));
} else {
// Add the null adapter to keep the indexes right
channels.add(null);
Expand Down Expand Up @@ -279,7 +281,7 @@ protected void send( int channel, Message message, boolean waitForConnected )
buffer.clear();

// Convert the message to bytes
buffer = MessageProtocol.messageToBuffer(message, buffer);
buffer = protocol.toByteBuffer(message, buffer);

// Since we share the buffer between invocations, we will need to
// copy this message's part out of it. This is because we actually
Expand Down Expand Up @@ -431,7 +433,7 @@ protected void configureChannels( long tempId, int[] ports ) {
try {
for( int i = 0; i < ports.length; i++ ) {
Connector c = connectorFactory.createConnector( i, ports[i] );
ConnectorAdapter ca = new ConnectorAdapter(c, dispatcher, dispatcher, true);
ConnectorAdapter ca = new ConnectorAdapter(c, protocol, dispatcher, dispatcher, true);
int ch = channels.size();
channels.add( ca );

Expand Down
Expand Up @@ -32,6 +32,7 @@
package com.jme3.network.base;

import com.jme3.network.*;
import com.jme3.network.base.protocol.SerializerMessageProtocol;
import com.jme3.network.kernel.Endpoint;
import com.jme3.network.kernel.Kernel;
import com.jme3.network.message.ChannelInfoMessage;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class DefaultServer implements Server
private final List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();

private HostedServiceManager services;
private MessageProtocol protocol = new SerializerMessageProtocol();

public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast )
{
Expand All @@ -99,10 +101,10 @@ public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast
this.services = new HostedServiceManager(this);
addStandardServices();

reliableAdapter = new KernelAdapter( this, reliable, dispatcher, true );
reliableAdapter = new KernelAdapter(this, reliable, protocol, dispatcher, true);
channels.add( reliableAdapter );
if( fast != null ) {
fastAdapter = new KernelAdapter( this, fast, dispatcher, false );
fastAdapter = new KernelAdapter(this, fast, protocol, dispatcher, false);
channels.add( fastAdapter );
}
}
Expand Down Expand Up @@ -153,7 +155,7 @@ public int addChannel( int port )
alternatePorts.add(port);

Kernel kernel = kernelFactory.createKernel(result, port);
channels.add( new KernelAdapter(this, kernel, dispatcher, true) );
channels.add( new KernelAdapter(this, kernel, protocol, dispatcher, true) );

return result;
} catch( IOException e ) {
Expand Down Expand Up @@ -238,7 +240,7 @@ public void broadcast( Filter<? super HostedConnection> filter, Message message
if( connections.isEmpty() )
return;

ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
ByteBuffer buffer = protocol.toByteBuffer(message, null);

FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);

Expand All @@ -263,7 +265,7 @@ public void broadcast( int channel, Filter<? super HostedConnection> filter, Mes

checkChannel(channel);

ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
ByteBuffer buffer = protocol.toByteBuffer(message, null);

FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);

Expand Down Expand Up @@ -579,7 +581,7 @@ public void send( Message message )
if( log.isLoggable(Level.FINER) ) {
log.log(Level.FINER, "send({0})", message);
}
ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
ByteBuffer buffer = protocol.toByteBuffer(message, null);
if( message.isReliable() || channels[CH_UNRELIABLE] == null ) {
channels[CH_RELIABLE].send( buffer );
} else {
Expand All @@ -594,7 +596,7 @@ public void send( int channel, Message message )
log.log(Level.FINER, "send({0}, {1})", new Object[]{channel, message});
}
checkChannel(channel);
ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
ByteBuffer buffer = protocol.toByteBuffer(message, null);
channels[channel+CH_FIRST].send(buffer);
}

Expand Down
Expand Up @@ -70,21 +70,24 @@ public class KernelAdapter extends Thread
private Kernel kernel;
private MessageListener<HostedConnection> messageDispatcher;
private AtomicBoolean go = new AtomicBoolean(true);


private MessageProtocol protocol;

// Keeps track of the in-progress messages that are received
// on reliable connections
private Map<Endpoint, MessageProtocol> messageBuffers = new ConcurrentHashMap<Endpoint,MessageProtocol>();
private Map<Endpoint, MessageBuffer> messageBuffers = new ConcurrentHashMap<>();

// Marks the messages as reliable or not if they came
// through this connector.
private boolean reliable;

public KernelAdapter( DefaultServer server, Kernel kernel, MessageListener<HostedConnection> messageDispatcher,
public KernelAdapter( DefaultServer server, Kernel kernel, MessageProtocol protocol, MessageListener<HostedConnection> messageDispatcher,
boolean reliable )
{
super( String.valueOf(kernel) );
this.server = server;
this.kernel = kernel;
this.protocol = protocol;
this.messageDispatcher = messageDispatcher;
this.reliable = reliable;
setDaemon(true);
Expand Down Expand Up @@ -190,20 +193,20 @@ protected void dispatch( Endpoint p, Message m )
}
}

protected MessageProtocol getMessageBuffer( Endpoint p )
protected MessageBuffer getMessageBuffer( Endpoint p )
{
if( !reliable ) {
// Since UDP comes in packets and they aren't split
// up, there is no reason to buffer. In fact, there would
// be a down side because there is no way for us to reliably
// clean these up later since we'd create another one for
// any random UDP packet that comes to the port.
return new MessageProtocol();
return protocol.createBuffer();
} else {
// See if we already have one
MessageProtocol result = messageBuffers.get(p);
MessageBuffer result = messageBuffers.get(p);
if( result == null ) {
result = new MessageProtocol();
result = protocol.createBuffer();
messageBuffers.put(p, result);
}
return result;
Expand All @@ -212,13 +215,12 @@ protected MessageProtocol getMessageBuffer( Endpoint p )

protected void createAndDispatch( Envelope env )
{
MessageProtocol protocol = getMessageBuffer(env.getSource());
MessageBuffer protocol = getMessageBuffer(env.getSource());

byte[] data = env.getData();
ByteBuffer buffer = ByteBuffer.wrap(data);

int count = protocol.addBuffer( buffer );
if( count == 0 ) {
if( !protocol.addBytes(buffer) ) {
// This can happen if there was only a partial message
// received. However, this should never happen for unreliable
// connections.
Expand All @@ -236,9 +238,9 @@ protected void createAndDispatch( Envelope env )

// Should be complete... and maybe we should check but we don't
Message m = null;
while( (m = protocol.getMessage()) != null ) {
while( (m = protocol.pollMessage()) != null ) {
m.setReliable(reliable);
dispatch( env.getSource(), m );
dispatch(env.getSource(), m);
}
}

Expand Down
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2009-2019 jMonkeyEngine
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of 'jMonkeyEngine' nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.jme3.network.base;

import java.nio.ByteBuffer;

import com.jme3.network.Message;

/**
* Accumulates network data into Message objects. This allows
* random chunks of bytes to be assembled into messages even if
* the buffer boundaries don't line up.
*
* @author Paul Speed
*/
public interface MessageBuffer {

/**
* Returns the next message in the buffer or null if there are no more
* messages in the buffer.
*/
public Message pollMessage();

/**
* Returns true if there is a message waiting in the buffer.
*/
public boolean hasMessages();

/**
* Adds byte data to the message buffer. Returns true if there is
* a message waiting after this call.
*/
public boolean addBytes( ByteBuffer buffer );
}

0 comments on commit 1c37d5a

Please sign in to comment.