Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Make client and server negotiate protocol at connection time. Elimina…

…te admin port, as this is now just

another supported protocol.
  • Loading branch information...
commit c4c27c28130c744762f28a869242d4f22f724c2d 1 parent 1a5d7ee
@jkreps jkreps authored
Showing with 362 additions and 301 deletions.
  1. +0 −1  src/java/voldemort/VoldemortClientShell.java
  2. +17 −6 src/java/voldemort/client/AdminClient.java
  3. +1 −1  src/java/voldemort/client/ClientConfig.java
  4. +2 −3 src/java/voldemort/client/SocketStoreClientFactory.java
  5. +16 −8 src/java/voldemort/client/protocol/RequestFormatFactory.java
  6. +18 −11 src/java/voldemort/client/protocol/RequestFormatType.java
  7. +1 −1  src/java/voldemort/client/protocol/vold/VoldemortNativeClientRequestFormat.java
  8. +2 −14 src/java/voldemort/cluster/Node.java
  9. +2 −2 src/java/voldemort/server/VoldemortConfig.java
  10. +3 −11 src/java/voldemort/server/VoldemortServer.java
  11. +5 −3 src/java/voldemort/server/protocol/RequestHandlerFactory.java
  12. +17 −2 src/java/voldemort/server/protocol/vold/VoldemortNativeRequestHandler.java
  13. +24 −11 src/java/voldemort/server/socket/SocketServer.java
  14. +55 −8 src/java/voldemort/server/socket/SocketServerSession.java
  15. +3 −3 src/java/voldemort/server/socket/SocketService.java
  16. +4 −3 src/java/voldemort/server/storage/StorageService.java
  17. +25 −12 src/java/voldemort/store/bdb/BdbStorageConfiguration.java
  18. +0 −19 src/java/voldemort/store/bdb/BdbStorageEngine.java
  19. +12 −3 src/java/voldemort/store/socket/SocketAndStreams.java
  20. +13 −5 src/java/voldemort/store/socket/SocketDestination.java
  21. +33 −4 src/java/voldemort/store/socket/SocketPoolableObjectFactory.java
  22. +12 −18 src/java/voldemort/store/socket/SocketStore.java
  23. +1 −7 src/java/voldemort/xml/ClusterMapper.java
  24. +16 −18 test/common/voldemort/ServerTestUtils.java
  25. +1 −1  test/common/voldemort/TestUtils.java
  26. +7 −11 test/integration/voldemort/performance/RemoteStoreComparisonTest.java
  27. +25 −51 test/unit/voldemort/client/AdminServiceTest.java
  28. +2 −2 test/unit/voldemort/client/HttpStoreClientFactoryTest.java
  29. +1 −3 test/unit/voldemort/client/SocketStoreClientFactoryTest.java
  30. +0 −4 test/unit/voldemort/cluster/TestCluster.java
  31. +1 −1  test/unit/voldemort/protocol/vold/VoldemortNativeRequestFormatTest.java
  32. +3 −3 test/unit/voldemort/routing/ConsistentRoutingStrategyTest.java
  33. +15 −6 test/unit/voldemort/server/socket/SocketPoolTest.java
  34. +3 −3 test/unit/voldemort/store/http/HttpStoreTest.java
  35. +4 −6 test/unit/voldemort/store/readonly/ReadOnlyStorageEngineTestInstance.java
  36. +16 −33 test/unit/voldemort/store/rebalancing/RebalancingStoreTest.java
  37. +1 −2  test/unit/voldemort/store/socket/AbstractSocketStoreTest.java
  38. +1 −1  test/unit/voldemort/store/socket/VoldemortNativeSocketStoreTest.java
