Skip to content

Commit

Permalink
Making RedisStorageProvider thread safe.
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielSperry committed Apr 19, 2015
1 parent 8dc8fef commit bdee4b1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 25 deletions.
Expand Up @@ -34,21 +34,24 @@
import com.ea.orbit.actors.runtime.ReferenceFactory;
import com.ea.orbit.concurrent.Task;
import com.ea.orbit.exception.UncheckedException;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisStorageProvider implements IStorageProvider {

private Jedis redis;
private JedisPool pool;
private ObjectMapper mapper;

private String host = "localhost";
private int port = 6379;
private String databaseName;
private int connectionTimeout = 10000;
private int soTimeout = 10000;
private int timeout = 10000;

@Override
public Task<Void> start() {
Expand All @@ -59,7 +62,7 @@ public Task<Void> start() {
.withGetterVisibility(JsonAutoDetect.Visibility.NONE)
.withSetterVisibility(JsonAutoDetect.Visibility.NONE)
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
redis = new Jedis(host, port, connectionTimeout, soTimeout);
pool = new JedisPool(new JedisPoolConfig(), host, port, timeout);
return Task.done();
}

Expand All @@ -70,26 +73,35 @@ private String asKey(final ActorReference reference) {
}

@Override
public Task<Void> clearState(final ActorReference reference, final Object state) {
redis.del(asKey(reference));
public Task<Void> clearState(final ActorReference reference, final Object state)
{
try (Jedis redis = pool.getResource())
{
redis.del(asKey(reference));
}
return Task.done();
}

@Override
public Task<Void> stop() {
redis.close();
public Task<Void> stop()
{
pool.close();
return Task.done();
}

@Override
@SuppressWarnings("unchecked")
public Task<Boolean> readState(final ActorReference reference, final Object state) {
String data = redis.get(asKey(reference));
String data;
try (Jedis redis = pool.getResource())
{
data = redis.get(asKey(reference));
}
if (data != null) {
try {
mapper.readerForUpdating(state).readValue(data);
} catch (Exception e) {
throw new UncheckedException(e);
throw new UncheckedException("Error parsing redis response: " + data, e);
}
}
return Task.fromValue(true);
Expand All @@ -104,7 +116,10 @@ public Task<Void> writeState(final ActorReference reference, final Object state)
} catch (JsonProcessingException e) {
throw new UncheckedException(e);
}
redis.set(asKey(reference), data);
try (Jedis redis = pool.getResource())
{
redis.set(asKey(reference), data);
}
return Task.done();
}

Expand Down Expand Up @@ -132,23 +147,14 @@ public void setDatabaseName(String databaseName) {
this.databaseName = databaseName;
}

public int getConnectionTimeout()
public int getTimeout()
{
return connectionTimeout;
return timeout;
}

public void setConnectionTimeout(final int connectionTimeout)
public void setTimeout(final int timeout)
{
this.connectionTimeout = connectionTimeout;
this.timeout = timeout;
}

public int getSoTimeout()
{
return soTimeout;
}

public void setSoTimeout(final int soTimeout)
{
this.soTimeout = soTimeout;
}
}
Expand Up @@ -59,7 +59,7 @@ public Class<? extends IStorageTestActor> getActorInterfaceClass()
public IOrbitProvider getStorageProvider()
{
final RedisStorageProvider storageProvider = new RedisStorageProvider();
storageProvider.setConnectionTimeout(60000);
storageProvider.setTimeout(60000);
storageProvider.setDatabaseName(databaseName);
return storageProvider;
}
Expand Down

0 comments on commit bdee4b1

Please sign in to comment.