Skip to content

Commit

Permalink
Memcached storage provider.
Browse files Browse the repository at this point in the history
  • Loading branch information
johnou committed Apr 27, 2015
1 parent 44c58ac commit 22826a3
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 219 deletions.
Expand Up @@ -28,12 +28,9 @@


package com.ea.orbit.actors.providers; package com.ea.orbit.actors.providers;



import com.ea.orbit.actors.runtime.ActorReference; import com.ea.orbit.actors.runtime.ActorReference;
import com.ea.orbit.concurrent.Task; import com.ea.orbit.concurrent.Task;


import java.util.concurrent.atomic.AtomicReference;

/** /**
* Storage providers are used by the orbit actors framework to load and store actor states. * Storage providers are used by the orbit actors framework to load and store actor states.
*/ */
Expand All @@ -50,10 +47,10 @@ public interface IStorageProvider extends IOrbitProvider
/** /**
* Asynchronously reads an actors state. * Asynchronously reads an actors state.
* @param reference an reference to the actor (contains the interface name and actor key) * @param reference an reference to the actor (contains the interface name and actor key)
* @param stateReference reference to state object, modified by the storage provider implementation * @param state the state object, modified by the storage provider implementation
* @return a completion promise * @return a boolean completion promise of whether or not the state was modified
*/ */
Task<Void> readState(ActorReference<?> reference, AtomicReference<Object> stateReference); Task<Boolean> readState(ActorReference<?> reference, Object state);


/** /**
* Asynchronously writes an actors state. * Asynchronously writes an actors state.
Expand Down
Expand Up @@ -41,7 +41,6 @@
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;


/** /**
* Base class to all actor implementations. * Base class to all actor implementations.
Expand Down Expand Up @@ -121,9 +120,9 @@ protected Task<Void> writeState()
* *
* @return a completion promise * @return a completion promise
*/ */
protected Task<Void> readState() protected Task<Boolean> readState()
{ {
return stateProvider.readState(reference, new AtomicReference<>(state)); return stateProvider.readState(reference, state);
} }


