Permalink
Browse files

Adding AMQP worker and consumer

  • Loading branch information...
1 parent 7d08c5c commit 38507a4539253a3658eb38bae7498b0a38c55cd0 @guiocavalcanti guiocavalcanti committed Sep 15, 2012
Showing with 113 additions and 0 deletions.
  1. +1 −0 Gemfile
  2. +2 −0 Gemfile.lock
  3. +2 −0 boot.rb
  4. +1 −0 lib/permit.rb
  5. +15 −0 lib/permit/consumer.rb
  6. +20 −0 lib/permit/worker.rb
  7. +51 −0 spec/consumer_spec.rb
  8. +21 −0 worker.rb
View
@@ -11,3 +11,4 @@ gem 'rspec'
gem 'em-synchrony'
gem 'debugger'
gem 'grape'
+gem 'yajl-ruby'
View
@@ -87,6 +87,7 @@ GEM
ruby_core_source (0.1.5)
archive-tar-minitar (>= 0.5.2)
thor (0.16.0)
+ yajl-ruby (1.1.0)
PLATFORMS
ruby
@@ -102,3 +103,4 @@ DEPENDENCIES
grape
rspec
ruby-debug19
+ yajl-ruby
View
@@ -0,0 +1,2 @@
+$:.unshift File.expand_path 'lib'
+require 'permit'
View
@@ -10,4 +10,5 @@ module Permit
require 'permit/consumer'
require 'permit/rule'
require 'permit/policy'
+require 'permit/worker'
require 'permit/api'
View
@@ -0,0 +1,15 @@
+module Permit
+ class Consumer
+ def call(metadata, payload)
+ puts metadata, payload
+ payload = JSON.parse(payload, :symbolize_keys => true)
+ rule = Rule.new(:resource_id => payload.delete(:resource_id),
+ :subject_id => payload.delete(:subject_id),
+ :logger => Logger.new(STDOUT))
+ payload.fetch(:action, {}).keys.each do |k|
+ puts "rule.insert"
+ rule.insert(:action => k)
+ end
+ end
+ end
+end
View
@@ -0,0 +1,20 @@
+module Permit
+ class Worker
+ def initialize(opts)
+ @channel = opts[:channel]
+ @queue_name = opts[:queue_name] || ""
+ @consumer = opts[:consumer] || Consumer.new
+ @exchange = opts[:exchange]
+
+ Permit::Connection.establish_connections(1, "development")
+ end
+
+ def start
+ @channel.queue(@queue_name, :exclusive => true) do |queue|
+ queue.bind(@exchange, :routing_key => "permit.#").subscribe do |h,p|
+ @consumer.call(h,p)
+ end
+ end
+ end
+ end
+end
View
@@ -0,0 +1,51 @@
+require 'spec_helper'
+
+module Permit
+ describe Consumer do
+ it "should respondo to #handle_message" do
+ EventMachine.synchrony do
+ Permit::Consumer.new.should respond_to :handle_message
+ EM.stop
+ end
+ end
+
+ it "should insert rules" do
+ EventMachine.synchrony do
+ Permit::Connection.establish_connections(1, "test")
+ rules = Permit::Connection.pool.collection('rules')
+ rules.remove({})
+ Permit::Consumer.new.handle_message({}, { :resource_id => 'r', :subject_id => 's', :actions => {:read => true} })
+ rules.count.should == 1
+ rules.remove({})
+ EM.stop
+ end
+ end
+
+ it "should insert multiple actions" do
+ EventMachine.synchrony do
+ Permit::Connection.establish_connections(1, "test")
+ rules = Permit::Connection.pool.collection('rules')
+ rules.remove({})
+ Permit::Consumer.new.handle_message({}, { :resource_id => 'r', :subject_id => 's', :actions => {:read => true, :foo => true} })
+ rules.count.should == 2
+ rules.remove({})
+ EM.stop
+ end
+ end
+
+ it "should insert the rules correctly" do
+ EventMachine.synchrony do
+ Permit::Connection.establish_connections(1, "test")
+ rules = Permit::Connection.pool.collection('rules')
+ rules.remove({})
+ r = { :resource_id => 'r', :subject_id => 's', :actions => {:read => true} }
+ Permit::Consumer.new.handle_message({}, r)
+ docs = rules.find({ :resource_id => 'r' })
+ docs.first[:_id].should == r[:_id]
+ rules.remove({})
+ EM.stop
+ end
+
+ end
+ end
+end
View
@@ -0,0 +1,21 @@
+require_relative 'boot'
+require 'amqp'
+require 'yajl/json_gem'
+
+AMQP.start do |connection|
+
+ EventMachine.synchrony do
+ channel = AMQP::Channel.new(connection)
+ exchange = channel.topic("permit", :auto_delete => true)
+ consumer = Permit::Consumer.new
+ worker = Permit::Worker.new(:consumer => consumer, :channel => channel,
+ :exchange => exchange)
+ worker.start
+
+ EM::add_periodic_timer do
+ exchange.publish({:resource_id => 'r', :subject_id => 's',
+ :action => { :read => true } }.to_json,
+ :routing_key => "permit.core")
+ end
+ end
+end

0 comments on commit 38507a4

Please sign in to comment.