Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

pull from voldemort master.

  • Loading branch information...
commit edb84f614b08ceebe4bce963859dadd299505961 2 parents f678066 + f15e2cd
@bbansal bbansal authored
View
1  .classpath
@@ -44,5 +44,6 @@
<classpathentry kind="lib" path="lib/je-3.3.87.jar"/>
<classpathentry kind="lib" path="lib/protobuf-java-2.2.0.jar"/>
<classpathentry kind="lib" path="contrib/ec2-testing/lib/typica.jar"/>
+ <classpathentry kind="lib" path="lib/h2-lzf.jar"/>
<classpathentry kind="output" path="classes"/>
</classpath>
View
2  NOTICE
@@ -25,6 +25,8 @@ This product includes jdom, a library developed by jdom.org.
This product includes jopt-simple, a library for parsing command line options (http://jopt-simple.sourceforge.net/).
+This product includes h2-lzf, which contains classes for LZF compression originally found in the H2 database (http://www.h2database.com/html/main.html). H2 is dual licensed and available under a modified version of the MPL 1.1 (Mozilla Public License) or under the (unmodified) EPL 1.0 (Eclipse Public License).
+
This product includes BDB, Java edition, A library developed by Oracle (http://www.oracle.com/database/berkeley-db/je/index.html), which includes
the following license information:
View
4 bin/voldemort-shell.sh
@@ -16,7 +16,7 @@
# limitations under the License.
#
-if [ $# != 2 ];
+if [ $# -lt 2 ];
then
echo 'USAGE: bin/voldemort-shell.sh store_name bootstrap_url [command_file]'
exit 1
@@ -24,4 +24,4 @@ fi
base_dir=$(dirname $0)/..
-$base_dir/bin/run-class.sh jline.ConsoleRunner voldemort.VoldemortClientShell $@
+$base_dir/bin/run-class.sh jline.ConsoleRunner voldemort.VoldemortClientShell $@
View
42 contrib/ruby-client/node.rb
@@ -0,0 +1,42 @@
+require 'time'
+require 'rexml/document'
+include REXML
+
+class VoldemortNode
+ """A Voldemort node with the appropriate host and port information for contacting that node"""
+
+ attr_reader :id, :host, :socket_port, :http_port, :partitions, :is_available, :last_contact
+
+ def initialize(id, host, socket_port, http_port, partitions, is_available = true, last_contact = nil)
+ @id = id
+ @host = host
+ @socket_port = socket_port
+ @http_port = http_port
+ @partitions = partitions
+ @is_available = is_available
+ if not last_contact
+ @last_contact = Time.new.to_f
+ end
+ end
+
+ def inspect()
+ return "node(id = #{@id}, host = #{@host}, socket_port = #{@socket_port}, http_port = #{@http_port}, partitions = #{@partitions.map {|i| i.to_s}.join(', ')})"
+ end
+
+ #@staticmethod
+ def self.parse_cluster(xml)
+ """Parse the cluster.xml file and return a dictionary of the nodes in the cluster indexed by node id """
+ doc = Document.new(xml)
+ nodes = {}
+ XPath.each(doc, '//server') do |curr|
+ id = curr.elements['id'].text.to_i
+ host = curr.elements['host'].text
+ http_port = curr.elements['http-port'].text.to_i
+ socket_port = curr.elements['socket-port'].text.to_i
+ partition_str = curr.elements['partitions'].text
+ partitions = partition_str.split(/, */).map {|p| p.to_i}
+ nodes[id] = VoldemortNode.new(id, host, socket_port, http_port, partitions)
+ end
+ return nodes
+ end
+end
View
353 contrib/ruby-client/store_client.rb
@@ -0,0 +1,353 @@
+require 'rubygems'
+require 'logger'
+require 'socket'
+require 'node'
+require 'voldemort_exception'
+require 'voldemort-client.pb.rb'
+include Voldemort
+
+$LOG = Logger.new(STDOUT)
+
+# A simple Voldemort client. It is single-threaded and supports only server-side routing.
+class StoreClient
+ def initialize(store_name, bootstrap_urls, reconnect_interval = 500, conflict_resolver = nil)
+ @store_name = store_name
+ @request_count = 0
+ @conflict_resolver = conflict_resolver
+ @nodes = _bootstrap_metadata(bootstrap_urls)
+ @node_id = rand(@nodes.size)
+ @node_id, @connection = _reconnect()
+ @reconnect_interval = reconnect_interval
+ @open = true
+ end
+
+ def _make_connection(host, port)
+ protocol = 'pb0'
+ $LOG.debug("Attempting to connect to #{host}:#{port}")
+ connection = TCPSocket.open(host, port)
+ $LOG.debug('Connection succeeded, negotiating protocol')
+ connection.send(protocol, 0)
+ resp = connection.recv(2)
+ if resp != 'ok'
+ raise VoldemortException.new("Server does not understand the protocol #{protocol}")
+ end
+ return connection
+ end
+
+ ## Connect to a the next available node in the cluster
+ ## returns a tuple of (node_id, connection)
+ def _reconnect()
+ num_nodes = @nodes.size
+ attempts = 0
+ new_node_id = @node_id
+ while attempts < num_nodes
+ new_node_id = (new_node_id + 1) % num_nodes
+ new_node = @nodes[new_node_id]
+ connection = nil
+ begin
+ connection = _make_connection(new_node.host, new_node.socket_port)
+ @request_count = 0
+ return new_node_id, connection
+ rescue SocketError => message
+ _close_socket(connection)
+ connection.close
+ $LOG.warn("Error connecting to node #{new_node_id}: #{message}")
+ attempts += 1
+ end
+ end
+
+ # If we get here all nodes have failed us, explode
+ raise VoldemortException.new('Connections to all nodes failed.')
+ end
+
+ ## Safely close the socket, catching and logging any exceptions
+ def _close_socket(socket)
+ begin
+ if socket
+ socket.close
+ end
+ rescue SocketError => message
+ $LOG.error("Error while closing socket: #{message}")
+ end
+ end
+
+ ## Check if the the number of requests made on this connection is greater than the reconnect interval.
+ ## If so reconnect to a random node in the cluster. No attempt is made at preventing the reconnecting
+ ## from going back to the same node
+ def _maybe_reconnect()
+ if @request_count >= @reconnect_interval
+ $LOG.debug("Completed #{@request_count} requests using this connection, reconnecting...")
+ _close_socket(@connection)
+ @node_id, @connection = _reconnect()
+ end
+ end
+
+ ## send a request to the server using the given connection
+ def _send_request(connection, req_bytes)
+ connection.print([req_bytes.size].pack('N') + req_bytes)
+ @request_count += 1
+ end
+
+ ## read a response from the connection
+ def _receive_response(connection)
+ size_bytes = connection.recv(4)
+ size = size_bytes.unpack('N')
+
+ if size[0] == 0
+ return ''
+ end
+
+ return connection.recv(size[0])
+ end
+
+ ## Bootstrap cluster metadata from a list of urls of nodes in the cluster.
+ ## The urls are tuples in the form (host, port).
+ ## A dictionary of node_id => node is returned.
+ def _bootstrap_metadata(bootstrap_urls)
+ bootstrap_urls.sort_by { rand }
+ for host, port in bootstrap_urls
+ $LOG.debug("Attempting to bootstrap metadata from #{host}:#{port}")
+ connection = nil
+ begin
+ connection = _make_connection(host, port)
+ cluster_xmls = _get_with_connection(connection, 'metadata', 'cluster.xml', false)
+ if cluster_xmls.size != 1
+ raise VoldemortException.new("Expected exactly one version of the metadata but found #{cluster_xmls}")
+ end
+ nodes = VoldemortNode.parse_cluster(cluster_xmls[0][0])
+ $LOG.debug("Bootstrap from #{host}:#{port} succeeded, found #{nodes.size} nodes.")
+ return nodes
+ rescue SocketError => message
+ $LOG.warn("Metadata bootstrap from #{host}:#{port} failed: #{message}")
+ ensure
+ _close_socket(connection)
+ end
+ end
+ raise VoldemortException.new('All bootstrap attempts failed')
+ end
+
+ ## check if the server response has an error, if so throw an exception
+ def _check_error(resp)
+ if resp.error
+ raise VoldemortException.new(resp.error.error_message, resp.error.error_code)
+ end
+ end
+
+ ## Increment the version for a vector clock
+ def _increment(clock)
+ # See if we already have a version for this guy, if so increment it
+ for entry in clock.entries
+ if entry.node_id == @node_id
+ entry.version += 1
+ return
+ end
+ end
+ # Otherwise add a version
+ entry = ClockEntry.new
+ entry.node_id = @node_id
+ entry.version = 1
+ clock.entries << entry
+ clock.timestamp = Time.new.to_i * 1000
+ end
+
+ ## Take a list of versions, and, if a conflict resolver has been given, resolve any conflicts that can be resolved
+ def _resolve_conflicts(versions)
+ if @conflict_resolver and versions
+ return conflict_resolver(versions)
+ else
+ return versions
+ end
+ end
+
+ ## Turn a protocol buffer list of versioned items into a ruby list of items
+ def _extract_versions(pb_versioneds)
+ versions = []
+ for versioned in pb_versioneds
+ versions << [versioned.value, versioned.version]
+ end
+ return _resolve_conflicts(versions)
+ end
+
+ ## A basic request wrapper, that handles reconnection logic and failures
+ def _execute_request(fun, args)
+ raise 'Store has been closed.' unless @open
+ _maybe_reconnect()
+
+ failures = 0
+ num_nodes = @nodes.size
+ while failures < num_nodes
+ begin
+ return send(fun, *args)
+ rescue SocketError => message
+ $LOG.warn("Error while performing #{fun} on node #{@node_id}: #{message}")
+ _reconnect()
+ failures += 1
+ end
+ end
+
+ raise VoldemortException.new("All nodes are down, #{fun} failed.")
+ end
+
+ ## An internal get function that take the connection and store name as parameters. This is
+ ## used by both the public get() method and also the metadata bootstrap process
+ def _get_with_connection(connection, store_name, key, should_route)
+ """Execute get request to the given store. Returns a (value, version) pair."""
+
+ req = VoldemortRequest.new
+ req.should_route = should_route
+ req.store = store_name
+ req.type = RequestType::GET
+ req.get = GetRequest.new
+ req.get.key = key
+
+ # send request
+ _send_request(connection, req.serialize_to_string())
+
+ # read and parse response
+ resp_str = _receive_response(connection)
+ resp = GetResponse.new
+ resp = resp.parse_from_string(resp_str)
+ _check_error(resp)
+
+ return _extract_versions(resp.versioned)
+ end
+
+ ## Inner helper function for get
+ def _get(key)
+ return _get_with_connection(@connection, @store_name, key, true)
+ end
+
+ def get(key)
+ #"""Execute a get request. Returns a list of (value, version) pairs."""
+ return _execute_request(:_get, [key])
+ end
+
+ ## Inner get_all method that takes the connection and store_name as parameters
+ def _get_all(keys)
+ req = VoldemortRequest.new
+ req.should_route = true
+ req.store = @store_name
+ req.type = RequestType::GET_ALL
+ req.getAll = GetAllRequest.new
+ for key in keys
+ req.getAll.keys << key
+ end
+
+ # send request
+ _send_request(@connection, req.serialize_to_string())
+
+ # read and parse response
+ resp_str = _receive_response(@connection)
+ resp = GetAllResponse.new
+ resp = resp.parse_from_string(resp_str)
+ _check_error(resp)
+ values = {}
+ for key_val in resp.values
+ values[key_val.key] = _extract_versions(key_val.versions)
+ end
+ return values
+ end
+
+ def get_all(keys)
+ #"""Execute get request for multiple keys given as a list or tuple.
+ # Returns a dictionary of key => [(value, version), ...] pairs."""
+ return _execute_request(:_get_all, [keys])
+ end
+
+ ## Get the current version of the given key by doing a get request to the store
+ def _fetch_version(key)
+ versioned = get(key)
+ if versioned.length > 0
+ version = versioned[0][1]
+ else
+ begin
+ version = VectorClock.new
+ version.timestamp = Time.new.to_i * 1000
+ end
+ end
+ return version
+ end
+
+ def _put(key, value, version)
+ req = VoldemortRequest.new
+ req.should_route = true
+ req.store = @store_name
+ req.type = RequestType::PUT
+ req.put = PutRequest.new
+ req.put.key = key
+ req.put.versioned = Versioned.new
+ req.put.versioned.value = value
+ req.put.versioned.version = VectorClock.new
+ req.put.versioned.version.merge_from(version)
+
+ # send request
+ _send_request(@connection, req.serialize_to_string())
+
+ # read and parse response
+ resp_str = _receive_response(@connection)
+ resp = PutResponse.new
+ resp = resp.parse_from_string(resp_str)
+ _check_error(resp)
+ _increment(version)
+ return version
+ end
+
+ def put(key, value, version = nil)
+ #"""Execute a put request using the given key and value. If no version is specified a get(key) request
+ # will be done to get the current version. The updated version is returned."""
+ # if we don't have a version, fetch one
+ if not version
+ version = _fetch_version(key)
+ end
+ return _execute_request(:_put, [key, value, version])
+ end
+
+ def maybe_put(key, value, version = nil)
+ #"""Execute a put request using the given key and value. If the version being put is obsolete,
+ # no modification will be made and this function will return None. Otherwise it will return the new version."""
+ begin
+ return put(key, value, version)
+ rescue
+ return nil
+ end
+ end
+
+ def _delete(key, version)
+ req = VoldemortRequest.new
+ req.should_route = true
+ req.store = @store_name
+ req.type = RequestType::DELETE
+ req.delete = DeleteRequest.new
+ req.delete.key = key
+ req.delete.version = VectorClock.new
+ req.delete.version.merge_from(version)
+
+ # send request
+ _send_request(@connection, req.serialize_to_string())
+
+ # read and parse response
+ resp_str = _receive_response(@connection)
+ resp = DeleteResponse.new
+ resp = resp.parse_from_string(resp_str)
+ _check_error(resp)
+
+ return resp.success
+ end
+
+ def delete(key, version = nil)
+ #"""Execute a delete request, deleting all keys up to and including the given version.
+ # If no version is given a get(key) request will be done to find the latest version."""
+ # if we don't have a version, fetch one
+ if version == nil
+ version = _fetch_version(key)
+ end
+ return _execute_request(:_delete, [key, version])
+ end
+
+ def close()
+ #"""Close the connection this store maintains."""
+ @open = false
+ @connection.close()
+ end
+
+end
View
11 contrib/ruby-client/test.rb
@@ -0,0 +1,11 @@
+require 'store_client'
+
+s = StoreClient.new("test", [["localhost", "6666"]])
+version = s.put("hello", "1")
+raise "Invalid result" unless s.get("hello")[0][0] == "1"
+s.put("hello", "2", version)
+raise "Invalid result" unless s.get("hello")[0][0] == "2"
+s.put("hello", "3")
+raise "Invalid result" unless s.get("hello")[0][0] == "3"
+s.delete("hello")
+raise "Invalid result" unless s.get("hello").size == 0
View
187 contrib/ruby-client/voldemort-client.pb.rb
@@ -0,0 +1,187 @@
+### Generated by rprotoc. DO NOT EDIT!
+### <proto file: voldemort-client.proto>
+# package voldemort;
+#
+# option java_package = "voldemort.client.protocol.pb";
+# option java_outer_classname = "VProto";
+# option optimize_for = SPEED;
+#
+# message ClockEntry {
+# required int32 node_id = 1;
+# required int64 version = 2;
+# }
+#
+# message VectorClock {
+# repeated ClockEntry entries = 1;
+# optional int64 timestamp = 2;
+# }
+#
+# message Versioned {
+# required bytes value = 1;
+# required VectorClock version = 2;
+# }
+#
+# message Error {
+# required int32 error_code = 1;
+# required string error_message = 2;
+# }
+#
+# message KeyedVersions {
+# required bytes key = 1;
+# repeated Versioned versions = 2;
+# }
+#
+# message GetRequest {
+# optional bytes key = 1;
+# }
+#
+# message GetResponse {
+# repeated Versioned versioned = 1;
+# optional Error error = 2;
+# }
+#
+# message GetVersionResponse {
+# repeated VectorClock versions = 1;
+# optional Error error = 2;
+# }
+#
+# message GetAllRequest {
+# repeated bytes keys = 1;
+# }
+#
+# message GetAllResponse {
+# repeated KeyedVersions values = 1;
+# optional Error error = 2;
+# }
+#
+# message PutRequest {
+# required bytes key = 1;
+# required Versioned versioned = 2;
+# }
+#
+# message PutResponse {
+# optional Error error = 1;
+# }
+#
+# message DeleteRequest {
+# required bytes key = 1;
+# required VectorClock version = 2;
+# }
+#
+# message DeleteResponse {
+# required bool success = 1;
+# optional Error error = 2;
+# }
+#
+# enum RequestType {
+# GET = 0;
+# GET_ALL = 1;
+# PUT = 2;
+# DELETE = 3;
+# GET_VERSION = 4;
+# }
+#
+# message VoldemortRequest {
+# required RequestType type = 1;
+# required bool should_route = 2 [default = false];
+# required string store = 3;
+# optional GetRequest get = 4;
+# optional GetAllRequest getAll = 5;
+# optional PutRequest put = 6;
+# optional DeleteRequest delete = 7;
+# }
+require 'protobuf/message/message'
+require 'protobuf/message/enum'
+require 'protobuf/message/service'
+require 'protobuf/message/extend'
+
+module Voldemort
+ ::Protobuf::OPTIONS[:"java_package"] = "voldemort.client.protocol.pb"
+ ::Protobuf::OPTIONS[:"java_outer_classname"] = "VProto"
+ ::Protobuf::OPTIONS[:"optimize_for"] = :SPEED
+ class ClockEntry < ::Protobuf::Message
+ defined_in __FILE__
+ required :int32, :node_id, 1
+ required :int64, :version, 2
+ end
+ class VectorClock < ::Protobuf::Message
+ defined_in __FILE__
+ repeated :ClockEntry, :entries, 1
+ optional :int64, :timestamp, 2
+ end
+ class Versioned < ::Protobuf::Message
+ defined_in __FILE__
+ required :bytes, :value, 1
+ required :VectorClock, :version, 2
+ end
+ class Error < ::Protobuf::Message
+ defined_in __FILE__
+ required :int32, :error_code, 1
+ required :string, :error_message, 2
+ end
+ class KeyedVersions < ::Protobuf::Message
+ defined_in __FILE__
+ required :bytes, :key, 1
+ repeated :Versioned, :versions, 2
+ end
+ class GetRequest < ::Protobuf::Message
+ defined_in __FILE__
+ optional :bytes, :key, 1
+ end
+ class GetResponse < ::Protobuf::Message
+ defined_in __FILE__
+ repeated :Versioned, :versioned, 1
+ optional :Error, :error, 2
+ end
+ class GetVersionResponse < ::Protobuf::Message
+ defined_in __FILE__
+ repeated :VectorClock, :versions, 1
+ optional :Error, :error, 2
+ end
+ class GetAllRequest < ::Protobuf::Message
+ defined_in __FILE__
+ repeated :bytes, :keys, 1
+ end
+ class GetAllResponse < ::Protobuf::Message
+ defined_in __FILE__
+ repeated :KeyedVersions, :values, 1
+ optional :Error, :error, 2
+ end
+ class PutRequest < ::Protobuf::Message
+ defined_in __FILE__
+ required :bytes, :key, 1
+ required :Versioned, :versioned, 2
+ end
+ class PutResponse < ::Protobuf::Message
+ defined_in __FILE__
+ optional :Error, :error, 1
+ end
+ class DeleteRequest < ::Protobuf::Message
+ defined_in __FILE__
+ required :bytes, :key, 1
+ required :VectorClock, :version, 2
+ end
+ class DeleteResponse < ::Protobuf::Message
+ defined_in __FILE__
+ required :bool, :success, 1
+ optional :Error, :error, 2
+ end
+ class RequestType < ::Protobuf::Enum
+ defined_in __FILE__
+ GET = 0
+ GET_ALL = 1
+ PUT = 2
+ DELETE = 3
+ GET_VERSION = 4
+ end
+ class VoldemortRequest < ::Protobuf::Message
+ defined_in __FILE__
+ required :RequestType, :type, 1
+ required :bool, :should_route, 2, :default => false
+ required :string, :store, 3
+ optional :GetRequest, :get, 4
+ optional :GetAllRequest, :getAll, 5
+ optional :PutRequest, :put, 6
+ optional :DeleteRequest, :delete, 7
+ end
+end
View
6 contrib/ruby-client/voldemort_exception.rb
@@ -0,0 +1,6 @@
+class VoldemortException < StandardError
+ def initialize(message, code = 1)
+ @code = code
+ @message = message
+ end
+end
View
BIN  lib/h2-lzf.jar
Binary file not shown
View
48 src/java/voldemort/VoldemortClientShell.java
@@ -49,24 +49,30 @@
private static final String PROMPT = "> ";
+ private static DefaultStoreClient<Object, Object> client;
+
public static void main(String[] args) throws Exception {
if(args.length < 2 || args.length > 3)
Utils.croak("USAGE: java VoldemortClientShell store_name bootstrap_url [command_file]");
String storeName = args[0];
String bootstrapUrl = args[1];
- BufferedReader reader = null;
+ String commandsFileName = "";
+ BufferedReader fileReader = null;
+ BufferedReader inputReader = null;
try {
- if(args.length == 3)
- reader = new BufferedReader(new FileReader(args[2]));
- else
- reader = new BufferedReader(new InputStreamReader(System.in));
+ if(args.length == 3) {
+ commandsFileName = args[2];
+ fileReader = new BufferedReader(new FileReader(commandsFileName));
+ }
+
+ inputReader = new BufferedReader(new InputStreamReader(System.in));
} catch(IOException e) {
Utils.croak("Failure to open input stream: " + e.getMessage());
}
StoreClientFactory factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(bootstrapUrl));
- DefaultStoreClient<Object, Object> client = null;
+ client = null;
try {
client = (DefaultStoreClient<Object, Object>) factory.getStoreClient(storeName);
} catch(Exception e) {
@@ -75,9 +81,20 @@ public static void main(String[] args) throws Exception {
System.out.println("Established connection to " + storeName + " via " + bootstrapUrl);
System.out.print(PROMPT);
+ if(fileReader != null) {
+ processCommands(fileReader, true);
+ fileReader.close();
+ }
+ processCommands(inputReader, false);
+ }
+
+ private static void processCommands(BufferedReader reader, boolean printCommands)
+ throws IOException {
for(String line = reader.readLine(); line != null; line = reader.readLine()) {
if(line.trim().equals(""))
continue;
+ if(printCommands)
+ System.out.println(line);
try {
if(line.toLowerCase().startsWith("put")) {
JsonReader jsonReader = new JsonReader(new StringReader(line.substring("put".length())));
@@ -93,10 +110,14 @@ public static void main(String[] args) throws Exception {
// this is okay, just means we are done reading
}
Map<Object, Versioned<Object>> vals = client.getAll(keys);
- for(Map.Entry<Object, Versioned<Object>> entry: vals.entrySet()) {
- System.out.print(entry.getKey());
- System.out.print(" => ");
- printVersioned(entry.getValue());
+ if(vals.size() > 0) {
+ for(Map.Entry<Object, Versioned<Object>> entry: vals.entrySet()) {
+ System.out.print(entry.getKey());
+ System.out.print(" => ");
+ printVersioned(entry.getValue());
+ }
+ } else {
+ System.out.println("null");
}
} else if(line.toLowerCase().startsWith("get")) {
JsonReader jsonReader = new JsonReader(new StringReader(line.substring("get".length())));
@@ -104,16 +125,17 @@ public static void main(String[] args) throws Exception {
} else if(line.toLowerCase().startsWith("delete")) {
JsonReader jsonReader = new JsonReader(new StringReader(line.substring("delete".length())));
client.delete(tightenNumericTypes(jsonReader.read()));
- } else if(line.startsWith("locate")) {
- JsonReader jsonReader = new JsonReader(new StringReader(line.substring("locate".length())));
+ } else if(line.startsWith("preflist")) {
+ JsonReader jsonReader = new JsonReader(new StringReader(line.substring("preflist".length())));
Object key = tightenNumericTypes(jsonReader.read());
printNodeList(client.getResponsibleNodes(key));
} else if(line.startsWith("help")) {
System.out.println("Commands:");
System.out.println("put key value -- Associate the given value with the key.");
System.out.println("get key -- Retrieve the value associated with the key.");
+ System.out.println("getall key -- Retrieve the value(s) associated with the key.");
System.out.println("delete key -- Remove all values associated with the key.");
- System.out.println("locate key -- Determine which servers host the give key.");
+ System.out.println("preflist key -- Get node preference list for given key.");
System.out.println("help -- Print this message.");
System.out.println("exit -- Exit from this shell.");
System.out.println();
View
12 src/java/voldemort/server/socket/SocketServer.java
@@ -252,18 +252,16 @@ public int getRemainingThreads() {
* Blocks until the server has started successfully or an exception is
* thrown.
*
- * @return {@code null} if the server has started successfully or the
- * exception thrown if not.
+ * @throws VoldemortException if a problem occurs during start-up wrapping
+ * the original exception.
*/
- public Throwable awaitStartupCompletion() {
+ public void awaitStartupCompletion() {
try {
Object obj = startedStatusQueue.take();
if(obj instanceof Throwable)
- return (Throwable) obj;
- else
- return null;
+ throw new VoldemortException((Throwable) obj);
} catch(InterruptedException e) {
- return e;
+ // this is okay, if we are interrupted we can stop waiting
}
}
View
6 src/java/voldemort/server/socket/SocketService.java
@@ -16,7 +16,6 @@
package voldemort.server.socket;
-import voldemort.VoldemortException;
import voldemort.server.AbstractSocketService;
import voldemort.server.ServiceType;
import voldemort.server.StatusManager;
@@ -56,10 +55,7 @@ public StatusManager getStatusManager() {
@Override
protected void startInner() {
this.server.start();
- Throwable startupException = this.server.awaitStartupCompletion();
- if(startupException != null)
- throw new VoldemortException(startupException);
-
+ this.server.awaitStartupCompletion();
enableJmx(server);
}
View
134 src/java/voldemort/store/bdb/BdbStorageEngine.java
@@ -112,7 +112,7 @@ public String getName() {
}
Cursor cursor = bdbDatabase.openCursor(null, null);
- return new BdbStoreIterator(cursor);
+ return new BdbEntriesIterator(cursor);
} catch(DatabaseException e) {
logger.error(e);
throw new PersistenceFailureException(e);
@@ -353,58 +353,69 @@ public String getBdbStats() {
return stats;
}
- private static class BdbKeysIterator implements ClosableIterator<ByteArray> {
+ private static abstract class BdbIterator<T> implements ClosableIterator<T> {
+ private final boolean noValues;
+ final Cursor cursor;
+
+ private T current;
private volatile boolean isOpen;
- private final Cursor cursor;
- private byte[] current;
- public BdbKeysIterator(Cursor cursor) {
+ public BdbIterator(Cursor cursor, boolean noValues) {
this.cursor = cursor;
isOpen = true;
+ this.noValues = noValues;
DatabaseEntry keyEntry = new DatabaseEntry();
DatabaseEntry valueEntry = new DatabaseEntry();
- valueEntry.setPartial(true);
+ if(noValues)
+ valueEntry.setPartial(true);
try {
cursor.getFirst(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED);
} catch(DatabaseException e) {
logger.error(e);
throw new PersistenceFailureException(e);
}
- current = keyEntry.getData();
+ if(keyEntry.getData() != null)
+ current = get(keyEntry, valueEntry);
}
- public boolean hasNext() {
+ protected abstract T get(DatabaseEntry key, DatabaseEntry value);
+
+ protected abstract void moveCursor(DatabaseEntry key, DatabaseEntry value)
+ throws DatabaseException;
+
+ public final boolean hasNext() {
return current != null;
}
- public ByteArray next() {
+ public final T next() {
if(!isOpen)
throw new PersistenceFailureException("Call to next() on a closed iterator.");
DatabaseEntry keyEntry = new DatabaseEntry();
DatabaseEntry valueEntry = new DatabaseEntry();
- valueEntry.setPartial(true);
+ if(noValues)
+ valueEntry.setPartial(true);
try {
- cursor.getNextNoDup(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED);
+ moveCursor(keyEntry, valueEntry);
} catch(DatabaseException e) {
logger.error(e);
throw new PersistenceFailureException(e);
}
- byte[] previous = current;
+ T previous = current;
if(keyEntry.getData() == null)
current = null;
else
- current = keyEntry.getData();
+ current = get(keyEntry, valueEntry);
- return new ByteArray(previous);
+ return previous;
}
- public void remove() {
+ public final void remove() {
throw new UnsupportedOperationException("No removal y'all.");
}
- public void close() {
+ public final void close() {
try {
cursor.close();
isOpen = false;
@@ -414,96 +425,51 @@ public void close() {
}
@Override
- protected void finalize() {
+ protected final void finalize() {
if(isOpen) {
logger.error("Failure to close cursor, will be forcably closed.");
close();
}
}
-
}
- private static class BdbStoreIterator implements
- ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> {
-
- private volatile boolean isOpen;
- private final Cursor cursor;
- private Pair<ByteArray, Versioned<byte[]>> current;
+ private static class BdbKeysIterator extends BdbIterator<ByteArray> {
- public BdbStoreIterator(Cursor cursor) {
- this.cursor = cursor;
- isOpen = true;
- DatabaseEntry keyEntry = new DatabaseEntry();
- DatabaseEntry valueEntry = new DatabaseEntry();
- try {
- cursor.getFirst(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED);
- } catch(DatabaseException e) {
- logger.error(e);
- throw new PersistenceFailureException(e);
- }
- current = getPair(keyEntry, valueEntry);
+ public BdbKeysIterator(Cursor cursor) {
+ super(cursor, true);
}
- private Pair<ByteArray, Versioned<byte[]>> getPair(DatabaseEntry key, DatabaseEntry value) {
- if(key == null || key.getData() == null) {
- return null;
- } else {
- VectorClock clock = new VectorClock(value.getData());
- byte[] bytes = ByteUtils.copy(value.getData(),
- clock.sizeInBytes(),
- value.getData().length);
- return Pair.create(new ByteArray(key.getData()),
- new Versioned<byte[]>(bytes, clock));
- }
+ @Override
+ protected ByteArray get(DatabaseEntry key, DatabaseEntry value) {
+ return new ByteArray(key.getData());
}
- public boolean hasNext() {
- return current != null;
+ @Override
+ protected void moveCursor(DatabaseEntry key, DatabaseEntry value) throws DatabaseException {
+ cursor.getNextNoDup(key, value, LockMode.READ_UNCOMMITTED);
}
- public Pair<ByteArray, Versioned<byte[]>> next() {
- if(!isOpen)
- throw new PersistenceFailureException("Call to next() on a closed iterator.");
-
- DatabaseEntry keyEntry = new DatabaseEntry();
- DatabaseEntry valueEntry = new DatabaseEntry();
- try {
- cursor.getNext(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED);
- } catch(DatabaseException e) {
- logger.error(e);
- throw new PersistenceFailureException(e);
- }
- Pair<ByteArray, Versioned<byte[]>> previous = current;
- if(keyEntry.getData() == null)
- current = null;
- else
- current = getPair(keyEntry, valueEntry);
+ }
- return previous;
- }
+ private static class BdbEntriesIterator extends BdbIterator<Pair<ByteArray, Versioned<byte[]>>> {
- public void remove() {
- throw new UnsupportedOperationException("No removal y'all.");
+ public BdbEntriesIterator(Cursor cursor) {
+ super(cursor, false);
}
- public void close() {
- try {
- cursor.close();
- isOpen = false;
- } catch(DatabaseException e) {
- logger.error(e);
- }
+ @Override
+ protected Pair<ByteArray, Versioned<byte[]>> get(DatabaseEntry key, DatabaseEntry value) {
+ VectorClock clock = new VectorClock(value.getData());
+ byte[] bytes = ByteUtils.copy(value.getData(),
+ clock.sizeInBytes(),
+ value.getData().length);
+ return Pair.create(new ByteArray(key.getData()), new Versioned<byte[]>(bytes, clock));
}
@Override
- protected void finalize() {
- if(isOpen) {
- logger.error("Failure to close cursor, will be forcably closed.");
- close();
- }
-
+ protected void moveCursor(DatabaseEntry key, DatabaseEntry value) throws DatabaseException {
+ cursor.getNext(key, value, LockMode.READ_UNCOMMITTED);
}
-
}
}
View
2  src/java/voldemort/store/compress/CompressionStrategyFactory.java
@@ -9,6 +9,8 @@ public CompressionStrategy get(Compression compression) {
return new NoopCompressionStrategy();
if(compression.getType().equals("gzip"))
return new GzipCompressionStrategy();
+ if(compression.getType().equals("lzf"))
+ return new LzfCompressionStrategy();
throw new IllegalArgumentException("Unsupported compression algorithm: "
+ compression.getType());
}
View
25 src/java/voldemort/store/compress/GzipCompressionStrategy.java
@@ -1,34 +1,27 @@
package voldemort.store.compress;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
-import org.apache.commons.io.IOUtils;
-
/**
* Implementation of CompressionStrategy for the gzip format.
*/
/*
* In the future we may want to support different compression levels.
*/
-public class GzipCompressionStrategy implements CompressionStrategy {
+public class GzipCompressionStrategy extends StreamCompressionStrategy {
- public byte[] deflate(byte[] data) throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- GZIPOutputStream gos = new GZIPOutputStream(bos);
- gos.write(data);
- gos.close();
- return bos.toByteArray();
+ @Override
+ protected OutputStream wrapOutputStream(OutputStream underlying) throws IOException {
+ return new GZIPOutputStream(underlying);
}
- public byte[] inflate(byte[] data) throws IOException {
- GZIPInputStream is = new GZIPInputStream(new ByteArrayInputStream(data));
- byte[] inflated = IOUtils.toByteArray(is);
- is.close();
- return inflated;
+ @Override
+ protected InputStream wrapInputStream(InputStream underlying) throws IOException {
+ return new GZIPInputStream(underlying);
}
public String getType() {
View
29 src/java/voldemort/store/compress/LzfCompressionStrategy.java
@@ -0,0 +1,29 @@
+package voldemort.store.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.h2.compress.LZFInputStream;
+import org.h2.compress.LZFOutputStream;
+
+/**
+ * Implementation of CompressionStrategy for the LZF format. LZF is optimized
+ * for speed.
+ */
+public class LzfCompressionStrategy extends StreamCompressionStrategy {
+
+ @Override
+ protected OutputStream wrapOutputStream(OutputStream underlying) throws IOException {
+ return new LZFOutputStream(underlying);
+ }
+
+ @Override
+ protected InputStream wrapInputStream(InputStream underlying) throws IOException {
+ return new LZFInputStream(underlying);
+ }
+
+ public String getType() {
+ return "lzf";
+ }
+}
View
34 src/java/voldemort/store/compress/StreamCompressionStrategy.java
@@ -0,0 +1,34 @@
+package voldemort.store.compress;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.io.IOUtils;
+
+/**
+ * Useful base class for stream-based compression strategies.
+ */
+public abstract class StreamCompressionStrategy implements CompressionStrategy {
+
+ public byte[] deflate(byte[] data) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ OutputStream gos = wrapOutputStream(bos);
+ gos.write(data);
+ gos.close();
+ return bos.toByteArray();
+ }
+
+ protected abstract OutputStream wrapOutputStream(OutputStream underlying) throws IOException;
+
+ protected abstract InputStream wrapInputStream(InputStream underlying) throws IOException;
+
+ public byte[] inflate(byte[] data) throws IOException {
+ InputStream is = wrapInputStream(new ByteArrayInputStream(data));
+ byte[] inflated = IOUtils.toByteArray(is);
+ is.close();
+ return inflated;
+ }
+}
View
13 test/unit/voldemort/store/compress/CompressingStoreTest.java
@@ -14,6 +14,7 @@
import voldemort.client.ClientConfig;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClient;
+import voldemort.serialization.Compression;
import voldemort.server.AbstractSocketService;
import voldemort.store.AbstractByteArrayStoreTest;
import voldemort.store.Store;
@@ -26,22 +27,26 @@
private CompressingStore store;
private final boolean useNio;
+ private final Compression compression;
+ private final CompressionStrategyFactory compressionFactory = new CompressionStrategyFactory();
- public CompressingStoreTest(boolean useNio) {
+ public CompressingStoreTest(boolean useNio, String compressionType) {
this.useNio = useNio;
+ this.compression = new Compression(compressionType, null);
}
@Parameters
public static Collection<Object[]> configs() {
- return Arrays.asList(new Object[][] { { true }, { false } });
+ return Arrays.asList(new Object[][] { { true, "gzip" }, { false, "gzip" }, { true, "lzf" },
+ { false, "lzf" } });
}
@Override
@Before
public void setUp() throws Exception {
this.store = new CompressingStore(new InMemoryStorageEngine<ByteArray, byte[]>("test"),
- new GzipCompressionStrategy(),
- new GzipCompressionStrategy());
+ compressionFactory.get(compression),
+ compressionFactory.get(compression));
}
@Override
Please sign in to comment.
Something went wrong with that request. Please try again.