diff --git a/lib/perfectsched/application/decider.rb b/lib/perfectsched/application/decider.rb index 57fc226..7ab8b5e 100644 --- a/lib/perfectsched/application/decider.rb +++ b/lib/perfectsched/application/decider.rb @@ -40,7 +40,7 @@ def decide!(type, opts={}) begin m = method(type) rescue NameError - raise UndefinedDecisionError, "Undefined decision #{type} options=#{opt.inspect}" + raise UndefinedDecisionError, "Undefined decision #{type} options=#{opts.inspect}" end m.call(opts) end diff --git a/lib/perfectsched/backend/rdb_compat.rb b/lib/perfectsched/backend/rdb_compat.rb index f7167d3..9cdc757 100644 --- a/lib/perfectsched/backend/rdb_compat.rb +++ b/lib/perfectsched/backend/rdb_compat.rb @@ -20,6 +20,7 @@ module PerfectSched module Backend class RDBCompatBackend include BackendHelper + MAX_RETRY = 10 class Token < Struct.new(:row_id, :scheduled_time, :cron, :delay, :timezone) end @@ -76,13 +77,13 @@ def initialize(client, config) def init_database(options) sql = %[ CREATE TABLE IF NOT EXISTS `#{@table}` ( - id VARCHAR(256) NOT NULL, + id VARCHAR(255) NOT NULL, timeout INT NOT NULL, next_time INT NOT NULL, cron VARCHAR(128) NOT NULL, delay INT NOT NULL, - data BLOB NOT NULL, - timezone VARCHAR(256) NULL, + data LONGBLOB NOT NULL, + timezone VARCHAR(255) NULL, PRIMARY KEY (id) );] connect { @@ -97,7 +98,7 @@ def get_schedule_metadata(key, options={}) raise NotFoundError, "schedule key=#{key} does not exist" end attributes = create_attributes(row) - return ScheduleMetadata.new(@client, key, attributes) + return ScheduleWithMetadata.new(@client, key, attributes) } end diff --git a/lib/perfectsched/schedule_metadata.rb b/lib/perfectsched/schedule_metadata.rb index 157b629..29d7b42 100644 --- a/lib/perfectsched/schedule_metadata.rb +++ b/lib/perfectsched/schedule_metadata.rb @@ -57,24 +57,4 @@ def message @attributes[:message] end end - - class ScheduleMetadata - include Model - - def initialize(client, key, attributes) - super(client) - @key = key - end - - def schedule - Schedule.new(@client, @key) - end - - def inspect - "#<#{self.class} @key=#{@key.inspect} @attributes=#{@attributes.inspect}>" - end - - include ScheduleMetadataAccessors - end - end diff --git a/lib/perfectsched/worker.rb b/lib/perfectsched/worker.rb index f69ca2e..0a1db3c 100644 --- a/lib/perfectsched/worker.rb +++ b/lib/perfectsched/worker.rb @@ -88,10 +88,7 @@ def replace(command=[$0]+ARGV) begin return if @replaced_pid stop - @replaced_pid = Process.fork do - exec(*command) - exit!(127) - end + @replaced_pid = Process.spawn(*command) rescue @log.error "failed to replace: #{$!}" $!.backtrace.each {|bt| @log.warn "\t#{bt}" } diff --git a/perfectsched.gemspec b/perfectsched.gemspec index 303bab6..86f89ac 100644 --- a/perfectsched.gemspec +++ b/perfectsched.gemspec @@ -21,7 +21,8 @@ Gem::Specification.new do |gem| gem.add_dependency "tzinfo", "~> 1.1" gem.add_dependency "perfectqueue", "~> 0.8.41" gem.add_development_dependency "rake", "~> 0.9.2" - gem.add_development_dependency "rspec", "~> 2.10.0" + gem.add_development_dependency "rspec", "~> 3.4.0" gem.add_development_dependency "simplecov", "~> 0.10.0" gem.add_development_dependency "sqlite3", "~> 1.3.3" + gem.add_development_dependency "mysql2", "~> 0.3.20" end diff --git a/spec/application/base_spec.rb b/spec/application/base_spec.rb new file mode 100644 index 0000000..827b8a0 --- /dev/null +++ b/spec/application/base_spec.rb @@ -0,0 +1,81 @@ +require 'spec_helper' + +describe PerfectSched::Application::Base do + describe '.decider=' do + it 'defines .decider which returns the decider' do + decider_klass = double('decider_klass') + klass = PerfectSched::Application::Base + allow(klass).to receive(:decider).and_call_original + allow(klass).to receive(:decider=).with(decider_klass).and_call_original + expect(klass.decider = decider_klass).to eq(decider_klass) + expect(klass.decider).to eq(decider_klass) + end + end + + describe '.decider' do + it 'returns DefaultDecider' do + expect(PerfectSched::Application::Base.decider).to eq(PerfectSched::Application::DefaultDecider) + end + end + + describe '#new' do + let (:task){ double('task') } + let (:base) { PerfectSched::Application::Base.new(task) } + it 'calls super and set decider'do + expect(base).to be_an_instance_of(PerfectSched::Application::Base) + expect(base.instance_variable_get(:@task)).to eq(task) + expect(base.instance_variable_get(:@decider)).to be_an_instance_of(Application::DefaultDecider) + end + end + + describe '#run' do + let (:base) { PerfectSched::Application::Base.new(double('task')) } + it 'returns nil if before_perform returns false' do + allow(base).to receive(:before_perform).and_return(false) + expect(base.run).to be_nil + end + it 'returns nil' do + expect(base).to receive(:before_perform).exactly(:once).and_call_original + expect(base).to receive(:perform).exactly(:once).and_return(nil) + expect(base).to receive(:after_perform).exactly(:once).and_call_original + expect(base.run).to be_nil + end + it 'calls unexpected_error_raised on error' do + allow(base).to receive(:before_perform).exactly(:once).and_call_original + allow(base).to receive(:perform).exactly(:once) { raise } + allow(base).to receive(:decide!).with(:unexpected_error_raised, error: kind_of(Exception)).exactly(:once) + expect(base.run).to be_nil + end + end + + describe '#before_perform' do + let (:base) { PerfectSched::Application::Base.new(double('task')) } + it 'returns true' do + expect(base.before_perform).to be true + end + end + + describe '#after_perform' do + let (:base) { PerfectSched::Application::Base.new(double('task')) } + it 'returns nil' do + expect(base.after_perform).to be_nil + end + end + + describe '#decide!' do + let (:base) do + decider = double('decider') + expect(decider).to receive(:decide!).with(:type, :option).exactly(:once) + decider_klass = double('decider_klass') + allow(decider_klass).to receive(:new).with(kind_of(PerfectSched::Application::Base)).and_return(decider) + klass = PerfectSched::Application::Base + allow(klass).to receive(:decider).and_call_original + allow(klass).to receive(:decider=).with(decider_klass).and_call_original + klass.decider = decider_klass + klass.new(double('task')) + end + it 'calls decider.decide' do + expect(base.decide!(:type, :option)).to be_nil + end + end +end diff --git a/spec/application/decider_spec.rb b/spec/application/decider_spec.rb new file mode 100644 index 0000000..eec027a --- /dev/null +++ b/spec/application/decider_spec.rb @@ -0,0 +1,54 @@ +require 'spec_helper' + +describe PerfectSched::Application::UndefinedDecisionError do + it { is_expected.to be_an_instance_of(PerfectSched::Application::UndefinedDecisionError) } + it { is_expected.to be_a(Exception) } +end + +describe PerfectSched::Application::Decider do + let (:task){ double('task') } + let (:schedules){ double('schedules') } + let (:base){ double('base', schedules: schedules, task: task) } + let (:decider) { PerfectSched::Application::Decider.new(base) } + describe '#new' do + it 'returns a decider' do + expect(decider).to be_an_instance_of(PerfectSched::Application::Decider) + expect(decider.instance_variable_get(:@base)).to eq(base) + end + end + + describe '#schedules' do + it 'returns @base.schedules' do + expect(decider.schedules).to eq(schedules) + end + end + + describe '#task' do + let (:decider) do + base = double('base') + allow(base).to receive(:task).exactly(:once).and_return(task) + PerfectSched::Application::Decider.new(base) + end + it 'calls @base.task' do + expect(decider.task).to eq(task) + end + end + + describe '#decide!' do + it 'calls the specified method' do + opts = double('opts') + ret = double('ret') + allow(decider).to receive(:foo).exactly(:once).with(opts).and_return(ret) + expect(decider.decide!(:foo, opts)).to eq(ret) + end + it 'raises UndefinedDecisionError on unknown method' do + expect{ decider.decide!(:foo, double) }.to raise_error(PerfectSched::Application::UndefinedDecisionError) + end + end +end + +describe PerfectSched::Application::DefaultDecider do + subject { PerfectSched::Application::DefaultDecider.new(nil) } + it { is_expected.to be_a(PerfectSched::Application::Decider) } + it { is_expected.to be_an_instance_of(PerfectSched::Application::DefaultDecider) } +end diff --git a/spec/client_spec.rb b/spec/client_spec.rb new file mode 100644 index 0000000..b27fc7b --- /dev/null +++ b/spec/client_spec.rb @@ -0,0 +1,122 @@ +require 'spec_helper' + +describe Client do + let (:config){ {} } + let (:client){ Client.new(config) } + let (:ret){ double('ret') } + let (:backend){ double('backend') } + let (:options){ double('options') } + let (:task_token){ double('task_token') } + let (:key){ double('key') } + before do + allow(Backend).to receive(:new_backend) \ + .with(kind_of(Client), config).and_return(backend) + end + describe '.new' do + subject { client } + it { + is_expected.to be_an_instance_of(Client) } + end + describe '#backend' do + subject { client.backend } + it { is_expected.to eq backend } + end + describe '#config' do + subject { client.config } + it { is_expected.to eq config } + end + describe '#init_database' do + subject { client.init_database(options) } + before { expect(backend).to receive(:init_database).with(options).and_return(ret) } + it { is_expected.to eq ret } + end + describe '#get_schedule_metadata' do + subject { client.get_schedule_metadata(key, options) } + before { expect(backend).to receive(:get_schedule_metadata).with(key, options).and_return(ret) } + it { is_expected.to eq ret } + end + describe '#delete' do + subject { client.delete(key, options) } + before { expect(backend).to receive(:delete).with(key, options).and_return(ret) } + it { is_expected.to eq ret } + end + describe '#modify' do + subject { client.modify(key, options) } + before { expect(backend).to receive(:modify).with(key, options).and_return(ret) } + it { is_expected.to eq ret } + end + describe '#list' do + let (:pr){ proc{} } + subject { client.list(key, &pr) } + before { expect(backend).to receive(:list).with(key){|*_, &b| expect(b).to eq pr; ret} } + it { is_expected.to eq ret } + end + describe '#acquire' do + let (:alive_time){ double('alive_time') } + let (:max_acquire){ double('max_acquire') } + subject { client.acquire(options) } + before { expect(backend).to receive(:acquire).with(alive_time, max_acquire, options).and_return(ret) } + context 'options are given' do + let (:options){ {alive_time: alive_time, max_acquire: max_acquire} } + it { is_expected.to eq ret } + end + context 'alive_time options is not given' do + let (:max_acquire){ 1 } + let (:options){ {} } + let (:config){ {alive_time: alive_time} } + it { is_expected.to eq ret } + end + end + describe '#release' do + let (:alive_time){ double('alive_time') } + subject { client.release(task_token, options) } + before { expect(backend).to receive(:release).with(task_token, alive_time, options).and_return(ret) } + context 'alive_time options is given' do + let (:options){ {alive_time: alive_time} } + it { is_expected.to eq ret } + end + context 'alive_time options is not given' do + let (:options){ {} } + let (:config){ {alive_time: alive_time} } + it { is_expected.to eq ret } + end + end + describe '#heartbeat' do + let (:alive_time){ double('alive_time') } + subject { client.heartbeat(task_token, options) } + before { expect(backend).to receive(:heartbeat).with(task_token, alive_time, options).and_return(ret) } + context 'alive_time options is given' do + let (:options){ {alive_time: alive_time} } + it { is_expected.to eq ret } + end + context 'alive_time options is not given' do + let (:options){ {} } + let (:config){ {alive_time: alive_time} } + it { is_expected.to eq ret } + end + end + describe '#retry' do + let (:retry_wait){ double('retry_wait') } + subject { client.retry(task_token, options) } + before { expect(backend).to receive(:heartbeat).with(task_token, retry_wait, options).and_return(ret) } + context 'retry_wait options is given' do + let (:options){ {retry_wait: retry_wait} } + it { is_expected.to eq ret } + end + context 'retry_wait options is not given' do + let (:options){ {} } + let (:config){ {retry_wait: retry_wait} } + it { is_expected.to eq ret } + end + end + describe '#finish' do + subject { client.finish(task_token, options) } + before { expect(backend).to receive(:finish).with(task_token, options).and_return(ret) } + it { is_expected.to eq ret } + end + describe '#close' do + subject { client.close } + before { expect(backend).to receive(:close).with(no_args).and_return(ret) } + it { is_expected.to eq ret } + end +end diff --git a/spec/engine_spec.rb b/spec/engine_spec.rb new file mode 100644 index 0000000..30750e6 --- /dev/null +++ b/spec/engine_spec.rb @@ -0,0 +1,82 @@ +require 'spec_helper' + +describe PerfectSched::Engine do + let (:logger){ double('logger').as_null_object } + let (:runner){ double('runner') } + let (:scheds){ double('scheds') } + let (:config){ {logger: logger} } + let (:engine) do + Engine.new(runner, config) + end + before do + expect(PerfectSched).to receive(:open).with(config).and_return(scheds) + end + + describe '.new' do + it 'returns an Engine' do + engine = Engine.new(runner, config) + expect(engine).to be_an_instance_of(Engine) + expect(engine.instance_variable_get(:@runner)).to eq(runner) + expect(engine.instance_variable_get(:@poll_interval)).to eq(1.0) + expect(engine.instance_variable_get(:@log)).to eq(logger) + expect(engine.instance_variable_get(:@running_flag)).to be_a(BlockingFlag) + expect(engine.instance_variable_get(:@finish_flag)).to be_a(BlockingFlag) + expect(engine.instance_variable_get(:@scheds)).to be_a(PerfectSched) + end + end + + describe '#run' do + it 'runs until stopped' do + rflag = engine.instance_variable_get(:@running_flag) + fflag = engine.instance_variable_get(:@finish_flag) + task1 = double('task1') + task2 = double('task2') + allow(scheds).to receive(:poll).and_return(task1, nil, task2) + expect(runner).to receive(:new).exactly(:twice) do |task| + expect(rflag.set?).to be true + r = double('r') + case task + when task1 + expect(r).to receive(:run) + when task2 + expect(r).to receive(:run){ fflag.set! } + else + raise ArgumentError + end + r + end + expect(engine.run).to eq(engine) + expect(rflag.set?).to be false + end + end + + describe '#stop' do + it 'sets finish_flag' do + expect(engine.stop).to eq(engine) + expect(engine.instance_variable_get(:@finish_flag).set?).to eq true + end + end + + describe '#join' do + it 'waits running flag is set' do + expect(engine.join).to eq(engine) + expect(engine.instance_variable_get(:@running_flag).set?).to eq false + end + end + + describe '#close' do + it 'closes scheds' do + expect(scheds).to receive(:close) + expect(engine.close).to eq(engine) + end + end + + describe '#shutdown' do + it 'calls stop, join, and close' do + expect(engine).to receive(:stop) + expect(engine).to receive(:join) + expect(engine).to receive(:close).and_return(engine) + expect(engine.shutdown).to eq(engine) + end + end +end diff --git a/spec/model_spec.rb b/spec/model_spec.rb new file mode 100644 index 0000000..5f66554 --- /dev/null +++ b/spec/model_spec.rb @@ -0,0 +1,24 @@ +require 'spec_helper' + +describe PerfectSched::Model do + let (:config){ double('config') } + let (:client){ double('client', config: config) } + let (:klass){ Class.new{include PerfectSched::Model} } + let (:model){ klass.new(client) } + describe '.new' do + it 'creates an instance' do + expect(model).to be_a(PerfectSched::Model) + expect(model.instance_variable_get(:@client)).to eq(client) + end + end + describe '#client' do + it 'returns its client' do + expect(model.client).to eq(client) + end + end + describe '#config' do + it 'returns its client.config' do + expect(model.config).to eq(client.config) + end + end +end diff --git a/spec/perfect_sched_spec.rb b/spec/perfect_sched_spec.rb new file mode 100644 index 0000000..3a6811e --- /dev/null +++ b/spec/perfect_sched_spec.rb @@ -0,0 +1,66 @@ +require 'spec_helper' + +describe PerfectSched do + context '.open' do + let (:config){ double('config') } + let (:client){ double('client') } + let (:schedule_collection){ double('schedule_collection') } + before do + expect(Client).to receive(:new).with(config).and_return(client) + expect(ScheduleCollection).to receive(:new).with(client).and_return(schedule_collection) + end + it 'returns an instance without block' do + expect(client).not_to receive(:close) + expect(PerfectSched.open(config)).to eq(schedule_collection) + end + it 'yields block if given' do + ret = double('ret') + expect(client).to receive(:close) + r = PerfectSched.open(config) do |sc| + expect(sc).to eq(schedule_collection) + ret + end + expect(r).to eq(ret) + end + end + + context 'cron_time' do + it do + ts = PerfectSched.cron_time('0 * * * *', 0, nil) + expect(ts).not_to be_nil + end + it do + expect{PerfectSched.cron_time('0 * * * *', 0, 'JST-9')}.to raise_error(ArgumentError) + end + end + + context '.next_time' do + it do + ts = PerfectSched.next_time('0 * * * *', 0, nil) + expect(ts).not_to be_nil + end + xit 'calculates 4 years quickly' do + t = Time.utc(2012,2,29) + ts = PerfectSched.next_time('0 0 29 2 *', t.to_i, nil) + expect(ts).to eq(Time.utc(2016,2,29).to_i) + end + it do + expect{PerfectSched.next_time('0 * * * *', 0, 'JST-9')}.to raise_error(ArgumentError) + end + + context 'DST 2015' do + it 'can go through America/Los_Angeles transition' do + t0 = Time.new(2015, 3, 8, 1, 59, 59, -8*3600) + t1 = Time.new(2015, 3, 9, 2, 0, 0, -7*3600) # 2015-03-08T02:00:00 doesn't exist + ts = PerfectSched.next_time('* 2 * * *', t0.to_i, 'America/Los_Angeles') + expect(ts).to eq(t1.to_i) + end + it 'can go through America/Los_Angeles transition' do + t0 = Time.new(2015, 11, 1, 1, 0, 0, -7*3600) + t1 = Time.new(2015, 11, 1, 1, 0, 0, -8*3600) # 2015-11-01T01:00:00 exists twice + ts = PerfectSched.next_time('0 1 * * *', t0.to_i, 'America/Los_Angeles') + expect(ts).to eq(t1.to_i) + end + end + end +end diff --git a/spec/rdb_compat_backend_spec.rb b/spec/rdb_compat_backend_spec.rb index f384530..5fa19b1 100644 --- a/spec/rdb_compat_backend_spec.rb +++ b/spec/rdb_compat_backend_spec.rb @@ -2,40 +2,300 @@ require 'perfectsched/backend/rdb_compat' describe Backend::RDBCompatBackend do - let :sc do - FileUtils.rm_f 'spec/test.db' - sc = PerfectSched.open({:type=>'rdb_compat', :url=>'sqlite://spec/test.db', :table=>'test_scheds'}) - sc.client.init_database - sc + let (:now){ Time.now.to_i } + let (:client){ double('client') } + let (:config){ {url: 'sqlite://spec/test.db', table: 'test_scheds'} } + let (:db) do + d = Backend::RDBCompatBackend.new(client, config) + s = d.db + s.tables.each{|t| s.drop_table(t) } + d.init_database(nil) + d end - let :client do - sc.client + context 'compatibility' do + let :sc do + FileUtils.rm_f 'spec/test.db' + sc = PerfectSched.open({:type=>'rdb_compat', :url=>'sqlite://spec/test.db', :table=>'test_scheds'}) + sc.client.init_database + sc + end + + let :client do + sc.client + end + + let :backend do + client.backend + end + + it 'backward compatibility 1' do + backend.db["INSERT INTO test_scheds (id, timeout, next_time, cron, delay, data, timezone) VALUES (?, ?, ?, ?, ?, ?, ?)", "maint_sched.1.do_hourly", 1339812000, 1339812000, "0 * * * *", 0, {"account_id"=>1}.to_json, "UTC"].insert + ts = backend.acquire(60, 1, {:now=>1339812003}) + expect(ts).not_to eq(nil) + t = ts[0] + expect(t.data).to eq({'account_id'=>1}) + expect(t.type).to eq('maint_sched') + expect(t.key).to eq('maint_sched.1.do_hourly') + expect(t.next_time).to eq(1339812000) + end + + it 'backward compatibility 2' do + backend.db["INSERT INTO test_scheds (id, timeout, next_time, cron, delay, data, timezone) VALUES (?, ?, ?, ?, ?, ?, ?)", "merge", 1339812060, 1339812000, "@hourly", 60, '', "Asia/Tokyo"].insert + ts = backend.acquire(60, 1, {:now=>1339812060}) + t = ts[0] + expect(t.data).to eq({}) + expect(t.type).to eq('merge') + expect(t.key).to eq('merge') + expect(t.next_time).to eq(1339812000) + end end - let :backend do - client.backend + context '.new' do + let (:client){ double('client') } + let (:table){ double('table') } + it 'raises error unless url' do + expect{Backend::RDBCompatBackend.new(client, {})}.to raise_error(ConfigError) + end + it 'raises error unless table' do + expect{Backend::RDBCompatBackend.new(client, {url: ''})}.to raise_error(ConfigError) + end + it 'supports sqlite' do + config = {url: 'sqlite://localhost', table: table} + expect(Backend::RDBCompatBackend.new(client, config)).to be_an_instance_of(Backend::RDBCompatBackend) + end + it 'supports mysql' do + config = {url: 'mysql://root:@localhost/perfectsched_test', table: table} + expect(Backend::RDBCompatBackend.new(client, config)).to be_an_instance_of(Backend::RDBCompatBackend) + end + it 'doesn\'t support postgres' do + config = {url: 'postgres://localhost', table: table} + expect{Backend::RDBCompatBackend.new(client, config)}.to raise_error(ConfigError) + end end - it 'backward compatibility 1' do - backend.db["INSERT INTO test_scheds (id, timeout, next_time, cron, delay, data, timezone) VALUES (?, ?, ?, ?, ?, ?, ?)", "maint_sched.1.do_hourly", 1339812000, 1339812000, "0 * * * *", 0, {"account_id"=>1}.to_json, "UTC"].insert - ts = backend.acquire(60, 1, {:now=>1339812003}) - ts.should_not == nil - t = ts[0] - t.data.should == {'account_id'=>1} - t.type.should == 'maint_sched' - t.key.should == 'maint_sched.1.do_hourly' - t.next_time.should == 1339812000 + context '#init_database' do + it 'creates the table' do + db.add('key', 'test', '* * * * *', 0, 'Asia/Tokyo', {}, now, now, {}) + end end - it 'backward compatibility 2' do - backend.db["INSERT INTO test_scheds (id, timeout, next_time, cron, delay, data, timezone) VALUES (?, ?, ?, ?, ?, ?, ?)", "merge", 1339812060, 1339812000, "@hourly", 60, '', "Asia/Tokyo"].insert - ts = backend.acquire(60, 1, {:now=>1339812060}) - t = ts[0] - t.data.should == {} - t.type.should == 'merge' - t.key.should == 'merge' - t.next_time.should == 1339812000 + context '#get_schedule_metadata' do + before do + db.add('key', 'test', '* * * * *', 0, 'Asia/Tokyo', {}, now, now, {}) + end + it 'fetches a metadata' do + expect(db.get_schedule_metadata('key')).to be_an_instance_of(ScheduleWithMetadata) + end + it 'raises error if non exist key' do + expect{db.get_schedule_metadata('nonexistent')}.to raise_error(NotFoundError) + end + end + + context '#list' do + before do + db.add('key', 'test', '* * * * *', 0, 'Asia/Tokyo', {}, now, now, {}) + end + it 'lists a metadata' do + db.list(nil) do |x| + expect(x).to be_an_instance_of(PerfectSched::ScheduleWithMetadata) + expect(x.key).to eq('key') + end + end + end + + context '#add' do + it 'adds schedules' do + db.add('key', 'test', '* * * * *', 0, 'Asia/Tokyo', {}, now, now, {}) + expect{db.add('key', 'test', '* * * * *', 0, 'Asia/Tokyo', {}, now, now, {})}.to raise_error(IdempotentAlreadyExistsError) + end end -end + context '#delete' do + before do + db.add('key', 'test', '* * * * *', 0, 'Asia/Tokyo', {}, now, now, {}) + end + it 'deletes schedules' do + db.delete('key', nil) + expect{db.delete('key', nil)}.to raise_error(IdempotentNotFoundError) + end + end + + context '#modify' do + before do + db.add('key', 'test', '* * * * *', 0, 'Asia/Tokyo', {}, now, now, {}) + end + it 'returns nil if no keys' do + expect(db.modify('key', {})).to be_nil + end + it 'modifies schedules' do + db.modify('key', {delay: 1}) + end + it 'raises if nonexistent' do + expect{db.modify('nonexistent', {delay: 0})}.to raise_error(NotFoundError) + end + end + + context '#acquire' do + context 'no tasks' do + it 'returns nil' do + expect(db.acquire(0, nil, {})).to be_nil + end + end + context 'some tasks' do + before do + db.add('key1', 'test', '* * * * *', 0, 'Asia/Tokyo', {}, now, now, {}) + end + it 'returns a task' do + ary = db.acquire(0, nil, {}) + expect(ary).to be_an_instance_of(Array) + expect(ary[0]).to be_an_instance_of(Task) + end + end + context 'some tasks but conflict with another process' do + before do + db.add('key1', 'test', '* * * * *', 0, 'Asia/Tokyo', {}, now, now, {}) + db.add('key2', 'test', '* * * * *', 0, 'Asia/Tokyo', {}, now, now, {}) + db.add('key3', 'test', '* * * * *', 0, 'Asia/Tokyo', {}, now, now, {}) + end + it 'returns nil' do + data_set = double('data_set', update: 0) + allow(db.db).to receive(:[]).and_return(data_set) + expect(db.acquire(0, nil, {})).to be_nil + end + end + end + + context '#heartbeat' do + let (:next_time){ now } + let (:task_token){ Backend::RDBCompatBackend::Token.new('key', next_time, '* * * * *', 0, 'Asia/Tokyo') } + context 'have a scheduled task' do + before do + db.add('key', 'test', '* * * * *', 0, 'Asia/Tokyo', {}, next_time, next_time, {}) + end + it 'returns nil if next_run_time is not updated' do + expect(db.heartbeat(task_token, 0, {now: next_time})).to be_nil + end + it 'returns nil even if next_run_time is updated' do + expect(db.heartbeat(task_token, 1, {})).to be_nil + end + end + context 'no tasks' do + it 'raises PreemptedError' do + expect{db.heartbeat(task_token, 0, {})}.to raise_error(PreemptedError) + end + end + end + + context '#finish' do + let (:next_time){ now } + let (:task_token){ Backend::RDBCompatBackend::Token.new('key', next_time, '* * * * *', 0, 'Asia/Tokyo') } + context 'have the task' do + before do + db.add('key', 'test', '* * * * *', 0, 'Asia/Tokyo', {}, next_time, next_time, {}) + end + it 'returns nil' do + expect(db.finish(task_token, nil)).to be_nil + end + end + context 'already finished' do + it 'raises IdempotentAlreadyFinishedError' do + expect{db.finish(task_token, nil)}.to raise_error(IdempotentAlreadyFinishedError) + end + end + end + + context '#connect' do + context 'normal' do + let (:ret){ double('ret') } + it 'returns block result' do + expect(db.__send__(:connect){ ret }).to eq(ret) + end + end + context 'error' do + it 'returns block result' do + expect(RuntimeError).to receive(:new).exactly(Backend::RDBCompatBackend::MAX_RETRY).and_call_original + allow(STDERR).to receive(:puts) + allow(db).to receive(:sleep) + expect do + db.__send__(:connect) do + raise RuntimeError.new('try restarting transaction') + end + end.to raise_error(RuntimeError) + end + end + end + + context '#create_attributes' do + let (:data){ Hash.new } + let (:row) do + r = double('row') + allow(r).to receive(:[]){|k| data[k] } + r + end + it 'returns a hash consisting the data of the row' do + data[:timezone] = timezone = double('timezone') + data[:delay] = delay = double('delay') + data[:cron] = cron = double('cron') + data[:next_time] = next_time = double('next_time') + data[:timeout] = timeout = double('timeout') + data[:data] = '{"type":"foo.bar","a":"b"}' + data[:id] = 'hoge' + expect(db.__send__(:create_attributes, row)).to eq( + timezone: timezone, + delay: delay, + cron: cron, + data: {"a"=>"b"}, + next_time: next_time, + next_run_time: timeout, + type: 'foo.bar', + message: nil, + node: nil, + ) + end + it 'returns {} if data\'s JSON is broken' do + data[:data] = '}{' + data[:id] = 'foo.bar.baz' + expect(db.__send__(:create_attributes, row)).to eq( + timezone: 'UTC', + delay: 0, + cron: nil, + data: {}, + next_time: nil, + next_run_time: nil, + type: 'foo', + message: nil, + node: nil, + ) + end + it 'uses id[/\A[^.]*/] if type is empty string' do + data[:data] = '{"type":""}' + data[:id] = 'foo.bar.baz' + expect(db.__send__(:create_attributes, row)).to eq( + timezone: 'UTC', + delay: 0, + cron: nil, + data: {}, + next_time: nil, + next_run_time: nil, + type: 'foo', + message: nil, + node: nil, + ) + end + it 'uses id[/\A[^.]*/] if type is nil' do + data[:id] = 'foo.bar.baz' + expect(db.__send__(:create_attributes, row)).to eq( + timezone: 'UTC', + delay: 0, + cron: nil, + data: {}, + next_time: nil, + next_run_time: nil, + type: 'foo', + message: nil, + node: nil, + ) + end + end +end diff --git a/spec/runner_spec.rb b/spec/runner_spec.rb new file mode 100644 index 0000000..aa0217a --- /dev/null +++ b/spec/runner_spec.rb @@ -0,0 +1,23 @@ +require 'spec_helper' + +describe Runner do + let (:client){ double('client') } + let (:task){ double('task', client: client) } + let (:runner){ Runner.new(task) } + describe '.new' do + subject { runner } + it { is_expected.to be_an_instance_of(Runner) } + end + describe '#task' do + subject { runner.task } + it { is_expected.to eq task } + end + describe '#schedules' do + subject { runner.schedules } + it do + schedule_collection = double('schedule_collection') + expect(ScheduleCollection).to receive(:new).with(client).and_return(schedule_collection) + is_expected.to eq schedule_collection + end + end +end diff --git a/spec/schedule_collection_spec.rb b/spec/schedule_collection_spec.rb index 648c108..76dbcb0 100644 --- a/spec/schedule_collection_spec.rb +++ b/spec/schedule_collection_spec.rb @@ -9,7 +9,7 @@ end it 'is a ScheduleCollection' do - sc.class.should == PerfectSched::ScheduleCollection + expect(sc.class).to eq(PerfectSched::ScheduleCollection) end it 'succeess add' do @@ -19,9 +19,9 @@ it 'fail duplicated add' do sc.add('sched01', 't01', {:cron=>'* * * * *', :timezone=>'UTC'}) - lambda { + expect { sc.add('sched01', 't01', {:cron=>'* * * * *', :timezone=>'UTC'}) - }.should raise_error AlreadyExistsError + }.to raise_error AlreadyExistsError sc['sched01'].delete! @@ -34,25 +34,25 @@ s01 = sc.add('sched01', 't01', :cron=>"* * * * *", :data=>{'k'=>1}, :next_time=>time) t01 = sc.poll(:alive_time=>120, :now=>time) - t01.key.should == 'sched01' - t01.type.should == 't01' - t01.cron.should == "* * * * *" - t01.delay.should == 0 - t01.data.should == {'k'=>1} - t01.scheduled_time.should == time + expect(t01.key).to eq('sched01') + expect(t01.type).to eq('t01') + expect(t01.cron).to eq("* * * * *") + expect(t01.delay).to eq(0) + expect(t01.data).to eq({'k'=>1}) + expect(t01.scheduled_time).to eq(time) t02 = sc.poll(:alive_time=>120, :now=>time+60) - t02.should == nil + expect(t02).to eq(nil) t01.finish! t04 = sc.poll(:alive_time=>120, :now=>time+60) - t04.key.should == 'sched01' - t01.type.should == 't01' - t04.cron.should == "* * * * *" - t04.delay.should == 0 - t04.data.should == {'k'=>1} - t04.scheduled_time.should == time+60 + expect(t04.key).to eq('sched01') + expect(t01.type).to eq('t01') + expect(t04.cron).to eq("* * * * *") + expect(t04.delay).to eq(0) + expect(t04.data).to eq({'k'=>1}) + expect(t04.scheduled_time).to eq(time+60) end it 'timezone' do @@ -67,16 +67,16 @@ #s02.key.should == 'sched02' t01 = sc.poll(:alive_time=>86400, :now=>time) - t01.class.should == Task - t01.key.should == 'sched01' - t01.type.should == 't01' - t01.scheduled_time.should == time + expect(t01.class).to eq(Task) + expect(t01.key).to eq('sched01') + expect(t01.type).to eq('t01') + expect(t01.scheduled_time).to eq(time) t02 = sc.poll(:alive_time=>86400, :now=>time+54000) - t02.class.should == Task - t02.key.should == 'sched02' - t02.type.should == 't01' - t02.scheduled_time.should == time+54000 + expect(t02.class).to eq(Task) + expect(t02.key).to eq('sched02') + expect(t02.type).to eq('t01') + expect(t02.scheduled_time).to eq(time+54000) end it 'delay' do @@ -85,43 +85,43 @@ s01 = sc.add('sched01', 't01', :cron=>"0 * * * *", :delay=>30, :next_time=>time, :timezone=>'UTC') t01 = sc.poll(:alive_time=>86400, :now=>time) - t01.should == nil + expect(t01).to eq(nil) t02 = sc.poll(:alive_time=>86400, :now=>time+30) - t02.class.should == Task - t02.key.should == 'sched01' - t02.type.should == 't01' - t02.scheduled_time.should == time - t02.delay.should == 30 + expect(t02.class).to eq(Task) + expect(t02.key).to eq('sched01') + expect(t02.type).to eq('t01') + expect(t02.scheduled_time).to eq(time) + expect(t02.delay).to eq(30) t02.finish! t03 = sc.poll(:alive_time=>86400, :now=>time+3600) - t03.should == nil + expect(t03).to eq(nil) t04 = sc.poll(:alive_time=>86400, :now=>time+3630) - t04.class.should == Task - t04.key.should == 'sched01' - t04.type.should == 't01' - t04.scheduled_time.should == time+3600 - t04.delay.should == 30 + expect(t04.class).to eq(Task) + expect(t04.key).to eq('sched01') + expect(t04.type).to eq('t01') + expect(t04.scheduled_time).to eq(time+3600) + expect(t04.delay).to eq(30) end it 'invalid cron format' do - lambda { + expect { sc.add('sched01', 't01', :cron=>'???') - }.should raise_error ArgumentError + }.to raise_error ArgumentError - lambda { + expect { sc.add('sched01', 't01', :cron=>'* * * * * *') - }.should raise_error ArgumentError + }.to raise_error ArgumentError end it 'fail duplicated add' do sc.add('sched01', 't01', :cron=>"0 * * * *") - lambda { + expect { sc.add('sched01', 't01', :cron=>"0 * * * *") - }.should raise_error AlreadyExistsError + }.to raise_error AlreadyExistsError sc['sched01'].delete! @@ -142,31 +142,31 @@ a.sort_by! {|s| s.key } s01 = a.shift - s01.class.should == ScheduleWithMetadata - s01.key.should == 'sched01' - s01.type.should == 't01' - s01.cron.should == '0 * * * *' - s01.delay.should == 1 - s01.next_time.should == time - s01.next_run_time.should == time+1 + expect(s01.class).to eq(ScheduleWithMetadata) + expect(s01.key).to eq('sched01') + expect(s01.type).to eq('t01') + expect(s01.cron).to eq('0 * * * *') + expect(s01.delay).to eq(1) + expect(s01.next_time).to eq(time) + expect(s01.next_run_time).to eq(time+1) s02 = a.shift - s02.class.should == ScheduleWithMetadata - s02.key.should == 'sched02' - s02.type.should == 't02' - s02.cron.should == '0 * * * *' - s02.delay.should == 2 - s02.next_time.should == time - s02.next_run_time.should == time+2 + expect(s02.class).to eq(ScheduleWithMetadata) + expect(s02.key).to eq('sched02') + expect(s02.type).to eq('t02') + expect(s02.cron).to eq('0 * * * *') + expect(s02.delay).to eq(2) + expect(s02.next_time).to eq(time) + expect(s02.next_run_time).to eq(time+2) s03 = a.shift - s03.class.should == ScheduleWithMetadata - s03.key.should == 'sched03' - s03.type.should == 't03' - s03.cron.should == '0 * * * *' - s03.delay.should == 3 - s03.next_time.should == time - s03.next_run_time.should == time+3600 + expect(s03.class).to eq(ScheduleWithMetadata) + expect(s03.key).to eq('sched03') + expect(s03.type).to eq('t03') + expect(s03.cron).to eq('0 * * * *') + expect(s03.delay).to eq(3) + expect(s03.next_time).to eq(time) + expect(s03.next_run_time).to eq(time+3600) end end diff --git a/spec/schedule_spec.rb b/spec/schedule_spec.rb new file mode 100644 index 0000000..8c3e9b1 --- /dev/null +++ b/spec/schedule_spec.rb @@ -0,0 +1,66 @@ +require 'spec_helper' + +describe Schedule do + let (:schedule){ Schedule.new(client, key) } + let (:client){ double('client') } + let (:key){ double('key') } + let (:ret){ double('ret') } + let (:options){ double('options') } + describe '.new' do + subject { schedule } + it { is_expected.to be_an_instance_of(Schedule) } + end + describe '#key' do + subject { schedule.key } + it { is_expected.to eq key } + end + describe '#delete!' do + subject { schedule.delete!(options) } + it do + expect(client).to receive(:delete).with(key, options).and_return(ret) + is_expected.to eq ret + end + end + describe '#metadata' do + subject { schedule.metadata(options) } + it do + expect(client).to receive(:get_schedule_metadata).with(key, options).and_return(ret) + is_expected.to eq ret + end + end + describe '#exists?' do + subject { schedule.exists?(options) } + context 'metadata() works' do + it do + expect(schedule).to receive(:metadata).with(options) + is_expected.to eq true + end + end + context 'metadata() raises NotFoundError' do + it do + expect(schedule).to receive(:metadata).with(options).and_raise(NotFoundError) + is_expected.to be false + end + end + end + describe '#inspect' do + subject { schedule.inspect } + it { is_expected.to eq "#" } + end +end + +describe ScheduleWithMetadata do + let (:sm){ ScheduleWithMetadata.new(client, key, attributes) } + let (:client){ double('client') } + let (:key){ double('key') } + let (:ret){ double('ret') } + let (:attributes){ double('attributes') } + describe '.new' do + subject { sm } + it { is_expected.to be_an_instance_of(ScheduleWithMetadata) } + end + describe '#inspect' do + subject { sm.inspect } + it { is_expected.to eq "#" } + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 02e42b9..14dd46e 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,7 +1,5 @@ $LOAD_PATH.unshift(File.expand_path('../lib', File.dirname(__FILE__))) -require 'perfectsched' - if ENV['SIMPLE_COV'] require 'simplecov' SimpleCov.start do @@ -11,6 +9,8 @@ end end +require 'perfectsched' + if ENV['CI'] require 'coveralls' Coveralls.wear! diff --git a/spec/task_spec.rb b/spec/task_spec.rb new file mode 100644 index 0000000..bd2f25a --- /dev/null +++ b/spec/task_spec.rb @@ -0,0 +1,51 @@ +require 'spec_helper' + +describe PerfectSched::Task do + let (:client){ double('client') } + let (:key){ double('key') } + let (:attributes){ double('attributes') } + let (:scheduled_time){ double('scheduled_time') } + let (:task_token){ double('task_token') } + let (:task){ Task.new(client, key, attributes, scheduled_time, task_token) } + + describe '.new' do + it 'returns a Task' do + expect(task).to be_an_instance_of(Task) + expect(task.instance_variable_get(:@scheduled_time)).to eq(scheduled_time) + expect(task.instance_variable_get(:@task_token)).to eq(task_token) + end + end + + describe '#scheduled_time' do + it 'returns the scheduled_time' do + expect(task.scheduled_time).to eq(scheduled_time) + end + end + + describe '#release!' do + it 'calls client.release' do + options = double('options') + ret = double('ret') + expect(client).to receive(:release).with(task_token, options).and_return(ret) + expect(task.release!(options)).to eq(ret) + end + end + + describe '#retry!' do + it 'calls client.retry' do + options = double('options') + ret = double('ret') + expect(client).to receive(:retry).with(task_token, options).and_return(ret) + expect(task.retry!(options)).to eq(ret) + end + end + + describe '#finish!' do + it 'calls client.finish' do + options = double('options') + ret = double('ret') + expect(client).to receive(:finish).with(task_token, options).and_return(ret) + expect(task.finish!(options)).to eq(ret) + end + end +end diff --git a/spec/worker_spec.rb b/spec/worker_spec.rb index 838b2b5..f3a71b9 100644 --- a/spec/worker_spec.rb +++ b/spec/worker_spec.rb @@ -1,58 +1,201 @@ require 'spec_helper' -class TestHandler < PerfectSched::Application::Base - def run - puts "TestHandler: #{task}" - if task.data['raise_error'] - raise "expected error test" +describe Worker do + let (:logger){ double('logger').as_null_object } + let (:runner){ double('runner') } + let (:config){ {} } + let (:worker){ Worker.new(runner, config) } + before do + allow(DaemonsLogger).to receive(:new).and_return(logger) + end + + describe '.run' do + it 'calls Worker.new.run without block' do + expect(worker).to receive(:run).with(no_args) + expect(Worker).to receive(:new).with(runner, config) do |*args, &block| + expect(block).to be_nil + worker + end + Worker.run(runner, config) end - if num = task.data['sleep'] - sleep num + it 'calls Worker.new.run with block' do + expect(worker).to receive(:run).with(no_args) + expect(Worker).to receive(:new).with(runner, nil) do |*args, &block| + expect(block).to be_a(Proc) + worker + end + Worker.run(runner){ } end - puts "Task finished" end - def kill(reason) - puts "kill: #{reason.class}: #{reason}" + describe '.new' do + it 'creates a Worker without block' do + expect(worker).to be_an_instance_of(Worker) + expect(worker.instance_variable_get(:@runner)).to eq(runner) + expect(worker.instance_variable_get(:@config_load_proc)).to be_an_instance_of(Proc) + expect(worker.instance_variable_get(:@finished)).to be false + end + it 'creates a Worker with block' do + worker = Worker.new(runner){ } + expect(worker).to be_an_instance_of(Worker) + expect(worker.instance_variable_get(:@runner)).to eq(runner) + expect(worker.instance_variable_get(:@config_load_proc)).to be_an_instance_of(Proc) + expect(worker.instance_variable_get(:@finished)).to be false + end end -end - -class TestApp < PerfectSched::Application::Dispatch - route 'test' => TestHandler -end -describe Worker do - before do - create_test_sched.close - @worker = Worker.new(TestApp, test_sched_config) - @thread = Thread.new { - @worker.run - } + describe '#run' do + let (:sig) do + sig = double('sig') + expect(sig).to receive(:stop) + sig + end + let (:engine) do + engine = double('engine') + expect(engine).to receive(:run){ worker.instance_variable_set(:@finished, true) } + expect(engine).to receive(:shutdown) + engine + end + it 'creates a Worker without block' do + expect(worker).to receive(:install_signal_handlers).and_return(sig) + expect(Engine).to receive(:new).with(runner, kind_of(Hash)).and_return(engine) + expect(worker.instance_variable_get(:@runner)).to eq(runner) + expect(worker.instance_variable_get(:@config_load_proc)).to be_an_instance_of(Proc) + expect(worker.instance_variable_get(:@finished)).to be false + expect(worker.run).to be_nil + end + it 'rescues error' do + allow(worker).to receive(:install_signal_handlers).and_raise(RuntimeError) + expect(worker.run).to be_nil + end end - after do - @worker.stop - @thread.join + describe '#stop' do + context 'engine is not set' do + it 'returns true' do + expect(worker.stop).to be true + expect(worker.instance_variable_get(:@finished)).to be true + end + end + context 'engine is set' do + let (:engine){ double('engine') } + before do + worker.instance_variable_set(:@engine, engine) + end + context 'succeed to stop engine' do + before do + expect(engine).to receive(:stop) + end + it 'returns true' do + expect(worker.stop).to be true + expect(worker.instance_variable_get(:@finished)).to be true + end + end + context 'fail to stop engine' do + before do + expect(engine).to receive(:stop).and_raise(RuntimeError) + end + it 'returns false' do + expect(worker.stop).to be false + expect(worker.instance_variable_get(:@finished)).to be true + end + end + end end - def add(*args) - sc = get_test_sched - sc.add(*args) - sc.close + describe '#restart' do + let (:engine){ double('engine') } + let (:old_engine){ double('old_engine') } + before do + worker.instance_variable_set(:@engine, old_engine) + expect(Engine).to receive(:new).with(runner, kind_of(Hash)).and_return(engine) + end + context 'succeed to shutdown old engine' do + before do + expect(old_engine).to receive(:shutdown) + end + it 'returns true' do + expect(worker.restart).to be true + end + end + context 'fail to shutdown old engine' do + before do + expect(old_engine).to receive(:shutdown).and_raise(RuntimeError) + end + it 'returns false' do + expect(worker.restart).to be false + end + end end - it 'run' do - TestHandler.any_instance.should_receive(:run).once - add('key', 'test', {:cron=>'* * * * *', :next_time=>Time.now.to_i-60}) - sleep 2 + describe '#replace' do + context 'called at the first time' do + let (:pid){ double('pid') } + it 'returns self' do + expect(Process).to receive(:spawn).with(*([$0]+ARGV)).and_return(pid) + expect(worker).to receive(:stop) + expect(worker.replace).to eq(worker) + end + it 'returns self' do + command = double('command') + expect(Process).to receive(:spawn).with(command).and_return(pid) + expect(worker).to receive(:stop) + expect(worker.replace(command)).to eq(worker) + end + end + context 'already called' do + it 'returns true' do + expect(worker).to receive(:stop).and_raise(RuntimeError) + expect(worker.replace(double)).to be false + end + end end - it 'term signal' do - sleep 1 - Process.kill(:TERM, Process.pid) - puts "finish expected..." - @thread.join + describe '#logrotated' do + it 'returns true' do + expect(logger).to receive(:reopen!) + expect(worker.logrotated).to be true + end + it 'rescues error and returns false' do + expect(logger).to receive(:reopen!).and_raise(RuntimeError) + expect(worker.logrotated).to be false + end end + describe 'install_signal_handlers' do + let (:engine){ double('engine', shutdown: nil) } + before do + expect(Engine).to receive(:new).with(runner, kind_of(Hash)).and_return(engine) + end + it 'traps TERM and stop the worker' do + allow(engine).to receive(:run){ Process.kill(:TERM, $$) } + expect(worker).to receive(:stop).and_call_original + worker.run + end + it 'traps INT and stop the worker' do + allow(engine).to receive(:run){ Process.kill(:INT, $$) } + expect(worker).to receive(:stop).and_call_original + worker.run + end + it 'traps QUIT and stop the worker' do + allow(engine).to receive(:run){ Process.kill(:QUIT, $$) } + expect(worker).to receive(:stop).and_call_original + worker.run + end + it 'traps USR1 and restart the worker' do + allow(engine).to receive(:run){ Process.kill(:USR1, $$) } + expect(worker).to receive(:restart){ worker.stop } + worker.run + end + it 'traps HUP and restart the worker' do + allow(engine).to receive(:run){ Process.kill(:HUP, $$) } + expect(worker).to receive(:restart){ worker.stop } + worker.run + end + it 'traps USR2 and logrotated' do + allow(engine).to receive(:run){ Process.kill(:USR2, $$) } + expect(worker).to receive(:logrotated){ worker.stop } + worker.run + end + end end -