Skip to content
This repository has been archived by the owner on Sep 2, 2019. It is now read-only.

Commit

Permalink
Moving to rubyredis client lib for now until it's merged in with redi…
Browse files Browse the repository at this point in the history
…s-rb. Starting to refactor records_for
  • Loading branch information
whoahbot committed May 24, 2009
1 parent 2dba9a3 commit 7533c27
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 25 deletions.
13 changes: 5 additions & 8 deletions README.textile
Expand Up @@ -2,20 +2,17 @@ h1. DM-Redis

This is an experimental datamapper adapter for the <a href="http://code.google.com/p/redis/wiki/README">Redis</a> key-value database.

Please be aware that this is very alpha quality software! It is not recommended for production use.
Please be aware that this is very alpha quality software! It is not recommended for production use yet. If you find a bug, or are using dm-redis, please let me know.

h1. TODO

Currently, the records_for(query) method returns an array of hashes containing the key => id pairs. Unfortunately, this gets run for every query(and subsequent lazy queries), which means that the more keys you have, the slower it is. Boo! I'm in the process of refactoring how this works. If you have suggestions, please get in touch.
Refactoring +records_for+ to filter down the set of records returned.

h1. Install

Prerequisites
* Redis:
** <a href="http://code.google.com/p/redis/">Redis 0.096</a>
Prerequisites:
* Redis, git version:
** <a href="http://github.com/antirez/redis/">Redis, git version</a>
* Gems:
** <a href="http://github.com/ezmobius/redis-rb">redis-rb 0.03</a> By the excellent Ezra Zygmuntowicz
It's best to install the version of the redis gem that is available in the client-libraries dir of the redis distribution.
** <a href="http://github.com/datamapper/extlib">extlib</a>, dependency for dm-core
** <a href="http://github.com/datamapper/do/">data-objects</a>, dependency for dm-core
** <a href="http://github.com/datamapper/dm-core/tree/next">dm-core</a> next branch
9 changes: 5 additions & 4 deletions benchmark/benchit.rb
@@ -1,7 +1,8 @@
require 'rubygems'
require 'dm-core'
require File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib/redis_adapter.rb'))
require 'benchmark'
require File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib/redis_adapter.rb'))
require File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib/rubyredis.rb'))

