Skip to content

Commit

Permalink
Added the ability to do Versioned puts. Added another test case for t…
Browse files Browse the repository at this point in the history
…he same
  • Loading branch information
Chinmay Soman committed Apr 26, 2013
1 parent 06c4b3e commit 4f097fe
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 53 deletions.
Expand Up @@ -41,6 +41,7 @@
import org.codehaus.jackson.map.ObjectMapper;

import voldemort.VoldemortException;
import voldemort.coordinator.CoordinatorUtils;
import voldemort.coordinator.VectorClockWrapper;
import voldemort.store.AbstractStore;
import voldemort.utils.ByteArray;
Expand Down Expand Up @@ -70,10 +71,12 @@ public class R2Store extends AbstractStore<ByteArray, byte[], byte[]> {
private static final String POST = "POST";
private static final String DELETE = "DELETE";
private static final String ETAG = "ETag";
public static final String X_VOLD_VECTOR_CLOCK = "X-VOLD-Vector-Clock";
public static final String X_VOLD_REQUEST_TIMEOUT_MS = "X-VOLD-Request-Timeout-ms";
public static final String X_VOLD_INCONSISTENCY_RESOLVER = "X-VOLD-Inconsistency-Resolver";
public static final String CUSTOM_RESOLVING_STRATEGY = "custom";
public static final String DEFAULT_RESOLVING_STRATEGY = "timestamp";

private static final String LAST_MODIFIED = "Last-Modified";
private static final String MULTIPART_CONTENT_TYPE = "multipart/binary";
private final Logger logger = Logger.getLogger(R2Store.class);
Expand Down Expand Up @@ -202,20 +205,24 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transform)
RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL + "/" + getName()
+ "/" + base64Key));

// Serialize the Vector clock
String serializedVC = CoordinatorUtils.getSerializedVectorClock((VectorClock) value.getVersion());

// Create a HTTP POST request
// TODO: Create a proper request based on client config
rb.setMethod(POST);
rb.setEntity(outputBytes.toByteArray());
rb.setHeader("Content-Type", "application/json");
rb.setHeader("Content-Length", "" + payload.length);
rb.setHeader(X_VOLD_REQUEST_TIMEOUT_MS, "1000");
rb.setHeader(X_VOLD_INCONSISTENCY_RESOLVER, "custom");
rb.setHeader(X_VOLD_VECTOR_CLOCK, serializedVC);

RestRequest request = rb.build();
Future<RestResponse> f = client.restRequest(request);

