Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions lib/concurrent/atom.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
require 'concurrent/atomic/atomic_reference'
require 'concurrent/collection/copy_on_notify_observer_set'
require 'concurrent/concern/observable'
require 'concurrent/synchronization'

# @!macro [new] thread_safe_variable_comparison
#
Expand Down Expand Up @@ -91,13 +90,9 @@ module Concurrent
#
# @see http://clojure.org/atoms Clojure Atoms
# @see http://clojure.org/state Values and Change - Clojure's approach to Identity and State
class Atom < Synchronization::Object
class Atom
include Concern::Observable

safe_initialization!
private(*attr_volatile_with_cas(:value))
public :value

# Create a new atom with the given initial value.
#
# @param [Object] value The initial value
Expand All @@ -111,15 +106,18 @@ class Atom < Synchronization::Object
#
# @raise [ArgumentError] if the validator is not a `Proc` (when given)
def initialize(value, opts = {})
@Validator = opts.fetch(:validator, -> v { true })
@validator = opts.fetch(:validator, -> v { true })
self.observers = Collection::CopyOnNotifyObserverSet.new
super(value)
@state = AtomicReference.new(value)
end

# @!method value
# The current value of the atom.
#
# @return [Object] The current value.
def value
@state.get
end

alias_method :deref, :value

Expand Down Expand Up @@ -160,7 +158,7 @@ def swap(*args)
begin
new_value = yield(old_value, *args)
break old_value unless valid?(new_value)
break new_value if compare_and_set(old_value, new_value)
break new_value if @state.compare_and_set(old_value, new_value)
rescue
break old_value
end
Expand All @@ -177,7 +175,7 @@ def swap(*args)
#
# @return [Boolean] True if the value is changed else false.
def compare_and_set(old_value, new_value)
if valid?(new_value) && compare_and_set_value(old_value, new_value)
if valid?(new_value) && @state.compare_and_set(old_value, new_value)
observers.notify_observers(Time.now, old_value, new_value)
true
else
Expand All @@ -196,7 +194,7 @@ def compare_and_set(old_value, new_value)
def reset(new_value)
old_value = value
if valid?(new_value)
self.value = new_value
@state.set(new_value)
observers.notify_observers(Time.now, old_value, new_value)
new_value
else
Expand All @@ -212,7 +210,7 @@ def reset(new_value)
# @return [Boolean] false if the validator function returns false or raises
# an exception else true
def valid?(new_value)
@Validator.call(new_value)
@validator.call(new_value)
rescue
false
end
Expand Down