Skip to content

Commit

Permalink
lazy relay pipe creation
Browse files Browse the repository at this point in the history
  • Loading branch information
niamster committed Mar 10, 2015
1 parent 1701294 commit eb444cb
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 17 deletions.
35 changes: 31 additions & 4 deletions lib/dcell/messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,51 @@ def respond(rsp)
__respond rsp, :default
end

# A request to open relay pipe
class RelayOpen < Message
def initialize(sender, from, meta)
@id = DCell.id
@sender = sender
@from = from
@meta = meta
end

def dispatch
node = DCell::Node[id]
node.handle_relayopen @from, @meta
respond SuccessResponse.new(id, @sender[:address], node.rserver.addr)
rescue => e
# :nocov:
respond ErrorResponse.new(id, @sender[:address], {class: e.class.name, msg: e.to_s})
# :nocov:
end

def to_msgpack(pk=nil)
{
type: self.class.name,
id: id,
args: [@sender, @from, @meta]
}.to_msgpack(pk)
end
end

# Heartbeat messages inform other nodes this node is healthy
class Heartbeat < Message
def initialize(from, raddr)
def initialize(from)
@id = DCell.id
@from = from
@raddr = raddr
end

def dispatch
node = DCell::Node[id]
node.handle_heartbeat @from, @raddr if node
node.handle_heartbeat @from if node
end

def to_msgpack(pk=nil)
{
type: self.class.name,
id: id,
args: [@from, @raddr]
args: [@from]
}.to_msgpack(pk)
end
end
Expand Down
34 changes: 21 additions & 13 deletions lib/dcell/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def initialize
end

def terminate
@server.terminate
@server.terminate if @server.alive?
end
end

Expand Down Expand Up @@ -51,9 +51,6 @@ class << self

def initialize(id, addr, server=false)
@id, @addr = id, addr
@raddr = nil
@socket, @rsocket = nil, nil
@ttl, @heartbeat = nil, nil
@requests = ResourceManager.new
@actors = ResourceManager.new
@remote_dead = false
Expand All @@ -68,7 +65,6 @@ def initialize(id, addr, server=false)
Logger.warn "Node '#{@id}' looks dead"
raise DeadNodeError.new
end
@rserver = RelayServer.new

# Total hax to accommodate the new Celluloid::FSM API
attach self
Expand Down Expand Up @@ -176,11 +172,7 @@ def __socket(addr)
# Obtain socket for relay messages
def rsocket
return @rsocket if @rsocket
# a backup if relay message was the first one
unless @raddr
Logger.warn "Remote relay pipe of node #{id} is not yet ready"
return socket
end
send_relayopen unless @raddr
@rsocket = __socket @raddr
end

Expand Down Expand Up @@ -254,19 +246,35 @@ def farewell
# Send a heartbeat message after the given interval
def send_heartbeat
return if DCell.id == id
request = DCell::Message::Heartbeat.new id, @rserver.addr
request = DCell::Message::Heartbeat.new id
send_message request
@heartbeat = after(@heartbeat_rate) { send_heartbeat }
end

# Handle an incoming heartbeat for this node
def handle_heartbeat(from, raddr)
def handle_heartbeat(from)
return if from == id
@raddr = raddr
transition :connected
transition :partitioned, delay: @heartbeat_timeout
end

# Send an advertising message
def send_relayopen
meta = {raddr: rserver.addr}
request = Message::RelayOpen.new(Thread.mailbox, id, meta)
@raddr = send_request request
end

# Handle an incoming node advertising message for this node
def handle_relayopen(from, meta)
@raddr = meta[:raddr]
end

def rserver
return @rserver if @rserver
@rserver = RelayServer.new
end

# Update TTL in registry
def update_ttl
Directory[id].update_ttl
Expand Down

0 comments on commit eb444cb

Please sign in to comment.