diff --git a/.travis.yml b/.travis.yml index 65caad865..ee8f210d4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,3 +15,4 @@ matrix: - rvm: ruby-head - rvm: jruby-head - rvm: 1.9.3 +script: "bundle exec rspec --color --backtrace --seed 1 --format documentation ./spec" diff --git a/.yardopts b/.yardopts index 7adf207f7..c9c064139 100644 --- a/.yardopts +++ b/.yardopts @@ -1 +1,6 @@ ---protected --no-private --embed-mixins --output-dir ./doc --markup markdown +--protected +--no-private +--embed-mixins +--output-dir ./doc +--markup markdown +--use-cache diff --git a/Gemfile b/Gemfile index 4d65462ed..feea5ee86 100644 --- a/Gemfile +++ b/Gemfile @@ -8,6 +8,7 @@ group :development do gem 'countloc', '~> 0.4.0', platforms: :mri gem 'yard', '~> 0.8.7.4' gem 'inch', '~> 0.4.1', platforms: :mri + gem 'redcarpet', platforms: :mri # understands github markdown end group :testing do diff --git a/README.md b/README.md index 6b477b23c..30336dc57 100644 --- a/README.md +++ b/README.md @@ -40,14 +40,16 @@ The design goals of this gem are: ```shell gem install concurrent-ruby ``` + or add the following line to Gemfile: ```ruby gem 'concurrent-ruby' ``` + and run `bundle install` from your shell. -*NOTE: There is an old gem from 2007 called "concurrent" that does not appear to be under active development. That isn't us. Please do not run* `gem install concurrent`*. It is not the droid you are looking for.* +_NOTE: There is an old gem from 2007 called "concurrent" that does not appear to be under active development. That isn't us. Please do not run* `gem install concurrent`*. It is not the droid you are looking for._ ## Features & Documentation @@ -74,6 +76,7 @@ into several general groups: * Thread synchronization classes and algorithms including [dataflow](https://github.com/jdantonio/concurrent-ruby/wiki/Dataflow), timeout, condition, countdown latch, dependency counter, and event * Java-inspired [thread pools](https://github.com/jdantonio/concurrent-ruby/wiki/Thread%20Pools) +* New fast light-weighted [Actor model](http://rubydoc.info/gems/concurrent-ruby/Concurrent/Actress) implementation. * And many more... ### Semantic Versioning @@ -91,7 +94,7 @@ It should be fully compatible with any interpreter that is compliant with Ruby 1 Many more code examples can be found in the documentation for each class (linked above). This one simple example shows some of the power of this gem. -```ruby +```ruby require 'concurrent' require 'thread' # for Queue require 'open-uri' # for open(uri) diff --git a/examples/actress/celluloid_benchmark.rb b/examples/actress/celluloid_benchmark.rb new file mode 100644 index 000000000..6444c0175 --- /dev/null +++ b/examples/actress/celluloid_benchmark.rb @@ -0,0 +1,96 @@ +require 'benchmark' +require 'concurrent/actress' +Concurrent::Actress.i_know_it_is_experimental! +require 'celluloid' +require 'celluloid/autostart' + +logger = Logger.new($stderr) +logger.level = Logger::INFO +Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| + logger.add level, message, progname, &block +end + +scale = 1 +ADD_TO = (100 * scale).to_i +counts_size = (500 * scale).to_i +adders_size = (500 * scale).to_i + +class Counter + include Celluloid + + def initialize(adders, i) + @adders = adders + @i = i + end + + def counting(count, ivar) + if count < ADD_TO + @adders[(@i+1) % @adders.size].counting count+1, ivar + else + ivar.set count + end + end +end + +threads = [] + +Benchmark.bmbm(10) do |b| + [2, adders_size, adders_size*2, adders_size*3].each do |adders_size| + + b.report(format('%5d %4d %s', ADD_TO*counts_size, adders_size, 'actress')) do + counts = Array.new(counts_size) { [0, Concurrent::IVar.new] } + adders = Array.new(adders_size) do |i| + Concurrent::Actress::AdHoc.spawn("adder#{i}") do + lambda do |(count, ivar)| + if count.nil? + terminate! + else + if count < ADD_TO + adders[(i+1) % adders_size].tell [count+1, ivar] + else + ivar.set count + end + end + end + end + end + + counts.each_with_index do |count, i| + adders[i % adders_size].tell count + end + + counts.each do |count, ivar| + raise unless ivar.value >= ADD_TO + end + + threads << Thread.list.size + + adders.each { |a| a << [nil, nil] } + end + + b.report(format('%5d %4d %s', ADD_TO*counts_size, adders_size, 'celluloid')) do + counts = [] + counts_size.times { counts << [0, Concurrent::IVar.new] } + + adders = [] + adders_size.times do |i| + adders << Counter.new(adders, i) + end + + counts.each_with_index do |count, i| + adders[i % adders_size].counting *count + end + + counts.each do |count, ivar| + raise unless ivar.value >= ADD_TO + end + + threads << Thread.list.size + + adders.each(&:terminate) + end + end +end + +p threads + diff --git a/examples/actress/format.rb b/examples/actress/format.rb new file mode 100644 index 000000000..3bd7287e7 --- /dev/null +++ b/examples/actress/format.rb @@ -0,0 +1,76 @@ +require 'rubygems' +require 'bundler/setup' +require 'pry' +require 'pp' + +input_paths = if ARGV.empty? + Dir.glob("#{File.dirname(__FILE__)}/*.in.rb") + else + ARGV + end.map { |p| File.expand_path p } + +input_paths.each_with_index do |input_path, i| + + pid = fork do + require 'concurrent/actress' + Concurrent::Actress.i_know_it_is_experimental! + + begin + output_path = input_path.gsub /\.in\.rb$/, '.out.rb' + input = File.readlines(input_path) + + chunks = [] + line = '' + + while !input.empty? + line += input.shift + if Pry::Code.complete_expression? line + chunks << line + line = '' + end + end + + raise unless line.empty? + + chunks.map! { |chunk| [chunk, [chunk.split($/).size, 1].max] } + environment = Module.new.send :binding + evaluate = ->(code, line) do + eval(code, environment, input_path, line) + end + + indent = 50 + + line_count = 1 + output = '' + chunks.each do |chunk, lines| + result = evaluate.(chunk, line_count) + unless chunk.strip.empty? || chunk =~ /\A *#/ + pre_lines = chunk.lines.to_a + last_line = pre_lines.pop + output << pre_lines.join + + if last_line =~ /\#$/ + output << last_line.gsub(/\#$/, '') + else + if last_line.size < indent && result.inspect.size < indent + output << "%-#{indent}s %s" % [last_line.chomp, "# => #{result.inspect}\n"] + else + output << last_line << " # => #{result.inspect}\n" + end + end + else + output << chunk + end + line_count += lines + end + + puts "#{input_path}\n -> #{output_path}" + #puts output + File.write(output_path, output) + rescue => ex + puts "#{ex} (#{ex.class})\n#{ex.backtrace * "\n"}" + end + end + + Process.wait pid +end diff --git a/examples/actress/quick.in.rb b/examples/actress/quick.in.rb new file mode 100644 index 000000000..e56f0dd7a --- /dev/null +++ b/examples/actress/quick.in.rb @@ -0,0 +1,84 @@ +class Counter + # Include context of an actor which gives this class access to reference and other information + # about the actor, see CoreDelegations. + include Concurrent::Actress::Context + + # use initialize as you wish + def initialize(initial_value) + @count = initial_value + end + + # override on_message to define actor's behaviour + def on_message(message) + case message + when Integer + @count += message + when :terminate + terminate! + else + raise 'unknown' + end + end +end + +# Create new actor naming the instance 'first'. +# Return value is a reference to the actor, the actual actor is never returned. +counter = Counter.spawn(:first, 5) + +# Tell a message and forget returning self. +counter.tell(1) +counter << 1 +# (First counter now contains 7.) + +# Send a messages asking for a result. +counter.ask(0).class +counter.ask(0).value + +# Terminate the actor. +counter.tell(:terminate) +# Not terminated yet, it takes a while until the message is processed. +counter.terminated? +# Waiting for the termination. +counter.terminated.class +counter.terminated.wait +counter.terminated? +# Any subsequent messages are rejected. +counter.ask(5).wait.rejected? + +# Failure on message processing terminates the actor. +counter = Counter.spawn(:first, 0) +counter.ask('boom').wait.rejected? +counter.terminated? + + +# Lets define an actor creating children actors. +class Node + include Concurrent::Actress::Context + + def on_message(message) + case message + when :new_child + spawn self.class, :child + when :children + children + when :terminate + terminate! + else + raise 'unknown' + end + end +end + +# Actors are tracking parent-child relationships +parent = Node.spawn :parent +child = parent.ask!(:new_child) +child.parent +parent.ask!(:children) + +# There is a special root actor which is used for all actors spawned outside any actor. +parent.parent + +# Termination of an parent will also terminate all children. +parent.ask('boom').wait # +parent.terminated? +child.terminated? diff --git a/examples/actress/quick.out.rb b/examples/actress/quick.out.rb new file mode 100644 index 000000000..84b2efb8e --- /dev/null +++ b/examples/actress/quick.out.rb @@ -0,0 +1,92 @@ +class Counter + # Include context of an actor which gives this class access to reference and other information + # about the actor, see CoreDelegations. + include Concurrent::Actress::Context + + # use initialize as you wish + def initialize(initial_value) + @count = initial_value + end + + # override on_message to define actor's behaviour + def on_message(message) + case message + when Integer + @count += message + when :terminate + terminate! + else + raise 'unknown' + end + end +end # => :on_message + +# Create new actor naming the instance 'first'. +# Return value is a reference to the actor, the actual actor is never returned. +counter = Counter.spawn(:first, 5) + # => # + +# Tell a message and forget returning self. +counter.tell(1) + # => # +counter << 1 + # => # +# (First counter now contains 7.) + +# Send a messages asking for a result. +counter.ask(0).class # => Concurrent::IVar +counter.ask(0).value # => 7 + +# Terminate the actor. +counter.tell(:terminate) + # => # +# Not terminated yet, it takes a while until the message is processed. +counter.terminated? # => false +# Waiting for the termination. +counter.terminated.class # => Concurrent::Event +counter.terminated.wait # => true +counter.terminated? # => true +# Any subsequent messages are rejected. +counter.ask(5).wait.rejected? # => true + +# Failure on message processing terminates the actor. +counter = Counter.spawn(:first, 0) + # => # +counter.ask('boom').wait.rejected? # => true +counter.terminated? # => true + + +# Lets define an actor creating children actors. +class Node + include Concurrent::Actress::Context + + def on_message(message) + case message + when :new_child + spawn self.class, :child + when :children + children + when :terminate + terminate! + else + raise 'unknown' + end + end +end # => :on_message + +# Actors are tracking parent-child relationships +parent = Node.spawn :parent # => # +child = parent.ask!(:new_child) + # => # +child.parent # => # +parent.ask!(:children) + # => [#] + +# There is a special root actor which is used for all actors spawned outside any actor. +parent.parent + # => # + +# Termination of an parent will also terminate all children. +parent.ask('boom').wait +parent.terminated? # => true +child.terminated? # => true diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 2013def07..9008a9437 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -1,11 +1,222 @@ require 'concurrent/configuration' -require 'concurrent/executor/one_by_one' +require 'concurrent/executor/serialized_execution' require 'concurrent/ivar' require 'concurrent/logging' module Concurrent - # {include:file:lib/concurrent/actress/doc.md} + # # Actor model + # + # - Light-weighted. + # - Inspired by Akka and Erlang. + # + # Actors are sharing a thread-pool by default which makes them very cheap to create and discard. + # Thousands of actors can be created allowing to brake the program to small maintainable pieces + # without breaking single responsibility principles. + # + # ## What is an actor model? + # + # [Wiki](http://en.wikipedia.org/wiki/Actor_model) says: + # The actor model in computer science is a mathematical model of concurrent computation + # that treats _actors_ as the universal primitives of concurrent digital computation: + # in response to a message that it receives, an actor can make local decisions, + # create more actors, send more messages, and determine how to respond to the next + # message received. + # + # ## Why? + # + # Concurrency is hard this is one of many ways how to simplify the problem. + # It is simpler to reason about actors then about locks (and all their possible states). + # + # ## How to use it + # + # class Counter + # # Include context of an actor which gives this class access to reference and other information + # # about the actor, see CoreDelegations. + # include Concurrent::Actress::Context + # + # # use initialize as you wish + # def initialize(initial_value) + # @count = initial_value + # end + # + # # override on_message to define actor's behaviour + # def on_message(message) + # case message + # when Integer + # @count += message + # when :terminate + # terminate! + # else + # raise 'unknown' + # end + # end + # end # => :on_message + # + # # Create new actor naming the instance 'first'. + # # Return value is a reference to the actor, the actual actor is never returned. + # counter = Counter.spawn(:first, 5) + # # => # + # + # # Tell a message and forget returning self. + # counter.tell(1) + # # => # + # counter << 1 + # # => # + # # (First counter now contains 7.) + # + # # Send a messages asking for a result. + # counter.ask(0).class # => Concurrent::IVar + # counter.ask(0).value # => 7 + # + # # Terminate the actor. + # counter.tell(:terminate) + # # => # + # # Not terminated yet, it takes a while until the message is processed. + # counter.terminated? # => false + # # Waiting for the termination. + # counter.terminated.class # => Concurrent::Event + # counter.terminated.wait # => true + # counter.terminated? # => true + # # Any subsequent messages are rejected. + # counter.ask(5).wait.rejected? # => true + # + # # Failure on message processing terminates the actor. + # counter = Counter.spawn(:first, 0) + # # => # + # counter.ask('boom').wait.rejected? # => true + # counter.terminated? # => true + # + # + # # Lets define an actor creating children actors. + # class Node + # include Concurrent::Actress::Context + # + # def on_message(message) + # case message + # when :new_child + # spawn self.class, :child + # when :children + # children + # when :terminate + # terminate! + # else + # raise 'unknown' + # end + # end + # end # => :on_message + # + # # Actors are tracking parent-child relationships + # parent = Node.spawn :parent # => # + # child = parent.ask!(:new_child) + # # => # + # child.parent # => # + # parent.ask!(:children) + # # => [#] + # + # # There is a special root actor which is used for all actors spawned outside any actor. + # parent.parent + # # => # + # + # # Termination of an parent will also terminate all children. + # parent.ask('boom').wait + # parent.terminated? # => true + # child.terminated? # => true + # + # ## Messaging + # + # There is a contract that messages send between actors should be immutable. Gems like + # + # - [Algebrick](https://github.com/pitr-ch/algebrick) - Typed struct on steroids based on + # algebraic types and pattern matching + # - [Hamster](https://github.com/hamstergem/hamster) - Efficient, Immutable, Thread-Safe + # Collection classes for Ruby + # + # are very useful. + # + # ## Architecture + # + # Actors are running on shared thread poll which allows user to create many actors cheaply. + # Downside is that these actors cannot be directly used to do IO or other blocking operations. + # Blocking operations could starve the `default_task_pool`. However there are two options: + # + # - Create an regular actor which will schedule blocking operations in `global_operation_pool` + # (which is intended for blocking operations) sending results back to self in messages. + # - Create an actor using `global_operation_pool` instead of `global_task_pool`, e.g. + # `AnIOActor.spawn name: :blocking, executor: Concurrent.configuration.global_operation_pool`. + # + # Each actor is composed from 3 objects: + # + # ### {Reference} + # {include:Actress::Reference} + # + # ### {Core} + # {include:Actress::Core} + # + # ### {Context} + # {include:Actress::Context} + # + # ## Speed + # + # Simple benchmark Actress vs Celluloid, the numbers are looking good + # but you know how it is with benchmarks. Source code is in + # `examples/actress/celluloid_benchmark.rb`. It sends numbers between x actors + # and adding 1 until certain limit is reached. + # + # Benchmark legend: + # + # - mes. - number of messages send between the actors + # - act. - number of actors exchanging the messages + # - impl. - which gem is used + # + # ### JRUBY + # + # Rehearsal -------------------------------------------------------- + # 50000 2 actress 24.110000 0.800000 24.910000 ( 7.728000) + # 50000 2 celluloid 28.510000 4.780000 33.290000 ( 14.782000) + # 50000 500 actress 13.700000 0.280000 13.980000 ( 4.307000) + # 50000 500 celluloid 14.520000 11.740000 26.260000 ( 12.258000) + # 50000 1000 actress 10.890000 0.220000 11.110000 ( 3.760000) + # 50000 1000 celluloid 15.600000 21.690000 37.290000 ( 18.512000) + # 50000 1500 actress 10.580000 0.270000 10.850000 ( 3.646000) + # 50000 1500 celluloid 14.490000 29.790000 44.280000 ( 26.043000) + # --------------------------------------------- total: 201.970000sec + # + # mes. act. impl. user system total real + # 50000 2 actress 9.820000 0.510000 10.330000 ( 5.735000) + # 50000 2 celluloid 10.390000 4.030000 14.420000 ( 7.494000) + # 50000 500 actress 9.880000 0.200000 10.080000 ( 3.310000) + # 50000 500 celluloid 12.430000 11.310000 23.740000 ( 11.727000) + # 50000 1000 actress 10.590000 0.190000 10.780000 ( 4.029000) + # 50000 1000 celluloid 14.950000 23.260000 38.210000 ( 20.841000) + # 50000 1500 actress 10.710000 0.250000 10.960000 ( 3.892000) + # 50000 1500 celluloid 13.280000 30.030000 43.310000 ( 24.620000) (1) + # + # ### MRI 2.1.0 + # + # Rehearsal -------------------------------------------------------- + # 50000 2 actress 4.640000 0.080000 4.720000 ( 4.852390) + # 50000 2 celluloid 6.110000 2.300000 8.410000 ( 7.898069) + # 50000 500 actress 6.260000 2.210000 8.470000 ( 7.400573) + # 50000 500 celluloid 10.250000 4.930000 15.180000 ( 14.174329) + # 50000 1000 actress 6.300000 1.860000 8.160000 ( 7.303162) + # 50000 1000 celluloid 12.300000 7.090000 19.390000 ( 17.962621) + # 50000 1500 actress 7.410000 2.610000 10.020000 ( 8.887396) + # 50000 1500 celluloid 14.850000 10.690000 25.540000 ( 24.489796) + # ---------------------------------------------- total: 99.890000sec + # + # mes. act. impl. user system total real + # 50000 2 actress 4.190000 0.070000 4.260000 ( 4.306386) + # 50000 2 celluloid 6.490000 2.210000 8.700000 ( 8.280051) + # 50000 500 actress 7.060000 2.520000 9.580000 ( 8.518707) + # 50000 500 celluloid 10.550000 4.980000 15.530000 ( 14.699962) + # 50000 1000 actress 6.440000 1.870000 8.310000 ( 7.571059) + # 50000 1000 celluloid 12.340000 7.510000 19.850000 ( 18.793591) + # 50000 1500 actress 6.720000 2.160000 8.880000 ( 7.929630) + # 50000 1500 celluloid 14.140000 10.130000 24.270000 ( 22.775288) (1) + # + # *Note (1):* Celluloid is using thread per actor so this bench is creating about 1500 + # native threads. Actress is using constant number of threads. module Actress require 'concurrent/actress/type_check' @@ -39,10 +250,26 @@ def on_message(message) # A root actor, a default parent of all actors spawned outside an actor ROOT = Core.new(parent: nil, name: '/', class: Root).reference + # Spawns a new actor. + # + # @example simple + # Actress.spawn(AdHoc, :ping1) { -> message { message } } + # + # @example complex + # Actress.spawn name: :ping3, + # class: AdHoc, + # args: [1] + # executor: Concurrent.configuration.global_task_pool do |add| + # lambda { |number| number + add } + # end + # # @param block for actress_class instantiation # @param args see {.spawn_optionify} + # @return [Reference] never the actual actor def self.spawn(*args, &block) - warn '[EXPERIMENTAL] A full release of `Actress`, renamed `Actor`, is expected in the 0.7.0 release.' + experimental_acknowledged? or + warn '[EXPERIMENTAL] A full release of `Actress`, renamed `Actor`, is expected in the 0.7.0 release.' + if Actress.current Core.new(spawn_optionify(*args).merge(parent: Actress.current), &block).reference else @@ -52,7 +279,6 @@ def self.spawn(*args, &block) # as {.spawn} but it'll raise when Actor not initialized properly def self.spawn!(*args, &block) - warn '[EXPERIMENTAL] A full release of `Actress`, renamed `Actor`, is expected in the 0.7.0 release.' spawn(spawn_optionify(*args).merge(initialized: ivar = IVar.new), &block).tap { ivar.no_error! } end @@ -71,5 +297,14 @@ def self.spawn_optionify(*args) args: args[2..-1] } end end + + # call this to disable experimental warning + def self.i_know_it_is_experimental! + @experimental_acknowledged = true + end + + def self.experimental_acknowledged? + !!@experimental_acknowledged + end end end diff --git a/lib/concurrent/actress/ad_hoc.rb b/lib/concurrent/actress/ad_hoc.rb index d4b49a386..f4c237f85 100644 --- a/lib/concurrent/actress/ad_hoc.rb +++ b/lib/concurrent/actress/ad_hoc.rb @@ -1,5 +1,11 @@ module Concurrent module Actress + # Allows quick creation of actors with behaviour defined by blocks. + # @example ping + # AdHoc.spawn :forward, an_actor do |where| + # # this block has to return proc defining #on_message behaviour + # -> message { where.tell message } + # end class AdHoc include Context def initialize(*args, &initializer) diff --git a/lib/concurrent/actress/context.rb b/lib/concurrent/actress/context.rb index b5b48ec22..49a86ee3b 100644 --- a/lib/concurrent/actress/context.rb +++ b/lib/concurrent/actress/context.rb @@ -1,7 +1,8 @@ module Concurrent module Actress - # module used to define actor behaviours + # This module is used to define actors. It can be included in any class, + # only requirement is to override {Context#on_message} method. # @example ping # class Ping # include Context @@ -26,10 +27,6 @@ def on_message(message) raise NotImplementedError end - def logger - core.logger - end - # @api private def on_envelope(envelope) @envelope = envelope @@ -53,9 +50,14 @@ def terminate! core.terminate! end + # delegates to core.log + # @see Logging#log + def log(level, progname, message = nil, &block) + core.log(level, progname, message, &block) + end + private - # @api private def initialize_core(core) @core = Type! core, Core end @@ -71,12 +73,12 @@ def self.included(base) end module ClassMethods - # behaves as {Actress.spawn} but class_name is omitted + # behaves as {Concurrent::Actress.spawn} but class_name is auto-inserted based on receiver def spawn(name_or_opts, *args, &block) Actress.spawn spawn_optionify(name_or_opts, *args), &block end - # behaves as {Actress.spawn!} but class_name is omitted + # behaves as {Concurrent::Actress.spawn!} but class_name is auto-inserted based on receiver def spawn!(name_or_opts, *args, &block) Actress.spawn! spawn_optionify(name_or_opts, *args), &block end diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index f51b8855c..42b009db1 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -11,7 +11,7 @@ class Core include TypeCheck include Concurrent::Logging - attr_reader :reference, :name, :path, :executor, :terminated + attr_reader :reference, :name, :path, :executor, :terminated, :actor_class # @option opts [String] name # @option opts [Reference, nil] parent of an actor spawning this one @@ -23,14 +23,14 @@ class Core # can be used to hook actor instance to any logging system # @param [Proc] block for class instantiation def initialize(opts = {}, &block) - @mailbox = Array.new - @one_by_one = OneByOne.new + @mailbox = Array.new + @serialized_execution = SerializedExecution.new # noinspection RubyArgCount - @terminated = Event.new - @executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor - @children = Set.new - @reference = Reference.new self - @name = (Type! opts.fetch(:name), String, Symbol).to_s + @terminated = Event.new + @executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor + @children = Set.new + @reference = Reference.new self + @name = (Type! opts.fetch(:name), String, Symbol).to_s parent = opts[:parent] @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core) @@ -43,9 +43,9 @@ def initialize(opts = {}, &block) @parent_core.add_child reference if @parent_core - @actress_class = actress_class = Child! opts.fetch(:class), Context - args = opts.fetch(:args, []) - initialized = Type! opts[:initialized], IVar, NilClass + @actor_class = actress_class = Child! opts.fetch(:class), Context + args = opts.fetch(:args, []) + initialized = Type! opts[:initialized], IVar, NilClass schedule_execution do begin @@ -113,6 +113,8 @@ def terminated? # Terminates all its children, does not wait until they are terminated. def terminate! guard! + return nil if terminated? + @children.each do |ch| ch.send(:core).tap { |core| core.send(:schedule_execution) { core.terminate! } } end @@ -178,7 +180,7 @@ def receive_envelope # Schedules blocks to be executed on executor sequentially, # sets Actress.current def schedule_execution - @one_by_one.post(@executor) do + @serialized_execution.post(@executor) do begin Thread.current[:__current_actress__] = reference yield diff --git a/lib/concurrent/actress/core_delegations.rb b/lib/concurrent/actress/core_delegations.rb index 4a0144bcb..4dd4d134e 100644 --- a/lib/concurrent/actress/core_delegations.rb +++ b/lib/concurrent/actress/core_delegations.rb @@ -31,7 +31,12 @@ def executor core.executor end + def actor_class + core.actor_class + end + alias_method :ref, :reference + alias_method :actress_class, :actor_class end end end diff --git a/lib/concurrent/actress/doc.md b/lib/concurrent/actress/doc.md deleted file mode 100644 index e47248094..000000000 --- a/lib/concurrent/actress/doc.md +++ /dev/null @@ -1,53 +0,0 @@ -# Light-weighted implement of Actors. Inspired by Akka and Erlang. - -Actors are using a thread-pool by default which makes them very cheap to create and discard. -Thousands of actors can be created allowing to brake the program to small maintainable pieces -without breaking single responsibility principles. - -## Quick example - - class Counter - include Context - - def initialize(initial_value) - @count = initial_value - end - - def on_message(message) - case message - when Integer - @count += message - when :terminate - terminate! - else - raise 'unknown' - end - end - end - - # create new actor - counter = Counter.spawn(:test_counter, 5) # => a Reference - - # send messages - counter.tell(1) # => counter - counter << 1 # => counter - - # send messages getting an IVar back for synchronization - counter.ask(0) # => an ivar - counter.ask(0).value # => 7 - - # terminate the actor - counter.ask(:terminate).wait - counter.terminated? # => true - counter.ask(5).wait.rejected? # => true - - # failure on message processing will terminate the actor - counter = Counter.spawn(:test_counter, 0) - counter.ask('boom').wait.rejected? # => true - counter.terminated? # => true - - - - - - diff --git a/lib/concurrent/actress/reference.rb b/lib/concurrent/actress/reference.rb index a20ad5066..e9aecf052 100644 --- a/lib/concurrent/actress/reference.rb +++ b/lib/concurrent/actress/reference.rb @@ -2,8 +2,8 @@ module Concurrent module Actress # Reference is public interface of Actor instances. It is used for sending messages and can - # be freely passed around the program. It also provides some basic information about the actor - # see {CoreDelegations} + # be freely passed around the program. It also provides some basic information about the actor, + # see {CoreDelegations}. class Reference include TypeCheck include CoreDelegations @@ -43,14 +43,14 @@ def ask!(message, ivar = IVar.new) ask(message, ivar).value! end - # behaves as #tell when no ivar and as #ask when ivar + # behaves as {#tell} when no ivar and as {#ask} when ivar def message(message, ivar = nil) core.on_envelope Envelope.new(message, ivar, Actress.current || Thread.current) return ivar || self end def to_s - "#<#{self.class} #{path}>" + "#<#{self.class} #{path} (#{actor_class})>" end alias_method :inspect, :to_s diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index 24a3e729f..1b6e177a4 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -62,14 +62,14 @@ class Agent # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and # returning the value returned from the proc def initialize(initial, opts = {}) - @value = initial - @rescuers = [] - @validator = Proc.new { |result| true } - @timeout = opts.fetch(:timeout, TIMEOUT).freeze - self.observers = CopyOnWriteObserverSet.new - @one_by_one = OneByOne.new - @task_executor = OptionsParser.get_task_executor_from(opts) - @operation_executor = OptionsParser.get_operation_executor_from(opts) + @value = initial + @rescuers = [] + @validator = Proc.new { |result| true } + @timeout = opts.fetch(:timeout, TIMEOUT).freeze + self.observers = CopyOnWriteObserverSet.new + @serialized_execution = SerializedExecution.new + @task_executor = OptionsParser.get_task_executor_from(opts) + @operation_executor = OptionsParser.get_operation_executor_from(opts) init_mutex set_deref_options(opts) end @@ -180,7 +180,7 @@ def await(timeout = nil) def post_on(executor, &block) return nil if block.nil? - @one_by_one.post(executor) { work(&block) } + @serialized_execution.post(executor) { work(&block) } true end diff --git a/lib/concurrent/configuration.rb b/lib/concurrent/configuration.rb index 138ce2e64..d47b88696 100644 --- a/lib/concurrent/configuration.rb +++ b/lib/concurrent/configuration.rb @@ -1,6 +1,7 @@ require 'thread' require 'concurrent/delay' require 'concurrent/errors' +require 'concurrent/atomic/atomic' require 'concurrent/executor/thread_pool_executor' require 'concurrent/executor/timer_set' require 'concurrent/utility/processor_count' @@ -111,8 +112,12 @@ def new_operation_pool end # create the default configuration on load - @configuration = Configuration.new - singleton_class.send :attr_reader, :configuration + @configuration = Atomic.new Configuration.new + + # @return [Configuration] + def self.configuration + @configuration.value + end # Perform gem-level configuration. # diff --git a/lib/concurrent/executor/one_by_one.rb b/lib/concurrent/executor/serialized_execution.rb similarity index 73% rename from lib/concurrent/executor/one_by_one.rb rename to lib/concurrent/executor/serialized_execution.rb index a86e421f7..952ea2785 100644 --- a/lib/concurrent/executor/one_by_one.rb +++ b/lib/concurrent/executor/serialized_execution.rb @@ -1,8 +1,10 @@ +require 'concurrent/logging' + module Concurrent - # Ensures that jobs are passed to the given executors one by one, - # never running at the same time. - class OneByOne + # Ensures passed jobs in a serialized order never running at the same time. + class SerializedExecution + include Logging Job = Struct.new(:executor, :args, :block) do def call @@ -30,10 +32,6 @@ def initialize # @raise [ArgumentError] if no task is given def post(executor, *args, &task) return nil if task.nil? - # FIXME Agent#send-off will blow up here - # if executor.can_overflow? - # raise ArgumentError, 'OneByOne cannot be used in conjunction with executor which may overflow' - # end job = Job.new executor, args, task @@ -56,7 +54,22 @@ def post(executor, *args, &task) private def call_job(job) - job.executor.post { work(job) } + did_it_run = begin + job.executor.post { work(job) } + true + rescue RejectedExecutionError => ex + false + end + + # TODO not the best idea to run it myself + unless did_it_run + begin + work job + rescue => ex + # let it fail + log DEBUG, ex + end + end end # ensures next job is executed if any is stashed diff --git a/lib/concurrent/executors.rb b/lib/concurrent/executors.rb index ff6ced453..86f0de038 100644 --- a/lib/concurrent/executors.rb +++ b/lib/concurrent/executors.rb @@ -6,4 +6,4 @@ require 'concurrent/executor/single_thread_executor' require 'concurrent/executor/thread_pool_executor' require 'concurrent/executor/timer_set' -require 'concurrent/executor/one_by_one' +require 'concurrent/executor/serialized_execution' diff --git a/lib/concurrent/logging.rb b/lib/concurrent/logging.rb index 802db21f6..50960bbe1 100644 --- a/lib/concurrent/logging.rb +++ b/lib/concurrent/logging.rb @@ -9,7 +9,7 @@ module Logging # @param [Integer] level one of Logger::Severity constants # @param [String] progname e.g. a path of an Actor # @param [String, nil] message when nil block is used to generate the message - # @yields_return [String] a message + # @yieldreturn [String] a message def log(level, progname, message = nil, &block) (@logger || Concurrent.configuration.logger).call level, progname, message, &block end diff --git a/lib/concurrent/observable.rb b/lib/concurrent/observable.rb index 923381eb0..b1aaf0c26 100644 --- a/lib/concurrent/observable.rb +++ b/lib/concurrent/observable.rb @@ -10,7 +10,7 @@ def add_observer(*args, &block) observers.add_observer(*args, &block) end - # as #add_observer but it can be used for chaning + # as #add_observer but it can be used for chaining # @return [Observable] self def with_observer(*args, &block) add_observer *args, &block diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index a1dde4913..0099ceaee 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -3,7 +3,27 @@ module Concurrent module Actress + i_know_it_is_experimental! + + class Reference + def backdoor(&block) + core.send :schedule_execution do + core.instance_eval &block + end + end + end + describe 'Concurrent::Actress' do + prepend_before do + do_no_reset! + end + + def terminate_actors(*actors) + actors.each do |actor| + actor.backdoor { terminate! } + actor.terminated.wait + end + end class Ping include Context @@ -35,82 +55,80 @@ def on_message(message) # set_trace_func nil # end - #describe 'stress test' do - #pending('may cause deadlock which prevents test run from completing.') - #1.times do |i| - #it format('run %3d', i) do - ## puts format('run %3d', i) - #Array.new(10).map do - #Thread.new do - #10.times do - ## trace! do - #queue = Queue.new - #actor = Ping.spawn :ping, queue - - ## when spawn returns children are set - #Concurrent::Actress::ROOT.send(:core).instance_variable_get(:@children).should include(actor) - - #actor << 'a' << 1 - #queue.pop.should eq 'a' - #actor.ask(2).value.should eq 2 - - #actor.parent.should eq Concurrent::Actress::ROOT - #Concurrent::Actress::ROOT.path.should eq '/' - #actor.path.should eq '/ping' - #child = actor.ask(:child).value - #child.path.should eq '/ping/pong' - #queue.clear - #child.ask(3) - #queue.pop.should eq 3 - - #actor << :terminate - #actor.ask(:blow_up).wait.should be_rejected - #end - #end - #end.each(&:join) - #end - #end - #end + describe 'stress test' do + 1.times do |i| + it format('run %3d', i) do + # puts format('run %3d', i) + Array.new(10).map do + Thread.new do + 10.times do + # trace! do + queue = Queue.new + actor = Ping.spawn :ping, queue + + # when spawn returns children are set + Concurrent::Actress::ROOT.send(:core).instance_variable_get(:@children).should include(actor) + + actor << 'a' << 1 + queue.pop.should eq 'a' + actor.ask(2).value.should eq 2 + + actor.parent.should eq Concurrent::Actress::ROOT + Concurrent::Actress::ROOT.path.should eq '/' + actor.path.should eq '/ping' + child = actor.ask(:child).value + child.path.should eq '/ping/pong' + queue.clear + child.ask(3) + queue.pop.should eq 3 + + actor << :terminate + actor.ask(:blow_up).wait.should be_rejected + terminate_actors actor, child + end + end + end.each(&:join) + end + end + end describe 'spawning' do - #describe 'Actress#spawn' do - #behaviour = -> v { -> _ { v } } - #subjects = { spawn: -> { Actress.spawn(AdHoc, :ping, 'arg', &behaviour) }, - #context_spawn: -> { AdHoc.spawn(:ping, 'arg', &behaviour) }, - #spawn_by_hash: -> { Actress.spawn(class: AdHoc, name: :ping, args: ['arg'], &behaviour) }, - #context_spawn_by_hash: -> { AdHoc.spawn(name: :ping, args: ['arg'], &behaviour) } } - - #subjects.each do |desc, subject_definition| - #describe desc do - #subject &subject_definition - #its(:path) { pending('may cause deadlock which prevents test run from completing.'); should eq '/ping' } - #its(:parent) { pending('may cause deadlock which prevents test run from completing.'); should eq ROOT } - #its(:name) { pending('may cause deadlock which prevents test run from completing.'); should eq 'ping' } - #its(:executor) { pending('may cause deadlock which prevents test run from completing.'); should eq Concurrent.configuration.global_task_pool } - #its(:reference) { pending('may cause deadlock which prevents test run from completing.'); should eq subject } - #it 'returns ars' do - #subject.ask!(:anything).should eq 'arg' - #end - #end - #end - #end + describe 'Actress#spawn' do + behaviour = -> v { -> _ { v } } + subjects = { spawn: -> { Actress.spawn(AdHoc, :ping, 'arg', &behaviour) }, + context_spawn: -> { AdHoc.spawn(:ping, 'arg', &behaviour) }, + spawn_by_hash: -> { Actress.spawn(class: AdHoc, name: :ping, args: ['arg'], &behaviour) }, + context_spawn_by_hash: -> { AdHoc.spawn(name: :ping, args: ['arg'], &behaviour) } } + + subjects.each do |desc, subject_definition| + describe desc do + subject &subject_definition + after { terminate_actors subject } + its(:path) { should eq '/ping' } + its(:parent) { should eq ROOT } + its(:name) { should eq 'ping' } + it('executor should be global') { subject.executor.should eq Concurrent.configuration.global_task_pool } + its(:reference) { should eq subject } + it 'returns ars' do + subject.ask!(:anything).should eq 'arg' + end + end + end + end it 'terminates on failed initialization' do - pending('may cause deadlock which prevents test run from completing.') a = AdHoc.spawn(name: :fail, logger: Concurrent.configuration.no_logger) { raise } a.ask(nil).wait.rejected?.should be_true a.terminated?.should be_true end it 'terminates on failed initialization and raises with spawn!' do - pending('may cause deadlock which prevents test run from completing.') expect do AdHoc.spawn!(name: :fail, logger: Concurrent.configuration.no_logger) { raise 'm' } end.to raise_error(StandardError, 'm') end it 'terminates on failed message processing' do - pending('may cause deadlock which prevents test run from completing.') a = AdHoc.spawn(name: :fail, logger: Concurrent.configuration.no_logger) { -> _ { raise } } a.ask(nil).wait.rejected?.should be_true a.terminated?.should be_true @@ -120,11 +138,11 @@ def on_message(message) describe 'messaging' do subject { AdHoc.spawn(:add) { c = 0; -> v { c = c + v } } } specify do - pending('may cause deadlock which prevents test run from completing.') subject.tell(1).tell(1) subject << 1 << 1 subject.ask(0).value!.should eq 4 end + after { terminate_actors subject } end describe 'children' do @@ -141,23 +159,24 @@ def on_message(message) end it 'has children set after a child is created' do - pending('may cause deadlock which prevents test run from completing.') child = parent.ask!(:child) parent.ask!(nil).should include(child) child.ask!(nil).should eq parent + + terminate_actors parent, child end end describe 'envelope' do subject { AdHoc.spawn(:subject) { -> _ { envelope } } } specify do - pending('may cause deadlock which prevents test run from completing.') envelope = subject.ask!('a') envelope.should be_a_kind_of Envelope envelope.message.should eq 'a' envelope.ivar.should be_completed envelope.ivar.value.should eq envelope envelope.sender.should eq Thread.current + terminate_actors subject end end @@ -176,13 +195,14 @@ def on_message(message) end it 'terminates with all its children' do - pending('may cause deadlock which prevents test run from completing.') child = subject.ask! :child subject.terminated?.should be_false subject.ask(:terminate).wait subject.terminated?.should be_true child.terminated.wait child.terminated?.should be_true + + terminate_actors subject, child end end diff --git a/spec/concurrent/agent_spec.rb b/spec/concurrent/agent_spec.rb index 50df2e58d..595396942 100644 --- a/spec/concurrent/agent_spec.rb +++ b/spec/concurrent/agent_spec.rb @@ -20,10 +20,9 @@ module Concurrent end context '#send_off' do - subject { Agent.new 2 } + subject { Agent.new 2, executor: executor } it 'executes post and post-off in order' do - pending 'may cause deadlock' subject.post { |v| v + 2 } subject.post_off { |v| v * 3 } subject.await @@ -165,7 +164,7 @@ def trigger_observable(observable) subject.post { nil } sleep(0.1) subject. - instance_variable_get(:@one_by_one). + instance_variable_get(:@serialized_execution). instance_variable_get(:@stash). size.should eq 2 end diff --git a/spec/concurrent/configuration_spec.rb b/spec/concurrent/configuration_spec.rb index 0e8b2ca0f..b2090f0aa 100644 --- a/spec/concurrent/configuration_spec.rb +++ b/spec/concurrent/configuration_spec.rb @@ -3,8 +3,6 @@ module Concurrent describe Configuration do - with_full_reset - it 'creates a global timer pool' do Concurrent.configuration.global_timer_set.should_not be_nil Concurrent.configuration.global_timer_set.should respond_to(:post) diff --git a/spec/concurrent/scheduled_task_spec.rb b/spec/concurrent/scheduled_task_spec.rb index 4922e5805..2757e7d2c 100644 --- a/spec/concurrent/scheduled_task_spec.rb +++ b/spec/concurrent/scheduled_task_spec.rb @@ -7,8 +7,6 @@ module Concurrent describe ScheduledTask do - with_full_reset - context 'behavior' do # obligation @@ -56,9 +54,9 @@ def execute_dereferenceable(subject) it_should_behave_like :dereferenceable # observable - + subject{ ScheduledTask.new(0.1){ nil } } - + def trigger_observable(observable) observable.execute sleep(0.2) diff --git a/spec/concurrent/timer_task_spec.rb b/spec/concurrent/timer_task_spec.rb index 6b47512d1..daf9bd5e4 100644 --- a/spec/concurrent/timer_task_spec.rb +++ b/spec/concurrent/timer_task_spec.rb @@ -5,8 +5,6 @@ module Concurrent describe TimerTask do - with_full_reset - before(:each) do # suppress deprecation warnings. Concurrent::TimerTask.any_instance.stub(:warn) diff --git a/spec/support/example_group_extensions.rb b/spec/support/example_group_extensions.rb index 2c02fa23f..bcfb195ff 100644 --- a/spec/support/example_group_extensions.rb +++ b/spec/support/example_group_extensions.rb @@ -22,15 +22,24 @@ def rbx? RbConfig::CONFIG['ruby_install_name']=~ /^rbx$/i end + def do_no_reset! + @do_not_reset = true + end + + @@killed = false + def reset_gem_configuration - Concurrent.instance_variable_set(:@configuration, Concurrent::Configuration.new) + Concurrent.instance_variable_get(:@configuration).value = Concurrent::Configuration.new if @@killed + @@killed = false end def kill_rogue_threads(warning = true) + return if @do_not_reset warn('[DEPRECATED] brute force thread control being used -- tests need updated') if warning Thread.list.each do |thread| thread.kill unless thread == Thread.current end + @@killed = true end extend self @@ -38,18 +47,6 @@ def kill_rogue_threads(warning = true) end class RSpec::Core::ExampleGroup - def self.with_full_reset - before(:each) do - reset_gem_configuration - end - - after(:each) do - Thread.list.each do |thread| - thread.kill unless thread == Thread.current - end - end - end - include Concurrent::TestHelpers extend Concurrent::TestHelpers end diff --git a/tasks/.gitignore b/tasks/.gitignore deleted file mode 100644 index e69de29bb..000000000