Skip to content

Commit

Permalink
Implemented RPC semantics for networking.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeff Rose committed Feb 27, 2007
1 parent 845aabc commit 5688574
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 49 deletions.
61 changes: 37 additions & 24 deletions lib/gosim/network.rb
Expand Up @@ -37,32 +37,44 @@ def register_node(node)
end

def get_node(addr)
@nodes[node.addr]
@nodes[addr]
end

# Simple send packet that is always handled by Node#recv_packet
def send_packet(src, receivers, packet)
[*receivers].each do |receiver|
send_rpc_packet(:recv_packet, src, receiver, packet)
@sim.schedule_event(:handle_packet,
@sid,
rand(@mean_latency) + LATENCY_DEV,
GSNetworkPacket.new(id, src, receiver, packet))
end
end

def recv_packet(packet)
if @nodes[packet.dest].alive?
@sim.schedule_event(packet.id, packet.dest, 0, packet.data)
else
@sim.schedule_event(:handle_failed_packet, packet.src, 0,
FailedPacket.new(packet.dest, packet.data))
end
end

# An rpc send that gets handled by a specific method on the receiver
def send_rpc_packet(id, src, receiver, packet)
@sim.schedule_event(:handle_gs_network_packet,
def send_rpc_packet(id, src, dest, args)
@sim.schedule_event(:recv_rpc_packet,
@sid,
rand(@mean_latency) + LATENCY_DEV,
GSNetworkPacket.new(id, src, receiver, packet))
GSNetworkPacket.new(id, src, dest, args))
end

def handle_gs_network_packet(packet)
def recv_rpc_packet(packet)
if @nodes[packet.dest].alive?
@sim.schedule_event(packet.id, packet.dest, 0, packet.data)
@nodes[packet.dest].send(packet.id, *packet.data)
else
@sim.schedule_event(:handle_failed_packet, packet.src, 0,
FailedPacket.new(packet.dest, packet.data))
@nodes[packet.src].send(:handle_failed_rpc, packet.id, packet.data)
end
end

end

class RPCInvalidMethodError < Exception; end
Expand All @@ -71,36 +83,33 @@ class Peer
def initialize(local_node, remote_node)
@local_node = local_node
@remote_node = remote_node

@topo = Topology.instance
end

def addr
@remote_node.addr
end

def method_missing(method, *args)
raise RPCInvalidMethodError unless @node.respond_to?(method)
raise RPCInvalidMethodError.new("#{method} not available on target node!") unless @remote_node.respond_to?(method)

@topo.send_rpc_packet(@local_node.addr, @remote_node.addr, args)
@topo.send_rpc_packet(method, @local_node.addr, @remote_node.addr, args)
end
end

class Node < Entity
attr_reader :addr, :neighbor_ids
attr_reader :addr

def initialize()
super()
@addr = @sid
@topo = Topology.instance
@neighbor_ids = []
@alive = true

@topo.register_node(self)
end

def link(neighbors)
if neighbors.respond_to?(:to_ary)
@neighbor_ids += neighbors
else
@neighbor_ids << neighbors
end
end

def handle_liveness_packet(pkt)
@alive = pkt.alive
end
Expand All @@ -119,17 +128,21 @@ def send_packet(receivers, pkt)

# Override this in your subclass to do custom demuxing.
def recv_packet(pkt)
log "default recv_packet handler..."
end

def rpc_connect(addr)

def get_peer(addr)
Peer.new(self, @topo.get_node(addr))
end

# Implement this method to do something specific for your application.
def handle_failed_packet(pkt)
puts "Got a failed packet! (#{pkt.data.class})"
log "Got a failed packet! (#{pkt.data.class})"
end

def handle_failed_rpc(method, data)
log "Got a failed rpc call: #{method}(#{data.join(', ')})"
end
end
end # module Net
end # module GoSim
36 changes: 11 additions & 25 deletions test/network_test.rb
Expand Up @@ -13,13 +13,18 @@ def initialize
@got_message = false
@pkt_cache = []
@failed_packets = 0
@neighbors = {}
end

def add_neighbor(addr)
@neighbors[addr] = get_peer(addr)
end

def handle_packet(pkt)
@got_message = true

unless @pkt_cache.index(pkt.seq_num)
send_packet(:handle_packet, @neighbor_ids, pkt) unless @neighbor_ids.empty?
@neighbors.values.each {|n| n.handle_packet(pkt)}
@pkt_cache << pkt.seq_num
end
end
Expand All @@ -28,7 +33,7 @@ def got_message?
@got_message
end

def handle_failed_packet(pkt)
def handle_failed_rpc(method, args)
@failed_packets += 1
end
end
Expand All @@ -41,31 +46,12 @@ def setup
@sim = GoSim::Simulation.instance
@topo = GoSim::Net::Topology.instance

#@sim.verbose
@sim.quiet
end

def teardown
@sim.reset
end

def test_linking
nodes = {}
NUM_NODES.times do
n = TestNode.new
nodes[n.sid] = n
end

n = TestNode.new
n.link(nodes.keys)
assert_equal(NUM_NODES, n.neighbor_ids.size)

n = TestNode.new
n.link(nodes[0])
assert_equal(1, n.neighbor_ids.size)
n.link(nodes[1])
assert_equal(2, n.neighbor_ids.size)
end

def test_flood
nodes = {}
Expand All @@ -79,23 +65,23 @@ def test_flood

nodes.each do |sid, node|
(rand(CONNECTIVITY) + 1).times do
neighbor = nodes.keys[rand(NUM_NODES)]
node.link(neighbor) unless neighbor == sid
n_addr = nodes.keys[rand(NUM_NODES)]
node.add_neighbor(n_addr) unless n_addr == node.addr
end
end

@sim.schedule_event(:handle_packet, nodes.keys[0], 0, Packet.new(1))
@sim.run

nodes.values.each { |node| assert(node.got_message?) }
nodes.values.each { |node| assert(node.got_message?, "#{node.addr}->#{node.got_message?}") }
end

def test_liveness_and_failure
nodes = {}
node_a = TestNode.new
node_b = TestNode.new

node_a.link(node_b.sid)
node_a.add_neighbor(node_b.addr)

2.times {|i| @sim.schedule_event(:handle_packet, node_a.sid, 0, Packet.new(i)) }
2.times {|i| @sim.schedule_event(:handle_packet, node_a.sid,
Expand Down

0 comments on commit 5688574

Please sign in to comment.