Skip to content

Commit

Permalink
JSON serialization for memcached provider.
Browse files Browse the repository at this point in the history
  • Loading branch information
johnou committed Apr 29, 2015
1 parent 213e3d5 commit 9274a87
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 420 deletions.
6 changes: 3 additions & 3 deletions actors/providers/memcached/pom.xml
Expand Up @@ -60,9 +60,9 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>net.sf.dozer</groupId> <groupId>com.ea.orbit</groupId>
<artifactId>dozer</artifactId> <artifactId>orbit-actors-json</artifactId>
<version>5.5.1</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.whalin</groupId> <groupId>com.whalin</groupId>
Expand Down

This file was deleted.

This file was deleted.

Expand Up @@ -29,15 +29,20 @@
package com.ea.orbit.actors.providers.memcached; package com.ea.orbit.actors.providers.memcached;


import com.ea.orbit.actors.providers.IStorageProvider; import com.ea.orbit.actors.providers.IStorageProvider;
import com.ea.orbit.actors.providers.json.ActorReferenceModule;
import com.ea.orbit.actors.runtime.ActorReference; import com.ea.orbit.actors.runtime.ActorReference;
import com.ea.orbit.actors.runtime.ReferenceFactory;
import com.ea.orbit.concurrent.Task; import com.ea.orbit.concurrent.Task;
import com.ea.orbit.exception.UncheckedException;


import org.dozer.DozerBeanMapper; import org.slf4j.Logger;
import org.dozer.Mapper; import org.slf4j.LoggerFactory;


import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.whalin.MemCached.MemCachedClient; import com.whalin.MemCached.MemCachedClient;


import static com.ea.orbit.actors.providers.memcached.MemCachedStorageHelper.*; import java.io.IOException;


/** /**
* {@link MemCachedStorageProvider} provides Memcached support for storing actor states. * {@link MemCachedStorageProvider} provides Memcached support for storing actor states.
Expand All @@ -47,21 +52,31 @@
public class MemCachedStorageProvider implements IStorageProvider public class MemCachedStorageProvider implements IStorageProvider
{ {


private Mapper mapper; private static final Logger logger = LoggerFactory.getLogger(MemCachedStorageProvider.class);

static final String KEY_SEPARATOR = "|";

private ObjectMapper mapper;

private MemCachedClient memCachedClient; private MemCachedClient memCachedClient;
private MemCachedStorageHelper memCachedStorageHelper;


private boolean useShortKeys = false; private boolean useShortKeys = false;


@Override @Override
public Task<Void> start() public Task<Void> start()
{ {
mapper = new DozerBeanMapper(); mapper = new ObjectMapper();
mapper.registerModule(new ActorReferenceModule(new ReferenceFactory()));
mapper.setVisibilityChecker(mapper.getSerializationConfig().getDefaultVisibilityChecker()
.withFieldVisibility(JsonAutoDetect.Visibility.ANY)
.withGetterVisibility(JsonAutoDetect.Visibility.NONE)
.withSetterVisibility(JsonAutoDetect.Visibility.NONE)
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE));

if (memCachedClient == null) if (memCachedClient == null)
{ {
memCachedClient = MemCachedClientFactory.getClient(); memCachedClient = MemCachedClientFactory.getClient();
} }
memCachedStorageHelper = new MemCachedStorageHelper(memCachedClient);
return Task.done(); return Task.done();
} }


Expand All @@ -81,11 +96,21 @@ public Task<Void> stop()
@Override @Override
public Task<Boolean> readState(final ActorReference<?> reference, final Object state) public Task<Boolean> readState(final ActorReference<?> reference, final Object state)
{ {
Object value = memCachedStorageHelper.get(asKey(reference)); try
if (value != null) {
Object newState = memCachedClient.get(asKey(reference));
if (newState != null)
{
mapper.readerForUpdating(state).readValue(String.valueOf(newState));
}
return Task.fromValue(newState != null);
}
catch (RuntimeException | IOException e)
{ {
mapper.map(value, state); logger.warn("Exception during cache value deserialization for key: " + asKey(reference) + " - removing entry");
return Task.fromValue(true);
// Remove the entry upon error deserializing its value..
memCachedClient.delete(asKey(reference));
} }
return Task.fromValue(false); return Task.fromValue(false);
} }
Expand All @@ -94,8 +119,16 @@ public Task<Boolean> readState(final ActorReference<?> reference, final Object s
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Task<Void> writeState(final ActorReference reference, final Object state) public Task<Void> writeState(final ActorReference reference, final Object state)
{ {
memCachedStorageHelper.set(asKey(reference), state); try
return Task.done(); {
String serializedState = mapper.writeValueAsString(state);
memCachedClient.set(asKey(reference), serializedState);
return Task.done();
}
catch (RuntimeException | IOException e)
{
throw new UncheckedException(e);
}
} }


private String asKey(final ActorReference reference) private String asKey(final ActorReference reference)
Expand Down

This file was deleted.

0 comments on commit 9274a87

Please sign in to comment.