Permalink
Browse files

Try again, this time within Pacer's structure.

  • Loading branch information...
1 parent c989ea0 commit f690835d7061157d87b339969a60aae49f9457df Darrick Wiebe committed Mar 31, 2011
Showing with 55 additions and 27 deletions.
  1. +55 −27 lib/pacer-agent.rb
View
82 lib/pacer-agent.rb
@@ -1,3 +1,4 @@
+require 'pacer'
require 'agent'
module PacerAgent
@@ -32,39 +33,62 @@ def channel(opts = {})
end
end
end
+
+ module Pipes
+ class ChannelPipe < AbstractPipe
+ def initialize(channel, timeout = nil)
+ super()
+ @channel = channel.clone
+ @selector = Agent::Selector.new
+ @selector.case(channel, :receive) do
+ value = channel.receive
+ if channel == value
+ channel.close rescue nil
+ raise StopIteration
+ else
+ value
+ end
+ end
+ if timeout
+ @selector.timeout(timeout) { raise StopIteration }
+ end
+ end
+
+ protected
+
+ def processNextStart
+ @selector.select
+ rescue StopIteration
+ raise Pacer::NoSuchElementException
+ end
+ end
+ end
+
+ module Source
+ module Channel
+
+ protected
+
+ def iterator_from_source(src)
+ return super unless src.is_a? Agent::Channel
+ Pacer::Pipes::ChannelPipe.new(src)
+ end
+ end
+ end
end
module Agent
class Channel
include Enumerable
- def to_route
- r = Pacer::Route.new :source => self, :element_type => Object, :route_name => "|#{ name }|"
- r.route
- end
-
- def each
- return to_enum unless block_given?
- s = Agent::Selector.new
- s.case(self, :receive) do
- value = receive
- if self == value
- close rescue nil
- return
- else
- yield value
- end
- end
- s.timeout(0.5) { }
- while not closed?
- begin
- s.select
- rescue Exception, StandardError => e
- puts e.message
- pp e.backtrace
- raise
- end
- end
+ def to_route(opts = {})
+ opts = {
+ :source => self,
+ :element_type => Object,
+ :route_name => "|#{ name }|",
+ :transform => Pacer::Source::Channel
+ }.merge(opts)
+ Pacer::Route.new(opts).route
end
def ==(other)
@@ -73,5 +97,9 @@ def ==(other)
other.instance_variable_get('@type') == @type and
other.instance_variable_get('@direction') == @direction
end
+
+ def clone
+ Marshal.load(Marshal.dump(self))
+ end
end
end

0 comments on commit f690835

Please sign in to comment.