Skip to content

Commit

Permalink
let the upper layer decide how to reconnect and handle lost messages
Browse files Browse the repository at this point in the history
  • Loading branch information
niamster committed Feb 16, 2015
1 parent fd1935f commit 3cd23aa
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 61 deletions.
8 changes: 2 additions & 6 deletions lib/dcell/messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def initialize

def dispatch
node = DCell::NodeCache.find @id
node.terminate if node
node.detach if node
end

def to_msgpack(pk=nil)
Expand Down Expand Up @@ -133,11 +133,7 @@ def dispatch
Celluloid::Actor::async actor.mailbox, @message[:meth], *@message[:args]
return
end
if actor
rsp = __dispatch actor
else
rsp = DeadActorResponse.new(@id, @sender[:address], nil)
end
rsp = __dispatch actor
respond rsp
end

Expand Down
61 changes: 13 additions & 48 deletions lib/dcell/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Node
state :partitioned do
@heartbeat.cancel if @heartbeat
Logger.warn "Communication with #{id} interrupted"
reattach
detach
end

# Access sugar to NodeManager methods
Expand All @@ -45,19 +45,17 @@ def initialize(id, addr)
end

def save_request(request)
return if request.kind_of? Message::Relay
@requests.register(request.id) {request}
end

def delete_request(request)
return if request.kind_of? Message::Relay
@requests.delete request.id
end

def retry_requests
def cancel_requests
@requests.each do |id, request|
address = request.sender.address
rsp = RetryResponse.new id, address
rsp = CancelResponse.new id, address
rsp.dispatch
end
end
Expand All @@ -72,26 +70,11 @@ def kill_actors
end
end

def reattach
def detach
kill_actors
addr = Directory[id].address
if addr
update_client_address addr
retry_requests
else
@remote_dead = true
terminate
end
end

def update_client_address(addr)
@heartbeat.cancel if @heartbeat
@addr = addr
if @socket
@socket.close
@socket = nil
end
socket
cancel_requests
@remote_dead = true
terminate
end

def update_server_address(addr)
Expand Down Expand Up @@ -133,40 +116,22 @@ def socket
def push_request(request)
send_message request
save_request request
response = receive(@heartbeat_timeout*2) do |msg|
response = receive do |msg|
msg.respond_to?(:request_id) && msg.request_id == request.id
end
delete_request request
response
end

def dead_actor
raise ::Celluloid::DeadActorError.new
end

def handle_response(request, response)
unless response
dead_actor if request.kind_of? Message::Relay
return false
end
return false if response.is_a? RetryResponse
dead_actor if response.is_a? DeadActorResponse
def send_request(request)
response = push_request request
return if response.is_a? CancelResponse
if response.is_a? ErrorResponse
klass = Utils::full_const_get response.value[:class]
msg = response.value[:msg]
raise klass.new msg
end
true
end

def send_request(request)
# FIXME: need a robust way to retry the lost requests
loop do
response = push_request request
if handle_response request, response
return response.value
end
end
response.value
end

# Find an actor registered with a given name on this node
Expand Down Expand Up @@ -217,7 +182,7 @@ def send_message(message)
socket << message
end
alias_method :<<, :send_message

# Send a heartbeat message after the given interval
def send_heartbeat
return if DCell.id == id
Expand Down
7 changes: 2 additions & 5 deletions lib/dcell/responses.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ class SuccessResponse < Response; end
# Request failed
class ErrorResponse < Response; end

# Retry response (request to retry action)
class RetryResponse < Response; end

# Remote actor is dead
class DeadActorResponse < Response; end
# Internal response to cancel pending request (remote node is likely dead)
class CancelResponse < Response; end
end
11 changes: 9 additions & 2 deletions spec/dcell/node_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,15 @@ def wait_for_death(time)

it "raises exception on a sync call to dead actor" do
actor = @node[:test_actor]
actor.suicide 3
wait_for_death 3
actor.suicide 1
wait_for_death 1
expect {actor.value}.to raise_error Celluloid::DeadActorError
end

it "raises exception on a sync call to dead actor even if it was killed" do
actor = @node[:test_actor]
actor.suicide 1, :KILL
wait_for_death 1
expect {actor.value}.to raise_error Celluloid::DeadActorError
end

Expand Down

0 comments on commit 3cd23aa

Please sign in to comment.