DataMapper.setup(:default, {
:adapter => "redis",
Expand All @@ -20,8 +21,8 @@ class Post
end

Benchmark.benchmark do |x|
x.report { Post.all.each { |p| p.text } }
x.report { Post.all.each {|x| x.text } }
end

redis = Redis.new(:db => 15)
redis.flush_db
redis = RedisClient.new(:db => 15)
redis.flushdb
33 changes: 23 additions & 10 deletions lib/redis_adapter.rb
@@ -1,4 +1,4 @@
require 'redis'
require File.expand_path(File.join(File.dirname(__FILE__), 'rubyredis'))

module DataMapper
module Adapters
Expand Down Expand Up @@ -39,11 +39,15 @@ def create(resources)
def read(query)
records = records_for(query).each do |record|
query.fields.each do |property|
next if query.model.key.include?(property.name)
record[property.name.to_s] = property.typecast(@redis["#{query.model}:#{record[redis_key_for(query.model)]}:#{property.name}"])
end
end

query.filter_records(records)
records = query.match_records(records)
records = query.sort_records(records)
records = query.limit_records(records)
records
end

##
Expand Down Expand Up @@ -116,7 +120,7 @@ def update_attributes(resources)
end

##
# Retrieves all of the records for a particular model
# Retrieves records for a particular model.
#
# @param [DataMapper::Query] query
# The query used to locate the resources
Expand All @@ -126,13 +130,22 @@ def update_attributes(resources)
#
# @api private
def records_for(query)
set = @redis.set_members("#{query.model}:#{redis_key_for(query.model)}:all")
arr = Array.new(set.size)
set.each_with_index do |val, i|
arr[i] = {"#{redis_key_for(query.model)}" => val.to_i}
keys = []
query.conditions.operands.select {|o| o.is_a?(Conditions::EqualToComparison) && query.model.key.include?(o.property)}.each do |o|
if @redis.set_member?("#{query.model}:#{redis_key_for(query.model)}:all", o.value)
keys << {"#{redis_key_for(query.model)}" => o.value}
end
end

arr

# TODO: Implement other conditions to filter down the records retrieved
# Keys are empty, fall back and load all the values for this model
if keys.empty?
@redis.set_members("#{query.model}:#{redis_key_for(query.model)}:all").each do |val|
keys << {"#{redis_key_for(query.model)}" => val.to_i}
end
end

keys
end

##
Expand All @@ -148,7 +161,7 @@ def records_for(query)
# @api semipublic
def initialize(name, uri_or_options)
super
@redis = Redis.new(@options)
@redis = RedisClient.new(@options)
end
end # class RedisAdapter

Expand Down
237 changes: 237 additions & 0 deletions lib/rubyredis.rb
@@ -0,0 +1,237 @@
# RubyRedis is an alternative implementatin of Ruby client library written
# by Salvatore Sanfilippo.
#
# The aim of this library is to create an alternative client library that is
# much simpler and does not implement every command explicitly but uses
# method_missing instead.

require 'socket'
require 'set'

begin
if (RUBY_VERSION >= '1.9')
require 'timeout'
RedisTimer = Timeout
else
require 'system_timer'
RedisTimer = SystemTimer
end
rescue LoadError
RedisTimer = nil
end

class RedisClient
BulkCommands = {
"set"=>true, "setnx"=>true, "rpush"=>true, "lpush"=>true, "lset"=>true,
"lrem"=>true, "sadd"=>true, "srem"=>true, "sismember"=>true,
"echo"=>true, "getset"=>true, "smove"=>true
}

ConvertToBool = lambda{|r| r == 0 ? false : r}

ReplyProcessor = {
"exists" => ConvertToBool,
"sismember"=> ConvertToBool,
"sadd"=> ConvertToBool,
"srem"=> ConvertToBool,
"smove"=> ConvertToBool,
"move"=> ConvertToBool,
"setnx"=> ConvertToBool,
"del"=> ConvertToBool,
"renamenx"=> ConvertToBool,
"expire"=> ConvertToBool,
"keys" => lambda{|r| r.split(" ")},
"info" => lambda{|r|
info = {}
r.each_line {|kv|
k,v = kv.split(":",2).map{|x| x.chomp}
info[k.to_sym] = v
}
info
}
}

Aliases = {
"flush_db" => "flushdb",
"flush_all" => "flushall",
"last_save" => "lastsave",
"key?" => "exists",
"delete" => "del",
"randkey" => "randomkey",
"list_length" => "llen",
"push_tail" => "rpush",
"push_head" => "lpush",
"pop_tail" => "rpop",
"pop_head" => "lpop",
"list_set" => "lset",
"list_range" => "lrange",
"list_trim" => "ltrim",
"list_index" => "lindex",
"list_rm" => "lrem",
"set_add" => "sadd",
"set_delete" => "srem",
"set_count" => "scard",
"set_member?" => "sismember",
"set_members" => "smembers",
"set_intersect" => "sinter",
"set_intersect_store" => "sinterstore",
"set_inter_store" => "sinterstore",
"set_union" => "sunion",
"set_union_store" => "sunionstore",
"set_diff" => "sdiff",
"set_diff_store" => "sdiffstore",
"set_move" => "smove",
"set_unless_exists" => "setnx",
"rename_unless_exists" => "renamenx"
}

def initialize(opts={})
@host = opts[:host] || '127.0.0.1'
@port = opts[:port] || 6379
@db = opts[:db] || 0
@timeout = opts[:timeout] || 0
connect_to_server
end

def to_s
"Redis Client connected to #{@host}:#{@port} against DB #{@db}"
end

def connect_to_server
@sock = connect_to(@host,@port,@timeout == 0 ? nil : @timeout)
call_command(["select",@db]) if @db != 0
end

def connect_to(host, port, timeout=nil)
# We support connect() timeout only if system_timer is availabe
# or if we are running against Ruby >= 1.9
# Timeout reading from the socket instead will be supported anyway.
if @timeout != 0 and RedisTimer
begin
sock = TCPSocket.new(host, port, 0)
rescue Timeout::Error
@sock = nil
raise Timeout::Error, "Timeout connecting to the server"
end
else
sock = TCPSocket.new(host, port, 0)
end

# If the timeout is set we set the low level socket options in order
# to make sure a blocking read will return after the specified number
# of seconds. This hack is from memcached ruby client.
if timeout
secs = Integer(timeout)
usecs = Integer((timeout - secs) * 1_000_000)
optval = [secs, usecs].pack("l_2")
sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
end
sock
end

def method_missing(*argv)
call_command(argv)
end

def call_command(argv)
# this wrapper to raw_call_command handle reconnection on socket
# error. We try to reconnect just one time, otherwise let the error
# araise.
connect_to_server if !@sock
begin
raw_call_command(argv)
rescue Errno::ECONNRESET
@sock.close
connect_to_server
raw_call_command(argv)
end
end

def raw_call_command(argv)
bulk = nil
argv[0] = argv[0].to_s.downcase
argv[0] = Aliases[argv[0]] if Aliases[argv[0]]
if BulkCommands[argv[0]]
bulk = argv[-1].to_s
argv[-1] = bulk.length
end
@sock.write(argv.join(" ")+"\r\n")
@sock.write(bulk+"\r\n") if bulk

# Post process the reply if needed
processor = ReplyProcessor[argv[0]]
processor ? processor.call(read_reply) : read_reply
end

def select(*args)
raise "SELECT not allowed, use the :db option when creating the object"
end

def [](key)
get(key)
end

def []=(key,value)
set(key,value)
end

def sort(key, opts={})
cmd = []
cmd << "SORT #{key}"
cmd << "BY #{opts[:by]}" if opts[:by]
cmd << "GET #{[opts[:get]].flatten * ' GET '}" if opts[:get]
cmd << "#{opts[:order]}" if opts[:order]
cmd << "LIMIT #{opts[:limit].join(' ')}" if opts[:limit]
call_command(cmd)
end

def incr(key,increment=nil)
call_command(increment ? ["incrby",key,increment] : ["incr",key])
end

def decr(key,decrement=nil)
call_command(decrement ? ["decrby",key,decrement] : ["decr",key])
end

def read_reply
# We read the first byte using read() mainly because gets() is
# immune to raw socket timeouts.
begin
rtype = @sock.read(1)
rescue Errno::EAGAIN
# We want to make sure it reconnects on the next command after the
# timeout. Otherwise the server may reply in the meantime leaving
# the protocol in a desync status.
@sock = nil
raise Errno::EAGAIN, "Timeout reading from the socket"
end

raise Errno::ECONNRESET,"Connection lost" if !rtype
line = @sock.gets
case rtype
when "-"
raise "-"+line.strip
when "+"
line.strip
when ":"
line.to_i
when "$"
bulklen = line.to_i
return nil if bulklen == -1
data = @sock.read(bulklen)
@sock.read(2) # CRLF
data
when "*"
objects = line.to_i
return nil if bulklen == -1
res = []
objects.times {
res << read_reply
}
res
else
raise "Protocol error, got '#{rtype}' as initial reply byte"
end
end
end
8 changes: 5 additions & 3 deletions spec/dm_redis_adapter_spec.rb
@@ -1,18 +1,20 @@
require File.dirname(__FILE__) + '/spec_helper'
require File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib/redis_adapter.rb'))
require File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib/rubyredis'))

require 'dm-core/spec/adapter_shared_spec'

describe DataMapper::Adapters::RedisAdapter do
before(:all) do
@adapter = DataMapper.setup(:default, {
:adapter => "redis",
:database => 15
:db => 15
})
end

after(:all) do
redis = Redis.new(:db => 15)
redis.flush_db
redis = RedisClient.new(:db => 15)
redis.flushdb
end

it_should_behave_like 'An Adapter'
Expand Down

0 comments on commit 7533c27

Please sign in to comment.