Permalink
Switch branches/tags
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
491 lines (371 sloc) 11.3 KB
#--
# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# Made in Japan.
#++
require 'redis'
require 'zlib'
require 'rufus-json'
require 'ruote/storage/base'
require 'ruote/redis/version'
module Ruote
module Redis
#
# A Redis storage for ruote.
#
# The constructor accepts two arguments, the first one is a Redis instance
# ( see http://github.com/ezmobius/redis-rb ), the second one is the classic
# ruote engine options( see
# http://ruote.rubyforge.org/configuration.html#engine )
#
# require 'redis' # gem install redis
# require 'ruote' # gem install ruote
# require 'ruote-redis' # gem install ruote-redis
#
# engine = Ruote::Engine.new(
# Ruote::Worker.new(
# Ruote::Redis::RedisStorage.new('db'=> 14, 'thread_safe' => true)))
#
#
# == em-redis
#
# Untested, but substituting an em-redis instance for the redis instance
# passed to the constructor might work.
# http://github.com/madsimian/em-redis
#
# If you try and it works, feedback is welcome
# http://groups.google.com/group/openwferu-users
#
#
# == 'pop_count' option
#
# By default, when the worker queries this storage for msgs to process,
# the storage will try to pop 28 msgs. This number can be changed thanks
# to the 'pop_count' option, like in:
#
# engine = Ruote::Engine.new(
# Ruote::Worker.new(
# Ruote::Redis::RedisStorage.new(
# 'db'=> 14, 'thread_safe' => true, 'pop_count' => 56)))
#
# Don't set this number too high: it increases the chance of msgs getting
# lost if the worker goes down.
#
# (if there is a need to avoid such a scenario in the future,
# Redis' rpoplpush might come in handy).
#
class Storage
include Ruote::StorageBase
attr_reader :redis
# Listing the redis options to differentiate them from ruote storage
# options.
#
# When #initialize receives a single Hash, the entries whose key (as a
# String) appears in this list are handed to ::Redis.new (with symbolized
# keys); the remaining entries are kept as storage options.
#
REDIS_OPTIONS = %w[ host port path db thread_safe logger ]
# A Redis storage for ruote.
#
# Can be initialized in two ways
#
# Ruote::Redis::Storage.new(
# ::Redis.new(
# :host => '127.0.0.1',
# :db => 13,
# :thread_safe => true))
#
# or
#
# Ruote::Redis::Storage.new(
# 'host' => '127.0.0.1',
# 'db' => 13,
# 'thread_safe' => true)
#
# The first style is probably better avoided.
#
# == compression
#
# This storage, by default, compresses (using zlib) the JSON documents
# before placing them in Redis.
#
# The 'zip' option controls the compression level:
#
# * 'none': no compression, JSON strings are directly placed in Redis
# * 'size': max compression, smallest output
# * 'speed': best speed, compresses but favours speed over small size
# * 'default': default zlib setting, balance between speed and size
#
# The idea behind this feature is to not consume too much memory on the
# Redis host.
#
# It's OK to turn this feature on and off, this storage will read data
# compressed or not (yes this storage can read older data sets produced
# when ruote-redis had no compression feature).
#
def initialize(redis, options={})
  # Single-Hash form: split the entries between Redis connection options
  # (keys listed in REDIS_OPTIONS, symbolized) and storage options (the
  # rest, keys untouched), then build the Redis client ourselves.
  if redis.is_a?(Hash) && options == {}
    connection_opts = {}
    storage_opts = {}
    redis.each do |name, value|
      if REDIS_OPTIONS.include?(name.to_s)
        connection_opts[name.to_sym] = value
      else
        storage_opts[name] = value
      end
    end
    options = storage_opts
    redis = ::Redis.new(connection_opts)
  end

  @redis = redis
  @options = options

  # number of msgs popped per #get_msgs call
  @pop_count = @options['pop_count'] || 28

  # Returns an array of the (String) keys that match the given pattern.
  #
  # Returns an empty array if anything goes wrong.
  #
  def @redis.keys_to_a(pattern)
    found = (keys(pattern) rescue nil)
    found.is_a?(Array) ? found : []
  end

  # 'zip' option => zlib level; nil means "do not compress", any value not
  # listed here falls back to the default zlib level.
  levels = {
    'none' => nil,
    'size' => Zlib::BEST_COMPRESSION,
    'speed' => Zlib::BEST_SPEED
  }
  @zip = levels.fetch(@options['zip'], Zlib::DEFAULT_COMPRESSION)

  replace_engine_configuration(options)
