Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

wip

  • Loading branch information...
commit 0a674637adefcd63b6d220241bae6b757a6c2a39 1 parent a16f09c
@freels freels authored
View
1  gizzmo.gemspec
@@ -82,6 +82,7 @@ Gem::Specification.new do |s|
"test/gizzmo_spec.rb",
"test/helper.rb",
"test/nameserver_spec.rb",
+ "test/scheduler_spec.rb",
"test/shard_template_spec.rb",
"test/spec_helper.rb",
"test/test_server/target/gen-rb/test_server.rb",
View
35 lib/gizzard/commands.rb
@@ -812,7 +812,7 @@ def run
end
unless global_options.force
- puts "Continue? (y/n)"
+ print "Continue? (y/n) "; $stdout.flush
exit unless $stdin.gets.chomp == "y"
puts ""
end
@@ -826,33 +826,40 @@ def run
class TransformCommand < Command
def run
- help!("wrong number of arguments") unless @argv.length == 2
+ help!("must have an even number of arguments") unless @argv.length % 2 == 0
+
+ manifest = manager.manifest(*global_options.tables)
+ transformations = {}
+
+ @argv.each_slice(2) do |(from_template_s, to_template_s)|
+ from, to = [from_template_s, to_template_s].map {|s| ShardTemplate.parse(s) }
+ transformation = Transformation.new(from, to)
+ forwardings = manifest.templates[from]
+ trees = manifest.trees.reject {|(f, s)| !forwardings.include?(f) }
- from_template_s, to_template_s = @argv
+ transformations[transformation] = trees
+ end
- from, to = [from_template_s, to_template_s].map {|s| ShardTemplate.parse(s) }
- manifest = manager.manifest(*global_options.tables)
- forwardings = manifest.templates[from]
- transformation = Transformation.new(from, to)
- trees = manifest.trees.reject {|(f, s)| !forwardings.include?(f) }
- base_name = trees.values.first.id.table_prefix.split('_').first
+ base_name = transformations.values.first.values.first.id.table_prefix.split('_').first
unless global_options.force && command_options.quiet
- puts transformation.inspect
- puts "Applied to:"
- forwardings.sort.each {|f| puts " #{f.inspect}" }
+ transformations.each do |transformation, trees|
+ puts transformation.inspect
+ puts "Applied to:"
+ trees.keys.sort.each {|f| puts " #{f.inspect}" }
+ end
puts ""
end
unless global_options.force
- puts "Continue? (y/n)"
+ print "Continue? (y/n) "; $stdout.flush
exit unless $stdin.gets.chomp == "y"
puts ""
end
Gizzard.schedule! manager,
base_name,
- { transformation => trees },
+ transformations,
command_options.scheduler_options
end
end
View
2  lib/gizzard/thrift.rb
@@ -86,7 +86,7 @@ def <=>(o)
end
def inspect
- "[#{table_id}] #{base_id.to_s(16)} -> #{shard_id.inspect}"
+ "[#{table_id}] #{base_id.to_s(16)} = #{shard_id.inspect}"
end
end
View
28 lib/gizzard/transformation.rb
@@ -201,27 +201,15 @@ def involved_hosts(phase = :copy)
involved_shards(phase).map {|s| s.hostname }.uniq
end
- def inspect(phase = nil)
- base = "#{@forwarding.inspect}: #{from.inspect} => #{to.inspect}"
- op_inspect = transformation.operations.inject({}) do |h, (p, ops)|
- h.update p => ops.map {|job| " #{job.inspect}" }.join("\n")
- end
-
- prepare_inspect = op_inspect[:prepare].empty? ? "" : " PREPARE\n#{op_inspect[:prepare]}\n"
- copy_inspect = op_inspect[:copy].empty? ? "" : " COPY\n#{op_inspect[:copy]}\n"
- cleanup_inspect = op_inspect[:cleanup].empty? ? "" : " CLEANUP\n#{op_inspect[:cleanup]}\n"
+ def inspect
+ "#{@forwarding.inspect}: #{from.inspect} => #{to.inspect}"
+ end
- case phase
- when :all
- [base, "\n", prepare_inspect, copy_inspect, cleanup_inspect].join
- when :prepare
- [base, "\n", prepare_inspect].join
- when :copy
- [base, "\n", copy_inspect].join
- when :cleanup
- [base, "\n", cleanup_inspect].join
- else
- base
+ def copy_descs
+ transformation.operations[:copy].map do |copy|
+ from_id = copy.from.to_shard_id(@table_prefix, @translations)
+ to_id = copy.to.to_shard_id(@table_prefix, @translations)
+ "#{from_id.inspect} -> #{to_id.inspect}"
end
end
View
48 lib/gizzard/transformation_scheduler.rb
@@ -9,9 +9,9 @@ class Transformation::Scheduler
attr_reader :max_copies, :copies_per_host
DEFAULT_OPTIONS = {
- :max_copies => 30,
+ :max_copies => 30,
:copies_per_host => 8,
- :poll_interval => 5
+ :poll_interval => 10
}.freeze
def initialize(nameserver, base_name, transformations, options = {})
@@ -44,8 +44,16 @@ def initialize(nameserver, base_name, transformations, options = {})
# 4. schedule a new job or reload app servers.
def apply!
+ @start_time = Time.now
+
loop do
- reload_busy_shards
+ begin
+ reload_busy_shards
+ rescue GizzardException
+ sleep 10
+ next
+ end
+
cleanup_jobs
schedule_jobs(max_copies - busy_shards.length)
@@ -61,7 +69,7 @@ def apply!
nameserver.reload_config
- log "All transformations applied. Have a nice day!"
+ log "All transformations applied. Total time elapsed: #{time_elapsed}"
end
def schedule_jobs(num_to_schedule)
@@ -78,23 +86,22 @@ def schedule_jobs(num_to_schedule)
end
job
- end.compact
+ end.compact.sort_by {|t| t.forwarding }
unless jobs.empty?
- log "Jobs starting:"
- jobs.each {|j| log " #{j.inspect(:prepare)}" }
+ log "STARTING:"
+ jobs.each {|j| log " #{j.inspect}" }
jobs.each {|j| j.prepare!(nameserver) }
- log "Reloading nameserver configuration."
nameserver.reload_config
copy_jobs = jobs.select {|j| j.copy_required? }
unless copy_jobs.empty?
- log "Scheduling copies:"
+ log "COPIES:"
copy_jobs.each do |j|
- log " #{j.inspect(:copy)}"
+ j.copy_descs.each {|d| log " #{d}" }
j.copy!(nameserver)
end
end
@@ -108,8 +115,8 @@ def cleanup_jobs
@jobs_in_progress -= jobs
unless jobs.empty?
- log "Jobs finishing:"
- jobs.each {|j| log " #{j.inspect(:cleanup)}" }
+ log "FINISHING:"
+ jobs.each {|j| log " #{j.inspect}" }
end
jobs.each {|j| j.cleanup!(nameserver) }
@@ -141,7 +148,7 @@ def busy_hosts(extra_hosts = [])
h.update(host => 1) {|_,a,b| a + b }
end
- copies_count_map.select {|_, count| count >= @max_copies }.map {|(host, _)| host }
+ copies_count_map.select {|_, count| count >= @copies_per_host }.map {|(host, _)| host }
end
def reset_progress_string
@@ -151,6 +158,17 @@ def reset_progress_string
end
end
+ def time_elapsed
+ s = (Time.now - @start_time).to_i
+
+ days = s / (60 * 60 * 24) if s >= 60 * 60 * 24
+ hours = (s % (60 * 60 * 24)) / (60 * 60) if s >= 60 * 60
+ minutes = (s % (60 * 60)) / 60 if s >= 60
+ seconds = s % 60
+
+ [days,hours,minutes,seconds].compact.map {|i| "%0.2i" % i }.join(":")
+ end
+
def log(*args)
reset_progress_string
puts *args
@@ -164,10 +182,10 @@ def put_copy_progress
unless @jobs_in_progress.empty? || @busy_shards.empty?
if @progress_string
print "\r"
- print " " * @progress_string.length + 10
+ print " " * (@progress_string.length + 10)
print "\r"
end
- @progress_string = "#{spinner} Copies in progress: #{@busy_shards.length}"
+ @progress_string = "#{spinner} Copies in progress: #{@busy_shards.length} Time elapsed: #{time_elapsed}"
print @progress_string; $stdout.flush
end
end
View
82 test/gizzmo_spec.rb
@@ -334,7 +334,7 @@ def nameserver
gizzmo "addforwarding 0 1 localhost/s_0_001_replicating"
gizzmo "-f reload"
- gizzmo('-f transform-tree "ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1))" localhost/s_0_001_replicating').should == <<-EOF
+ gizzmo('-f transform-tree --poll-interval=1 "ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1))" localhost/s_0_001_replicating').should == <<-EOF
ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1)) :
PREPARE
create_shard(TestShard/127.0.0.1)
@@ -349,26 +349,13 @@ def nameserver
remove_link(ReplicatingShard -> WriteOnlyShard)
delete_shard(WriteOnlyShard)
-Jobs starting:
- [0] 1 -> localhost/s_0_001_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
- PREPARE
- create_shard(TestShard/127.0.0.1)
- create_shard(WriteOnlyShard)
- add_link(WriteOnlyShard -> TestShard/127.0.0.1)
- add_link(ReplicatingShard -> WriteOnlyShard)
-Reloading nameserver configuration.
-Scheduling copies:
- [0] 1 -> localhost/s_0_001_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
- COPY
- copy_shard(TestShard/127.0.0.1)
-Jobs finishing:
- [0] 1 -> localhost/s_0_001_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
- CLEANUP
- add_link(ReplicatingShard -> TestShard/127.0.0.1)
- remove_link(WriteOnlyShard -> TestShard/127.0.0.1)
- remove_link(ReplicatingShard -> WriteOnlyShard)
- delete_shard(WriteOnlyShard)
-All transformations applied. Have a nice day!
+STARTING:
+ [0] 1 = localhost/s_0_001_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
+COPIES:
+ localhost/s_0_001_a -> 127.0.0.1/s_0_0001
+FINISHING:
+ [0] 1 = localhost/s_0_001_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
+All transformations applied. Total time elapsed: 10
EOF
nameserver[:shards].should == [ info("127.0.0.1", "s_0_0001", "TestShard"),
@@ -392,7 +379,7 @@ def nameserver
end
gizzmo "-f reload"
- gizzmo('-f -T 0 transform "ReplicatingShard -> TestShard(localhost,1,Int,Int)" "ReplicatingShard -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1))"').should == <<-EOF
+ gizzmo('-f -T0 transform --poll-interval=1 "ReplicatingShard -> TestShard(localhost,1,Int,Int)" "ReplicatingShard -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1))"').should == <<-EOF
ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1)) :
PREPARE
create_shard(TestShard/127.0.0.1)
@@ -407,44 +394,19 @@ def nameserver
remove_link(ReplicatingShard -> WriteOnlyShard)
delete_shard(WriteOnlyShard)
Applied to:
- [0] 1 -> localhost/s_0_001_replicating
- [0] 2 -> localhost/s_0_002_replicating
-
-Jobs starting:
- [0] 1 -> localhost/s_0_001_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
- PREPARE
- create_shard(TestShard/127.0.0.1)
- create_shard(WriteOnlyShard)
- add_link(WriteOnlyShard -> TestShard/127.0.0.1)
- add_link(ReplicatingShard -> WriteOnlyShard)
- [0] 2 -> localhost/s_0_002_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
- PREPARE
- create_shard(TestShard/127.0.0.1)
- create_shard(WriteOnlyShard)
- add_link(WriteOnlyShard -> TestShard/127.0.0.1)
- add_link(ReplicatingShard -> WriteOnlyShard)
-Reloading nameserver configuration.
-Scheduling copies:
- [0] 1 -> localhost/s_0_001_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
- COPY
- copy_shard(TestShard/127.0.0.1)
- [0] 2 -> localhost/s_0_002_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
- COPY
- copy_shard(TestShard/127.0.0.1)
-Jobs finishing:
- [0] 1 -> localhost/s_0_001_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
- CLEANUP
- add_link(ReplicatingShard -> TestShard/127.0.0.1)
- remove_link(WriteOnlyShard -> TestShard/127.0.0.1)
- remove_link(ReplicatingShard -> WriteOnlyShard)
- delete_shard(WriteOnlyShard)
- [0] 2 -> localhost/s_0_002_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
- CLEANUP
- add_link(ReplicatingShard -> TestShard/127.0.0.1)
- remove_link(WriteOnlyShard -> TestShard/127.0.0.1)
- remove_link(ReplicatingShard -> WriteOnlyShard)
- delete_shard(WriteOnlyShard)
-All transformations applied. Have a nice day!
+ [0] 1 = localhost/s_0_001_replicating
+ [0] 2 = localhost/s_0_002_replicating
+
+STARTING:
+ [0] 1 = localhost/s_0_001_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
+ [0] 2 = localhost/s_0_002_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
+COPIES:
+ localhost/s_0_001_a -> 127.0.0.1/s_0_0001
+ localhost/s_0_002_a -> 127.0.0.1/s_0_0002
+FINISHING:
+ [0] 1 = localhost/s_0_001_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
+ [0] 2 = localhost/s_0_002_replicating: ReplicatingShard(1) -> TestShard(localhost,1,Int,Int) => ReplicatingShard(1) -> (TestShard(localhost,1,Int,Int), TestShard(127.0.0.1,1))
+All transformations applied. Total time elapsed: 01
EOF
nameserver[:shards].should == [ info("127.0.0.1", "s_0_0001", "TestShard"),
View
59 test/scheduler_spec.rb
@@ -0,0 +1,59 @@
+require File.expand_path('../spec_helper', __FILE__)
+
+describe Gizzard::Transformation::Scheduler do
+
+ before do
+ @nameserver = stub!.subject
+ stub(@nameserver).dryrun? { false }
+
+ @transformations = {}
+ @scheduler = Gizzard::Transformation::Scheduler.new(@nameserver, 't', @transformations)
+ end
+
+ describe "busy_shards" do
+ it "memoizes" do
+ shards = [info('127.0.0.1', 't_0_0001', 'TestShard')]
+ mock(@nameserver).get_busy_shards { shards }
+
+ @scheduler.busy_shards.should == shards.map {|s| s.id }
+ @scheduler.busy_shards.should == shards.map {|s| s.id }
+ end
+
+ it "resets after calling reload_busy_shards" do
+ shards = [info('127.0.0.1', 't_0_0001', 'TestShard')]
+ mock(@nameserver).get_busy_shards { shards }.twice
+
+ @scheduler.busy_shards.should == shards.map {|s| s.id }
+ @scheduler.reload_busy_shards
+ @scheduler.busy_shards.should == shards.map {|s| s.id }
+ end
+ end
+
+ describe "busy_hosts" do
+ it "returns a list of hosts over the threshold of copies per host" do
+ shards = []
+ stub(@nameserver).get_busy_shards { shards }
+ @scheduler = Gizzard::Transformation::Scheduler.new(@nameserver, 't', @transformations, :copies_per_host => 2)
+
+ @scheduler.busy_hosts.should == []
+
+ shards = [info('127.0.0.1', 't_0_0001', 'TestShard')]
+ @scheduler.reload_busy_shards
+ @scheduler.busy_hosts.should == []
+
+ shards = [info('127.0.0.1', 't_0_0001', 'TestShard'), info('127.0.0.1', 't_0_0002', 'TestShard')]
+ @scheduler.reload_busy_shards
+ @scheduler.busy_hosts.should == ['127.0.0.1']
+ end
+
+ it "respects passed in extra hosts" do
+ shards = []
+ stub(@nameserver).get_busy_shards { shards }
+ @scheduler = Gizzard::Transformation::Scheduler.new(@nameserver, 't', @transformations, :copies_per_host => 2)
+
+ @scheduler.busy_hosts.should == []
+ @scheduler.busy_hosts(["127.0.0.1"]).should == []
+ @scheduler.busy_hosts(["127.0.0.1", "127.0.0.1"]).should == ["127.0.0.1"]
+ end
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.