Skip to content

Commit

Permalink
Reduce object allocation with MessageSerializer. (#293)
Browse files Browse the repository at this point in the history
* Return byte array from MessageSerializer.

* Pool Kryo Input and Output objects.

* Fix caching.

* Use base class.

* Simplify deserialize.

* Add test.
  • Loading branch information
johnou authored and JoeHegarty committed Apr 3, 2018
1 parent e3a2a37 commit 55a4a5d
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 88 deletions.
Expand Up @@ -31,15 +31,12 @@
import cloud.orbit.actors.runtime.BasicRuntime;
import cloud.orbit.actors.runtime.Message;

import java.io.InputStream;
import java.io.OutputStream;

/**
* Extension interface to define how actor messages are serialized.
*/
public interface MessageSerializer extends ActorExtension
{
Message deserializeMessage(final BasicRuntime runtime, final InputStream inputStream) throws Exception;
Message deserializeMessage(final BasicRuntime runtime, final byte[] payload) throws Exception;

void serializeMessage(final BasicRuntime runtime, OutputStream out, Message message) throws Exception;
byte[] serializeMessage(final BasicRuntime runtime, Message message) throws Exception;
}
Expand Up @@ -101,12 +101,12 @@ public JsonMessageSerializer()


@Override
public Message deserializeMessage(final BasicRuntime runtime, final InputStream inputStream) throws Exception
public Message deserializeMessage(final BasicRuntime runtime, final byte[] payload) throws Exception
{
ensureInit(runtime);
try
{
final Message message = mapper.readValue(inputStream, Message.class);
final Message message = mapper.readValue(payload, Message.class);

// decode payload parameters according to the interface/method
if (message.getPayload() != null &&
Expand Down Expand Up @@ -144,7 +144,7 @@ private void ensureInit(final BasicRuntime newRuntime)
}

@Override
public void serializeMessage(final BasicRuntime runtime, final OutputStream out, final Message message) throws Exception
public byte[] serializeMessage(final BasicRuntime runtime, final Message message) throws Exception
{
ensureInit(runtime);
if (message.getPayload() instanceof Throwable && message.getMessageType() == MessageDefinitions.RESPONSE_ERROR)
Expand All @@ -161,7 +161,7 @@ public void serializeMessage(final BasicRuntime runtime, final OutputStream out,
{
message.setHeaders(null);
}
mapper.writeValue(out, message);
return mapper.writeValueAsBytes(message);
}

private Object[] castArgs(final Type[] genericParameterTypes, final Object payload)
Expand Down
Expand Up @@ -152,23 +152,15 @@ public void testMessage() throws Exception
{
Message message = new Message();
JsonMessageSerializer jsonMessageSerializer = new JsonMessageSerializer();
ByteArrayOutputStream out = new ByteArrayOutputStream();

out.reset();
message.setPayload(Arrays.asList(1, 2, 3));
jsonMessageSerializer.serializeMessage(null, out, message);
System.out.println(new String(out.toByteArray()));
System.out.println(new String(jsonMessageSerializer.serializeMessage(null, message)));

out.reset();
message.setPayload(new Object[]{ 1, 2, 3 });
jsonMessageSerializer.serializeMessage(null, out, message);
System.out.println(new String(out.toByteArray()));
System.out.println(new String(jsonMessageSerializer.serializeMessage(null, message)));

out.reset();
message.setPayload(new ArrayList(Arrays.asList(new Object[]{ 1, 2, 3 })));
jsonMessageSerializer.serializeMessage(null, out, message);
System.out.println(new String(out.toByteArray()));

System.out.println(new String(jsonMessageSerializer.serializeMessage(null, message)));
}
}

Expand Down
38 changes: 30 additions & 8 deletions actors/runtime/src/main/java/cloud/orbit/actors/Stage.java
Expand Up @@ -207,6 +207,8 @@ protected <T> LocalObjectEntry createLocalObjectEntry(final RemoteReference<T> r

private boolean enableShutdownHook = true;

private boolean enableMessageLoopback = true;

private volatile NodeCapabilities.NodeState state;

private ClusterPeer clusterPeer;
Expand Down Expand Up @@ -277,6 +279,7 @@ public static class Builder
private Long deactivationTimeoutMillis;
private Integer concurrentDeactivations;
private Boolean enableShutdownHook = null;
private Boolean enableMessageLoopback;

private Timer timer;

Expand All @@ -286,6 +289,12 @@ public Builder clock(Clock clock)
return this;
}

public Builder enableMessageLoopback(Boolean enableMessageLoopback)
{
this.enableMessageLoopback = enableMessageLoopback;
return this;
}

public Builder executionPool(ExecutorService executionPool)
{
this.executionPool = executionPool;
Expand Down Expand Up @@ -479,6 +488,7 @@ public Stage build()
if(concurrentDeactivations != null) stage.setConcurrentDeactivations(concurrentDeactivations);
if(broadcastActorDeactivations != null) stage.setBroadcastActorDeactivations(broadcastActorDeactivations);
if(enableShutdownHook != null) stage.setEnableShutdownHook(enableShutdownHook);
if(enableMessageLoopback != null) stage.setEnableMessageLoopback(enableMessageLoopback);
return stage;
}

Expand Down Expand Up @@ -660,10 +670,16 @@ public void setBroadcastActorDeactivations(boolean broadcastActorDeactivation)
this.broadcastActorDeactivations = broadcastActorDeactivation;
}

public void setEnableShutdownHook(boolean enableShutdownHook) {
public void setEnableShutdownHook(boolean enableShutdownHook)
{
this.enableShutdownHook = enableShutdownHook;
}

public void setEnableMessageLoopback(final boolean enableMessageLoopback)
{
this.enableMessageLoopback = enableMessageLoopback;
}

@Override
public Task<?> start()
{
Expand Down Expand Up @@ -811,10 +827,13 @@ public Task<?> start()
// handles invocation messages and request-response matching
pipeline.addLast(DefaultHandlers.MESSAGING, messaging);

final MessageLoopback messageLoopback = new MessageLoopback();
messageLoopback.setCloner(messageLoopbackObjectCloner != null ? messageLoopbackObjectCloner : new KryoSerializer());
messageLoopback.setRuntime(this);
pipeline.addLast(messageLoopback.getName(), messageLoopback);
if (enableMessageLoopback)
{
final MessageLoopback messageLoopback = new MessageLoopback();
messageLoopback.setCloner(messageLoopbackObjectCloner != null ? messageLoopbackObjectCloner : new KryoSerializer());
messageLoopback.setRuntime(this);
pipeline.addLast(messageLoopback.getName(), messageLoopback);
}

// message serializer handler
pipeline.addLast(DefaultHandlers.SERIALIZATION, new SerializationHandler(this, messageSerializer));
Expand Down Expand Up @@ -911,9 +930,12 @@ public void run()

logger.info("Stage started [{}]", runtimeIdentity());

if(enableShutdownHook) {
if(shutdownHook == null) {
shutdownHook = new Thread(() -> {
if (enableShutdownHook)
{
if (shutdownHook == null)
{
shutdownHook = new Thread(() ->
{
synchronized (shutdownLock)
{
if (state == NodeCapabilities.NodeState.RUNNING)
Expand Down
Expand Up @@ -48,11 +48,8 @@
import cloud.orbit.tuples.Pair;
import cloud.orbit.util.AnnotationCache;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.math.BigInteger;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.time.Clock;
import java.util.concurrent.Executor;
Expand All @@ -69,14 +66,6 @@ public class DefaultResponseCachingExtension
private final AnnotationCache<CacheResponse> cacheResponseCache = new AnnotationCache<>(CacheResponse.class);
private final MessageDigestFactory messageDigest = new MessageDigestFactory("SHA-256");

private static class NullOutputStream extends OutputStream
{
@Override
public void write(int b) throws IOException
{
}
}

private ExecutionObjectCloner objectCloner;

/**
Expand Down Expand Up @@ -216,10 +205,8 @@ private String generateParameterHash(Object[] params)
try
{
final MessageDigest md = messageDigest.newDigest();
final DigestOutputStream d = new DigestOutputStream(new NullOutputStream(), md);
messageSerializer.serializeMessage(runtime, d, new Message().withPayload(params));
d.close();
return String.format("%032X", new BigInteger(1, md.digest()));
final byte[] hashValue = md.digest(messageSerializer.serializeMessage(runtime, new Message().withPayload(params)));
return String.format("%032X", new BigInteger(1, hashValue));
}
catch (Exception e)
{
Expand Down
Expand Up @@ -32,6 +32,8 @@
import cloud.orbit.actors.cluster.NodeAddressImpl;
import cloud.orbit.actors.extensions.MessageSerializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
Expand All @@ -46,9 +48,9 @@
public class JavaMessageSerializer implements MessageSerializer
{
@Override
public Message deserializeMessage(final BasicRuntime runtime, final InputStream inputStream) throws Exception
public Message deserializeMessage(final BasicRuntime runtime, final byte[] payload) throws Exception
{
final ObjectInput in = createObjectInput(runtime, inputStream);
final ObjectInput in = createObjectInput(runtime, new ByteArrayInputStream(payload));
final Message message = new Message();
message.setMessageType(in.readByte());
message.setMessageId(in.readInt());
Expand All @@ -65,9 +67,10 @@ public Message deserializeMessage(final BasicRuntime runtime, final InputStream
}

@Override
public void serializeMessage(final BasicRuntime runtime, OutputStream outputStream, Message message) throws Exception
public byte[] serializeMessage(final BasicRuntime runtime, Message message) throws Exception
{
final ObjectOutput out = createObjectOutput(runtime, outputStream);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final ObjectOutput out = createObjectOutput(runtime, baos);
out.writeByte(message.getMessageType());
out.writeInt(message.getMessageId());
if (message.getReferenceAddress() != null)
Expand All @@ -87,6 +90,7 @@ public void serializeMessage(final BasicRuntime runtime, OutputStream outputStre
out.writeObject(message.getHeaders());
out.writeObject(message.getFromNode());
out.writeObject(message.getPayload());
return baos.toByteArray();
}

protected ObjectOutput createObjectOutput(final BasicRuntime runtime, final OutputStream outputStream) throws IOException
Expand Down
@@ -0,0 +1,73 @@
/*
Copyright (C) 2018 Electronic Arts Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of Electronic Arts, Inc. ("EA") nor the names of
its contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY ELECTRONIC ARTS AND ITS CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL ELECTRONIC ARTS OR ITS CONTRIBUTORS BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package cloud.orbit.actors.runtime;

import java.lang.ref.SoftReference;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

abstract class KryoIOPool<T>
{

private final Queue<SoftReference<T>> queue = new ConcurrentLinkedQueue<>();

private T borrow(final int bufferSize)
{
T element;
SoftReference<T> reference;
while ((reference = queue.poll()) != null)
{
if ((element = reference.get()) != null)
{
return element;
}
}
return create(bufferSize);
}

protected abstract T create(final int bufferSize);

protected abstract boolean recycle(final T element);

<R> R run(final Function<T, R> function, final int bufferSize)
{
final T element = borrow(bufferSize);
try
{
return function.apply(element);
}
finally
{
if (recycle(element))
{
queue.offer(new SoftReference<>(element));
}
}
}
}
@@ -0,0 +1,54 @@
/*
Copyright (C) 2018 Electronic Arts Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of Electronic Arts, Inc. ("EA") nor the names of
its contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY ELECTRONIC ARTS AND ITS CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL ELECTRONIC ARTS OR ITS CONTRIBUTORS BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package cloud.orbit.actors.runtime;

import com.esotericsoftware.kryo.io.Output;

class KryoOutputPool extends KryoIOPool<Output>
{

private static final int MAX_BUFFER_SIZE = 768 * 1024;
static final int MAX_POOLED_BUFFER_SIZE = 512 * 1024;

@Override
protected Output create(int bufferSize)
{
return new Output(bufferSize, MAX_BUFFER_SIZE);
}

@Override
protected boolean recycle(Output output)
{
if (output.getBuffer().length < MAX_POOLED_BUFFER_SIZE)
{
output.clear();
return true;
}
return false; // discard
}
}

0 comments on commit 55a4a5d

Please sign in to comment.