View
1  src/java/voldemort/VoldemortClientShell.java
@@ -32,7 +32,6 @@
import voldemort.client.DefaultStoreClient;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClientFactory;
-import voldemort.client.protocol.RequestFormatType;
import voldemort.cluster.Node;
import voldemort.serialization.SerializationException;
import voldemort.serialization.json.EndOfFileException;
View
23 src/java/voldemort/client/AdminClient.java
@@ -27,6 +27,7 @@
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
+import voldemort.client.protocol.RequestFormatType;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.serialization.VoldemortOpCode;
@@ -94,7 +95,9 @@ public VoldemortMetadata getMetadata() {
public void updateClusterMetadata(int nodeId, Cluster cluster, String cluster_key)
throws VoldemortException {
Node node = metadata.getCurrentCluster().getNodeById(nodeId);
- SocketDestination destination = new SocketDestination(node.getHost(), node.getAdminPort());
+ SocketDestination destination = new SocketDestination(node.getHost(),
+ node.getSocketPort(),
+ RequestFormatType.ADMIN);
SocketAndStreams sands = pool.checkout(destination);
try {
DataOutputStream outputStream = sands.getOutputStream();
@@ -125,7 +128,9 @@ public void updateStoresMetadata(int nodeId, List<StoreDefinition> storesList)
throws VoldemortException {
Node node = metadata.getCurrentCluster().getNodeById(nodeId);
- SocketDestination destination = new SocketDestination(node.getHost(), node.getAdminPort());
+ SocketDestination destination = new SocketDestination(node.getHost(),
+ node.getSocketPort(),
+ RequestFormatType.ADMIN);
SocketAndStreams sands = pool.checkout(destination);
try {
DataOutputStream outputStream = sands.getOutputStream();
@@ -161,7 +166,8 @@ public void updateStoresMetadata(int nodeId, List<StoreDefinition> storesList)
throws VoldemortException {
Node node = metadata.getCurrentCluster().getNodeById(nodeId);
final SocketDestination destination = new SocketDestination(node.getHost(),
- node.getAdminPort());
+ node.getSocketPort(),
+ RequestFormatType.ADMIN);
final SocketAndStreams sands = pool.checkout(destination);
try {
// get these partitions from the node for store
@@ -236,7 +242,9 @@ public void updatePartitionEntries(int nodeId,
throws VoldemortException, IOException {
Node node = metadata.getCurrentCluster().getNodeById(nodeId);
- SocketDestination destination = new SocketDestination(node.getHost(), node.getAdminPort());
+ SocketDestination destination = new SocketDestination(node.getHost(),
+ node.getSocketPort(),
+ RequestFormatType.ADMIN);
SocketAndStreams sands = pool.checkout(destination);
DataOutputStream outputStream = sands.getOutputStream();
DataInputStream inputStream = sands.getInputStream();
@@ -284,7 +292,9 @@ public void updatePartitionEntries(int nodeId,
public void changeServerState(int nodeId, VoldemortMetadata.ServerState state) {
Cluster currentCluster = metadata.getCurrentCluster();
Node node = currentCluster.getNodeById(nodeId);
- SocketDestination destination = new SocketDestination(node.getHost(), node.getAdminPort());
+ SocketDestination destination = new SocketDestination(node.getHost(),
+ node.getSocketPort(),
+ RequestFormatType.ADMIN);
SocketAndStreams sands = pool.checkout(destination);
try {
DataOutputStream outputStream = sands.getOutputStream();
@@ -315,7 +325,8 @@ public void changeServerState(int nodeId, VoldemortMetadata.ServerState state) {
public List<Versioned<byte[]>> redirectGet(int redirectedNodeId, String storeName, ByteArray key) {
Node redirectedNode = metadata.getCurrentCluster().getNodeById(redirectedNodeId);
SocketDestination destination = new SocketDestination(redirectedNode.getHost(),
- redirectedNode.getAdminPort());
+ redirectedNode.getSocketPort(),
+ RequestFormatType.ADMIN);
SocketAndStreams sands = pool.checkout(destination);
try {
DataOutputStream outputStream = sands.getOutputStream();
View
2  src/java/voldemort/client/ClientConfig.java
@@ -45,7 +45,7 @@
private volatile int socketBufferSize = 64 * 1024;
private volatile SerializerFactory serializerFactory = new DefaultSerializerFactory();
private volatile List<String> bootstrapUrls = null;
- private volatile RequestFormatType requestFormatType = RequestFormatType.VOLDEMORT;
+ private volatile RequestFormatType requestFormatType = RequestFormatType.VOLDEMORT_V1;
private volatile RoutingTier routingTier = RoutingTier.CLIENT;
private volatile boolean enableJmx = true;
View
5 src/java/voldemort/client/SocketStoreClientFactory.java
@@ -22,6 +22,7 @@
import voldemort.client.protocol.RequestFormatType;
import voldemort.cluster.Node;
import voldemort.store.Store;
+import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.SocketPool;
import voldemort.store.socket.SocketStore;
import voldemort.utils.ByteArray;
@@ -61,10 +62,8 @@ public SocketStoreClientFactory(ClientConfig config) {
int port,
RequestFormatType type) {
return new SocketStore(Utils.notNull(storeName),
- Utils.notNull(host),
- port,
+ new SocketDestination(Utils.notNull(host), port, type),
socketPool,
- type,
RoutingTier.SERVER.equals(routingTier));
}
View
24 src/java/voldemort/client/protocol/RequestFormatFactory.java
@@ -16,6 +16,8 @@
package voldemort.client.protocol;
+import java.util.EnumMap;
+
import voldemort.VoldemortException;
import voldemort.client.protocol.pb.ProtoBuffClientRequestFormat;
import voldemort.client.protocol.vold.VoldemortNativeClientRequestFormat;
@@ -29,15 +31,21 @@
*/
public class RequestFormatFactory {
+ private EnumMap<RequestFormatType, RequestFormat> typeToInstance;
+
+ public RequestFormatFactory() {
+ this.typeToInstance = new EnumMap<RequestFormatType, RequestFormat>(RequestFormatType.class);
+ this.typeToInstance.put(RequestFormatType.VOLDEMORT_V1,
+ new VoldemortNativeClientRequestFormat());
+ this.typeToInstance.put(RequestFormatType.PROTOCOL_BUFFERS,
+ new ProtoBuffClientRequestFormat());
+ }
+
public RequestFormat getRequestFormat(RequestFormatType type) {
- switch(type) {
- case VOLDEMORT:
- return new VoldemortNativeClientRequestFormat();
- case PROTOCOL_BUFFERS:
- return new ProtoBuffClientRequestFormat();
- default:
- throw new VoldemortException("Unknown wire format " + type);
- }
+ RequestFormat format = this.typeToInstance.get(type);
+ if(type == null)
+ throw new VoldemortException("Unknown wire format " + type);
+ return format;
}
}
View
29 src/java/voldemort/client/protocol/RequestFormatType.java
@@ -23,25 +23,32 @@
*
*/
public enum RequestFormatType {
- VOLDEMORT("vold"),
- PROTOCOL_BUFFERS("pb"),
- ADMIN_HANDLER("admin");
+ VOLDEMORT_V0("vp0", "voldemort-native-v0"),
+ VOLDEMORT_V1("vp1", "voldemort-native-v1"),
+ PROTOCOL_BUFFERS("pb0", "protocol-buffers-v0"),
+ ADMIN("ad0", "admin-v0");
- private final String name;
+ private final String code;
+ private final String displayName;
- private RequestFormatType(String name) {
- this.name = name;
+ private RequestFormatType(String code, String display) {
+ this.code = code;
+ this.displayName = display;
}
- public String getName() {
- return name;
+ public String getCode() {
+ return code;
}
- public static RequestFormatType fromName(String name) {
+ public String getDisplayName() {
+ return this.displayName;
+ }
+
+ public static RequestFormatType fromCode(String code) {
for(RequestFormatType type: RequestFormatType.values())
- if(type.getName().equals(name))
+ if(type.getCode().equals(code))
return type;
- throw new IllegalArgumentException("No wire format '" + name + "' was found");
+ throw new IllegalArgumentException("No request format '" + code + "' was found");
}
}
View
2  src/java/voldemort/client/protocol/vold/VoldemortNativeClientRequestFormat.java
@@ -42,7 +42,7 @@
*/
public class VoldemortNativeClientRequestFormat implements RequestFormat {
- public final ErrorCodeMapper mapper;
+ private final ErrorCodeMapper mapper;
public VoldemortNativeClientRequestFormat() {
this.mapper = new ErrorCodeMapper();
View
16 src/java/voldemort/cluster/Node.java
@@ -41,30 +41,22 @@
private final int httpPort;
private final int socketPort;
private final List<Integer> partitions;
- private final int adminSocketPort;
private final NodeStatus status;
- public Node(int id,
- String host,
- int httpPort,
- int socketPort,
- int adminPort,
- List<Integer> partitions) {
- this(id, host, httpPort, socketPort, adminPort, partitions, new NodeStatus());
+ public Node(int id, String host, int httpPort, int socketPort, List<Integer> partitions) {
+ this(id, host, httpPort, socketPort, partitions, new NodeStatus());
}
public Node(int id,
String host,
int httpPort,
int socketPort,
- int adminSocketPort,
List<Integer> partitions,
NodeStatus status) {
this.id = id;
this.host = Utils.notNull(host);
this.httpPort = httpPort;
this.socketPort = socketPort;
- this.adminSocketPort = adminSocketPort;
this.status = status;
this.partitions = ImmutableList.copyOf(partitions);
}
@@ -81,10 +73,6 @@ public int getSocketPort() {
return socketPort;
}
- public int getAdminPort() {
- return adminSocketPort;
- }
-
public int getId() {
return id;
}
View
4 src/java/voldemort/server/VoldemortConfig.java
@@ -219,8 +219,8 @@ public VoldemortConfig(Props props) {
this.allProps = props;
String requestFormatName = props.getString("request.format",
- RequestFormatType.VOLDEMORT.getName());
- this.requestFormatType = RequestFormatType.fromName(requestFormatName);
+ RequestFormatType.VOLDEMORT_V1.getCode());
+ this.requestFormatType = RequestFormatType.fromCode(requestFormatName);
validateParams();
}
View
14 src/java/voldemort/server/VoldemortServer.java
@@ -105,24 +105,16 @@ public VoldemortServer(VoldemortConfig config, Cluster cluster) {
if(voldemortConfig.isHttpServerEnabled())
services.add(new HttpService(this,
storeRepository,
- RequestFormatType.VOLDEMORT,
+ RequestFormatType.VOLDEMORT_V1,
voldemortConfig.getMaxThreads(),
identityNode.getHttpPort()));
if(voldemortConfig.isSocketServerEnabled())
- services.add(new SocketService(requestHandlerFactory.getRequestHandler(voldemortConfig.getRequestFormatType()),
+ services.add(new SocketService(requestHandlerFactory,
identityNode.getSocketPort(),
voldemortConfig.getCoreThreads(),
voldemortConfig.getMaxThreads(),
voldemortConfig.getSocketBufferSize(),
- "client-request-server",
- voldemortConfig.isJmxEnabled()));
- if(voldemortConfig.isAdminServerEnabled())
- services.add(new SocketService(requestHandlerFactory.getRequestHandler(RequestFormatType.ADMIN_HANDLER),
- identityNode.getAdminPort(),
- voldemortConfig.getAdminCoreThreads(),
- voldemortConfig.getAdminMaxThreads(),
- voldemortConfig.getAdminSocketBufferSize(),
- "admin-server",
+ "socket-server",
voldemortConfig.isJmxEnabled()));
if(voldemortConfig.isJmxEnabled())
View
8 src/java/voldemort/server/protocol/RequestHandlerFactory.java
@@ -33,11 +33,13 @@ public RequestHandlerFactory(StoreRepository repository,
public RequestHandler getRequestHandler(RequestFormatType type) {
switch(type) {
- case VOLDEMORT:
- return new VoldemortNativeRequestHandler(new ErrorCodeMapper(), repository);
+ case VOLDEMORT_V0:
+ return new VoldemortNativeRequestHandler(new ErrorCodeMapper(), repository, 0);
+ case VOLDEMORT_V1:
+ return new VoldemortNativeRequestHandler(new ErrorCodeMapper(), repository, 1);
case PROTOCOL_BUFFERS:
return new ProtoBuffRequestHandler(new ErrorCodeMapper(), repository);
- case ADMIN_HANDLER:
+ case ADMIN:
return new AdminServiceRequestHandler(new ErrorCodeMapper(),
repository,
metadata,
View
19 src/java/voldemort/server/protocol/vold/VoldemortNativeRequestHandler.java
@@ -19,17 +19,32 @@
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;
+/**
+ * Server-side request handler for voldemort native client protocol
+ *
+ * @author jay
+ *
+ */
public class VoldemortNativeRequestHandler extends AbstractRequestHandler implements RequestHandler {
- public VoldemortNativeRequestHandler(ErrorCodeMapper errorMapper, StoreRepository repository) {
+ private final int protocolVersion;
+
+ public VoldemortNativeRequestHandler(ErrorCodeMapper errorMapper,
+ StoreRepository repository,
+ int protocolVersion) {
super(errorMapper, repository);
+ if(protocolVersion < 0 || protocolVersion > 1)
+ throw new IllegalArgumentException("Unknown protocol version: " + protocolVersion);
+ this.protocolVersion = protocolVersion;
}
public void handleRequest(DataInputStream inputStream, DataOutputStream outputStream)
throws IOException {
byte opCode = inputStream.readByte();
String storeName = inputStream.readUTF();
- boolean isRouted = inputStream.readBoolean();
+ boolean isRouted = false;
+ if(protocolVersion > 0)
+ isRouted = inputStream.readBoolean();
Store<ByteArray, byte[]> store = getStore(storeName, isRouted);
if(store == null) {
writeException(outputStream, new VoldemortException("No store named '" + storeName
View
35 src/java/voldemort/server/socket/SocketServer.java
@@ -35,7 +35,7 @@
import voldemort.VoldemortException;
import voldemort.annotations.jmx.JmxGetter;
import voldemort.annotations.jmx.JmxManaged;
-import voldemort.server.protocol.RequestHandler;
+import voldemort.server.protocol.RequestHandlerFactory;
/**
* A simple socket-based server for serving voldemort requests
@@ -54,7 +54,7 @@
private final ThreadGroup threadGroup;
private final CountDownLatch isStarted = new CountDownLatch(1);
private final int socketBufferSize;
- private final RequestHandler requestHandler;
+ private final RequestHandlerFactory handlerFactory;
private final int maxThreads;
private final String serverName;
private final StatusManager statusManager;
@@ -66,12 +66,12 @@ public SocketServer(String serverName,
int defaultThreads,
int maxThreads,
int socketBufferSize,
- RequestHandler requestHandler) {
+ RequestHandlerFactory handlerFactory) {
this.serverName = serverName;
this.port = port;
this.socketBufferSize = socketBufferSize;
this.threadGroup = new ThreadGroup("voldemort-socket-server");
- this.requestHandler = requestHandler;
+ this.handlerFactory = handlerFactory;
this.maxThreads = maxThreads;
this.threadPool = new ThreadPoolExecutor(defaultThreads,
maxThreads,
@@ -97,10 +97,16 @@ public Thread newThread(Runnable r) {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
SocketServerSession session = (SocketServerSession) r;
- logger.error("Too many open connections, " + executor.getActiveCount() + " of "
- + executor.getLargestPoolSize()
- + " threads in use, denying connection from "
- + session.getSocket().getRemoteSocketAddress());
+ if(interrupted()) {
+ logger.info("Denying connection from "
+ + session.getSocket().getRemoteSocketAddress()
+ + ", server is shutting down.");
+ } else {
+ logger.error("Too many open connections, " + executor.getActiveCount() + " of "
+ + executor.getLargestPoolSize()
+ + " threads in use, denying connection from "
+ + session.getSocket().getRemoteSocketAddress());
+ }
try {
session.getSocket().close();
} catch(IOException e) {
@@ -112,7 +118,7 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
@Override
public void run() {
logger.info("Starting voldemort socket server(" + serverName + ") on port " + port
- + " using request handler " + requestHandler.getClass());
+ + " using request handler " + handlerFactory.getClass());
try {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(port));
@@ -121,7 +127,7 @@ public void run() {
while(!isInterrupted() && !serverSocket.isClosed()) {
final Socket socket = serverSocket.accept();
configureSocket(socket);
- this.threadPool.execute(new SocketServerSession(socket, requestHandler));
+ this.threadPool.execute(new SocketServerSession(socket, handlerFactory));
}
} catch(BindException e) {
logger.error("Could not bind to port " + port + ".");
@@ -158,11 +164,18 @@ private void configureSocket(Socket socket) throws SocketException {
public void shutdown() {
logger.info("Shutting down voldemort socket server(" + serverName + ") on port " + port
+ ".");
+ try {
+ serverSocket.close();
+ } catch(IOException e) {
+ logger.error("Error while closing socket server: " + e.getMessage());
+ }
threadGroup.interrupt();
interrupt();
threadPool.shutdownNow();
try {
- threadPool.awaitTermination(1, TimeUnit.SECONDS);
+ boolean completed = threadPool.awaitTermination(1, TimeUnit.SECONDS);
+ if(!completed)
+ logger.warn("Timed out waiting for sockets to close.");
} catch(InterruptedException e) {
logger.warn("Interrupted while waiting for tasks to complete: ", e);
}
View
63 src/java/voldemort/server/socket/SocketServerSession.java
@@ -6,18 +6,37 @@
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.Socket;
+import org.apache.log4j.Logger;
+
+import voldemort.client.protocol.RequestFormatType;
import voldemort.server.protocol.RequestHandler;
+import voldemort.server.protocol.RequestHandlerFactory;
+import voldemort.utils.ByteUtils;
+/**
+ * Represents a session of interaction between the server and the client. This
+ * begins with protocol negotiation and then a seriest of client requests
+ * followed by server responses. The negotiation is handled by the session
+ * object, which will choose an appropriate request handler to handle the actual
+ * request/response.
+ *
+ * @author jay
+ *
+ */
public class SocketServerSession implements Runnable {
+ private final Logger logger = Logger.getLogger(SocketServerSession.class);
+
private final Socket socket;
- private final RequestHandler requestHandler;
+ private final RequestHandlerFactory handlerFactory;
- public SocketServerSession(Socket socket, RequestHandler requestHandler) {
+ public SocketServerSession(Socket socket, RequestHandlerFactory handlerFactory) {
this.socket = socket;
- this.requestHandler = requestHandler;
+ this.handlerFactory = handlerFactory;
}
public Socket getSocket() {
@@ -30,25 +49,53 @@ private boolean isInterrupted() {
public void run() {
try {
- SocketServer.logger.info("Client " + socket.getRemoteSocketAddress() + " connected.");
DataInputStream inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream(),
64000));
DataOutputStream outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(),
64000));
+
+ RequestFormatType protocol = negotiateProtocol(inputStream, outputStream);
+ RequestHandler handler = handlerFactory.getRequestHandler(protocol);
+ logger.info("Client " + socket.getRemoteSocketAddress()
+ + " connected successfully with protocol " + protocol);
+
while(!isInterrupted()) {
- requestHandler.handleRequest(inputStream, outputStream);
+ handler.handleRequest(inputStream, outputStream);
outputStream.flush();
}
} catch(EOFException e) {
- SocketServer.logger.info("Client " + socket.getRemoteSocketAddress() + " disconnected.");
+ logger.info("Client " + socket.getRemoteSocketAddress() + " disconnected.");
} catch(IOException e) {
- SocketServer.logger.error(e);
+ logger.error(e);
} finally {
try {
socket.close();
} catch(Exception e) {
- SocketServer.logger.error("Error while closing socket", e);
+ logger.error("Error while closing socket", e);
}
}
}
+
+ private RequestFormatType negotiateProtocol(InputStream input, OutputStream output)
+ throws IOException {
+ input.mark(3);
+ byte[] protoBytes = new byte[3];
+ ByteUtils.read(input, protoBytes);
+ RequestFormatType requestFormat;
+ try {
+ String proto = ByteUtils.getString(protoBytes, "UTF-8");
+ requestFormat = RequestFormatType.fromCode(proto);
+ output.write(ByteUtils.getBytes("ok", "UTF-8"));
+ output.flush();
+ } catch(IllegalArgumentException e) {
+ // okay we got some nonsense. For backwards compatibility,
+ // assume this is an old client who does not know how to negotiate
+ requestFormat = RequestFormatType.VOLDEMORT_V0;
+ // reset input stream so we don't interfere with request format
+ input.reset();
+ logger.info("No protocol proposal given, assuming "
+ + RequestFormatType.VOLDEMORT_V0.getDisplayName());
+ }
+ return requestFormat;
+ }
}
View
6 src/java/voldemort/server/socket/SocketService.java
@@ -21,7 +21,7 @@
import voldemort.server.AbstractService;
import voldemort.server.ServiceType;
import voldemort.server.VoldemortService;
-import voldemort.server.protocol.RequestHandler;
+import voldemort.server.protocol.RequestHandlerFactory;
import voldemort.utils.JmxUtils;
/**
@@ -37,7 +37,7 @@
private final SocketServer server;
private final boolean enableJmx;
- public SocketService(RequestHandler requestHandler,
+ public SocketService(RequestHandlerFactory requestHandlerFactory,
int port,
int coreConnections,
int maxConnections,
@@ -50,7 +50,7 @@ public SocketService(RequestHandler requestHandler,
coreConnections,
maxConnections,
socketBufferSize,
- requestHandler);
+ requestHandlerFactory);
this.serviceName = serviceName;
this.enableJmx = enableJmx;
}
View
7 src/java/voldemort/server/storage/StorageService.java
@@ -53,6 +53,7 @@
import voldemort.store.routed.RoutedStore;
import voldemort.store.serialized.SerializingStorageEngine;
import voldemort.store.slop.Slop;
+import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.SocketPool;
import voldemort.store.socket.SocketStore;
import voldemort.store.stats.StatTrackingStore;
@@ -197,10 +198,10 @@ public void registerNodeStores(StoreDefinition def, Cluster cluster, int localNo
store = this.storeRepository.getLocalStore(def.getName());
} else {
store = new SocketStore(def.getName(),
- node.getHost(),
- node.getSocketPort(),
+ new SocketDestination(node.getHost(),
+ node.getSocketPort(),
+ voldemortConfig.getRequestFormatType()),
socketPool,
- voldemortConfig.getRequestFormatType(),
false);
}
this.storeRepository.addNodeStore(node.getId(), store);
View
37 src/java/voldemort/store/bdb/BdbStorageConfiguration.java
@@ -22,7 +22,7 @@
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
-import voldemort.annotations.jmx.JmxGetter;
+import voldemort.annotations.jmx.JmxOperation;
import voldemort.server.VoldemortConfig;
import voldemort.store.StorageConfiguration;
import voldemort.store.StorageEngine;
@@ -120,13 +120,11 @@ private Environment getEnvironment(String storeName) throws DatabaseException {
// otherwise create a new environment
File bdbDir = new File(bdbMasterDir, storeName);
- if(!bdbDir.exists()) {
- logger.info("Creating BDB data directory '" + bdbDir.getAbsolutePath()
- + "' for store'" + storeName + "'.");
- bdbDir.mkdirs();
- }
+ createBdbDirIfNecessary(bdbDir);
Environment environment = new Environment(bdbDir, environmentConfig);
+ logger.info("Creating environment for " + storeName + ": ");
+ logEnvironmentConfig(environment.getConfig());
environments.put(storeName, environment);
return environment;
} else {
@@ -134,19 +132,34 @@ private Environment getEnvironment(String storeName) throws DatabaseException {
return environments.get(SHARED_ENV_KEY);
File bdbDir = new File(bdbMasterDir);
-
- if(!bdbDir.exists()) {
- logger.info("Creating BDB data directory '" + bdbDir.getAbsolutePath() + "'.");
- bdbDir.mkdirs();
- }
+ createBdbDirIfNecessary(bdbDir);
Environment environment = new Environment(bdbDir, environmentConfig);
+ logger.info("Creating shared BDB environment: ");
+ logEnvironmentConfig(environment.getConfig());
environments.put(SHARED_ENV_KEY, environment);
return environment;
}
}
}
+ private void createBdbDirIfNecessary(File bdbDir) {
+ if(!bdbDir.exists()) {
+ logger.info("Creating BDB data directory '" + bdbDir.getAbsolutePath() + ".");
+ bdbDir.mkdirs();
+ }
+ }
+
+ private void logEnvironmentConfig(EnvironmentConfig config) {
+ logger.info(" BDB cache size = " + config.getCacheSize());
+ logger.info(" BDB " + EnvironmentConfig.CLEANER_THREADS + " = "
+ + config.getConfigParam(EnvironmentConfig.CLEANER_THREADS));
+ logger.info(" BDB " + EnvironmentConfig.CLEANER_MIN_FILE_UTILIZATION + " = "
+ + config.getConfigParam(EnvironmentConfig.CLEANER_MIN_FILE_UTILIZATION));
+ logger.info(" BDB " + EnvironmentConfig.LOG_FILE_MAX + " = "
+ + config.getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
+ }
+
public String getType() {
return TYPE_NAME;
}
@@ -162,7 +175,7 @@ public EnvironmentStats getStats(String storeName) {
}
}
- @JmxGetter(name = "stats", description = "A variety of stats about one BDB environment.")
+ @JmxOperation(description = "A variety of stats about one BDB environment.")
public String getEnvStatsAsString(String storeName) throws Exception {
return getStats(storeName).toString();
}
View
19 src/java/voldemort/store/bdb/BdbStorageEngine.java
@@ -51,7 +51,6 @@
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseStats;
import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
@@ -80,24 +79,6 @@ public BdbStorageEngine(String name, Environment environment, Database database)
this.environment = Utils.notNull(environment);
this.serializer = new VersionedSerializer<byte[]>(new IdentitySerializer());
this.isOpen = new AtomicBoolean(true);
-
- try {
- logger.info(" BDB[" + name + "] : cache size = "
- + environment.getConfig().getCacheSize());
- logger.info(" BDB[" + name + "] : " + EnvironmentConfig.CLEANER_THREADS + " = "
- + environment.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS));
- logger.info(" BDB["
- + name
- + "] : "
- + EnvironmentConfig.CLEANER_MIN_FILE_UTILIZATION
- + " = "
- + environment.getConfig()
- .getConfigParam(EnvironmentConfig.CLEANER_MIN_FILE_UTILIZATION));
- logger.info(" BDB[" + name + "] : " + EnvironmentConfig.LOG_FILE_MAX + " = "
- + environment.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
- } catch(DatabaseException e) {
- logger.error("Error getting config inforation for BDB at startup", e);
- }
}
public String getName() {
View
15 src/java/voldemort/store/socket/SocketAndStreams.java
@@ -23,6 +23,8 @@
import java.io.IOException;
import java.net.Socket;
+import voldemort.client.protocol.RequestFormatType;
+
/**
* A wrapper class that wraps a socket with its DataInputStream and
* DataOutputStream
@@ -35,19 +37,22 @@
private static final int DEFAULT_BUFFER_SIZE = 1000;
private final Socket socket;
+ private final RequestFormatType requestFormatType;
private final DataInputStream inputStream;
private final DataOutputStream outputStream;
- public SocketAndStreams(Socket socket) throws IOException {
- this(socket, DEFAULT_BUFFER_SIZE);
+ public SocketAndStreams(Socket socket, RequestFormatType requestFormatType) throws IOException {
+ this(socket, DEFAULT_BUFFER_SIZE, requestFormatType);
}
- public SocketAndStreams(Socket socket, int bufferSizeBytes) throws IOException {
+ public SocketAndStreams(Socket socket, int bufferSizeBytes, RequestFormatType type)
+ throws IOException {
this.socket = socket;
this.inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream(),
bufferSizeBytes));
this.outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(),
bufferSizeBytes));
+ this.requestFormatType = type;
}
public Socket getSocket() {
@@ -62,4 +67,8 @@ public DataOutputStream getOutputStream() {
return outputStream;
}
+ public RequestFormatType getRequestFormatType() {
+ return this.requestFormatType;
+ }
+
}
View
18 src/java/voldemort/store/socket/SocketDestination.java
@@ -16,12 +16,13 @@
package voldemort.store.socket;
+import voldemort.client.protocol.RequestFormatType;
import voldemort.utils.Utils;
import com.google.common.base.Objects;
/**
- * A host + port
+ * A host + port + protocol
*
* @author jay
*
@@ -30,10 +31,12 @@
private final String host;
private final int port;
+ private final RequestFormatType requestFormatType;
- public SocketDestination(String host, int port) {
+ public SocketDestination(String host, int port, RequestFormatType requestFormatType) {
this.host = Utils.notNull(host);
this.port = Utils.notNull(port);
+ this.requestFormatType = Utils.notNull(requestFormatType);
}
public String getHost() {
@@ -44,6 +47,10 @@ public int getPort() {
return port;
}
+ public RequestFormatType getRequestFormatType() {
+ return requestFormatType;
+ }
+
@Override
public boolean equals(Object obj) {
if(obj == this)
@@ -53,17 +60,18 @@ public boolean equals(Object obj) {
return false;
SocketDestination d = (SocketDestination) obj;
- return getHost().equals(d.getHost()) && getPort() == d.getPort();
+ return getHost().equals(d.getHost()) && getPort() == d.getPort()
+ && getRequestFormatType().equals(d.getRequestFormatType());
}
@Override
public int hashCode() {
- return Objects.hashCode(host, port);
+ return Objects.hashCode(host, port, requestFormatType);
}
@Override
public String toString() {
- return host + ":" + port;
+ return host + ":" + port + "(" + requestFormatType.getCode() + ")";
}
}
View
37 src/java/voldemort/store/socket/SocketPoolableObjectFactory.java
@@ -16,6 +16,9 @@
package voldemort.store.socket;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
@@ -24,6 +27,10 @@
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.log4j.Logger;
+import voldemort.VoldemortException;
+import voldemort.client.protocol.RequestFormatType;
+import voldemort.utils.ByteUtils;
+
/**
* A Factory for creating sockets
*
@@ -36,8 +43,8 @@
private final int soTimeoutMs;
private final int socketBufferSize;
- public final AtomicInteger created;
- public final AtomicInteger destroyed;
+ private final AtomicInteger created;
+ private final AtomicInteger destroyed;
public SocketPoolableObjectFactory(int soTimeoutMs, int socketBufferSize) {
this.soTimeoutMs = soTimeoutMs;
@@ -81,14 +88,36 @@ public Object makeObject(Object key) throws Exception {
recordSocketCreation(dest, socket);
- return new SocketAndStreams(socket);
+ SocketAndStreams sands = new SocketAndStreams(socket, dest.getRequestFormatType());
+ negotiateProtocol(sands, dest.getRequestFormatType());
+
+ return sands;
+ }
+
+ private void negotiateProtocol(SocketAndStreams socket, RequestFormatType type)
+ throws IOException {
+ OutputStream outputStream = socket.getOutputStream();
+ byte[] proposal = ByteUtils.getBytes(type.getCode(), "UTF-8");
+ outputStream.write(proposal);
+ outputStream.flush();
+ DataInputStream inputStream = socket.getInputStream();
+ byte[] responseBytes = new byte[2];
+ inputStream.readFully(responseBytes);
+ String response = ByteUtils.getString(responseBytes, "UTF-8");
+ if(response.equals("ok"))
+ return;
+ else if(response.equals("no"))
+ throw new VoldemortException(type.getDisplayName()
+ + " is not an acceptable protcol for the server.");
+ else
+ throw new VoldemortException("Unknown server response: " + response);
}
/* Log relevant socket creation details */
private void recordSocketCreation(SocketDestination dest, Socket socket) throws SocketException {
int numCreated = created.incrementAndGet();
logger.debug("Created socket " + numCreated + " for " + dest.getHost() + ":"
- + dest.getPort());
+ + dest.getPort() + " using protocol " + dest.getRequestFormatType().getCode());
// check buffer sizes--you often don't get out what you put in!
int sendBufferSize = socket.getSendBufferSize();
View
30 src/java/voldemort/store/socket/SocketStore.java
@@ -26,7 +26,6 @@
import voldemort.VoldemortException;
import voldemort.client.protocol.RequestFormat;
import voldemort.client.protocol.RequestFormatFactory;
-import voldemort.client.protocol.RequestFormatType;
import voldemort.store.NoSuchCapabilityException;
import voldemort.store.Store;
import voldemort.store.StoreCapabilityType;
@@ -57,16 +56,11 @@
private final RequestFormat requestFormat;
private final boolean reroute;
- public SocketStore(String name,
- String host,
- int port,
- SocketPool socketPool,
- RequestFormatType requestFormatType,
- boolean reroute) {
+ public SocketStore(String name, SocketDestination dest, SocketPool socketPool, boolean reroute) {
this.name = Utils.notNull(name);
this.pool = Utils.notNull(socketPool);
- this.destination = new SocketDestination(Utils.notNull(host), port);
- this.requestFormat = requestFormatFactory.getRequestFormat(requestFormatType);
+ this.destination = dest;
+ this.requestFormat = requestFormatFactory.getRequestFormat(dest.getRequestFormatType());
this.reroute = reroute;
}
@@ -79,10 +73,10 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
SocketAndStreams sands = pool.checkout(destination);
try {
requestFormat.writeDeleteRequest(sands.getOutputStream(),
- name,
- key,
- (VectorClock) version,
- reroute);
+ name,
+ key,
+ (VectorClock) version,
+ reroute);
sands.getOutputStream().flush();
return requestFormat.readDeleteResponse(sands.getInputStream());
} catch(IOException e) {
@@ -131,11 +125,11 @@ public void put(ByteArray key, Versioned<byte[]> versioned) throws VoldemortExce
SocketAndStreams sands = pool.checkout(destination);
try {
requestFormat.writePutRequest(sands.getOutputStream(),
- name,
- key,
- versioned.getValue(),
- (VectorClock) versioned.getVersion(),
- reroute);
+ name,
+ key,
+ versioned.getValue(),
+ (VectorClock) versioned.getVersion(),
+ reroute);
sands.getOutputStream().flush();
requestFormat.readPutResponse(sands.getInputStream());
} catch(IOException e) {
View
8 src/java/voldemort/xml/ClusterMapper.java
@@ -43,7 +43,6 @@
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
-import voldemort.server.VoldemortConfig;
/**
* Parse a cluster xml file
@@ -63,7 +62,6 @@
private static final String HOST_ELMT = "host";
private static final String HTTP_PORT_ELMT = "http-port";
private static final String SOCKET_PORT_ELMT = "socket-port";
- private static final String ADMIN_PORT_ELMT = "admin-port";
private final Schema schema;
@@ -123,10 +121,7 @@ public Node readServer(Element server) {
for(String aPartition: COMMA_SEP.split(partitionsText))
partitions.add(Integer.parseInt(aPartition.trim()));
- int adminPort = (null != server.getChildText(ADMIN_PORT_ELMT)) ? Integer.parseInt(server.getChildText(ADMIN_PORT_ELMT))
- : VoldemortConfig.VOLDEMORT_DEFAULT_ADMIN_PORT;
-
- return new Node(id, host, httpPort, socketPort, adminPort, partitions);
+ return new Node(id, host, httpPort, socketPort, partitions);
}
public String writeCluster(Cluster cluster) {
@@ -144,7 +139,6 @@ private Element mapServer(Node node) {
server.addContent(new Element(HOST_ELMT).setText(node.getHost()));
server.addContent(new Element(HTTP_PORT_ELMT).setText(Integer.toString(node.getHttpPort())));
server.addContent(new Element(SOCKET_PORT_ELMT).setText(Integer.toString(node.getSocketPort())));
- server.addContent(new Element(ADMIN_PORT_ELMT).setText(Integer.toString(node.getAdminPort())));
String serverPartitionsText = StringUtils.join(node.getPartitionIds().toArray(), ", ");
server.addContent(new Element(SERVER_PARTITIONS_ELMT).setText(serverPartitionsText));
return server;
View
34 test/common/voldemort/ServerTestUtils.java
@@ -47,6 +47,7 @@
import voldemort.store.memory.InMemoryStorageConfiguration;
import voldemort.store.memory.InMemoryStorageEngine;
import voldemort.store.metadata.MetadataStore;
+import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.SocketPool;
import voldemort.store.socket.SocketStore;
import voldemort.utils.ByteArray;
@@ -80,29 +81,26 @@ public static StoreRepository getStores(String storeName, String clusterXml, Str
public static SocketServer getSocketServer(String clusterXml,
String storesXml,
String storeName,
- int port,
- RequestFormatType type) {
+ int port) {
RequestHandlerFactory factory = new RequestHandlerFactory(getStores(storeName,
clusterXml,
storesXml), null, null);
- SocketServer socketServer = new SocketServer("Socket-Server",
- port,
- 5,
- 10,
- 10000,
- factory.getRequestHandler(type));
+ SocketServer socketServer = new SocketServer("Socket-Server", port, 5, 10, 10000, factory);
socketServer.start();
socketServer.awaitStartupCompletion();
return socketServer;
}
public static SocketStore getSocketStore(String storeName, int port) {
- return getSocketStore(storeName, port, RequestFormatType.VOLDEMORT);
+ return getSocketStore(storeName, port, RequestFormatType.VOLDEMORT_V1);
}
public static SocketStore getSocketStore(String storeName, int port, RequestFormatType type) {
SocketPool socketPool = new SocketPool(1, 2, 10000, 100000, 32 * 1024);
- return new SocketStore(storeName, "localhost", port, socketPool, type, false);
+ return new SocketStore(storeName,
+ new SocketDestination("localhost", port, type),
+ socketPool,
+ false);
}
public static Context getJettyServer(String clusterXml,
@@ -164,24 +162,24 @@ public static int findFreePort() {
}
public static Cluster getLocalCluster(int numberOfNodes) {
- return getLocalCluster(numberOfNodes, findFreePorts(3 * numberOfNodes));
+ return getLocalCluster(numberOfNodes, findFreePorts(2 * numberOfNodes));
}
public static Cluster getLocalCluster(int numberOfNodes, int[] ports) {
- if(3 * numberOfNodes != ports.length)
+ if(2 * numberOfNodes != ports.length)
throw new IllegalArgumentException(3 * numberOfNodes + " ports required but only "
+ ports.length + " given.");
List<Node> nodes = new ArrayList<Node>();
for(int i = 0; i < numberOfNodes; i++)
- nodes.add(new Node(i,
- "localhost",
- ports[3 * i],
- ports[3 * i + 1],
- ports[3 * i + 2],
- ImmutableList.of(i)));
+ nodes.add(new Node(i, "localhost", ports[2 * i], ports[2 * i + 1], ImmutableList.of(i)));
return new Cluster("test-cluster", nodes);
}
+ public static Node getLocalNode(int nodeId, List<Integer> partitions) {
+ int[] ports = findFreePorts(2);
+ return new Node(nodeId, "localhost", ports[0], ports[1], partitions);
+ }
+
public static List<StoreDefinition> getStoreDefs(int numStores) {
List<StoreDefinition> defs = new ArrayList<StoreDefinition>();
SerializerDefinition serDef = new SerializerDefinition("string");
View
2  test/common/voldemort/TestUtils.java
@@ -327,7 +327,7 @@ public static VoldemortMetadata createMetadata(int[][] partitionMap, StoreDefini
for(int p = 0; p < partitionMap[i].length; p++) {
partitionList.add(partitionMap[i][p]);
}
- nodes.add(new Node(i, "localhost", 8880 + i, 6666 + i, 7777 + i, partitionList));
+ nodes.add(new Node(i, "localhost", 8880 + i, 6666 + i, partitionList));
}
return nodes;
View
18 test/integration/voldemort/performance/RemoteStoreComparisonTest.java
@@ -36,6 +36,7 @@
import voldemort.store.Store;
import voldemort.store.http.HttpStore;
import voldemort.store.memory.InMemoryStorageEngine;
+import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.SocketPool;
import voldemort.store.socket.SocketStore;
import voldemort.utils.ByteArray;
@@ -92,18 +93,13 @@ public void doOperation(int i) {
repository.addLocalStore(new InMemoryStorageEngine<ByteArray, byte[]>(storeName));
SocketPool socketPool = new SocketPool(10, 10, 1000, 1000, 32 * 1024);
final SocketStore socketStore = new SocketStore(storeName,
- "localhost",
- 6666,
+ new SocketDestination("localhost",
+ 6666,
+ RequestFormatType.VOLDEMORT_V1),
socketPool,
- RequestFormatType.VOLDEMORT,
false);
RequestHandlerFactory factory = new RequestHandlerFactory(repository, null, null);
- SocketServer socketServer = new SocketServer("Socket-Server",
- 6666,
- 50,
- 50,
- 1000,
- factory.getRequestHandler(RequestFormatType.VOLDEMORT));
+ SocketServer socketServer = new SocketServer("Socket-Server", 6666, 50, 50, 1000, factory);
socketServer.start();
socketServer.awaitStartupCompletion();
@@ -148,7 +144,7 @@ public void doOperation(int i) {
repository.addLocalStore(new InMemoryStorageEngine<ByteArray, byte[]>(storeName));
HttpService httpService = new HttpService(null,
repository,
- RequestFormatType.VOLDEMORT,
+ RequestFormatType.VOLDEMORT_V0,
numThreads,
8080);
httpService.start();
@@ -171,7 +167,7 @@ public void doOperation(int i) {
"localhost",
8080,
httpClient,
- new RequestFormatFactory().getRequestFormat(RequestFormatType.VOLDEMORT),
+ new RequestFormatFactory().getRequestFormat(RequestFormatType.VOLDEMORT_V0),
false);
Thread.sleep(400);
View
76 test/unit/voldemort/client/AdminServiceTest.java
@@ -44,6 +44,8 @@
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;
+import com.google.common.collect.ImmutableList;
+
/**
* @author bbansal
*
@@ -60,21 +62,13 @@
@Override
public void setUp() throws IOException {
// start 2 node cluster with free ports
- int[] ports = ServerTestUtils.findFreePorts(3);
- Node node0 = new Node(0,
- "localhost",
- ports[0],
- ports[1],
- ports[2],
- Arrays.asList(new Integer[] { 0, 1 }));
-
- ports = ServerTestUtils.findFreePorts(3);
- Node node1 = new Node(1,
- "localhost",
- ports[0],
- ports[1],
- ports[2],
- Arrays.asList(new Integer[] { 2, 3 }));
+ int[] ports = ServerTestUtils.findFreePorts(2);
+ Node node0 = new Node(0, "localhost", ports[0], ports[1], Arrays.asList(new Integer[] { 0,
+ 1 }));
+
+ ports = ServerTestUtils.findFreePorts(2);
+ Node node1 = new Node(1, "localhost", ports[0], ports[1], Arrays.asList(new Integer[] { 2,
+ 3 }));
cluster = new Cluster("admin-service-test", Arrays.asList(new Node[] { node0, node1 }));
config = ServerTestUtils.createServerConfig(0,
@@ -90,22 +84,21 @@ public void tearDown() throws IOException, InterruptedException {
server.stop();
}
+ private AdminClient getAdminClient() {
+ return new AdminClient(server.getIdentityNode(),
+ server.getVoldemortMetadata(),
+ new SocketPool(100, 100, 2000, 1000, 10000));
+ }
+
public void testUpdateCluster() {
Cluster cluster = server.getVoldemortMetadata().getCurrentCluster();
-
- // add node 3 and partition 4,5 to cluster.
- ArrayList<Integer> partitionList = new ArrayList<Integer>();
- partitionList.add(4);
- partitionList.add(5);
ArrayList<Node> nodes = new ArrayList<Node>(cluster.getNodes());
- nodes.add(new Node(3, "localhost", 8883, 6668, 7778, partitionList));
+ nodes.add(new Node(3, "localhost", 8883, 6668, ImmutableList.of(4, 5)));
Cluster updatedCluster = new Cluster("new-cluster", nodes);
// update VoldemortServer cluster.xml
- AdminClient client = new AdminClient(server.getIdentityNode(),
- server.getVoldemortMetadata(),
- new SocketPool(100, 100, 2000, 1000, 10000));
+ AdminClient client = getAdminClient();
client.updateClusterMetadata(server.getIdentityNode().getId(),
updatedCluster,
@@ -119,17 +112,12 @@ public void testUpdateOldCluster() {
Cluster cluster = server.getVoldemortMetadata().getCurrentCluster();
// add node 3 and partition 4,5 to cluster.
- ArrayList<Integer> partitionList = new ArrayList<Integer>();
- partitionList.add(4);
- partitionList.add(5);
ArrayList<Node> nodes = new ArrayList<Node>(cluster.getNodes());
- nodes.add(new Node(3, "localhost", 8883, 6668, 7778, partitionList));
+ nodes.add(new Node(3, "localhost", 8883, 6668, ImmutableList.of(4, 5)));
Cluster updatedCluster = new Cluster("new-cluster", nodes);
// update VoldemortServer cluster.xml
- AdminClient client = new AdminClient(server.getIdentityNode(),
- server.getVoldemortMetadata(),
- new SocketPool(100, 100, 2000, 1000, 10000));
+ AdminClient client = getAdminClient();
client.updateClusterMetadata(server.getIdentityNode().getId(),
updatedCluster,
@@ -163,9 +151,7 @@ public void testUpdateStores() {
}
// update server stores info
- AdminClient client = new AdminClient(server.getIdentityNode(),
- server.getVoldemortMetadata(),
- new SocketPool(100, 100, 2000, 1000, 10000));
+ AdminClient client = getAdminClient();
client.updateStoresMetadata(server.getIdentityNode().getId(), storesList);
@@ -195,9 +181,7 @@ public void testRedirectGet() {
.getValue()));
// update server stores info
- AdminClient client = new AdminClient(server.getIdentityNode(),
- server.getVoldemortMetadata(),
- new SocketPool(100, 100, 2000, 1000, 10000));
+ AdminClient client = getAdminClient();
assertEquals("ForcedGet should match put value",
new String(value),
@@ -208,9 +192,7 @@ public void testRedirectGet() {
public void testStateTransitions() {
// change to REBALANCING STATE
- AdminClient client = new AdminClient(server.getIdentityNode(),
- server.getVoldemortMetadata(),
- new SocketPool(100, 100, 2000, 1000, 10000));
+ AdminClient client = getAdminClient();
client.changeServerState(server.getIdentityNode().getId(),
VoldemortMetadata.ServerState.REBALANCING_STEALER_STATE);
@@ -254,9 +236,7 @@ public void testFetchAsStream() {
}
// Get a single partition here
- AdminClient client = new AdminClient(server.getIdentityNode(),
- server.getVoldemortMetadata(),
- new SocketPool(100, 100, 2000, 1000, 10000));
+ AdminClient client = getAdminClient();
Iterator<Pair<ByteArray, Versioned<byte[]>>> entryIterator = client.fetchPartitionEntries(0,
storeName,
Arrays.asList(new Integer[] { 0 }));
@@ -306,10 +286,7 @@ public void testUpdateAsStream() throws IOException {
}
// Write
- AdminClient client = new AdminClient(server.getIdentityNode(),
- server.getVoldemortMetadata(),
- new SocketPool(100, 100, 2000, 1000, 10000));
-
+ AdminClient client = getAdminClient();
client.updatePartitionEntries(0, storeName, entryList.iterator());
for(int i = 100; i <= 104; i++) {
@@ -362,10 +339,7 @@ public void testFetchAndUpdate() throws IOException {
}
// use pipeGetAndPutStream to add values to server2
- AdminClient client = new AdminClient(server2.getIdentityNode(),
- server2.getVoldemortMetadata(),
- new SocketPool(100, 100, 2000, 1000, 10000));
-
+ AdminClient client = getAdminClient();
List<Integer> stealList = new ArrayList<Integer>();
stealList.add(0);
stealList.add(1);
View
4 test/unit/voldemort/client/HttpStoreClientFactoryTest.java
@@ -41,11 +41,11 @@ public void setUp() throws Exception {
context = ServerTestUtils.getJettyServer(getClusterXml(),
getStoreDefXml(),
getValidStoreName(),
- RequestFormatType.VOLDEMORT,
+ RequestFormatType.VOLDEMORT_V1,
getLocalNode().getHttpPort());
server = context.getServer();
httpStore = ServerTestUtils.getHttpStore(getValidStoreName(),
- RequestFormatType.VOLDEMORT,
+ RequestFormatType.VOLDEMORT_V1,
getLocalNode().getHttpPort());
url = getLocalNode().getHttpUrl().toString();
}
View
4 test/unit/voldemort/client/SocketStoreClientFactoryTest.java
@@ -19,7 +19,6 @@
import java.net.URISyntaxException;
import voldemort.ServerTestUtils;
-import voldemort.client.protocol.RequestFormatType;
import voldemort.serialization.SerializerFactory;
import voldemort.server.socket.SocketServer;
@@ -41,8 +40,7 @@ public void setUp() throws Exception {
server = ServerTestUtils.getSocketServer(getClusterXml(),
getStoreDefXml(),
getValidStoreName(),
- getLocalNode().getSocketPort(),
- RequestFormatType.VOLDEMORT);
+ getLocalNode().getSocketPort());
}
@Override
View
4 test/unit/voldemort/cluster/TestCluster.java
@@ -37,28 +37,24 @@ public void setUp() {
"test1",
1,
1,
- 1,
ImmutableList.of(1, 2, 3),
new NodeStatus(time)),
new Node(2,
"test1",
2,
2,
- 2,
ImmutableList.of(3, 5, 6),
new NodeStatus(time)),
new Node(3,
"test1",
3,
3,
- 3,
ImmutableList.of(7, 8, 9),
new NodeStatus(time)),
new Node(4,
"test1",
4,
4,
- 4,
ImmutableList.of(10, 11, 12),
new NodeStatus(time)));
this.cluster = new Cluster(clusterName, nodes);
View
2  test/unit/voldemort/protocol/vold/VoldemortNativeRequestFormatTest.java
@@ -6,7 +6,7 @@
public class VoldemortNativeRequestFormatTest extends AbstractRequestFormatTest {
public VoldemortNativeRequestFormatTest() {
- super(RequestFormatType.VOLDEMORT);
+ super(RequestFormatType.VOLDEMORT_V1);
}
}
View
6 test/unit/voldemort/routing/ConsistentRoutingStrategyTest.java
@@ -108,8 +108,8 @@ public void testLoadBalancing(int numNodes,
tags.add(i);
for(int i = 0; i < numNodes; i++)
- nodes.add(new Node(i, "host", 8080, 6666, 7777, tags.subList(tagsPerNode * i,
- tagsPerNode * (i + 1))));
+ nodes.add(new Node(i, "host", 8080, 6666, tags.subList(tagsPerNode * i, tagsPerNode
+ * (i + 1))));
// use a seed so that this test is repeatable
Random random = new Random(2158745224L);
@@ -183,7 +183,7 @@ private Node node(int id, int... tags) {
List<Integer> list = new ArrayList<Integer>(tags.length);
for(int tag: tags)
list.add(tag);
- return new Node(id, "localhost", 8080, 6666, 7777, list, new NodeStatus(time));
+ return new Node(id, "localhost", 8080, 6666, list, new NodeStatus(time));
}
}
View
21 test/unit/voldemort/server/socket/SocketPoolTest.java
@@ -18,9 +18,9 @@
import junit.framework.TestCase;
import voldemort.ServerTestUtils;
+import voldemort.client.protocol.RequestFormatType;
import voldemort.server.StoreRepository;
-import voldemort.server.protocol.vold.VoldemortNativeRequestHandler;
-import voldemort.store.ErrorCodeMapper;
+import voldemort.server.protocol.RequestHandlerFactory;
import voldemort.store.socket.SocketAndStreams;
import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.SocketPool;
@@ -48,15 +48,16 @@ public void setUp() {
1000,
1000,
32 * 1024);
- this.dest1 = new SocketDestination("localhost", port);
- VoldemortNativeRequestHandler requestHandler = new VoldemortNativeRequestHandler(new ErrorCodeMapper(),
- new StoreRepository());
+ this.dest1 = new SocketDestination("localhost", port, RequestFormatType.VOLDEMORT_V1);
+ RequestHandlerFactory handlerFactory = new RequestHandlerFactory(new StoreRepository(),
+ null,
+ null);
this.server = new SocketServer("test-socket",
port,
maxTotalConnections,
maxTotalConnections + 3,
10000,
- requestHandler);
+ handlerFactory);
this.server.start();
this.server.awaitStartupCompletion();
}
@@ -90,4 +91,12 @@ public void testClosingStreamDeactivates() throws Exception {
assertTrue(sas1 != sas2);
}
+ public void testVariousProtocols() throws Exception {
+ for(RequestFormatType type: RequestFormatType.values()) {
+ SocketDestination dest = new SocketDestination("localhost", port, type);
+ SocketAndStreams sas = pool.checkout(dest);
+ assertEquals(type, sas.getRequestFormatType());
+ }
+ }
+
}
View
6 test/unit/voldemort/store/http/HttpStoreTest.java
@@ -55,17 +55,17 @@ public void setUp() throws Exception {
context = ServerTestUtils.getJettyServer(new ClusterMapper().writeCluster(cluster),
VoldemortTestConstants.getSimpleStoreDefinitionsXml(),
"users",
- RequestFormatType.VOLDEMORT,
+ RequestFormatType.VOLDEMORT_V1,
node.getHttpPort());
server = context.getServer();
httpStore = ServerTestUtils.getHttpStore("users",
- RequestFormatType.VOLDEMORT,
+ RequestFormatType.VOLDEMORT_V1,
node.getHttpPort());
}
public <T extends Exception> void testBadUrlOrPort(String url, int port, Class<T> expected) {
ByteArray key = new ByteArray("test".getBytes());
- RequestFormat requestFormat = new RequestFormatFactory().getRequestFormat(RequestFormatType.VOLDEMORT);
+ RequestFormat requestFormat = new RequestFormatFactory().getRequestFormat(RequestFormatType.VOLDEMORT_V1);
HttpStore badUrlHttpStore = new HttpStore("test",
url,
port,
View
10 test/unit/voldemort/store/readonly/ReadOnlyStorageEngineTestInstance.java
@@ -85,12 +85,10 @@ public static ReadOnlyStorageEngineTestInstance create(File baseDir,
// set up definitions for cluster and store
List<Node> nodes = new ArrayList<Node>();
for(int i = 0; i < numNodes; i++) {
- nodes.add(new Node(i,
- "localhost",
- 8080 + i,
- 6666 + i,
- 7777 + i,
- Arrays.asList(4 * i, 4 * i + 1, 4 * i + 2, 4 * i + 3)));
+ nodes.add(new Node(i, "localhost", 8080 + i, 6666 + i, Arrays.asList(4 * i,
+ 4 * i + 1,
+ 4 * i + 2,
+ 4 * i + 3)));
}
Cluster cluster = new Cluster("test", nodes);
SerializerDefinition serDef = new SerializerDefinition("json", "'string'");
View
49 test/unit/voldemort/store/rebalancing/RebalancingStoreTest.java
@@ -51,21 +51,13 @@
@Override
public void setUp() throws IOException {
// start 2 node cluster with free ports
- int[] ports = ServerTestUtils.findFreePorts(3);
- Node node0 = new Node(0,
- "localhost",
- ports[0],
- ports[1],
- ports[2],
- Arrays.asList(new Integer[] { 0, 1 }));
-
- ports = ServerTestUtils.findFreePorts(3);
- Node node1 = new Node(1,
- "localhost",
- ports[0],
- ports[1],
- ports[2],
- Arrays.asList(new Integer[] { 2, 3 }));
+ int[] ports = ServerTestUtils.findFreePorts(2);
+ Node node0 = new Node(0, "localhost", ports[0], ports[1], Arrays.asList(new Integer[] { 0,
+ 1 }));
+
+ ports = ServerTestUtils.findFreePorts(2);
+ Node node1 = new Node(1, "localhost", ports[0], ports[1], Arrays.asList(new Integer[] { 2,
+ 3 }));
cluster = new Cluster("admin-service-test", Arrays.asList(new Node[] { node0, node1 }));
@@ -93,6 +85,13 @@ public void tearDown() throws IOException, InterruptedException {
server1.stop();
}
+ private RebalancingStore getRebalancingStore(VoldemortMetadata metadata) {
+ return new RebalancingStore(0,
+ server0.getStoreRepository().getLocalStore(storeName),
+ metadata,
+ new SocketPool(100, 100, 2000, 1000, 10000));
+ }
+
public void testProxyGet() {
// enter bunch of data into server1
for(int i = 100; i <= 1000; i++) {
@@ -111,15 +110,7 @@ public void testProxyGet() {
metadata.setDonorNode(server0.getVoldemortMetadata().getCurrentCluster().getNodeById(1));
metadata.setCurrentPartitionStealList(Arrays.asList(new Integer[] { 2, 3 }));
- RebalancingStore rebalancingStore = new RebalancingStore(0,
- server0.getStoreRepository()
- .getLocalStore(storeName),
- metadata,
- new SocketPool(100,
- 100,
- 2000,
- 1000,
- 10000));
+ RebalancingStore rebalancingStore = getRebalancingStore(metadata);
// for Normal server state no values are expected
for(int i = 100; i <= 1000; i++) {
@@ -179,15 +170,7 @@ public void testProxyPut() {
metadata.setDonorNode(server0.getVoldemortMetadata().getCurrentCluster().getNodeById(1));
metadata.setCurrentPartitionStealList(Arrays.asList(new Integer[] { 2, 3 }));
- RebalancingStore rebalancingStore = new RebalancingStore(0,
- server0.getStoreRepository()
- .getLocalStore(storeName),
- metadata,
- new SocketPool(100,
- 100,
- 2000,
- 1000,
- 10000));
+ RebalancingStore rebalancingStore = getRebalancingStore(metadata);
// we should see obsolete version exception if try to insert with same
// version
View
3  test/unit/voldemort/store/socket/AbstractSocketStoreTest.java
@@ -61,8 +61,7 @@ protected void setUp() throws Exception {
socketServer = ServerTestUtils.getSocketServer(VoldemortTestConstants.getOneNodeClusterXml(),
VoldemortTestConstants.getSimpleStoreDefinitionsXml(),
"test",
- socketPort,
- requestFormatType);
+ socketPort);
socketStore = ServerTestUtils.getSocketStore("test", socketPort, requestFormatType);
}
View
2  test/unit/voldemort/store/socket/VoldemortNativeSocketStoreTest.java
@@ -11,7 +11,7 @@
public class VoldemortNativeSocketStoreTest extends AbstractSocketStoreTest {
public VoldemortNativeSocketStoreTest() {
- super(RequestFormatType.VOLDEMORT);
+ super(RequestFormatType.VOLDEMORT_V1);
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.