Permalink
Browse files

Now with channels!

  • Loading branch information...
1 parent 559d09e commit b4143e09e6423fbf8b1153aefac1954dee612381 @halogenandtoast halogenandtoast committed May 5, 2011
Showing with 185 additions and 57 deletions.
  1. +2 −2 Gemfile
  2. +15 −18 Gemfile.lock
  3. +1 −1 lib/daikon.rb
  4. +16 −0 lib/daikon/bus.rb
  5. +3 −3 lib/daikon/client.rb
  6. +3 −4 lib/daikon/monitor.rb
  7. +25 −5 lib/daikon/reactor.rb
  8. +0 −13 lib/daikon/stopper.rb
  9. +33 −0 spec/bus_spec.rb
  10. +4 −4 spec/client_spec.rb
  11. +20 −0 spec/monitor_spec.rb
  12. +63 −7 spec/reactor_spec.rb
View
@@ -1,14 +1,14 @@
source "http://rubygems.org"
gem "em-http-request", "0.3.0"
-gem "em-spec", :path => "../em-spec"
-gem "em-hiredis", :path => "../em-hiredis"
+gem "em-hiredis", "0.1.0"
gem "daemons", "1.1.0"
gem "json_pure", "1.4.6"
gem "redis", "2.1.1"
group :development do
gem "bourne"
+ gem "em-spec", :git => "https://github.com/bcg/em-spec.git"
gem "jeweler"
gem "rspec"
gem "timecop"
View
@@ -1,23 +1,20 @@
-PATH
- remote: ../em-hiredis
- specs:
- em-hiredis (0.0.1)
- hiredis (~> 0.3.0)
-
-PATH
- remote: ../em-spec
+GIT
+ remote: https://github.com/bcg/em-spec.git
+ revision: e44d67847794d317078cf357271863447d26c8c5
specs:
em-spec (0.2.2)
GEM
remote: http://rubygems.org/
specs:
- addressable (2.2.4)
+ addressable (2.2.5)
bourne (1.0)
mocha (= 0.9.8)
crack (0.1.8)
daemons (1.1.0)
diff-lcs (1.1.2)
+ em-hiredis (0.1.0)
+ hiredis (~> 0.3.0)
em-http-request (0.3.0)
addressable (>= 2.0.0)
escape_utils
@@ -35,16 +32,16 @@ GEM
rake
rake (0.8.7)
redis (2.1.1)
- rspec (2.1.0)
- rspec-core (~> 2.1.0)
- rspec-expectations (~> 2.1.0)
- rspec-mocks (~> 2.1.0)
- rspec-core (2.1.0)
- rspec-expectations (2.1.0)
+ rspec (2.5.0)
+ rspec-core (~> 2.5.0)
+ rspec-expectations (~> 2.5.0)
+ rspec-mocks (~> 2.5.0)
+ rspec-core (2.5.1)
+ rspec-expectations (2.5.0)
diff-lcs (~> 1.1.2)
- rspec-mocks (2.1.0)
+ rspec-mocks (2.5.0)
timecop (0.3.5)
- webmock (1.6.1)
+ webmock (1.6.2)
addressable (>= 2.2.2)
crack (>= 0.1.7)
@@ -54,7 +51,7 @@ PLATFORMS
DEPENDENCIES
bourne
daemons (= 1.1.0)
- em-hiredis!
+ em-hiredis (= 0.1.0)
em-http-request (= 0.3.0)
em-spec!
jeweler
View
@@ -20,7 +20,7 @@
$LOAD_PATH.include?(File.expand_path(__DIR__))
require 'daikon/configuration'
-require 'daikon/stopper'
+require 'daikon/bus'
require 'daikon/client'
require 'daikon/reactor'
require 'daikon/daemon'
View
@@ -0,0 +1,16 @@
+module Daikon
+ module Bus
+ def channel
+ @channel ||= EM::Channel.new
+ end
+ private :channel
+
+ def emit(event)
+ channel.push(event)
+ end
+
+ def on(event, &block)
+ channel.subscribe{ |msg| EM::Callback(msg, &block).call if event == msg }
+ end
+ end
+end
View
@@ -1,6 +1,6 @@
module Daikon
class Client
- include Stopper
+ include Bus
EXCEPTIONS = [Timeout::Error,
Errno::EINVAL,
@@ -43,11 +43,11 @@ def request(method, path, options = {})
http = EventMachine::HttpRequest.new(url).send(method, options)
http.callback do
log "SUCCESS: #{http.response}"
- stopper
+ emit(:request_success)
end
http.errback do
log "ERROR: #{http.response}"
- stopper
+ emit(:request_error)
end
end
View
@@ -53,10 +53,9 @@ def current_summary
end
def self.start(redis)
- Thread.new do
- redis.monitor do |line|
- parse(line)
- end
+ redis.monitor
+ redis.on(:monitor) do |line|
+ parse(line)
end
end
View
@@ -1,26 +1,38 @@
module Daikon
class Reactor
- include Stopper
-
+ include Bus
attr_reader :current_time
- attr_writer :info_interval
+ attr_writer :info_interval, :summary_interval
def initialize(client = nil)
@client = client
end
def start
+ emit(:start)
EventMachine.add_periodic_timer(info_interval) do
+ emit(:start_info)
@current_time = Time.now
collect_info
- stopper
+ emit(:end_info)
end
+
+ EventMachine.add_periodic_timer(summary_interval) do
+ emit(:start_summary)
+ @current_time = Time.now
+ @client.rotate_monitor(@current_time, @current_time)
+ emit(:end_summary)
+ end
+
+ Daikon::Monitor.start(summary_collector)
+ emit(:started)
end
def collect_info
info_collector.info do |info|
+ emit(:collect_info)
@client.report_info(info)
- stopper
+ emit(:collected_info)
end
end
@@ -35,5 +47,13 @@ def info_collector
def info_interval
@info_interval ||= 10
end
+
+ def summary_collector
+ @summary_collector ||= connect
+ end
+
+ def summary_interval
+ @summary_interval ||= 60
+ end
end
end
View
@@ -1,13 +0,0 @@
-module Daikon
- module Stopper
- attr_writer :stopper
-
- private
-
- def stopper
- if @stopper && @stopper.call(self)
- EventMachine.stop
- end
- end
- end
-end
View
@@ -0,0 +1,33 @@
+require 'spec_helper'
+
+describe Daikon::Bus, 'emit' do
+ subject { Object.new.extend(Daikon::Bus) }
+ let(:channel) { stub('channel') }
+ before do
+ channel.stubs(:push)
+ EM::Channel.stubs(:new => channel)
+ end
+
+ it "publishes to the channel" do
+ subject.emit(:doom)
+ channel.should have_received(:push).with(:doom)
+ end
+
+end
+
+describe Daikon::Bus, 'on' do
+ subject { Object.new.extend(Daikon::Bus) }
+ let(:in_block) { stub('in block') }
+ before { in_block.stubs(:foo) }
+
+ it "calls the block for the event" do
+ em do
+ subject.on(:doom) do
+ in_block.foo
+ EM.stop
+ end
+ subject.emit(:doom)
+ end
+ in_block.should have_received(:foo)
+ end
+end
View
@@ -30,12 +30,12 @@
subject { Daikon::Client.new }
before do
- subject.stopper = lambda { |client| EventMachine.stop }
stub_request(:any, infos_url).to_timeout
end
it "does not kill the client" do
em do
+ subject.on(:request_error) { EM.stop }
lambda {
subject.report_info({})
}.should_not raise_error
@@ -47,12 +47,12 @@
subject { Daikon::Client.new }
before do
- subject.stopper = lambda { |client| EventMachine.stop }
stub_request(:post, infos_url).to_return(:body => "{'bad':'json}")
end
it "does not commit suicide" do
em do
+ subject.on(:request_success) { EM.stop }
lambda {
subject.report_info({})
}.should_not raise_error
@@ -94,7 +94,7 @@
before do
Timecop.freeze DateTime.parse(now)
Daikon::Monitor.stubs(:pop).yields(data)
- subject.stopper = lambda { |client| EventMachine.stop }
+ subject.on(:request_success) { EM.stop }
stub_request(:post, summaries_url(server)).to_return(:status => 200)
end
@@ -141,7 +141,7 @@
let(:info) { {"connected_clients"=>"1", "used_cpu_sys_childrens"=>"0.00"} }
before do
- subject.stopper = lambda { |client| EventMachine.stop }
+ subject.on(:request_success) { EM.stop }
stub_request(:post, infos_url(server)).to_return(:status => 200)
end
View
@@ -226,3 +226,23 @@
end
end
end
+
+describe Daikon::Monitor, ".start" do
+ let(:redis) { stub('redis', :monitor => true) }
+ before do
+ redis.stubs(:on).with(:monitor).yields("INCR foo")
+ end
+ it "should subscribe to monitor" do
+ Daikon::Monitor.start(redis)
+ redis.should have_received(:monitor)
+ redis.should have_received(:on).with(:monitor)
+ end
+
+ it "should parse subscription data" do
+ Daikon::Monitor.start(redis)
+ Daikon::Monitor.pop do |summary|
+ summary["commands"].should == {"INCR" => 1}
+ summary["keys"].should == {"foo" => 1 }
+ end
+ end
+end
Oops, something went wrong.

0 comments on commit b4143e0

Please sign in to comment.