Skip to content

Commit

Permalink
add chat
Browse files Browse the repository at this point in the history
  • Loading branch information
avalanche123 committed Feb 24, 2012
1 parent 8012a6a commit ef690d4
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 9 deletions.
61 changes: 56 additions & 5 deletions demos/presence/ruby/lib/presence/client.rb
Expand Up @@ -4,22 +4,27 @@
module Presence
class Client
def initialize(context, name, text=nil, threads=Thread)
@context = context
@name = name
@peers = {}
@threads = threads
@context = context
@name = name
@peers = {}
@threads = threads
@messages = Queue.new
end

def connect
@threads.abort_on_exception = true
start_sub
request_peers
start_push
start_message_sub
start_message_push
end

def disconnect
stop_sub
stop_push
stop_message_sub
stop_message_push
@context.terminate
end

Expand All @@ -31,6 +36,8 @@ def run
$stdout.puts @peers.inspect
when "text"
@text = $stdin.gets.chomp
when "say"
@messages << $stdin.gets.chomp
when "quit"
break
end
Expand Down Expand Up @@ -90,6 +97,7 @@ def process_change(change)
rescue JSON::ParserError
return
end
@peers[client['name']] ||= {}
@peers[client['name']].merge!(client)
end

Expand All @@ -99,11 +107,54 @@ def request_peers

request.send_string('list')
request.recv_string(data = '')
JSON.parse(data).each do |name, peer|
begin
peers = JSON.parse(data)
rescue JSON::ParserError
peers = []
end

peers.each do |name, peer|
@peers[name] = peer
end

request.close
end

def start_message_sub
@message_subscribed = false
@message_sub = spawn_socket("tcp://localhost:10005", ZMQ::SUB) do |sock|
@message_subscribed = sock.setsockopt(ZMQ::SUBSCRIBE, '') == 0 unless @message_subscribed
sock.recv_string(message = '')
process_message(message)
end
end

def stop_message_sub
@message_sub[:stop] = true
end

def process_message(message)
begin
data = JSON.parse(message)
rescue JSON::ParserError
return
end
$stdout << sprintf("[%s] <%s> %s", data['timestamp'], data['name'], data['text'])
$stdout << "\r\n"
end

def start_message_push
@message_push = spawn_socket("tcp://localhost:10004", ZMQ::PUSH) do |sock|
message = @messages.pop
sock.send_string(JSON.generate({
"name" => @name,
"text" => message
}))
end
end

def stop_message_push
@message_push[:stop] = true
end
end
end
49 changes: 45 additions & 4 deletions demos/presence/ruby/lib/presence/server.rb
Expand Up @@ -4,23 +4,28 @@
module Presence
class Server
def initialize(context, threads=Thread)
@context = context
@threads = threads
@changes = Queue.new
@clients = {}
@context = context
@threads = threads
@changes = Queue.new
@messages = Queue.new
@clients = {}
end

def start
@threads.abort_on_exception = true
start_pub
start_router
start_pull
start_message_pull
start_message_pub
end

def stop
stop_pub
stop_router
stop_pull
stop_message_pull
stop_message_pub
@context.terminate
end

Expand Down Expand Up @@ -130,5 +135,41 @@ def process_change(change)
client["timeout"] = data["timeout"]
@clients[data["name"]] = client unless @clients[data["name"]]
end

def start_message_pull
@message_pull = spawn_socket('tcp://*:10004', ZMQ::PULL) do |sock|
sock.recv_string(change = '')
process_message(change)
end
end

def stop_message_pull
@message_pull[:stop] = true
end

def process_message(raw)
begin
data = JSON.parse(raw)
rescue JSON::ParserError
return
end

@messages << JSON.generate({
"name" => data['name'],
"text" => data['text'],
"timestamp" => Time.now
})
end

def start_message_pub
@message_pub = spawn_socket('tcp://*:10005', ZMQ::PUB) do |sock|
message = @messages.pop
sock.send_string(message)
end
end

def stop_message_pub
@message_pub[:stop] = true
end
end
end

0 comments on commit ef690d4

Please sign in to comment.