Skip to content

Commit

Permalink
ScheduledTask in now Observable
Browse files Browse the repository at this point in the history
  • Loading branch information
jdantonio committed Nov 3, 2013
1 parent 0adb002 commit 1ce3ede
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -73,6 +73,7 @@ Several features from Erlang, Go, Clojure, Java, and JavaScript have been implem
* Java inspired [Thread Pools](https://github.com/jdantonio/concurrent-ruby/blob/master/md/thread_pool.md)
* Old school [events](http://msdn.microsoft.com/en-us/library/windows/desktop/ms682655.aspx) from back in my Visual C++ days
* Repeated task execution with Java inspired [TimerTask](https://github.com/jdantonio/concurrent-ruby/blob/master/md/timer_task.md) service
* Scheduled task execution with Java inspired [ScheduledTask](https://github.com/jdantonio/concurrent-ruby/blob/master/md/scheduled_task.md) service
* Erlang inspired [Supervisor](https://github.com/jdantonio/concurrent-ruby/blob/master/md/supervisor.md) for managing long-running threads

### Is it any good?
Expand Down
14 changes: 14 additions & 0 deletions lib/concurrent/scheduled_task.rb
@@ -1,10 +1,13 @@
require 'observer'

require 'concurrent/obligation'
require 'concurrent/runnable'

module Concurrent

class ScheduledTask
include Obligation
include Observable
include Runnable

attr_reader :schedule_time
Expand Down Expand Up @@ -53,6 +56,11 @@ def cancel
end
end

def add_observer(observer, func = :update)
return false unless @state == :pending || @state == :in_progress
super
end

protected

def on_task
Expand All @@ -69,10 +77,16 @@ def on_task
rescue => ex
@reason = ex
@state = :rejected
ensure
changed
end
end
end

if self.changed?
notify_observers(Time.now, self.value, @reason)
delete_observers
end
event.set
self.stop
end
Expand Down
34 changes: 34 additions & 0 deletions md/scheduled_task.md
@@ -0,0 +1,34 @@
# I'm late! For a very important date!

TBD

[ScheduledExecutorService](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html)

## Copyright

*Concurrent Ruby* is Copyright © 2013 [Jerry D'Antonio](https://twitter.com/jerrydantonio).
It is free software and may be redistributed under the terms specified in the LICENSE file.

## License

Released under the MIT license.

http://www.opensource.org/licenses/mit-license.php

> Permission is hereby granted, free of charge, to any person obtaining a copy
> of this software and associated documentation files (the "Software"), to deal
> in the Software without restriction, including without limitation the rights
> to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> copies of the Software, and to permit persons to whom the Software is
> furnished to do so, subject to the following conditions:
>
> The above copyright notice and this permission notice shall be included in
> all copies or substantial portions of the Software.
>
> THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
> AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> THE SOFTWARE.
108 changes: 108 additions & 0 deletions spec/concurrent/scheduled_task_spec.rb
Expand Up @@ -147,5 +147,113 @@ module Concurrent
task.should_not be_running
end
end

context 'observation' do

let(:clazz) do
Class.new do
attr_reader :value
attr_reader :reason
attr_reader :count
define_method(:update) do |time, value, reason|
@count = @count.to_i + 1
@value = value
@reason = reason
end
end
end

let(:observer) { clazz.new }

it 'returns true for an observer added while :pending' do
task = ScheduledTask.new(1){ 42 }
task.run!
task.add_observer(observer).should be_true
end

it 'returns true for an observer added while :in_progress' do
task = ScheduledTask.new(0.1){ sleep(1); 42 }
task.run!
sleep(0.2)
task.add_observer(observer).should be_true
end

it 'returns true for an observer added while not running' do
task = ScheduledTask.new(1){ 42 }
task.add_observer(observer).should be_true
end

it 'returns false for an observer added once :cancelled' do
task = ScheduledTask.new(1){ 42 }
task.run!
sleep(0.1)
task.cancel
sleep(0.1)
task.add_observer(observer).should be_false
end

it 'returns false for an observer added once :fulfilled' do
task = ScheduledTask.new(0.1){ 42 }
task.run!
sleep(0.2)
task.add_observer(observer).should be_false
end

it 'returns false for an observer added once :rejected' do
task = ScheduledTask.new(0.1){ raise StandardError }
task.run!
sleep(0.2)
task.add_observer(observer).should be_false
end

it 'notifies all observers on fulfillment' do
task = ScheduledTask.new(0.1){ 42 }
task.add_observer(observer)
task.run!
sleep(0.2)
task.value.should == 42
task.reason.should be_nil
observer.value.should == 42
observer.reason.should be_nil
end

it 'notifies all observers on rejection' do
task = ScheduledTask.new(0.1){ raise StandardError }
task.add_observer(observer)
task.run!
sleep(0.2)
task.value.should be_nil
task.reason.should be_a(StandardError)
observer.value.should be_nil
observer.reason.should be_a(StandardError)
end

it 'does not notify an observer added after fulfillment' do
observer.should_not_receive(:update).with(any_args())
task = ScheduledTask.new(0.1){ 42 }
sleep(0.2)
task.add_observer(observer)
sleep(0.1)
end

it 'does not notify an observer added after rejection' do
observer.should_not_receive(:update).with(any_args())
task = ScheduledTask.new(0.1){ raise StandardError }
sleep(0.2)
task.add_observer(observer)
sleep(0.1)
end

it 'does not notify an observer added after cancellation' do
observer.should_not_receive(:update).with(any_args())
task = ScheduledTask.new(0.5){ 42 }
task.run!
sleep(0.1)
task.cancel
sleep(0.1)
task.add_observer(observer)
sleep(0.5)
end
end
end
end

0 comments on commit 1ce3ede

Please sign in to comment.