Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Add a '--copy-hosts' option to transforms, and use it to create a
Browse files Browse the repository at this point in the history
separate connection pool for the :copy phase of transforms.
  • Loading branch information
Stu Hood committed Apr 10, 2012
1 parent 3d9b5ff commit 8d6bf80
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 22 deletions.
24 changes: 20 additions & 4 deletions lib/gizzard/commands.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class << self
def run(command_name, global_options, argv, subcommand_options, log)
command_class = Gizzard.const_get("#{classify(command_name)}Command")

@manager ||= make_manager(global_options, log)
@manager ||= make_manager(global_options, global_options.hosts, log)
@job_injector ||= make_job_injector(global_options, log)

command = command_class.new(@manager, @job_injector, global_options, argv, subcommand_options)
Expand All @@ -38,9 +38,10 @@ def classify(string)
string.split(/\W+/).map{|s| s.capitalize }.join("")
end

def make_manager(global_options, log)
raise "No hosts specified" unless global_options.hosts
hosts = global_options.hosts.map {|h| [h, global_options.port].join(":") }
# create a 'Nameserver' instance for the given set of hosts
def make_manager(global_options, host_list, log)
raise "No hosts specified" unless host_list
hosts = host_list.map {|h| [h, global_options.port].join(":") }

Nameserver.new(hosts, :retries => global_options.retry,
:log => log,
Expand Down Expand Up @@ -127,6 +128,15 @@ def manifest_for_write(*table_ids)
manifest.validate_for_write_or_raise(ignore_busy, ignore_shard_types)
manifest
end

# TODO: since this is transform specific, it should move into BaseTransformCommand
# once Add/Remove partition are subclasses (DS-84)
def make_copy_manager()
scheduler_options = command_options.scheduler_options || {}
scheduler_options[:copy_hosts] ?
make_manager(global_options, scheduler_options[:copy_hosts], log) :
@manager
end
end

class RetryProxy
Expand Down Expand Up @@ -847,6 +857,7 @@ def run
end
@batch_finish = scheduler_options[:batch_finish] || false
@copy_wrapper = scheduler_options[:copy_wrapper]
@copy_manager = make_copy_manager()

scheduler_options[:force] = @force
scheduler_options[:quiet] = @be_quiet
Expand Down Expand Up @@ -876,6 +887,7 @@ def run
confirm!

Gizzard.schedule! manager,
@copy_manager,
base_name,
transformations,
scheduler_options
Expand Down Expand Up @@ -976,6 +988,7 @@ def run
batch_finish = scheduler_options[:batch_finish] || false
be_quiet = global_options.force && command_options.quiet
transformations = {}
copy_manager = make_copy_manager()

scheduler_options[:quiet] = be_quiet

Expand Down Expand Up @@ -1021,6 +1034,7 @@ def run
confirm!

Gizzard.schedule! manager,
copy_manager,
base_name,
transformations,
scheduler_options
Expand All @@ -1037,6 +1051,7 @@ def run
batch_finish = scheduler_options[:batch_finish] || false
be_quiet = global_options.force && command_options.quiet
transformations = {}
copy_manager = make_copy_manager()

scheduler_options[:quiet] = be_quiet

Expand Down Expand Up @@ -1078,6 +1093,7 @@ def run
confirm!

Gizzard.schedule! manager,
copy_manager,
base_name,
transformations,
scheduler_options
Expand Down
6 changes: 4 additions & 2 deletions lib/gizzard/transformation_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ class Transformation::Scheduler
:poll_interval => 10,
}.freeze

def initialize(nameserver, base_name, transformations, options = {})
def initialize(nameserver, copy_nameserver, base_name, transformations, options = {})
options = DEFAULT_OPTIONS.merge(options)
@nameserver = nameserver
@copy_nameserver = copy_nameserver
@transformations = transformations
@max_copies = options[:max_copies]
@copies_per_host = options[:copies_per_host]
Expand Down Expand Up @@ -101,7 +102,8 @@ def apply!

def apply_job(job, phase)
if !(@skip_phases.include? phase)
job.apply!(@nameserver, phase, @rollback_log)
ns = phase == :copy ? @copy_nameserver : @nameserver
job.apply!(ns, phase, @rollback_log)
end
end

Expand Down
30 changes: 17 additions & 13 deletions lib/gizzmo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,42 +124,46 @@ def load_config(options, filename)
end

