From 134f6209a230cda503a30071e25ac4feb9abf2bf Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 27 May 2014 21:22:11 +0200 Subject: [PATCH 01/13] Allow to silent experimental warning --- lib/concurrent/actress.rb | 13 +++++++++++-- spec/concurrent/actress_spec.rb | 1 + 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 2013def07..f2342f717 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -42,7 +42,9 @@ def on_message(message) # @param block for actress_class instantiation # @param args see {.spawn_optionify} def self.spawn(*args, &block) - warn '[EXPERIMENTAL] A full release of `Actress`, renamed `Actor`, is expected in the 0.7.0 release.' + experimental_acknowledged? or + warn '[EXPERIMENTAL] A full release of `Actress`, renamed `Actor`, is expected in the 0.7.0 release.' + if Actress.current Core.new(spawn_optionify(*args).merge(parent: Actress.current), &block).reference else @@ -52,7 +54,6 @@ def self.spawn(*args, &block) # as {.spawn} but it'll raise when Actor not initialized properly def self.spawn!(*args, &block) - warn '[EXPERIMENTAL] A full release of `Actress`, renamed `Actor`, is expected in the 0.7.0 release.' spawn(spawn_optionify(*args).merge(initialized: ivar = IVar.new), &block).tap { ivar.no_error! } end @@ -71,5 +72,13 @@ def self.spawn_optionify(*args) args: args[2..-1] } end end + + def self.i_know_it_is_experimental! + @experimental_acknowledged = true + end + + def self.experimental_acknowledged? + !!@experimental_acknowledged + end end end diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index a1dde4913..bd885d684 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -3,6 +3,7 @@ module Concurrent module Actress + i_know_it_is_experimental! describe 'Concurrent::Actress' do class Ping From aad40bd52031299a30b0988797a300470761adf0 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 27 May 2014 21:23:25 +0200 Subject: [PATCH 02/13] Remove with_full_reset method not to collide with global test reset --- spec/concurrent/configuration_spec.rb | 2 -- spec/concurrent/scheduled_task_spec.rb | 6 ++---- spec/concurrent/timer_task_spec.rb | 2 -- spec/support/example_group_extensions.rb | 12 ------------ 4 files changed, 2 insertions(+), 20 deletions(-) diff --git a/spec/concurrent/configuration_spec.rb b/spec/concurrent/configuration_spec.rb index 0e8b2ca0f..b2090f0aa 100644 --- a/spec/concurrent/configuration_spec.rb +++ b/spec/concurrent/configuration_spec.rb @@ -3,8 +3,6 @@ module Concurrent describe Configuration do - with_full_reset - it 'creates a global timer pool' do Concurrent.configuration.global_timer_set.should_not be_nil Concurrent.configuration.global_timer_set.should respond_to(:post) diff --git a/spec/concurrent/scheduled_task_spec.rb b/spec/concurrent/scheduled_task_spec.rb index 4922e5805..2757e7d2c 100644 --- a/spec/concurrent/scheduled_task_spec.rb +++ b/spec/concurrent/scheduled_task_spec.rb @@ -7,8 +7,6 @@ module Concurrent describe ScheduledTask do - with_full_reset - context 'behavior' do # obligation @@ -56,9 +54,9 @@ def execute_dereferenceable(subject) it_should_behave_like :dereferenceable # observable - + subject{ ScheduledTask.new(0.1){ nil } } - + def trigger_observable(observable) observable.execute sleep(0.2) diff --git a/spec/concurrent/timer_task_spec.rb b/spec/concurrent/timer_task_spec.rb index 6b47512d1..daf9bd5e4 100644 --- a/spec/concurrent/timer_task_spec.rb +++ b/spec/concurrent/timer_task_spec.rb @@ -5,8 +5,6 @@ module Concurrent describe TimerTask do - with_full_reset - before(:each) do # suppress deprecation warnings. Concurrent::TimerTask.any_instance.stub(:warn) diff --git a/spec/support/example_group_extensions.rb b/spec/support/example_group_extensions.rb index 2c02fa23f..05375f254 100644 --- a/spec/support/example_group_extensions.rb +++ b/spec/support/example_group_extensions.rb @@ -38,18 +38,6 @@ def kill_rogue_threads(warning = true) end class RSpec::Core::ExampleGroup - def self.with_full_reset - before(:each) do - reset_gem_configuration - end - - after(:each) do - Thread.list.each do |thread| - thread.kill unless thread == Thread.current - end - end - end - include Concurrent::TestHelpers extend Concurrent::TestHelpers end From cd657a92ed183e6d2886e5ad12a94fc96128e0fe Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 27 May 2014 21:24:32 +0200 Subject: [PATCH 03/13] Disable Actress tests temporarily. --- lib/concurrent/actress/core.rb | 2 + spec/concurrent/actress_spec.rb | 149 +++++++++++++---------- spec/support/example_group_extensions.rb | 2 + 3 files changed, 88 insertions(+), 65 deletions(-) diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index f51b8855c..7e13632c5 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -113,6 +113,8 @@ def terminated? # Terminates all its children, does not wait until they are terminated. def terminate! guard! + return nil if terminated? + @children.each do |ch| ch.send(:core).tap { |core| core.send(:schedule_execution) { core.terminate! } } end diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index bd885d684..74bc849a1 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -4,7 +4,26 @@ module Concurrent module Actress i_know_it_is_experimental! + + class Reference + def backdoor(&block) + core.send :schedule_execution do + core.instance_eval &block + end + end + end + describe 'Concurrent::Actress' do + prepend_before do + @do_not_reset = true + end + + def terminate_actors(*actors) + actors.each do |actor| + actor.backdoor { terminate! } + actor.terminated.wait + end + end class Ping include Context @@ -36,82 +55,80 @@ def on_message(message) # set_trace_func nil # end - #describe 'stress test' do - #pending('may cause deadlock which prevents test run from completing.') - #1.times do |i| - #it format('run %3d', i) do - ## puts format('run %3d', i) - #Array.new(10).map do - #Thread.new do - #10.times do - ## trace! do - #queue = Queue.new - #actor = Ping.spawn :ping, queue - - ## when spawn returns children are set - #Concurrent::Actress::ROOT.send(:core).instance_variable_get(:@children).should include(actor) - - #actor << 'a' << 1 - #queue.pop.should eq 'a' - #actor.ask(2).value.should eq 2 - - #actor.parent.should eq Concurrent::Actress::ROOT - #Concurrent::Actress::ROOT.path.should eq '/' - #actor.path.should eq '/ping' - #child = actor.ask(:child).value - #child.path.should eq '/ping/pong' - #queue.clear - #child.ask(3) - #queue.pop.should eq 3 - - #actor << :terminate - #actor.ask(:blow_up).wait.should be_rejected - #end - #end - #end.each(&:join) - #end - #end - #end + describe 'stress test' do + 1.times do |i| + it format('run %3d', i) do + # puts format('run %3d', i) + Array.new(10).map do + Thread.new do + 10.times do + # trace! do + queue = Queue.new + actor = Ping.spawn :ping, queue + + # when spawn returns children are set + Concurrent::Actress::ROOT.send(:core).instance_variable_get(:@children).should include(actor) + + actor << 'a' << 1 + queue.pop.should eq 'a' + actor.ask(2).value.should eq 2 + + actor.parent.should eq Concurrent::Actress::ROOT + Concurrent::Actress::ROOT.path.should eq '/' + actor.path.should eq '/ping' + child = actor.ask(:child).value + child.path.should eq '/ping/pong' + queue.clear + child.ask(3) + queue.pop.should eq 3 + + actor << :terminate + actor.ask(:blow_up).wait.should be_rejected + terminate_actors actor, child + end + end + end.each(&:join) + end + end + end describe 'spawning' do - #describe 'Actress#spawn' do - #behaviour = -> v { -> _ { v } } - #subjects = { spawn: -> { Actress.spawn(AdHoc, :ping, 'arg', &behaviour) }, - #context_spawn: -> { AdHoc.spawn(:ping, 'arg', &behaviour) }, - #spawn_by_hash: -> { Actress.spawn(class: AdHoc, name: :ping, args: ['arg'], &behaviour) }, - #context_spawn_by_hash: -> { AdHoc.spawn(name: :ping, args: ['arg'], &behaviour) } } - - #subjects.each do |desc, subject_definition| - #describe desc do - #subject &subject_definition - #its(:path) { pending('may cause deadlock which prevents test run from completing.'); should eq '/ping' } - #its(:parent) { pending('may cause deadlock which prevents test run from completing.'); should eq ROOT } - #its(:name) { pending('may cause deadlock which prevents test run from completing.'); should eq 'ping' } - #its(:executor) { pending('may cause deadlock which prevents test run from completing.'); should eq Concurrent.configuration.global_task_pool } - #its(:reference) { pending('may cause deadlock which prevents test run from completing.'); should eq subject } - #it 'returns ars' do - #subject.ask!(:anything).should eq 'arg' - #end - #end - #end - #end + describe 'Actress#spawn' do + behaviour = -> v { -> _ { v } } + subjects = { spawn: -> { Actress.spawn(AdHoc, :ping, 'arg', &behaviour) }, + context_spawn: -> { AdHoc.spawn(:ping, 'arg', &behaviour) }, + spawn_by_hash: -> { Actress.spawn(class: AdHoc, name: :ping, args: ['arg'], &behaviour) }, + context_spawn_by_hash: -> { AdHoc.spawn(name: :ping, args: ['arg'], &behaviour) } } + + subjects.each do |desc, subject_definition| + describe desc do + subject &subject_definition + after { terminate_actors subject } + its(:path) { should eq '/ping' } + its(:parent) { should eq ROOT } + its(:name) { should eq 'ping' } + it('executor should be global') { subject.executor.should eq Concurrent.configuration.global_task_pool } + its(:reference) { should eq subject } + it 'returns ars' do + subject.ask!(:anything).should eq 'arg' + end + end + end + end it 'terminates on failed initialization' do - pending('may cause deadlock which prevents test run from completing.') a = AdHoc.spawn(name: :fail, logger: Concurrent.configuration.no_logger) { raise } a.ask(nil).wait.rejected?.should be_true a.terminated?.should be_true end it 'terminates on failed initialization and raises with spawn!' do - pending('may cause deadlock which prevents test run from completing.') expect do AdHoc.spawn!(name: :fail, logger: Concurrent.configuration.no_logger) { raise 'm' } end.to raise_error(StandardError, 'm') end it 'terminates on failed message processing' do - pending('may cause deadlock which prevents test run from completing.') a = AdHoc.spawn(name: :fail, logger: Concurrent.configuration.no_logger) { -> _ { raise } } a.ask(nil).wait.rejected?.should be_true a.terminated?.should be_true @@ -121,11 +138,11 @@ def on_message(message) describe 'messaging' do subject { AdHoc.spawn(:add) { c = 0; -> v { c = c + v } } } specify do - pending('may cause deadlock which prevents test run from completing.') subject.tell(1).tell(1) subject << 1 << 1 subject.ask(0).value!.should eq 4 end + after { terminate_actors subject } end describe 'children' do @@ -142,23 +159,24 @@ def on_message(message) end it 'has children set after a child is created' do - pending('may cause deadlock which prevents test run from completing.') - child = parent.ask!(:child) + child = parent.ask!(:child) # FIXME may get stuck!! parent.ask!(nil).should include(child) child.ask!(nil).should eq parent + + terminate_actors parent, child end end describe 'envelope' do subject { AdHoc.spawn(:subject) { -> _ { envelope } } } specify do - pending('may cause deadlock which prevents test run from completing.') envelope = subject.ask!('a') envelope.should be_a_kind_of Envelope envelope.message.should eq 'a' envelope.ivar.should be_completed envelope.ivar.value.should eq envelope envelope.sender.should eq Thread.current + terminate_actors subject end end @@ -177,13 +195,14 @@ def on_message(message) end it 'terminates with all its children' do - pending('may cause deadlock which prevents test run from completing.') child = subject.ask! :child subject.terminated?.should be_false subject.ask(:terminate).wait subject.terminated?.should be_true child.terminated.wait child.terminated?.should be_true + + terminate_actors subject, child end end diff --git a/spec/support/example_group_extensions.rb b/spec/support/example_group_extensions.rb index 05375f254..2237fcd85 100644 --- a/spec/support/example_group_extensions.rb +++ b/spec/support/example_group_extensions.rb @@ -23,10 +23,12 @@ def rbx? end def reset_gem_configuration + return if @do_not_reset Concurrent.instance_variable_set(:@configuration, Concurrent::Configuration.new) end def kill_rogue_threads(warning = true) + return if @do_not_reset warn('[DEPRECATED] brute force thread control being used -- tests need updated') if warning Thread.list.each do |thread| thread.kill unless thread == Thread.current From 7f850796b72a89508cb6793d38ef9c0a566d2e41 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 27 May 2014 22:54:45 +0200 Subject: [PATCH 04/13] Remove old Context#logger in favor of #log --- lib/concurrent/actress/context.rb | 11 ++++++----- lib/concurrent/logging.rb | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/lib/concurrent/actress/context.rb b/lib/concurrent/actress/context.rb index b5b48ec22..a85fb6090 100644 --- a/lib/concurrent/actress/context.rb +++ b/lib/concurrent/actress/context.rb @@ -26,10 +26,6 @@ def on_message(message) raise NotImplementedError end - def logger - core.logger - end - # @api private def on_envelope(envelope) @envelope = envelope @@ -53,9 +49,14 @@ def terminate! core.terminate! end + # delegates to core.log + # @see Logging#log + def log(level, progname, message = nil, &block) + core.log(level, progname, message, &block) + end + private - # @api private def initialize_core(core) @core = Type! core, Core end diff --git a/lib/concurrent/logging.rb b/lib/concurrent/logging.rb index 802db21f6..50960bbe1 100644 --- a/lib/concurrent/logging.rb +++ b/lib/concurrent/logging.rb @@ -9,7 +9,7 @@ module Logging # @param [Integer] level one of Logger::Severity constants # @param [String] progname e.g. a path of an Actor # @param [String, nil] message when nil block is used to generate the message - # @yields_return [String] a message + # @yieldreturn [String] a message def log(level, progname, message = nil, &block) (@logger || Concurrent.configuration.logger).call level, progname, message, &block end From 8672425108802baf816cd004f94b58c8f4c11191 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Wed, 28 May 2014 08:24:48 +0200 Subject: [PATCH 05/13] Configure travis CI to output more information to be able to find out where a deadlock is --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 65caad865..ee8f210d4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,3 +15,4 @@ matrix: - rvm: ruby-head - rvm: jruby-head - rvm: 1.9.3 +script: "bundle exec rspec --color --backtrace --seed 1 --format documentation ./spec" From 14deab8b05fd813731281d65ab60188a53428eab Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 29 May 2014 13:35:09 +0200 Subject: [PATCH 06/13] Isolate actress test suite from other tests --- spec/concurrent/actress_spec.rb | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index 74bc849a1..4b98548cd 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -15,7 +15,11 @@ def backdoor(&block) describe 'Concurrent::Actress' do prepend_before do - @do_not_reset = true + @do_not_reset = true + @@isolated_from_other_tests ||= begin + sleep 0.1 + true + end end def terminate_actors(*actors) @@ -159,7 +163,7 @@ def on_message(message) end it 'has children set after a child is created' do - child = parent.ask!(:child) # FIXME may get stuck!! + child = parent.ask!(:child) parent.ask!(nil).should include(child) child.ask!(nil).should eq parent From 430a805d726e987d23cdb5617fe80495a6a6c678 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 29 May 2014 18:19:07 +0200 Subject: [PATCH 07/13] Fix intermittently deadlocking #post_off test --- spec/concurrent/agent_spec.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spec/concurrent/agent_spec.rb b/spec/concurrent/agent_spec.rb index 50df2e58d..38b794e7f 100644 --- a/spec/concurrent/agent_spec.rb +++ b/spec/concurrent/agent_spec.rb @@ -20,10 +20,9 @@ module Concurrent end context '#send_off' do - subject { Agent.new 2 } + subject { Agent.new 2, executor: executor } it 'executes post and post-off in order' do - pending 'may cause deadlock' subject.post { |v| v + 2 } subject.post_off { |v| v * 3 } subject.await From f84d0ab01575a5e1ad7960f7a679de7b663e4348 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 29 May 2014 18:41:35 +0200 Subject: [PATCH 08/13] Rename OneByOne to SerializedExecution add basic rejection handling --- lib/concurrent/actress.rb | 2 +- lib/concurrent/actress/core.rb | 16 +++++----- lib/concurrent/agent.rb | 18 ++++++------ ...{one_by_one.rb => serialized_execution.rb} | 29 ++++++++++++++----- lib/concurrent/executors.rb | 2 +- spec/concurrent/agent_spec.rb | 2 +- 6 files changed, 41 insertions(+), 28 deletions(-) rename lib/concurrent/executor/{one_by_one.rb => serialized_execution.rb} (73%) diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index f2342f717..3ca89f3c4 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -1,5 +1,5 @@ require 'concurrent/configuration' -require 'concurrent/executor/one_by_one' +require 'concurrent/executor/serialized_execution' require 'concurrent/ivar' require 'concurrent/logging' diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index 7e13632c5..968a7c74b 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -23,14 +23,14 @@ class Core # can be used to hook actor instance to any logging system # @param [Proc] block for class instantiation def initialize(opts = {}, &block) - @mailbox = Array.new - @one_by_one = OneByOne.new + @mailbox = Array.new + @serialized_execution = SerializedExecution.new # noinspection RubyArgCount - @terminated = Event.new - @executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor - @children = Set.new - @reference = Reference.new self - @name = (Type! opts.fetch(:name), String, Symbol).to_s + @terminated = Event.new + @executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor + @children = Set.new + @reference = Reference.new self + @name = (Type! opts.fetch(:name), String, Symbol).to_s parent = opts[:parent] @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core) @@ -180,7 +180,7 @@ def receive_envelope # Schedules blocks to be executed on executor sequentially, # sets Actress.current def schedule_execution - @one_by_one.post(@executor) do + @serialized_execution.post(@executor) do begin Thread.current[:__current_actress__] = reference yield diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index 24a3e729f..1b6e177a4 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -62,14 +62,14 @@ class Agent # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and # returning the value returned from the proc def initialize(initial, opts = {}) - @value = initial - @rescuers = [] - @validator = Proc.new { |result| true } - @timeout = opts.fetch(:timeout, TIMEOUT).freeze - self.observers = CopyOnWriteObserverSet.new - @one_by_one = OneByOne.new - @task_executor = OptionsParser.get_task_executor_from(opts) - @operation_executor = OptionsParser.get_operation_executor_from(opts) + @value = initial + @rescuers = [] + @validator = Proc.new { |result| true } + @timeout = opts.fetch(:timeout, TIMEOUT).freeze + self.observers = CopyOnWriteObserverSet.new + @serialized_execution = SerializedExecution.new + @task_executor = OptionsParser.get_task_executor_from(opts) + @operation_executor = OptionsParser.get_operation_executor_from(opts) init_mutex set_deref_options(opts) end @@ -180,7 +180,7 @@ def await(timeout = nil) def post_on(executor, &block) return nil if block.nil? - @one_by_one.post(executor) { work(&block) } + @serialized_execution.post(executor) { work(&block) } true end diff --git a/lib/concurrent/executor/one_by_one.rb b/lib/concurrent/executor/serialized_execution.rb similarity index 73% rename from lib/concurrent/executor/one_by_one.rb rename to lib/concurrent/executor/serialized_execution.rb index a86e421f7..952ea2785 100644 --- a/lib/concurrent/executor/one_by_one.rb +++ b/lib/concurrent/executor/serialized_execution.rb @@ -1,8 +1,10 @@ +require 'concurrent/logging' + module Concurrent - # Ensures that jobs are passed to the given executors one by one, - # never running at the same time. - class OneByOne + # Ensures passed jobs in a serialized order never running at the same time. + class SerializedExecution + include Logging Job = Struct.new(:executor, :args, :block) do def call @@ -30,10 +32,6 @@ def initialize # @raise [ArgumentError] if no task is given def post(executor, *args, &task) return nil if task.nil? - # FIXME Agent#send-off will blow up here - # if executor.can_overflow? - # raise ArgumentError, 'OneByOne cannot be used in conjunction with executor which may overflow' - # end job = Job.new executor, args, task @@ -56,7 +54,22 @@ def post(executor, *args, &task) private def call_job(job) - job.executor.post { work(job) } + did_it_run = begin + job.executor.post { work(job) } + true + rescue RejectedExecutionError => ex + false + end + + # TODO not the best idea to run it myself + unless did_it_run + begin + work job + rescue => ex + # let it fail + log DEBUG, ex + end + end end # ensures next job is executed if any is stashed diff --git a/lib/concurrent/executors.rb b/lib/concurrent/executors.rb index ff6ced453..86f0de038 100644 --- a/lib/concurrent/executors.rb +++ b/lib/concurrent/executors.rb @@ -6,4 +6,4 @@ require 'concurrent/executor/single_thread_executor' require 'concurrent/executor/thread_pool_executor' require 'concurrent/executor/timer_set' -require 'concurrent/executor/one_by_one' +require 'concurrent/executor/serialized_execution' diff --git a/spec/concurrent/agent_spec.rb b/spec/concurrent/agent_spec.rb index 38b794e7f..595396942 100644 --- a/spec/concurrent/agent_spec.rb +++ b/spec/concurrent/agent_spec.rb @@ -164,7 +164,7 @@ def trigger_observable(observable) subject.post { nil } sleep(0.1) subject. - instance_variable_get(:@one_by_one). + instance_variable_get(:@serialized_execution). instance_variable_get(:@stash). size.should eq 2 end From d1bab4cb000d65ae70138825f619ed74ab30c300 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 29 May 2014 18:42:50 +0200 Subject: [PATCH 09/13] Doc update --- .yardopts | 8 +++++++- lib/concurrent/actress.rb | 2 +- lib/concurrent/actress/context.rb | 4 ++-- lib/concurrent/actress/doc.md | 4 ++-- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/.yardopts b/.yardopts index 7adf207f7..6beb5705a 100644 --- a/.yardopts +++ b/.yardopts @@ -1 +1,7 @@ ---protected --no-private --embed-mixins --output-dir ./doc --markup markdown +--protected +--no-private +--embed-mixins +--output-dir ./doc +--markup markdown +- +lib/concurrent/actress/doc.md diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 3ca89f3c4..6bde5f217 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -5,7 +5,7 @@ module Concurrent - # {include:file:lib/concurrent/actress/doc.md} + # Fore more information please see {file:lib/concurrent/actress/doc.md Actress quide}. module Actress require 'concurrent/actress/type_check' diff --git a/lib/concurrent/actress/context.rb b/lib/concurrent/actress/context.rb index a85fb6090..626ff9110 100644 --- a/lib/concurrent/actress/context.rb +++ b/lib/concurrent/actress/context.rb @@ -72,12 +72,12 @@ def self.included(base) end module ClassMethods - # behaves as {Actress.spawn} but class_name is omitted + # behaves as {Concurrent::Actress.spawn} but class_name is omitted def spawn(name_or_opts, *args, &block) Actress.spawn spawn_optionify(name_or_opts, *args), &block end - # behaves as {Actress.spawn!} but class_name is omitted + # behaves as {Concurrent::Actress.spawn!} but class_name is omitted def spawn!(name_or_opts, *args, &block) Actress.spawn! spawn_optionify(name_or_opts, *args), &block end diff --git a/lib/concurrent/actress/doc.md b/lib/concurrent/actress/doc.md index e47248094..199c7aed0 100644 --- a/lib/concurrent/actress/doc.md +++ b/lib/concurrent/actress/doc.md @@ -1,6 +1,6 @@ -# Light-weighted implement of Actors. Inspired by Akka and Erlang. +# Light-weighted implementation of Actors. Inspired by Akka and Erlang. -Actors are using a thread-pool by default which makes them very cheap to create and discard. +Actors are sharing a thread-pool by default which makes them very cheap to create and discard. Thousands of actors can be created allowing to brake the program to small maintainable pieces without breaking single responsibility principles. From 88e252ddb5cb8d4dc0ed514b130c691edda5cf67 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 1 Jun 2014 19:44:07 +0200 Subject: [PATCH 10/13] Safe configuration access --- lib/concurrent/configuration.rb | 9 +++++++-- spec/support/example_group_extensions.rb | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/concurrent/configuration.rb b/lib/concurrent/configuration.rb index 138ce2e64..d47b88696 100644 --- a/lib/concurrent/configuration.rb +++ b/lib/concurrent/configuration.rb @@ -1,6 +1,7 @@ require 'thread' require 'concurrent/delay' require 'concurrent/errors' +require 'concurrent/atomic/atomic' require 'concurrent/executor/thread_pool_executor' require 'concurrent/executor/timer_set' require 'concurrent/utility/processor_count' @@ -111,8 +112,12 @@ def new_operation_pool end # create the default configuration on load - @configuration = Configuration.new - singleton_class.send :attr_reader, :configuration + @configuration = Atomic.new Configuration.new + + # @return [Configuration] + def self.configuration + @configuration.value + end # Perform gem-level configuration. # diff --git a/spec/support/example_group_extensions.rb b/spec/support/example_group_extensions.rb index 2237fcd85..100072e2d 100644 --- a/spec/support/example_group_extensions.rb +++ b/spec/support/example_group_extensions.rb @@ -24,7 +24,7 @@ def rbx? def reset_gem_configuration return if @do_not_reset - Concurrent.instance_variable_set(:@configuration, Concurrent::Configuration.new) + Concurrent.instance_variable_get(:@configuration).value = Concurrent::Configuration.new end def kill_rogue_threads(warning = true) From 9dc5365eb9c6f57b45f9ab706263dd43f7bb68b3 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 1 Jun 2014 19:50:13 +0200 Subject: [PATCH 11/13] make sure hard reset in tests won't interfere with tests not using it --- spec/concurrent/actress_spec.rb | 6 +----- spec/support/example_group_extensions.rb | 11 +++++++++-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index 4b98548cd..0099ceaee 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -15,11 +15,7 @@ def backdoor(&block) describe 'Concurrent::Actress' do prepend_before do - @do_not_reset = true - @@isolated_from_other_tests ||= begin - sleep 0.1 - true - end + do_no_reset! end def terminate_actors(*actors) diff --git a/spec/support/example_group_extensions.rb b/spec/support/example_group_extensions.rb index 100072e2d..bcfb195ff 100644 --- a/spec/support/example_group_extensions.rb +++ b/spec/support/example_group_extensions.rb @@ -22,9 +22,15 @@ def rbx? RbConfig::CONFIG['ruby_install_name']=~ /^rbx$/i end + def do_no_reset! + @do_not_reset = true + end + + @@killed = false + def reset_gem_configuration - return if @do_not_reset - Concurrent.instance_variable_get(:@configuration).value = Concurrent::Configuration.new + Concurrent.instance_variable_get(:@configuration).value = Concurrent::Configuration.new if @@killed + @@killed = false end def kill_rogue_threads(warning = true) @@ -33,6 +39,7 @@ def kill_rogue_threads(warning = true) Thread.list.each do |thread| thread.kill unless thread == Thread.current end + @@killed = true end extend self From 1f895e19e2b51b148a2e995f1ed290fa058a6bfe Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 3 Jun 2014 22:34:49 +0200 Subject: [PATCH 12/13] Make actor_class accessible --- lib/concurrent/actress/core.rb | 8 ++++---- lib/concurrent/actress/core_delegations.rb | 5 +++++ lib/concurrent/actress/reference.rb | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index 968a7c74b..42b009db1 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -11,7 +11,7 @@ class Core include TypeCheck include Concurrent::Logging - attr_reader :reference, :name, :path, :executor, :terminated + attr_reader :reference, :name, :path, :executor, :terminated, :actor_class # @option opts [String] name # @option opts [Reference, nil] parent of an actor spawning this one @@ -43,9 +43,9 @@ def initialize(opts = {}, &block) @parent_core.add_child reference if @parent_core - @actress_class = actress_class = Child! opts.fetch(:class), Context - args = opts.fetch(:args, []) - initialized = Type! opts[:initialized], IVar, NilClass + @actor_class = actress_class = Child! opts.fetch(:class), Context + args = opts.fetch(:args, []) + initialized = Type! opts[:initialized], IVar, NilClass schedule_execution do begin diff --git a/lib/concurrent/actress/core_delegations.rb b/lib/concurrent/actress/core_delegations.rb index 4a0144bcb..4dd4d134e 100644 --- a/lib/concurrent/actress/core_delegations.rb +++ b/lib/concurrent/actress/core_delegations.rb @@ -31,7 +31,12 @@ def executor core.executor end + def actor_class + core.actor_class + end + alias_method :ref, :reference + alias_method :actress_class, :actor_class end end end diff --git a/lib/concurrent/actress/reference.rb b/lib/concurrent/actress/reference.rb index a20ad5066..3d00f1879 100644 --- a/lib/concurrent/actress/reference.rb +++ b/lib/concurrent/actress/reference.rb @@ -50,7 +50,7 @@ def message(message, ivar = nil) end def to_s - "#<#{self.class} #{path}>" + "#<#{self.class} #{path} (#{actor_class})>" end alias_method :inspect, :to_s From c0ce8b677a314af75f6d2ecf532b7d552fdf0d5f Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Wed, 4 Jun 2014 21:41:30 +0200 Subject: [PATCH 13/13] Improving documentation --- .gitignore | 2 +- .yardopts | 10 +- Gemfile | 1 + LICENSE => LICENSE.txt | 0 README.md | 73 ++++----- doc/actress/celluloid_benchmark.rb | 96 ++++++++++++ doc/actress/format.rb | 75 +++++++++ doc/actress/init.rb | 2 + doc/actress/quick.in.rb | 84 ++++++++++ doc/actress/quick.out.rb | 91 +++++++++++ lib/concurrent/actress.rb | 143 +++++++++++++++++- lib/concurrent/actress/ad_hoc.rb | 6 + lib/concurrent/actress/context.rb | 7 +- lib/concurrent/actress/core.rb | 40 +++-- lib/concurrent/actress/doc.md | 53 ------- lib/concurrent/actress/envelope.rb | 26 +++- lib/concurrent/actress/reference.rb | 8 +- lib/concurrent/observable.rb | 2 +- tasks/.gitignore | 0 tasks/update_doc.rake | 44 ++++++ .../default/fulldoc/html/css/common.css | 125 +++++++++++++++ yard-template/default/layout/html/footer.erb | 15 ++ 22 files changed, 787 insertions(+), 116 deletions(-) rename LICENSE => LICENSE.txt (100%) create mode 100644 doc/actress/celluloid_benchmark.rb create mode 100644 doc/actress/format.rb create mode 100644 doc/actress/init.rb create mode 100644 doc/actress/quick.in.rb create mode 100644 doc/actress/quick.out.rb delete mode 100644 lib/concurrent/actress/doc.md delete mode 100644 tasks/.gitignore create mode 100644 tasks/update_doc.rake create mode 100644 yard-template/default/fulldoc/html/css/common.css create mode 100644 yard-template/default/layout/html/footer.erb diff --git a/.gitignore b/.gitignore index a291e1f63..b35ac15f6 100644 --- a/.gitignore +++ b/.gitignore @@ -6,7 +6,7 @@ tests.txt .ruby-gemset .bundle/* .yardoc/* -doc/* +yardoc/* tmp/* man/* *.tmproj diff --git a/.yardopts b/.yardopts index 6beb5705a..5dcc28604 100644 --- a/.yardopts +++ b/.yardopts @@ -1,7 +1,13 @@ --protected --no-private --embed-mixins ---output-dir ./doc +--output-dir ./yardoc --markup markdown +--title=Concurrent Ruby +--template default +--template-path ./yard-template + +./lib/**/*.rb - -lib/concurrent/actress/doc.md +README.md +LICENSE.txt diff --git a/Gemfile b/Gemfile index 4d65462ed..feea5ee86 100644 --- a/Gemfile +++ b/Gemfile @@ -8,6 +8,7 @@ group :development do gem 'countloc', '~> 0.4.0', platforms: :mri gem 'yard', '~> 0.8.7.4' gem 'inch', '~> 0.4.1', platforms: :mri + gem 'redcarpet', platforms: :mri # understands github markdown end group :testing do diff --git a/LICENSE b/LICENSE.txt similarity index 100% rename from LICENSE rename to LICENSE.txt diff --git a/README.md b/README.md index 55090f12b..8976609af 100644 --- a/README.md +++ b/README.md @@ -2,57 +2,59 @@ [![Gem Version](https://badge.fury.io/rb/concurrent-ruby.png)](http://badge.fury.io/rb/concurrent-ruby) [![Build Status](https://travis-ci.org/ruby-concurrency/concurrent-ruby.svg?branch=master)](https://travis-ci.org/ruby-concurrency/concurrent-ruby) [![Coverage Status](https://coveralls.io/repos/ruby-concurrency/concurrent-ruby/badge.png)](https://coveralls.io/r/ruby-concurrency/concurrent-ruby) [![Code Climate](https://codeclimate.com/github/ruby-concurrency/concurrent-ruby.png)](https://codeclimate.com/github/ruby-concurrency/concurrent-ruby) [![Inline docs](http://inch-ci.org/github/ruby-concurrency/concurrent-ruby.png)](http://inch-ci.org/github/ruby-concurrency/concurrent-ruby) [![Dependency Status](https://gemnasium.com/ruby-concurrency/concurrent-ruby.png)](https://gemnasium.com/ruby-concurrency/concurrent-ruby) - - - - + + + +
-

