Skip to content

Commit

Permalink
Implement metadata bootstrapping for Java Rest Client
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Jun 11, 2013
1 parent 8db2a31 commit 3f04f58
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 65 deletions.
61 changes: 39 additions & 22 deletions contrib/restclient/src/java/voldemort/restclient/R2Store.java
Expand Up @@ -78,6 +78,7 @@ public class R2Store extends AbstractStore<ByteArray, byte[], byte[]> {
public static final String X_VOLD_INCONSISTENCY_RESOLVER = "X-VOLD-Inconsistency-Resolver";
public static final String CUSTOM_RESOLVING_STRATEGY = "custom";
public static final String DEFAULT_RESOLVING_STRATEGY = "timestamp";
public static final String SCHEMATA_STORE_NAME = "schemata";

private static final String LAST_MODIFIED = "Last-Modified";
private static final String MULTIPART_CONTENT_TYPE = "multipart/binary";
Expand Down Expand Up @@ -153,27 +154,27 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
return true;
}

private RestResponse fetchGetResponse(RestRequestBuilder requestBuilder) throws Exception {
// TODO: Form a proper request based on client config
requestBuilder.setMethod(GET);
requestBuilder.setHeader("Accept", "binary");
requestBuilder.setHeader(X_VOLD_REQUEST_TIMEOUT_MS, "1000");

RestRequest request = requestBuilder.build();
Future<RestResponse> f = client.restRequest(request);

// This will block
return f.get();
}

@Override
public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws VoldemortException {

List<Versioned<byte[]>> resultList = new ArrayList<Versioned<byte[]>>();

String base64Key = new String(Base64.encodeBase64(key.get()));
RestRequestBuilder rb = null;
try {
String base64Key = new String(Base64.encodeBase64(key.get()));
RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL + "/" + getName()
+ "/" + base64Key));

// TODO: Form a proper request based on client config
rb.setMethod(GET);
rb.setHeader("Accept", "binary");
rb.setHeader(X_VOLD_REQUEST_TIMEOUT_MS, "1000");

RestRequest request = rb.build();
Future<RestResponse> f = client.restRequest(request);

// This will block
RestResponse response = f.get();

rb = new RestRequestBuilder(new URI(this.baseURL + "/" + getName() + "/" + base64Key));
RestResponse response = fetchGetResponse(rb);
// Parse the response
final ByteString entity = response.getEntity();
String eTag = response.getHeader(ETAG);
Expand All @@ -183,19 +184,35 @@ public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws Vold
} else {
logger.error("Did not get any response!");
}

} catch(VoldemortException ve) {
ve.printStackTrace();
logger.error("Error performing get", ve);
throw ve;
} catch(Exception e) {
if(!e.getMessage().contains("status=404")) {
logger.error("Specified key does not exist." + e);
// TODO this needs to be revisited.. Not sure if we rethrow
// exception when its not 404
String errorMsg = e.getMessage();
if(errorMsg != null && !errorMsg.contains("status=404")) {
logger.error("Specified key does not exist.", e);
}
}

return resultList;
}

public String getSerializerInfoXml() throws VoldemortException {
RestRequestBuilder rb = null;
try {
String base64Key = new String(Base64.encodeBase64(getName().getBytes("UTF-8")));
rb = new RestRequestBuilder(new URI(this.baseURL + "/" + SCHEMATA_STORE_NAME + "/"
+ base64Key));
RestResponse response = fetchGetResponse(rb);
// Parse the response
return response.getEntity().asString("UTF-8");
} catch(Exception e) {
logger.error("Error in get serializer info request", e);
throw new VoldemortException(e);
}
}

