Permalink
Browse files

added log entries to task

  • Loading branch information...
1 parent 60f94d4 commit 6b21dea86d12ce006653b685f781689166c8054f @ohler55 committed Feb 18, 2014
Showing with 72 additions and 19 deletions.
  1. +4 −10 README.md
  2. +5 −0 lib/oflow/actors/log.rb
  3. +5 −1 lib/oflow/box.rb
  4. +5 −1 lib/oflow/haslinks.rb
  5. +7 −0 lib/oflow/haslog.rb
  6. +5 −0 lib/oflow/task.rb
  7. +0 −1 notes
  8. +40 −5 test/flow_log_test.rb
  9. +1 −1 test/flow_nest_test.rb
View
@@ -119,26 +119,20 @@ Hello World!
## Future Features
- - Balancer Actor that distributes to a set of others Tasks based on how busy each is.
+ - HTTP Server Actor
- - Merger Actor that waits for a criteria to be met before continuing.
+ - OmniGraffle file input for configuration.
- - HTTP Server Actor
+ - .svg file input for configuration.
- - Persister Actor that writes to disk and reads on start)
+ - Visio file input for configuration.
- CallOut Actor that uses pipes and fork to use a non-Ruby actor.
- Cross linking Tasks and Flows.
- Dynamic links to Tasks and Flows.
- - OmniGraffle file input for configuration.
-
- - .svg file input for configuration.
-
- - Visio file input for configuration.
-
- High performance C version. Proof of concept puts the performance range at
around 10M operations per second where an operation is one task execution per
thread.
View
@@ -14,6 +14,11 @@ class Log < Actor
:warn => Logger::Severity::WARN,
:info => Logger::Severity::INFO,
:debug => Logger::Severity::DEBUG,
+ :FATAL => Logger::Severity::FATAL,
+ :ERROR => Logger::Severity::ERROR,
+ :WARN => Logger::Severity::WARN,
+ :INFO => Logger::Severity::INFO,
+ :DEBUG => Logger::Severity::DEBUG,
}
def initialize(task, options={})
@logger = nil
View
@@ -74,7 +74,11 @@ def aget(path)
# Returns a string representation of the Box and contents.
def to_s()
- "Box{#{@contents}, tracker: #{@tracker}}"
+ if @tracker.nil?
+ "Box{#{@contents}}"
+ else
+ "Box{#{@contents}, tracker: #{@tracker}}"
+ end
end
alias inspect to_s
View
@@ -59,6 +59,10 @@ def find_link(label)
def links()
@links
end
-
+
+ def has_links?()
+ !@links.nil? && !@links.empty?
+ end
+
end # HasLinks
end # OFlow
View
@@ -30,7 +30,14 @@ def log=(t)
# @param msg [String] message to log
# @param fn [String] full name of Task or Flow calling the log function
def log_msg(level, msg, fn)
+ # Abort early if the log severity/level would not log the message. This
+ # also allows non-Loggers to be used in place of the Log Actor.
+ return if Env.log_level > Actors::Log::SEVERITY_MAP[level]
+
lt = log()
+ # To prevent infinite looping, don't allow the logger to log to itself.
+ return if self == lt
+
unless lt.nil?
lt.receive(level, Box.new([msg, fn]))
else
View
@@ -52,6 +52,7 @@ def initialize(flow, name, actor_class, options={})
init_links()
set_options(options)
+ info("Creating actor #{actor_class} with options #{options}.")
@actor = actor_class.new(self, options)
raise Exception.new("#{actor} does not respond to the perform() method.") unless @actor.respond_to?(:perform)
@@ -76,6 +77,7 @@ def initialize(flow, name, actor_class, options={})
@current_req = req
begin
+ info("perform(#{req.op}, #{req.box})")
@actor.perform(req.op, req.box) unless req.nil?
rescue Exception => e
handle_error(e)
@@ -279,6 +281,8 @@ def outputs()
# @param op [Symbol] operation to perform
# @param box [Box] contents or data for the request
def receive(op, box)
+ info("receive(#{op}, #{box}) #{state_string}")
+
return if CLOSING == @state
raise BlockedError.new() if BLOCKED == @state
@@ -313,6 +317,7 @@ def ship(dest, box)
box.freeze() unless box.nil?
link = resolve_link(dest)
raise LinkError.new(dest) if link.nil? || link.target.nil?
+ info("shipping #{box} to #{link.target_name}:#{link.op}")
link.target.receive(link.op, box)
link
end
View
1 notes
@@ -4,7 +4,6 @@
^c^s show subtree
- todo
- - add log entries to actors
- features
- HttpServer Actor
View
@@ -17,7 +17,8 @@ def initialize(task, options)
end
def perform(op, box)
- task.info("op: #{op}, box: #{box.contents}")
+ task.warn("op: #{op}, box: #{box.contents}")
+ task.ship(nil, ::OFlow::Box.new([box.contents])) if task.has_links?
end
end # Noise
@@ -28,6 +29,7 @@ class FlowLogTest < ::Test::Unit::TestCase
def test_flow_log_relay
trigger = nil
collector = nil
+ ::OFlow::Env.log_level = Logger::WARN
::OFlow::Env.flow('log_relay') { |f|
trigger = f.task('noise', Noise)
f.task(:log, Collector) { |t|
@@ -48,7 +50,8 @@ def test_flow_log_relay
def test_flow_log_var
trigger = nil
collector = nil
- ::OFlow::Env.flow('log_relay') { |f|
+ ::OFlow::Env.log_level = Logger::WARN
+ ::OFlow::Env.flow('log_var') { |f|
trigger = f.task('noise', Noise)
f.log = f.task(:collector, Collector) { |t|
collector = t.actor
@@ -59,7 +62,7 @@ def test_flow_log_var
assert_equal(collector.collection.size, 1)
assert_equal(collector.collection[0][0], 'op: speak, box: 7')
- assert_equal(collector.collection[0][1], ':log_relay:noise')
+ assert_equal(collector.collection[0][1], ':log_var:noise')
::OFlow::Env.clear()
end
@@ -68,7 +71,8 @@ def test_flow_log_var
def test_flow_log_env
trigger = nil
collector = nil
- ::OFlow::Env.flow('log_relay') { |f|
+ ::OFlow::Env.log_level = Logger::WARN
+ ::OFlow::Env.flow('log_env') { |f|
trigger = f.task('noise', Noise)
::OFlow::Env.log = f.task(:collector, Collector) { |t|
collector = t.actor
@@ -79,9 +83,40 @@ def test_flow_log_env
assert_equal(collector.collection.size, 1)
assert_equal(collector.collection[0][0], 'op: speak, box: 7')
- assert_equal(collector.collection[0][1], ':log_relay:noise')
+ assert_equal(collector.collection[0][1], ':log_env:noise')
+
+ ::OFlow::Env.clear()
+ end
+
+ def test_flow_log_info
+ trigger = nil
+ collector = nil
+ ::OFlow::Env.log_level = Logger::WARN
+ ::OFlow::Env.flow('log_info') { |f|
+ f.log = f.task(:collector, Collector) { |t|
+ collector = t.actor
+ }
+ # Set after log to avoid race condition with the creation of the collector
+ # and the assignment to f.log. The race is whether a log message is
+ # displayed on the output.
+ ::OFlow::Env.log_level = Logger::INFO
+ trigger = f.task('noise', Noise) { |t|
+ t.link(nil, :collector, nil)
+ }
+ }
+ trigger.receive(:speak, ::OFlow::Box.new(7))
+ ::OFlow::Env.flush()
+
+ entries = collector.collection.map { |entry| entry[0] }
+ assert_equal(["Creating actor Noise with options {:state=>1}.",
+ "receive(speak, Box{7}) RUNNING",
+ "perform(speak, Box{7})",
+ "op: speak, box: 7",
+ "shipping Box{[7]} to collector:",
+ 7], entries)
::OFlow::Env.clear()
+ ::OFlow::Env.log_level = Logger::WARN
end
end # FlowLogTest
View
@@ -17,7 +17,7 @@ def initialize(task, options)
end
def perform(op, box)
- task.info("#{op} #{box.contents}")
+ task.warn("#{op} #{box.contents}")
task.ship(op, box)
end

0 comments on commit 6b21dea

Please sign in to comment.