Skip to content

Commit

Permalink
Initial functioning code extracted from pusherapp
Browse files Browse the repository at this point in the history
  • Loading branch information
mloughran committed Feb 3, 2011
1 parent f1505ab commit 710660b
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 1 deletion.
2 changes: 2 additions & 0 deletions em-hiredis.gemspec
Expand Up @@ -12,6 +12,8 @@ Gem::Specification.new do |s|
s.summary = %q{Eventmachine redis client}
s.description = %q{Eventmachine redis client using hiredis native parser}

s.add_dependency 'hiredis'

s.rubyforge_project = "em-hiredis"

s.files = `git ls-files`.split("\n")
Expand Down
19 changes: 18 additions & 1 deletion lib/em-hiredis.rb
@@ -1,5 +1,22 @@
require 'eventmachine'

module EM
module Hiredis
# Your code goes here...
class << self
attr_writer :logger

def logger
@logger ||= begin
require 'logger'
log = Logger.new(STDOUT)
log.level = Logger::WARN
log
end
end
end
end
end

require 'em-hiredis/event_emitter'
require 'em-hiredis/connection'
require 'em-hiredis/client'
121 changes: 121 additions & 0 deletions lib/em-hiredis/client.rb
@@ -0,0 +1,121 @@
module EM::Hiredis
class Client
PUBSUB_MESSAGES = %w{message pmessage}.freeze

include EM::Hiredis::EventEmitter
include EM::Deferrable

def self.connect(host = 'localhost', port = 6379)
new(host, port)
end

def initialize(host, port)
@host, @port = host, port
@subs, @psubs = [], []
@defs = []
@connection = EM.connect(host, port, Connection, host, port)

@connection.on(:closed) {
if @connected
@defs.each { |d| d.fail("Redis disconnected") }
@defs = []
@deferred_status = nil
@connected = false
@reconnecting = true
reconnect
else
EM.add_timer(1) { reconnect }
end
}

@connection.on(:connected) {
@connected = true
select(@db) if @db
@subs.each { |s| method_missing(:subscribe, s) }
@psubs.each { |s| method_missing(:psubscribe, s) }
succeed

if @reconnecting
@reconnecting = false
emit(:reconnected)
end
}

@connection.on(:message) { |reply|
if RuntimeError === reply
raise "Replies out of sync: #{reply.inspect}" if @defs.empty?
deferred = @defs.shift
deferred.fail(reply) if deferred
else
if reply && PUBSUB_MESSAGES.include?(reply[0]) # reply can be nil
kind, subscription, d1, d2 = *reply

case kind.to_sym
when :message
emit(:message, subscription, d1)
when :pmessage
emit(:pmessage, subscription, d1, d2)
end
else
raise "Replies out of sync: #{reply.inspect}" if @defs.empty?
deferred = @defs.shift
deferred.succeed(reply) if deferred
end
end
}

@connected = false
@reconnecting = false
end

def subscribe(channel)
@subs << channel
method_missing(:subscribe, channel)
end

def unsubscribe(channel)
@subs.delete(channel)
method_missing(:unsubscribe, channel)
end

def psubscribe(channel)
@psubs << channel
method_missing(:psubscribe, channel)
end

def punsubscribe(channel)
@psubs.delete(channel)
method_missing(:punsubscribe, channel)
end

def select(db)
@db = db
method_missing(:select, db)
end

def method_missing(sym, *args)
deferred = EM::DefaultDeferrable.new
# Shortcut for defining the callback case with just a block
deferred.callback { |result| yield(result) } if block_given?

if @connected
@connection.send_command(sym, *args)
@defs.push(deferred)
else
callback {
@connection.send_command(sym, *args)
@defs.push(deferred)
}
end

return deferred
end

private

def reconnect
EM::Hiredis.logger.debug("Trying to reconnect to Redis")
@connection.reconnect @host, @port
end
end
end
61 changes: 61 additions & 0 deletions lib/em-hiredis/connection.rb
@@ -0,0 +1,61 @@
require 'hiredis/reader'

module EM::Hiredis
class Connection < EM::Connection
include EM::Hiredis::EventEmitter

def initialize(host, port)
super
@host, @port = host, port
end

def connection_completed
EM::Hiredis.logger.info("Connected to Redis")
@reader = ::Hiredis::Reader.new
emit(:connected)
end

def receive_data(data)
@reader.feed(data)
until (reply = @reader.gets) == false
emit(:message, reply)
end
end

def unbind
EM::Hiredis.logger.info("Disconnected from Redis")
emit(:closed)
end

def send_command(sym, *args)
send_data(command(sym, *args))
end

protected

COMMAND_DELIMITER = "\r\n"

def command(*args)
command = []
command << "*#{args.size}"

args.each do |arg|
arg = arg.to_s
command << "$#{string_size arg}"
command << arg
end

command.join(COMMAND_DELIMITER) + COMMAND_DELIMITER
end

if "".respond_to?(:bytesize)
def string_size(string)
string.to_s.bytesize
end
else
def string_size(string)
string.to_s.size
end
end
end
end
29 changes: 29 additions & 0 deletions lib/em-hiredis/event_emitter.rb
@@ -0,0 +1,29 @@
module EM::Hiredis
module EventEmitter
def on(event, &listener)
_listeners[event] << listener
end

def emit(event, *args)
_listeners[event].each { |l| l.call(*args) }
end

def remove_listener(event, &listener)
_listeners[event].delete(listener)
end

def remove_all_listeners(event)
_listeners.delete(event)
end

def listeners(event)
_listeners[event]
end

private

def _listeners
@_listeners ||= Hash.new { |h,k| h[k] = [] }
end
end
end

0 comments on commit 710660b

Please sign in to comment.