@Override
public void put(ByteArray key, Versioned<byte[]> value, byte[] transform)
throws VoldemortException {
Expand Down
67 changes: 36 additions & 31 deletions contrib/restclient/src/java/voldemort/restclient/RESTClient.java
Expand Up @@ -20,18 +20,19 @@
import java.util.Map;
import java.util.Map.Entry;

import voldemort.client.RoutingTier;
import org.apache.log4j.Logger;

import voldemort.client.StoreClient;
import voldemort.client.UpdateAction;
import voldemort.cluster.Node;
import voldemort.routing.RoutingStrategyType;
import voldemort.coordinator.CoordinatorUtils;
import voldemort.serialization.DefaultSerializerFactory;
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
import voldemort.serialization.SerializerFactory;
import voldemort.store.Store;
import voldemort.store.StoreDefinition;
import voldemort.store.StoreDefinitionBuilder;
import voldemort.store.compress.CompressingStore;
import voldemort.store.compress.CompressionStrategyFactory;
import voldemort.store.serialized.SerializingStore;
import voldemort.store.versioned.InconsistencyResolvingStore;
import voldemort.utils.ByteArray;
Expand All @@ -51,57 +52,56 @@ public class RESTClient<K, V> implements StoreClient<K, V> {

private Store<K, V, Object> clientStore = null;
private SerializerFactory serializerFactory = new DefaultSerializerFactory();
private StoreDefinition storeDef;
private String storeName;

private static Logger logger = Logger.getLogger(RESTClient.class);

/**
* A REST ful equivalent of the DefaultStoreClient. This uses the R2Store to
* A RESTful equivalent of the DefaultStoreClient. This uses the R2Store to
* interact with the RESTful Coordinator
*
* @param bootstrapURL The bootstrap URL of the Voldemort cluster
* @param storeName Name of the store to interact with
*/
public RESTClient(String bootstrapURL, String storeName) {

this.storeName = storeName;
String baseURL = "http://" + bootstrapURL.split(":")[1].substring(2) + ":8080";
// The lowest layer : Transporting request to coordinator
Store<ByteArray, byte[], byte[]> store = new R2Store(baseURL, storeName);
R2Store r2store = new R2Store(baseURL, storeName);

// TODO
// Get the store definition so that we can learn the Serialization
// and
// compression properties
// bootstrap from the coordinator and obtain all the serialization
// information.
String serializerInfoXml = r2store.getSerializerInfoXml();
SerializerDefinition keySerializerDefinition = CoordinatorUtils.parseKeySerializerDefinition(serializerInfoXml);
SerializerDefinition valueSerializerDefinition = CoordinatorUtils.parseValueSerializerDefinition(serializerInfoXml);

logger.info("Bootstrapping for " + getName() + ": Key serializer "
+ keySerializerDefinition);
logger.info("Bootstrapping for " + getName() + ": Value serializer "
+ valueSerializerDefinition);

// Start building the stack..
// First, the transport layer
Store<ByteArray, byte[], byte[]> store = r2store;

// TODO
// Add compression layer
if(keySerializerDefinition.hasCompression() || valueSerializerDefinition.hasCompression()) {
store = new CompressingStore(store,
new CompressionStrategyFactory().get(keySerializerDefinition.getCompression()),
new CompressionStrategyFactory().get(valueSerializerDefinition.getCompression()));
}

// Add Serialization layer

// Set the following values although we don't need them
// TODO: Fix this, so that we only need to set the needed parameters
storeDef = new StoreDefinitionBuilder().setName(storeName)
.setType("bdb")
.setKeySerializer(new SerializerDefinition("string"))
.setValueSerializer(new SerializerDefinition("string"))
.setRoutingPolicy(RoutingTier.CLIENT)
.setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY)
.setReplicationFactor(1)
.setPreferredReads(1)
.setRequiredReads(1)
.setPreferredWrites(1)
.setRequiredWrites(1)
.build();
Serializer<K> keySerializer = (Serializer<K>) serializerFactory.getSerializer(storeDef.getKeySerializer());
Serializer<V> valueSerializer = (Serializer<V>) serializerFactory.getSerializer(storeDef.getValueSerializer());
Serializer<K> keySerializer = (Serializer<K>) serializerFactory.getSerializer(keySerializerDefinition);
Serializer<V> valueSerializer = (Serializer<V>) serializerFactory.getSerializer(valueSerializerDefinition);
clientStore = SerializingStore.wrap(store, keySerializer, valueSerializer, null);

// Add inconsistency Resolving layer
InconsistencyResolver<Versioned<V>> secondaryResolver = new TimeBasedInconsistencyResolver();
clientStore = new InconsistencyResolvingStore<K, V, Object>(clientStore,
new ChainedResolver<Versioned<V>>(new VectorClockInconsistencyResolver(),
secondaryResolver));

this.storeName = storeName;
}

@Override
Expand Down Expand Up @@ -236,6 +236,11 @@ public List<Node> getResponsibleNodes(K key) {
}

public void close() {
// TODO understand why the client hangs around even after close()
this.clientStore.close();
}

public String getName() {
return this.storeName;
}
}
74 changes: 74 additions & 0 deletions src/java/voldemort/coordinator/CoordinatorUtils.java
@@ -1,8 +1,21 @@
package voldemort.coordinator;

import java.io.IOException;
import java.io.StringReader;

import org.codehaus.jackson.map.ObjectMapper;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.input.SAXBuilder;
import org.jdom.output.Format;
import org.jdom.output.XMLOutputter;

import voldemort.serialization.SerializerDefinition;
import voldemort.store.StoreDefinition;
import voldemort.versioning.VectorClock;
import voldemort.xml.MappingException;
import voldemort.xml.StoreDefinitionsMapper;

