Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial import of the protocol buffers based admin protocol.

  • Loading branch information...
commit 71acbce579768edda2d075ccfbb711a1d17244c4 1 parent c9bec07
Alex Feinberg authored
View
5 .gitignore
@@ -1,3 +1,8 @@
classes
dist
*~
+*.iml
+*.ipr
+*.iws
+*#
+.#*
View
11 build.xml
@@ -113,9 +113,12 @@
</target>
<target name="protobuff" description="Generate source files from .proto files">
- <path id="proto-files">
- <fileset dir="${protobuff.dir}" />
- </path>
+ <pathconvert property="proto.sources" pathsep=" ">
+ <path id="proto-files">
+ <fileset dir="${protobuff.dir}" />
+ </path>
+ </pathconvert>
+
<property name="proto.path" location="${protobuff.dir}"/>
<property name="javaout.path" location="${java.dir}"/>
<property name="pythonout.path" location="${python.dir}"/>
@@ -123,7 +126,7 @@
<arg value="--proto_path=${proto.path}"/>
<arg value="--java_out=${javaout.path}"/>
<arg value="--python_out=${pythonout.path}"/>
- <arg pathref="proto-files"/>
+ <arg line="${proto.sources}"/>
</exec>
</target>
View
3  src/java/voldemort/client/protocol/RequestFormatType.java
@@ -26,7 +26,8 @@
VOLDEMORT_V0("vp0", "voldemort-native-v0"),
VOLDEMORT_V1("vp1", "voldemort-native-v1"),
PROTOCOL_BUFFERS("pb0", "protocol-buffers-v0"),
- ADMIN("ad0", "admin-v0");
+ ADMIN("ad0", "admin-v0"),
+ ADMIN_PROTOCOL_BUFFERS("ad1", "admin-v1");
private final String code;
private final String displayName;
View
212 src/java/voldemort/client/protocol/pb/ProtoBuffAdminClientRequestFormat.java
@@ -0,0 +1,212 @@
+package voldemort.client.protocol.pb;
+
+import com.google.protobuf.ByteString;
+import org.apache.log4j.Logger;
+import voldemort.VoldemortException;
+import voldemort.client.protocol.RequestFormatType;
+import voldemort.client.protocol.VoldemortFilter;
+import voldemort.client.protocol.admin.AdminClientRequestFormat;
+import voldemort.cluster.Node;
+import voldemort.store.ErrorCodeMapper;
+import voldemort.store.StoreUtils;
+import voldemort.store.metadata.MetadataStore;
+import voldemort.store.socket.SocketAndStreams;
+import voldemort.store.socket.SocketDestination;
+import voldemort.store.socket.SocketPool;
+import voldemort.utils.ByteArray;
+import voldemort.utils.NetworkClassLoader;
+import voldemort.utils.Pair;
+import voldemort.versioning.Versioned;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Protocol buffers implementation for {@link voldemort.client.protocol.admin.AdminClientRequestFormat}
+ *
+ * @author afeinber
+ */
+public class ProtoBuffAdminClientRequestFormat extends AdminClientRequestFormat {
+ private final ErrorCodeMapper errorMapper;
+ private final static Logger logger = Logger.getLogger(ProtoBuffAdminClientRequestFormat.class);
+ private final SocketPool pool;
+ private final NetworkClassLoader networkClassLoader;
+
+ public ProtoBuffAdminClientRequestFormat(MetadataStore metadataStore, SocketPool pool) {
+ super(metadataStore);
+ this.errorMapper = new ErrorCodeMapper();
+ this.pool = pool;
+ this.networkClassLoader = new NetworkClassLoader(Thread.currentThread()
+ .getContextClassLoader());
+ }
+
+ /**
+ * Updates Metadata at (remote) Node
+ *
+ * @param remoteNodeId Node id to update
+ * @param key Key to update
+ * @param value The metadata
+ * @throws VoldemortException
+ */
+ @Override
+ public void doUpdateRemoteMetadata(int remoteNodeId, ByteArray key, Versioned<byte[]> value) {
+ Node node = this.getMetadata().getCluster().getNodeById(remoteNodeId);
+ SocketDestination destination = new SocketDestination(node.getHost(),
+ node.getSocketPort(),
+ RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
+ SocketAndStreams sands = pool.checkout(destination);
+
+ try {
+ StoreUtils.assertValidKey(key);
+ DataOutputStream outputStream = sands.getOutputStream();
+ DataInputStream inputStream = sands.getInputStream();
+
+ ProtoUtils.writeMessage(outputStream,
+ VAdminProto.VoldemortAdminRequest.newBuilder()
+ .setType(VAdminProto.AdminRequestType.UPDATE_METADATA)
+ .setUpdateMetadata(VAdminProto.UpdateMetadataRequest.newBuilder()
+ .setKey(ByteString.copyFrom(key.get()))
+ .setVersioned(ProtoUtils.encodeVersioned(value)))
+ .build());
+
+ outputStream.flush();
+
+ VAdminProto.UpdateMetadataResponse.Builder response = ProtoUtils.readToBuilder(
+ inputStream, VAdminProto.UpdateMetadataResponse.newBuilder());
+ if (response.hasError())
+ throwException(response.getError());
+ } catch (IOException e) {
+ close(sands.getSocket());
+ throw new VoldemortException(e);
+ } finally {
+ pool.checkin(destination, sands);
+ }
+
+
+ }
+
+ /**
+ * Get Metadata from (remote) Node
+ *
+ *
+ *
+ * @param nodeId
+ * @param storesList
+ * @throws VoldemortException
+ */
+ @Override
+ public Versioned<byte[]> doGetRemoteMetadata(int remoteNodeId, ByteArray key) {
+ Node node = this.getMetadata().getCluster().getNodeById(remoteNodeId);
+
+ SocketDestination destination = new SocketDestination(node.getHost(),
+ node.getSocketPort(),
+ RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
+ SocketAndStreams sands = pool.checkout(destination);
+ try {
+ DataOutputStream outputStream = sands.getOutputStream();
+ DataInputStream inputStream = sands.getInputStream();
+
+ ProtoUtils.writeMessage(outputStream,
+ VAdminProto.VoldemortAdminRequest.newBuilder()
+ .setType(VAdminProto.AdminRequestType.GET_METADATA)
+ .setGetMetadata(VAdminProto.GetMetadataRequest.newBuilder()
+ .setKey(ByteString.copyFrom(key.get())))
+ .build());
+
+ outputStream.flush();
+
+ VAdminProto.GetMetadataResponse.Builder response = ProtoUtils.readToBuilder(
+ inputStream, VAdminProto.GetMetadataResponse.newBuilder());
+ if (response.hasError())
+ throwException(response.getError());
+ return ProtoUtils.decodeVersioned(response.getVersion());
+
+ } catch (IOException e) {
+ close(sands.getSocket());
+ throw new VoldemortException(e);
+ } finally {
+ pool.checkin(destination, sands);
+ }
+ }
+
+ /**
+ * provides a mechanism to do forcedGet on (remote) store, Overrides all
+ * security checks and return the value. queries the raw storageEngine at
+ * server end to return the value
+ *
+ * @param proxyDestNodeId
+ * @param storeName
+ * @param key
+ * @return List<Versioned <byte[]>>
+ */
+ @Override
+ public List<Versioned<byte[]>> doRedirectGet(int proxyDestNodeId, String storeName, ByteArray key) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ /**
+ * update Entries at (remote) node with all entries in iterator for passed
+ * storeName
+ *
+ * @param nodeId
+ * @param storeName
+ * @param entryIterator
+ * @param filterRequest: <imp>Do not Update entries filtered out (returned
+ * false) from the {@link VoldemortFilter} implementation</imp>
+ * @throws VoldemortException
+ * @throws IOException
+ */
+ @Override
+ public void doUpdatePartitionEntries(int nodeId, String storeName, Iterator<Pair<ByteArray, Versioned<byte[]>>> entryIterator, VoldemortFilter filter) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ /**
+ * streaming API to get all entries belonging to any of the partition in the
+ * input List.
+ *
+ * @param nodeId
+ * @param storeName
+ * @param partitionList
+ * @param filterRequest: <imp>Do not fetch entries filtered out (returned
+ * false) from the {@link VoldemortFilter} implementation</imp>
+ * @return
+ * @throws VoldemortException
+ */
+ @Override
+ public Iterator<Pair<ByteArray, Versioned<byte[]>>> doFetchPartitionEntries(int nodeId, String storeName, List<Integer> partitionList, VoldemortFilter filter) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ /**
+ * Delete all Entries at (remote) node for partitions in partitionList
+ *
+ * @param nodeId
+ * @param storeName
+ * @param partitionList
+ * @param filterRequest: <imp>Do not Delete entries filtered out (returned
+ * false) from the {@link VoldemortFilter} implementation</imp>
+ * @throws VoldemortException
+ * @throws IOException
+ */
+ @Override
+ public int doDeletePartitionEntries(int nodeId, String storeName, List<Integer> partitionList, VoldemortFilter filter) {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void throwException(VProto.Error error) {
+ throw errorMapper.getError((short) error.getErrorCode(), error.getErrorMessage());
+ }
+
+ private void close(Socket socket) {
+ try {
+ socket.close();
+ } catch(IOException e) {
+ logger.warn("Failed to close socket");
+ }
+ }
+}
View
6,575 src/java/voldemort/client/protocol/pb/VAdminProto.java
6,575 additions, 0 deletions not shown
View
10,670 src/java/voldemort/client/protocol/pb/VProto.java
5,224 additions, 5,446 deletions not shown
View
5 src/java/voldemort/server/protocol/RequestHandlerFactory.java
@@ -5,6 +5,7 @@
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.protocol.admin.NativeAdminServiceRequestHandler;
+import voldemort.server.protocol.pb.ProtoBuffAdminServiceRequestHandler;
import voldemort.server.protocol.pb.ProtoBuffRequestHandler;
import voldemort.server.protocol.vold.VoldemortNativeRequestHandler;
import voldemort.store.ErrorCodeMapper;
@@ -45,6 +46,10 @@ public RequestHandler getRequestHandler(RequestFormatType type) {
metadata,
voldemortConfig.getStreamMaxReadBytesPerSec(),
voldemortConfig.getStreamMaxWriteBytesPerSec());
+ case ADMIN_PROTOCOL_BUFFERS:
+ return new ProtoBuffAdminServiceRequestHandler(new ErrorCodeMapper(),
+ repository,
+ metadata);
default:
throw new VoldemortException("Unknown wire format " + type);
}
View
129 src/java/voldemort/server/protocol/pb/ProtoBuffAdminServiceRequestHandler.java
@@ -0,0 +1,129 @@
+package voldemort.server.protocol.pb;
+
+
+import com.google.protobuf.Message;
+import org.apache.log4j.Logger;
+import voldemort.VoldemortException;
+import voldemort.client.protocol.pb.ProtoUtils;
+import voldemort.client.protocol.pb.VAdminProto;
+import voldemort.client.protocol.pb.VAdminProto.VoldemortAdminRequest;
+import voldemort.client.protocol.pb.VProto;
+import voldemort.server.StoreRepository;
+import voldemort.server.protocol.AbstractRequestHandler;
+import voldemort.store.ErrorCodeMapper;
+import voldemort.store.metadata.MetadataStore;
+import voldemort.utils.ByteArray;
+import voldemort.utils.ByteUtils;
+import voldemort.versioning.Versioned;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+
+/**
+ * Created by IntelliJ IDEA.
+ * User: afeinber
+ * Date: Sep 29, 2009
+ * Time: 4:45:20 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public class ProtoBuffAdminServiceRequestHandler extends AbstractRequestHandler {
+ private final Logger logger = Logger.getLogger(ProtoBuffAdminServiceRequestHandler.class);
+ private final MetadataStore metadataStore;
+
+
+ public ProtoBuffAdminServiceRequestHandler(ErrorCodeMapper errorCodeMapper,
+ StoreRepository storeRepository,
+ MetadataStore metadataStore) {
+ super(errorCodeMapper,storeRepository);
+ this.metadataStore = metadataStore;
+
+ }
+
+ //@Override
+ public void handleRequest(DataInputStream inputStream, DataOutputStream outputStream) throws IOException {
+ VoldemortAdminRequest.Builder request = ProtoUtils.readToBuilder(inputStream,
+ VoldemortAdminRequest.newBuilder());
+ Message response;
+
+ switch(request.getType()) {
+ case GET_METADATA:
+ response = handleGetMetadata(request.getGetMetadata());
+ break;
+ case UPDATE_METADATA:
+ response = handleUpdateMetadata(request.getUpdateMetadata());
+ break;
+ // case DELETE_PARTITION_ENTRIES: break;
+ // case FETCH_PARTITION_ENTRIES: break;
+ // case REDIRECT_GET: break;
+ // case UPDATE_PARTITION_ENTRIES: break;
+ default:
+ throw new VoldemortException("Unkown operation " + request.getType());
+ }
+ ProtoUtils.writeMessage(outputStream, response);
+ }
+
+ public VAdminProto.UpdateMetadataResponse handleUpdateMetadata(VAdminProto.UpdateMetadataRequest request) {
+ VAdminProto.UpdateMetadataResponse.Builder response = VAdminProto.UpdateMetadataResponse.newBuilder();
+
+ try {
+ ByteArray key = ProtoUtils.decodeBytes(request.getKey());
+ String keyString = ByteUtils.getString(key.get(), "UTF-8");
+
+ if (MetadataStore.METADATA_KEYS.contains(keyString)) {
+ Versioned<byte[]> versionedValue = ProtoUtils.decodeVersioned(request.getVersioned());
+ metadataStore.put(new ByteArray(ByteUtils.getBytes(keyString, "UTF-8")), versionedValue);
+ }
+
+
+ } catch (VoldemortException e) {
+ response.setError(ProtoUtils.encodeError(getErrorMapper(), e));
+ }
+
+ return response.build();
+ }
+
+ public VAdminProto.GetMetadataResponse handleGetMetadata(VAdminProto.GetMetadataRequest request) {
+ VAdminProto.GetMetadataResponse.Builder response = VAdminProto.GetMetadataResponse.newBuilder();
+
+ try {
+ ByteArray key = ProtoUtils.decodeBytes(request.getKey());
+ String keyString = ByteUtils.getString(key.get(), "UTF-8");
+
+ if (MetadataStore.METADATA_KEYS.contains(keyString)) {
+ List<Versioned<byte[]>> versionedList =
+ metadataStore.get(key);
+ int size = (versionedList.size() > 0) ? 1 : 0;
+ if (size > 0) {
+ Versioned<byte[]> versioned = versionedList.get(0);
+ response.setVersion(ProtoUtils.encodeVersioned(versioned));
+ }
+ } else {
+ throw new VoldemortException("Metadata Key passed " + keyString + " is not handled yet ...");
+
+ }
+ } catch (VoldemortException e) {
+ response.setError(ProtoUtils.encodeError(getErrorMapper(), e));
+ }
+ return response.build();
+
+ }
+ /**
+ * This method is used by non-blocking code to determine if the give buffer
+ * represents a complete request. Because the non-blocking code can by
+ * definition not just block waiting for more data, it's possible to get
+ * partial reads, and this identifies that case.
+ *
+ * @param buffer Buffer to check; the buffer is reset to position 0 before
+ * calling this method and the caller must reset it after the call
+ * returns
+ * @return True if the buffer holds a complete request, false otherwise
+ */
+ //@Override
+ public boolean isCompleteRequest(ByteBuffer buffer) {
+ throw new VoldemortException("Non-blocking server not supported for ProtoBuffAdminServiceRequestHandler");
+ }
+}
View
114 src/proto/voldemort-admin.proto
@@ -0,0 +1,114 @@
+package voldemort;
+
+option java_package = "voldemort.client.protocol.pb";
+option java_outer_classname = "VAdminProto";
+option optimize_for = SPEED;
+
+import "voldemort-client.proto";
+
+message GetMetadataRequest {
+ required bytes key = 1;
+}
+
+message GetMetadataResponse {
+ optional Versioned version = 1;
+ optional Error error = 3;
+}
+
+message UpdateMetadataRequest {
+ required bytes key = 1;
+ required Versioned versioned = 2;
+}
+
+message UpdateMetadataResponse {
+ optional Error error = 1;
+}
+
+message RedirectGetRequest {
+ required string store_name = 1;
+ required bytes key = 2;
+}
+
+message RedirectGetResponse {
+ repeated Versioned versioned = 1;
+ optional Error error = 2;
+}
+
+message PartitionEntry {
+ required bytes key = 1;
+ required Versioned versioned = 2;
+}
+
+message UpdatePartitionEntriesRequest {
+ required string store = 1;
+ repeated PartitionEntry partition_entries = 2;
+}
+
+message UpdatePartitionEntriesResponse {
+ optional Error error = 1;
+}
+
+message VoldemortFilter {
+ required string name = 1;
+ required bytes data = 2;
+}
+
+message StartFetchPartitionEntriesRequest {
+ repeated int32 partitions = 1;
+ optional VoldemortFilter filter = 2;
+}
+
+message ContinueFetchPartitionEntriesRequest {
+ repeated PartitionEntry partition_entries = 1;
+}
+
+enum FetchPartitionEntriesRequestType {
+ START = 0;
+ CONTINUE = 1;
+}
+
+message FetchPartitionEntriesRequest {
+ required FetchPartitionEntriesRequestType type = 1;
+ required string store = 2;
+ optional ContinueFetchPartitionEntriesRequest continue = 3;
+ optional StartFetchPartitionEntriesRequest start = 4;
+}
+
+message FetchPartitionEntriesResponse {
+ repeated PartitionEntry partition_entries = 1;
+ optional Error error = 2;
+}
+
+message DeletePartitionEntriesRequest {
+ required string store = 1;
+ optional VoldemortFilter filter = 2;
+}
+
+message DeletePartitionEntriesResponse {
+ required int32 count = 1;
+ optional Error error = 2;
+}
+
+enum AdminRequestType {
+ GET_METADATA = 0;
+ UPDATE_METADATA = 1;
+ REDIRECT_GET = 2;
+ UPDATE_PARTITION_ENTRIES = 3;
+ FETCH_PARTITION_ENTRIES = 4;
+ DELETE_PARTITION_ENTRIES = 5;
+}
+
+message VoldemortAdminRequest {
+ required AdminRequestType type = 1;
+ optional GetMetadataRequest get_metadata = 2;
+ optional UpdateMetadataRequest update_metadata = 3;
+ optional RedirectGetRequest redirect_get = 4;
+ optional UpdatePartitionEntriesRequest update_partition_entries = 5;
+ optional FetchPartitionEntriesRequest fetch_partition_entries = 6;
+ optional DeletePartitionEntriesRequest delete_partition_entries = 7;
+}
+
+
+
+
+
View
20 test/common/voldemort/ServerTestUtils.java
@@ -29,9 +29,12 @@
import org.mortbay.jetty.servlet.ServletHolder;
import voldemort.client.RoutingTier;
+import voldemort.client.protocol.RequestFormat;
import voldemort.client.protocol.RequestFormatFactory;
import voldemort.client.protocol.RequestFormatType;
+import voldemort.client.protocol.admin.AdminClientRequestFormat;
import voldemort.client.protocol.admin.NativeAdminClientRequestFormat;
+import voldemort.client.protocol.pb.ProtoBuffAdminClientRequestFormat;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.routing.RoutingStrategyType;
@@ -304,10 +307,23 @@ public static VoldemortConfig createServerConfig(int nodeId,
}
public static NativeAdminClientRequestFormat getAdminClient(Node identityNode,
- MetadataStore MetadataStore) {
- return new NativeAdminClientRequestFormat(MetadataStore, new SocketPool(2,
+ MetadataStore metadataStore) {
+ return new NativeAdminClientRequestFormat(metadataStore, new SocketPool(2,
10000,
100000,
32 * 1024));
}
+
+ public static AdminClientRequestFormat getAdminClient(Node identityNode,
+ MetadataStore metadataStore,
+ boolean useProtocolBuffers) {
+ if (useProtocolBuffers)
+ return new ProtoBuffAdminClientRequestFormat(metadataStore, new SocketPool(2,
+ 10000,
+ 100000,
+ 32 * 1024));
+ else
+ return getAdminClient(identityNode, metadataStore);
+
+ }
}
View
80 test/unit/voldemort/client/ProtoBuffAdminServiceBasicTest.java
@@ -0,0 +1,80 @@
+package voldemort.client;
+
+import com.google.common.collect.ImmutableList;
+import junit.framework.TestCase;
+import voldemort.ServerTestUtils;
+import voldemort.TestUtils;
+import voldemort.client.protocol.admin.AdminClientRequestFormat;
+import voldemort.client.protocol.admin.NativeAdminClientRequestFormat;
+import voldemort.cluster.Cluster;
+import voldemort.cluster.Node;
+import voldemort.server.VoldemortConfig;
+import voldemort.server.VoldemortServer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Created by IntelliJ IDEA.
+ * User: afeinber
+ * Date: Sep 30, 2009
+ * Time: 12:56:06 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public class ProtoBuffAdminServiceBasicTest extends TestCase {
+ private static String storeName = "test-replication-memory";
+ private static String storesXmlfile = "test/common/voldemort/config/stores.xml";
+
+ VoldemortConfig config;
+ VoldemortServer server;
+ Cluster cluster;
+
+ @Override
+ public void setUp() throws IOException {
+ // start 2 node cluster with free ports
+ int[] ports = ServerTestUtils.findFreePorts(2);
+ Node node0 = new Node(0, "localhost", ports[0], ports[1], Arrays.asList(new Integer[] { 0,
+ 1 }));
+
+ ports = ServerTestUtils.findFreePorts(2);
+ Node node1 = new Node(1, "localhost", ports[0], ports[1], Arrays.asList(new Integer[] { 2,
+ 3 }));
+
+ cluster = new Cluster("admin-service-test", Arrays.asList(new Node[] { node0, node1 }));
+ config = ServerTestUtils.createServerConfig(0,
+ TestUtils.createTempDir().getAbsolutePath(),
+ null,
+ storesXmlfile);
+ server = new VoldemortServer(config, cluster);
+ server.start();
+ }
+
+ @Override
+ public void tearDown() throws IOException, InterruptedException {
+ server.stop();
+ }
+
+ public AdminClientRequestFormat getAdminClient() {
+ return ServerTestUtils.getAdminClient(server.getIdentityNode(), server.getMetadataStore(), true);
+ }
+
+ public void testUpdateClusterMetadata() {
+ Cluster cluster = server.getMetadataStore().getCluster();
+ List<Node> nodes = new ArrayList<Node>(cluster.getNodes());
+ nodes.add(new Node(3, "localhost", 8883, 6668, ImmutableList.of(4,5)));
+ Cluster updatedCluster = new Cluster("new-cluster", nodes);
+
+ AdminClientRequestFormat client = getAdminClient();
+ client.updateClusterMetadata(server.getIdentityNode().getId(), updatedCluster);
+
+ assertEquals("Cluster should match", updatedCluster, server.getMetadataStore().getCluster());
+ assertEquals("AdminClient.getMetdata() should match",
+ client.getClusterMetadata(server.getIdentityNode().getId()).getValue(),
+ updatedCluster);
+
+ }
+
+
+}
Please sign in to comment.
Something went wrong with that request. Please try again.