Permalink
Browse files

initial import

  • Loading branch information...
0 parents commit bc7d7c5ed0814ed5b862d7edb8f65ab51d48d30c @technoweenie committed Aug 24, 2011
Showing with 249 additions and 0 deletions.
  1. +14 −0 README.md
  2. +88 −0 lib/fantomex.rb
  3. +94 −0 lib/fantomex/adapters/sequel_adapter.rb
  4. +53 −0 test/sequel_adapter_test.rb
@@ -0,0 +1,14 @@
+# Fantomex
+
+Rough ruby port of https://github.com/technoweenie/fantomex
+
+TBA
+
+## USAGE
+
+```ruby
+require 'fantomex'
+queue = Fantomex::Adapters::SequelAdapter.new "sqlite://"
+queue.add 'some-message'
+puts queue.peek.inspect
+```
@@ -0,0 +1,88 @@
+module Fantomex
+ # Represents a single queued message.
+ class Message
+ attr_accessor :id, :data, :retries, :run_at
+
+ def initialize(data = {})
+ case data
+ when String
+ @data = data
+ when Hash
+ @data = data[:data]
+ @retries = data[:retries]
+ @run_at = data[:run_at]
+ @id = data[:id]
+ end
+ @retries ||= 0
+ @run_at ||= Time.now
+ end
+
+ # Public: Reschedules this job to run in the future. Uses exponential
+ # backoff.
+ #
+ # Returns nothing.
+ def reschedule!
+ duration = (@retries ** 4) + 5
+ @retries += 1
+ @run_at += duration
+ end
+ end
+
+ module Adapters
+ autoload :SequelAdapter, File.expand_path("../fantomex/adapters/sequel_adapter", __FILE__)
+ end
+
+ class Adapter
+ def initialize(options = {})
+ end
+
+ # Public: Adds a new Message to the queue.
+ #
+ # data - The String message.
+ #
+ # Returns a Message instance with a set id.
+ def push(data)
+ raise NotImplementedError
+ end
+
+ # Public: Gets the earliest Message.
+ #
+ # Returns a Message.
+ def peek
+ raise NotImplementedError
+ end
+
+ # Public: Counts the messages in the queue.
+ #
+ # Returns the Integer count.
+ def count
+ raise NotImplementedError
+ end
+
+ # Public: Removes a message from the queue.
+ #
+ # id - Integer ID of the Message.
+ #
+ # Returns nothing.
+ def remove(id)
+ raise NotImplementedError
+ end
+
+ # Public: Reschedules the Message to run at a later time. Re-assigns
+ # the time using exponential back-off.
+ #
+ # msg - The Message to reschedule.
+ #
+ # Returns the updated Message.
+ def reschedule(msg)
+ raise NotImplementedError
+ end
+
+ # Public: Sets up the DB schema for the queue.
+ #
+ # Returns nothing.
+ def setup
+ end
+ end
+end
+
@@ -0,0 +1,94 @@
+require 'sequel'
+
+module Fantomex
+ module Adapters
+ class SequelAdapter < Adapter
+ attr_reader :client
+
+ def initialize(options = {})
+ @client = case options
+ when Hash, String then Sequel.connect(options)
+ when Sequel::Database then options
+ else
+ raise ArgumentError, "Invalid options: #{options.inspect}"
+ end
+ @table = @client[:messages]
+ end
+
+ # Public: Adds a new Message to the queue.
+ #
+ # data - The String message.
+ #
+ # Returns a Message instance with a set id.
+ def push(data)
+ msg = case data
+ when String then Message.new(data)
+ when Message then data
+ else
+ raise ArgumentError, "Invalid message: #{data.inspect}"
+ end
+ msg.id = @table << to_row(msg)
+ msg
+ end
+
+ # Public: Gets the earliest Message.
+ #
+ # Returns a Message.
+ def peek
+ from_row @table.select(:rowid, :data, :retries, :run_at).order(:run_at).first
+ end
+
+ # Public: Counts the messages in the queue.
+ #
+ # Returns the Integer count.
+ def count
+ @table.count
+ end
+
+ # Public: Removes a message from the queue.
+ #
+ # id - Integer ID of the Message.
+ #
+ # Returns nothing.
+ def remove(id)
+ @table.where(:rowid => id.to_i).delete
+ end
+
+ # Public: Reschedules the Message to run at a later time. Re-assigns
+ # the time using exponential back-off.
+ #
+ # msg - The Message to reschedule.
+ #
+ # Returns the updated Message.
+ def reschedule(msg)
+ msg.reschedule!
+ @table.where(:rowid => msg.id).update(to_row(msg))
+ msg
+ end
+
+ # Public: Sets up the DB schema for the queue.
+ #
+ # Returns nothing.
+ def setup
+ @client.transaction do
+ @client.execute "CREATE TABLE IF NOT EXISTS messages (
+ data TEXT,
+ retries INTEGER,
+ run_at DATETIME DEFAULT CURRENT_TIMESTAMP)"
+ @client.execute "CREATE INDEX IF NOT EXISTS messages_by_run_at ON messages (run_at)"
+ end
+ end
+
+ def to_row(msg)
+ {:data => msg.data, :retries => msg.retries, :run_at => msg.run_at.utc}
+ end
+
+ def from_row(row)
+ return nil if !row
+ Message.new row.merge(:id => row[:rowid])
+ end
+ end
+ end
+end
+
+
@@ -0,0 +1,53 @@
+require 'test/unit'
+require File.expand_path('../../lib/fantomex', __FILE__)
+
+class SequelAdapterTest < Test::Unit::TestCase
+ def setup
+ @adapter = Fantomex::Adapters::SequelAdapter.new \
+ :adapter => 'sqlite',
+ :database => ":memory:"
+ @adapter.setup
+ end
+
+ def test_adding_a_message
+ msg = @adapter.push 'abc'
+ assert msg.id > 0
+ assert_equal 'abc', msg.data
+ assert_equal 0, msg.retries
+ end
+
+ def test_peek
+ assert_nil @adapter.peek
+ msg = @adapter.push 'abc'
+ assert_equal msg.id, @adapter.peek.id
+ end
+
+ def test_count
+ assert_equal 0, @adapter.count
+ @adapter.push 'abc'
+ assert_equal 1, @adapter.count
+ end
+
+ def test_reschedule
+ msg = @adapter.push 'abc'
+ start = msg.run_at
+
+ assert_equal start, @adapter.peek.run_at
+
+ msg.reschedule!
+ assert_equal start+5, msg.run_at
+ assert_equal start, @adapter.peek.run_at
+
+ @adapter.reschedule @adapter.peek
+ assert_equal start+5, @adapter.peek.run_at
+ end
+
+ def test_remove
+ assert_nil @adapter.peek
+ msg = @adapter.push 'abc'
+ assert_equal msg.id, @adapter.peek.id
+
+ @adapter.remove msg.id
+ assert_nil @adapter.peek
+ end
+end

0 comments on commit bc7d7c5

Please sign in to comment.