end
# Reserves a doc by deleting its key from Redis.
#
# Docs of type 'msgs' are reported as reserved right away (true), without
# any Redis call. For any other type, returns true only when the delete
# removed exactly one key.
#
def reserve(doc)
  if doc['type'] == 'msgs'
    true
  else
    removed = @redis.del(key_for(doc))
    removed == 1
  end
end
# Builds a msg document out of action and options (via prepare_msg_doc),
# stamps it with 'put_at' and pushes its encoded form onto the 'msgs' list.
#
# Always returns nil.
#
def put_msg(action, options)
  msg = prepare_msg_doc(action, options)
  msg['put_at'] = Ruote.now_to_utc_s
  payload = encode(msg)
  @redis.lpush('msgs', payload)
  nil
end
# Pops up to @pop_count msgs from the 'msgs' list (rpop) in one pipelined
# call and returns them decoded, as an array. Empty pops (nil replies) are
# dropped, so the array may be shorter than @pop_count, or empty.
#
# The rpop commands are sent to the object yielded by #pipelined when there
# is one (recent redis-rb versions expect commands to go through it), else
# to @redis itself (for versions that yield nothing to the block).
#
def get_msgs
  replies = @redis.pipelined do |pipeline|
    target = pipeline || @redis
    @pop_count.times { target.rpop('msgs') }
  end
  # Array() guards against a client returning nil instead of a reply list
  Array(replies).compact.map { |raw| decode(raw) }
end
# Builds a schedule document (via prepare_schedule_doc) and stores it.
#
# Returns nil when prepare_schedule_doc yields no document, else the '_id'
# of the stored schedule. The stored document gets '_rev' set to '0' and a
# fresh 'put_at' stamp.
#
def put_schedule(flavour, owner_fei, s, msg)
  schedule = prepare_schedule_doc(flavour, owner_fei, s, msg)

  if schedule
    schedule['_rev'] = '0'
    schedule['put_at'] = Ruote.now_to_utc_s
    @redis.set(key_for(schedule), encode(schedule))
    schedule['_id']
  end
end
# Deletes the schedule stored under the given id.
#
# Does nothing (and returns nil) when schedule_id is nil or false, else
# returns whatever @redis.del returns.
#
def delete_schedule(schedule_id)
  @redis.del(key_for('schedules', schedule_id)) if schedule_id
end
# Stores doc under its key, holding the per-key lock while comparing
# revisions.
#
# Returns
#
# * nil when the put is successful
# * true when doc carries a '_rev' but nothing is stored under its key
#   (document deleted)
# * the document currently in storage when its '_rev' differs from the
#   '_rev' of doc
#
# On success the stored document has its '_rev' incremented and a fresh
# 'put_at'. With opts[:update_rev] set, those two values are merged into
# the passed doc itself (merge!), else into a copy (merge).
#
def put(doc, opts={})
  key = key_for(doc)
  rev = doc['_rev']

  lock(key) do
    stored = do_get(key)
    stored_rev = stored ? stored['_rev'] : nil

    # revisions differ, put fails (return version in storage)
    next stored if stored_rev && rev != stored_rev

    # document deleted, put fails (return true)
    next true if rev && stored_rev.nil?

    # put is successful (return nil)
    stamp = { '_rev' => (rev.to_i + 1).to_s, 'put_at' => Ruote.now_to_utc_s }
    updated = opts[:update_rev] ? doc.merge!(stamp) : doc.merge(stamp)
    @redis.set(key, encode(updated))
    nil
  end
