Skip to content

Commit

Permalink
Reference replacement WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
johnou committed Apr 28, 2015
1 parent c43b35f commit c6de990
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 7 deletions.
Expand Up @@ -28,6 +28,8 @@

package com.ea.orbit.actors.providers.memcached;

import com.ea.orbit.actors.runtime.ActorReference;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -53,7 +55,28 @@ public static Object getSerializedObject(byte[] byteArray) throws IOException, C
ByteArrayInputStream str = new ByteArrayInputStream(byteArray);
Object result = null;

ObjectInputStream ois = new ObjectInputStream(str);
ObjectInputStream ois = new ObjectInputStream(str) {

{
enableResolveObject(true);
}
@Override
protected Object resolveObject(final Object obj) throws IOException
{
if (obj instanceof ReferenceReplacement)
{
ReferenceReplacement replacement = (ReferenceReplacement) obj;
// TODO: help?
if (replacement.address != null)
{
//return execution.getRemoteObserverReference(replacement.address, (Class)replacement.interfaceClass, replacement.id);
}
//return execution.getReference((Class)replacement.interfaceClass, replacement.id);

}
return super.resolveObject(obj);
}
};
result = ois.readObject();
return result;
}
Expand All @@ -66,16 +89,41 @@ public static Object getSerializedObject(byte[] byteArray) throws IOException, C
*/
public static byte[] serializeObject(Object object)
{
ByteArrayOutputStream str = new ByteArrayOutputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try
{
ObjectOutputStream oos = new ObjectOutputStream(str);
ObjectOutputStream oos = new ObjectOutputStream(baos)
{
{
enableReplaceObject(true);
}

@SuppressWarnings("rawtypes")
@Override
protected Object replaceObject(final Object obj) throws IOException
{
if (!(obj instanceof ActorReference))
{
return super.replaceObject(obj);
}
return getReferenceReplacement((ActorReference) obj);
}
};
oos.writeObject(object);
}
catch (IOException impossible)
{
throw new RuntimeException("Failed to serialize object " + object + " to byte array", impossible);
}
return str.toByteArray();
return baos.toByteArray();
}

private static ReferenceReplacement getReferenceReplacement(final ActorReference reference)
{
ReferenceReplacement replacement = new ReferenceReplacement();
replacement.address = ActorReference.getAddress(reference);
replacement.interfaceClass = ActorReference.getInterfaceClass(reference);
replacement.id = ActorReference.getId(reference);
return replacement;
}
}
Expand Up @@ -57,9 +57,9 @@ public Object get(String key)
return deserialize(key, getMemcachedValueAsBytes(key));
}

public void set(String key, Object value)
public boolean set(String key, Object value)
{
memCachedClient.set(key, serialize(value));
return memCachedClient.set(key, serialize(value));
}

public byte[] serialize(Object value)
Expand Down
@@ -0,0 +1,45 @@
/*
Copyright (C) 2015 Electronic Arts Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of Electronic Arts, Inc. ("EA") nor the names of
its contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY ELECTRONIC ARTS AND ITS CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL ELECTRONIC ARTS OR ITS CONTRIBUTORS BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.ea.orbit.actors.providers.memcached;

import com.ea.orbit.actors.cluster.INodeAddress;

import java.io.Serializable;

/**
* @author Johno Crawford (johno@sulake.com)
*/
public class ReferenceReplacement implements Serializable
{
private static final long serialVersionUID = 1L;

Class<?> interfaceClass;
Object id;
INodeAddress address;
}
Expand Up @@ -34,6 +34,12 @@
public class HelloActor extends OrbitActor<HelloState> implements IHelloActor
{

@Override
public Task<?> activateAsync()
{
return super.activateAsync().thenRun(state().observers::cleanup);
}

@Override
public Task<String> sayHello(String name)
{
Expand All @@ -42,6 +48,14 @@ public Task<String> sayHello(String name)
return Task.fromValue("Hello " + name);
}

@Override
public Task addObserver(final IHelloObserver observer)
{
state().observers.addObserver(observer);
writeState().join();
return Task.done();
}

@Override
public Task<Void> clear()
{
Expand Down
@@ -0,0 +1,36 @@
/*
Copyright (C) 2015 Electronic Arts Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of Electronic Arts, Inc. ("EA") nor the names of
its contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY ELECTRONIC ARTS AND ITS CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL ELECTRONIC ARTS OR ITS CONTRIBUTORS BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.ea.orbit.actors.memcached.test;

/**
* @author Johno Crawford (johno@sulake.com)
*/
public class HelloObserver implements IHelloObserver
{
}
@@ -1,14 +1,21 @@
package com.ea.orbit.actors.memcached.test;

import com.ea.orbit.actors.ObserverManager;
import com.ea.orbit.actors.test.IStorageTestState;

import java.io.Serializable;

public class HelloState implements IStorageTestState, Serializable
{
ObserverManager<IHelloObserver> observers = new ObserverManager<>();

public String lastName;

public ObserverManager<IHelloObserver> getObservers()
{
return observers;
}

@Override
public String lastName()
{
Expand Down
Expand Up @@ -30,8 +30,9 @@

import com.ea.orbit.actors.IActor;
import com.ea.orbit.actors.test.IStorageTestActor;
import com.ea.orbit.concurrent.Task;

public interface IHelloActor extends IActor, IStorageTestActor
{
//basic tasks from IStorageTestActor interface
Task addObserver(IHelloObserver observer);
}
@@ -0,0 +1,38 @@
/*
Copyright (C) 2015 Electronic Arts Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of Electronic Arts, Inc. ("EA") nor the names of
its contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY ELECTRONIC ARTS AND ITS CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL ELECTRONIC ARTS OR ITS CONTRIBUTORS BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.ea.orbit.actors.memcached.test;

import com.ea.orbit.actors.IActorObserver;

/**
* @author Johno Crawford (johno@sulake.com)
*/
public interface IHelloObserver extends IActorObserver
{
}
Expand Up @@ -28,6 +28,8 @@

package com.ea.orbit.actors.memcached.test;

import com.ea.orbit.actors.IActor;
import com.ea.orbit.actors.OrbitStage;
import com.ea.orbit.actors.providers.IOrbitProvider;
import com.ea.orbit.actors.providers.memcached.MemCachedClientFactory;
import com.ea.orbit.actors.providers.memcached.MemCachedStorageHelper;
Expand All @@ -36,16 +38,30 @@
import com.ea.orbit.actors.test.IStorageTestState;
import com.ea.orbit.actors.test.StorageBaseTest;

import org.junit.Test;

import com.whalin.MemCached.MemCachedClient;

import static com.ea.orbit.actors.providers.memcached.MemCachedStorageHelper.KEY_SEPARATOR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class MemCachedPersistenceTest extends StorageBaseTest
{

private MemCachedClient memCachedClient;
private MemCachedStorageHelper memCachedStorageHelper;

@Test
public void testObserverSerialization() throws Exception
{
createStage();
IHelloActor helloActor = IActor.getReference(IHelloActor.class, "1");
helloActor.addObserver(new HelloObserver()).join();
HelloState helloState = (HelloState) readState("1");
helloState.observers.cleanup();
}

@Override
public Class<? extends IStorageTestActor> getActorInterfaceClass()
{
Expand Down

0 comments on commit c6de990

Please sign in to comment.