Skip to content

Commit

Permalink
Adapted NTS classes to use multiple monitor peers.
Browse files Browse the repository at this point in the history
  • Loading branch information
mertemba committed Aug 16, 2015
1 parent 7c79aa2 commit af8f2b7
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 69 deletions.
2 changes: 1 addition & 1 deletion src/monitor_nts.py
Expand Up @@ -108,7 +108,7 @@ def process_message(self, message, sender):
IP_addr = socket.inet_ntoa(IP_addr)
port = socket.ntohs(port)
peer = (IP_addr, port)
print("NTS: Received [send hello to %s %s]" % (peer_id, peer))
print("NTS: Received peer ID %s %s" % (peer_id, peer))
# Sending hello not needed as monitor and peer already communicated
if peer not in self.peer_list:
self.peer_list.append(peer)
Expand Down
20 changes: 11 additions & 9 deletions src/peer_nts.py
Expand Up @@ -138,19 +138,19 @@ def start_send_hello_thread(self):
def receive_the_list_of_peers_2(self):
# {{{

# The monitor peer endpoint has already been received
assert len(self.peer_list) == 1
# The monitor peer endpoints have already been received
assert len(self.peer_list) == self.number_of_monitors

sys.stdout.write(Color.green)
_print_("NTS: Requesting the number of peers from splitter")
sys.stdout.write(Color.none)
# Add 1 as the monitor peer was already received
self.number_of_peers = socket.ntohs(struct.unpack("H",
self.splitter_socket.recv(struct.calcsize("H")))[0]) + 1
self.splitter_socket.recv(struct.calcsize("H")))[0]) + self.number_of_monitors
_print_("NTS: The size of the team is %d (apart from me)" % self.number_of_peers)

# Skip the monitor peer
for _ in range(self.number_of_peers - 1):
for _ in range(self.number_of_peers - self.number_of_monitors):
message = self.splitter_socket.recv(common.PEER_ID_LENGTH +
struct.calcsize("4sHH"))
peer_id = message[:common.PEER_ID_LENGTH]
Expand Down Expand Up @@ -205,10 +205,11 @@ def try_to_disconnect_from_the_splitter(self):

# Note: This peer is *not* the monitor peer.

# Send UDP packets to splitter and monitor peer
# Send UDP packets to splitter and monitor peers
# to create working NAT entries and to determine the
# source port allocation type of the NAT of this peer
self.say_hello(self.peer_list[0])
for peer in self.peer_list[:self.number_of_monitors]:
self.say_hello(peer)
self.say_hello(self.splitter)
# Directly start packet sending
self.hello_messages_event.set()
Expand Down Expand Up @@ -237,7 +238,7 @@ def try_to_disconnect_from_the_splitter(self):
del self.hello_messages[:]
# Resetting peer lists
del self.initial_peer_list[:]
del self.peer_list[1:] # Leave monitor in list
del self.peer_list[self.number_of_monitors:] # Leave monitors in list
# Recreate the socket
# This similar to Peer_DBS.listen_to_the_team, but binds to a new random port
self.team_socket.close()
Expand All @@ -252,8 +253,9 @@ def try_to_disconnect_from_the_splitter(self):
# Say hello to splitter again, to retry incorporation
# 'N' for 'not incorporated'
self.send_message((self.peer_id + 'N', self.splitter))
# Say hello to monitor again, to keep the NAT entry alive
self.send_message((self.peer_id + 'N', self.peer_list[0]))
# Say hello to monitors again, to keep the NAT entry alive
for peer in self.peer_list[:self.number_of_monitors]:
self.send_message((self.peer_id + 'N', peer))
# Receive all peer endpoints and send hello messages
self.receive_the_list_of_peers_2()

Expand Down
127 changes: 68 additions & 59 deletions src/splitter_nts.py
Expand Up @@ -43,17 +43,17 @@ def __init__(self):

# {{{ The arriving peers. Key: ID.
# Value: (serve_socket, peer_address,
# source_port_to_splitter, source_port_to_monitor, arrive_time)
# source_port_to_splitter, source_ports_to_monitors, arrive_time)
# where source_port_to_splitter is the public source port towards the splitter
# and source_port_to_monitor is the public source port towards the monitor.
# and source_ports_to_monitors are the source ports towards the monitors.
# }}}
self.arriving_peers = {}