public class CoordinatorUtils {

Expand Down Expand Up @@ -43,4 +56,65 @@ public static VectorClock deserializeVectorClock(String serializedVC) {

return vc;
}

/**
* Given a storedefinition, constructs the xml string to be sent out in
* response to a "schemata" fetch request
*
* @param storeDefinition
* @return
*/
public static String constructSerializerInfoXml(StoreDefinition storeDefinition) {
Element store = new Element(StoreDefinitionsMapper.STORE_ELMT);
store.addContent(new Element(StoreDefinitionsMapper.STORE_NAME_ELMT).setText(storeDefinition.getName()));
Element keySerializer = new Element(StoreDefinitionsMapper.STORE_KEY_SERIALIZER_ELMT);
StoreDefinitionsMapper.addSerializer(keySerializer, storeDefinition.getKeySerializer());
store.addContent(keySerializer);

Element valueSerializer = new Element(StoreDefinitionsMapper.STORE_VALUE_SERIALIZER_ELMT);
StoreDefinitionsMapper.addSerializer(valueSerializer, storeDefinition.getValueSerializer());
store.addContent(valueSerializer);

XMLOutputter serializer = new XMLOutputter(Format.getPrettyFormat());
return serializer.outputString(store);
}

/**
* Given an xml string containing the store's serialization information,
* obtains the key serializer definition
*
* @param serializerInfoXml
* @return
*/
public static SerializerDefinition parseKeySerializerDefinition(String serializerInfoXml) {
return parseSerializerDefinition(serializerInfoXml,
StoreDefinitionsMapper.STORE_KEY_SERIALIZER_ELMT);
}

/**
* Given an xml string containing the store's serialization information,
* obtains the value serializer definition
*
* @param serializerInfoXml
* @return
*/
public static SerializerDefinition parseValueSerializerDefinition(String serializerInfoXml) {
return parseSerializerDefinition(serializerInfoXml,
StoreDefinitionsMapper.STORE_VALUE_SERIALIZER_ELMT);
}

private static SerializerDefinition parseSerializerDefinition(String serializerInfoXml,
String elementName) {
SAXBuilder builder = new SAXBuilder();
try {
Document doc = builder.build(new StringReader(serializerInfoXml));
Element root = doc.getRootElement();
Element serializerElement = root.getChild(elementName);
return StoreDefinitionsMapper.readSerializer(serializerElement);
} catch(JDOMException e) {
throw new MappingException(e);
} catch(IOException e) {
throw new MappingException(e);
}
}
}
12 changes: 2 additions & 10 deletions src/java/voldemort/coordinator/GetSchemataRequestExecutor.java
Expand Up @@ -8,8 +8,6 @@

import java.io.UnsupportedEncodingException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
Expand Down Expand Up @@ -46,15 +44,9 @@ public void run() {

StoreDefinition storeDef = StoreDefinitionUtils.getStoreDefinitionWithName(storeClientFactory.getStoreDefs(),
storeName);

String keySerializer = storeDef.getKeySerializer().toString();
String valueSerializer = storeDef.getValueSerializer().toString();
GenericData.Record record = new GenericData.Record(Schema.parse(SCHEMATAJSON));

record.put("key-serializer", keySerializer);
record.put("value-serializer", valueSerializer);
String serializerInfoXml = CoordinatorUtils.constructSerializerInfoXml(storeDef);
try {
writeResponse(record.toString().getBytes("UTF-8"));
writeResponse(serializerInfoXml.getBytes("UTF-8"));
} catch(UnsupportedEncodingException e) {
logger.error(e);
}
Expand Down
4 changes: 2 additions & 2 deletions src/java/voldemort/xml/StoreDefinitionsMapper.java
Expand Up @@ -367,7 +367,7 @@ private StoreDefinition readView(Element store, List<StoreDefinition> stores) {
.build();
}

private SerializerDefinition readSerializer(Element elmt) {
public static SerializerDefinition readSerializer(Element elmt) {
String name = elmt.getChild(STORE_SERIALIZATION_TYPE_ELMT).getText();
boolean hasVersion = true;
Map<Integer, String> schemaInfosByVersion = new HashMap<Integer, String>();
Expand Down Expand Up @@ -528,7 +528,7 @@ private Element viewToElement(StoreDefinition storeDefinition) {
return store;
}

private void addSerializer(Element parent, SerializerDefinition def) {
public static void addSerializer(Element parent, SerializerDefinition def) {
parent.addContent(new Element(STORE_SERIALIZATION_TYPE_ELMT).setText(def.getName()));
if(def.hasSchemaInfo()) {
for(Map.Entry<Integer, String> entry: def.getAllSchemaInfoVersions().entrySet()) {
Expand Down

0 comments on commit 3f04f58

Please sign in to comment.