Skip to content

Commit

Permalink
Moved RPC out of network into a new subclass of Node call RPCNode (al…
Browse files Browse the repository at this point in the history
…so in the Net namespace). New RPC namespace for all other RPC related functionality, which has also been moved out of topology. Much cleaner. Must inheriet from RPCNode if you want RPC functionality.
  • Loading branch information
Cyrus Hall committed Apr 19, 2007
1 parent a809f63 commit fec106a
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 138 deletions.
1 change: 1 addition & 0 deletions lib/gosim.rb
Expand Up @@ -46,6 +46,7 @@ def verbose
require 'gosim/simulation'
require 'gosim/defer'
require 'gosim/network'
require 'gosim/rpc'
require 'gosim/event_reader'
require 'gosim/data'
require 'gosim/event_cast'
Expand Down
2 changes: 1 addition & 1 deletion lib/gosim/defer.rb
Expand Up @@ -120,7 +120,7 @@ def is_callable?(obj)
end

def is_failure?(obj)
obj.is_a?(Failure)
obj.is_a?(Net::Failure) || obj.nil?
end
end
end
Expand Down
147 changes: 12 additions & 135 deletions lib/gosim/network.rb
@@ -1,99 +1,30 @@
module GoSim
module Net
LATENCY_MEAN = 100
LATENCY_DEV = 50
LATENCY_MEAN = 150
LATENCY_BASE = 30

GSNetworkPacket = Struct.new(:id, :src, :dest, :data)
FailedPacket = Struct.new(:dest, :data)

ERROR_NODE_FAILURE = 0

class RPCRequest
attr_reader :uid, :src, :dest, :method, :args

@@rpc_counter = 0

def RPCRequest.next_uid
@@rpc_counter += 1
end

def initialize(src, dest, method, args)
@src = src
@dest = dest
@method = method
@args = args

@uid = RPCRequest.next_uid
end
end

class RPCResponse
attr_reader :uid, :result

def initialize(uid, result)
@uid = uid
@result = result
end
end

# Add a no-return method to Deferred so it can clear state for methods
# without return values.
class RPCDeferred < Deferred
def initialize(uid = nil)
@uid = uid

super()
end

def default_callback(callback = nil, &block)
callback = callback || block

if callback
raise NotCallableError unless is_callable?(callback)
@default_cb = callback
end
end

def default_errback(errback = nil, &block)
errback = errback || block

if errback
raise NoterrableError unless is_callable?(errback)
@default_eb = errback
end
end

def run_callbacks
# Check for defaults. Call the appropriate one only if no calls have
# been provided
if is_failure?(@result)
@default_eb.call(@result) if !has_errbacks? && @default_eb
elsif !has_callbacks? && @default_cb
@default_cb.call(@result)
end

super()
end

def no_return
Topology.instance.remove_deferred(@uid)
end
end

class Topology < Entity
include Singleton

attr_reader :latency_mean, :latency_base

def initialize()
super()

@sim.add_observer(self)
@nodes = {}
@rpc_deferreds = {}
# @rpc_deferreds = {}
set_latency()
end

def set_latency(latency_mean = LATENCY_MEAN, latency_dev = LATENCY_DEV)
def set_latency(latency_mean = LATENCY_MEAN, latency_base = LATENCY_BASE)
@latency_mean = latency_mean
@latency_dev = latency_dev
@latency_base = latency_base
end

# Called by simulation when a reset occurs
Expand All @@ -111,17 +42,14 @@ def register_node(node)
def get_node(addr)
@nodes[addr]
end

def remove_deferred(uid)
@rpc_deferreds.delete(uid)
end
# private :get_node

# Simple send packet that is always handled by Node#recv_packet
def send_packet(src, receivers, packet)
[*receivers].each do |receiver|
@sim.schedule_event(:handle_packet,
@sid,
rand(@mean_latency) + LATENCY_DEV,
rand(@latency_mean) + @latency_base,
GSNetworkPacket.new(id, src, receiver, packet))
end
end
Expand All @@ -134,56 +62,8 @@ def recv_packet(packet)
FailedPacket.new(packet.dest, packet.data))
end
end

# Send an rpc request that gets handled by a specific method on the receiver
def rpc_request(src, dest, method, args)
request = RPCRequest.new(src, dest, method, args)
@sim.schedule_event(:handle_rpc_request,
@sid,
rand(@mean_latency) + LATENCY_DEV,
request)

deferred = RPCDeferred.new(request.uid)
@rpc_deferreds[request.uid] = deferred

return deferred
end

# Dispatches an RPC request to a specific method, and return a result
# unless the method returns nil.
def handle_rpc_request(request)
#puts "top of request"
if @nodes[request.dest].alive?
#puts "1 request...#{request.inspect}"

# If there is no response delete the deferred.
# TODO: Maybe we want to signal something to the deferred here also?
result = @nodes[request.dest].send(request.method, *request.args)

@sim.schedule_event(:handle_rpc_response,
@sid,
rand(@mean_latency) + LATENCY_DEV,
RPCResponse.new(request.uid, result))
else
#puts "2 request..."
if @rpc_deferreds.has_key?(request.uid)
@rpc_deferreds[request.uid].errback(Failure.new(request))
remove_deferred(request.uid)
end
end
end

