Skip to content
Browse files

Added Presence. Useful when implementing a chat buddy list or a list …

…of servers that are alive. Requires manual regular garbage collection.
  • Loading branch information...
1 parent 3079b1c commit 008f565b35893ed0571080405ab4c4d6a12313ef @tobi committed May 31, 2010
Showing with 138 additions and 7 deletions.
  1. +107 −0 presence/presence.rb
  2. +3 −1 queue/client.rb
  3. +18 −0 queue/oddity.rb
  4. +10 −6 queue/queue.rb
View
107 presence/presence.rb
@@ -0,0 +1,107 @@
+require 'rubygems'
+require 'redis'
+require 'digest/md5'
+require 'msgpack'
+
+$redis = Redis.new
+
+class Presence
+
+
+ # By default it uses 60 secs or 1 minute buckets and starts reusing buckets every hour.
+
+ # This is useful for example for a chat buddy list, allows you to keep presence of all the
+ # people who are online by periodically adding their user ids to the presence class when they
+ # do http calls to you.
+ def initialize(name, seconds = 60, buckets = 10)
+ @name = name
+ @seconds = seconds
+ @buckets = buckets
+ end
+
+ def clear
+ @buckets.times do |num|
+ $redis.del "#{@name}:#{num-1}"
+ end
+ end
+
+ def add(token, time = Time.now.to_i)
+ $redis.sadd "#{@name}:#{recent_bucket_num(0, time)}", token
+ end
+
+ def present_in_buckets(number_of_buckets)
+ time = Time.now.to_i
+
+ keys = (1..number_of_buckets).collect do |times|
+ "#{@name}:#{recent_bucket_num(times - 1)}"
+ end
+
+ if keys.size > 1
+ $redis.sunion(keys)
+ else
+ $redis.smembers(keys.first)
+ end
+ end
+
+ private
+
+ def recent_bucket_num(num = 0, time = Time.now.to_i)
+ seconds_in_segment = time % (@buckets * @seconds)
+ closest_matching_bucket = seconds_in_segment / @seconds
+
+ return (closest_matching_bucket - num) % @buckets
+ end
+
+end
+
+require "test/unit"
+
+class TestLibraryFileName < Test::Unit::TestCase
+
+ def test_active_in_one_bucket
+ a = Presence.new("test")
+ a.clear
+ a.add 1
+ a.add 2
+ a.add 3
+ assert_equal ["1","2","3"], a.present_in_buckets(1).sort
+ end
+
+ def test_active_in_more_then_one_bucket
+ require 'active_support'
+ a = Presence.new("test")
+ a.clear
+ a.add 1, 2.minutes.ago.to_i
+ a.add 2
+ a.add 3
+ assert_equal ["2","3"], a.present_in_buckets(1).sort
+ assert_equal ["1","2","3"], a.present_in_buckets(5).sort
+ end
+
+
+
+
+ def test_recent_bucket
+ require 'time'
+ time = Time.parse("2010-05-30 14:55").to_i
+
+ a = Presence.new("test")
+ assert_equal 5, a.send(:recent_bucket_num,0, time)
+ assert_equal 4, a.send(:recent_bucket_num,1, time)
+ assert_equal 3, a.send(:recent_bucket_num,2, time)
+ assert_equal 2, a.send(:recent_bucket_num,3, time)
+ assert_equal 1, a.send(:recent_bucket_num,4, time)
+ assert_equal 0, a.send(:recent_bucket_num,5, time)
+ assert_equal 9, a.send(:recent_bucket_num,6, time)
+ assert_equal 8, a.send(:recent_bucket_num,7, time)
+ assert_equal 7, a.send(:recent_bucket_num,8, time)
+ assert_equal 6, a.send(:recent_bucket_num,9, time)
+ assert_equal 5, a.send(:recent_bucket_num,10, time)
+ assert_equal 4, a.send(:recent_bucket_num,11, time)
+ assert_equal 3, a.send(:recent_bucket_num,12, time)
+ assert_equal 2, a.send(:recent_bucket_num,13, time)
+ assert_equal 1, a.send(:recent_bucket_num,14, time)
+ assert_equal 0, a.send(:recent_bucket_num,15, time)
+ assert_equal 9, a.send(:recent_bucket_num,16, time)
+ end
+end
View
4 queue/client.rb
@@ -5,11 +5,13 @@
start_time = Time.now.to_i
msg = 0
-puts Queue.all
queue = Queue.new("testing")
+puts "Waiting: #{queue.size}"
+
queue.subscribe do |obj|
msg += 1
+ p obj
seconds = Time.now.to_i - start_time
puts "%.3f ... " % [msg.to_f / seconds]
end
View
18 queue/oddity.rb
@@ -0,0 +1,18 @@
+require 'rubygems'
+require 'redis'
+
+
+server = Redis.new
+
+puts "Pushing 1000 elements in array"
+1000.times do |i|
+ server.lpush 'q', i
+end
+server.send(:disconnect)
+
+client = Redis.new
+p client.llen 'q'
+
+loop do
+ p client.blpop('q', 0).last
+end
View
16 queue/queue.rb
@@ -1,7 +1,7 @@
require 'rubygems'
require 'redis'
require 'digest/md5'
-#require 'msgpack'
+require 'msgpack'
$redis = Redis.new
@@ -34,17 +34,21 @@ def push(object)
def subscribe
loop do
hash = $redis.blpop(@queue_name, 0)[1]
- object = $redis.get("msg:#{hash}")
- if object
+
+ if object = $redis.get("msg:#{hash}")
begin
yield MessagePack.unpack(object)
- # Done, remove message from redis.
- $redis.del("msg:#{hash}")
- rescue
+ rescue => e
+ puts e
# Error, add the message again to the end of the queue
$redis.rpush(@queue_name, hash)
raise
+ else
+ # Done, remove message from redis.
+ $redis.del("msg:#{hash}")
end
+ else
+ p "did not get an object? hash:#{hash.inspect} obj:#{object.inspect}"
end
end
end

0 comments on commit 008f565

Please sign in to comment.
Something went wrong with that request. Please try again.