Comparing changes

base fork: resque/resque
...
head fork: axehomeyg/resque
compare: master
  • 8 commits
  • 26 files changed
  • 0 commit comments
  • 1 contributor
4 init.rb
@@ -1 +1,5 @@
require 'resque'
+
+# Change the default in an initializer or test setup to use a different backend
+Resque.connection_options = Resque::REDIS_CONNECTION_OPTIONS
+# Resque.connection_options = Resque::MYSQL_CONNECTION_OPTIONS
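
For context, an application would opt into a backend by assigning connection options before enqueuing jobs. A minimal sketch, assuming only the constants this PR adds (the initializer path is hypothetical):

    # config/initializers/resque.rb -- hypothetical app-side file
    require 'resque'

    # Default: Redis on localhost:6379
    Resque.connection_options = Resque::REDIS_CONNECTION_OPTIONS

    # Or swap in MySQL; any hash of the same shape should work:
    # Resque.connection_options = {:type => 'mysql',
    #                              :server => {:host => 'db-host', :user => 'resque'}}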
196 lib/resque.rb
@@ -1,4 +1,3 @@
-require 'redis/namespace'
begin
require 'yajl'
@@ -8,6 +7,9 @@
require 'resque/version'
+require 'resque/connection'
+require 'resque/base_model'
+
require 'resque/errors'
require 'resque/failure'
@@ -16,6 +18,7 @@
require 'resque/helpers'
require 'resque/stat'
require 'resque/job'
+require 'resque/queue'
require 'resque/worker'
require 'resque/plugin'
@@ -23,40 +26,8 @@ module Resque
include Helpers
extend self
- # Accepts:
- # 1. A 'hostname:port' string
- # 2. A 'hostname:port:db' string (to select the Redis db)
- # 3. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`,
- # or `Redis::Namespace`.
- def redis=(server)
- if server.respond_to? :split
- host, port, db = server.split(':')
- redis = Redis.new(:host => host, :port => port,
- :thread_safe => true, :db => db)
- @redis = Redis::Namespace.new(:resque, :redis => redis)
- elsif server.respond_to? :namespace=
- @redis = server
- else
- @redis = Redis::Namespace.new(:resque, :redis => server)
- end
- end
-
- # Returns the current Redis connection. If none has been created, will
- # create a new one.
- def redis
- return @redis if @redis
- self.redis = 'localhost:6379'
- self.redis
- end
-
- def redis_id
- # support 1.x versions of redis-rb
- if redis.respond_to?(:server)
- redis.server
- else
- redis.client.id
- end
- end
+ REDIS_CONNECTION_OPTIONS = {:type => 'redis', :server => 'localhost:6379'}
+ MYSQL_CONNECTION_OPTIONS = {:type => 'mysql', :server => {:host => 'localhost', :user => 'root'}}
# The `before_first_fork` hook will be run in the **parent** process
# only once, before forking to run the first job. Be careful- any
@@ -110,72 +81,23 @@ def to_s
end
- #
- # queue manipulation
- #
-
- # Pushes a job onto a queue. Queue name should be a string and the
- # item should be any JSON-able Ruby object.
+ # TODO: rename methods to indicate their queue-manipulation nature
def push(queue, item)
- watch_queue(queue)
- redis.rpush "queue:#{queue}", encode(item)
+ Queue.push_to_set(queue)
+ Queue.find(queue).push_to_list(item)
end
-
- # Pops a job off a queue. Queue name should be a string.
- #
- # Returns a Ruby object.
- def pop(queue)
- decode redis.lpop("queue:#{queue}")
- end
-
- # Returns an integer representing the size of a queue.
- # Queue name should be a string.
- def size(queue)
- redis.llen("queue:#{queue}").to_i
- end
-
- # Returns an array of items currently queued. Queue name should be
- # a string.
- #
- # start and count should be integer and can be used for pagination.
- # start is the item to begin, count is how many items to return.
- #
- # To get the 3rd page of a 30 item, paginatied list one would use:
- # Resque.peek('my_list', 59, 30)
+ def pop(queue) ; Queue.find(queue).pop_from_list ; end
+ def size(queue) ; Queue.find(queue).list_size ; end
def peek(queue, start = 0, count = 1)
- list_range("queue:#{queue}", start, count)
- end
-
- # Does the dirty work of fetching a range of items from a Redis list
- # and converting them into Ruby objects.
- def list_range(key, start = 0, count = 1)
- if count == 1
- decode redis.lindex(key, start)
- else
- Array(redis.lrange(key, start, start+count-1)).map do |item|
- decode item
- end
- end
- end
-
- # Returns an array of all known Resque queues as strings.
- def queues
- redis.smembers(:queues)
- end
-
- # Given a queue name, completely deletes the queue.
- def remove_queue(queue)
- redis.srem(:queues, queue.to_s)
- redis.del("queue:#{queue}")
- end
-
- # Used internally to keep track of which queues we've created.
- # Don't call this directly.
- def watch_queue(queue)
- redis.sadd(:queues, queue.to_s)
+ jobs = Queue.find(queue).list_range(start, count)
+ return jobs.first if count == 1
+ jobs
end
+ def queues ; Queue.all ; end
+ def remove_queue(queue) ; Queue.delete(queue) ; end
+  # TODO: rename methods to indicate their job-manipulation nature
#
# job shortcuts
#
@@ -244,31 +166,9 @@ def reserve(queue)
Job.reserve(queue)
end
-
- #
- # worker shortcuts
- #
-
- # A shortcut to Worker.all
- def workers
- Worker.all
- end
-
- # A shortcut to Worker.working
- def working
- Worker.working
- end
-
- # A shortcut to unregister_worker
- # useful for command line tool
- def remove_worker(worker_id)
- worker = Resque::Worker.find(worker_id)
- worker.unregister_worker
- end
-
- #
- # stats
- #
+ def workers ; Worker.all ; end
+ def working ; Worker.working ; end
+ def remove_worker(worker_id) ; Resque::Worker.find(worker_id).unregister_worker ; end
# Returns a hash, similar to redis-rb's #info, of interesting stats.
def info
@@ -279,16 +179,58 @@ def info
:workers => workers.size.to_i,
:working => working.size,
:failed => Stat[:failed],
- :servers => [redis_id],
- :environment => defined?(RAILS_ENV) ? RAILS_ENV : (ENV['RACK_ENV'] || 'development')
+ :servers => [connection_id],
+ :environment => defined?(Rails) ? Rails.env : (ENV['RACK_ENV'] || 'development')
}
end
- # Returns an array of all known Resque keys in Redis. Redis' KEYS operation
- # is O(N) for the keyspace, so be careful - this can be slow for big databases.
- def keys
- redis.keys("*").map do |key|
- key.sub("#{redis.namespace}:", '')
+ #
+ # datastores
+ #
+
+ # lazily load the backend datastore
+ def connection
+ return @connection if @connection
+ @connection = Resque::Connection.establish_connection(connection_options[:type], connection_options[:server])
+ end
+
+ def connection_options ; @connection_options ||= REDIS_CONNECTION_OPTIONS ; end
+ def connection_options=(options)
+ @connection = nil # force a re-memoization of the connection object
+ @connection_options = options
+ end
+
+ def connection_id
+ case connection_type.to_s
+ when 'redis' : redis_id
+ when 'mysql' : connection_options[:server]
+ end
+ end
+
+ def connection_type ; connection_options[:type] ; end
+ def connection_type=(type)
+ case type.to_s
+ when 'redis'
+ self.connection_options = REDIS_CONNECTION_OPTIONS
+ when 'mysql'
+ self.connection_options = MYSQL_CONNECTION_OPTIONS
end
end
+
+ # Deprecate
+ def redis_id
+ # support 1.x versions of redis-rb
+ if connection.respond_to?(:server)
+ connection.server
+ elsif connection.respond_to?(:client)
+ connection.client.id
+ end
+ end
+
+ # def redis ; connection ; end
+ def redis=(server)
+ self.connection_options = REDIS_CONNECTION_OPTIONS
+ connection
+ end
+ def keys ; connection.keys ; end
end
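
Taken together, these changes replace Resque.redis with a backend-agnostic connection. A usage sketch, assuming the methods above:

    Resque.connection_type = 'mysql'   # swaps in MYSQL_CONNECTION_OPTIONS
    Resque.connection                  # lazily builds a Resque::Connection::Mysql
    Resque.connection_id               # => {:host => 'localhost', :user => 'root'}

    Resque.connection_type = 'redis'   # back to the Redis defaults

Note that the legacy redis= setter above discards its server argument and always resets to REDIS_CONNECTION_OPTIONS; callers that passed a custom host would need connection_options= instead.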
112 lib/resque/base_model.rb
@@ -0,0 +1,112 @@
+module Resque
+ class BaseModel
+ class << self
+ # cheap proxy
+ def method_missing(method_symbol, *args, &block)
+ connection.send(method_symbol, *args, &block)
+ end
+
+ def connection ; Resque.connection ; end
+
+ def find(id)
+ obj = new(get(id))
+ obj.id = id
+ obj
+ end
+
+ def all_from_set(&block)
+ data = connection.all_from_set(collection_key)
+
+ return data.map(&block) if block_given?
+
+ data.map{ |id| find(id) }
+ end
+
+ def exists_in_set?(obj) ; connection.exists_in_set?(collection_key, obj) ; end
+ def delete_from_set(obj) ; connection.delete_from_set(collection_key, obj) ; end
+ def push_to_set(obj) ; connection.push_to_set(collection_key, obj) ; end
+
+ def get(id) ; connection.get("#{key}:#{id}") ; end
+ def set(id, value) ; connection.set("#{key}:#{id}", value) ; end
+ def delete(id) ; connection.delete("#{key}:#{id}") ; end
+
+ def increment(id, by) ; connection.increment("#{key}:#{id}", by) ; end
+ def decrement(id, by) ; connection.decrement("#{key}:#{id}", by) ; end
+
+ def size_of_list(id) ; connection.size_of_list("#{key}:#{id}") ; end
+ def pop_from_list(id) ; connection.pop_from_list("#{key}:#{id}") ; end
+ def push_to_list(id, item) ; connection.push_to_list("#{key}:#{id}", item) ; end
+
+
+      def key_prefix ; "#{key}:" ; end
+
+ # Given a word with dashes, returns a camel cased version of it.
+ #
+ # classify('job-name') # => 'JobName'
+ def classify(dashed_word)
+ dashed_word.split('-').each { |part| part[0] = part[0].chr.upcase }.join
+ end
+
+ # Given a camel cased word, returns the constant it represents
+ #
+ # constantize('JobName') # => JobName
+ def constantize(camel_cased_word)
+ camel_cased_word = camel_cased_word.to_s
+
+ if camel_cased_word.include?('-')
+ camel_cased_word = classify(camel_cased_word)
+ end
+
+ names = camel_cased_word.split('::')
+ names.shift if names.empty? || names.first.empty?
+
+ constant = Object
+ names.each do |name|
+ constant = constant.const_get(name) || constant.const_missing(name)
+ end
+ constant
+ end
+
+ def key ; (self.to_s.downcase.split("::").last).to_sym ; end
+ def collection_key ; (key.to_s + "s").to_sym ; end
+
+ end # end of ClassMethods
+
+ # start of InstanceMethods
+ def initialize(attributes)
+ @attributes = attributes
+ end
+
+ def id=(value)
+ @to_s = value
+ end
+
+ def to_s ; @to_s ; end
+
+ def key ; "#{self.class.key}:#{to_s}" ; end
+
+ def push_to_list(item)
+ self.class.push_to_list(to_s, item)
+ end
+
+ def pop_from_list
+ self.class.pop_from_list(to_s)
+ end
+
+ def list_size
+ self.class.size_of_list(to_s)
+ end
+
+    # Bypass BaseModel's id-based helpers, so we build the full key ourselves
+ def list_range(start = 0, count = 1)
+ range = self.class.range_from_list(key, start, count)
+
+ return self.class.decode(range) unless range.is_a?(Array)
+
+ range.map do |item|
+ self.class.decode(item)
+ end
+ end
+
+ end
+end
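
BaseModel derives storage keys from the subclass name, so each model gets a namespaced keyspace for free. A sketch of the convention, using the PR's own Queue subclass (see lib/resque/queue.rb below):

    Resque::Queue.key             # => :queue  (class name, downcased)
    Resque::Queue.collection_key  # => :queues (the set that indexes all queues)
    Resque::Queue.get('high')     # reads the backend key "queue:high"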
15 lib/resque/connection.rb
@@ -0,0 +1,15 @@
+%w(base cassandra mysql mongo redis).each do |adapter|
+ require File.join(File.dirname(__FILE__), "connection/#{adapter}.rb")
+end
+module Resque
+ module Connection
+ def self.establish_connection(backend_type, server)
+ case @backend_type = backend_type
+ when 'redis' : Resque::Connection::Redis.new(server)
+ when 'cassandra' : Resque::Connection::Cassandra.new(server)
+ when 'mysql' : Resque::Connection::Mysql.new(server)
+ else ; raise "Unknown Datastore Type: #{backend_type}"
+ end
+ end
+ end
+end
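
The factory raises on unknown backend types and otherwise hands back an adapter instance:

    conn = Resque::Connection.establish_connection('redis', 'localhost:6379')
    conn.class  # => Resque::Connection::Redis

    Resque::Connection.establish_connection('mongo', nil)
    # => RuntimeError: Unknown Datastore Type: mongo (the mongo adapter is still empty)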
71 lib/resque/connection/base.rb
@@ -0,0 +1,71 @@
+module Resque
+ class Base
+
+ attr_accessor :backend, # adapter class
+ :backend_type, # redis, mysql, cassandra
+ :server # the connection string/object used to create the connection
+    # in lieu of a whole adapter mechanism, let's just provide parallel implementations here
+ def initialize(server)
+ self.server = server
+ self.backend = setup_backend(server)
+ end
+
+ def keys
+ raise 'override keys method'
+ end
+
+ def delete(key)
+ raise 'override delete method'
+ end
+
+    # Returns the size of whatever value is stored at `key`.
+    def get_size(key)
+      raise 'override get_size method'
+    end
+
+    def get_value_as_array(key, start = 0, count = 20)
+      raise 'override get_value_as_array method'
+    end
+
+ def list_range(*args) ; range_from_list(*args) ; end
+
+
+ def method_missing(method_symbol, *args, &block)
+ backend.send(method_symbol, *args, &block)
+ end
+
+ # Helpers
+ # Given a Ruby object, returns a string suitable for storage in a
+ # queue.
+ def encode(object)
+ if defined? Yajl
+ Yajl::Encoder.encode(object)
+ else
+ object.to_json
+ end
+ end
+
+ # Given a string, returns a Ruby object.
+ def decode(object)
+ return unless object
+
+ if defined? Yajl
+ begin
+ Yajl::Parser.parse(object, :check_utf8 => false)
+ rescue Yajl::ParseError
+ end
+ else
+ begin
+ JSON.parse(object)
+ rescue JSON::ParserError
+ end
+ end
+ end
+
+ private
+ def setup_backend(server)
+ raise 'override setup method'
+ end
+ end
+end
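
Subclasses inherit these encode/decode helpers, which prefer Yajl when it is loaded and swallow parse errors. For example:

    conn = Resque.connection
    conn.encode('class' => 'Archive', 'args' => [35])
    # => "{\"class\":\"Archive\",\"args\":[35]}"
    conn.decode('{"class":"Archive","args":[35]}')
    # => {"class"=>"Archive", "args"=>[35]}
    conn.decode('not json')
    # => nil -- parse errors are rescued and return nil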
13 lib/resque/connection/cassandra.rb
@@ -0,0 +1,13 @@
+module Resque
+ module Connection
+    class Cassandra < Base
+      def setup_backend(server)
+        require 'cassandra'
+        # TODO: build and return an actual Cassandra client for `server`;
+        # this adapter is a stub for now.
+      end
+ end
+ end
+end
0  lib/resque/connection/mongo.rb
No changes.
219 lib/resque/connection/mysql.rb
@@ -0,0 +1,219 @@
+module Resque
+ module Connection
+ class Mysql < Base
+
+ def setup_backend(server)
+        # Should we just use ActiveRecord here?
+ require 'mysql'
+ self.backend = ::Mysql.new(server[:host], server[:user])
+ backend.select_db('resque') rescue nil
+ backend
+ end
+
+ # K/V Methods
+ def set(key, value)
+ st = backend.prepare("INSERT INTO resque_data (k, v) VALUES (?, ?) ON DUPLICATE KEY UPDATE v = ?")
+ st.execute(key, value, value)
+ st.close
+ end
+
+ def get(key) ; find_one("SELECT v FROM resque_data WHERE k = '#{key}'") ; end
+ def delete(key)
+ query("DELETE FROM resque_data WHERE k = '#{key}'")
+ delete_list(key)
+ end
+
+ def mapped_mget(*keys)
+ key_list = keys.map{ |key| "'#{key}'"}.join(",")
+
+ vals = find_all("SELECT k, v FROM resque_data WHERE k IN (#{key_list})").inject({}) do |accum, row|
+ accum.merge(row[0] => row[1])
+ end
+
+ vals
+ end
+
+ def keys
+ find_column("SELECT k FROM resque_data ORDER BY k ASC") +
+ find_column("SELECT DISTINCT k FROM resque_lists ORDER BY k ASC")
+ end
+
+      # Set methods (treated as plain lists for now). TODO: figure out if we need a
+      # real set/list distinction in the mysql adapter
+ def all_from_set(key)
+ find_column("SELECT v FROM resque_lists WHERE k = '#{key}' ORDER BY created_at DESC")
+ end
+
+ def exists_in_set?(key, obj)
+ find_one("SELECT COUNT(*) FROM resque_lists WHERE k = '#{key}' AND v = '#{obj}' ORDER BY created_at ASC").to_i > 0
+ end
+
+ def delete_from_set(key, obj)
+ query("DELETE FROM resque_lists WHERE k = '#{key}' AND v = '#{obj}'")
+ end
+
+ def push_to_set(key, obj)
+ transaction do
+ unless exists?("SELECT COUNT(*) FROM resque_lists WHERE k = '#{key}' and v = '#{obj}' LIMIT 1")
+ created_at = find_one("SELECT created_at FROM resque_lists WHERE k = '#{key}' ORDER BY created_at ASC LIMIT 1")
+
+ if created_at.nil?
+ st = backend.prepare("INSERT INTO resque_lists (k, v, created_at) VALUES (?, ?, now())")
+ st.execute(key.to_s, obj.to_s)
+ st.close
+ else
+ st = backend.prepare("INSERT INTO resque_lists (k,v, created_at) VALUES (?, ?, DATE_SUB(?, INTERVAL 1 SECOND)) ON DUPLICATE KEY UPDATE v = ?")
+ st.execute(key.to_s, obj.to_s, created_at, obj.to_s)
+ st.close
+ end
+ end
+ end
+ end
+
+ # Counter methods
+ def increment(key, by)
+ unless exists?("SELECT COUNT(*) FROM resque_data WHERE k = '#{key}'")
+ query("INSERT INTO resque_data (k,v) VALUES ('#{key}', 0)")
+ end
+ query("UPDATE resque_data SET v = v + '#{by}' WHERE k = '#{key}'")
+ end
+
+ def decrement(key, by)
+ query("UPDATE resque_data SET v = v - '#{by}' WHERE k = '#{key}'")
+ end
+
+ # List methods
+ def pop_from_list(key)
+ value = nil
+ selective_sql = "FROM resque_lists WHERE k = '#{key}' ORDER BY created_at ASC LIMIT 1"
+ transaction do
+ value = decode(find_one("SELECT v #{selective_sql}"))
+ query("DELETE #{selective_sql}")
+ end
+ value
+ end
+
+ def size_of_list(key)
+ find_one("SELECT COUNT(*) FROM resque_lists WHERE k = '#{key}'").to_i
+ end
+
+ def push_to_list(key, item)
+ st = backend.prepare("INSERT INTO resque_lists (k, v, created_at) VALUES (?, ?, now())")
+ st.execute(key.to_s, encode(item))
+ st.close
+
+ find_one("SELECT COUNT(*) FROM resque_lists WHERE k = '#{key}'").to_i
+ end
+
+ def delete_list(key)
+ query("DELETE FROM resque_lists WHERE k = '#{key}'")
+ end
+
+ # encoding of value is handled externally
+ def delete_from_list(key, count, value)
+ order = count > 0 ? "ASC" : "DESC"
+ st = backend.prepare("DELETE FROM resque_lists WHERE k = ? and v = ? ORDER BY created_at #{order} LIMIT #{count == 0 ? 1 : count.abs}")
+ st.execute(key, value)
+ st.close
+
+ backend.affected_rows.abs
+ end
+
+ # decoding of value is handled externally
+      def range_from_list(key, start = 0, count = 1) # positive 'count' is a window size; non-positive returns the whole list
+
+ if count > 0
+ find_column("SELECT v FROM resque_lists WHERE k = '#{key}' LIMIT #{start}, #{count}")
+ else
+ find_column("SELECT v FROM resque_lists WHERE k = '#{key}'")
+ end
+ end
+
+ def list_range(*args) ; range_from_list(*args) ; end
+
+ def get_size(key)
+ case type(key)
+ when 'string' : get(key).length
+ when 'list' : size_of_list(key)
+ else
+ []
+ end
+ end
+
+ def get_value_as_array(key, start, count)
+ case type(key)
+ when 'string' : [get(key)]
+ when 'list' : range_from_list(key, start, count)
+ else
+ []
+ end
+ end
+
+ def type(key)
+ if value = get(key)
+ 'string'
+ elsif size_of_list(key) > 0
+ 'list'
+ else
+ 'none'
+ end
+ end
+
+ def flushall
+ query "truncate resque_data"
+ query "truncate resque_lists"
+ end
+
+      def backend_type ; 'mysql' ; end
+
+ def namespace ; 'mysql' ; end
+
+ private
+ def query(sql)
+ # puts "** EXECUTING QUERY: #{sql} **"
+ backend.query(sql)
+ end
+
+ def exists?(sql)
+ find_one(sql).to_i > 0
+ end
+
+ def find_all(sql)
+ results = []
+ query(sql).each do |row|
+ results << row
+ end
+ results
+ end
+
+ def find_one(sql)
+ results = []
+ query(sql).each do |row|
+ results << row.first
+ end
+ results.first
+ end
+
+
+ def find_column(sql)
+ results = []
+ query(sql).each do |row|
+ results << row.first
+ end
+ results
+ end
+
+ def transaction(&block)
+ query("BEGIN")
+
+ result = yield
+
+ query("COMMIT")
+
+ result
+ end
+ end
+
+ end
+end
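
The adapter assumes a resque database with resque_data and resque_lists tables, which this PR does not create. A hypothetical bootstrap, with the schema inferred from the adapter's queries (column types are guesses):

    # Hypothetical setup script -- the PR ships no migrations.
    require 'mysql'
    conn = ::Mysql.new('localhost', 'root')
    conn.query("CREATE DATABASE IF NOT EXISTS resque")
    conn.select_db('resque')
    conn.query(<<-SQL)
      CREATE TABLE IF NOT EXISTS resque_data (
        k VARCHAR(255) NOT NULL PRIMARY KEY,  -- ON DUPLICATE KEY UPDATE relies on this
        v TEXT
      )
    SQL
    conn.query(<<-SQL)
      CREATE TABLE IF NOT EXISTS resque_lists (
        k VARCHAR(255) NOT NULL,        -- list/set name
        v TEXT,                         -- encoded item
        created_at DATETIME NOT NULL,   -- drives FIFO ordering for pops
        KEY k_created_idx (k, created_at)
      )
    SQL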
100 lib/resque/connection/redis.rb
@@ -0,0 +1,100 @@
+module Resque
+ module Connection
+ class Redis < Base
+
+ def setup_backend(server)
+ require 'redis/namespace'
+ if server.respond_to? :split
+ host, port, db = server.split(':')
+ redis = ::Redis.new(:host => host, :port => port,
+ :thread_safe => true, :db => db)
+ ::Redis::Namespace.new(:resque, :redis => redis)
+ elsif server.respond_to? :namespace=
+ server
+ else
+ ::Redis::Namespace.new(:resque, :redis => server)
+ end
+ end
+
+ # K/V Methods
+ def set(key, value) ; backend.set(key, value) ; end
+ def get(key) ; backend.get(key) ; end
+ def delete(key) ; backend.del(key) ; end
+ def keys
+ backend.keys("*").map do |key|
+ key.sub("#{backend.namespace}:", '')
+ end
+ end
+
+ # Set methods
+ def all_from_set(key) ; backend.smembers(key) || [] ; end
+ def exists_in_set?(key, obj) ; backend.sismember(key, obj) ; end
+ def delete_from_set(key, obj) ; backend.srem(key, obj) ; end
+ def push_to_set(key, obj) ; backend.sadd(key, obj) ; end
+
+ # Counter methods
+ def increment(key, by) ; backend.incrby(key, by) ; end
+ def decrement(key, by) ; backend.decrby(key, by) ; end
+
+ # List methods
+ def delete_list(key) ; delete(key); end
+ def pop_from_list(key) ; decode(backend.lpop(key)) ; end
+ def size_of_list(key) ; backend.llen(key).to_i ; end
+ def push_to_list(key, item) ; backend.rpush(key, encode(item)) ; end
+
+ # encoding of value is handled externally
+ def delete_from_list(key, count, value)
+ backend.lrem(key, count, value).to_i
+ end
+
+ # encoding of value is handled externally
+ def range_from_list(key, start = 0, count = 1) # positive 'count' is a delta. negative 'count' is passed through.
+        if count == 1
+          # decoding is left to the caller, as with the multi-item branch
+          [backend.lindex(key, start)]
+ else
+ upper_bound = count < 0 ? count : (start+count-1)
+ Array(backend.lrange(key, start, upper_bound))
+ end
+ end
+
+ def get_size(key)
+ case backend.type(key)
+ when 'none'
+ []
+ when 'list'
+ backend.llen(key)
+ when 'set'
+ backend.scard(key)
+ when 'string'
+ backend.get(key).length
+ when 'zset'
+ backend.zcard(key)
+ end
+ end
+
+ def get_value_as_array(key, start = 0, count = 20)
+ case backend.type(key)
+ when 'none'
+ []
+ when 'list'
+ backend.lrange(key, start, start + count)
+ when 'set'
+ backend.smembers(key)[start..(start + count)]
+ when 'string'
+ [backend.get(key)]
+ when 'zset'
+ backend.zrange(key, start, start + count)
+ end
+ end
+
+ def type(key)
+ backend.type(key)
+ end
+
+ def backend_type ; 'redis' ; end
+
+ end
+
+ end
+end
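
The Redis adapter keeps the old Resque.redis= flexibility: the server argument may be a 'host:port' or 'host:port:db' string, a bare redis-rb client, or an existing Redis::Namespace. For example:

    Resque::Connection::Redis.new('localhost:6379')            # string form
    Resque::Connection::Redis.new(::Redis.new(:port => 6379))  # wrapped in the :resque namespace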
21 lib/resque/failure/redis.rb
@@ -13,20 +13,30 @@ def save
:worker => worker.to_s,
:queue => queue
}
- data = Resque.encode(data)
- Resque.redis.rpush(:failed, data)
+    # push_to_list handles encoding the payload itself
+    Resque.connection.push_to_list(:failed, data)
end
def self.count
- Resque.redis.llen(:failed).to_i
+ Resque.connection.size_of_list(:failed)
end
def self.all(start = 0, count = 1)
- Resque.list_range(:failed, start, count)
+ failed_jobs = Resque.connection.list_range(:failed, start, count).map do |item|
+ Resque.decode(item)
+ end
+
+ return failed_jobs.first if count == 1
+
+ failed_jobs
end
def self.clear
- Resque.redis.del(:failed)
+ Resque.connection.delete_list(:failed)
end
def self.requeue(index)
@@ -36,5 +46,6 @@ def self.requeue(index)
Job.create(item['queue'], item['payload']['class'], *item['payload']['args'])
end
end
+
end
end
18 lib/resque/helpers.rb
@@ -1,11 +1,23 @@
module Resque
# Methods used by various classes in Resque.
module Helpers
- # Direct access to the Redis instance.
- def redis
- Resque.redis
+ def self.included(base)
+ base.extend ClassMethods
end
+ module ClassMethods
+ # Returns an array of all known Resque keys in Redis. Redis' KEYS operation
+ # is O(N) for the keyspace, so be careful - this can be slow for big databases.
+      def keys
+        # For now this only works with Redis; the connection's #keys already
+        # strips the namespace prefix.
+        connection.keys
+      end
+
+ def connection ; Resque.connection ; end
+ end
+
# Given a Ruby object, returns a string suitable for storage in a
# queue.
def encode(object)
9 lib/resque/job.rb
@@ -11,13 +11,13 @@ module Resque
# job = Resque::Job.reserve(:high)
# klass = Resque::Job.constantize(job.payload['class'])
# klass.perform(*job.payload['args'])
- class Job
+ class Job < BaseModel
include Helpers
extend Helpers
# Raise Resque::Job::DontPerform from a before_perform hook to
# abort the job.
- DontPerform = Class.new(StandardError)
+ class DontPerform < StandardError ; end
# The worker object which is currently processing this job.
attr_accessor :worker
@@ -80,14 +80,15 @@ def self.destroy(queue, klass, *args)
queue = "queue:#{queue}"
destroyed = 0
- redis.lrange(queue, 0, -1).each do |string|
+ connection.range_from_list(queue, 0, -1).each do |string|
json = decode(string)
match = json['class'] == klass
match &= json['args'] == args unless args.empty?
if match
- destroyed += redis.lrem(queue, 0, string).to_i
+        destroyed += connection.delete_from_list(queue, 0, string)
end
end
23 lib/resque/queue.rb
@@ -0,0 +1,23 @@
+module Resque
+ class Queue < BaseModel
+ class << self
+ def delete(key)
+ super
+ delete_from_set(key.to_s)
+ end
+
+ def all
+ all_from_set do |item|
+ item
+ end
+ end
+
+ def find(id)
+ obj = new(id)
+ obj.id = id
+ obj
+ end
+
+ end
+ end
+end
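
With this model in place, Resque.push reduces to registering the queue in the :queues set and appending the encoded job to its list. Roughly:

    queue = Resque::Queue.find('high')  # no store lookup; just wraps the id
    queue.push_to_list('class' => 'Archive', 'args' => [])
    queue.list_size      # => 1
    queue.pop_from_list  # => {"class"=>"Archive", "args"=>[]}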
28 lib/resque/server.rb
@@ -47,33 +47,11 @@ def tabs
end
def redis_get_size(key)
- case Resque.redis.type(key)
- when 'none'
- []
- when 'list'
- Resque.redis.llen(key)
- when 'set'
- Resque.redis.scard(key)
- when 'string'
- Resque.redis.get(key).length
- when 'zset'
- Resque.redis.zcard(key)
- end
+ Resque.connection.get_size(key)
end
def redis_get_value_as_array(key, start=0)
- case Resque.redis.type(key)
- when 'none'
- []
- when 'list'
- Resque.redis.lrange(key, start, start + 20)
- when 'set'
- Resque.redis.smembers(key)[start..(start + 20)]
- when 'string'
- [Resque.redis.get(key)]
- when 'zset'
- Resque.redis.zrange(key, start, start + 20)
- end
+ Resque.connection.get_value_as_array(key, start, 20)
end
def show_args(args)
@@ -106,7 +84,7 @@ def show(page, layout = true)
begin
erb page.to_sym, {:layout => layout}, :resque => Resque
rescue Errno::ECONNREFUSED
- erb :error, {:layout => false}, :error => "Can't connect to Redis! (#{Resque.redis_id})"
+ erb :error, {:layout => false}, :error => "Can't connect to Redis! (#{Resque.connection_options.inspect})"
end
end
6 lib/resque/server/views/layout.erb
@@ -16,9 +16,9 @@
<%= tab tab_name %>
<% end %>
</ul>
- <% if Resque.redis.namespace != :resque %>
+ <% if resque.connection.namespace != :resque %>
<abbr class="namespace" title="Resque's Redis Namespace">
- <%= Resque.redis.namespace %>
+ <%= resque.connection.namespace %>
</abbr>
<% end %>
</div>
@@ -37,7 +37,7 @@
<div id="footer">
<p>Powered by <a href="http://github.com/defunkt/resque">Resque</a> v<%=Resque::Version%></p>
- <p>Connected to Redis namespace <%= Resque.redis.namespace %> on <%=Resque.redis_id%></p>
+ <p>Connected to Redis namespace <%= resque.connection.namespace %> on <%= resque.connection_id %></p>
</div>
</body>
8 lib/resque/server/views/stats.erb
@@ -2,7 +2,7 @@
<% if params[:key] %>
-<%= partial resque.redis.type(params[:key]).eql?("string") ? :key_string : :key_sets %>
+<%= partial resque.connection.type(params[:key]).eql?("string") ? :key_string : :key_sets %>
<% elsif params[:id] == "resque" %>
@@ -22,9 +22,9 @@
<% elsif params[:id] == 'redis' %>
- <h1><%= resque.redis_id %></h1>
+ <h1><%= resque.connection_id %></h1>
<table class='stats'>
- <% for key, value in resque.redis.info.to_a.sort_by { |i| i[0].to_s } %>
+ <% for key, value in resque.connection.info.to_a.sort_by { |i| i[0].to_s } %>
<tr>
<th>
<%= key %>
@@ -51,7 +51,7 @@
<th>
<a href="<%=u "/stats/keys/#{key}" %>"><%= key %></a>
</th>
- <td><%= resque.redis.type key %></td>
+ <td><%= resque.connection.type key %></td>
<td><%= redis_get_size key %></td>
</tr>
<% end %>
73 lib/resque/stat.rb
@@ -5,49 +5,50 @@ module Resque
# Incr a stat: Stat.incr(name)
# Decr a stat: Stat.decr(name)
# Kill a stat: Stat.clear(name)
- module Stat
- extend self
+ class Stat < BaseModel
extend Helpers
+ class << self
+ # Returns the int value of a stat, given a string stat name.
+ def get(stat)
+ super.to_i
+ end
- # Returns the int value of a stat, given a string stat name.
- def get(stat)
- redis.get("stat:#{stat}").to_i
- end
+ # Alias of `get`
+ def [](stat)
+ get stat
+ end
- # Alias of `get`
- def [](stat)
- get(stat)
- end
+ # For a string stat name, increments the stat by one.
+ #
+ # Can optionally accept a second int parameter. The stat is then
+ # incremented by that amount.
+ def incr(stat, by = 1)
+ increment(stat, by)
+ end
- # For a string stat name, increments the stat by one.
- #
- # Can optionally accept a second int parameter. The stat is then
- # incremented by that amount.
- def incr(stat, by = 1)
- redis.incrby("stat:#{stat}", by)
- end
+ # Increments a stat by one.
+ def <<(stat)
+ incr stat
+ end
- # Increments a stat by one.
- def <<(stat)
- incr stat
- end
+ # For a string stat name, decrements the stat by one.
+ #
+ # Can optionally accept a second int parameter. The stat is then
+ # decremented by that amount.
+ def decr(stat, by = 1)
+ decrement(stat, by)
+ end
- # For a string stat name, decrements the stat by one.
- #
- # Can optionally accept a second int parameter. The stat is then
- # decremented by that amount.
- def decr(stat, by = 1)
- redis.decrby("stat:#{stat}", by)
- end
-
- # Decrements a stat by one.
- def >>(stat)
- decr stat
- end
+ # Decrements a stat by one.
+ def >>(stat)
+ decr stat
+ end
- # Removes a stat from Redis, effectively setting it to 0.
- def clear(stat)
- redis.del("stat:#{stat}")
+ # Removes a stat from Redis, effectively setting it to 0.
+ def clear(stat)
+ delete(stat)
+ end
end
end
end
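
The public Stat API is unchanged; only the storage calls moved behind BaseModel. For example:

    Resque::Stat.incr('processed')   # increments the backend key "stat:processed"
    Resque::Stat['processed']        # => 1 (after the incr above)
    Resque::Stat.clear('processed')  # deletes the key, i.e. resets it to 0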
40 lib/resque/worker.rb
@@ -6,7 +6,11 @@ module Resque
#
# It also ensures workers are always listening to signals from you,
# their master, and can react accordingly.
- class Worker
+ class Worker < BaseModel
+ class << self
+ def exists?(id) ; exists_in_set?(id) ; end
+ def all ; all_from_set ; end
+ end
include Resque::Helpers
extend Resque::Helpers
@@ -22,25 +26,22 @@ class Worker
attr_writer :to_s
- # Returns an array of all worker objects.
- def self.all
- redis.smembers(:workers).map { |id| find(id) }
- end
# Returns an array of all worker objects currently processing
# jobs.
def self.working
- names = all
+ names = all_from_set
return [] unless names.any?
names.map! { |name| "worker:#{name}" }
- redis.mapped_mget(*names).keys.map do |key|
+ connection.mapped_mget(*names).keys.map do |key|
find key.sub("worker:", '')
end.compact
end
# Returns a single worker object. Accepts a string id.
def self.find(worker_id)
- if exists? worker_id
+ if exists_in_set? worker_id
queues = worker_id.split(':')[-1].split(',')
worker = new(*queues)
worker.to_s = worker_id
@@ -57,9 +58,6 @@ def self.attach(worker_id)
# Given a string worker id, return a boolean indicating whether the
# worker exists
- def self.exists?(worker_id)
- redis.sismember(:workers, worker_id)
- end
# Workers should be initialized with an array of string queue
# names. The order is important: a Worker will check the first
@@ -317,7 +315,7 @@ def prune_dead_workers
# Registers ourself as a worker. Useful when entering the worker
# lifecycle on startup.
def register_worker
- redis.sadd(:workers, self)
+ self.class.push_to_set(self)
started!
end
@@ -343,9 +341,9 @@ def unregister_worker
job.fail(DirtyExit.new)
end
- redis.srem(:workers, self)
- redis.del("worker:#{self}")
- redis.del("worker:#{self}:started")
+ self.class.delete_from_set(self)
+ self.class.delete(self)
+ self.class.delete("#{self}:start")
Stat.clear("processed:#{self}")
Stat.clear("failed:#{self}")
@@ -359,14 +357,14 @@ def working_on(job)
:queue => job.queue,
:run_at => Time.now.to_s,
:payload => job.payload
- redis.set("worker:#{self}", data)
+ self.class.set(self, data)
end
# Called when we are done working - clears our `working_on` state
# and tells Redis we processed a job.
def done_working
processed!
- redis.del("worker:#{self}")
+ self.class.delete(self)
end
# How many jobs has this worker processed? Returns an int.
@@ -393,17 +391,17 @@ def failed!
# What time did this worker start? Returns an instance of `Time`
def started
- redis.get "worker:#{self}:started"
+ self.class.get "#{self}:started"
end
# Tell Redis we've started
def started!
- redis.set("worker:#{self}:started", Time.now.to_s)
+ self.class.set "#{self}:started", Time.now.to_s
end
# Returns a hash explaining the Job we're currently processing, if any.
def job
- decode(redis.get("worker:#{self}")) || {}
+ decode(self.class.get(self)) || {}
end
alias_method :processing, :job
@@ -420,7 +418,7 @@ def idle?
# Returns a symbol representing the current worker state,
# which can be either :working or :idle
def state
- redis.exists("worker:#{self}") ? :working : :idle
+      # a "worker:<id>" payload exists only while working_on a job
+      self.class.get(self) ? :working : :idle
end
# Is this worker the same as another worker?
127 test/backend_test.rb
@@ -0,0 +1,127 @@
+require File.dirname(__FILE__) + '/test_helper'
+
+%w(mysql redis).each do |backend_type|
+ context "#{backend_type} adapter" do
+ # in rspec, we could use shared_behaviors for this
+ setup do
+ @connection = ResqueTestBackendManager.setup(backend_type)
+
+ @connection.flushall
+ end
+
+ teardown do
+ ResqueTestBackendManager.teardown(backend_type)
+ end
+
+  # These tests necessarily present a chicken-and-egg situation: the setup requires functionality that is itself under test.
+ test "#{backend_type} flushes all keys" do
+ @connection.set("some_key", "some_value")
+
+ @connection.flushall
+
+ assert_nil @connection.get("some_key")
+ end
+
+ test "#{backend_type} sets and gets k/v pairs" do
+ @connection.set("some_key", "some_value")
+
+ assert_equal(@connection.get("some_key"), "some_value")
+ end
+
+ test "#{backend_type} deletes specific keys" do
+ @connection.set("k1", "v1")
+ @connection.delete("k1")
+ assert_nil @connection.get("k1")
+ end
+
+ test "#{backend_type} exposes all keys" do
+ @connection.set("some_key", "some_value")
+ @connection.push_to_set("some_set_key", "some_set_value")
+ assert_equal @connection.keys.sort, ["some_key", "some_set_key"]
+ end
+
+ test "#{backend_type} retrieves all k=>v data is a hash" do
+ data = {"k1" => "v1",
+ "k2" => "v2",
+ "k3" => "v3"}
+
+ data.each do |k,v|
+ @connection.set(k, v)
+ end
+
+ assert_equal @connection.mapped_mget(*data.keys), data
+ end
+
+ test "#{backend_type} supports set operations" do
+ set_key = 'myset'
+ values = %w(v1 v2 v3)
+ values.each do |v| @connection.push_to_set(set_key, v) end
+
+ assert_equal @connection.all_from_set(set_key), values
+ assert @connection.exists_in_set?(set_key, 'v1')
+ assert !@connection.exists_in_set?(set_key, 'not_in_set')
+
+ @connection.delete_from_set(set_key, 'v2')
+ assert_equal @connection.all_from_set(set_key), %w(v1 v3)
+ end
+
+ test "#{backend_type} supports counter operations" do
+ @connection.increment('mycounter', 1)
+ assert_equal @connection.get('mycounter').to_i, 1
+
+ @connection.increment('mycounter', 1)
+ assert_equal @connection.get('mycounter').to_i, 2
+
+ @connection.decrement('mycounter', 1)
+ assert_equal @connection.get('mycounter').to_i, 1
+ end
+
+ test "#{backend_type} supports list operations" do
+ items = %w(a b c d)
+ key = "mylist"
+
+ items.each { |i| @connection.push_to_list(key, i) }
+
+ assert_equal @connection.size_of_list(key), items.length
+
+ assert_equal @connection.pop_from_list(key), items.first
+
+ assert_equal @connection.size_of_list(key), items.length - 1
+ end
+
+ # test "#delete_from_list"
+ test "#{backend_type} deletes item from a list" do
+ list = %w(a b c d)
+
+ key = 'listkey'
+
+ list.each {|v| @connection.push_to_list(key, v)}
+
+ assert_equal @connection.delete_from_list(key, 0, @connection.encode('c')), 1 # items deleted
+
+ assert_equal @connection.range_from_list(key, 0, 4).map{|i|@connection.decode(i)}, (list - ['c'])
+ end
+
+ test "#{backend_type} displays size of an item" do
+ @connection.set('string', "blah")
+ %w(a b c d e f g h).each {|v| @connection.push_to_list('list', v) }
+ %w(i j k l m n o p q r s t).each {|v| @connection.push_to_set('set', v) }
+
+ assert_equal @connection.get_size('string'), 4
+ assert_equal @connection.get_size('list'), 8
+ assert_equal @connection.get_size('set'), 12
+ end
+
+ test "#{backend_type} gets an array of values (across k/v or lists)" do
+ @connection.set('k1', 'v1')
+ assert_equal @connection.get_value_as_array('k1', 0, 1), ['v1']
+
+ @connection.push_to_set('k2', 'v2')
+ assert_equal @connection.get_value_as_array('k2', 0, 1), ['v2']
+ end
+
+ # This piece might be a little too coupled to Redis.
+ # test "#{backend_type} gives us the type of an item" do
+ end
+end
+
54 test/configs/cassandra.in.sh
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cassandra_home=`dirname $0`/..
+
+# The directory where Cassandra's configs live (required)
+CASSANDRA_CONF=$cassandra_home/conf
+
+# This can be the path to a jar file, or a directory containing the
+# compiled classes. NOTE: This isn't needed by the startup script,
+# it's just used here in constructing the classpath.
+cassandra_bin=$cassandra_home/build/classes
+#cassandra_bin=$cassandra_home/build/cassandra.jar
+
+# JAVA_HOME can optionally be set here
+#JAVA_HOME=/usr/local/jdk6
+
+# The java classpath (required)
+CLASSPATH=$CASSANDRA_CONF:$cassandra_bin
+
+for jar in $cassandra_home/lib/*.jar; do
+ CLASSPATH=$CLASSPATH:$jar
+done
+
+# Arguments to pass to the JVM
+JVM_OPTS=" \
+ -ea \
+ -Xms128M \
+ -Xmx1G \
+ -XX:TargetSurvivorRatio=90 \
+ -XX:+AggressiveOpts \
+ -XX:+UseParNewGC \
+ -XX:+UseConcMarkSweepGC \
+ -XX:+CMSParallelRemarkEnabled \
+ -XX:+HeapDumpOnOutOfMemoryError \
+ -XX:SurvivorRatio=128 \
+ -XX:MaxTenuringThreshold=0 \
+ -Dcom.sun.management.jmxremote.port=8080 \
+ -Dcom.sun.management.jmxremote.ssl=false \
+ -Dcom.sun.management.jmxremote.authenticate=false"
4 test/redis-test.conf → test/configs/redis-test.conf
@@ -6,7 +6,7 @@ daemonize yes
# When run as a daemon, Redis write a pid file in /var/run/redis.pid by default.
# You can specify a custom pid file location here.
-pidfile ./test/redis-test.pid
+pidfile /tmp/redis-test.pid
# Accept connections on the specified port, default is 6379
port 9736
@@ -39,7 +39,7 @@ dbfilename dump.rdb
# For default save/load DB in/from the working directory
# Note that you must specify a directory not a file name.
-dir ./test/
+dir /tmp/
# Set server verbosity to 'debug'
# it can be one of:
318 test/configs/storage-conf.xml
@@ -0,0 +1,318 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+<Storage>
+ <!--======================================================================-->
+ <!-- Basic Configuration -->
+ <!--======================================================================-->
+
+ <!--
+ ~ The name of this cluster. This is mainly used to prevent machines in
+ ~ one logical cluster from joining another.
+ -->
+ <ClusterName>Test</ClusterName>
+
+ <!--
+ ~ Turn on to make new [non-seed] nodes automatically migrate the right data
+ ~ to themselves. (If no InitialToken is specified, they will pick one
+ ~ such that they will get half the range of the most-loaded node.)
+ ~ If a node starts up without bootstrapping, it will mark itself bootstrapped
+ ~ so that you can't subsequently accidently bootstrap a node with
+ ~ data on it. (You can reset this by wiping your data and commitlog
+ ~ directories.)
+ ~
+ ~ Off by default so that new clusters and upgraders from 0.4 don't
+ ~ bootstrap immediately. You should turn this on when you start adding
+ ~ new nodes to a cluster that already has data on it. (If you are upgrading
+ ~ from 0.4, start your cluster with it off once before changing it to true.
+ ~ Otherwise, no data will be lost but you will incur a lot of unnecessary
+ ~ I/O before your cluster starts up.)
+ -->
+ <AutoBootstrap>false</AutoBootstrap>
+
+ <!--
+ ~ Keyspaces and ColumnFamilies:
+ ~ A ColumnFamily is the Cassandra concept closest to a relational
+ ~ table. Keyspaces are separate groups of ColumnFamilies. Except in
+ ~ very unusual circumstances you will have one Keyspace per application.
+
+ ~ There is an implicit keyspace named 'system' for Cassandra internals.
+ -->
+ <Keyspaces>
+ <Keyspace Name="Resque">
+ <KeysCachedFraction>0.01</KeysCachedFraction>
+ <ColumnFamily CompareWith="UTF8Type" Name="Processed" />
+ <ColumnFamily CompareWith="UTF8Type" Name="Stats" /> <!-- processed + failed -->
+ <ColumnFamily CompareWith="UTF8Type" Name="Workers" />
+ <ColumnFamily CompareWith="UTF8Type" Name="Queues" />
+
+ <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+ <ReplicationFactor>1</ReplicationFactor>
+ <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+ </Keyspace>
+
+ <!-- <Keyspace Name="Twitter">
+ <KeysCachedFraction>0.01</KeysCachedFraction>
+ <ColumnFamily CompareWith="UTF8Type" Name="Users" />
+ <ColumnFamily CompareWith="UTF8Type" Name="UserAudits" />
+ <ColumnFamily CompareWith="UTF8Type" CompareSubcolumnsWith="TimeUUIDType" ColumnType="Super" Name="UserRelationships" />
+ <ColumnFamily CompareWith="UTF8Type" Name="Usernames" />
+ <ColumnFamily CompareWith="UTF8Type" Name="Statuses" />
+ <ColumnFamily CompareWith="UTF8Type" Name="StatusAudits" />
+ <ColumnFamily CompareWith="UTF8Type" CompareSubcolumnsWith="TimeUUIDType" ColumnType="Super" Name="StatusRelationships" />
+ <ColumnFamily CompareWith="UTF8Type" ColumnType="Super" Name="Index" />
+ <ColumnFamily CompareWith="BytesType" ColumnType="Standard" Name="TimelinishThings" />
+
+ <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+ <ReplicationFactor>1</ReplicationFactor>
+ <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+ </Keyspace> -->
+
+ </Keyspaces>
+
+ <!--
+ ~ Authenticator: any IAuthenticator may be used, including your own as long
+ ~ as it is on the classpath. Out of the box, Cassandra provides
+ ~ org.apache.cassandra.auth.AllowAllAuthenticator and,
+ ~ org.apache.cassandra.auth.SimpleAuthenticator
+ ~ (SimpleAuthenticator uses access.properties and passwd.properties by
+ ~ default).
+ ~
+ ~ If you don't specify an authenticator, AllowAllAuthenticator is used.
+ -->
+ <Authenticator>org.apache.cassandra.auth.AllowAllAuthenticator</Authenticator>
+
+ <!--
+ ~ Partitioner: any IPartitioner may be used, including your own as long
+ ~ as it is on the classpath. Out of the box, Cassandra provides
+ ~ org.apache.cassandra.dht.RandomPartitioner,
+ ~ org.apache.cassandra.dht.OrderPreservingPartitioner, and
+ ~ org.apache.cassandra.dht.CollatingOrderPreservingPartitioner.
+ ~ (CollatingOPP colates according to EN,US rules, not naive byte
+ ~ ordering. Use this as an example if you need locale-aware collation.)
+ ~ Range queries require using an order-preserving partitioner.
+ ~
+ ~ Achtung! Changing this parameter requires wiping your data
+ ~ directories, since the partitioner can modify the sstable on-disk
+ ~ format.
+ -->
+ <Partitioner>org.apache.cassandra.dht.RandomPartitioner</Partitioner>
+
+ <!--
+ ~ If you are using an order-preserving partitioner and you know your key
+ ~ distribution, you can specify the token for this node to use. (Keys
+ ~ are sent to the node with the "closest" token, so distributing your
+ ~ tokens equally along the key distribution space will spread keys
+ ~ evenly across your cluster.) This setting is only checked the first
+ ~ time a node is started.
+
+ ~ This can also be useful with RandomPartitioner to force equal spacing
+ ~ of tokens around the hash space, especially for clusters with a small
+ ~ number of nodes.
+ -->
+ <InitialToken></InitialToken>
+
+ <!--
+ ~ Directories: Specify where Cassandra should store different data on
+ ~ disk. Keep the data disks and the CommitLog disks separate for best
+ ~ performance
+ -->
+ <CommitLogDirectory>data/cassandra/commitlog</CommitLogDirectory>
+ <DataFileDirectories>
+ <DataFileDirectory>data/cassandra/data</DataFileDirectory>
+ </DataFileDirectories>
+ <CalloutLocation>data/cassandra/callouts</CalloutLocation>
+ <StagingFileDirectory>data/cassandra/staging</StagingFileDirectory>
+
+
+ <!--
+ ~ Addresses of hosts that are deemed contact points. Cassandra nodes
+ ~ use this list of hosts to find each other and learn the topology of
+ ~ the ring. You must change this if you are running multiple nodes!
+ -->
+ <Seeds>
+ <Seed>127.0.0.1</Seed>
+ </Seeds>
+
+
+ <!-- Miscellaneous -->
+
+ <!-- Time to wait for a reply from other nodes before failing the command -->
+ <RpcTimeoutInMillis>5000</RpcTimeoutInMillis>
+ <!-- Size to allow commitlog to grow to before creating a new segment -->
+ <CommitLogRotationThresholdInMB>128</CommitLogRotationThresholdInMB>
+
+
+ <!-- Local hosts and ports -->
+
+ <!--
+ ~ Address to bind to and tell other nodes to connect to. You _must_
+ ~ change this if you want multiple nodes to be able to communicate!
+ ~
+ ~ Leaving it blank leaves it up to InetAddress.getLocalHost(). This
+ ~ will always do the Right Thing *if* the node is properly configured
+ ~ (hostname, name resolution, etc), and the Right Thing is to use the
+ ~ address associated with the hostname (it might not be).
+ -->
+ <ListenAddress>localhost</ListenAddress>
+ <!-- internal communications port -->
+ <StoragePort>7000</StoragePort>
+
+ <!--
+ ~ The address to bind the Thrift RPC service to. Unlike ListenAddress
+ ~ above, you *can* specify 0.0.0.0 here if you want Thrift to listen on
+ ~ all interfaces.
+ ~
+ ~ Leaving this blank has the same effect it does for ListenAddress,
+ ~ (i.e. it will be based on the configured hostname of the node).
+ -->
+ <ThriftAddress>localhost</ThriftAddress>
+ <!-- Thrift RPC port (the port clients connect to). -->
+ <ThriftPort>9160</ThriftPort>
+ <!--
+ ~ Whether or not to use a framed transport for Thrift. If this option
+ ~ is set to true then you must also use a framed transport on the
+ ~ client-side, (framed and non-framed transports are not compatible).
+ -->
+ <ThriftFramedTransport>false</ThriftFramedTransport>
+
+
+ <!--======================================================================-->
+ <!-- Memory, Disk, and Performance -->
+ <!--======================================================================-->
+
+ <!--
+ ~ Access mode. mmapped i/o is substantially faster, but only practical on
+ ~ a 64bit machine (which notably does not include EC2 "small" instances)
+ ~ or relatively small datasets. "auto", the safe choice, will enable
+ ~ mmapping on a 64bit JVM. Other values are "mmap", "mmap_index_only"
+ ~ (which may allow you to get part of the benefits of mmap on a 32bit
+ ~ machine by mmapping only index files) and "standard".
+ ~ (The buffer size settings that follow only apply to standard,
+ ~ non-mmapped i/o.)
+ -->
+ <DiskAccessMode>auto</DiskAccessMode>
+
+ <!--
+ ~ Buffer size to use when performing contiguous column slices. Increase
+ ~ this to the size of the column slices you typically perform.
+ ~ (Name-based queries are performed with a buffer size of
+ ~ ColumnIndexSizeInKB.)
+ -->
+ <SlicedBufferSizeInKB>64</SlicedBufferSizeInKB>
+
+ <!--
+ ~ Buffer size to use when flushing memtables to disk. (Only one
+ ~ memtable is ever flushed at a time.) Increase (decrease) the index
+ ~ buffer size relative to the data buffer if you have few (many)
+ ~ columns per key. Bigger is only better _if_ your memtables get large
+ ~ enough to use the space. (Check in your data directory after your
+ ~ app has been running long enough.) -->
+ <FlushDataBufferSizeInMB>32</FlushDataBufferSizeInMB>
+ <FlushIndexBufferSizeInMB>8</FlushIndexBufferSizeInMB>
+
+ <!--
+ ~ Add column indexes to a row after its contents reach this size.
+ ~ Increase if your column values are large, or if you have a very large
+ ~ number of columns. The competing causes are, Cassandra has to
+ ~ deserialize this much of the row to read a single column, so you want
+ ~ it to be small - at least if you do many partial-row reads - but all
+ ~ the index data is read for each access, so you don't want to generate
+ ~ that wastefully either.
+ -->
+ <ColumnIndexSizeInKB>64</ColumnIndexSizeInKB>
+
+ <!--
+ ~ Flush memtable after this much data has been inserted, including
+ ~ overwritten data. There is one memtable per column family, and
+ ~ this threshold is based solely on the amount of data stored, not
+ ~ actual heap memory usage (there is some overhead in indexing the
+ ~ columns).
+ -->
+ <MemtableThroughputInMB>64</MemtableThroughputInMB>
+ <!--
+ ~ Throughput setting for Binary Memtables. Typically these are
+ ~ used for bulk load so you want them to be larger.
+ -->
+ <BinaryMemtableThroughputInMB>256</BinaryMemtableThroughputInMB>
+ <!--
+ ~ The maximum number of columns in millions to store in memory per
+ ~ ColumnFamily before flushing to disk. This is also a per-memtable
+ ~ setting. Use with MemtableThroughputInMB to tune memory usage.
+ -->
+ <MemtableOperationsInMillions>0.3</MemtableOperationsInMillions>
+ <!--
+ ~ The maximum time to leave a dirty memtable unflushed.
+ ~ (While any affected columnfamilies have unflushed data from a
+ ~ commit log segment, that segment cannot be deleted.)
+ ~ This needs to be large enough that it won't cause a flush storm
+ ~ of all your memtables flushing at once because none has hit
+ ~ the size or count thresholds yet. For production, a larger
+ ~ value such as 1440 is recommended.
+ -->
+ <MemtableFlushAfterMinutes>60</MemtableFlushAfterMinutes>
+
+ <!--
+ ~ Unlike most systems, in Cassandra writes are faster than reads, so
+ ~ you can afford more of those in parallel. A good rule of thumb is 2
+ ~ concurrent reads per processor core. Increase ConcurrentWrites to
+ ~ the number of clients writing at once if you enable CommitLogSync +
+ ~ CommitLogSyncDelay. -->
+ <ConcurrentReads>8</ConcurrentReads>
+ <ConcurrentWrites>32</ConcurrentWrites>
+
+ <!--
+ ~ CommitLogSync may be either "periodic" or "batch." When in batch
+ ~ mode, Cassandra won't ack writes until the commit log has been
+ ~ fsynced to disk. It will wait up to CommitLogSyncBatchWindowInMS
+ ~ milliseconds for other writes, before performing the sync.
+
+ ~ This is less necessary in Cassandra than in traditional databases
+ ~ since replication reduces the odds of losing data from a failure
+ ~ after writing the log entry but before it actually reaches the disk.
+ ~ So the other option is "timed," where writes may be acked immediately
+ ~ and the CommitLog is simply synced every CommitLogSyncPeriodInMS
+ ~ milliseconds.
+ -->
+ <CommitLogSync>periodic</CommitLogSync>
+ <!--
+ ~ Interval at which to perform syncs of the CommitLog in periodic mode.
+ ~ Usually the default of 10000ms is fine; increase it if your i/o
+ ~ load is such that syncs are taking excessively long times.
+ -->
+ <CommitLogSyncPeriodInMS>10000</CommitLogSyncPeriodInMS>
+ <!--
+ ~ Delay (in milliseconds) during which additional commit log entries
+ ~ may be written before fsync in batch mode. This will increase
+ ~ latency slightly, but can vastly improve throughput where there are
+ ~ many writers. Set to zero to disable (each entry will be synced
+ ~ individually). Reasonable values range from a minimal 0.1 to 10 or
+ ~ even more if throughput matters more than latency.
+ -->
+ <!-- <CommitLogSyncBatchWindowInMS>1</CommitLogSyncBatchWindowInMS> -->
+
+ <!--
+ ~ Time to wait before garbage-collection deletion markers. Set this to
+ ~ a large enough value that you are confident that the deletion marker
+ ~ will be propagated to all replicas by the time this many seconds has
+ ~ elapsed, even in the face of hardware failures. The default value is
+ ~ ten days.
+ -->
+ <GCGraceSeconds>864000</GCGraceSeconds>
+
+</Storage>
68 test/resque-web_test.rb
@@ -3,7 +3,15 @@
# Root path test
context "on GET to /" do
- setup { get "/" }
+  # It's unfortunate to introduce this much verbosity to this file, but shared setup isn't an option with this testing strategy.
+ setup do
+ ResqueTestBackendManager.setup
+ get "/"
+ end
+
+ teardown do
+ ResqueTestBackendManager.teardown
+ end
test "redirect to overview" do
follow_redirect!
@@ -12,7 +20,14 @@
# Global overview
context "on GET to /overview" do
- setup { get "/overview" }
+ setup do
+ ResqueTestBackendManager.setup
+ get "/overview"
+ end
+
+ teardown do
+ ResqueTestBackendManager.teardown
+ end
test "should at least display 'queues'" do
assert last_response.body.include?('Queues')
@@ -21,34 +36,69 @@
# Working jobs
context "on GET to /working" do
- setup { get "/working" }
-
+ setup do
+ ResqueTestBackendManager.setup
+ get "/working"
+ end
+
+ teardown do
+ ResqueTestBackendManager.teardown
+ end
+
should_respond_with_success
end
# Failed
context "on GET to /failed" do
- setup { get "/failed" }
+ setup do
+ ResqueTestBackendManager.setup
+ get "/failed"
+ end
+
+ teardown do
+ ResqueTestBackendManager.teardown
+ end
should_respond_with_success
end
# Stats
context "on GET to /stats/resque" do
- setup { get "/stats/resque" }
+ setup do
+ ResqueTestBackendManager.setup
+ get "/stats/resque"
+ end
+
+ teardown do
+ ResqueTestBackendManager.teardown
+ end
should_respond_with_success
end
context "on GET to /stats/redis" do
- setup { get "/stats/redis" }
+ setup do
+ ResqueTestBackendManager.setup
+ get "/stats/redis"
+ end
+
+ teardown do
+ ResqueTestBackendManager.teardown
+ end
should_respond_with_success
end
context "on GET to /stats/resque" do
- setup { get "/stats/keys" }
-
+ setup do
+ ResqueTestBackendManager.setup
+ get "/stats/keys"
+ end
+
+ teardown do
+ ResqueTestBackendManager.teardown
+ end
+
should_respond_with_success
end
20 test/resque_test.rb
@@ -2,12 +2,18 @@
context "Resque" do
setup do
- Resque.redis.flushall
+ ResqueTestBackendManager.setup
+
+ Resque.connection.flushall
Resque.push(:people, { 'name' => 'chris' })
Resque.push(:people, { 'name' => 'bob' })
Resque.push(:people, { 'name' => 'mark' })
end
+
+ teardown do
+ ResqueTestBackendManager.teardown
+ end
test "can put jobs on a queue" do
assert Resque::Job.create(:jobs, 'SomeJob', 20, '/tmp')
@@ -166,7 +172,7 @@
end
test "queues are always a list" do
- Resque.redis.flushall
+ Resque.connection.flushall
assert_equal [], Resque.queues
end
@@ -204,6 +210,7 @@
2.times { @worker.process }
job = @worker.reserve
+
@worker.working_on job
stats = Resque.info
@@ -213,10 +220,17 @@
@worker.done_working
stats = Resque.info
+
assert_equal 3, stats[:queues]
assert_equal 3, stats[:processed]
assert_equal 1, stats[:failed]
- assert_equal [Resque.redis.respond_to?(:server) ? 'localhost:9736' : 'redis://localhost:9736/0'], stats[:servers]
+
+ case Resque.connection_type.to_s
+ when 'redis'
+ assert_equal [Resque.connection.respond_to?(:server) ? 'localhost:9736' : 'redis://localhost:9736/0'], stats[:servers]
+ when 'mysql'
+ assert_equal([{:host => "localhost", :user => "root"}], stats[:servers])
+ end
end
test "decode bad json" do
100 test/test_helper.rb
@@ -1,47 +1,87 @@
-dir = File.dirname(File.expand_path(__FILE__))
-$LOAD_PATH.unshift dir + '/../lib'
+@dir = File.dirname(File.expand_path(__FILE__))
+$LOAD_PATH.unshift @dir + '/../lib'
$TESTING = true
require 'test/unit'
require 'rubygems'
require 'resque'
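
The diff is truncated here, so the ResqueTestBackendManager the tests depend on is not shown. Judging from its call sites, it boots the requested backend and assigns Resque.connection_options; a hypothetical reconstruction (names, ports, and details are all assumptions):

    # Hypothetical sketch -- the real helper lives in the truncated portion
    # of test/test_helper.rb.
    class ResqueTestBackendManager
      def self.setup(backend_type = 'redis')
        case backend_type.to_s
        when 'redis'
          Resque.connection_options = {:type => 'redis', :server => 'localhost:9736'}
        when 'mysql'
          Resque.connection_options = Resque::MYSQL_CONNECTION_OPTIONS
        end
        Resque.connection
      end

      def self.teardown(backend_type = 'redis')
        Resque.connection.flushall
      end
    end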