end
# Returns the document stored for the given type and key, as produced by
# do_get (nil when Redis holds nothing under that key).
#
def get(type, key)
  storage_key = key_for(type, key)
  do_get(storage_key)
end
# Deletes doc from storage, holding the per-key lock while comparing
# revisions.
#
# Raises an ArgumentError if doc has no '_rev'.
#
# Returns
#
# * nil when the delete is successful
# * true when the document is [already] gone
# * the document currently in storage when its '_rev' differs from the
#   '_rev' of doc
#
def delete(doc)
  rev = doc['_rev']
  raise ArgumentError, "can't delete doc without _rev" unless rev

  key = key_for(doc)

  lock(key) do
    stored = do_get(key)

    # document is [already] gone, delete fails (return true)
    next true if stored.nil?

    # version in storage doesn't match version to delete
    # (return version in storage)
    next stored if stored['_rev'] != rev

    # delete is successful (return nil)
    @redis.del(key)
    nil
  end
end
# Returns the documents of the given type, sorted by '_id'.
#
# key may be
#
# * nil: all the documents of the type are considered
# * a String or an array of Strings: only the keys matching
#   "type/*!<key>" are considered ("type/*!<key>-*" for 'schedules')
# * a Regexp or an array of Regexps: only the keys whose part after
#   "type/" matches one of them are considered
#
# Lock keys (see LOCK_KEY) are always left out.
#
# opts
#
# * :descending : reverses the order
# * :skip / :limit : window applied to the sorted key list; a :skip beyond
#   the end of the list yields an empty result (not an error)
# * :count : returns the number of documents found instead of the
#   documents themselves
#
def get_many(type, key=nil, opts={})
  keys = key ? Array(key) : nil

  ids =
    if keys.nil?
      @redis.keys_to_a("#{type}/*")
    elsif keys.first.is_a?(String)
      suffix = type == 'schedules' ? '-*' : ''
      keys.collect { |k| @redis.keys_to_a("#{type}/*!#{k}#{suffix}") }.flatten
    else # Regexp(s)
      @redis.keys_to_a("#{type}/*").select { |id|
        bare = id[type.length + 1..-1] # removing "^type/"
        keys.find { |k| k.match(bare) }
      }
    end

  ids = ids.reject { |i| i.match(LOCK_KEY) }.sort
  ids = ids.reverse if opts[:descending]

  skip = opts[:skip] || 0
  limit = opts[:limit] || ids.length

  # Array#[start, length] returns nil when start is past the end of the
  # array, hence the fallback to an empty list.
  ids = ids[skip, limit] || []

  docs = ids.empty? ? [] : @redis.mget(*ids)
  docs = docs.is_a?(Array) ? docs : []

  docs = docs.each_with_object({}) do |doc, h|
    doc = decode(doc)
    h[doc['_id']] = doc if doc
  end

  return docs.size if opts[:count]

  docs = docs.values.sort_by { |d| d['_id'] }

  opts[:descending] ? docs.reverse : docs
end
# Returns the sorted list of the ids found under "type/*", lock keys
# excluded. An id is whatever follows the last '/' of the Redis key.
#
def ids(type)
  listed = @redis.keys_to_a("#{type}/*")
  visible = listed.reject { |k| k.match(LOCK_KEY) }
  visible.map { |k| k.split('/').last }.sort
end
# Flushes the whole Redis db, ignoring errors.
#
# The flush is attempted 2 times to work around Redis::ProtocolError '3'.
#
def purge!
  2.times do
    begin
      @redis.flushdb
    rescue StandardError
      nil
    end
  end
