Permalink
Browse files

prepararation for resharding

  • Loading branch information...
1 parent d6ecf16 commit 006aad6b9df9b90e68949f91756d361657806701 @funny-falcon funny-falcon committed Aug 3, 2012
View
@@ -37,16 +37,19 @@ def new(conf)
raise ArgumentError, "Shard strategy could be :round_robin or :master_first, got #{replica_strategy.inspect}"
end
+ previous_shards_count = conf[:previous_shards_count]
+ insert_to_previous_shard = conf[:insert_to_previous_shard]
+
case conf[:type] || :block
when :em, :em_fiber
require 'tarantool/fiber_db'
- FiberDB.new(shards, replica_strategy)
+ FiberDB.new(shards, replica_strategy, previous_shards_count, insert_to_previous_shard)
when :em_cb, :em_callback
require 'tarantool/callback_db'
- CallbackDB.new(shards, replica_strategy)
+ CallbackDB.new(shards, replica_strategy, previous_shards_count, insert_to_previous_shard)
when :block
require 'tarantool/block_db'
- BlockDB.new(shards, replica_strategy)
+ BlockDB.new(shards, replica_strategy, previous_shards_count, insert_to_previous_shard)
else
raise "Unknown Tarantool connection type #{conf[:type]}"
end
@@ -68,11 +71,13 @@ def _fix_connection(conn)
end
class DB
- attr_reader :closed, :connection
+ attr_reader :closed, :connections
alias closed? closed
- def initialize(shards, replica_strategy)
+ def initialize(shards, replica_strategy, previous_shards_count, insert_to_previous_shard)
@shards = shards
@replica_strategy = replica_strategy
+ @previous_shards_count = previous_shards_count
+ @insert_to_previous_shard = insert_to_previous_shard
@connections = {}
@closed = false
end
@@ -128,6 +133,12 @@ def shards_count
@shards.count
end
+ attr_reader :previous_shards_count
+
+ def insert_with_shards_count
+ @insert_to_previous_shard && @previous_shards_count || @shards.count
+ end
+
def _shard(number)
@connections[number] ||= begin
@shards[number].map do |host, port|
@@ -69,12 +69,25 @@ def _one_shard_write(replicas, request_type, body)
def _send_to_several_shards(shard_numbers, read_write, request_type, body, response)
results = []
- for shard in shard_numbers
- res = _send_to_one_shard(shard, read_write, request_type, body, response)
- if Array === res
- results.concat res
+ unless read_write == :replace
+ for shard in shard_numbers
+ res = _send_to_one_shard(shard, read_write, request_type, body, response)
+ Array === res ? results.concat(res) : results << res
+ end
+ else
+ for shard in shard_numbers
+ begin
+ res = _send_to_one_shard(shard, read_write, request_type, body, response)
+ Array === res ? results.concat(res) : results << res
+ rescue ::Tarantool::TupleDoesntExists => e
+ results << e
+ end
+ end
+
+ if results.all?{|r| ::Tarantool::TupleDoesntExists === r}
+ raise results.first
else
- results << res
+ results.delete_if{|r| ::Tarantool::TupleDoesntExists === r}
end
end
if Integer === results.first
@@ -148,8 +148,74 @@ def call(array)
end
end
+ class ConcatterReplace
+ def initialize(count, feed)
+ @result = []
+ @count = count
+ @feed = feed
+ end
+ def call(array)
+ if @count > 0
+ case array
+ when Array
+ @result.concat array
+ when Exception
+ @result = array
+ @count = 1
+ else
+ @result << array
+ end
+ if (@count -= 1) == 0
+ if Array === @result && Integer === @result.first
+ @feed.call @result.inject(0){|s, i| s + i}
+ else
+ @feed.call @result
+ end
+ end
+ end
+ end
+ end
+
+ class ConcatterReplace
+ def initialize(count, feed)
+ @result = []
+ @count = count
+ @feed = feed
+ end
+ def call(array)
+ if @count > 0
+ case array
+ when Array
+ @result.concat array
+ when ::Tarantool::TupleDoesntExists
+ @result << array
+ when Exception
+ @result = array
+ @count = 1
+ else
+ @result << array
+ end
+ if (@count -= 1) == 0
+ if Exception === @result
+ @feed.call @result
+ elsif @result.all?{|r| ::Tarantool::TupleDoesntExists === r}
+ @feed.call @result.first
+ else
+ @result.delete_if{|r| ::Tarantool::TupleDoesntExists === r}
+ if Integer === @result.first
+ @feed.call @result.inject(0){|s, i| s + i}
+ else
+ @feed.call @result
+ end
+ end
+ end
+ end
+ end
+ end
+
def _send_to_several_shards(shard_numbers, read_write, request_type, body, response, feed)
- concat = Concatter.new(shard_numbers.size, feed)
+ concat = read_write != :replace ? Concatter.new(shard_numbers.size, feed) :
+ ConcatterReplace.new(shard_numbers.size, feed)
for shard in shard_numbers
_send_to_one_shard(shard, read_write, request_type, body, response, concat)
end
@@ -16,9 +16,8 @@ def select_cb(space_no, index_no, keys, offset, limit, cb, opts={})
if Hash === returns
returns, *translators = _parse_hash_definition(returns)
end
- shard_nums = _get_shard_nums{ all_shards }
_select(space_no, index_no, offset, limit, keys, cb, returns,
- types, shard_nums, translators)
+ types, all_shards, translators)
end
def all_cb(space_no, index_no, keys, cb, opts={})
@@ -33,20 +32,17 @@ def first_cb(space_no, index_no, key, cb, opts={})
def insert_cb(space_no, tuple, cb, opts={})
types = opts[:types] || _detect_types(tuple)
- shard_nums = _get_shard_nums{ all_shards }
- _insert(space_no, BOX_ADD, tuple, types, cb, opts[:return_tuple], shard_nums)
+ _insert(space_no, BOX_ADD, tuple, types, cb, opts[:return_tuple], all_shards)
end
def replace_cb(space_no, tuple, cb, opts={})
types = opts[:types] || _detect_types(tuple)
- shard_nums = _get_shard_nums{ all_shards }
- _insert(space_no, BOX_REPLACE, tuple, types, cb, opts[:return_tuple], shard_nums)
+ _insert(space_no, BOX_REPLACE, tuple, types, cb, opts[:return_tuple], all_shards)
end
def store_cb(space_no, tuple, cb, opts={})
types = opts[:types] || _detect_types(tuple)
- shard_nums = _get_shard_nums{ all_shards }
- _insert(space_no, 0, tuple, types, cb, opts[:return_tuple], shard_nums)
+ _insert(space_no, 0, tuple, types, cb, opts[:return_tuple], all_shards)
end
def update_cb(space_no, pk, operations, cb, opts={})
@@ -56,9 +52,8 @@ def update_cb(space_no, pk, operations, cb, opts={})
if Hash === returns && opts[:return_tuple]
returns, *translators = _parse_hash_definition(returns)
end
- shard_nums = _get_shard_nums{ all_shards }
_update(space_no, pk, operations, returns, pk_types, cb,
- opts[:return_tuple], shard_nums, translators)
+ opts[:return_tuple], all_shards, translators)
end
def delete_cb(space_no, pk, cb, opts={})
@@ -68,9 +63,8 @@ def delete_cb(space_no, pk, cb, opts={})
if Hash === returns && opts[:return_tuple]
returns, *translators = _parse_hash_definition(returns)
end
- shard_nums = _get_shard_nums{ all_shards }
_delete(space_no, pk, returns, pk_types, cb,
- opts[:return_tuple], shard_nums, translators)
+ opts[:return_tuple], all_shards, translators)
end
def invoke_cb(func_name, values, cb, opts={})
@@ -3,6 +3,10 @@
require 'tarantool/serializers'
module Tarantool
+ module CommonSpace
+ attr_reader :tarantool, :space_no
+ end
+
module Request
include Util::Packer
include Util::TailGetter
@@ -92,7 +96,7 @@ def pack_tuple(body, key, types, index_no = 0)
body << INT32_1
pack_field(body, types[0], key)
end
- rescue IndexIndexError => e
+ rescue IndexIndexError
raise ArgumentError, "tuple #{key} has more entries than index #{index_no}"
end
@@ -181,16 +185,16 @@ def _modify_request(type, body, fields, ret_tuple, cb, shard_nums, read_write, t
_send_request(shard_nums, read_write, type, body, response)
end
- def _insert(space_no, flags, tuple, fields, cb, ret_tuple, shard_nums, translators = [])
+ def _insert(space_no, flags, tuple, fields, cb, ret_tuple, shard_nums, in_any_shard = nil, translators = [])
flags |= BOX_RETURN_TUPLE if ret_tuple
fields = [*fields]
tuple = [*tuple]
- tuple_size = tuple.size
body = [space_no, flags].pack(INSERT_HEADER)
pack_tuple(body, tuple, fields, :space)
- _modify_request(REQUEST_INSERT, body, fields, ret_tuple, cb, shard_nums, :write, translators)
+ _modify_request(REQUEST_INSERT, body, fields, ret_tuple, cb, shard_nums,
+ in_any_shard ? :replace : :write, translators)
end
def _update(space_no, pk, operations, fields, pk_fields, cb, ret_tuple, shard_nums, translators = [])
@@ -319,12 +323,19 @@ def _space_call_fix_values(values, space_no, opts)
opts[:types] = TYPES_STR_STR
end
end
+
# scheck for shards hints
- opts[:shards] ||= _get_shard_nums {
- opts[:shard_keys] ? _detect_shards(opts[:shard_keys]) :
- opts[:shard_key] ? detect_shard( opts[:shard_key]) :
- all_shards
- }
+ opts[:shards] ||= _get_shard_nums do
+ if opts[:shard_for_insert]
+ opts[:shard_keys] ? _detect_shards_for_insert(opts[:shard_keys]) :
+ opts[:shard_key] ? _detect_shard_for_insert( opts[:shard_key]) :
+ _all_shards
+ else
+ opts[:shard_keys] ? _detect_shards(opts[:shard_keys]) :
+ opts[:shard_key] ? _detect_shard( opts[:shard_key]) :
+ _all_shards
+ end
+ end
[values, opts]
end
@@ -341,8 +352,17 @@ def _call(func_name, values, cb, opts={})
body = [flags, func_name.size, func_name].pack(CALL_HEADER)
pack_tuple(body, values, value_types, :func_call)
- shard_nums = opts[:shards] || _get_shard_nums{ all_shards }
- read_write = opts[:readonly] ? :read : :write
+ shard_nums = opts[:shards] || all_shards
+ read_write = case opts[:readonly]
+ when nil, false, :write
+ :write
+ when true, :read
+ :read
+ when :replace
+ :replace
+ else
+ raise ArgumentError, "space#call :readonly options accepts nil, false, :write, true, :read and :replace, but #{opts[:readonly].inspect} were sent"
+ end
_modify_request(REQUEST_CALL, body, return_types, return_tuple, cb, shard_nums, read_write, opts[:translators] || [])
end
@@ -359,7 +379,7 @@ def parse_response(data)
end
end
def _ping(cb)
- _send_request(_get_shard_nums{ all_shards }, :write, REQUEST_PING, EMPTY, WrapPing.new(cb))
+ _send_request(all_shards, :write, REQUEST_PING, EMPTY, WrapPing.new(cb))
end
alias ping_cb _ping
Oops, something went wrong.

0 comments on commit 006aad6

Please sign in to comment.