Skip to content

Commit

Permalink
use router<->dealer pipe to avoid direct connection to the client node
Browse files Browse the repository at this point in the history
  • Loading branch information
niamster committed Mar 15, 2015
1 parent 1d55459 commit 931c985
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 129 deletions.
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ source "http://rubygems.org"

gem 'celluloid', github: 'celluloid/celluloid', tag: 'v0.16.0'
gem 'celluloid-io', github: 'celluloid/celluloid-io', tag: 'v0.16.1'
gem 'celluloid-zmq', github: 'celluloid/celluloid-zmq', tag: 'v0.16.0'
gem 'celluloid-zmq', github: 'niamster/celluloid-zmq', branch: 'master'
gem 'celluloid-redis', github: 'celluloid/celluloid-redis', branch: 'master'
gem 'reel', github: 'celluloid/reel', tag: 'v0.4.0'

Expand Down
8 changes: 5 additions & 3 deletions lib/dcell.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'socket'
require 'securerandom'
require 'msgpack'
require 'uri'

Celluloid::ZMQ.init

Expand All @@ -13,12 +14,13 @@
require 'dcell/actor_proxy'
require 'dcell/directory'
require 'dcell/messages'
require 'dcell/sockets'
require 'dcell/server'
require 'dcell/node_manager'
require 'dcell/node'
require 'dcell/global'
require 'dcell/responses'
require 'dcell/mailbox_manager'
require 'dcell/server'
require 'dcell/info_service'
require 'dcell/registries/adapter'
require 'dcell/registries/errors'
Expand Down Expand Up @@ -141,7 +143,7 @@ def generate_node_id
@registry.unique
else
digest = Digest::SHA512.new
seed = Socket.gethostname + rand.to_s + Time.now.to_s + SecureRandom.hex
seed = ::Socket.gethostname + rand.to_s + Time.now.to_s + SecureRandom.hex
digest.update(seed).to_s
end
end
Expand All @@ -163,7 +165,7 @@ def configuration_accessors(configuration)

# DCell's actor dependencies
class SupervisionGroup < Celluloid::SupervisionGroup
supervise Server, as: :dcell_server, args: [DCell]
supervise RequestServer, as: :server
supervise InfoService, as: :info
end
DCell.add_local_actor :info
Expand Down
12 changes: 5 additions & 7 deletions lib/dcell/messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,19 @@ def __respond(rsp, pipe)
end

def respond(rsp)
__respond rsp, :default
__respond rsp, :response
end

# A request to open relay pipe
class RelayOpen < Message
def initialize(sender, from, meta)
def initialize(sender)
@id = DCell.id
@sender = sender
@from = from
@meta = meta
end

def dispatch
node = DCell::Node[id]
node.handle_relayopen @from, @meta
node.handle_relayopen
respond SuccessResponse.new(id, @sender[:address], node.rserver.addr)
rescue => e
# :nocov:
Expand All @@ -43,7 +41,7 @@ def to_msgpack(pk=nil)
{
type: self.class.name,
id: id,
args: [@sender, @from, @meta]
args: [@sender]
}.to_msgpack(pk)
end
end
Expand Down Expand Up @@ -77,7 +75,7 @@ def initialize

def dispatch
node = DCell::NodeCache.find id
node.detach if node
node.detach if node and node.alive?
end

def to_msgpack(pk=nil)
Expand Down
94 changes: 33 additions & 61 deletions lib/dcell/node.rb
Original file line number Diff line number Diff line change
@@ -1,34 +1,16 @@
require 'uri'

module DCell
# Exception raised when no response was received within a given timeout
class NoResponseError < Exception; end

# Exception raised when remote node appears dead
class DeadNodeError < Exception; end

class RelayServer
attr_accessor :addr

def initialize
uri = URI(DCell.addr)
@addr = "#{uri.scheme}://#{uri.host}:*"
@server = Server.new self
end

def terminate
@server.terminate if @server.alive?
end
end


# A node in a DCell cluster
class Node
include Celluloid
include Celluloid::FSM

attr_reader :id

finalizer :shutdown

# FSM
Expand All @@ -54,6 +36,7 @@ def initialize(id, addr, server=false)
@requests = ResourceManager.new
@actors = ResourceManager.new
@remote_dead = false
@leech = false

@heartbeat_rate = DCell.heartbeat_rate # How often to send heartbeats in seconds
@heartbeat_timeout = DCell.heartbeat_timeout # How soon until a lost heartbeat triggers a node partition
Expand Down Expand Up @@ -93,7 +76,7 @@ def actors
# Send a ping message with a given timeout
def ping(timeout=nil)
request = Message::Ping.new(Thread.mailbox)
send_request request, :default, timeout
send_request request, :request, timeout
end

