Permalink
Browse files

embedded test server, basic host management commands

  • Loading branch information...
1 parent ee6fa0c commit ef51a62de2fa6aed7dce218497ec06378321238e @freels freels committed Nov 24, 2010
Showing with 427 additions and 75 deletions.
  1. +11 −26 Rakefile
  2. +49 −14 lib/gizzard/commands.rb
  3. +76 −32 lib/gizzard/thrift.rb
  4. +14 −2 lib/gizzmo.rb
  5. +164 −0 test/gizzmo_spec.rb
  6. +112 −0 test/spec_helper.rb
  7. +1 −1 test/test_server/project/build.properties
View
@@ -1,3 +1,4 @@
+ROOT_DIR = File.expand_path(File.dirname(__FILE__))
require 'rubygems'
require 'rake'
@@ -13,39 +14,23 @@ begin
end
Jeweler::GemcutterTasks.new
rescue LoadError
- puts "Jeweler (or a dependency) not available. Install it with: gem install jeweler"
-end
-
-require 'rake/testtask'
-Rake::TestTask.new(:test) do |test|
- test.libs << 'lib' << 'test'
- test.pattern = 'test/**/test_*.rb'
- test.verbose = true
+ $stderr.puts "Jeweler (or a dependency) not available. Install it with: gem install jeweler"
end
begin
- require 'rcov/rcovtask'
- Rcov::RcovTask.new do |test|
- test.libs << 'test'
- test.pattern = 'test/**/test_*.rb'
- test.verbose = true
+ require 'spec/rake/spectask'
+ Spec::Rake::SpecTask.new(:spec) do |t|
+ spec_opts = File.expand_path('spec/spec.opts', ROOT_DIR)
+ if File.exist? spec_opts
+ t.spec_opts = ['--options', "\"#{spec_opts}\""]
+ end
+ t.spec_files = FileList['test/**/*_spec.rb']
end
rescue LoadError
- task :rcov do
- abort "RCov is not available. In order to run rcov, you must: sudo gem install spicycode-rcov"
- end
-end
-
-task :test do
- puts
- puts "=" * 79
- puts "You might want to read the README before running tests."
- puts "=" * 79
- sleep 2
- exec File.join(File.dirname(__FILE__), "test", "test.sh")
+ $stderr.puts "RSpec required to run tests."
end
-task :default => :test
+task :default => :spec
require 'rake/rdoctask'
Rake::RDocTask.new do |rdoc|
View
@@ -41,13 +41,13 @@ def output(string)
end
end
end
-
+
class RetryProxy
def initialize(retries, object)
@inner = object
@retries_left = retries
end
-
+
def method_missing(*args)
@inner.send(*args)
rescue
@@ -63,15 +63,15 @@ def method_missing(*args)
class ShardCommand < Command
def self.make_service(global_options, log)
- RetryProxy.new global_options.retry.to_i,
- Gizzard::Thrift::ShardManager.new(global_options.host, global_options.port, log, global_options.dry)
+ RetryProxy.new global_options.retry.to_i,
+ Gizzard::Thrift::Manager.new(global_options.host, global_options.port, log, global_options.dry)
end
end
class JobCommand < Command
def self.make_service(global_options, log)
- RetryProxy.new global_options.retry.to_i ,
- Gizzard::Thrift::JobManager.new(global_options.host, global_options.port + 2, log, global_options.dry)
+ RetryProxy.new global_options.retry.to_i,
+ Gizzard::Thrift::JobInjector.new(global_options.host, global_options.port + 2, log, global_options.dry)
end
end
@@ -103,7 +103,7 @@ def run
class ForwardingsCommand < ShardCommand
def run
- service.get_forwardings().sort_by do |f|
+ service.get_forwardings.sort_by do |f|
[ ((f.table_id.abs << 1) + (f.table_id < 0 ? 1 : 0)), f.base_id ]
end.reject do |forwarding|
@command_options.table_ids && !@command_options.table_ids.include?(forwarding.table_id)
@@ -147,7 +147,7 @@ def down(id, depth = 0)
class ReloadCommand < ShardCommand
def run
if global_options.force || ask
- service.reload_forwardings
+ service.reload_config
else
STDERR.puts "aborted"
end
@@ -202,7 +202,7 @@ def run
downward_links = service.list_downward_links(shard_id)
if upward_links.length == 0 or downward_links.length == 0
- STDERR.puts "Shard #{shard_id_string} must not be a root or leaf"
+ STDERR.puts "Shard #{shard_id_string} must not be a root or leaf"
next
end
@@ -645,22 +645,25 @@ def run
class InjectCommand < JobCommand
def run
+ count = 0
+ page_size = 20
priority, *jobs = @argv
help!("Requires priority") unless priority and jobs.size > 0
- count = 0
- jobs.each do |job|
- service.inject_job(priority.to_i, job)
+
+ jobs.each_slice(page_size) do |js|
+ service.inject_jobs(js.map {|j| Gizzard::Thrift::Job.new(priority.to_i, j) })
+
count += 1
# FIXME add -q --quiet option
STDERR.print "."
- STDERR.print "#{count}" if count % 100 == 0
+ STDERR.print "#{count * page_size}" if count % 10 == 0
STDERR.flush
end
STDERR.print "\n"
end
end
- class FlushCommand < JobCommand
+ class FlushCommand < ShardCommand
def run
args = @argv[0]
help!("Requires --all, or a job priority id.") unless args || command_options.flush_all
@@ -671,4 +674,36 @@ def run
end
end
end
+
+
+ class AddHostCommand < ShardCommand
+ def run
+ hosts = @argv.map do |arg|
+ cluster, hostname, port = *arg.split(":")
+ help!("malformed host argument") unless [cluster, hostname, port].compact.length == 3
+
+ Gizzard::Thrift::Host.new(hostname, port.to_i, cluster, Gizzard::Thrift::HostStatus::Normal)
+ end
+
+ hosts.each {|h| service.add_remote_host(h) }
+ end
+ end
+
+ class RemoveHostCommand < ShardCommand
+ def run
+ host = @argv[0].split(":")
+ host.unshift nil if host.length == 2
+ cluster, hostname, port = *host
+
+ service.remove_remote_host(hostname, port.to_i)
+ end
+ end
+
+ class ListHostsCommand < ShardCommand
+ def run
+ service.list_remote_hosts.each do |host|
+ puts "#{[host.cluster, host.hostname, host.port].join(":")} #{host.status}"
+ end
+ end
+ end
end
View
@@ -9,7 +9,7 @@ def self.struct(*args)
T::StructType.new(*args)
end
- ShardException = T.make_exception(:ShardException,
+ GizzardException = T.make_exception(:GizzardException,
T::Field.new(:description, T::STRING, 1)
)
@@ -72,11 +72,6 @@ def to_unix
end
end
- ShardMigration = T.make_struct(:ShardMigration,
- T::Field.new(:source_id, struct(ShardId), 1),
- T::Field.new(:destination_id, struct(ShardId), 2)
- )
-
Forwarding = T.make_struct(:Forwarding,
T::Field.new(:table_id, T::I32, 1),
T::Field.new(:base_id, T::I64, 2),
@@ -90,6 +85,25 @@ def inspect
end
end
+ module HostStatus
+ Normal = 0
+ Offline = 1
+ Blocked = 2
+ end
+
+ Host = T.make_struct(:Host,
+ T::Field.new(:hostname, T::STRING, 1),
+ T::Field.new(:port, T::I32, 2),
+ T::Field.new(:cluster, T::STRING, 3),
+ T::Field.new(:status, T::I32, 4)
+ )
+
+ class Host
+ def inspect
+ "(#{hostname}:#{port} - #{cluster} (#{status})"
+ end
+ end
+
class GizzmoService < T::ThriftService
def initialize(host, port, log_path, dry_run = false)
super(host, port)
@@ -128,50 +142,80 @@ def printable(method_name, args, timestamp = false)
end
end
- class ShardManager < GizzmoService
- thrift_method :create_shard, void, field(:shard, struct(ShardInfo), 1), :throws => exception(ShardException)
- thrift_method :delete_shard, void, field(:id, struct(ShardId), 1), :throws => exception(ShardException)
- thrift_method :get_shard, struct(ShardInfo), field(:id, struct(ShardId), 1), :throws => exception(ShardException)
+ class Manager < GizzmoService
+ thrift_method :reload_config, void, :throws => exception(GizzardException)
+ thrift_method :rebuild_schema, void, :throws => exception(GizzardException)
- thrift_method :add_link, void, field(:up_id, struct(ShardId), 1), field(:down_id, struct(ShardId), 2), field(:weight, i32, 3), :throws => exception(ShardException)
- thrift_method :remove_link, void, field(:up_id, struct(ShardId), 1), field(:down_id, struct(ShardId), 2), :throws => exception(ShardException)
+ thrift_method :find_current_forwarding, struct(ShardInfo), field(:table_id, i32, 1), field(:id, i64, 2), :throws => exception(GizzardException)
- thrift_method :list_upward_links, list(struct(LinkInfo)), field(:id, struct(ShardId), 1), :throws => exception(ShardException)
- thrift_method :list_downward_links, list(struct(LinkInfo)), field(:id, struct(ShardId), 1), :throws => exception(ShardException)
- thrift_method :get_child_shards_of_class, list(struct(ShardInfo)), field(:parent_id, struct(ShardId), 1), field(:class_name, string, 2), :throws => exception(ShardException)
+ # Shard Tree Management
- thrift_method :mark_shard_busy, void, field(:id, struct(ShardId), 1), field(:busy, i32, 2), :throws => exception(ShardException)
- thrift_method :copy_shard, void, field(:source_id, struct(ShardId), 1), field(:destination_id, struct(ShardId), 2), :throws => exception(ShardException)
+ thrift_method :create_shard, void, field(:shard, struct(ShardInfo), 1), :throws => exception(GizzardException)
+ thrift_method :delete_shard, void, field(:id, struct(ShardId), 1), :throws => exception(GizzardException)
- thrift_method :set_forwarding, void, field(:forwarding, struct(Forwarding), 1), :throws => exception(ShardException)
- thrift_method :replace_forwarding, void, field(:old_id, struct(ShardId), 1), field(:new_id, struct(ShardId), 2), :throws => exception(ShardException)
- thrift_method :remove_forwarding, void, field(:forwarding, struct(Forwarding), 1), :throws => exception(ShardException)
+ thrift_method :add_link, void, field(:up_id, struct(ShardId), 1), field(:down_id, struct(ShardId), 2), field(:weight, i32, 3), :throws => exception(GizzardException)
+ thrift_method :remove_link, void, field(:up_id, struct(ShardId), 1), field(:down_id, struct(ShardId), 2), :throws => exception(GizzardException)
- thrift_method :get_forwarding, struct(Forwarding), field(:table_id, i32, 1), field(:base_id, i64, 2), :throws => exception(ShardException)
- thrift_method :get_forwarding_for_shard, struct(Forwarding), field(:shard_id, struct(ShardId), 1), :throws => exception(ShardException)
+ thrift_method :set_forwarding, void, field(:forwarding, struct(Forwarding), 1), :throws => exception(GizzardException)
+ thrift_method :replace_forwarding, void, field(:old_id, struct(ShardId), 1), field(:new_id, struct(ShardId), 2), :throws => exception(GizzardException)
+ thrift_method :remove_forwarding, void, field(:forwarding, struct(Forwarding), 1), :throws => exception(GizzardException)
- thrift_method :get_forwardings, list(struct(Forwarding)), :throws => exception(ShardException)
- thrift_method :reload_forwardings, void, :throws => exception(ShardException)
+ thrift_method :get_shard, struct(ShardInfo), field(:id, struct(ShardId), 1), :throws => exception(GizzardException)
+ thrift_method :shards_for_hostname, list(struct(ShardInfo)), field(:hostname, string, 1), :throws => exception(GizzardException)
+ thrift_method :get_busy_shards, list(struct(ShardInfo)), :throws => exception(GizzardException)
- thrift_method :find_current_forwarding, struct(ShardInfo), field(:table_id, i32, 1), field(:id, i64, 2), :throws => exception(ShardException)
+ thrift_method :list_upward_links, list(struct(LinkInfo)), field(:id, struct(ShardId), 1), :throws => exception(GizzardException)
+ thrift_method :list_downward_links, list(struct(LinkInfo)), field(:id, struct(ShardId), 1), :throws => exception(GizzardException)
+ thrift_method :get_forwarding, struct(Forwarding), field(:table_id, i32, 1), field(:base_id, i64, 2), :throws => exception(GizzardException)
+ thrift_method :get_forwarding_for_shard, struct(Forwarding), field(:shard_id, struct(ShardId), 1), :throws => exception(GizzardException)
+ thrift_method :get_forwardings, list(struct(Forwarding)), :throws => exception(GizzardException)
- thrift_method :shards_for_hostname, list(struct(ShardInfo)), field(:hostname, string, 1), :throws => exception(ShardException)
- thrift_method :get_busy_shards, list(struct(ShardInfo)), :throws => exception(ShardException)
- thrift_method :list_hostnames, list(string), :throws => exception(ShardException)
+ thrift_method :list_hostnames, list(string), :throws => exception(GizzardException)
- thrift_method :rebuild_schema, void, :throws => exception(ShardException)
- end
+ thrift_method :mark_shard_busy, void, field(:id, struct(ShardId), 1), field(:busy, i32, 2), :throws => exception(GizzardException)
+ thrift_method :copy_shard, void, field(:source_id, struct(ShardId), 1), field(:destination_id, struct(ShardId), 2), :throws => exception(GizzardException)
+
+
+ # Job Scheduler Management
- class JobManager < GizzmoService
thrift_method :retry_errors, void
thrift_method :stop_writes, void
thrift_method :resume_writes, void
+
thrift_method :retry_errors_for, void, field(:priority, i32, 1)
thrift_method :stop_writes_for, void, field(:priority, i32, 1)
thrift_method :resume_writes_for, void, field(:priority, i32, 1)
+
thrift_method :is_writing, bool, field(:priority, i32, 1)
- thrift_method :inject_job, void, field(:priority, i32, 1), field(:job, string, 2)
+
+
+ # Remote Host Cluster Management
+
+ thrift_method :add_remote_host, void, field(:host, struct(Host), 1)#, :throws => exception(GizzardException)
+ thrift_method :remove_remote_host, void, field(:hostname, string, 1), field(:port, i32, 2), :throws => exception(GizzardException)
+ thrift_method :set_remote_host_status, void, field(:hostname, string, 1), field(:port, i32, 2), field(:status, i32, 3), :throws => exception(GizzardException)
+ thrift_method :set_remote_cluster_status, void, field(:cluster, string, 1), field(:status, i32, 2), :throws => exception(GizzardException)
+
+ thrift_method :get_remote_host, struct(Host), field(:hostname, string, 1), field(:port, i32, 2), :throws => exception(GizzardException)
+ thrift_method :list_remote_clusters, list(string), :throws => exception(GizzardException)
+ thrift_method :list_remote_hosts, list(struct(Host)), :throws => exception(GizzardException)
+ thrift_method :list_remote_hosts_in_cluster, list(struct(Host)), field(:cluster, string, 1), :throws => exception(GizzardException)
+ end
+
+
+
+ Job = T.make_struct(:Job,
+ T::Field.new(:priority, T::I32, 1),
+ T::Field.new(:contents, T::STRING, 2)
+ )
+
+ JobException = T.make_exception(:JobException,
+ T::Field.new(:description, T::STRING, 1)
+ )
+
+ class JobInjector < GizzmoService
+ thrift_method :inject_jobs, void, field(:priority, list(struct(Job)), 1), :throws => exception(JobException)
end
end
end
View
@@ -228,6 +228,18 @@ def separators(opts, string)
opts.on("--all", "Flush all error queues.") do
subcommand_options.flush_all = true
end
+ end,
+ 'add-host' => OptionParser.new do |opts|
+ opts.banner = "Usage: #{zero} add-host HOSTS"
+ separators(opts, DOC_STRINGS["add-host"])
+ end,
+ 'remove-host' => OptionParser.new do |opts|
+ opts.banner = "Usage: #{zero} remove-host HOST"
+ separators(opts, DOC_STRINGS["remove-host"])
+ end,
+ 'list-hosts' => OptionParser.new do |opts|
+ opts.banner = "Usage: #{zero} list-hosts"
+ separators(opts, DOC_STRINGS["list-hosts"])
end
}
@@ -370,7 +382,7 @@ def custom_timeout(seconds)
end
end
-begin
+begin
custom_timeout(global_options.timeout) do
Gizzard::Command.run(subcommand_name, global_options, argv, subcommand_options, log)
end
@@ -382,7 +394,7 @@ def custom_timeout(seconds)
end
STDERR.puts subcommands[subcommand_name]
exit 1
-rescue ThriftClient::Simple::ThriftException, Gizzard::Thrift::ShardException, Errno::ECONNREFUSED => e
+rescue ThriftClient::Simple::ThriftException, Gizzard::Thrift::GizzardException, Errno::ECONNREFUSED => e
STDERR.puts e.message
exit 1
rescue Errno::EPIPE
Oops, something went wrong.

0 comments on commit ef51a62

Please sign in to comment.