Navigation Menu

Skip to content

Commit

Permalink
- Added GetAll and Delete implementations on the Coordinator (and the…
Browse files Browse the repository at this point in the history
… temporary rest client)

- Converted Coordinator into an AbstractService and added CoordinatorConfig
- Refactored Composite Voldemort request into different types
  • Loading branch information
Chinmay Soman committed Mar 26, 2013
1 parent d0f91f6 commit 4964e7d
Show file tree
Hide file tree
Showing 32 changed files with 1,341 additions and 323 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -1,5 +1,4 @@
classes
config
dist
*~
*.iml
Expand Down
188 changes: 175 additions & 13 deletions contrib/restclient/src/java/voldemort/restclient/R2Store.java
Expand Up @@ -23,13 +23,21 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import javax.mail.MessagingException;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMultipart;
import javax.mail.util.ByteArrayDataSource;

import org.apache.commons.codec.binary.Base64;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

import voldemort.VoldemortException;
Expand Down Expand Up @@ -60,12 +68,14 @@ public class R2Store extends AbstractStore<ByteArray, byte[], byte[]> {

private static final String GET = "GET";
private static final String POST = "POST";
private static final String DELETE = "DELETE";
private static final String ETAG = "ETag";
public static final String X_VOLD_REQUEST_TIMEOUT_MS = "X-VOLD-Request-Timeout-ms";
public static final String X_VOLD_INCONSISTENCY_RESOLVER = "X-VOLD-Inconsistency-Resolver";
public static final String CUSTOM_RESOLVING_STRATEGY = "custom";
public static final String DEFAULT_RESOLVING_STRATEGY = "timestamp";
private static final String LAST_MODIFIED = "Last-Modified";
private static final String MULTIPART_CONTENT_TYPE = "multipart/binary";
private final Logger logger = Logger.getLogger(R2Store.class);

HttpURLConnection conn = null;
Expand All @@ -83,7 +93,6 @@ public R2Store(String baseURL, String storeName) {
} catch(Exception e) {
e.printStackTrace();
}

}

@Override
Expand All @@ -100,8 +109,36 @@ public void close() throws VoldemortException {
}

@Override
public boolean delete(ByteArray arg0, Version arg1) throws VoldemortException {
// TODO Auto-generated method stub
public boolean delete(ByteArray key, Version version) throws VoldemortException {
try {

// Create the REST request with this byte array
String base64Key = new String(Base64.encodeBase64(key.get()));
RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL + "/" + getName()
+ "/" + base64Key));

// Create a HTTP POST request
// TODO: Create a proper request based on client config
rb.setMethod(DELETE);
rb.setHeader("Content-Type", "application/json");
rb.setHeader("Content-Length", "0");
rb.setHeader(X_VOLD_REQUEST_TIMEOUT_MS, "1000");

RestRequest request = rb.build();
Future<RestResponse> f = client.restRequest(request);

// This will block
RestResponse response = f.get();
final ByteString entity = response.getEntity();
if(entity == null) {
logger.error("Empty response !");
}
} catch(VoldemortException ve) {
ve.printStackTrace();
throw ve;
} catch(Exception e) {
e.printStackTrace();
}
return false;
}

Expand All @@ -112,8 +149,8 @@ public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws Vold

try {
String base64Key = new String(Base64.encodeBase64(key.get()));
RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL + "/test/"
+ base64Key));
RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL + "/" + getName()
+ "/" + base64Key));

// TODO: Form a proper request based on client config
rb.setMethod(GET);
Expand Down Expand Up @@ -141,7 +178,9 @@ public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws Vold
ve.printStackTrace();
throw ve;
} catch(Exception e) {
e.printStackTrace();
if(!e.getMessage().contains("status=404")) {
logger.error("ERROR: " + e);
}
}

return resultList;
Expand All @@ -160,8 +199,8 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transform)

// Create the REST request with this byte array
String base64Key = new String(Base64.encodeBase64(key.get()));
RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL + "/test/"
+ base64Key));
RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL + "/" + getName()
+ "/" + base64Key));

// Create a HTTP POST request
// TODO: Create a proper request based on client config
Expand All @@ -185,7 +224,7 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transform)
ve.printStackTrace();
throw ve;
} catch(Exception e) {
e.printStackTrace();
logger.error("ERROR: " + e);
}
}

