Skip to content
Browse files

Restructure

  • Loading branch information...
1 parent f690835 commit a6f753d8cc866f41d4be046b9fa6efc1d860ca45 Darrick Wiebe committed Mar 31, 2011
Showing with 76 additions and 68 deletions.
  1. +28 −0 lib/agent/to_route.rb
  2. +4 −68 lib/pacer-agent.rb
  3. +31 −0 lib/pacer/pipes/channel_pipe.rb
  4. +13 −0 lib/pacer/source/channel.rb
View
28 lib/agent/to_route.rb
@@ -0,0 +1,28 @@
+module Agent
+ module ToRoute
+ def to_route(opts = {})
+ opts = {
+ :source => self,
+ :element_type => Object,
+ :route_name => "|#{ name }|",
+ :transform => Pacer::Source::Channel
+ }.merge(opts)
+ Pacer::Route.new(opts).route
+ end
+
+ def ==(other)
+ other.is_a?(Channel) and
+ other.name == name and
+ other.instance_variable_get('@type') == @type and
+ other.instance_variable_get('@direction') == @direction
+ end
+
+ def clone
+ Marshal.load(Marshal.dump(self))
+ end
+ end
+
+ class Channel
+ include ToRoute
+ end
+end
View
72 lib/pacer-agent.rb
@@ -12,6 +12,10 @@ def self.reload!
end
end
+require 'agent/to_route'
+require 'pacer/pipes/channel_pipe'
+require 'pacer/source/channel'
+
module Pacer
module Core
module Route
@@ -33,73 +37,5 @@ def channel(opts = {})
end
end
end
-
- module Pipes
- class ChannelPipe < AbstractPipe
- def initialize(channel, timeout = nil)
- super()
- @channel = channel.clone
- @selector = Agent::Selector.new
- @selector.case(channel, :receive) do
- value = channel.receive
- if channel == value
- channel.close rescue nil
- raise StopIteration
- else
- value
- end
- end
- if timeout
- @selector.timeout(timeout) { raise StopIteration }
- end
- end
-
- protected
-
- def processNextStart
- @selector.select
- rescue StopIteration
- raise Pacer::NoSuchElementException
- end
- end
- end
-
- module Source
- module Channel
-
- protected
-
- def iterator_from_source(src)
- return super unless src.is_a? Agent::Channel
- Pacer::Pipes::ChannelPipe.new(src)
- end
- end
- end
end
-module Agent
- class Channel
- include Enumerable
-
- def to_route(opts = {})
- opts = {
- :source => self,
- :element_type => Object,
- :route_name => "|#{ name }|",
- :transform => Pacer::Source::Channel
- }.merge(opts)
- Pacer::Route.new(opts).route
- end
-
- def ==(other)
- other.is_a?(Channel) and
- other.name == name and
- other.instance_variable_get('@type') == @type and
- other.instance_variable_get('@direction') == @direction
- end
-
- def clone
- Marshal.load(Marshal.dump(self))
- end
- end
-end
View
31 lib/pacer/pipes/channel_pipe.rb
@@ -0,0 +1,31 @@
+module Pacer
+ module Pipes
+ class ChannelPipe < AbstractPipe
+ def initialize(channel, timeout = nil)
+ super()
+ @channel = channel.clone
+ @selector = Agent::Selector.new
+ @selector.case(channel, :receive) do
+ value = channel.receive
+ if channel == value
+ channel.close rescue nil
+ raise StopIteration
+ else
+ value
+ end
+ end
+ if timeout
+ @selector.timeout(timeout) { raise StopIteration }
+ end
+ end
+
+ protected
+
+ def processNextStart
+ @selector.select
+ rescue StopIteration
+ raise Pacer::NoSuchElementException
+ end
+ end
+ end
+end
View
13 lib/pacer/source/channel.rb
@@ -0,0 +1,13 @@
+module Pacer
+ module Source
+ module Channel
+
+ protected
+
+ def iterator_from_source(src)
+ return super unless src.is_a? Agent::Channel
+ Pacer::Pipes::ChannelPipe.new(src)
+ end
+ end
+ end
+end

0 comments on commit a6f753d

Please sign in to comment.
Something went wrong with that request. Please try again.