Skip to content
This repository has been archived by the owner on Jul 23, 2023. It is now read-only.

Commit

Permalink
use redis hash as canonical presence reference so that when a connect…
Browse files Browse the repository at this point in the history
…ion to a channel goes to a slanger instance that does not have that particular in memory and therefore is not subscribed to pubsub updates for it, can build the initial presence state internally using the hash and subscribe to updates using pubsub
  • Loading branch information
stevegraham committed Oct 1, 2011
1 parent 7d8a0fd commit 8ba802b
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ GEM
activesupport (>= 3.1.0)
i18n (~> 0.6.0)
yajl-ruby (>= 0.7.7)
haml (3.1.2)
haml (3.1.3)
hiredis (0.3.2)
i18n (0.6.0)
multi_json (1.0.3)
Expand Down
1 change: 1 addition & 0 deletions example/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'pusher'
require 'digest/md5'
require 'thin'
require 'json'

set :views, File.dirname(__FILE__) + '/templates'
set :port, 3000
Expand Down
1 change: 1 addition & 0 deletions lib/slanger/handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require 'active_support/core_ext/hash'
require 'securerandom'
require 'signature'
require 'fiber'

module Slanger
class Handler
Expand Down
23 changes: 22 additions & 1 deletion lib/slanger/presence_channel.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'glamazon'
require 'eventmachine'
require 'forwardable'
require 'fiber'

module Slanger
class PresenceChannel < Channel
Expand All @@ -26,6 +27,8 @@ def subscribe(msg, callback, &blk)
publisher = publish_connection_notification subscription_id: public_subscription_id, online: true,
channel_data: channel_data, channel: channel_id

roster_add public_subscription_id, channel_data

# fuuuuuuuuuccccccck!
publisher.callback do
EM.next_tick do
Expand All @@ -48,12 +51,30 @@ def subscribers
def unsubscribe(public_subscription_id)
# Unsubcribe from EM::Channel
channel.unsubscribe(internal_subscription_table.delete(public_subscription_id)) # if internal_subscription_table[public_subscription_id]
roster_remove public_subscription_id
# Notify all instances
publish_connection_notification subscription_id: public_subscription_id, online: false, channel: channel_id
end

private

def get_roster
Fiber.new do
f = Fiber.current
Slanger::Redis.hgetall(channel_id).
callback { |res| f.resume res }
Fiber.yield
end.resume
end

def roster_add(key, value)
Slanger::Redis.hset(channel_id, key, value)
end

def roster_remove(key)
Slanger::Redis.hdel(channel_id, key)
end

def publish_connection_notification(payload, retry_count=0)
Slanger::Redis.publish('slanger:connection_notification', payload.to_json).
tap { |r| r.errback { publish_connection_notification payload, retry_count.succ unless retry_count == 5 } }
Expand All @@ -62,7 +83,7 @@ def publish_connection_notification(payload, retry_count=0)
# This is the state of the presence channel across the system. kept in sync
# with redis pubsub
def subscriptions
@subscriptions ||= {}
@subscriptions = @subscriptions || get_roster || Hash.new
end

# This is used map public subscription ids to em channel subscription ids.
Expand Down
6 changes: 6 additions & 0 deletions lib/slanger/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ def self.extended base

def_delegator :publisher, :publish
def_delegators :subscriber, :on, :subscribe
def_delegators :regular_connection, :hgetall, :hdel, :hset

private

def regular_connection
@regular_connection ||= EM::Hiredis.connect Slanger::Config.redis_address
end

def publisher
@publisher ||= EM::Hiredis.connect Slanger::Config.redis_address
end
Expand Down

0 comments on commit 8ba802b

Please sign in to comment.