Expand All @@ -206,11 +245,134 @@ private List<Versioned<byte[]>> readResults(ByteString entity, String eTag, Stri
}

@Override
public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> arg0,
Map<ByteArray, byte[]> arg1)
public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
Map<ByteArray, byte[]> tranforms)
throws VoldemortException {
// TODO Auto-generated method stub
return null;

Map<ByteArray, List<Versioned<byte[]>>> resultMap = new HashMap<ByteArray, List<Versioned<byte[]>>>();

try {
Iterator<ByteArray> it = keys.iterator();
String keyArgs = null;

while(it.hasNext()) {
ByteArray key = it.next();
String base64Key = new String(Base64.encodeBase64(key.get()));
if(keyArgs == null) {
keyArgs = base64Key;
} else {
keyArgs += "," + base64Key;
}
}

RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL + "/" + getName()
+ "/" + keyArgs));

// TODO: Form a proper request based on client config
rb.setMethod(GET);
rb.setHeader("Accept", MULTIPART_CONTENT_TYPE);
rb.setHeader(X_VOLD_REQUEST_TIMEOUT_MS, "1000");

RestRequest request = rb.build();
Future<RestResponse> f = client.restRequest(request);

// This will block
RestResponse response = f.get();

// Parse the response
final ByteString entity = response.getEntity();
String contentType = response.getHeader("Content-Type");
// String eTag = response.getHeader(ETAG);
// String lastModified = response.getHeader(LAST_MODIFIED);
if(entity != null) {
if(contentType.equalsIgnoreCase(MULTIPART_CONTENT_TYPE)) {
resultMap = readResultsGetAll(entity);
} else {
logger.error("Did not receive a multipart response");
}

} else {
logger.error("Did not get any response!");
}

} catch(VoldemortException ve) {
ve.printStackTrace();
throw ve;
} catch(Exception e) {
if(!e.getMessage().contains("status=404")) {
logger.error("ERROR: " + e);
}
}

return resultMap;
}

private Map<ByteArray, List<Versioned<byte[]>>> readResultsGetAll(ByteString entity) {
Map<ByteArray, List<Versioned<byte[]>>> results = new HashMap<ByteArray, List<Versioned<byte[]>>>();

try {
ObjectMapper mapper = new ObjectMapper();
// VectorClockWrapper vcWrapper = mapper.readValue(eTag,
// VectorClockWrapper.class);

// Build the multipart object
byte[] bytes = new byte[entity.length()];
entity.copyBytes(bytes, 0);

ByteArrayDataSource ds = new ByteArrayDataSource(bytes, "multipart/mixed");
// logger.info("received data = ");
// BufferedReader in = new BufferedReader(new
// InputStreamReader(ds.getInputStream()));
// String inputLine;
// while((inputLine = in.readLine()) != null)
// System.out.println(inputLine);
// in.close();

MimeMultipart mp = new MimeMultipart(ds);
for(int i = 0; i < mp.getCount(); i++) {
MimeBodyPart part = (MimeBodyPart) mp.getBodyPart(i);
String eTag = part.getHeader("ETag")[0];
String contentLocation = part.getHeader("Content-Location")[0];

logger.debug("Received etag : " + eTag);
logger.debug("Content-Location : " + contentLocation);

// Get the key
String base64Key = contentLocation.split("/")[2];

logger.debug("Base 64 key : " + base64Key);
ByteArray key = new ByteArray(Base64.decodeBase64(base64Key.getBytes()));

VectorClockWrapper vcWrapper = mapper.readValue(eTag, VectorClockWrapper.class);
List<Versioned<byte[]>> keyResultList = new ArrayList<Versioned<byte[]>>(2);

// get the value bytes
byte[] bodyPartBytes = ((String) part.getContent()).getBytes();
VectorClock clock = new VectorClock(vcWrapper.getVersions(),
vcWrapper.getTimestamp());
keyResultList.add(new Versioned<byte[]>(bodyPartBytes, clock));
results.put(key, keyResultList);

}

// VectorClock clock = new VectorClock(vcWrapper.getVersions(),
// vcWrapper.getTimestamp());
// results.add(new Versioned<byte[]>(bytes, clock));
} catch(MessagingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch(JsonParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch(JsonMappingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch(IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return results;

}

@Override
Expand Down

0 comments on commit 4964e7d

Please sign in to comment.