end
# Closes this storage's connection by calling @redis.quit (same body as
# #shutdown). Returns whatever @redis.quit returns.
#
# (This close / shutdown dichotomy has to be resolved at some point...)
#
def close
@redis.quit
end
# Shuts this storage down by calling @redis.quit (same body as #close).
# Returns whatever @redis.quit returns.
#
# (This close / shutdown dichotomy has to be resolved at some point...)
#
def shutdown
@redis.quit
end
# Mainly used by ruote's test/unit/ut_17_storage.rb
#
# A no-op in this storage: the type argument is ignored and nil is
# returned.
#
def add_type(type)
# nothing to be done
end
# Nukes a db type: deletes, one by one, every Redis key matching "type/*"
# (losing all the documents that were in it). Errors raised by individual
# deletes are ignored.
#
# Returns the array of keys that were considered.
#
def purge_type!(type)
  @redis.keys_to_a("#{type}/*").each do |doomed|
    begin
      @redis.del(doomed)
    rescue StandardError
      nil
    end
  end
end
# Reconnects to Redis.
#
# Calls @redis.reconnect when @redis responds to it, else falls back to
# @redis.client.reconnect.
#
def reconnect
  target = @redis.respond_to?(:reconnect) ? @redis : @redis.client
  target.reconnect
end
protected
# Matches the "<key>-lock" keys that #lock creates; #get_many and #ids use
# it to leave those lock keys out of their results.
#
LOCK_KEY = /-lock$/
# A locking mecha.
#
# Mostly inspired from http://code.google.com/p/redis/wiki/SetnxCommand
#
# Takes the lock stored under "<key>-lock" (retrying every 7 ms until it is
# obtained), calls the block, releases the lock and returns the result of
# the block.
#
# The lock is released even when the block raises; the error then
# propagates to the caller. (Without that, a failing block would leave the
# key locked until the 1 minute timeout below.)
#
def lock(key, &block)
  kl = "#{key}-lock"

  loop do
    # locking successful
    break if @redis.setnx(kl, Time.now.to_f.to_s) != false

    # already locked; the lock value is the time at which it was taken,
    # after 1 minute, locks time out
    t = @redis.get(kl)
    @redis.del(kl) if t && Time.now.to_f - t.to_f > 60.0

    sleep 0.007 # let's try to lock again after a while
  end

  #@redis.expire(kl, 2)
    # this doesn't work, it makes the next call to setnx succeed

  begin
    block.call
  ensure
    @redis.del(kl)
  end
end
# key_for(doc)
# key_for(type, key)
#
# Returns the Redis key, "type/id": built from the 'type' and '_id' entries
# when the first argument is a Hash (a doc), else from the first two
# arguments.
#
def key_for(*args)
  first = args.first

  parts =
    if first.is_a?(Hash)
      first.values_at('type', '_id')
    else
      args.take(2)
    end

  parts.join('/')
end
# Fetches the raw value stored under key and returns it decoded (nil when
# Redis returns nothing for that key).
#
def do_get(key)
  raw = @redis.get(key)
  decode(raw)
end
# Turns a string read from Redis back into a document.
#
# Returns nil when s is nil. A string that doesn't start with '{' is taken
# as zlib-compressed and is inflated before being decoded as JSON.
#
def decode(s)
  if s
    json = s[0, 1] == '{' ? s : Zlib::Inflate.inflate(s)
    Rufus::Json.decode(json)
  end
end
# Turns a document into the string to store in Redis: its JSON
# representation, deflated at level @zip unless @zip is nil (no
# compression).
#
def encode(doc)
  json = Rufus::Json.encode(doc)
  @zip ? Zlib::Deflate.deflate(json, @zip) : json
end
end
#
# Keeping Ruote::Redis::RedisStorage for backward compatibility.
#
# An empty subclass of Storage: it adds and overrides nothing.
#
class RedisStorage < Storage
end
end
end