diff --git a/lib/gosim/network.rb b/lib/gosim/network.rb index 2d41d56..96255fb 100644 --- a/lib/gosim/network.rb +++ b/lib/gosim/network.rb @@ -37,32 +37,44 @@ def register_node(node) end def get_node(addr) - @nodes[node.addr] + @nodes[addr] end # Simple send packet that is always handled by Node#recv_packet def send_packet(src, receivers, packet) [*receivers].each do |receiver| - send_rpc_packet(:recv_packet, src, receiver, packet) + @sim.schedule_event(:handle_packet, + @sid, + rand(@mean_latency) + LATENCY_DEV, + GSNetworkPacket.new(id, src, receiver, packet)) + end + end + + def recv_packet(packet) + if @nodes[packet.dest].alive? + @sim.schedule_event(packet.id, packet.dest, 0, packet.data) + else + @sim.schedule_event(:handle_failed_packet, packet.src, 0, + FailedPacket.new(packet.dest, packet.data)) end end # An rpc send that gets handled by a specific method on the receiver - def send_rpc_packet(id, src, receiver, packet) - @sim.schedule_event(:handle_gs_network_packet, + def send_rpc_packet(id, src, dest, args) + @sim.schedule_event(:recv_rpc_packet, @sid, rand(@mean_latency) + LATENCY_DEV, - GSNetworkPacket.new(id, src, receiver, packet)) + GSNetworkPacket.new(id, src, dest, args)) end - def handle_gs_network_packet(packet) + def recv_rpc_packet(packet) if @nodes[packet.dest].alive? - @sim.schedule_event(packet.id, packet.dest, 0, packet.data) + @nodes[packet.dest].send(packet.id, *packet.data) else - @sim.schedule_event(:handle_failed_packet, packet.src, 0, - FailedPacket.new(packet.dest, packet.data)) + @nodes[packet.src].send(:handle_failed_rpc, packet.id, packet.data) end end + end class RPCInvalidMethodError < Exception; end @@ -71,36 +83,33 @@ class Peer def initialize(local_node, remote_node) @local_node = local_node @remote_node = remote_node + + @topo = Topology.instance + end + + def addr + @remote_node.addr end def method_missing(method, *args) - raise RPCInvalidMethodError unless @node.respond_to?(method) + raise RPCInvalidMethodError.new("#{method} not available on target node!") unless @remote_node.respond_to?(method) - @topo.send_rpc_packet(@local_node.addr, @remote_node.addr, args) + @topo.send_rpc_packet(method, @local_node.addr, @remote_node.addr, args) end end class Node < Entity - attr_reader :addr, :neighbor_ids + attr_reader :addr def initialize() super() @addr = @sid @topo = Topology.instance - @neighbor_ids = [] @alive = true @topo.register_node(self) end - def link(neighbors) - if neighbors.respond_to?(:to_ary) - @neighbor_ids += neighbors - else - @neighbor_ids << neighbors - end - end - def handle_liveness_packet(pkt) @alive = pkt.alive end @@ -119,17 +128,21 @@ def send_packet(receivers, pkt) # Override this in your subclass to do custom demuxing. def recv_packet(pkt) + log "default recv_packet handler..." end - def rpc_connect(addr) - + def get_peer(addr) + Peer.new(self, @topo.get_node(addr)) end # Implement this method to do something specific for your application. def handle_failed_packet(pkt) - puts "Got a failed packet! (#{pkt.data.class})" + log "Got a failed packet! (#{pkt.data.class})" end + def handle_failed_rpc(method, data) + log "Got a failed rpc call: #{method}(#{data.join(', ')})" + end end end # module Net end # module GoSim diff --git a/test/network_test.rb b/test/network_test.rb index 61fa8f3..3328419 100644 --- a/test/network_test.rb +++ b/test/network_test.rb @@ -13,13 +13,18 @@ def initialize @got_message = false @pkt_cache = [] @failed_packets = 0 + @neighbors = {} + end + + def add_neighbor(addr) + @neighbors[addr] = get_peer(addr) end def handle_packet(pkt) @got_message = true unless @pkt_cache.index(pkt.seq_num) - send_packet(:handle_packet, @neighbor_ids, pkt) unless @neighbor_ids.empty? + @neighbors.values.each {|n| n.handle_packet(pkt)} @pkt_cache << pkt.seq_num end end @@ -28,7 +33,7 @@ def got_message? @got_message end - def handle_failed_packet(pkt) + def handle_failed_rpc(method, args) @failed_packets += 1 end end @@ -41,31 +46,12 @@ def setup @sim = GoSim::Simulation.instance @topo = GoSim::Net::Topology.instance - #@sim.verbose @sim.quiet end def teardown @sim.reset end - - def test_linking - nodes = {} - NUM_NODES.times do - n = TestNode.new - nodes[n.sid] = n - end - - n = TestNode.new - n.link(nodes.keys) - assert_equal(NUM_NODES, n.neighbor_ids.size) - - n = TestNode.new - n.link(nodes[0]) - assert_equal(1, n.neighbor_ids.size) - n.link(nodes[1]) - assert_equal(2, n.neighbor_ids.size) - end def test_flood nodes = {} @@ -79,15 +65,15 @@ def test_flood nodes.each do |sid, node| (rand(CONNECTIVITY) + 1).times do - neighbor = nodes.keys[rand(NUM_NODES)] - node.link(neighbor) unless neighbor == sid + n_addr = nodes.keys[rand(NUM_NODES)] + node.add_neighbor(n_addr) unless n_addr == node.addr end end @sim.schedule_event(:handle_packet, nodes.keys[0], 0, Packet.new(1)) @sim.run - nodes.values.each { |node| assert(node.got_message?) } + nodes.values.each { |node| assert(node.got_message?, "#{node.addr}->#{node.got_message?}") } end def test_liveness_and_failure @@ -95,7 +81,7 @@ def test_liveness_and_failure node_a = TestNode.new node_b = TestNode.new - node_a.link(node_b.sid) + node_a.add_neighbor(node_b.addr) 2.times {|i| @sim.schedule_event(:handle_packet, node_a.sid, 0, Packet.new(i)) } 2.times {|i| @sim.schedule_event(:handle_packet, node_a.sid,