// This will block
RestResponse response = f.get();
String eTag = response.getHeader(ETAG);
final ByteString entity = response.getEntity();
if(entity == null) {
logger.error("Empty response !");
Expand Down
Expand Up @@ -19,6 +19,8 @@
import java.util.ArrayList;
import java.util.List;

import voldemort.versioning.Versioned;

public class SampleRESTClient {

public static void main(String[] args) {
Expand All @@ -31,8 +33,18 @@ public static void main(String[] args) {
clientStore.put("a", "Howdy!!!!");
clientStore.put("b", "Partner!!!!");

// Do a sample operation:
System.out.println("Received response : " + clientStore.get("a"));
// Do a sample get operation:
Versioned<String> versionedValue = clientStore.get("a");
System.out.println("Received response : " + versionedValue);

// Do a versioned put operation:
versionedValue.setObject("New Value !!!");
clientStore.put("a", versionedValue);

// Do a get again on the last versioned put operation:
versionedValue = clientStore.get("a");
System.out.println("Received response on the versioned put: " + versionedValue);

List<String> keyList = new ArrayList<String>();
keyList.add("a");
keyList.add("b");
Expand Down
46 changes: 46 additions & 0 deletions src/java/voldemort/coordinator/CoordinatorUtils.java
@@ -0,0 +1,46 @@
package voldemort.coordinator;

import org.codehaus.jackson.map.ObjectMapper;

import voldemort.versioning.VectorClock;

public class CoordinatorUtils {

/**
* Function to serialize the given Vector clock into a string. If something
* goes wrong, it returns an empty string.
*
* @param vc The Vector clock to serialize
* @return The string (JSON) version of the specified Vector clock
*/
public static String getSerializedVectorClock(VectorClock vc) {
VectorClockWrapper vcWrapper = new VectorClockWrapper(vc);
ObjectMapper mapper = new ObjectMapper();
String serializedVC = "";
try {
serializedVC = mapper.writeValueAsString(vcWrapper);
} catch(Exception e) {
e.printStackTrace();
}
return serializedVC;
}

public static VectorClock deserializeVectorClock(String serializedVC) {
VectorClock vc = null;

if(serializedVC == null) {
return null;
}

ObjectMapper mapper = new ObjectMapper();

try {
VectorClockWrapper vcWrapper = mapper.readValue(serializedVC, VectorClockWrapper.class);
vc = new VectorClock(vcWrapper.getVersions(), vcWrapper.getTimestamp());
} catch(Exception e) {
e.printStackTrace();
}

return vc;
}
}
18 changes: 1 addition & 17 deletions src/java/voldemort/coordinator/HttpGetRequestExecutor.java
Expand Up @@ -27,12 +27,7 @@
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.REQUEST_TIMEOUT;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;

import java.io.IOException;

import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.MessageEvent;
Expand Down Expand Up @@ -110,18 +105,7 @@ public void writeResponse(Versioned<byte[]> responseVersioned) {
this.responseContent.writeBytes(value);

VectorClock vc = (VectorClock) responseVersioned.getVersion();
VectorClockWrapper vcWrapper = new VectorClockWrapper(vc);
ObjectMapper mapper = new ObjectMapper();
String eTag = "";
try {
eTag = mapper.writeValueAsString(vcWrapper);
} catch(JsonGenerationException e) {
e.printStackTrace();
} catch(JsonMappingException e) {
e.printStackTrace();
} catch(IOException e) {
e.printStackTrace();
}
String eTag = CoordinatorUtils.getSerializedVectorClock(vc);

if(logger.isDebugEnabled()) {
logger.debug("ETAG : " + eTag);
Expand Down
20 changes: 17 additions & 3 deletions src/java/voldemort/coordinator/HttpPutRequestExecutor.java
Expand Up @@ -17,6 +17,7 @@
package voldemort.coordinator;

import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.ETAG;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.CREATED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
Expand All @@ -36,6 +37,7 @@
import voldemort.store.stats.Tracked;
import voldemort.utils.ByteArray;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.VectorClock;

/**
* A Runnable class that uses the specified Fat client to perform a Voldemort
Expand Down Expand Up @@ -88,11 +90,18 @@ public HttpPutRequestExecutor(CompositeVoldemortRequest<ByteArray, byte[]> putRe
this.coordinatorPerfStats = coordinatorPerfStats;
}

public void writeResponse() {
public void writeResponse(VectorClock successfulPutVC) {
// 1. Create the Response object
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, CREATED);

String eTag = CoordinatorUtils.getSerializedVectorClock(successfulPutVC);

if(logger.isDebugEnabled()) {
logger.debug("ETAG : " + eTag);
}

// 2. Set the right headers
response.setHeader(ETAG, eTag);
response.setHeader(CONTENT_LENGTH, 0);

// TODO: return the Version back to the client
Expand All @@ -111,11 +120,16 @@ public void writeResponse() {
public void run() {

try {
this.storeClient.putWithCustomTimeout(putRequestObject);
VectorClock successfulPutVC = null;
if(putRequestObject.getValue() != null) {
successfulPutVC = (VectorClock) this.storeClient.putVersionedWithCustomTimeout(putRequestObject);
} else {
successfulPutVC = (VectorClock) this.storeClient.putWithCustomTimeout(putRequestObject);
}
if(logger.isDebugEnabled()) {
logger.debug("PUT successful !");
}
writeResponse();
writeResponse(successfulPutVC);

} catch(IllegalArgumentException illegalArgsException) {
String errorDescription = "PUT Failed !!! Illegal Arguments : "
Expand Down
3 changes: 2 additions & 1 deletion src/java/voldemort/coordinator/NoopHttpRequestHandler.java
Expand Up @@ -22,6 +22,7 @@
import org.jboss.netty.handler.codec.http.HttpRequest;

import voldemort.common.VoldemortOpCode;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;

/**
Expand Down Expand Up @@ -50,7 +51,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
break;
case VoldemortOpCode.PUT_OP_CODE:
HttpPutRequestExecutor putRequestExecutor = new HttpPutRequestExecutor(e);
putRequestExecutor.writeResponse();
putRequestExecutor.writeResponse(new VectorClock());
break;
default:
System.err.println("Illegal operation.");
Expand Down
25 changes: 20 additions & 5 deletions src/java/voldemort/coordinator/VoldemortHttpRequestHandler.java
Expand Up @@ -42,9 +42,11 @@
import voldemort.store.CompositeGetAllVoldemortRequest;
import voldemort.store.CompositeGetVoldemortRequest;
import voldemort.store.CompositePutVoldemortRequest;
import voldemort.store.CompositeVersionedPutVoldemortRequest;
import voldemort.store.CompositeVoldemortRequest;
import voldemort.utils.ByteArray;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;

/**
* A class to handle the HTTP request and execute the same on behalf of the thin
Expand Down Expand Up @@ -153,15 +155,23 @@ private CompositeVoldemortRequest<ByteArray, byte[]> parseRequest(String request
return null;
}
byte[] putValue = readValue(content);
requestWrapper = new CompositePutVoldemortRequest<ByteArray, byte[]>(putKey,
putValue,
operationTimeoutInMs);
VectorClock putOpVectorClock = getVectorClock(this.request.getHeader(X_VOLD_VECTOR_CLOCK));
if(putOpVectorClock != null && putOpVectorClock.getEntries().size() > 0) {
requestWrapper = new CompositeVersionedPutVoldemortRequest<ByteArray, byte[]>(putKey,
new Versioned<byte[]>(putValue,
putOpVectorClock),
operationTimeoutInMs);
} else {
requestWrapper = new CompositePutVoldemortRequest<ByteArray, byte[]>(putKey,
putValue,
operationTimeoutInMs);
}

break;
case VoldemortOpCode.DELETE_OP_CODE:
VectorClock vc = getVectorClock(this.request.getHeader(X_VOLD_VECTOR_CLOCK));
VectorClock deleteOpVectorClock = getVectorClock(this.request.getHeader(X_VOLD_VECTOR_CLOCK));
requestWrapper = new CompositeDeleteVoldemortRequest<ByteArray, byte[]>(keyList.get(0),
vc,
deleteOpVectorClock,
operationTimeoutInMs);

break;
Expand Down Expand Up @@ -262,6 +272,11 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
*/
private VectorClock getVectorClock(String vectorClockHeader) {
VectorClock vc = null;

if(vectorClockHeader == null) {
return null;
}

ObjectMapper mapper = new ObjectMapper();
if(logger.isDebugEnabled()) {
logger.debug("Received vector clock : " + vectorClockHeader);
Expand Down

0 comments on commit 4f097fe

Please sign in to comment.