# {{{ The peers that are being incorporated, have closed their TCP
# connection to splitter and try to connect to all existing peers.
# They will be removed from the team if taking too long to connect to peers.
# key: peer_id; value: (peer, incorporation_time, source_port_to_splitter,
# source_port_to_monitor, serve_socket).
# source_ports_to_monitors, serve_socket).
# The source port values are set when the peer retries incorporation.
# }}}
self.incorporating_peers = {}
Expand Down Expand Up @@ -86,36 +86,36 @@ def send_the_list_of_peers(self, peer_serve_socket):
# are sent together with their IDs when a Peer_NTS instance has been
# created from the Peer_DBS instance, in send_the_list_of_peers_2()

if len(self.peer_list) == 0:
print("NTS: Sending an empty list of peers")
message = struct.pack("H", socket.htons(0))
peer_serve_socket.sendall(message)
else:
print("NTS: Sending the monitor as the list of peers")
# Send a peer list size of 1
message = struct.pack("H", socket.htons(1))
peer_serve_socket.sendall(message)
# Send the monitor endpoint
message = struct.pack("4sH", socket.inet_aton(self.peer_list[0][ADDR]), \
socket.htons(self.peer_list[0][PORT]))
print("NTS: Sending the monitors as the list of peers")
# Send the number of monitors
message = struct.pack("H", socket.htons(self.MONITOR_NUMBER))
peer_serve_socket.sendall(message)
# Send a peer list size equal to the number of monitor peers
message = struct.pack("H",
socket.htons(min(self.MONITOR_NUMBER, len(self.peer_list))))
peer_serve_socket.sendall(message)
# Send the monitor endpoints
for peer in self.peer_list[:self.MONITOR_NUMBER]:
message = struct.pack("4sH", socket.inet_aton(peer[ADDR]), \
socket.htons(peer[PORT]))
peer_serve_socket.sendall(message)

# }}}

def send_the_list_of_peers_2(self, peer_serve_socket, peer):
# {{{

# Send all peers except the monitor peer with their peer ID
number_of_other_peers = len(self.peer_list)-1
# Send all peers except the monitor peers with their peer ID
number_of_other_peers = len(self.peer_list)-self.MONITOR_NUMBER
if peer in self.peer_list:
number_of_other_peers -= 1
if __debug__:
print("DBS: Sending the list of peers except the monitor (%d peers)",
number_of_other_peers)
print("NTS: Sending the list of peers except the monitor (%d peers)" \
% number_of_other_peers)
message = struct.pack("H", socket.htons(number_of_other_peers))
peer_serve_socket.sendall(message)

for p in self.peer_list[1:]:
for p in self.peer_list[self.MONITOR_NUMBER:]:
if p == peer: # Do not send its endpoint to the peer itself
continue
# Also send the port step of the existing peer, in case
Expand Down Expand Up @@ -193,17 +193,19 @@ def handle_a_peer_arrival(self, connection):
peer_id = self.generate_id()
print("NTS: Sending ID %s to peer %s" % (peer_id, new_peer))
serve_socket.sendall(peer_id)
if len(self.peer_list) == 0:
if len(self.peer_list) < self.MONITOR_NUMBER:
# Directly incorporate the monitor peer into the team.
# The source ports are all set to the same, as the monitor peer
# The source ports are all set to the same, as the monitor peers
# should be publicly accessible
self.ids[new_peer] = peer_id
self.port_steps[new_peer] = 0
self.insert_peer(new_peer)
self.send_new_peer(peer_id, new_peer, [new_peer[1]]*self.MONITOR_NUMBER)
serve_socket.close()
return new_peer
else:
self.arriving_peers[peer_id] = (serve_socket, new_peer[0], 0, 0, time.time())
self.arriving_peers[peer_id] = (serve_socket, new_peer[0], 0, \
[0]*self.MONITOR_NUMBER, time.time())
# Splitter will continue with incorporate_peer() as soon as the arriving
# peer has sent UDP packets to splitter and monitor

Expand All @@ -213,35 +215,37 @@ def incorporate_peer(self, peer_id):
# {{{

serve_socket, peer_address, source_port_to_splitter, \
source_port_to_monitor, arrive_time = self.arriving_peers[peer_id]
source_ports_to_monitors, arrive_time = self.arriving_peers[peer_id]

print("NTS: Incorporating the peer %s. Source ports: %s, %s"
% (peer_id, source_port_to_splitter, source_port_to_monitor))
% (peer_id, source_port_to_splitter, source_ports_to_monitors))

