Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 214 additions & 73 deletions src/main/java/org/mybatis/caches/memcached/MemcachedClientWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.internal.OperationFuture;

import org.apache.ibatis.cache.CacheException;
import org.apache.ibatis.logging.Log;
Expand All @@ -41,6 +45,39 @@ final class MemcachedClientWrapper {

private final MemcachedClient client;

/**
* Used to represent an object retrieved from Memcached along with its CAS information
*
* @author Weisz, Gustavo E.
*/
private class ObjectWithCas {

Object object;
long cas;

ObjectWithCas(Object object, long cas) {
this.setObject(object);
this.setCas(cas);
}

public Object getObject() {
return object;
}

public void setObject(Object object) {
this.object = object;
}

public long getCas() {
return cas;
}

public void setCas(long cas) {
this.cas = cas;
}

}

public MemcachedClientWrapper() {
configuration = MemcachedConfigurationBuilder.getInstance().parseConfiguration();
try {
Expand Down Expand Up @@ -95,46 +132,38 @@ public Object getObject(Object key) {
return ret;
}

/**
* Return the stored group in Memcached identified by the specified key.
*
* @param groupKey the group key.
* @return the group if was previously stored, null otherwise.
*/
@SuppressWarnings("unchecked")
private Set<String> getGroup(String groupKey) {
if (LOG.isDebugEnabled()) {
LOG.debug("Retrieving group with id '"
+ groupKey
+ "'");
}

Object groups = null;
try {
groups = retrieve(groupKey);
} catch (Exception e) {
LOG.error("Impossible to retrieve group '"
+ groupKey
+ "' see nested exceptions", e);
}

if (groups == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Group '"
+ groupKey
+ "' not previously stored");
}
return new HashSet<String>();
}

if (LOG.isDebugEnabled()) {
LOG.debug("retrieved group '"
+ groupKey
+ "' with values "
+ groups);
}
return (Set<String>) groups;
}
/**
* Return the stored group in Memcached identified by the specified key.
*
* @param groupKey
* the group key.
* @return the group if was previously stored, null otherwise.
*/
private ObjectWithCas getGroup(String groupKey) {
if (LOG.isDebugEnabled()) {
LOG.debug("Retrieving group with id '" + groupKey + "'");
}

ObjectWithCas groups = null;
try {
groups = retrieveWithCas(groupKey);
} catch (Exception e) {
LOG.error("Impossible to retrieve group '" + groupKey + "' see nested exceptions", e);
}

if (groups == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Group '" + groupKey + "' not previously stored");
}
return null;
}

if (LOG.isDebugEnabled()) {
LOG.debug("retrieved group '" + groupKey + "' with values " + groups);
}

return groups;
}

/**
*
Expand Down Expand Up @@ -171,7 +200,47 @@ private Object retrieve(final String keyString) {
return retrieved;
}

public void putObject(Object key, Object value, String id) {
/**
* Retrieves an object along with its cas using the given key
*
* @param keyString
* @return
* @throws Exception
*/
private ObjectWithCas retrieveWithCas(final String keyString) {
CASValue<Object> retrieved = null;

if (configuration.isUsingAsyncGet()) {
Future<CASValue<Object>> future;
if (configuration.isCompressionEnabled()) {
future = client.asyncGets(keyString, new CompressorTranscoder());
} else {
future = client.asyncGets(keyString);
}

try {
retrieved = future.get(configuration.getTimeout(), configuration.getTimeUnit());
} catch (Exception e) {
future.cancel(false);
throw new CacheException(e);
}
} else {
if (configuration.isCompressionEnabled()) {
retrieved = client.gets(keyString, new CompressorTranscoder());
} else {
retrieved = client.gets(keyString);
}
}

if (retrieved == null) {
return null;
}

return new ObjectWithCas(retrieved.getValue(), retrieved.getCas());
}

