From 1ce3ede07e8555a2c32a77156cdb7cfa791a4ba1 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Sun, 3 Nov 2013 17:23:27 -0500 Subject: [PATCH] ScheduledTask in now Observable --- README.md | 1 + lib/concurrent/scheduled_task.rb | 14 ++++ md/scheduled_task.md | 34 ++++++++ spec/concurrent/scheduled_task_spec.rb | 108 +++++++++++++++++++++++++ 4 files changed, 157 insertions(+) create mode 100644 md/scheduled_task.md diff --git a/README.md b/README.md index c93909456..935af11b7 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ Several features from Erlang, Go, Clojure, Java, and JavaScript have been implem * Java inspired [Thread Pools](https://github.com/jdantonio/concurrent-ruby/blob/master/md/thread_pool.md) * Old school [events](http://msdn.microsoft.com/en-us/library/windows/desktop/ms682655.aspx) from back in my Visual C++ days * Repeated task execution with Java inspired [TimerTask](https://github.com/jdantonio/concurrent-ruby/blob/master/md/timer_task.md) service +* Scheduled task execution with Java inspired [ScheduledTask](https://github.com/jdantonio/concurrent-ruby/blob/master/md/scheduled_task.md) service * Erlang inspired [Supervisor](https://github.com/jdantonio/concurrent-ruby/blob/master/md/supervisor.md) for managing long-running threads ### Is it any good? diff --git a/lib/concurrent/scheduled_task.rb b/lib/concurrent/scheduled_task.rb index a447d6e31..dfdb435c3 100644 --- a/lib/concurrent/scheduled_task.rb +++ b/lib/concurrent/scheduled_task.rb @@ -1,3 +1,5 @@ +require 'observer' + require 'concurrent/obligation' require 'concurrent/runnable' @@ -5,6 +7,7 @@ module Concurrent class ScheduledTask include Obligation + include Observable include Runnable attr_reader :schedule_time @@ -53,6 +56,11 @@ def cancel end end + def add_observer(observer, func = :update) + return false unless @state == :pending || @state == :in_progress + super + end + protected def on_task @@ -69,10 +77,16 @@ def on_task rescue => ex @reason = ex @state = :rejected + ensure + changed end end end + if self.changed? + notify_observers(Time.now, self.value, @reason) + delete_observers + end event.set self.stop end diff --git a/md/scheduled_task.md b/md/scheduled_task.md new file mode 100644 index 000000000..7042c39f8 --- /dev/null +++ b/md/scheduled_task.md @@ -0,0 +1,34 @@ +# I'm late! For a very important date! + +TBD + +[ScheduledExecutorService](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html) + +## Copyright + +*Concurrent Ruby* is Copyright © 2013 [Jerry D'Antonio](https://twitter.com/jerrydantonio). +It is free software and may be redistributed under the terms specified in the LICENSE file. + +## License + +Released under the MIT license. + +http://www.opensource.org/licenses/mit-license.php + +> Permission is hereby granted, free of charge, to any person obtaining a copy +> of this software and associated documentation files (the "Software"), to deal +> in the Software without restriction, including without limitation the rights +> to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +> copies of the Software, and to permit persons to whom the Software is +> furnished to do so, subject to the following conditions: +> +> The above copyright notice and this permission notice shall be included in +> all copies or substantial portions of the Software. +> +> THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +> IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +> FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +> AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +> LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +> OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +> THE SOFTWARE. diff --git a/spec/concurrent/scheduled_task_spec.rb b/spec/concurrent/scheduled_task_spec.rb index ed8c2df23..f597a8153 100644 --- a/spec/concurrent/scheduled_task_spec.rb +++ b/spec/concurrent/scheduled_task_spec.rb @@ -147,5 +147,113 @@ module Concurrent task.should_not be_running end end + + context 'observation' do + + let(:clazz) do + Class.new do + attr_reader :value + attr_reader :reason + attr_reader :count + define_method(:update) do |time, value, reason| + @count = @count.to_i + 1 + @value = value + @reason = reason + end + end + end + + let(:observer) { clazz.new } + + it 'returns true for an observer added while :pending' do + task = ScheduledTask.new(1){ 42 } + task.run! + task.add_observer(observer).should be_true + end + + it 'returns true for an observer added while :in_progress' do + task = ScheduledTask.new(0.1){ sleep(1); 42 } + task.run! + sleep(0.2) + task.add_observer(observer).should be_true + end + + it 'returns true for an observer added while not running' do + task = ScheduledTask.new(1){ 42 } + task.add_observer(observer).should be_true + end + + it 'returns false for an observer added once :cancelled' do + task = ScheduledTask.new(1){ 42 } + task.run! + sleep(0.1) + task.cancel + sleep(0.1) + task.add_observer(observer).should be_false + end + + it 'returns false for an observer added once :fulfilled' do + task = ScheduledTask.new(0.1){ 42 } + task.run! + sleep(0.2) + task.add_observer(observer).should be_false + end + + it 'returns false for an observer added once :rejected' do + task = ScheduledTask.new(0.1){ raise StandardError } + task.run! + sleep(0.2) + task.add_observer(observer).should be_false + end + + it 'notifies all observers on fulfillment' do + task = ScheduledTask.new(0.1){ 42 } + task.add_observer(observer) + task.run! + sleep(0.2) + task.value.should == 42 + task.reason.should be_nil + observer.value.should == 42 + observer.reason.should be_nil + end + + it 'notifies all observers on rejection' do + task = ScheduledTask.new(0.1){ raise StandardError } + task.add_observer(observer) + task.run! + sleep(0.2) + task.value.should be_nil + task.reason.should be_a(StandardError) + observer.value.should be_nil + observer.reason.should be_a(StandardError) + end + + it 'does not notify an observer added after fulfillment' do + observer.should_not_receive(:update).with(any_args()) + task = ScheduledTask.new(0.1){ 42 } + sleep(0.2) + task.add_observer(observer) + sleep(0.1) + end + + it 'does not notify an observer added after rejection' do + observer.should_not_receive(:update).with(any_args()) + task = ScheduledTask.new(0.1){ raise StandardError } + sleep(0.2) + task.add_observer(observer) + sleep(0.1) + end + + it 'does not notify an observer added after cancellation' do + observer.should_not_receive(:update).with(any_args()) + task = ScheduledTask.new(0.5){ 42 } + task.run! + sleep(0.1) + task.cancel + sleep(0.1) + task.add_observer(observer) + sleep(0.5) + end + end end end