def add_scheduler_opts(subcommand_options, opts)
opts.on("--ignore-types=SHARD_TYPE_LIST", "Allow transformations to begin despite shards of the given types (a comma-separated list) existing in the topology.") do |t|
(subcommand_options.scheduler_options ||= {})[:ignore_types] = t.to_s.split(',')
subcommand_options.scheduler_options ||= {}
s_opts = subcommand_options.scheduler_options
opts.on("--copy-hosts=HOST[,HOST,...]", "Host(s) to use specifically for ':copy' tasks launched by a transform. It is recommended that this be a subset of the global --hosts list. If not specified, the --hosts list is used.") do |h|
s_opts[:copy_hosts] ||= []
s_opts[:copy_hosts] += h.to_s.split(',')
end
opts.on("--ignore-types=SHARD_TYPE[,SHARD_TYPE,...]", "Allow transformations to begin despite shards of the given type(s) existing in the topology.") do |t|
s_opts[:ignore_types] = t.to_s.split(',')
end
opts.on("--ignore-busy", "Allow transformations to begin despite busy shards existing in the topology.") do
(subcommand_options.scheduler_options ||= {})[:ignore_busy] = true
s_opts[:ignore_busy] = true
end
opts.on("--max-copies=COUNT", "Limit max simultaneous copies to COUNT.") do |c|
(subcommand_options.scheduler_options ||= {})[:max_copies] = c.to_i
s_opts[:max_copies] = c.to_i
end
opts.on("--copies-per-host=COUNT", "Limit max copies per individual destination host to COUNT") do |c|
(subcommand_options.scheduler_options ||= {})[:copies_per_host] = c.to_i
s_opts[:copies_per_host] = c.to_i
end
opts.on("--poll-interval=SECONDS", "Sleep SECONDS between polling for copy status") do |c|
(subcommand_options.scheduler_options ||= {})[:poll_interval] = c.to_i
s_opts[:poll_interval] = c.to_i
end
opts.on("--copy-wrapper=SHARD_TYPE", "Wrap copy destination shards with SHARD_TYPE. default BlockedShard") do |t|
(subcommand_options.scheduler_options ||= {})[:copy_wrapper] = t
s_opts[:copy_wrapper] = t
end
opts.on("--skip-phases=PHASE_LIST", "Executes transformations without the given phases. WARNING: This is VERY DANGEROUS if you don't know what you're doing!") do |phase_list|
s_opts = (subcommand_options.scheduler_options ||= {})
opts.on("--skip-phases=PHASE[,PHASE,...]", "Executes transformations without the given phase(s). WARNING: This is VERY DANGEROUS if you don't know what you're doing!") do |phase_list|
s_opts[:skip_phases] ||= []
s_opts[:skip_phases] += phase_list.split(",").map{|phase| phase.upcase}
end
opts.on("--skip-copies", "Deprecated: use 'skip-phases=copy' instead.") do
s_opts = (subcommand_options.scheduler_options ||= {})
s_opts[:skip_phases] ||= []
s_opts[:skip_phases] << "COPY"
end
opts.on("--no-progress", "Do not show progress bar at bottom.") do
(subcommand_options.scheduler_options ||= {})[:no_progress] = true
s_opts[:no_progress] = true
end
opts.on("--batch-finish", "After copies complete (while wrapped in --copy-wrapper), move shards to a WriteOnly settling state. When all transforms are settling, wait until the operator indicates that it is safe to remove the WriteOnly wrappers. Finally, wait for the operator to indicate that it is safe to execute cleanup.") do
(subcommand_options.scheduler_options ||= {})[:batch_finish] = true
s_opts[:batch_finish] = true
end
opts.on("--rollback-log=LOG_NAME", "A named log of applied operations which will be stored in the nameserver, and can later be rolled back with the `rollback` command. A log with the given name must not already exist.") do |rl|
(subcommand_options.scheduler_options ||= {})[:rollback_log] = rl.to_s
s_opts[:rollback_log] = rl.to_s
end
end

Expand Down
6 changes: 3 additions & 3 deletions test/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
stub(@nameserver).dryrun? { false }

@transformations = {}
@scheduler = Gizzard::Transformation::Scheduler.new(@nameserver, 't', @transformations)
@scheduler = Gizzard::Transformation::Scheduler.new(@nameserver, @nameserver, 't', @transformations)
end

describe "busy_shards" do
Expand All @@ -33,7 +33,7 @@
it "returns a list of hosts over the threshold of copies per host" do
shards = []
stub(@nameserver).get_busy_shards { shards }
@scheduler = Gizzard::Transformation::Scheduler.new(@nameserver, 't', @transformations, :copies_per_host => 2)
@scheduler = Gizzard::Transformation::Scheduler.new(@nameserver, @nameserver, 't', @transformations, :copies_per_host => 2)

@scheduler.busy_hosts.should == Set.new

Expand All @@ -49,7 +49,7 @@
it "respects passed in extra hosts" do
shards = []
stub(@nameserver).get_busy_shards { shards }
@scheduler = Gizzard::Transformation::Scheduler.new(@nameserver, 't', @transformations, :copies_per_host => 2)
@scheduler = Gizzard::Transformation::Scheduler.new(@nameserver, @nameserver, 't', @transformations, :copies_per_host => 2)

@scheduler.busy_hosts.should == Set.new
@scheduler.busy_hosts(["127.0.0.1"]).should == Set.new
Expand Down

0 comments on commit 8d6bf80

Please sign in to comment.