-
Notifications
You must be signed in to change notification settings - Fork 0
/
rdt.py
270 lines (220 loc) · 9.12 KB
/
rdt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
"""
rdt.py
Author: Michael P. Lang
Caleb Wrobel
Email: michael@mplang.net
Date Modified: 1 December 2012
"""
import threading
import socket
import time
import queue
from math import ceil
class Rdt(object):
    """
    Stop-and-wait reliable data transfer (RDT) layered on top of UDP.

    A message is split into fragments of at most MTU characters. Every
    fragment is sent as a space-separated text packet of the form
    "<hostname> <comm_id> <seq> <flags> <payload>" and is retransmitted
    until the matching ACK arrives or max_retries is reached. The first
    fragment carries the SYN flag and the last one carries FIN. The
    receiving side acknowledges each packet, collects the fragments of a
    message and places the reassembled message on message_queue.
    """

    seq_number = 1  # Static sequence number variable shared amongst all
                    # instances of the Rdt class. Valid values are [1,
                    # 2147483647] (i.e., a 32-bit signed integer).

    def __init__(self, hostname):
        """
        Initialize default values.

        Arguments:
        hostname -- hostname of this (sending) client.
        """
        self.hostname = hostname
        # Socket used to send data packets and ACKs and to wait for ACKs.
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Socket the server side listens on (bound in start_server).
        self.listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Fully reassembled messages waiting for the upper layer.
        self.message_queue = queue.Queue()
        self.stdout_lock = threading.Lock()
        # Maps (host, comm_id) -> {seq (as string): payload}
        self.fragments = {}
        # Maximum payload size of a single packet, in characters
        self.MTU = 128
        # Estimated RTT in seconds
        self.estimated_RTT = 0.1
        self.dev_RTT = 0.0
        # Socket timeout interval in seconds
        self.timeout_interval = 1.0
        # Maximum number of transmission attempts for a packet
        self.max_retries = 3
        # Maximum sequence number value (2^31 - 1)
        self.MAX_SEQ_NUM = 2147483647
        # If a (host_id, comm_id) pair has already been processed
        # add it to the list
        self.closed_communications = []

    def send(self, comm_id, data, address):
        """
        Method called from upper layer to handle message. Complete RDT packets
        are transmitted via UDP, one at a time; the next packet is only sent
        once the current one has been acknowledged.

        Arguments:
        comm_id -- the communication ID identifying this message.
        data -- the (unencoded) upper-layer data to transmit.
        address -- the destination IP address/FQDN and port passed as a
                   2-tuple.

        Returns:
        True if upper-layer data was sent successfully, else returns False.
        Empty data produces no packets, so nothing is sent and True is
        returned.
        """
        for header, packet in self.make_packets(comm_id, data):
            ack_seq = 0
            retries = 0
            sample_RTT = 0.0
            while not ack_seq:
                if retries == self.max_retries:
                    # Reached maximum number of retransmissions.
                    # TODO: Send RST packet?
                    self.sock.settimeout(None)  # Place socket in blocking mode
                    self.increment_seq_number()
                    return False
                # Time the round trip from the moment of transmission so
                # that stray packets received meanwhile do not shorten it.
                start_time = time.perf_counter()
                self.sock.sendto(packet.encode(), address)
                self.sock.settimeout(self.timeout_interval)
                while True:
                    try:
                        response = self.sock.recv(1024)
                    except socket.timeout:
                        # Transmission timed-out: back off and retransmit.
                        self.timeout_interval *= 2
                        ack_seq = 0
                        break
                    except OSError:
                        # Other socket failure: count it as a failed
                        # attempt, but it says nothing about the RTT, so
                        # the timeout is left alone.
                        ack_seq = 0
                        break
                    sample_RTT = time.perf_counter() - start_time
                    ack_seq = self.process_response(header, response)
                    if ack_seq == Rdt.seq_number or ack_seq == 0:
                        # Keep only packets with the correct seq_number,
                        # drop other packets (ACKs for other sequence
                        # numbers are skipped and we keep waiting).
                        break
                retries += 1
            if retries == 1:
                # Packet was successfully sent and acknowledged on first try,
                # so the sample is unambiguous and may update the estimate.
                self.estimated_RTT *= 0.875
                self.estimated_RTT += (0.125 * sample_RTT)
                self.dev_RTT *= 0.75
                self.dev_RTT += (0.25 * abs(sample_RTT - self.estimated_RTT))
                self.timeout_interval = self.estimated_RTT + 4 * self.dev_RTT
            self.sock.settimeout(None)  # Place socket in blocking mode
            # Advance per packet: the receiver stores fragments keyed by
            # sequence number, so every fragment needs its own value.
            self.increment_seq_number()
        return True

    def increment_seq_number(self):
        """
        Advances the shared sequence number, wrapping from MAX_SEQ_NUM
        back to 1.
        """
        if Rdt.seq_number == self.MAX_SEQ_NUM:
            Rdt.seq_number = 1
        else:
            Rdt.seq_number += 1

    def make_packets(self, comm_id, data):
        """
        Creates packets containing the payload from the upper layer for
        transmission to the receiver. Data is split into MTU-sized chunks
        (each of which will be the payload for an individual packet), a
        header is created for each chunk, and a packet is made by prepending
        the header to the payload.

        The sequence number is read from Rdt.seq_number at the moment each
        packet is generated, not when the generator is created.

        Arguments:
        comm_id -- The communication ID associated with this message. Used to
                   keep track of separate messages from the same host.
        data -- The (unencoded) payload to be sent in this packet.

        Returns:
        Yields a tuple containing the current header (a list of strings) and
        the complete packet as an unencoded string. Yields nothing when data
        is empty.
        """
        data_length = len(data)
        # Offset of the final chunk; the packet starting there gets FIN.
        last_offset = (data_length - 1) // self.MTU * self.MTU
        for offset in range(0, data_length, self.MTU):
            flags = ""
            if offset == 0:
                # First packet
                flags += "SYN"
            if offset == last_offset:
                # Last packet
                flags += "FIN"
            payload = data[offset:offset + self.MTU]
            header = [self.hostname, str(comm_id), str(Rdt.seq_number), flags]
            packet = "{0} {1} {2} {3} {4}".format(header[0], header[1], header[2], header[3], payload)
            yield (header, packet)

    def start_server(self, port):
        """
        Called by upper-layer server to initialize the server. Binds the
        listening socket and starts the listener thread.

        Arguments:
        port -- The port number to listen on.
        """
        self.listen_sock.bind(('', port))
        thread = threading.Thread(target=self.listen_thread, args=())
        thread.start()

    def listen_thread(self):
        """
        Listens to socket. Creates a new thread to handle received data.
        The loop ends when an empty datagram is received or when the
        listening socket fails or is closed.
        """
        while True:
            try:
                data, address = self.listen_sock.recvfrom(1024)
            except OSError:
                # The socket was closed (see close()) or is unusable.
                break
            if not data:
                break
            thread = threading.Thread(target=self.process_pkt, args=(data, address))
            thread.start()

    def process_pkt(self, data, address):
        """
        Decodes and processes the packet: acknowledges it, stores its payload
        and reassembles the message once the FIN packet has arrived.
        Malformed packets are dropped without being acknowledged.

        Arguments:
        data -- Complete packet fresh from the socket as a bytestring.
        address -- The address tuple of the incoming connection.
        """
        try:
            header, payload = self.extract(data.decode())
        except UnicodeDecodeError:
            return
        if len(header) < 4:
            # Not enough header fields to be an RDT packet.
            return
        try:
            int(header[2])
        except ValueError:
            # Reassembly sorts fragments numerically by sequence number.
            return
        self.send_ack(header, address)
        msgkey = (header[0], header[1])
        if "SYN" in header[3]:
            self.fragments[msgkey] = {}
        if msgkey in self.fragments:
            self.fragments[msgkey][header[2]] = payload
            if "FIN" in header[3]:
                # If we get a FIN packet without a SYN packet, ignore it.
                # This should mean a duplicate FIN that we've already processed.
                self.reassemble_message(msgkey)

    def extract(self, data):
        """
        Extracts the upper-layer message payload from the packet.
        The packet is split on the first four space characters: the four
        leading fields are the header (pkt_host, pkt_commid, pkt_seq and
        pkt_flags) and everything after them, spaces included, is the
        payload.

        Arguments:
        data -- An RDT packet (decoded string).

        Returns:
        A tuple containing the RDT header (as a list) and the upper-layer
        message payload string. The header has fewer than four items when
        the packet is malformed.
        """
        fields = data.split(" ", 4)
        pkt_header = fields[:4]
        pkt_payload = fields[4] if len(fields) > 4 else ""
        return (pkt_header, pkt_payload)

    def send_ack(self, header, address):
        """
        Acknowledges a received packet by echoing its comm_id, sequence
        number and flags with "ACK" appended to the flags.

        Arguments:
        header -- The header list of the packet being acknowledged.
        address -- The address tuple to send the ACK to.
        """
        ack_header = "{} {} {} {}ACK".format(self.hostname, header[1], header[2], header[3])
        self.sock.sendto(ack_header.encode(), address)

    def reassemble_message(self, msgkey):
        """
        Joins the stored fragments of a message in sequence-number order and
        queues the result for the upper layer. The fragments are discarded
        either way; a message whose key was already processed is not queued
        a second time.

        Arguments:
        msgkey -- The (host, comm_id) tuple identifying the message.
        """
        fragments = self.fragments.pop(msgkey)
        if msgkey in self.closed_communications:
            return
        message = ''.join(fragments[seq] for seq in sorted(fragments, key=int))
        self.closed_communications.append(msgkey)
        self.message_queue.put(message)

    def close(self):
        """
        Explicitly releases the sockets. Called by upper-layer application.
        """
        self.sock.close()
        self.listen_sock.close()

    def now(self):
        """
        Returns the current time. Used for logging and debugging purposes.
        """
        return time.ctime(time.time())

    def process_response(self, out_header, response):
        """
        Returns the sequence number of the response packet or 0 if the
        response packet is not an ACK (including responses that cannot be
        decoded or parsed).

        Arguments:
        out_header -- Header of the packet that was sent (currently unused).
        response -- The raw response as a bytestring.
        """
        try:
            in_header = self.extract(response.decode())[0]
        except UnicodeDecodeError:
            return 0
        if len(in_header) < 4 or "ACK" not in in_header[3]:
            return 0
        try:
            return int(in_header[2])
        except ValueError:
            return 0

    def receive_data(self, timeout=5):
        """
        Called by upper layer to receive messages from the queue.

        Arguments:
        timeout -- Maximum number of seconds to wait for a message.

        Returns:
        The next reassembled message. Raises queue.Empty if no message
        becomes available within the timeout.
        """
        return self.message_queue.get(True, timeout)