-Modern concurrency tools for Ruby. Inspired by -Erlang, -Clojure, -Scala, -Haskell, -F#, -C#, -Java, -and classic concurrency patterns. -

-

-The design goals of this gem are: -

    -
  • Be an 'unopinionated' toolbox that provides useful utilities without debating which is better or why
  • -
  • Remain free of external gem dependencies
  • -
  • Stay true to the spirit of the languages providing inspiration
  • -
  • But implement in a way that makes sense for Ruby
  • -
  • Keep the semantics as idiomatic Ruby as possible
  • -
  • Support features that make sense in Ruby
  • -
  • Exclude features that don't make sense in Ruby
  • -
  • Be small, lean, and loosely coupled
  • -
-

-
- -
+

+ Modern concurrency tools for Ruby. Inspired by + Erlang, + Clojure, + Scala, + Haskell, + F#, + C#, + Java, + and classic concurrency patterns. +

+

+ The design goals of this gem are: +

    +
  • Be an 'unopinionated' toolbox that provides useful utilities without debating which is better or why
  • +
  • Remain free of external gem dependencies
  • +
  • Stay true to the spirit of the languages providing inspiration
  • +
  • But implement in a way that makes sense for Ruby
  • +
  • Keep the semantics as idiomatic Ruby as possible
  • +
  • Support features that make sense in Ruby
  • +
  • Exclude features that don't make sense in Ruby
  • +
  • Be small, lean, and loosely coupled
  • +
