Permalink
Browse files

ii

  • Loading branch information...
0 parents commit d11864c5be6f17919b56efc14a4c40a9c33c864b @jondot committed Jul 7, 2012
@@ -0,0 +1,17 @@
+*.gem
+*.rbc
+.bundle
+.config
+.yardoc
+Gemfile.lock
+InstalledFiles
+_yardoc
+coverage
+doc/
+lib/bundler/man
+pkg
+rdoc
+spec/reports
+test/tmp
+test/version_tmp
+tmp
@@ -0,0 +1,4 @@
+source 'https://rubygems.org'
+
+# Specify your gem's dependencies in frenzy_bunnies.gemspec
+gemspec
@@ -0,0 +1,22 @@
+Copyright (c) 2012 Dotan Nahum
+
+MIT License
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
@@ -0,0 +1,29 @@
+# FrenzyBunnies
+
+TODO: Write a gem description
+
+## Installation
+
+Add this line to your application's Gemfile:
+
+ gem 'frenzy_bunnies'
+
+And then execute:
+
+ $ bundle
+
+Or install it yourself as:
+
+ $ gem install frenzy_bunnies
+
+## Usage
+
+TODO: Write usage instructions here
+
+## Contributing
+
+1. Fork it
+2. Create your feature branch (`git checkout -b my-new-feature`)
+3. Commit your changes (`git commit -am 'Added some feature'`)
+4. Push to the branch (`git push origin my-new-feature`)
+5. Create new Pull Request
@@ -0,0 +1,2 @@
+#!/usr/bin/env rake
+require "bundler/gem_tasks"
@@ -0,0 +1,6 @@
+#!/usr/bin/env ruby
+require 'frenzy_bunnies'
+require 'frenzy_bunnies/cli'
+
+FrenzyBunnies::CLI.start
+
@@ -0,0 +1,20 @@
+require 'rubygems'
+require 'hot_bunnies'
+
+
+
+
+connection = HotBunnies.connect(:host => 'localhost')
+channel = connection.create_channel
+channel.prefetch = 10
+
+exchange = channel.exchange('frenzy_bunnies', :type => :direct, :durable => true)
+
+
+
+100_000.times do |i|
+ exchange.publish("hello world! #{i}", :routing_key => 'new.feeds')
+end
+puts "done"
+
+
@@ -0,0 +1,33 @@
+$:<< File.expand_path('../lib', File.dirname(__FILE__))
+
+require 'rubygems'
+require 'frenzy_bunnies'
+
+class FeedWorker
+ include FrenzyBunnies::Worker
+ from_queue 'new.feeds', :prefetch => 20, :threads => 13, :durable => true
+
+ def work(msg)
+ puts msg
+ ack!
+ end
+end
+
+class FeedDownloader
+ include FrenzyBunnies::Worker
+ from_queue 'new.downloads', :durable => true
+ def work(msg)
+ puts msg
+ ack!
+ end
+end
+
+f = FrenzyBunnies::Context.new
+
+f.run FeedWorker,FeedDownloader
+
+
+trap "INT" do
+ f.stop
+ exit!
+end
@@ -0,0 +1,21 @@
+class FeedWorker
+ include FrenzyBunnies::Worker
+ from_queue 'new.feeds', :prefetch => 20, :threads => 13, :durable => true
+
+ def work(msg)
+ puts msg
+ ack!
+ end
+end
+
+class FeedDownloader
+ include FrenzyBunnies::Worker
+ from_queue 'new.downloads', :durable => true
+ def work(msg)
+ puts msg
+ ack!
+ end
+end
+
+
+
@@ -0,0 +1,20 @@
+# -*- encoding: utf-8 -*-
+require File.expand_path('../lib/frenzy_bunnies/version', __FILE__)
+
+Gem::Specification.new do |gem|
+ gem.authors = ["Dotan Nahum"]
+ gem.email = ["jondotan@gmail.com"]
+ gem.description = %q{TODO: Write a gem description}
+ gem.summary = %q{TODO: Write a gem summary}
+ gem.homepage = ""
+
+ gem.files = `git ls-files`.split($\)
+ gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) }
+ gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
+ gem.name = "frenzy_bunnies"
+ gem.require_paths = ["lib"]
+ gem.version = FrenzyBunnies::VERSION
+
+ gem.add_runtime_dependency 'hot_bunnies', '>= 1.4.0.pre3'
+ gem.add_runtime_dependency 'thor'
+end
@@ -0,0 +1,12 @@
+require 'hot_bunnies'
+
+
+module FrenzyBunnies
+end
+
+
+require "frenzy_bunnies/version"
+require 'frenzy_bunnies/queue_factory'
+require 'frenzy_bunnies/context'
+require 'frenzy_bunnies/worker'
+
@@ -0,0 +1,27 @@
+require 'thor'
+
+
+class FrenzyBunnies::CLI < Thor
+ BUNNIES =<<-EOF
+
+ (\\___/)
+ (='.'=) Frenzy Bunnies!
+ (")_(") JRuby based workers on top of hot_bunnies
+
+ EOF
+
+ desc 'run', "run workers from a file"
+ def start_workers(workerfile)
+
+ require workerfile
+ # enumerate all workers
+ workers = []
+ ObjectSpace.each_object(Class){|o| workers << o if o.ancestors.map(&:name).include? "FrenzyBunnies::Worker"}
+ workers.uniq!
+ puts BUNNIES
+ puts "Discovered #{workers.inspect}"
+ c = FrenzyBunnies::Context.new
+ c.run *workers
+ Signal.trap('INT') { c.stop; exit! }
+ end
+end
@@ -0,0 +1,26 @@
+require 'logger'
+
+class FrenzyBunnies::Context
+ attr_reader :queue_factory, :logger
+
+ def initialize(opts={})
+ @opts = opts
+ @opts[:host] ||= 'localhost'
+ @opts[:exchange] ||= 'frenzy_bunnies'
+ @logger = @opts[:logger] || Logger.new(STDOUT)
+ @connection = HotBunnies.connect(:host => @opts[:host])
+ @connection.add_shutdown_listener(lambda { |cause| puts cause; stop; sleep(10); start;})
+
+ @queue_factory = FrenzyBunnies::QueueFactory.new(@connection, @opts[:exchange])
+ @klasses = []
+ end
+
+ def run(*klasses)
+ klasses.each{|klass| klass.start(self); @klasses << klass}
+ end
+
+ def stop
+ @klasses.each{|klass| klass.stop }
+ end
+end
+
@@ -0,0 +1,17 @@
+class FrenzyBunnies::QueueFactory
+ def initialize(connection, exchange)
+ @connection = connection
+ @exchange = exchange
+ end
+
+ def build_queue(name, prefetch, durable)
+ channel = @connection.create_channel
+ channel.prefetch = prefetch
+
+ exchange = channel.exchange(@exchange, :type => :direct, :durable => durable)
+
+ queue = channel.queue(name)
+ queue.bind(exchange, :routing_key => name)
+ queue
+ end
+end
@@ -0,0 +1,3 @@
+module FrenzyBunnies
+ VERSION = "0.0.1"
+end
@@ -0,0 +1,73 @@
+module FrenzyBunnies::Worker
+ import java.util.concurrent.Executors
+
+ def ack!
+ true
+ end
+ def work
+
+ end
+
+ def self.included(base)
+ base.extend ClassMethods
+ end
+
+ module ClassMethods
+ def from_queue(q, opts={})
+ @queue_name = q
+ @queue_opts = opts
+ end
+
+ def start(context)
+ @logger = context.logger
+
+ @queue_opts[:prefetch] ||= 10
+ @queue_opts[:durable] ||= false
+
+ if @queue_opts[:threads]
+ @thread_pool = Executors.new_fixed_thread_pool(@queue_opts[:threads])
+ else
+ @thread_pool = Executors.new_cached_thread_pool
+ end
+
+ q = context.queue_factory.build_queue(@queue_name, @queue_opts[:prefetch], @queue_opts[:durable])
+ @s = q.subscribe(:ack => true)
+
+ say "#{@queue_opts[:threads] ? "#{@queue_opts[:threads]} threads " : ''}with #{@queue_opts[:prefetch]} prefetch on <#{@queue_name}>."
+
+ @s.each(:blocking => false, :executor => @thread_pool) do |h, msg|
+ wkr = new
+ begin
+ if(wkr.work(msg))
+ h.ack
+ else
+ h.reject
+ error "Cannot process message <#{msg.inspect}>"
+ end
+ rescue
+ h.reject
+ error "ERROR #{$!}"
+ end
+ end
+
+ say "workers up."
+ end
+
+ def stop
+ say "stopping"
+ @thread_pool.shutdown_now
+ say "pool shutdown"
+ @s.cancel
+ say "stopped"
+ end
+
+ def say(text)
+ @logger.info "[#{self.name}] #{text}"
+ end
+
+ def error(text)
+ @logger.error "[#{self.name}] #{text}"
+ end
+ end
+end
+

0 comments on commit d11864c

Please sign in to comment.