diff --git a/clients/ruby/CHANGELOG b/clients/ruby/CHANGELOG new file mode 100644 index 0000000000..e69de29bb2 diff --git a/clients/ruby/LICENSE b/clients/ruby/LICENSE new file mode 100644 index 0000000000..cc724b608b --- /dev/null +++ b/clients/ruby/LICENSE @@ -0,0 +1,20 @@ +Copyright (c) 2010 Alejandro Crosa + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/clients/ruby/README.md b/clients/ruby/README.md new file mode 100644 index 0000000000..aebc7f5e14 --- /dev/null +++ b/clients/ruby/README.md @@ -0,0 +1,64 @@ +voldemort-rb +================ + +# Installing the Gem from rubygems + +> sudo gem install voldemort-rb + + +# Requirements + +Since the communication between the client and the server is done using protocol buffers you'll need the ruby_protobuf gem found at http://code.google.com/p/ruby-protobuf/. + + sudo gem install ruby_protobuf + +XML Parsing is done using Nokogiri + + sudo gem install nokogiri + +# Building and Installing the Gem from source + +> gem build voldemort-rb.gemspec + +> sudo gem install voldemort-rb-0.1.X.gem (replace 'X' with the correct version) + +Examples +======= + +# Basic Usage +## Connecting and bootstrapping + + client = VoldemortClient.new("test", "localhost:6666") + +## Storing a value + + client.put("some key", "some value") + +## Reading a value + + client.get("some key") + + you'll get + + => some value + +## deleting a value from a key + + client.delete("some key") + +# Conflict resolution +## Default + +Voldemort replies with versions of a value, it's up to the client to resolve the conflicts. By default the library will return the version that's most recent. + +## Custom + +You can override the default behavior and perform a custom resolution of the conflict, here's how to do so: + +client = VoldemortClient.new("test", "localhost:6666") do |versions| + +versions.first # just return the first version for example + +end + +Copyright (c) 2010 Alejandro Crosa, released under the MIT license diff --git a/clients/ruby/Rakefile b/clients/ruby/Rakefile new file mode 100644 index 0000000000..201ee6edfb --- /dev/null +++ b/clients/ruby/Rakefile @@ -0,0 +1,60 @@ +require 'rubygems' +require 'rake/gempackagetask' +require 'rubygems/specification' +require 'date' +require 'spec/rake/spectask' + +GEM = 'Voldemort Client' +GEM_NAME = 'voldemort_client' +GEM_VERSION = '0.1.5' +AUTHORS = ['Alejandro Crosa'] +EMAIL = "alejandrocrosa@gmail.com" +HOMEPAGE = "http://github.com/acrosa/Voldemort-Ruby-Client" +SUMMARY = "A Ruby client for the Voldemort distributed key value store" + +spec = Gem::Specification.new do |s| + s.name = GEM + s.version = GEM_VERSION + s.platform = Gem::Platform::RUBY + s.has_rdoc = true + s.extra_rdoc_files = ["LICENSE"] + s.summary = SUMMARY + s.description = s.summary + s.authors = AUTHORS + s.email = EMAIL + s.homepage = HOMEPAGE + s.add_development_dependency "rspec" + s.require_path = 'lib' + s.autorequire = GEM + s.files = %w(LICENSE README.md Rakefile) + Dir.glob("{lib,tasks,spec}/**/*") +end + +task :default => :spec + +desc "Run specs" +Spec::Rake::SpecTask.new do |t| + t.spec_files = FileList['spec/**/*_spec.rb'] + t.spec_opts = %w(-fs --color) +end + +Rake::GemPackageTask.new(spec) do |pkg| + pkg.gem_spec = spec +end + +desc "install the gem locally" +task :install => [:package] do + sh %{sudo gem install pkg/#{GEM}-#{GEM_VERSION}} +end + +desc "create a gemspec file" +task :make_spec do + File.open("#{GEM}.gemspec", "w") do |file| + file.puts spec.to_ruby + end +end + +desc "Run all examples with RCov" +Spec::Rake::SpecTask.new(:rcov) do |t| + t.spec_files = FileList['spec/**/*_spec.rb'] + t.rcov = true +end diff --git a/clients/ruby/install.rb b/clients/ruby/install.rb new file mode 100644 index 0000000000..f7732d3796 --- /dev/null +++ b/clients/ruby/install.rb @@ -0,0 +1 @@ +# Install hook code here diff --git a/clients/ruby/lib/connection/connection.rb b/clients/ruby/lib/connection/connection.rb new file mode 100644 index 0000000000..0cc07eb5ae --- /dev/null +++ b/clients/ruby/lib/connection/connection.rb @@ -0,0 +1,147 @@ +require 'nokogiri' + +class Connection + + attr_accessor :hosts # The hosts from where we bootstrapped. + attr_accessor :nodes # The array of VoldemortNodes available. + attr_accessor :db_name # The DB store name. + attr_accessor :connected_node # The VoldemortNode we are connected to. + attr_accessor :request_count # Used to track the number of request a node receives. + attr_accessor :request_limit_per_node # Limit the number of request per node. + attr_accessor :key_serializer_schemas + attr_accessor :value_serializer_schemas + attr_accessor :key_serializer_type + attr_accessor :value_serializer_type + + STATUS_OK = "ok" + PROTOCOL = "pb0" + DEFAULT_REQUEST_LIMIT_PER_NODE = 500 + + def initialize(db_name, hosts, request_limit_per_node = DEFAULT_REQUEST_LIMIT_PER_NODE) + self.db_name = db_name + self.hosts = hosts + self.nodes = hosts.collect{ |h| + n = h.split(":") + node = VoldemortNode.new + node.host = n[0] + node.port = n[1] + node + } + self.request_count = 0 + self.request_limit_per_node = request_limit_per_node + end + + def bootstrap + cluster_response = self.get_from("metadata", "cluster.xml", false) + cluster_xml_doc = Nokogiri::XML(cluster_response[1][0][1]) + self.nodes = self.parse_nodes_from(cluster_xml_doc) + + stores_response = self.get_from("metadata", "stores.xml", false) + + stores_xml = stores_response[1][0][1] + + doc = Nokogiri::XML(stores_xml) + + self.key_serializer_type = self.parse_schema_type(doc, 'key-serializer') + self.value_serializer_type = self.parse_schema_type(doc, 'value-serializer') + self.key_serializer_schemas = self.parse_schema_from(doc, 'key-serializer') + self.value_serializer_schemas = self.parse_schema_from(doc, 'value-serializer') + + self.connect_to_random_node + + rescue StandardError => e + raise("There was an error trying to bootstrap from the specified servers: #{e}") + end + + def connect_to_random_node + nodes = self.nodes.sort_by { rand } + for node in nodes do + if self.connect_to(node.host, node.port) + self.connected_node = node + self.request_count = 0 + return node + end + end + end + + def parse_schema_type(doc, serializer = 'value-serializer') + type_doc = doc.xpath("//stores/store[name = \"#{self.db_name}\"]/#{serializer}/type") + if(type_doc != nil) + return type_doc.text + else + return nil + end + end + + def parse_schema_from(doc, serializer = 'value-serializer') + parsed_schemas = {} + doc.xpath("//stores/store[name = \"#{self.db_name}\"]/#{serializer}/schema-info").each do |value_serializer| + parsed_schemas[value_serializer.attributes['version'].text] = value_serializer.text + end + return parsed_schemas + end + + def parse_nodes_from(doc) + nodes = [] + doc.xpath("/cluster/server").each do |n| + node = VoldemortNode.new + node.id = n.xpath("//id").text + node.host = n.xpath("//host").text + node.port = n.xpath("//socket-port").text + node.http_port = n.xpath("//http_port").text + node.admin_port = n.xpath("//admin-port").text + node.partitions = n.xpath("//partitions").text + nodes << node + end + nodes + end + + def protocol_version + PROTOCOL + end + + def connect + self.connect! + end + + def reconnect + self.reconnect! + end + + def disconnect + self.disconnect! + end + + def reconnect_when_errors_in(response = nil) + return unless response + self.reconnect! if response.error + end + + def rebalance_connection? + self.request_count >= self.request_limit_per_node + end + + def rebalance_connection_if_needed + self.reconnect if self.rebalance_connection? + self.request_count += 1 + end + + def get(key) + self.rebalance_connection_if_needed + self.get_from(self.db_name, key, true) + end + + def get_all(keys) + self.rebalance_connection_if_needed + self.get_all_from(self.db_name, keys, true) + end + + def put(key, value, version = nil, route = true) + self.rebalance_connection_if_needed + self.put_from(self.db_name, key, value, version, route) + end + + def delete(key) + self.delete_from(self.db_name, key) + end +end diff --git a/clients/ruby/lib/connection/tcp_connection.rb b/clients/ruby/lib/connection/tcp_connection.rb new file mode 100644 index 0000000000..4e369e2982 --- /dev/null +++ b/clients/ruby/lib/connection/tcp_connection.rb @@ -0,0 +1,177 @@ +require 'socket' +require 'timeout' + +require 'protos/voldemort-client.pb' + +class TCPConnection < Connection + include Voldemort + + attr_accessor :socket + + SOCKET_TIMEOUT = 3 + + def connect_to(host, port) + begin + timeout(SOCKET_TIMEOUT) do + self.socket = TCPSocket.open(host, port) + self.send_protocol_version + if(protocol_handshake_ok?) + return self.socket + else + raise "There was an error connecting to the node" + end + end + rescue Timeout::Error + raise "Timeout when connecting to node" + rescue + false + end + end + + def get_from(db_name, key, route = true) + request = VoldemortRequest.new + request.should_route = route + request.store = db_name + request.type = RequestType::GET + request.get = GetRequest.new + request.get.key = key + + self.send(request) # send the request + raw_response = self.receive # read the response + response = GetResponse.new.parse_from_string(raw_response) # compose the get object based on the raw response + reconnect_when_errors_in(response) + response + end + + def get_all_from(db_name, keys, route = true) + request = VoldemortRequest.new + request.should_route = route + request.store = db_name + request.type = RequestType::GET_ALL + request.getAll = GetAllRequest.new + request.getAll.keys = keys + + self.send(request) # send the request + raw_response = self.receive # read the response + response = GetAllResponse.new.parse_from_string(raw_response) # compose the get object based on the raw response + reconnect_when_errors_in(response) + response + end + + def put_from(db_name, key, value, version = nil, route = true) + version = get_version(key) unless version + request = VoldemortRequest.new + request.should_route = route + request.store = db_name + request.type = RequestType::PUT + request.put = PutRequest.new + request.put.key = key + request.put.versioned = Versioned.new + request.put.versioned.value = value + request.put.versioned.version = VectorClock.new + request.put.versioned.version.merge_from(version) + + self.send(request) # send the request + raw_response = self.receive # read the response + response = PutResponse.new.parse_from_string(raw_response) + reconnect_when_errors_in(response) + + add_to_versions(version) # add version or increment when needed + version + end + + def delete_from(db_name, key, version = nil, route = true) + version = get_version(key) unless version + request = VoldemortRequest.new + request.should_route = route + request.store = db_name + request.type = RequestType::DELETE + request.delete = DeleteRequest.new + request.delete.key = key + request.delete.version = VectorClock.new + request.delete.version.merge_from(version) + + self.send(request) # send the request + raw_response = self.receive # read the response + response = DeleteResponse.new.parse_from_string(raw_response) + reconnect_when_errors_in(response) + response.success + end + + def add_to_versions(version) + entry = version.entries.detect { |e| e.node_id == self.connected_node.id.to_i } + if(entry) + entry.version += 1 + else + entry = ClockEntry.new + entry.node_id = self.connected_node.id.to_i + entry.version = 1 + version.entries << entry + version.timestamp = Time.new.to_i * 1000 + end + version + end + + def get_version(key) + other_version = get(key)[1][0] + if(other_version) + return other_version.version + else + version = VectorClock.new + version.timestamp = Time.new.to_i * 1000 + return version + end + end + + # unpack argument is N | Long, network (big-endian) byte order. + # from http://ruby-doc.org/doxygen/1.8.4/pack_8c-source.html + def receive + raw_size = self.socket.recv(4) + size = raw_size.unpack('N') + + # Read until we get to size + read = 0 + buffer = "" + + while read < size[0] and size[0] > 0 + data = self.socket.recv(size[0] - read) + buffer << data + read += data.length + end + return buffer + rescue + self.reconnect! + end + + # pack argument is N | Long, network (big-endian) byte order. + # from http://ruby-doc.org/doxygen/1.8.4/pack_8c-source.html + def send(request) + self.reconnect unless self.socket + bytes = request.serialize_to_string # helper method thanks to ruby-protobuf + self.socket.write([bytes.size].pack("N") + bytes) + rescue + self.disconnect! + end + + def send_protocol_version + self.socket.write(self.protocol_version) + end + + def protocol_handshake_ok? + self.socket.recv(2) == STATUS_OK + end + + def connect! + self.connect_to_random_node + end + + def reconnect! + self.disconnect! if self.socket + self.connect! + end + + def disconnect! + self.socket.close if self.socket + self.socket = nil + end +end diff --git a/clients/ruby/lib/connection/voldemort_node.rb b/clients/ruby/lib/connection/voldemort_node.rb new file mode 100644 index 0000000000..0aae66d287 --- /dev/null +++ b/clients/ruby/lib/connection/voldemort_node.rb @@ -0,0 +1,3 @@ +class VoldemortNode + attr_accessor :id, :host, :port, :http_port, :admin_port, :partitions +end \ No newline at end of file diff --git a/clients/ruby/lib/protos/voldemort-client.pb.rb b/clients/ruby/lib/protos/voldemort-client.pb.rb new file mode 100644 index 0000000000..96f2af7f1c --- /dev/null +++ b/clients/ruby/lib/protos/voldemort-client.pb.rb @@ -0,0 +1,190 @@ +### Generated by rprotoc. DO NOT EDIT! +### +# package voldemort; +# +# option java_package = "voldemort.client.protocol.pb"; +# option java_outer_classname = "VProto"; +# option optimize_for = SPEED; +# +# message ClockEntry { +# required int32 node_id = 1; +# required int64 version = 2; +# } +# +# message VectorClock { +# repeated ClockEntry entries = 1; +# optional int64 timestamp = 2; +# } +# +# message Versioned { +# required bytes value = 1; +# required VectorClock version = 2; +# } +# +# message Error { +# required int32 error_code = 1; +# required string error_message = 2; +# } +# +# message KeyedVersions { +# required bytes key = 1; +# repeated Versioned versions = 2; +# } +# +# message GetRequest { +# optional bytes key = 1; +# } +# +# message GetResponse { +# repeated Versioned versioned = 1; +# optional Error error = 2; +# } +# +# message GetVersionResponse { +# repeated VectorClock versions = 1; +# optional Error error = 2; +# } +# +# message GetAllRequest { +# repeated bytes keys = 1; +# } +# +# message GetAllResponse { +# repeated KeyedVersions values = 1; +# optional Error error = 2; +# } +# +# message PutRequest { +# required bytes key = 1; +# required Versioned versioned = 2; +# } +# +# message PutResponse { +# optional Error error = 1; +# } +# +# message DeleteRequest { +# required bytes key = 1; +# required VectorClock version = 2; +# } +# +# message DeleteResponse { +# required bool success = 1; +# optional Error error = 2; +# } +# +# enum RequestType { +# GET = 0; +# GET_ALL = 1; +# PUT = 2; +# DELETE = 3; +# GET_VERSION = 4; +# } +# +# +# message VoldemortRequest { +# required RequestType type = 1; +# required bool should_route = 2 [default = false]; +# required string store = 3; +# optional GetRequest get = 4; +# optional GetAllRequest getAll = 5; +# optional PutRequest put = 6; +# optional DeleteRequest delete = 7; +# optional int32 requestRouteType = 8; +# } +require 'protobuf/message/message' +require 'protobuf/message/enum' +require 'protobuf/message/service' +require 'protobuf/message/extend' + +module Voldemort + ::Protobuf::OPTIONS[:"java_package"] = "voldemort.client.protocol.pb" + ::Protobuf::OPTIONS[:"java_outer_classname"] = "VProto" + ::Protobuf::OPTIONS[:"optimize_for"] = :SPEED + class ClockEntry < ::Protobuf::Message + defined_in __FILE__ + required :int32, :node_id, 1 + required :int64, :version, 2 + end + class VectorClock < ::Protobuf::Message + defined_in __FILE__ + repeated :ClockEntry, :entries, 1 + optional :int64, :timestamp, 2 + end + class Versioned < ::Protobuf::Message + defined_in __FILE__ + required :bytes, :value, 1 + required :VectorClock, :version, 2 + end + class Error < ::Protobuf::Message + defined_in __FILE__ + required :int32, :error_code, 1 + required :string, :error_message, 2 + end + class KeyedVersions < ::Protobuf::Message + defined_in __FILE__ + required :bytes, :key, 1 + repeated :Versioned, :versions, 2 + end + class GetRequest < ::Protobuf::Message + defined_in __FILE__ + optional :bytes, :key, 1 + end + class GetResponse < ::Protobuf::Message + defined_in __FILE__ + repeated :Versioned, :versioned, 1 + optional :Error, :error, 2 + end + class GetVersionResponse < ::Protobuf::Message + defined_in __FILE__ + repeated :VectorClock, :versions, 1 + optional :Error, :error, 2 + end + class GetAllRequest < ::Protobuf::Message + defined_in __FILE__ + repeated :bytes, :keys, 1 + end + class GetAllResponse < ::Protobuf::Message + defined_in __FILE__ + repeated :KeyedVersions, :values, 1 + optional :Error, :error, 2 + end + class PutRequest < ::Protobuf::Message + defined_in __FILE__ + required :bytes, :key, 1 + required :Versioned, :versioned, 2 + end + class PutResponse < ::Protobuf::Message + defined_in __FILE__ + optional :Error, :error, 1 + end + class DeleteRequest < ::Protobuf::Message + defined_in __FILE__ + required :bytes, :key, 1 + required :VectorClock, :version, 2 + end + class DeleteResponse < ::Protobuf::Message + defined_in __FILE__ + required :bool, :success, 1 + optional :Error, :error, 2 + end + class RequestType < ::Protobuf::Enum + defined_in __FILE__ + GET = 0 + GET_ALL = 1 + PUT = 2 + DELETE = 3 + GET_VERSION = 4 + end + class VoldemortRequest < ::Protobuf::Message + defined_in __FILE__ + required :RequestType, :type, 1 + required :bool, :should_route, 2, :default => false + required :string, :store, 3 + optional :GetRequest, :get, 4 + optional :GetAllRequest, :getAll, 5 + optional :PutRequest, :put, 6 + optional :DeleteRequest, :delete, 7 + optional :int32, :requestRouteType, 8 + end +end \ No newline at end of file diff --git a/clients/ruby/lib/protos/voldemort-client.proto b/clients/ruby/lib/protos/voldemort-client.proto new file mode 100644 index 0000000000..5b5b2a0487 --- /dev/null +++ b/clients/ruby/lib/protos/voldemort-client.proto @@ -0,0 +1,92 @@ +package voldemort; + +option java_package = "voldemort.client.protocol.pb"; +option java_outer_classname = "VProto"; +option optimize_for = SPEED; + +message ClockEntry { + required int32 node_id = 1; + required int64 version = 2; +} + +message VectorClock { + repeated ClockEntry entries = 1; + optional int64 timestamp = 2; +} + +message Versioned { + required bytes value = 1; + required VectorClock version = 2; +} + +message Error { + required int32 error_code = 1; + required string error_message = 2; +} + +message KeyedVersions { + required bytes key = 1; + repeated Versioned versions = 2; +} + +message GetRequest { + optional bytes key = 1; +} + +message GetResponse { + repeated Versioned versioned = 1; + optional Error error = 2; +} + +message GetVersionResponse { + repeated VectorClock versions = 1; + optional Error error = 2; +} + +message GetAllRequest { + repeated bytes keys = 1; +} + +message GetAllResponse { + repeated KeyedVersions values = 1; + optional Error error = 2; +} + +message PutRequest { + required bytes key = 1; + required Versioned versioned = 2; +} + +message PutResponse { + optional Error error = 1; +} + +message DeleteRequest { + required bytes key = 1; + required VectorClock version = 2; +} + +message DeleteResponse { + required bool success = 1; + optional Error error = 2; +} + +enum RequestType { + GET = 0; + GET_ALL = 1; + PUT = 2; + DELETE = 3; + GET_VERSION = 4; +} + + +message VoldemortRequest { + required RequestType type = 1; + required bool should_route = 2 [default = false]; + required string store = 3; + optional GetRequest get = 4; + optional GetAllRequest getAll = 5; + optional PutRequest put = 6; + optional DeleteRequest delete = 7; + optional int32 requestRouteType = 8; +} \ No newline at end of file diff --git a/clients/ruby/lib/voldemort-rb.rb b/clients/ruby/lib/voldemort-rb.rb new file mode 100644 index 0000000000..307eff1a21 --- /dev/null +++ b/clients/ruby/lib/voldemort-rb.rb @@ -0,0 +1,73 @@ +libdir = File.dirname(__FILE__) +$LOAD_PATH.unshift(libdir) unless $LOAD_PATH.include?(libdir) + +require 'connection/voldemort_node' +require 'connection/connection' +require 'connection/tcp_connection' +require 'voldemort-serializer' + +class VoldemortClient + attr_accessor :connection + attr_accessor :conflict_resolver + attr_accessor :key_serializer + attr_accessor :value_serializer + + def initialize(db_name, *hosts, &block) + self.conflict_resolver = block unless !block + self.connection = TCPConnection.new(db_name, hosts) # implement and modifiy if you don't want to use TCP protobuf. + self.connection.bootstrap + + case(self.connection.key_serializer_type) + when 'json' + self.key_serializer = VoldemortJsonBinarySerializer.new(self.connection.key_serializer_schemas) + else + self.key_serializer = VoldemortPassThroughSerializer.new({}) + end + + case(self.connection.value_serializer_type) + when 'json' + self.value_serializer = VoldemortJsonBinarySerializer.new(self.connection.value_serializer_schemas) + else + self.value_serializer = VoldemortPassThroughSerializer.new({}) + end + end + + def get(key) + versions = self.connection.get(key_serializer.to_bytes(key)) + version = self.resolve_conflicts(versions.versioned) + if version + value_serializer.to_object(version.value) + else + nil + end + end + + def get_all(keys) + serialized_keys = [] + + keys.each do |key| + serialized_keys << key_serializer.to_bytes(key) + end + + all_version = self.connection.get_all(keys) + values = {} + all_version.values.collect do |v| + values[v.key] = value_serializer.to_object(self.resolve_conflicts(v.versions).value) + end + values + end + + def put(key, value, version = nil) + self.connection.put(key_serializer.to_bytes(key), value_serializer.to_bytes(value)) + end + + def delete(key) + self.connection.delete(key_serializer.to_bytes(key)) + end + + def resolve_conflicts(versions) + return self.conflict_resolver.call(versions) if self.conflict_resolver + # by default just return the version that has the most recent timestamp. + versions.max { |a, b| a.version.timestamp <=> b.version.timestamp } + end +end diff --git a/clients/ruby/lib/voldemort-serializer.rb b/clients/ruby/lib/voldemort-serializer.rb new file mode 100644 index 0000000000..37606bd48e --- /dev/null +++ b/clients/ruby/lib/voldemort-serializer.rb @@ -0,0 +1,485 @@ +require 'json' +require 'voldemort-rb' + +class VoldemortJsonBinarySerializer + attr_accessor :has_version + attr_accessor :type_def_versions + + BYTE_MIN_VAL = -128 + SHORT_MIN_VAL = -32768 + SHORT_MAX_VAL = 2 ** 15 - 1 + INT_MIN_VAL = -2147483648 + LONG_MIN_VAL = -9223372036854775808 + FLOAT_MIN_VAL = 2 ** -149 + DOUBLE_MIN_VAL = 2 ** -1074 + + def initialize(type_def_versions) + @has_version = true + @type_def_versions = {} + + # convert versioned json strings to ruby objects + type_def_versions.each_pair do |version, json_type_def_version| + @type_def_versions[version.to_i] = get_type_def(json_type_def_version) + end + end + + def to_signed(unsigned, bits) + max_unsigned = 2 ** bits + max_signed = 2 ** (bits - 1) + to_signed = proc { |n| (n >= max_signed) ? n - max_unsigned : n } + return to_signed[unsigned] + end + + def get_type_def(json_type_def_version) + # replace all single quotes with " since the JSON parser wants it this way + json_type_def_version = json_type_def_version.gsub(/\'/, '"') + + if((json_type_def_version =~ /[\{\[]/) == 0) + # check if the json is a list or string, since these are + # the only ones that JSON.parse() will work with + return JSON.parse(json_type_def_version) + else + # otherwise it's a primitive, so just strip the quotes + return json_type_def_version.gsub(/\"/, '') + end + end + + def read_slice(length, bytes) + substr = bytes[0, length] + bytes.slice!(0..length - 1) + return substr + end + + # handle serialization + + def to_bytes(object) + bytes = '' + newest_version = 0 # TODO get highest number from map + type_def = @type_def_versions[newest_version] + + if(@has_version) + bytes << newest_version.chr + end + + bytes << write(object, type_def) + + return bytes + end + + def write(object, type) + bytes = '' + + if(type.kind_of? Hash) + if(object != nil && !object.kind_of?(Hash)) + # TODO throw exception + else + bytes << write_map(object, type) + end + elsif(type.kind_of? Array) + if(object != nil && !object.kind_of?(Array)) + # TODO throw exception + else + bytes << write_list(object, type) + end + else + case(type) + when 'string': bytes << write_string(object) + when 'int8': bytes << write_int8(object) + when 'int16': bytes << write_int16(object) + when 'int32': bytes << write_int32(object) + when 'int64': bytes << write_int64(object) + when 'float32': bytes << write_float32(object) + when 'float64': bytes << write_float64(object) + when 'date': bytes << write_date(object) + when 'bytes': bytes << write_bytes(object) + when 'boolean': bytes << write_boolean(object) + else + # TODO throw unsupported type exception + end + end + + if(bytes == '') + return nil + end + + return bytes + end + + def write_boolean(object) + bytes = '' + + if(object == nil) + bytes << [BYTE_MIN_VAL].pack('c') + elsif(object) + bytes << [0x1].pack('c') + else + bytes << [0x0].pack('c') + end + + return bytes + end + + def write_string(object) + return write_bytes(object) + end + + def write_int8(object) + bytes = '' + + if(object == BYTE_MIN_VAL) + # TODO throw underflow exception + else + if(object == nil) + object = BYTE_MIN_VAL + end + + bytes << [object].pack('c') + end + + return bytes + end + + def write_int16(object) + bytes = '' + + if(object == SHORT_MIN_VAL) + # TODO throw underflow exception + else + if(object == nil) + object = SHORT_MIN_VAL + end + + bytes << [object].pack('n') + end + + return bytes + end + + def write_int32(object) + bytes = '' + + if(object == INT_MIN_VAL) + # TODO throw underflow exception + else + if(object == nil) + object = INT_MIN_VAL + end + + # reverse here to switch little endian to big endian + # this is because pack('N') is choking on 'bigint', wtf? + bytes << [object].pack('i').reverse + end + + return bytes + end + + def write_int64(object) + bytes = '' + + if(object == LONG_MIN_VAL) + # TODO throw underflow exception + else + if(object == nil) + object = LONG_MIN_VAL + end + + # reverse here to switch little endian to big endian + # this is because pack('N') is choking on 'bigint', wtf? + bytes << [object].pack('q').reverse + end + + return bytes + end + + def write_float32(object) + bytes = '' + + if(object == FLOAT_MIN_VAL) + # TODO throw underflow exception + else + if(object == nil) + object = FLOAT_MIN_VAL + end + + bytes << [object].pack('g') + end + + return bytes + end + + def write_float64(object) + bytes = '' + + if(object == DOUBLE_MIN_VAL) + # TODO throw underflow exception + else + if(object == nil) + object = DOUBLE_MIN_VAL + end + + bytes << [object].pack('G') + end + + return bytes + end + + def write_date(object) + bytes = '' + + if(object == LONG_MIN_VAL) + # TODO throw underflow exception + else + if(object == nil) + bytes << write_int64(nil) + else + bytes << write_int64((object.to_f * 1000).to_i) + end + end + + return bytes + end + + def write_bytes(object) + bytes = '' + + if(object == nil) + bytes << write_int16(-1) + elsif(object.length < SHORT_MAX_VAL) + bytes << write_int16(object.length) + bytes << object + else + # TODO throw "length too long to serialize" exception + end + + return bytes + end + + def write_map(object, type) + bytes = '' + + if(object == nil) + bytes << [-1].pack('c') + else + bytes << [1].pack('c') + + if(object.length != type.length) + # TODO throw exception here.. invalid map serialization, expected: but got + else + type.sort.each do |type_pair| + key = type_pair.first + subtype = type_pair.last + + if(!object.has_key? key) + # TODO throw "missing property exception" + else + bytes << write(object[key], subtype) + end + end + end + end + + return bytes + end + + def write_list(object, type) + bytes = '' + + if(type.length != 1) + # TODO throw new exception (expected single type in list) + else + entry_type = type.first + + if(object == nil) + bytes << write_int16(-1) + elsif(object.length < SHORT_MAX_VAL) + bytes << write_int16(object.length) + object.each do |o| + bytes << write(o, entry_type) + end + else + # TODO throw serialization exception + end + end + + return bytes + end + + # handle deserialization + + def to_object(bytes) + version = 0 + + if(@has_version) + version = read_slice(1, bytes).to_i + end + + type = @type_def_versions[version] + + if(type == nil) + # TODO throw exception here + end + + return read(bytes, type) + end + + def read(bytes, type) + if(type.kind_of? Hash) + return read_map(bytes, type) + elsif(type.kind_of? Array) + return read_list(bytes, type) + else + case(type) + when 'string': return read_bytes(bytes) + when 'int8': return read_int8(bytes) + when 'int16': return read_int16(bytes) + when 'int32': return read_int32(bytes) + when 'int64': return read_int64(bytes) + when 'float32': return read_float32(bytes) + when 'float64': return read_float64(bytes) + when 'date': return read_date(bytes) + when 'bytes': return read_bytes(bytes) + when 'boolean': return read_boolean(bytes) + # TODO default throw unknown type exception + end + end + end + + def read_map(bytes, type) + # convert to char to string, and string to int + if(read_slice(1, bytes).unpack('c').to_s.to_i == -1) + return nil + else + object = {} + + type.sort.each do |type_pair| + name = type_pair.first + sub_type = type_pair.last + object[name] = read(bytes, sub_type) + end + + return object + end + end + + def read_list(bytes, type) + size = read_int16(bytes) + if(size < 0) + return nil + else + object = [] + + size.times { object << read(bytes, type.first) } + + return object + end + end + + def read_boolean(bytes) + b = read_slice(1, bytes).unpack('c').first + + if(b < 0) + return nil + elsif(b == 0) + return false + else + return true + end + end + + def read_int8(bytes) + b = read_slice(1, bytes).unpack("c").first.to_i + + if(b == BYTE_MIN_VAL) + return nil + end + + return b + end + + def read_int16(bytes) + s = to_signed(read_slice(2, bytes).unpack("n").first, 16) + + if(s == SHORT_MIN_VAL) + return nil + end + + return s + end + + def read_int32(bytes) + # reverse here to switch little endian to big endian + # this is because pack('N') is choking on 'bigint', wtf? + i = read_slice(4, bytes).reverse.unpack("i").first.to_i + + if(i == INT_MIN_VAL) + return nil + end + + return i + end + + def read_int64(bytes) + # reverse here to switch little endian to big endian + # this is because pack('N') is choking on 'bigint', wtf? + l = read_slice(8, bytes).reverse.unpack("q").first.to_i + + if(l == LONG_MIN_VAL) + return nil + end + + return l + end + + def read_float32(bytes) + f = read_slice(4, bytes).unpack("g").first.to_f + + if(f == FLOAT_MIN_VAL) + return nil + end + + return f + end + + def read_float64(bytes) + d = read_slice(8, bytes).unpack("G").first.to_f + + if(d == DOUBLE_MIN_VAL) + return nil + end + + return d + end + + def read_date(bytes) + d = read_int64(bytes) + + if(d != nil) + d = Time.at((d / 1000).to_i, d % 1000) + end + + return d + end + + def read_bytes(bytes) + size = read_int16(bytes) + + if(size < 0) + return nil + else + return read_slice(size, bytes) + end + end +end + +class VoldemortPassThroughSerializer + def initialize(map) + end + + def to_bytes(bytes) + bytes + end + + def to_object(object) + object + end +end \ No newline at end of file diff --git a/clients/ruby/spec/connection_spec.rb b/clients/ruby/spec/connection_spec.rb new file mode 100644 index 0000000000..4e061113c8 --- /dev/null +++ b/clients/ruby/spec/connection_spec.rb @@ -0,0 +1,98 @@ +require File.dirname(__FILE__) + '/spec_helper' + +describe Connection do + + before(:each) do + @connection = Connection.new("test", "localhost:6666") + end + + describe "default methods" do + + it "should support connect" do + @connection.should respond_to(:connect) + end + + it "should support reconnect" do + @connection.should respond_to(:reconnect) + end + + it "should support disconnect" do + @connection.should respond_to(:disconnect) + end + + it "should parse nodes from xml" do + @connection.should respond_to(:parse_nodes_from) + xml = "\r\n mycluster\r\n \r\n 0\r\n localhost\r\n 8081\r\n 6666\r\n 6667\r\n 0, 1\r\n \r\n" + doc = Nokogiri::XML(xml) + nodes = @connection.parse_nodes_from(doc) + nodes.first.host.should eql("localhost") + nodes.first.port.should eql("6666") + nodes.length.should eql(1) + end + + it "should tell to wich node is connected to" do + @connection.should respond_to(:connected_node) + node = mock(VoldemortNode) + node.stub!(:host).and_return("localhost") + node.stub!(:port).and_return(6666) + @connection.nodes.stub!(:sort_by).and_return([node]) + @connection.stub!(:connect_to).and_return(true) + @connection.connect_to_random_node + @connection.connected_node.should eql(node) + end + + it "should use protobuf by default" do + @connection.protocol_version.should eql("pb0") + end + + it "should use the hosts specified" do + connection = Connection.new("test", "localhost:6666") + connection.hosts.should eql("localhost:6666") + connection.nodes.length.should eql(1) + connection2 = Connection.new("test", ["localhost:6666", "localhost:7777"]) + connection2.hosts.should eql(["localhost:6666", "localhost:7777"]) + connection2.nodes.length.should eql(2) + end + end + + describe "rebalance nodes by evaluating number of requests" do + + it "should have a request_count and request_limit_per_node per node connection" do + @connection.should respond_to(:request_count) + @connection.should respond_to(:request_limit_per_node) + end + + it "should tell if the request limit per node was reached" do + @connection.request_count = 0 + @connection.request_limit_per_node = 10 + @connection.rebalance_connection?.should eql(false) + @connection.request_count = 11 + @connection.request_limit_per_node = 10 + @connection.rebalance_connection?.should eql(true) + end + + it "should reconnect every N number of requests" do + @connection.should_receive(:rebalance_connection?).and_return(true) + @connection.should_receive(:reconnect).and_return(true) + @connection.rebalance_connection_if_needed + end + + it "should not reconnect if it haven't reached the limit of requests" do + @connection.should_receive(:rebalance_connection?).and_return(false) + @connection.should_not_receive(:reconnect).and_return(false) + @connection.rebalance_connection_if_needed + end + + it "should rebalance if needed when calling get, get_all or put" do + @connection.should_receive(:rebalance_connection_if_needed).exactly(3).times.and_return(true) + @connection.stub!(:get_from).and_return(true) + @connection.stub!(:get_all_from).and_return(true) + @connection.stub!(:put_from).and_return(true) + @connection.stub!(:delete_from).and_return(true) + @connection.get("value") + @connection.put("value", "value") + @connection.get_all(["key1", "key2"]) + @connection.delete("key") + end + end +end \ No newline at end of file diff --git a/clients/ruby/spec/spec_helper.rb b/clients/ruby/spec/spec_helper.rb new file mode 100644 index 0000000000..6049254fa4 --- /dev/null +++ b/clients/ruby/spec/spec_helper.rb @@ -0,0 +1,2 @@ +require 'rubygems' +require 'voldemort-rb' \ No newline at end of file diff --git a/clients/ruby/spec/tcp_connection_spec.rb b/clients/ruby/spec/tcp_connection_spec.rb new file mode 100644 index 0000000000..7e3c3d24fe --- /dev/null +++ b/clients/ruby/spec/tcp_connection_spec.rb @@ -0,0 +1,40 @@ +require File.dirname(__FILE__) + '/spec_helper' + +describe TCPConnection do + + before(:each) do + @connection = TCPConnection.new("test", "localhost:6666") + end + + describe "connection mechanism" do + + it "should connect to a specified host" do + @connection.should respond_to(:connect_to) + mock_socket = mock(TCPSocket) + TCPSocket.should_receive(:open).and_return(mock_socket) + @connection.should_receive(:send_protocol_version).and_return(true) + @connection.should_receive(:protocol_handshake_ok?).and_return(true) + @connection.connect_to("localhost", 6666).should eql(mock_socket) + end + + it "should send the protocol" do + @connection.should respond_to(:send_protocol_version) + mock_socket = mock(TCPSocket) + @connection.stub!(:socket).and_return(mock_socket) + mock_socket.should_receive(:write).with(Connection::PROTOCOL).and_return(true) + @connection.send_protocol_version.should eql(true) + end + + it "should receive the protocol handshake response" do + @connection.should respond_to(:protocol_handshake_ok?) + mock_socket = mock(TCPSocket) + @connection.stub!(:socket).and_return(mock_socket) + mock_socket.should_receive(:recv).with(2).and_return(Connection::STATUS_OK) + @connection.protocol_handshake_ok?.should eql(true) + end + + it "should have a socket" do + @connection.should respond_to(:socket) + end + end +end diff --git a/clients/ruby/spec/voldemort_client_spec.rb b/clients/ruby/spec/voldemort_client_spec.rb new file mode 100644 index 0000000000..58348f534a --- /dev/null +++ b/clients/ruby/spec/voldemort_client_spec.rb @@ -0,0 +1,98 @@ +require File.dirname(__FILE__) + '/spec_helper' + +include Voldemort + +describe VoldemortClient do + + before(:each) do + connection = mock(TCPConnection) + connection.stub!(:key_serializer_type).and_return("string") + connection.stub!(:value_serializer_type).and_return("string") + node = mock(VoldemortNode) + connection.stub!(:bootstrap).and_return(node) + TCPConnection.stub!(:new).and_return(connection) + @client = VoldemortClient.new("test", "localhost:6666") + @client.stub!(:connection).and_return(connection) + end + + describe "connection abstraction" do + it "should have a connection" do + @client.should respond_to(:connection) + end + + it "should initialize the connection" do + @client.connection.should_not be(nil) + end + end + + describe "default methods" do + + it "should support get" do + @client.should respond_to(:get) + version = mock(Versioned) + v = mock(VectorClock) + v.stub!(:value).and_return("some value") + version.stub!(:versioned).and_return([v]) + @client.connection.should_receive(:get).with("key").and_return(version) + @client.get("key").should eql("some value") + end + + it "should support get all" do + @client.should respond_to(:get_all) + version = mock(Versioned) + v = mock(VectorClock) + v.stub!(:value).and_return("some value") + v.stub!(:key).and_return("key") + v.stub!(:versions).and_return([v]) + version.stub!(:values).and_return([v]) + @client.connection.should_receive(:get_all).with(["key", "key2"]).and_return(version) + @client.get_all(["key", "key2"]).should eql({ "key" => "some value" }) # we pretend key2 doesn't exist + end + + it "should support put" do + @client.should respond_to(:put) + @client.connection.should_receive(:put).with("key", "value").and_return("version") + @client.put("key", "value").should eql("version") + end + + it "should support delete" do + @client.should respond_to(:delete) + @client.connection.should_receive(:delete).with("key").and_return(true) + @client.delete("key").should eql(true) + end + end + + describe "default resolver" do + + before(:each) do + @old_versioned = Versioned.new + @old_versioned.value = "old value" + @old_versioned.version = VectorClock.new + @old_versioned.version.timestamp = (Time.now-86400).to_i * 1000 + + @new_versioned = Versioned.new + @new_versioned.value = "new value" + @new_versioned.version = VectorClock.new + @new_versioned.version.timestamp = (Time.now).to_i * 1000 + + @versions = [] + @versions << @old_versioned + @versions << @new_versioned + end + + it "should have a default resolver" do + @client.should respond_to(:conflict_resolver) + end + + it "should pick a default version form a list of versions, and should be the most recent value" do + @client.resolve_conflicts(@versions).should eql(@new_versioned) + end + + it "should allow a custom conflict resolver" do + @client = VoldemortClient.new("test", "localhost:6666") do |versions| + versions.first # just return the first version + end + @client.resolve_conflicts(@versions).should eql(@old_versioned) + end + end +end diff --git a/clients/ruby/spec/voldemort_node_spec.rb b/clients/ruby/spec/voldemort_node_spec.rb new file mode 100644 index 0000000000..141cb250e2 --- /dev/null +++ b/clients/ruby/spec/voldemort_node_spec.rb @@ -0,0 +1,17 @@ +require File.dirname(__FILE__) + '/spec_helper' + +describe VoldemortNode do + + before(:each) do + @voldemort_node = VoldemortNode.new + end + + describe "default methods" do + + it "should have id, host, port, http_port, admin_port and partitions" do + [:id, :host, :port, :http_port, :admin_port, :partitions].each do |m| + @voldemort_node.should respond_to(m) + end + end + end +end \ No newline at end of file diff --git a/clients/ruby/tasks/spec.rake b/clients/ruby/tasks/spec.rake new file mode 100644 index 0000000000..caf0dd9c9d --- /dev/null +++ b/clients/ruby/tasks/spec.rake @@ -0,0 +1,9 @@ +require 'spec/rake/spectask' + +spec_files = Rake::FileList["spec/**/*_spec.rb"] + +desc "Run specs" +Spec::Rake::SpecTask.new do |t| + t.spec_files = spec_files + t.spec_opts = ["-c"] +end diff --git a/clients/ruby/tasks/voldemort_client_tasks.rake b/clients/ruby/tasks/voldemort_client_tasks.rake new file mode 100644 index 0000000000..197d707f72 --- /dev/null +++ b/clients/ruby/tasks/voldemort_client_tasks.rake @@ -0,0 +1,4 @@ +# desc "Explaining what the task does" +# task :voldemort_client do +# # Task goes here +# end diff --git a/clients/ruby/uninstall.rb b/clients/ruby/uninstall.rb new file mode 100644 index 0000000000..9738333463 --- /dev/null +++ b/clients/ruby/uninstall.rb @@ -0,0 +1 @@ +# Uninstall hook code here diff --git a/clients/ruby/voldemort-rb.gemspec b/clients/ruby/voldemort-rb.gemspec new file mode 100644 index 0000000000..aaf1961c56 --- /dev/null +++ b/clients/ruby/voldemort-rb.gemspec @@ -0,0 +1,30 @@ +Gem::Specification.new do |s| + s.name = 'voldemort-rb' + s.version = '0.1.5' + s.summary = %{A Ruby client for the Voldemort distributed key value store} + s.description = %Q{voldemort-rb allows you to connect to the Voldemort descentralized key value store.} + s.authors = ["Alejandro Crosa"] + s.email = ["alejandrocrosa@gmail.com"] + s.homepage = "http://github.com/acrosa/voldemort-rb" + s.files = [ + "CHANGELOG", + "LICENSE", + "README.md", + "Rakefile", + "lib/voldemort-rb.rb", + "lib/voldemort-serializer.rb", + "lib/connection/connection.rb", + "lib/connection/tcp_connection.rb", + "lib/connection/voldemort_node.rb", + "lib/protos/voldemort-client.pb.rb", + "lib/protos/voldemort-client.proto", + "spec/connection_spec.rb", + "spec/tcp_connection_spec.rb", + "spec/voldemort_node_spec.rb", + "spec/voldemort_client_spec.rb", + "spec/spec_helper.rb" + ] + s.require_paths = ["lib"] + s.add_dependency('ruby_protobuf', '>= 0.3.3') + s.add_dependency('nokogiri', '>= 1.4.3.1') +end