new_peer = (peer_address, source_port_to_splitter)
if len(self.peer_list) != 0:
if len(self.peer_list) >= self.MONITOR_NUMBER:
try:
self.send_the_list_of_peers_2(serve_socket, new_peer)
except Exception as e:
print("NTS: %s" % e)

port_diff = abs(source_port_to_monitor - source_port_to_splitter)
self.port_steps[new_peer] = None
for source_port_to_monitor in source_ports_to_monitors:
self.update_port_step(new_peer, source_port_to_monitor)

self.send_new_peer(peer_id, new_peer, port_diff, source_port_to_monitor)
self.send_new_peer(peer_id, new_peer, source_ports_to_monitors)

# Insert the peer into the list
self.ids[new_peer] = peer_id
self.port_steps[new_peer] = port_diff
self.insert_peer(new_peer)
# The peer is in the team, but is not connected to all peers yet,
# so add to the list
self.incorporating_peers[peer_id] = (new_peer, time.time(), 0, 0, serve_socket)
self.incorporating_peers[peer_id] = (new_peer, time.time(), 0, \
[0]*self.MONITOR_NUMBER, serve_socket)

del self.arriving_peers[peer_id]

# }}}

def send_new_peer(self, peer_id, new_peer, port_step, source_port_to_monitor):
def send_new_peer(self, peer_id, new_peer, source_ports_to_monitors):
# {{{

if __debug__:
Expand All @@ -250,15 +254,15 @@ def send_new_peer(self, peer_id, new_peer, port_step, source_port_to_monitor):
for peer_number, peer in enumerate(self.peer_list):
if peer == new_peer: # Do not send its endpoint to the peer itself
continue
if peer_number == 0:
if peer_number < self.MONITOR_NUMBER:
# Send only the endpoint of the peer to the monitor,
# as the arriving peer and the monitor already communicated
message = peer_id + struct.pack("4sH", socket.inet_aton(new_peer[0]), \
socket.htons(source_port_to_monitor))
socket.htons(source_ports_to_monitors[peer_number]))
else:
# Send all information necessary for port prediction to the existing peers
message = peer_id + struct.pack("4sHHH", socket.inet_aton(new_peer[0]), \
socket.htons(new_peer[1]), socket.htons(port_step), \
socket.htons(new_peer[1]), socket.htons(self.port_steps[new_peer]), \
socket.htons(peer_number))

# Hopefully one of these packets arrives
Expand All @@ -285,18 +289,19 @@ def retry_to_incorporate_peer(self, peer_id):
# {{{

# Update source port information
peer, start_time, source_port_to_splitter, source_port_to_monitor, \
peer, start_time, source_port_to_splitter, source_ports_to_monitors, \
serve_socket = self.incorporating_peers[peer_id]
self.update_port_step(peer, source_port_to_splitter)
self.update_port_step(peer, source_port_to_monitor)
for source_port_to_monitor in source_ports_to_monitors:
self.update_port_step(peer, source_port_to_monitor)
# Update peer lists
new_peer = (peer[0], source_port_to_splitter)
self.update_peer(peer, new_peer)
self.incorporating_peers[peer_id] = (new_peer, start_time, 0, 0, serve_socket)
self.incorporating_peers[peer_id] = (new_peer, start_time, 0, \
[0]*self.MONITOR_NUMBER, serve_socket)

# Send the updated endpoint to the existing peers
self.send_new_peer(peer_id, new_peer, self.port_steps[new_peer],
source_port_to_monitor)
self.send_new_peer(peer_id, new_peer, source_ports_to_monitors)

# Send all peers to the retrying peer
try:
Expand All @@ -312,6 +317,8 @@ def update_port_step(self, peer, source_port):
# Skip check if measured port step is 0
if self.port_steps[peer] == 0:
return
if self.port_steps[peer] == None:
self.port_steps[peer] = 0
# Update source port information
port_diff = abs(peer[1] - source_port)
previous_port_step = self.port_steps[peer]
Expand Down Expand Up @@ -378,20 +385,20 @@ def moderate_the_team(self):
print('NTS: Peer ID %s is not an arriving peer' % peer_id)
continue