def handle_rpc_response(response)
#puts "response...#{response}"
if @rpc_deferreds.has_key?(response.uid)
@rpc_deferreds[response.uid].callback(response.result)
remove_deferred(response.uid)
end
end
end

class RPCInvalidMethodError < Exception; end

class Peer
attr_reader :addr

Expand All @@ -203,9 +83,9 @@ def initialize(local_node, remote_addr)
end

def method_missing(method, *args)
raise RPCInvalidMethodError.new("#{method} not available on target node!") unless @remote_node.respond_to?(method)
raise RPC::RPCInvalidMethodError.new("#{method} not available on target node!") unless @remote_node.respond_to?(method)

deferred = @topo.rpc_request(@local_node.addr, @remote_node.addr, method, args)
deferred = @local_node.rpc_request(@local_node.addr, @remote_node.addr, method, args)
deferred.default_callback(@default_cb) if @default_cb
deferred.default_errback(@default_eb) if @default_eb

Expand Down Expand Up @@ -259,9 +139,6 @@ def handle_failed_packet(pkt)
log {"Got a failed packet! (#{pkt.data.class})"}
end

def handle_failed_rpc(method, data)
log {"Got a failed rpc call: #{method}(#{data.join(', ')})"}
end
end
end # module Net
end # module GoSim
147 changes: 147 additions & 0 deletions lib/gosim/rpc.rb
@@ -0,0 +1,147 @@
module GoSim
module RPC
class RPCRequest
attr_reader :uid, :src, :dest, :method, :args

@@rpc_counter = 0

def RPCRequest.next_uid
@@rpc_counter += 1
end

def initialize(src, dest, method, args)
@src = src
@dest = dest
@method = method
@args = args

@uid = RPCRequest.next_uid
end
end

class RPCResponse
attr_reader :uid, :result

def initialize(uid, result = nil)
@uid = uid
@result = result
end
end

class RPCErrorResponse < RPCResponse; end

class RPCDeferred < Net::Deferred
def initialize(uid = nil)
@uid = uid

super()
end

def default_callback(callback = nil, &block)
callback = callback || block

if callback
raise NotCallableError unless is_callable?(callback)
@default_cb = callback
end
end

def default_errback(errback = nil, &block)
errback = errback || block

if errback
raise NoterrableError unless is_callable?(errback)
@default_eb = errback
end
end

def run_callbacks
# Check for defaults. Call the appropriate one only if no calls have
# been provided
if is_failure?(@result)
@default_eb.call(@result) if !has_errbacks? && @default_eb
elsif !has_callbacks? && @default_cb
@default_cb.call(@result)
end

super()
end

def no_return
Topology.instance.remove_deferred(@uid)
end
end

class RPCInvalidMethodError < Exception; end
end

module Net
class RPCNode < Net::Node
def initialize
super

@topo = Topology::instance
@rpc_deferreds = {}
end

def remove_deferred(uid)
@rpc_deferreds.delete(uid)
end

# Send an rpc request that gets handled by a specific method on the receiver
def rpc_request(src, dest, method, args)
request = RPC::RPCRequest.new(src, dest, method, args)
@sim.schedule_event(:handle_rpc_request,
dest,
rand(@topo.latency_mean) + @topo.latency_base,
request)

deferred = RPC::RPCDeferred.new(request.uid)
@rpc_deferreds[request.uid] = deferred

return deferred
end

# Dispatches an RPC request to a specific method, and return a result
# unless the method returns nil.
def handle_rpc_request(request)
#puts "top of request"
if alive?
#puts "1 request...#{request.inspect}"

# If there is no response delete the deferred.
# TODO: Maybe we want to signal something to the deferred here also?
result = send(request.method, *request.args)

@sim.schedule_event(:handle_rpc_response,
request.src,
rand(@topo.latency_mean) + @topo.latency_base,
RPC::RPCResponse.new(request.uid, result))
else
#puts "2 request..."
@sim.schedule_event(:handle_rpc_response,
request.src,
rand(@topo.latency_mean) + @topo.latency_base,
RPC::RPCErrorResponse.new(request.uid))
end
end

def handle_rpc_response(response)
#puts "response...#{response}"
if @rpc_deferreds.has_key?(response.uid)
if response.class == RPC::RPCErrorResponse
@rpc_deferreds[response.uid].errback(response.result)
else
@rpc_deferreds[response.uid].callback(response.result)
end
remove_deferred(response.uid)
end
end

def handle_failed_rpc(method, data)
log {"Got a failed rpc call: #{method}(#{data.join(', ')})"}
end
end # RPCNode

end # Net
end # GoSim
6 changes: 4 additions & 2 deletions test/network_test.rb
Expand Up @@ -5,7 +5,7 @@

Packet = Struct.new(:seq_num)

class TestNode < GoSim::Net::Node
class TestNode < GoSim::Net::RPCNode
attr_reader :failed_packets

def initialize
Expand Down Expand Up @@ -89,8 +89,10 @@ def test_liveness_and_failure

node_a.add_neighbor(node_b.addr)

@topo.set_latency(0, 0)

@sim.schedule_event(:alive, node_b.sid, 4999, false)
10.times {|i| @sim.schedule_event(:handle_packet, node_a.sid, i*1000, Packet.new(i)) }
@sim.schedule_event(:alive, node_b.sid, 5000, false)
@sim.run

assert_equal(5, node_a.failed_packets)
Expand Down

0 comments on commit fec106a

Please sign in to comment.