Skip to content
Browse files

Propagate the batch_finish parameter into Transformation creation

  • Loading branch information...
1 parent 074d035 commit e1b3c85fb48542d1e5460123cd3309ea0b0173a4 @stuhood stuhood committed Feb 4, 2012
View
2 bin/setup_shards
@@ -104,7 +104,7 @@ existing_map = manifest.template_map
puts "\nCalculating changes...\n"
-migrator = Gizzard::Migrator.new(existing_map, new_templates, total_shards, migrator_config)
+migrator = Gizzard::Migrator.new(existing_map, new_templates, total_shards, migrator_config, false)
puts "\nUNCHANGED:"
pp migrator.unchanged_templates
View
17 lib/gizzard/commands.rb
@@ -3,7 +3,7 @@
require "digest/md5"
module Gizzard
- def confirm!(message="Continue?")
+ def Gizzard::confirm!(message="Continue?")
begin
print "#{message} (y/n) "; $stdout.flush
resp = $stdin.gets.chomp.downcase
@@ -876,6 +876,7 @@ def get_transformations
scheduler_options = command_options.scheduler_options || {}
copy_wrapper = scheduler_options[:copy_wrapper]
skip_copies = scheduler_options[:skip_copies] || false
+ batch_finish = scheduler_options[:batch_finish] || false
transformations = {}
memoized_transforms = {}
@@ -887,7 +888,7 @@ def get_transformations
manifest = manifest_for_write(forwarding.table_id)
shard = manifest.trees[forwarding]
- transform_args = [shard.template, to_template, copy_wrapper, skip_copies]
+ transform_args = [shard.template, to_template, copy_wrapper, skip_copies, batch_finish]
transformation = memoized_transforms.fetch(transform_args) do |args|
memoized_transforms[args] = Transformation.new(*args)
end
@@ -911,11 +912,12 @@ def get_transformations
manifest = manifest_for_write(*global_options.tables)
copy_wrapper = scheduler_options[:copy_wrapper]
skip_copies = scheduler_options[:skip_copies] || false
+ batch_finish = scheduler_options[:batch_finish] || false
transformations = {}
@argv.each_slice(2) do |(from_template_s, to_template_s)|
from, to = [from_template_s, to_template_s].map {|s| ShardTemplate.parse(s) }
- transformation = Transformation.new(from, to, copy_wrapper, skip_copies)
+ transformation = Transformation.new(from, to, copy_wrapper, skip_copies, batch_finish)
forwardings = Set.new(manifest.templates[from] || [])
trees = manifest.trees.reject {|(f, s)| !forwardings.include?(f) }
@@ -935,6 +937,7 @@ def get_transformations
scheduler_options = command_options.scheduler_options || {}
manifest = manifest_for_write(*global_options.tables)
copy_wrapper = scheduler_options[:copy_wrapper]
+ batch_finish = scheduler_options[:batch_finish] || false
transformations = {}
dest_templates_and_weights = {}
@@ -948,7 +951,7 @@ def get_transformations
global_options.tables.inject({}) do |all, table|
trees = manifest.trees.reject {|(f, s)| f.table_id != table }
- rebalancer = Rebalancer.new(trees, dest_templates_and_weights, copy_wrapper)
+ rebalancer = Rebalancer.new(trees, dest_templates_and_weights, copy_wrapper, batch_finish)
all.update(rebalancer.transformations) {|t,a,b| a.merge b }
end
@@ -963,6 +966,7 @@ def run
scheduler_options = command_options.scheduler_options || {}
manifest = manifest_for_write(*global_options.tables)
copy_wrapper = scheduler_options[:copy_wrapper]
+ batch_finish = scheduler_options[:batch_finish] || false
be_quiet = global_options.force && command_options.quiet
transformations = {}
@@ -986,7 +990,7 @@ def run
transformations = global_options.tables.inject({}) do |all, table|
trees = manifest.trees.reject {|(f, s)| f.table_id != table }
- rebalancer = Rebalancer.new(trees, dest_templates_and_weights, copy_wrapper)
+ rebalancer = Rebalancer.new(trees, dest_templates_and_weights, copy_wrapper, batch_finish)
all.update(rebalancer.transformations) {|t,a,b| a.merge b }
end
@@ -1023,6 +1027,7 @@ def run
scheduler_options = command_options.scheduler_options || {}
manifest = manifest_for_write(*global_options.tables)
copy_wrapper = scheduler_options[:copy_wrapper]
+ batch_finish = scheduler_options[:batch_finish] || false
be_quiet = global_options.force && command_options.quiet
transformations = {}
@@ -1042,7 +1047,7 @@ def run
transformations = global_options.tables.inject({}) do |all, table|
trees = manifest.trees.reject {|(f, s)| f.table_id != table }
- rebalancer = Rebalancer.new(trees, dest_templates_and_weights, copy_wrapper)
+ rebalancer = Rebalancer.new(trees, dest_templates_and_weights, copy_wrapper, batch_finish)
all.update(rebalancer.transformations) {|t,a,b| a.merge b }
end
View
6 lib/gizzard/migrator.rb
@@ -20,13 +20,14 @@ class Migrator
# populated via derive_changes
attr_reader :new_templates, :unrecognized_templates, :similar_templates, :unchanged_templates
- def initialize(existing_map, config_templates, default_total_shards, config)
+ def initialize(existing_map, config_templates, default_total_shards, config, batch_finish)
@configured_templates = config_templates
@existing_map = existing_map
@existing_templates = existing_map.keys
@total_shards = @existing_map.values.map { |a| a.length }.inject { |a, b| a + b } || default_total_shards
@config = config
+ @batch_finish = batch_finish
derive_changes
end
@@ -65,6 +66,7 @@ def transformations
# be rebalanced later.
configured_map.values.first.concat forwardings.values
+ # TODO: ForwardingTransformation does not appear to be defined anywhere? dead codepath?
@transformations << ForwardingTransformation.new(@config.table_id, forwardings.inject({}) {|f, (b, e)| f.update b => @config.shard_name(e) })
end
@@ -140,7 +142,7 @@ def generate_transformations(existing, configured)
# transformation for each one.
(configured_shards.to_a - existing_shards.to_a).inject({}) do |transformations, (shard, to)|
from = existing_shards[shard]
- (transformations[[from, to]] ||= Transformation.new(from, to, [])).shards << shard
+ (transformations[[from, to]] ||= Transformation.new(from, to, :batch_finish => @batch_finish)).shards << shard
transformations
end.values
end
View
5 lib/gizzard/rebalancer.rb
@@ -20,8 +20,9 @@ def delete(e); set.delete(e) end
# 4. put shards in destinations based on reducing number of copies required.
# 5.
- def initialize(forwardings_to_trees, dest_templates_and_weights, wrapper)
+ def initialize(forwardings_to_trees, dest_templates_and_weights, wrapper, batch_finish)
@copy_dest_wrapper = wrapper
+ @batch_finish = batch_finish
@shards = forwardings_to_trees.map do |forwarding, tree|
TemplateAndTree.new(tree.template, forwarding, tree)
end.flatten
@@ -87,7 +88,7 @@ def transformations
@transformations = {}
@result.each do |bucket|
bucket.set.each do |shard|
- trans = Transformation.new(shard.template, bucket.template, @copy_dest_wrapper)
+ trans = Transformation.new(shard.template, bucket.template, @copy_dest_wrapper, @batch_finish)
forwardings_to_trees = (@transformations[trans] ||= {})
forwardings_to_trees.update(shard.forwarding => shard.tree)
View
22 lib/gizzard/transformation.rb
@@ -51,7 +51,7 @@ class Transformation
attr_reader :from, :to, :copy_dest_wrapper, :skip_copies
- def initialize(from_template, to_template, copy_dest_wrapper = nil, skip_copies = false)
+ def initialize(from_template, to_template, copy_dest_wrapper = nil, skip_copies = false, batch_finish = false)
copy_dest_wrapper ||= DEFAULT_DEST_WRAPPER
unless Shard::VIRTUAL_SHARD_TYPES.include? copy_dest_wrapper
@@ -62,6 +62,7 @@ def initialize(from_template, to_template, copy_dest_wrapper = nil, skip_copies
@to = to_template
@copy_dest_wrapper = copy_dest_wrapper
@skip_copies = skip_copies
+ @batch_finish = batch_finish
if copies_required? && copy_source.nil?
raise ArgumentError, "copy required without a valid copy source"
@@ -111,19 +112,19 @@ def inspect
# TODO: This seems kind of daft to copy around these long strings.
# Loop over it once just for display?
- def phase_line(phase)
+ phase_line = lambda do |phase|
op_inspect[phase].empty? ? "" : " #{OP_PHASES[phase]}\n#{op_inspect[phase]}\n"
end
# display phase lists in a particular order
op_inspect = [
- phase_line(:prepare),
- phase_line(:copy),
- phase_line(:repair),
- phase_line(:diff),
- phase_line(:settle_begin),
- phase_line(:settle_end),
- phase_line(:cleanup)
+ phase_line.call(:prepare),
+ phase_line.call(:copy),
+ phase_line.call(:repair),
+ phase_line.call(:diff),
+ phase_line.call(:settle_begin),
+ phase_line.call(:settle_end),
+ phase_line.call(:cleanup)
].join
"#{from.inspect} => #{to.inspect} :\n#{op_inspect}"
@@ -157,7 +158,7 @@ def collapse_jobs(jobs)
def expand_jobs(jobs)
expanded = jobs.inject(initialize_op_phases) do |ops, job|
- job_ops = job.expand(self.copy_source, involved_in_copy?(job.template))
+ job_ops = job.expand(self.copy_source, involved_in_copy?(job.template), @batch_finish)
ops.update(job_ops) {|k,a,b| a + b }
end
@@ -228,7 +229,6 @@ def visit_collect(parent, pass_down_method=Proc.new{}, pass_down_value=nil, &blo
end
end
-
class BoundTransformation
attr_reader :transformation, :base_name, :forwarding, :shard
View
58 lib/gizzard/transformation_op.rb
@@ -110,11 +110,18 @@ def eql?(other)
end
class AddLink < LinkOp
- def expand(copy_source, involved_in_copy)
- if involved_in_copy && @wrapper_type
- wrapper = ShardTemplate.new(@wrapper_type, to.host, to.weight, '', '', [to])
- { :prepare => [AddLink.new(from, wrapper)],
- :cleanup => [self, RemoveLink.new(from, wrapper)] }
+ def expand(copy_source, involved_in_copy, batch_finish)
+ # TODO: enforce that wrapper definitions match everywhere
+ if !batch_finish && involved_in_copy && @wrapper_type
+ copy_wrapper = ShardTemplate.new(@wrapper_type, to.host, to.weight, '', '', [to])
+ { :prepare => [AddLink.new(from, copy_wrapper)],
+ :cleanup => [self, RemoveLink.new(from, copy_wrapper)] }
+ elsif batch_finish && involved_in_copy && @wrapper_type
+ copy_wrapper = ShardTemplate.new(@wrapper_type, to.host, to.weight, '', '', [to])
+ settle_wrapper = ShardTemplate.new('WriteOnlyShard', to.host, to.weight, '', '', [to])
+ { :prepare => [AddLink.new(from, copy_wrapper)],
+ :settle_begin => [AddLink.new(from, settle_wrapper), RemoveLink.new(from, copy_wrapper)],
+ :settle_end => [self, RemoveLink.new(from, settle_wrapper)] }
else
{ :prepare => [self] }
end
@@ -129,7 +136,7 @@ def apply(nameserver, table_id, base_id, table_prefix, translations)
end
class RemoveLink < LinkOp
- def expand(copy_source, involved_in_copy)
+ def expand(copy_source, involved_in_copy, batch_finish)
{ (involved_in_copy ? :cleanup : :prepare) => [self] }
end
@@ -159,13 +166,23 @@ def eql?(other)
end
class CreateShard < ShardOp
- def expand(copy_source, involved_in_copy)
- if involved_in_copy && @wrapper_type
- wrapper = ShardTemplate.new(@wrapper_type, template.host, template.weight, '', '', [template])
- { :prepare => [self, CreateShard.new(wrapper), AddLink.new(wrapper, template)],
- :cleanup => [RemoveLink.new(wrapper, template), DeleteShard.new(wrapper)],
+ def expand(copy_source, involved_in_copy, batch_finish)
+ # TODO: enforce that wrapper definitions match everywhere
+ if !batch_finish && involved_in_copy && @wrapper_type
+ copy_wrapper = ShardTemplate.new(@wrapper_type, template.host, template.weight, '', '', [template])
+ { :prepare => [self, CreateShard.new(copy_wrapper), AddLink.new(copy_wrapper, template)],
+ :cleanup => [RemoveLink.new(copy_wrapper, template), DeleteShard.new(copy_wrapper)],
+ :copy => [CopyShard.new(copy_source, template)] }
+ elsif batch_finish && involved_in_copy && @wrapper_type
+ copy_wrapper = ShardTemplate.new(@wrapper_type, template.host, template.weight, '', '', [template])
+ settle_wrapper = ShardTemplate.new('WriteOnlyShard', template.host, template.weight, '', '', [template])
+ { :prepare => [self, CreateShard.new(copy_wrapper), AddLink.new(copy_wrapper, template)],
+ :settle_begin => [RemoveLink.new(copy_wrapper, template), DeleteShard.new(copy_wrapper),
+ CreateShard.new(settle_wrapper), AddLink.new(settle_wrapper, template)],
+ :settle_end => [RemoveLink.new(settle_wrapper, template), DeleteShard.new(settle_wrapper)],
:copy => [CopyShard.new(copy_source, template)] }
elsif involved_in_copy
+ # TODO: when would a wrapper type not be defined? should this still be supported?
{ :prepare => [self],
:copy => [CopyShard.new(copy_source, template)] }
else
@@ -179,7 +196,7 @@ def apply(nameserver, table_id, base_id, table_prefix, translations)
end
class DeleteShard < ShardOp
- def expand(copy_source, involved_in_copy)
+ def expand(copy_source, involved_in_copy, batch_finish)
{ (involved_in_copy ? :cleanup : :prepare) => [self] }
end
@@ -189,11 +206,18 @@ def apply(nameserver, table_id, base_id, table_prefix, translations)
end
class SetForwarding < ShardOp
- def expand(copy_source, involved_in_copy)
- if involved_in_copy && @wrapper_type
- wrapper = ShardTemplate.new(@wrapper_type, nil, 0, '', '', [to])
- { :prepare => [SetForwarding.new(template, wrapper)],
+ def expand(copy_source, involved_in_copy, batch_finish)
+ # TODO: enforce that wrapper definitions match everywhere
+ if !batch_finish && involved_in_copy && @wrapper_type
+ copy_wrapper = ShardTemplate.new(@wrapper_type, nil, 0, '', '', [to])
+ { :prepare => [SetForwarding.new(template, copy_wrapper)],
:cleanup => [self] }
+ elsif batch_finish && involved_in_copy && @wrapper_type
+ copy_wrapper = ShardTemplate.new(@wrapper_type, nil, 0, '', '', [to])
+ settle_wrapper = ShardTemplate.new('WriteOnlyShard', nil, 0, '', '', [to])
+ { :prepare => [SetForwarding.new(template, copy_wrapper)],
+ :settle_begin => [SetForwarding.new(template, settle_wrapper)],
+ :settle_end => [self] }
else
{ :prepare => [self] }
end
@@ -210,7 +234,7 @@ def apply(nameserver, table_id, base_id, table_prefix, translations)
# XXX: A no-op, but needed for setup/teardown symmetry
class RemoveForwarding < ShardOp
- def expand(copy_source, involved_in_copy)
+ def expand(copy_source, involved_in_copy, batch_finish)
{ (involved_in_copy ? :cleanup : :prepare) => [self] }
end
View
1 lib/gizzard/transformation_scheduler.rb
@@ -1,4 +1,5 @@
require "set"
+require "gizzard/commands"
module Gizzard
def self.schedule!(*args)
View
2 test/transformation_spec.rb
@@ -25,7 +25,7 @@ def remove_forwarding(t); Op::RemoveForwarding.new(mk_template(t)) end
@host_2_template = mk_template 'SqlShard(host2)'
@host_3_template = mk_template 'SqlShard(host3)'
- @trans = Gizzard::Transformation.new(@from_template, @to_template)
+ @trans = Gizzard::Transformation.new(@from_template, @to_template, :batch_finish => false)
end
describe "initialization" do

0 comments on commit e1b3c85

Please sign in to comment.
Something went wrong with that request. Please try again.