peer_data = self.arriving_peers[peer_id]
if peer_data[1] != sender[0]:
if self.arriving_peers[peer_id][1] != sender[0]:
print('NTS: ID %s: peer address over TCP (%s) and UDP (%s) is different'
% (peer_id, peer_data[1], sender[0]))
% (peer_id, self.arriving_peers[peer_id][1], sender[0]))
source_port_to_splitter = sender[1]
# Update peer information
self.arriving_peers[peer_id] = (peer_data[0], peer_data[1],
source_port_to_splitter, peer_data[3], peer_data[4])
self.arriving_peers[peer_id] = self.arriving_peers[peer_id][:2] \
+ (source_port_to_splitter,) + self.arriving_peers[peer_id][3:]

if source_port_to_splitter != 0 and peer_data[3] != 0:
if self.arriving_peers[peer_id][2] != 0 and \
0 not in self.arriving_peers[peer_id][3]:
# Source ports are known, incorporate the peer
self.incorporate_peer(peer_id)

elif len(self.peer_list) > 0 and sender == self.peer_list[0] and \
elif sender in self.peer_list[:self.MONITOR_NUMBER] and \
len(message) == common.PEER_ID_LENGTH + struct.calcsize("H"):
# Packet is from monitor
peer_id = message[:common.PEER_ID_LENGTH]
Expand All @@ -402,14 +409,15 @@ def moderate_the_team(self):
print('NTS: Peer ID %s is not an arriving peer' % peer_id)
continue

peer_data = self.arriving_peers[peer_id]
source_port_to_monitor = socket.ntohs(struct.unpack("H",
message[common.PEER_ID_LENGTH:])[0])
# Get monitor number
index = self.peer_list.index(sender)
# Update peer information
self.arriving_peers[peer_id] = (peer_data[0], peer_data[1],
peer_data[2], source_port_to_monitor, peer_data[4])
self.arriving_peers[peer_id][3][index] = source_port_to_monitor

if peer_data[2] != 0 and source_port_to_monitor != 0:
if self.arriving_peers[peer_id][2] != 0 and \
0 not in self.arriving_peers[peer_id][3]:
# All source ports are known, incorporate the peer
self.incorporate_peer(peer_id)

Expand Down Expand Up @@ -460,17 +468,17 @@ def moderate_the_team(self):
print('NTS: Peer %s retries incorporation from same port, ignoring' % peer_id)
continue
print('NTS: Peer %s retries incorporation from %s' % (peer_id, sender))
peer_data = self.incorporating_peers[peer_id]
source_port_to_splitter = sender[1]
# Update peer information
self.incorporating_peers[peer_id] = (peer_data[0], peer_data[1],
source_port_to_splitter, peer_data[3], peer_data[4])
self.incorporating_peers[peer_id] = self.incorporating_peers[peer_id][:2] \
+ (source_port_to_splitter,) + self.incorporating_peers[peer_id][3:]

if source_port_to_splitter != 0 and peer_data[3] != 0:
if self.incorporating_peers[peer_id][2] != 0 and \
0 not in self.incorporating_peers[peer_id][3]:
# All source ports are known, incorporate the peer
self.retry_to_incorporate_peer(peer_id)

elif len(self.peer_list) > 0 and sender == self.peer_list[0] and \
elif sender in self.peer_list[:self.MONITOR_NUMBER] and \
len(message) == common.PEER_ID_LENGTH+1 + struct.calcsize("H"):
# Packet is from monitor
peer_id = message[:common.PEER_ID_LENGTH]
Expand All @@ -481,14 +489,15 @@ def moderate_the_team(self):
print('NTS: Peer ID %s is not an incorporating peer' % peer_id)
continue

peer_data = self.incorporating_peers[peer_id]
source_port_to_monitor = socket.ntohs(struct.unpack("H",
message[common.PEER_ID_LENGTH+1:])[0])
# Get monitor number
index = self.peer_list.index(sender)
# Update peer information
self.incorporating_peers[peer_id] = (peer_data[0], peer_data[1],
peer_data[2], source_port_to_monitor, peer_data[4])
self.incorporating_peers[peer_id][3][index] = source_port_to_monitor

if peer_data[2] != 0 and source_port_to_monitor != 0:
if self.incorporating_peers[peer_id][2] != 0 and \
0 not in self.incorporating_peers[peer_id][3]:
# All source ports are known, incorporate the peer
self.retry_to_incorporate_peer(peer_id)

Expand Down

0 comments on commit af8f2b7

Please sign in to comment.