/** /**
Expand Down
Expand Up @@ -42,8 +42,6 @@
import javax.persistence.Persistence; import javax.persistence.Persistence;
import javax.persistence.Query; import javax.persistence.Query;


import java.util.concurrent.atomic.AtomicReference;

public class JpaStorageProvider implements IStorageProvider public class JpaStorageProvider implements IStorageProvider
{ {


Expand Down Expand Up @@ -80,33 +78,34 @@ public synchronized Task<Void> clearState(final ActorReference<?> reference, fin
} }


@Override @Override
public synchronized Task<Void> readState(final ActorReference<?> reference, final AtomicReference<Object> stateReference) public synchronized Task<Boolean> readState(final ActorReference<?> reference, final Object state)
{ {
try try
{ {
Object objectState = stateReference.get();
String stateId = getIdentity(reference); String stateId = getIdentity(reference);
EntityManager em = emf.createEntityManager(); EntityManager em = emf.createEntityManager();
Query query = em.createQuery("select s from " + objectState.getClass().getSimpleName() + " s where s.stateId=:stateId"); Query query = em.createQuery("select s from " + state.getClass().getSimpleName() + " s where s.stateId=:stateId");
query.setParameter("stateId", stateId); query.setParameter("stateId", stateId);
Object newState; Object newState;
try try
{ {
newState = query.getSingleResult(); newState = query.getSingleResult();
mapper.readerForUpdating(state).readValue(mapper.writeValueAsString(newState));
return Task.fromValue(true);
} }
catch (NoResultException e) catch (NoResultException ignore)
{ {
newState = objectState.getClass().newInstance();
} }
mapper.readerForUpdating(objectState).readValue(mapper.writeValueAsString(newState)); finally
em.close(); {
return Task.done(); em.close();
}
return Task.fromValue(false);
} }
catch (Exception e) catch (Exception e)
{ {
throw new UncheckedException(e); throw new UncheckedException(e);
} }

} }


@Override @Override
Expand Down
4 changes: 0 additions & 4 deletions actors/providers/memcached/README.md

This file was deleted.

6 changes: 3 additions & 3 deletions actors/providers/memcached/pom.xml
Expand Up @@ -60,9 +60,9 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.ea.orbit</groupId> <groupId>net.sf.dozer</groupId>
<artifactId>orbit-actors-json</artifactId> <artifactId>dozer</artifactId>
<version>${project.version}</version> <version>5.5.1</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.whalin</groupId> <groupId>com.whalin</groupId>
Expand Down
Expand Up @@ -40,7 +40,7 @@ public abstract class MemCachedClientFactory
{ {
public static MemCachedClient getClient() public static MemCachedClient getClient()
{ {
MemCachedClient memCachedClient = new MemCachedClient(); MemCachedClient memCachedClient = new MemCachedClient(true);


SchoonerSockIOPool pool = SchoonerSockIOPool.getInstance(); SchoonerSockIOPool pool = SchoonerSockIOPool.getInstance();
pool.setServers(new String[]{ "localhost:11211" }); pool.setServers(new String[]{ "localhost:11211" });
Expand Down
Expand Up @@ -29,113 +29,87 @@
package com.ea.orbit.actors.providers.memcached; package com.ea.orbit.actors.providers.memcached;


import com.ea.orbit.actors.providers.IStorageProvider; import com.ea.orbit.actors.providers.IStorageProvider;
import com.ea.orbit.actors.providers.json.ActorReferenceModule;
import com.ea.orbit.actors.runtime.ActorReference; import com.ea.orbit.actors.runtime.ActorReference;
import com.ea.orbit.actors.runtime.ReferenceFactory;
import com.ea.orbit.concurrent.Task; import com.ea.orbit.concurrent.Task;
import com.ea.orbit.exception.UncheckedException;


import com.fasterxml.jackson.annotation.JsonAutoDetect; import org.dozer.DozerBeanMapper;
import com.fasterxml.jackson.core.JsonProcessingException; import org.dozer.Mapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.whalin.MemCached.MemCachedClient;


import java.util.concurrent.atomic.AtomicReference; import com.whalin.MemCached.MemCachedClient;


/** /**
* {@link MemCachedStorageProvider} provides Memcached support for storing actor states. * {@link MemCachedStorageProvider} provides Memcached support for storing actor states.
* *
* @author Johno Crawford (johno@sulake.com) * @author Johno Crawford (johno@sulake.com)
*/ */
public class MemCachedStorageProvider implements IStorageProvider { public class MemCachedStorageProvider implements IStorageProvider

{
private MemCachedClient memCachedClient;
private ObjectMapper mapper; public static final String KEY_SEPARATOR = "|";


private boolean useShortKeys = false; private Mapper mapper;
private boolean serializeStateAsJson = false; private MemCachedClient memCachedClient;


@Override private boolean useShortKeys = false;
public Task<Void> start() {
mapper = new ObjectMapper(); @Override
mapper.registerModule(new ActorReferenceModule(new ReferenceFactory())); public Task<Void> start()
mapper.setVisibilityChecker(mapper.getSerializationConfig().getDefaultVisibilityChecker() {
.withFieldVisibility(JsonAutoDetect.Visibility.ANY) mapper = new DozerBeanMapper();
.withGetterVisibility(JsonAutoDetect.Visibility.NONE) if (memCachedClient == null)
.withSetterVisibility(JsonAutoDetect.Visibility.NONE) {
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE)); memCachedClient = MemCachedClientFactory.getClient();
if (memCachedClient == null) { }
memCachedClient = MemCachedClientFactory.getClient(); return Task.done();
} }
return Task.done();
} private String asKey(final ActorReference reference)

{
private String asKey(final ActorReference reference) { String clazzName = useShortKeys ? ActorReference.getInterfaceClass(reference).getSimpleName() : ActorReference.getInterfaceClass(reference).getName();
String clazzName = useShortKeys ? ActorReference.getInterfaceClass(reference).getSimpleName() : ActorReference.getInterfaceClass(reference).getName(); return clazzName + KEY_SEPARATOR + String.valueOf(ActorReference.getId(reference));
return clazzName + "|" + String.valueOf(ActorReference.getId(reference)); }
}

@Override
@Override public Task<Void> clearState(final ActorReference reference, final Object state)
public Task<Void> clearState(final ActorReference reference, final Object state) {
memCachedClient.delete(asKey(reference));
return Task.done();
}

@Override
public Task<Void> stop()
{
return Task.done();
}

@Override
public Task<Boolean> readState(final ActorReference<?> reference, final Object state)
{
Object data = memCachedClient.get(asKey(reference));
if (data != null)
{
mapper.map(data, state);
return Task.fromValue(true);
}
return Task.fromValue(false);
}

@Override
@SuppressWarnings("unchecked")
public Task<Void> writeState(final ActorReference reference, final Object state)
{
memCachedClient.set(asKey(reference), state);
return Task.done();
}

public void setUseShortKeys(final boolean useShortKeys)
{ {
memCachedClient.delete(asKey(reference)); this.useShortKeys = useShortKeys;
return Task.done(); }
}


@Override public void setMemCachedClient(final MemCachedClient memCachedClient)
public Task<Void> stop()
{ {
return Task.done(); this.memCachedClient = memCachedClient;
} }

@Override
public Task<Void> readState(final ActorReference<?> reference, final AtomicReference<Object> stateReference)
{
Object data = memCachedClient.get(asKey(reference));
if (data != null) {
if (serializeStateAsJson) {
try {
mapper.readerForUpdating(stateReference.get()).readValue(data.toString());
} catch (Exception e) {
throw new UncheckedException("Error parsing memcached response: " + data, e);
}
} else {
stateReference.set(data);
}
}
return Task.done();
}

@Override
@SuppressWarnings("unchecked")
public Task<Void> writeState(final ActorReference reference, final Object state) {
if (serializeStateAsJson) {
String data;
try {
data = mapper.writeValueAsString(state);
} catch (JsonProcessingException e) {
throw new UncheckedException(e);
}
memCachedClient.set(asKey(reference), data);
} else {
memCachedClient.set(asKey(reference), state);
}
return Task.done();
}

public void setUseShortKeys(final boolean useShortKeys)
{
this.useShortKeys = useShortKeys;
}

public void setSerializeStateAsJson(final boolean serializeStateAsJson)
{
this.serializeStateAsJson = serializeStateAsJson;
}

public void setMemCachedClient(final MemCachedClient memCachedClient)
{
this.memCachedClient = memCachedClient;
}


} }
Expand Up @@ -2,7 +2,9 @@


import com.ea.orbit.actors.test.IStorageTestState; import com.ea.orbit.actors.test.IStorageTestState;


public class HelloState implements IStorageTestState import java.io.Serializable;

public class HelloState implements IStorageTestState, Serializable
{ {


public String lastName; public String lastName;
Expand Down

0 comments on commit 22826a3

Please sign in to comment.