+

+
+ +
-### Install +## Install ```shell gem install concurrent-ruby ``` + or add the following line to Gemfile: ```ruby gem 'concurrent-ruby' ``` + and run `bundle install` from your shell. -*NOTE: There is an old gem from 2007 called "concurrent" that does not appear to be under active development. That isn't us. Please do not run* `gem install concurrent`*. It is not the droid you are looking for.* +_NOTE: There is an old gem from 2007 called "concurrent" that does not appear to be under active development. That isn't us. Please do not run* `gem install concurrent`*. It is not the droid you are looking for._ ## Features & Documentation Please see the [Concurrent Ruby Wiki](https://github.com/ruby-concurrency/concurrent-ruby/wiki) -or the [API documentation](http://rubydoc.info/github/ruby-concurrency/concurrent-ruby/master/frames) +or the [API documentation](http://ruby-concurrency.github.io/concurrent-ruby/frames.html)) for more information or join our [mailing list](http://groups.google.com/group/concurrent-ruby). There are many concurrency abstractions in this library. These abstractions can be broadly categorized @@ -74,6 +76,7 @@ into several general groups: * Thread synchronization classes and algorithms including [dataflow](https://github.com/ruby-concurrency/concurrent-ruby/wiki/Dataflow), timeout, condition, countdown latch, dependency counter, and event * Java-inspired [thread pools](https://github.com/ruby-concurrency/concurrent-ruby/wiki/Thread%20Pools) +* New fast light-weighted [Actor model](http://ruby-concurrency.github.io/concurrent-ruby/frames.html#!Concurrent/Actress.html) implementation. * And many more... ### Semantic Versioning @@ -91,7 +94,7 @@ It should be fully compatible with any interpreter that is compliant with Ruby 1 Many more code examples can be found in the documentation for each class (linked above). This one simple example shows some of the power of this gem. -```ruby +```ruby require 'concurrent' require 'thread' # for Queue require 'open-uri' # for open(uri) diff --git a/doc/actress/celluloid_benchmark.rb b/doc/actress/celluloid_benchmark.rb new file mode 100644 index 000000000..6444c0175 --- /dev/null +++ b/doc/actress/celluloid_benchmark.rb @@ -0,0 +1,96 @@ +require 'benchmark' +require 'concurrent/actress' +Concurrent::Actress.i_know_it_is_experimental! +require 'celluloid' +require 'celluloid/autostart' + +logger = Logger.new($stderr) +logger.level = Logger::INFO +Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| + logger.add level, message, progname, &block +end + +scale = 1 +ADD_TO = (100 * scale).to_i +counts_size = (500 * scale).to_i +adders_size = (500 * scale).to_i + +class Counter + include Celluloid + + def initialize(adders, i) + @adders = adders + @i = i + end + + def counting(count, ivar) + if count < ADD_TO + @adders[(@i+1) % @adders.size].counting count+1, ivar + else + ivar.set count + end + end +end + +threads = [] + +Benchmark.bmbm(10) do |b| + [2, adders_size, adders_size*2, adders_size*3].each do |adders_size| + + b.report(format('%5d %4d %s', ADD_TO*counts_size, adders_size, 'actress')) do + counts = Array.new(counts_size) { [0, Concurrent::IVar.new] } + adders = Array.new(adders_size) do |i| + Concurrent::Actress::AdHoc.spawn("adder#{i}") do + lambda do |(count, ivar)| + if count.nil? + terminate! + else + if count < ADD_TO + adders[(i+1) % adders_size].tell [count+1, ivar] + else + ivar.set count + end + end + end + end + end + + counts.each_with_index do |count, i| + adders[i % adders_size].tell count + end + + counts.each do |count, ivar| + raise unless ivar.value >= ADD_TO + end + + threads << Thread.list.size + + adders.each { |a| a << [nil, nil] } + end + + b.report(format('%5d %4d %s', ADD_TO*counts_size, adders_size, 'celluloid')) do + counts = [] + counts_size.times { counts << [0, Concurrent::IVar.new] } + + adders = [] + adders_size.times do |i| + adders << Counter.new(adders, i) + end + + counts.each_with_index do |count, i| + adders[i % adders_size].counting *count + end + + counts.each do |count, ivar| + raise unless ivar.value >= ADD_TO + end + + threads << Thread.list.size + + adders.each(&:terminate) + end + end +end + +p threads + diff --git a/doc/actress/format.rb b/doc/actress/format.rb new file mode 100644 index 000000000..8d6a457e1 --- /dev/null +++ b/doc/actress/format.rb @@ -0,0 +1,75 @@ +require 'rubygems' +require 'bundler/setup' +require 'pry' +require 'pp' + +input_paths = if ARGV.empty? + Dir.glob("#{File.dirname(__FILE__)}/*.in.rb") + else + ARGV + end.map { |p| File.expand_path p } + +input_paths.each_with_index do |input_path, i| + + pid = fork do + require_relative 'init.rb' + + begin + output_path = input_path.gsub /\.in\.rb$/, '.out.rb' + input = File.readlines(input_path) + + chunks = [] + line = '' + + while !input.empty? + line += input.shift + if Pry::Code.complete_expression? line + chunks << line + line = '' + end + end + + raise unless line.empty? + + chunks.map! { |chunk| [chunk, [chunk.split($/).size, 1].max] } + environment = Module.new.send :binding + evaluate = ->(code, line) do + eval(code, environment, input_path, line) + end + + indent = 50 + + line_count = 1 + output = '' + chunks.each do |chunk, lines| + result = evaluate.(chunk, line_count) + unless chunk.strip.empty? || chunk =~ /\A *#/ + pre_lines = chunk.lines.to_a + last_line = pre_lines.pop + output << pre_lines.join + + if last_line =~ /\#$/ + output << last_line.gsub(/\#$/, '') + else + if last_line.size < indent && result.inspect.size < indent + output << "%-#{indent}s %s" % [last_line.chomp, "# => #{result.inspect}\n"] + else + output << last_line << " # => #{result.inspect}\n" + end + end + else + output << chunk + end + line_count += lines + end + + puts "#{input_path}\n -> #{output_path}" + #puts output + File.write(output_path, output) + rescue => ex + puts "#{ex} (#{ex.class})\n#{ex.backtrace * "\n"}" + end + end + + Process.wait pid +end diff --git a/doc/actress/init.rb b/doc/actress/init.rb new file mode 100644 index 000000000..7ee9f06ba --- /dev/null +++ b/doc/actress/init.rb @@ -0,0 +1,2 @@ +require 'concurrent/actress' +Concurrent::Actress.i_know_it_is_experimental! diff --git a/doc/actress/quick.in.rb b/doc/actress/quick.in.rb new file mode 100644 index 000000000..096d1a10c --- /dev/null +++ b/doc/actress/quick.in.rb @@ -0,0 +1,84 @@ +class Counter + # Include context of an actor which gives this class access to reference and other information + # about the actor, see CoreDelegations. + include Concurrent::Actress::Context + + # use initialize as you wish + def initialize(initial_value) + @count = initial_value + end + + # override on_message to define actor's behaviour + def on_message(message) + case message + when Integer + @count += message + when :terminate + terminate! + else + raise 'unknown' + end + end +end + +# Create new actor naming the instance 'first'. +# Return value is a reference to the actor, the actual actor is never returned. +counter = Counter.spawn(:first, 5) + +# Tell a message and forget returning self. +counter.tell(1) +counter << 1 +# (First counter now contains 7.) + +# Send a messages asking for a result. +counter.ask(0).class +counter.ask(0).value + +# Terminate the actor. +counter.tell(:terminate) +# Not terminated yet, it takes a while until the message is processed. +counter.terminated? +# Waiting for the termination. +counter.terminated.class +counter.terminated.wait +counter.terminated? +# Any subsequent messages are rejected. +counter.ask(5).wait.rejected? + +# Failure on message processing terminates the actor. +counter = Counter.spawn(:first, 0) +counter.ask('boom').wait.rejected? +counter.terminated? + + +# Lets define an actor creating children actors. +class Node + include Concurrent::Actress::Context + + def on_message(message) + case message + when :new_child + spawn self.class, :child + when :how_many_children + children.size + when :terminate + terminate! + else + raise 'unknown' + end + end +end + +# Actors are tracking parent-child relationships +parent = Node.spawn :parent +child = parent.tell(:new_child).ask!(:new_child) +child.parent +parent.ask!(:how_many_children) + +# There is a special root actor which is used for all actors spawned outside any actor. +parent.parent + +# Termination of an parent will also terminate all children. +parent.ask('boom').wait # +parent.terminated? +child.terminated? diff --git a/doc/actress/quick.out.rb b/doc/actress/quick.out.rb new file mode 100644 index 000000000..9f10dda3b --- /dev/null +++ b/doc/actress/quick.out.rb @@ -0,0 +1,91 @@ +class Counter + # Include context of an actor which gives this class access to reference and other information + # about the actor, see CoreDelegations. + include Concurrent::Actress::Context + + # use initialize as you wish + def initialize(initial_value) + @count = initial_value + end + + # override on_message to define actor's behaviour + def on_message(message) + case message + when Integer + @count += message + when :terminate + terminate! + else + raise 'unknown' + end + end +end # => :on_message + +# Create new actor naming the instance 'first'. +# Return value is a reference to the actor, the actual actor is never returned. +counter = Counter.spawn(:first, 5) + # => # + +# Tell a message and forget returning self. +counter.tell(1) + # => # +counter << 1 + # => # +# (First counter now contains 7.) + +# Send a messages asking for a result. +counter.ask(0).class # => Concurrent::IVar +counter.ask(0).value # => 7 + +# Terminate the actor. +counter.tell(:terminate) + # => # +# Not terminated yet, it takes a while until the message is processed. +counter.terminated? # => false +# Waiting for the termination. +counter.terminated.class # => Concurrent::Event +counter.terminated.wait # => true +counter.terminated? # => true +# Any subsequent messages are rejected. +counter.ask(5).wait.rejected? # => true + +# Failure on message processing terminates the actor. +counter = Counter.spawn(:first, 0) + # => # +counter.ask('boom').wait.rejected? # => true +counter.terminated? # => true + + +# Lets define an actor creating children actors. +class Node + include Concurrent::Actress::Context + + def on_message(message) + case message + when :new_child + spawn self.class, :child + when :how_many_children + children.size + when :terminate + terminate! + else + raise 'unknown' + end + end +end # => :on_message + +# Actors are tracking parent-child relationships +parent = Node.spawn :parent # => # +child = parent.tell(:new_child).ask!(:new_child) + # => # +child.parent # => # +parent.ask!(:how_many_children) # => 2 + +# There is a special root actor which is used for all actors spawned outside any actor. +parent.parent + # => # + +# Termination of an parent will also terminate all children. +parent.ask('boom').wait +parent.terminated? # => true +child.terminated? # => true diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 6bde5f217..9880d9ed5 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -5,7 +5,129 @@ module Concurrent - # Fore more information please see {file:lib/concurrent/actress/doc.md Actress quide}. + # # Actor model + # + # - Light-weighted. + # - Inspired by Akka and Erlang. + # + # Actors are sharing a thread-pool by default which makes them very cheap to create and discard. + # Thousands of actors can be created allowing to brake the program to small maintainable pieces + # without breaking single responsibility principles. + # + # ## What is an actor model? + # + # [Wiki](http://en.wikipedia.org/wiki/Actor_model) says: + # The actor model in computer science is a mathematical model of concurrent computation + # that treats _actors_ as the universal primitives of concurrent digital computation: + # in response to a message that it receives, an actor can make local decisions, + # create more actors, send more messages, and determine how to respond to the next + # message received. + # + # ## Why? + # + # Concurrency is hard this is one of many ways how to simplify the problem. + # It is simpler to reason about actors then about locks (and all their possible states). + # + # ## How to use it + # + # {include:file:doc/actress/quick.out.rb} + # + # ## Messaging + # + # Messages are processed in same order as they are sent by a sender. It may interleaved with + # messages form other senders though. There is also a contract in actor model that + # messages sent between actors should be immutable. Gems like + # + # - [Algebrick](https://github.com/pitr-ch/algebrick) - Typed struct on steroids based on + # algebraic types and pattern matching + # - [Hamster](https://github.com/hamstergem/hamster) - Efficient, Immutable, Thread-Safe + # Collection classes for Ruby + # + # are very useful. + # + # ## Architecture + # + # Actors are running on shared thread poll which allows user to create many actors cheaply. + # Downside is that these actors cannot be directly used to do IO or other blocking operations. + # Blocking operations could starve the `default_task_pool`. However there are two options: + # + # - Create an regular actor which will schedule blocking operations in `global_operation_pool` + # (which is intended for blocking operations) sending results back to self in messages. + # - Create an actor using `global_operation_pool` instead of `global_task_pool`, e.g. + # `AnIOActor.spawn name: :blocking, executor: Concurrent.configuration.global_operation_pool`. + # + # Each actor is composed from 3 objects: + # + # ### {Reference} + # {include:Actress::Reference} + # + # ### {Core} + # {include:Actress::Core} + # + # ### {Context} + # {include:Actress::Context} + # + # ## Speed + # + # Simple benchmark Actress vs Celluloid, the numbers are looking good + # but you know how it is with benchmarks. Source code is in + # `examples/actress/celluloid_benchmark.rb`. It sends numbers between x actors + # and adding 1 until certain limit is reached. + # + # Benchmark legend: + # + # - mes. - number of messages send between the actors + # - act. - number of actors exchanging the messages + # - impl. - which gem is used + # + # ### JRUBY + # + # Rehearsal -------------------------------------------------------- + # 50000 2 actress 24.110000 0.800000 24.910000 ( 7.728000) + # 50000 2 celluloid 28.510000 4.780000 33.290000 ( 14.782000) + # 50000 500 actress 13.700000 0.280000 13.980000 ( 4.307000) + # 50000 500 celluloid 14.520000 11.740000 26.260000 ( 12.258000) + # 50000 1000 actress 10.890000 0.220000 11.110000 ( 3.760000) + # 50000 1000 celluloid 15.600000 21.690000 37.290000 ( 18.512000) + # 50000 1500 actress 10.580000 0.270000 10.850000 ( 3.646000) + # 50000 1500 celluloid 14.490000 29.790000 44.280000 ( 26.043000) + # --------------------------------------------- total: 201.970000sec + # + # mes. act. impl. user system total real + # 50000 2 actress 9.820000 0.510000 10.330000 ( 5.735000) + # 50000 2 celluloid 10.390000 4.030000 14.420000 ( 7.494000) + # 50000 500 actress 9.880000 0.200000 10.080000 ( 3.310000) + # 50000 500 celluloid 12.430000 11.310000 23.740000 ( 11.727000) + # 50000 1000 actress 10.590000 0.190000 10.780000 ( 4.029000) + # 50000 1000 celluloid 14.950000 23.260000 38.210000 ( 20.841000) + # 50000 1500 actress 10.710000 0.250000 10.960000 ( 3.892000) + # 50000 1500 celluloid 13.280000 30.030000 43.310000 ( 24.620000) (1) + # + # ### MRI 2.1.0 + # + # Rehearsal -------------------------------------------------------- + # 50000 2 actress 4.640000 0.080000 4.720000 ( 4.852390) + # 50000 2 celluloid 6.110000 2.300000 8.410000 ( 7.898069) + # 50000 500 actress 6.260000 2.210000 8.470000 ( 7.400573) + # 50000 500 celluloid 10.250000 4.930000 15.180000 ( 14.174329) + # 50000 1000 actress 6.300000 1.860000 8.160000 ( 7.303162) + # 50000 1000 celluloid 12.300000 7.090000 19.390000 ( 17.962621) + # 50000 1500 actress 7.410000 2.610000 10.020000 ( 8.887396) + # 50000 1500 celluloid 14.850000 10.690000 25.540000 ( 24.489796) + # ---------------------------------------------- total: 99.890000sec + # + # mes. act. impl. user system total real + # 50000 2 actress 4.190000 0.070000 4.260000 ( 4.306386) + # 50000 2 celluloid 6.490000 2.210000 8.700000 ( 8.280051) + # 50000 500 actress 7.060000 2.520000 9.580000 ( 8.518707) + # 50000 500 celluloid 10.550000 4.980000 15.530000 ( 14.699962) + # 50000 1000 actress 6.440000 1.870000 8.310000 ( 7.571059) + # 50000 1000 celluloid 12.340000 7.510000 19.850000 ( 18.793591) + # 50000 1500 actress 6.720000 2.160000 8.880000 ( 7.929630) + # 50000 1500 celluloid 14.140000 10.130000 24.270000 ( 22.775288) (1) + # + # *Note (1):* Celluloid is using thread per actor so this bench is creating about 1500 + # native threads. Actress is using constant number of threads. module Actress require 'concurrent/actress/type_check' @@ -20,7 +142,7 @@ module Actress # @return [Reference, nil] current executing actor if any def self.current - Thread.current[:__current_actress__] + Thread.current[:__current_actor__] end # implements ROOT @@ -39,8 +161,22 @@ def on_message(message) # A root actor, a default parent of all actors spawned outside an actor ROOT = Core.new(parent: nil, name: '/', class: Root).reference + # Spawns a new actor. + # + # @example simple + # Actress.spawn(AdHoc, :ping1) { -> message { message } } + # + # @example complex + # Actress.spawn name: :ping3, + # class: AdHoc, + # args: [1] + # executor: Concurrent.configuration.global_task_pool do |add| + # lambda { |number| number + add } + # end + # # @param block for actress_class instantiation # @param args see {.spawn_optionify} + # @return [Reference] never the actual actor def self.spawn(*args, &block) experimental_acknowledged? or warn '[EXPERIMENTAL] A full release of `Actress`, renamed `Actor`, is expected in the 0.7.0 release.' @@ -59,7 +195,7 @@ def self.spawn!(*args, &block) # @overload spawn_optionify(actress_class, name, *args) # @param [Context] actress_class to be spawned - # @param [String, Symbol] name of the instance, it's used to generate the path of the actor + # @param [String, Symbol] name of the instance, it's used to generate the {Core#path} of the actor # @param args for actress_class instantiation # @overload spawn_optionify(opts) # see {Core#initialize} opts @@ -73,6 +209,7 @@ def self.spawn_optionify(*args) end end + # call this to disable experimental warning def self.i_know_it_is_experimental! @experimental_acknowledged = true end diff --git a/lib/concurrent/actress/ad_hoc.rb b/lib/concurrent/actress/ad_hoc.rb index d4b49a386..f4c237f85 100644 --- a/lib/concurrent/actress/ad_hoc.rb +++ b/lib/concurrent/actress/ad_hoc.rb @@ -1,5 +1,11 @@ module Concurrent module Actress + # Allows quick creation of actors with behaviour defined by blocks. + # @example ping + # AdHoc.spawn :forward, an_actor do |where| + # # this block has to return proc defining #on_message behaviour + # -> message { where.tell message } + # end class AdHoc include Context def initialize(*args, &initializer) diff --git a/lib/concurrent/actress/context.rb b/lib/concurrent/actress/context.rb index 626ff9110..49a86ee3b 100644 --- a/lib/concurrent/actress/context.rb +++ b/lib/concurrent/actress/context.rb @@ -1,7 +1,8 @@ module Concurrent module Actress - # module used to define actor behaviours + # This module is used to define actors. It can be included in any class, + # only requirement is to override {Context#on_message} method. # @example ping # class Ping # include Context @@ -72,12 +73,12 @@ def self.included(base) end module ClassMethods - # behaves as {Concurrent::Actress.spawn} but class_name is omitted + # behaves as {Concurrent::Actress.spawn} but class_name is auto-inserted based on receiver def spawn(name_or_opts, *args, &block) Actress.spawn spawn_optionify(name_or_opts, *args), &block end - # behaves as {Concurrent::Actress.spawn!} but class_name is omitted + # behaves as {Concurrent::Actress.spawn!} but class_name is auto-inserted based on receiver def spawn!(name_or_opts, *args, &block) Actress.spawn! spawn_optionify(name_or_opts, *args), &block end diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index 42b009db1..95321df14 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -4,19 +4,35 @@ module Actress require 'set' # Core of the actor - # @api private + # @note Whole class should be considered private. An user should use {Context}s and {Reference}s only. # @note devel: core should not block on anything, e.g. it cannot wait on children to terminate # that would eat up all threads in task pool and deadlock class Core include TypeCheck include Concurrent::Logging + # @!attribute [r] reference + # @return [Reference] reference to this actor which can be safely passed around + # @!attribute [r] name + # @return [String] the name of this instance, it should be uniq (not enforced right now) + # @!attribute [r] path + # @return [String] a path of this actor. It is used for easier orientation and logging. + # Path is constructed recursively with: `parent.path + self.name` up to a {Actress::ROOT}, + # e.g. `/an_actor/its_child`. + # (It will also probably form a supervision path (failures will be reported up to parents) + # in future versions.) + # @!attribute [r] executor + # @return [Executor] which is used to process messages + # @!attribute [r] terminated + # @return [Event] event which will become set when actor is terminated. + # @!attribute [r] actor_class + # @return [Context] a class including {Context} representing Actor's behaviour attr_reader :reference, :name, :path, :executor, :terminated, :actor_class # @option opts [String] name # @option opts [Reference, nil] parent of an actor spawning this one - # @option opts [Context] actress_class a class to be instantiated defining Actor's behaviour - # @option opts [Array] args arguments for actress_class instantiation + # @option opts [Context] actor_class a class to be instantiated defining Actor's behaviour + # @option opts [Array] args arguments for actor_class instantiation # @option opts [Executor] executor, default is `Concurrent.configuration.global_task_pool` # @option opts [IVar, nil] initialized, if present it'll be set or failed after {Context} initialization # @option opts [Proc, nil] logger a proc accepting (level, progname, message = nil, &block) params, @@ -31,6 +47,7 @@ def initialize(opts = {}, &block) @children = Set.new @reference = Reference.new self @name = (Type! opts.fetch(:name), String, Symbol).to_s + @actor = Concurrent::Atomic.new parent = opts[:parent] @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core) @@ -43,14 +60,14 @@ def initialize(opts = {}, &block) @parent_core.add_child reference if @parent_core - @actor_class = actress_class = Child! opts.fetch(:class), Context + @actor_class = actor_class = Child! opts.fetch(:class), Context args = opts.fetch(:args, []) initialized = Type! opts[:initialized], IVar, NilClass schedule_execution do begin - @actress = actress_class.new *args, &block - @actress.send :initialize_core, self + @actor.value = actor_class.new(*args, &block). + tap { |a| a.send :initialize_core, self } initialized.set true if initialized rescue => ex log ERROR, ex @@ -152,6 +169,11 @@ def process_envelopes? end end + # @return [Context] + def actor + @actor.value + end + # Processes single envelope, calls #process_envelopes? at the end to ensure next envelope # scheduling. def receive_envelope @@ -164,7 +186,7 @@ def receive_envelope log DEBUG, "received #{envelope.message} from #{envelope.sender_path}" - result = @actress.on_envelope envelope + result = actor.on_envelope envelope envelope.ivar.set result unless envelope.ivar.nil? nil @@ -182,12 +204,12 @@ def receive_envelope def schedule_execution @serialized_execution.post(@executor) do begin - Thread.current[:__current_actress__] = reference + Thread.current[:__current_actor__] = reference yield rescue => e log FATAL, e ensure - Thread.current[:__current_actress__] = nil + Thread.current[:__current_actor__] = nil end end diff --git a/lib/concurrent/actress/doc.md b/lib/concurrent/actress/doc.md deleted file mode 100644 index 199c7aed0..000000000 --- a/lib/concurrent/actress/doc.md +++ /dev/null @@ -1,53 +0,0 @@ -# Light-weighted implementation of Actors. Inspired by Akka and Erlang. - -Actors are sharing a thread-pool by default which makes them very cheap to create and discard. -Thousands of actors can be created allowing to brake the program to small maintainable pieces -without breaking single responsibility principles. - -## Quick example - - class Counter - include Context - - def initialize(initial_value) - @count = initial_value - end - - def on_message(message) - case message - when Integer - @count += message - when :terminate - terminate! - else - raise 'unknown' - end - end - end - - # create new actor - counter = Counter.spawn(:test_counter, 5) # => a Reference - - # send messages - counter.tell(1) # => counter - counter << 1 # => counter - - # send messages getting an IVar back for synchronization - counter.ask(0) # => an ivar - counter.ask(0).value # => 7 - - # terminate the actor - counter.ask(:terminate).wait - counter.terminated? # => true - counter.ask(5).wait.rejected? # => true - - # failure on message processing will terminate the actor - counter = Counter.spawn(:test_counter, 0) - counter.ask('boom').wait.rejected? # => true - counter.terminated? # => true - - - - - - diff --git a/lib/concurrent/actress/envelope.rb b/lib/concurrent/actress/envelope.rb index 15f979327..796a7eaab 100644 --- a/lib/concurrent/actress/envelope.rb +++ b/lib/concurrent/actress/envelope.rb @@ -1,12 +1,24 @@ module Concurrent module Actress - Envelope = Struct.new :message, :ivar, :sender do + class Envelope include TypeCheck - def initialize(message, ivar, sender) - super message, - (Type! ivar, IVar, NilClass), - (Type! sender, Reference, Thread) + # @!attribute [r] message + # @return [Object] a message + # @!attribute [r] ivar + # @return [IVar] an ivar which becomes resolved after message is processed + # @!attribute [r] sender + # @return [Reference, Thread] an actor or thread sending the message + # @!attribute [r] address + # @return [Reference] where this message will be delivered + + attr_reader :message, :ivar, :sender, :address + + def initialize(message, ivar, sender, address) + @message = message + @ivar = Type! ivar, IVar, NilClass + @sender = Type! sender, Reference, Thread + @address = Type! address, Reference end def sender_path @@ -17,6 +29,10 @@ def sender_path end end + def address_path + address.path + end + def reject!(error) ivar.fail error unless ivar.nil? end diff --git a/lib/concurrent/actress/reference.rb b/lib/concurrent/actress/reference.rb index 3d00f1879..d32d864c3 100644 --- a/lib/concurrent/actress/reference.rb +++ b/lib/concurrent/actress/reference.rb @@ -2,8 +2,8 @@ module Concurrent module Actress # Reference is public interface of Actor instances. It is used for sending messages and can - # be freely passed around the program. It also provides some basic information about the actor - # see {CoreDelegations} + # be freely passed around the program. It also provides some basic information about the actor, + # see {CoreDelegations}. class Reference include TypeCheck include CoreDelegations @@ -43,9 +43,9 @@ def ask!(message, ivar = IVar.new) ask(message, ivar).value! end - # behaves as #tell when no ivar and as #ask when ivar + # behaves as {#tell} when no ivar and as {#ask} when ivar def message(message, ivar = nil) - core.on_envelope Envelope.new(message, ivar, Actress.current || Thread.current) + core.on_envelope Envelope.new(message, ivar, Actress.current || Thread.current, self) return ivar || self end diff --git a/lib/concurrent/observable.rb b/lib/concurrent/observable.rb index 923381eb0..b1aaf0c26 100644 --- a/lib/concurrent/observable.rb +++ b/lib/concurrent/observable.rb @@ -10,7 +10,7 @@ def add_observer(*args, &block) observers.add_observer(*args, &block) end - # as #add_observer but it can be used for chaning + # as #add_observer but it can be used for chaining # @return [Observable] self def with_observer(*args, &block) add_observer *args, &block diff --git a/tasks/.gitignore b/tasks/.gitignore deleted file mode 100644 index e69de29bb..000000000 diff --git a/tasks/update_doc.rake b/tasks/update_doc.rake new file mode 100644 index 000000000..1482b68c7 --- /dev/null +++ b/tasks/update_doc.rake @@ -0,0 +1,44 @@ +require 'yard' +YARD::Rake::YardocTask.new + +root = File.expand_path File.join(File.dirname(__FILE__), '..') + +namespace :yard do + + cmd = lambda do |command| + puts ">> executing: #{command}" + system command or raise "#{command} failed" + end + + desc 'Pushes generated documentation to github pages: http://ruby-concurrency.github.io/concurrent-ruby/' + task :push => [:setup, :yard] do + + message = Dir.chdir(root) do + `git log -n 1 --oneline`.strip + end + puts "Generating commit: #{message}" + + Dir.chdir "#{root}/yardoc" do + cmd.call "git ac -m '#{message}'" + cmd.call 'git push origin gh-pages' + end + + end + + desc 'Setups second clone in ./yardoc dir for pushing doc to github' + task :setup do + + unless File.exist? "#{root}/yardoc/.git" + cmd.call "rm -rf #{root}/yardoc" if File.exist?("#{root}/yardoc") + Dir.chdir "#{root}" do + cmd.call 'git clone --single-branch --branch gh-pages git@github.com:ruby-concurrency/concurrent-ruby.git ./yardoc' + end + end + Dir.chdir "#{root}/yardoc" do + cmd.call 'git fetch origin' + cmd.call 'git reset --hard origin/gh-pages' + end + + end + +end diff --git a/yard-template/default/fulldoc/html/css/common.css b/yard-template/default/fulldoc/html/css/common.css new file mode 100644 index 000000000..dfd9d858a --- /dev/null +++ b/yard-template/default/fulldoc/html/css/common.css @@ -0,0 +1,125 @@ +/* Override this file with custom rules */ + +body { + line-height: 18px; +} + +.docstring code, .docstring .object_link a, #filecontents code { + padding: 0px 3px 1px 3px; + border: 1px solid #eef; + background: #f5f5ff; +} + +#filecontents pre code, .docstring pre code { + border: none; + background: none; + padding: 0; +} + +#filecontents pre.code, .docstring pre.code, .tags pre.example, .docstring code, .docstring .object_link a, +#filecontents code { + -moz-border-radius: 2px; + -webkit-border-radius: 2px; +} + +/* syntax highlighting */ +.source_code { + display: none; + padding: 3px 8px; + border-left: 8px solid #ddd; + margin-top: 5px; +} + +#filecontents pre.code, .docstring pre.code, .source_code pre { + font-family: monospace; +} + +#filecontents pre.code, .docstring pre.code { + display: block; +} + +.source_code .lines { + padding-right: 12px; + color: #555; + text-align: right; +} + +#filecontents pre.code, .docstring pre.code, +.tags pre.example { + padding: 5px 12px; + margin-top: 4px; + border: 1px solid #eef; + background: #f5f5ff; +} + +pre.code { + color: #000; +} + +pre.code .info.file { + color: #555; +} + +pre.code .val { + color: #036A07; +} + +pre.code .tstring_content, +pre.code .heredoc_beg, pre.code .heredoc_end, +pre.code .qwords_beg, pre.code .qwords_end, +pre.code .tstring, pre.code .dstring { + color: #036A07; +} + +pre.code .fid, +pre.code .rubyid_new, +pre.code .rubyid_to_s, +pre.code .rubyid_to_sym, +pre.code .rubyid_to_f, +pre.code .rubyid_to_i, +pre.code .rubyid_each { + color: inherit; +} + +pre.code .comment { + color: #777; + font-style: italic; +} + +pre.code .const, pre.code .constant { + color: inherit; + font-weight: bold; + font-style: italic; +} + +pre.code .label, +pre.code .symbol { + color: #C5060B; +} + +pre.code .kw, +pre.code .rubyid_require, +pre.code .rubyid_extend, +pre.code .rubyid_include, +pre.code .int { + color: #0000FF; +} + +pre.code .ivar { + color: #660E7A; +} + +pre.code .gvar, +pre.code .rubyid_backref, +pre.code .rubyid_nth_ref { + color: #6D79DE; +} + +pre.code .regexp, .dregexp { + color: #036A07; +} + +pre.code a { + border-bottom: 1px dotted #bbf; +} + diff --git a/yard-template/default/layout/html/footer.erb b/yard-template/default/layout/html/footer.erb new file mode 100644 index 000000000..4f2a7108a --- /dev/null +++ b/yard-template/default/layout/html/footer.erb @@ -0,0 +1,15 @@ + + +