# Friendlier inspection
Expand Down Expand Up @@ -141,54 +124,41 @@ def detach
# Graceful termination of the node
def shutdown
transition :shutdown
unless @remote_dead or DCell.id == id
unless @remote_dead or DCell.id == @id
kill_actors
farewell if Directory[id].alive? rescue IOError
farewell
end
@socket.close if @socket
@rsocket.close if @rsocket
@rserver.terminate if @rserver
NodeCache.delete id
@socket.terminate if @socket && @socket.alive?
@rsocket.terminate if @rsocket && @rsocket.alive?
@rserver.terminate if @rserver && @rserver.alive?
NodeCache.delete @id
MailboxManager.delete Thread.mailbox
Logger.info "Disconnected from #{id}"
end

# Obtain the node's 0MQ socket
def __socket(addr)
raise IOError unless addr

socket = Celluloid::ZMQ::PushSocket.new
begin
socket.connect addr
socket.linger = @heartbeat_timeout * 1000
rescue IOError
socket.close
socket = nil
raise
end
socket
Logger.info "Disconnected from #{@id}"
end

# Obtain socket for relay messages
def rsocket
return @rsocket if @rsocket
send_relayopen unless @raddr
@rsocket = __socket @raddr
@rsocket = ClientServer.new @raddr, @heartbeat_timeout*1000
end

# Obtain socket for management messages
def socket
return @socket if @socket
@socket = __socket @addr
@socket = ClientServer.new @addr, @heartbeat_timeout*1000
@socket.farewell = true
transition :connected
@socket
end

# Pack and send a message to another DCell node
def send_message(message, pipe=:default)
def send_message(message, pipe=:request)
queue = nil
if pipe == :default
if pipe == :request
queue = socket
elsif pipe == :response
queue = Celluloid::Actor[:server]
elsif pipe == :relay
queue = rsocket
end
Expand All @@ -198,11 +168,11 @@ def send_message(message, pipe=:default)
rescue => e
abort e
end
queue << message
queue.write @id, message
end

# Send request and wait for response
def push_request(request, pipe=:default, timeout=nil)
def push_request(request, pipe=:request, timeout=nil)
send_message request, pipe
save_request request
response = receive(timeout) do |msg|
Expand All @@ -214,7 +184,7 @@ def push_request(request, pipe=:default, timeout=nil)
end

# Send request and handle unroll response
def send_request(request, pipe=:default, timeout=nil)
def send_request(request, pipe=:request, timeout=nil)
response = push_request request, pipe, timeout
return if response.is_a? CancelResponse
if response.is_a? ErrorResponse
Expand All @@ -239,35 +209,37 @@ def async_relay(message)

# Goodbye message to remote actor
def farewell
return unless Directory[@id].alive?
request = Message::Farewell.new
send_message request
rescue
end

# Send a heartbeat message after the given interval
def send_heartbeat
return if DCell.id == id
request = DCell::Message::Heartbeat.new id
send_message request
return if DCell.id == @id
request = DCell::Message::Heartbeat.new @id
send_message request, @leech ? :response : :request
@heartbeat = after(@heartbeat_rate) { send_heartbeat }
end

# Handle an incoming heartbeat for this node
def handle_heartbeat(from)
return if from == id
return if from == @id
@leech = true unless state == :connected
transition :connected
transition :partitioned, delay: @heartbeat_timeout
end

# Send an advertising message
def send_relayopen
meta = {raddr: rserver.addr}
request = Message::RelayOpen.new(Thread.mailbox, id, meta)
request = Message::RelayOpen.new(Thread.mailbox)
@raddr = send_request request
end

# Handle an incoming node advertising message for this node
def handle_relayopen(from, meta)
@raddr = meta[:raddr]
def handle_relayopen
@rsocket = rserver
end

def rserver
Expand All @@ -277,22 +249,22 @@ def rserver

# Update TTL in registry
def update_ttl
Directory[id].update_ttl
Directory[@id].update_ttl
@ttl = after(@ttl_rate) { update_ttl }
end

def on_connected
send_heartbeat
unless id == DCell.id
unless @id == DCell.id
transition :partitioned, delay: @heartbeat_timeout
end
Logger.info "Connected to #{id}"
Logger.info "Connected to #{@id}"
end

def on_partitioned
@heartbeat.cancel if @heartbeat
@ttl.cancel if @ttl
Logger.warn "Communication with #{id} interrupted"
Logger.warn "Communication with #{@id} interrupted"
detach
end
end
Expand Down
Loading

0 comments on commit 931c985

Please sign in to comment.