@SuppressWarnings("unchecked")
public void putObject(Object key, Object value, String id) {
String keyString = toKeyString(key);
String groupKey = toKeyString(id);

Expand All @@ -186,18 +255,29 @@ public void putObject(Object key, Object value, String id) {
storeInMemcached(keyString, value);

// add namespace key into memcached
Set<String> group = getGroup(groupKey);
group.add(keyString);
// Optimistic lock approach...
boolean jobDone = false;

if (LOG.isDebugEnabled()) {
LOG.debug("Insert/Updating object ("
+ groupKey
+ ", "
+ group
+ ")");
}
while (!jobDone) {
ObjectWithCas group = getGroup(groupKey);
Set<String> groupValues;

if (group == null || group.getObject() == null) {
groupValues = new HashSet<String>();
groupValues.add(keyString);

storeInMemcached(groupKey, group);
if (LOG.isDebugEnabled()) {
LOG.debug("Insert/Updating object (" + groupKey + ", " + groupValues + ")");
}

jobDone = tryToAdd(groupKey, groupValues);
} else {
groupValues = (Set<String>) group.getObject();
groupValues.add(keyString);

jobDone = storeInMemcached(groupKey, group);
}
}
}

/**
Expand All @@ -221,6 +301,65 @@ private void storeInMemcached(String keyString, Object value) {
}
}

/**
* Tries to update an object value in memcached considering the cas validation
*
* Returns true if the object passed the cas validation and was modified.
*
* @param keyString
* @param value
* @return
*/
private boolean storeInMemcached(String keyString, ObjectWithCas value) {
if (value != null && value.getObject() != null && !Serializable.class.isAssignableFrom(value.getObject().getClass())) {
throw new CacheException("Object of type '" + value.getObject().getClass().getName() + "' that's non-serializable is not supported by Memcached");
}

CASResponse response;

if (configuration.isCompressionEnabled()) {
response = client.cas(keyString, value.getCas(), value.getObject(), new CompressorTranscoder());
} else {
response = client.cas(keyString, value.getCas(), value.getObject());
}

return (response.equals(CASResponse.OBSERVE_MODIFIED) || response.equals(CASResponse.OK));
}

/**
* Tries to store an object identified by a key in Memcached.
*
* Will fail if the object already exists.
*
* @param keyString
* @param value
* @return
*/
private boolean tryToAdd(String keyString, Object value) {
if (value != null && !Serializable.class.isAssignableFrom(value.getClass())) {
throw new CacheException("Object of type '" + value.getClass().getName() + "' that's non-serializable is not supported by Memcached");
}

boolean done;
OperationFuture<Boolean> result;

if (configuration.isCompressionEnabled()) {
result = client.add(keyString, configuration.getExpiration(), value, new CompressorTranscoder());
} else {
result = client.add(keyString, configuration.getExpiration(), value);
}

try {
done = result.get();
} catch (InterruptedException e) {
done = false;
} catch (ExecutionException e) {
done = false;
}

return done;
}

public Object removeObject(Object key) {
String keyString = toKeyString(key);

Expand All @@ -237,34 +376,36 @@ public Object removeObject(Object key) {
return result;
}

public void removeGroup(String id) {
String groupKey = toKeyString(id);
@SuppressWarnings("unchecked")
public void removeGroup(String id) {
String groupKey = toKeyString(id);

Set<String> group = getGroup(groupKey);
ObjectWithCas group = getGroup(groupKey);
Set<String> groupValues;

if (group == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("No need to flush cached entries for group '"
+ id
+ "' because is empty");
}
return;
}
if (group == null || group.getObject() == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("No need to flush cached entries for group '" + id + "' because is empty");
}
return;
}

if (LOG.isDebugEnabled()) {
LOG.debug("Flushing keys: " + group);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Flushing keys: " + group);
}

for (String key : group) {
client.delete(key);
}
groupValues = (Set<String>) group.getObject();

if (LOG.isDebugEnabled()) {
LOG.debug("Flushing group: " + groupKey);
}
for (String key : groupValues) {
client.delete(key);
}

client.delete(groupKey);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Flushing group: " + groupKey);
}

client.delete(groupKey);
}

@Override
protected void finalize() throws Throwable {
Expand Down
Loading