diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d87d4be --- /dev/null +++ b/.gitignore @@ -0,0 +1,17 @@ +*.gem +*.rbc +.bundle +.config +.yardoc +Gemfile.lock +InstalledFiles +_yardoc +coverage +doc/ +lib/bundler/man +pkg +rdoc +spec/reports +test/tmp +test/version_tmp +tmp diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..80f84e7 --- /dev/null +++ b/Gemfile @@ -0,0 +1,4 @@ +source 'https://rubygems.org' + +# Specify your gem's dependencies in frenzy_bunnies.gemspec +gemspec diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..894017c --- /dev/null +++ b/LICENSE @@ -0,0 +1,22 @@ +Copyright (c) 2012 Dotan Nahum + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..5e655a9 --- /dev/null +++ b/README.md @@ -0,0 +1,29 @@ +# FrenzyBunnies + +TODO: Write a gem description + +## Installation + +Add this line to your application's Gemfile: + + gem 'frenzy_bunnies' + +And then execute: + + $ bundle + +Or install it yourself as: + + $ gem install frenzy_bunnies + +## Usage + +TODO: Write usage instructions here + +## Contributing + +1. Fork it +2. Create your feature branch (`git checkout -b my-new-feature`) +3. Commit your changes (`git commit -am 'Added some feature'`) +4. Push to the branch (`git push origin my-new-feature`) +5. Create new Pull Request diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..f57ae68 --- /dev/null +++ b/Rakefile @@ -0,0 +1,2 @@ +#!/usr/bin/env rake +require "bundler/gem_tasks" diff --git a/bin/frenzy_bunnies b/bin/frenzy_bunnies new file mode 100755 index 0000000..dc4c5f6 --- /dev/null +++ b/bin/frenzy_bunnies @@ -0,0 +1,6 @@ +#!/usr/bin/env ruby +require 'frenzy_bunnies' +require 'frenzy_bunnies/cli' + +FrenzyBunnies::CLI.start + diff --git a/examples/feed.rb b/examples/feed.rb new file mode 100644 index 0000000..c74f24f --- /dev/null +++ b/examples/feed.rb @@ -0,0 +1,20 @@ +require 'rubygems' +require 'hot_bunnies' + + + + +connection = HotBunnies.connect(:host => 'localhost') +channel = connection.create_channel +channel.prefetch = 10 + +exchange = channel.exchange('frenzy_bunnies', :type => :direct, :durable => true) + + + +100_000.times do |i| + exchange.publish("hello world! #{i}", :routing_key => 'new.feeds') +end +puts "done" + + diff --git a/examples/feed_worker.rb b/examples/feed_worker.rb new file mode 100644 index 0000000..9a17a0d --- /dev/null +++ b/examples/feed_worker.rb @@ -0,0 +1,33 @@ +$:<< File.expand_path('../lib', File.dirname(__FILE__)) + +require 'rubygems' +require 'frenzy_bunnies' + +class FeedWorker + include FrenzyBunnies::Worker + from_queue 'new.feeds', :prefetch => 20, :threads => 13, :durable => true + + def work(msg) + puts msg + ack! + end +end + +class FeedDownloader + include FrenzyBunnies::Worker + from_queue 'new.downloads', :durable => true + def work(msg) + puts msg + ack! + end +end + +f = FrenzyBunnies::Context.new + +f.run FeedWorker,FeedDownloader + + +trap "INT" do + f.stop + exit! +end diff --git a/examples/feed_workers_bin.rb b/examples/feed_workers_bin.rb new file mode 100644 index 0000000..494e97a --- /dev/null +++ b/examples/feed_workers_bin.rb @@ -0,0 +1,21 @@ +class FeedWorker + include FrenzyBunnies::Worker + from_queue 'new.feeds', :prefetch => 20, :threads => 13, :durable => true + + def work(msg) + puts msg + ack! + end +end + +class FeedDownloader + include FrenzyBunnies::Worker + from_queue 'new.downloads', :durable => true + def work(msg) + puts msg + ack! + end +end + + + diff --git a/frenzy_bunnies.gemspec b/frenzy_bunnies.gemspec new file mode 100644 index 0000000..b9e583d --- /dev/null +++ b/frenzy_bunnies.gemspec @@ -0,0 +1,20 @@ +# -*- encoding: utf-8 -*- +require File.expand_path('../lib/frenzy_bunnies/version', __FILE__) + +Gem::Specification.new do |gem| + gem.authors = ["Dotan Nahum"] + gem.email = ["jondotan@gmail.com"] + gem.description = %q{TODO: Write a gem description} + gem.summary = %q{TODO: Write a gem summary} + gem.homepage = "" + + gem.files = `git ls-files`.split($\) + gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) } + gem.test_files = gem.files.grep(%r{^(test|spec|features)/}) + gem.name = "frenzy_bunnies" + gem.require_paths = ["lib"] + gem.version = FrenzyBunnies::VERSION + + gem.add_runtime_dependency 'hot_bunnies', '>= 1.4.0.pre3' + gem.add_runtime_dependency 'thor' +end diff --git a/lib/frenzy_bunnies.rb b/lib/frenzy_bunnies.rb new file mode 100644 index 0000000..aa915d2 --- /dev/null +++ b/lib/frenzy_bunnies.rb @@ -0,0 +1,12 @@ +require 'hot_bunnies' + + +module FrenzyBunnies +end + + +require "frenzy_bunnies/version" +require 'frenzy_bunnies/queue_factory' +require 'frenzy_bunnies/context' +require 'frenzy_bunnies/worker' + diff --git a/lib/frenzy_bunnies/cli.rb b/lib/frenzy_bunnies/cli.rb new file mode 100644 index 0000000..8f3e9e0 --- /dev/null +++ b/lib/frenzy_bunnies/cli.rb @@ -0,0 +1,27 @@ +require 'thor' + + +class FrenzyBunnies::CLI < Thor + BUNNIES =<<-EOF + + (\\___/) + (='.'=) Frenzy Bunnies! + (")_(") JRuby based workers on top of hot_bunnies + + EOF + + desc 'run', "run workers from a file" + def start_workers(workerfile) + + require workerfile + # enumerate all workers + workers = [] + ObjectSpace.each_object(Class){|o| workers << o if o.ancestors.map(&:name).include? "FrenzyBunnies::Worker"} + workers.uniq! + puts BUNNIES + puts "Discovered #{workers.inspect}" + c = FrenzyBunnies::Context.new + c.run *workers + Signal.trap('INT') { c.stop; exit! } + end +end diff --git a/lib/frenzy_bunnies/context.rb b/lib/frenzy_bunnies/context.rb new file mode 100644 index 0000000..92dd2dd --- /dev/null +++ b/lib/frenzy_bunnies/context.rb @@ -0,0 +1,26 @@ +require 'logger' + +class FrenzyBunnies::Context + attr_reader :queue_factory, :logger + + def initialize(opts={}) + @opts = opts + @opts[:host] ||= 'localhost' + @opts[:exchange] ||= 'frenzy_bunnies' + @logger = @opts[:logger] || Logger.new(STDOUT) + @connection = HotBunnies.connect(:host => @opts[:host]) + @connection.add_shutdown_listener(lambda { |cause| puts cause; stop; sleep(10); start;}) + + @queue_factory = FrenzyBunnies::QueueFactory.new(@connection, @opts[:exchange]) + @klasses = [] + end + + def run(*klasses) + klasses.each{|klass| klass.start(self); @klasses << klass} + end + + def stop + @klasses.each{|klass| klass.stop } + end +end + diff --git a/lib/frenzy_bunnies/queue_factory.rb b/lib/frenzy_bunnies/queue_factory.rb new file mode 100644 index 0000000..5113b69 --- /dev/null +++ b/lib/frenzy_bunnies/queue_factory.rb @@ -0,0 +1,17 @@ +class FrenzyBunnies::QueueFactory + def initialize(connection, exchange) + @connection = connection + @exchange = exchange + end + + def build_queue(name, prefetch, durable) + channel = @connection.create_channel + channel.prefetch = prefetch + + exchange = channel.exchange(@exchange, :type => :direct, :durable => durable) + + queue = channel.queue(name) + queue.bind(exchange, :routing_key => name) + queue + end +end diff --git a/lib/frenzy_bunnies/version.rb b/lib/frenzy_bunnies/version.rb new file mode 100644 index 0000000..cd96899 --- /dev/null +++ b/lib/frenzy_bunnies/version.rb @@ -0,0 +1,3 @@ +module FrenzyBunnies + VERSION = "0.0.1" +end diff --git a/lib/frenzy_bunnies/worker.rb b/lib/frenzy_bunnies/worker.rb new file mode 100644 index 0000000..a947a48 --- /dev/null +++ b/lib/frenzy_bunnies/worker.rb @@ -0,0 +1,73 @@ +module FrenzyBunnies::Worker + import java.util.concurrent.Executors + + def ack! + true + end + def work + + end + + def self.included(base) + base.extend ClassMethods + end + + module ClassMethods + def from_queue(q, opts={}) + @queue_name = q + @queue_opts = opts + end + + def start(context) + @logger = context.logger + + @queue_opts[:prefetch] ||= 10 + @queue_opts[:durable] ||= false + + if @queue_opts[:threads] + @thread_pool = Executors.new_fixed_thread_pool(@queue_opts[:threads]) + else + @thread_pool = Executors.new_cached_thread_pool + end + + q = context.queue_factory.build_queue(@queue_name, @queue_opts[:prefetch], @queue_opts[:durable]) + @s = q.subscribe(:ack => true) + + say "#{@queue_opts[:threads] ? "#{@queue_opts[:threads]} threads " : ''}with #{@queue_opts[:prefetch]} prefetch on <#{@queue_name}>." + + @s.each(:blocking => false, :executor => @thread_pool) do |h, msg| + wkr = new + begin + if(wkr.work(msg)) + h.ack + else + h.reject + error "Cannot process message <#{msg.inspect}>" + end + rescue + h.reject + error "ERROR #{$!}" + end + end + + say "workers up." + end + + def stop + say "stopping" + @thread_pool.shutdown_now + say "pool shutdown" + @s.cancel + say "stopped" + end + + def say(text) + @logger.info "[#{self.name}] #{text}" + end + + def error(text) + @logger.error "[#{self.name}] #{text}" + end + end +end +