/
framework.py
299 lines (265 loc) · 12.5 KB
/
framework.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
#!/usr/bin/python
from fxpeer import *
PEERNAME = "NAME" # request a peer's canonical id
LISTPEERS = "LIST"
INSERTPEER = "JOIN"
QUERY = "QUER"
QRESPONSE = "RESP"
FILEGET = "FGET"
PEERQUIT = "QUIT"
REPLY = "REPL"
ERROR = "ERRO"
# Assumption in this program:
# peer id's in this application are just "host:port" strings
# ==============================================================================
class FilerPeer(FxPeer):
# ==============================================================================
""" Implements a file-sharing peer-to-peer entity based on the generic
P2P framework.
"""
# --------------------------------------------------------------------------
def __init__(self, maxpeers, serverport):
# --------------------------------------------------------------------------
""" Initializes the peer to support connections up to maxpeers number
of peers, with its server listening on the specified port. Also sets
the dictionary of local files to empty and adds handlers to the
FxPeer framework.
"""
FxPeer.__init__(self, maxpeers, serverport)
self.files = {} # available files: name --> peerid mapping
self.addrouter(self.__router)
handlers = {LISTPEERS: self.__handle_listpeers,
INSERTPEER: self.__handle_insertpeer,
PEERNAME: self.__handle_peername,
QUERY: self.__handle_query,
QRESPONSE: self.__handle_qresponse,
FILEGET: self.__handle_fileget,
PEERQUIT: self.__handle_quit
}
for mt in handlers:
self.addhandler(mt, handlers[mt])
# end FilerPeer constructor
# --------------------------------------------------------------------------
def __debug(self, msg):
# --------------------------------------------------------------------------
if self.debug:
fxdebug(msg)
# --------------------------------------------------------------------------
def __router(self, peerid):
# --------------------------------------------------------------------------
if peerid not in self.getpeerids():
return (None, None, None)
else:
rt = [peerid]
rt.extend(self.peers[peerid])
return rt
# --------------------------------------------------------------------------
def __handle_insertpeer(self, peerconn, data):
# --------------------------------------------------------------------------
""" Handles the INSERTPEER (join) message type. The message data
should be a string of the form, "peerid host port", where peer-id
is the canonical name of the peer that desires to be added to this
peer's list of peers, host and port are the necessary data to connect
to the peer.
"""
self.peerlock.acquire()
try:
try:
peerid, host, port = data.split()
if self.maxpeersreached():
self.__debug('maxpeers %d reached: connection terminating'
% self.maxpeers)
peerconn.senddata(ERROR, 'Join: too many peers')
return
# peerid = '%s:%s' % (host,port)
if peerid not in self.getpeerids() and peerid != self.myid:
self.addpeer(peerid, host, port)
self.__debug('added peer: %s' % peerid)
peerconn.senddata(REPLY, 'Join: peer added: %s' % peerid)
else:
peerconn.senddata(ERROR, 'Join: peer already inserted %s'
% peerid)
except:
self.__debug('invalid insert %s: %s' % (str(peerconn), data))
peerconn.senddata(ERROR, 'Join: incorrect arguments')
finally:
self.peerlock.release()
# end handle_insertpeer method
# --------------------------------------------------------------------------
def __handle_listpeers(self, peerconn, data):
# --------------------------------------------------------------------------
""" Handles the LISTPEERS message type. Message data is not used. """
self.peerlock.acquire()
try:
self.__debug('Listing peers %d' % self.numberofpeers())
peerconn.senddata(REPLY, '%d' % self.numberofpeers())
for pid in self.getpeerids():
host, port = self.getpeer(pid)
peerconn.senddata(REPLY, '%s %s %d' % (pid, host, port))
finally:
self.peerlock.release()
# --------------------------------------------------------------------------
def __handle_peername(self, peerconn, data):
# --------------------------------------------------------------------------
""" Handles the NAME message type. Message data is not used. """
peerconn.senddata(REPLY, self.myid)
# QUERY arguments: "return-peerid key ttl"
# --------------------------------------------------------------------------
def __handle_query(self, peerconn, data):
# --------------------------------------------------------------------------
""" Handles the QUERY message type. The message data should be in the
format of a string, "return-peer-id key ttl", where return-peer-id
is the name of the peer that initiated the query, key is the (portion
of the) file name being searched for, and ttl is how many further
levels of peers this query should be propagated on.
"""
# self.peerlock.acquire()
try:
peerid, key, ttl = data.split()
peerconn.senddata(REPLY, 'Query ACK: %s' % key)
except:
self.__debug('invalid query %s: %s' % (str(peerconn), data))
peerconn.senddata(ERROR, 'Query: incorrect arguments')
# self.peerlock.release()
t = threading.Thread(target=self.__processquery,
args=[peerid, key, int(ttl)])
t.start()
#
# --------------------------------------------------------------------------
def __processquery(self, peerid, key, ttl):
# --------------------------------------------------------------------------
""" Handles the processing of a query message after it has been
received and acknowledged, by either replying with a QRESPONSE message
if the file is found in the local list of files, or propagating the
message onto all immediate neighbors.
"""
for fname in self.files.keys():
if key in fname:
fpeerid = self.files[fname]
if not fpeerid: # local files mapped to None
fpeerid = self.myid
host, port = peerid.split(':')
# can't use sendtopeer here because peerid is not necessarily
# an immediate neighbor
self.connectandsend(host, int(port), QRESPONSE,
'%s %s' % (fname, fpeerid),
pid=peerid)
return
# will only reach here if key not found... in which case
# propagate query to neighbors
if ttl > 0:
msgdata = '%s %s %d' % (peerid, key, ttl - 1)
for nextpid in self.getpeerids():
self.sendtopeer(nextpid, QUERY, msgdata)
# --------------------------------------------------------------------------
def __handle_qresponse(self, peerconn, data):
# --------------------------------------------------------------------------
""" Handles the QRESPONSE message type. The message data should be
in the format of a string, "file-name peer-id", where file-name is
the file that was queried about and peer-id is the name of the peer
that has a copy of the file.
"""
try:
fname, fpeerid = data.split()
if fname in self.files:
self.__debug('Can\'t add duplicate file %s %s' %
(fname, fpeerid))
else:
self.files[fname] = fpeerid
except:
# if self.debug:
traceback.print_exc()
# --------------------------------------------------------------------------
def __handle_fileget(self, peerconn, data):
# --------------------------------------------------------------------------
""" Handles the FILEGET message type. The message data should be in
the format of a string, "file-name", where file-name is the name
of the file to be fetched.
"""
fname = data
if fname not in self.files:
self.__debug('File not found %s' % fname)
peerconn.senddata(ERROR, 'File not found')
return
try:
fd = file(fname, 'r')
filedata = ''
while True:
data = fd.read(2048)
if not len(data):
break;
filedata += data
fd.close()
except:
self.__debug('Error reading file %s' % fname)
peerconn.senddata(ERROR, 'Error reading file')
return
peerconn.senddata(REPLY, filedata)
# --------------------------------------------------------------------------
def __handle_quit(self, peerconn, data):
# --------------------------------------------------------------------------
""" Handles the QUIT message type. The message data should be in the
format of a string, "peer-id", where peer-id is the canonical
name of the peer that wishes to be unregistered from this
peer's directory.
"""
self.peerlock.acquire()
try:
peerid = data.lstrip().rstrip()
if peerid in self.getpeerids():
msg = 'Quit: peer removed: %s' % peerid
self.__debug(msg)
peerconn.senddata(REPLY, msg)
self.removepeer(peerid)
else:
msg = 'Quit: peer not found: %s' % peerid
self.__debug(msg)
peerconn.senddata(ERROR, msg)
finally:
self.peerlock.release()
# precondition: may be a good idea to hold the lock before going
# into this function
# --------------------------------------------------------------------------
def buildpeers(self, host, port, hops=1):
# --------------------------------------------------------------------------
""" buildpeers(host, port, hops)
Attempt to build the local peer list up to the limit stored by
self.maxpeers, using a simple depth-first search given an
initial host and port as starting point. The depth of the
search is limited by the hops parameter.
"""
if self.maxpeersreached() or not hops:
return
peerid = None
self.__debug("Building peers from (%s,%s)" % (host, port))
try:
_, peerid = self.connectandsend(host, port, PEERNAME, '')[0]
self.__debug("contacted " + peerid)
resp = self.connectandsend(host, port, INSERTPEER,
'%s %s %d' % (self.myid,
self.serverhost,
self.serverport))[0]
self.__debug(str(resp))
if (resp[0] != REPLY) or (peerid in self.getpeerids()):
return
self.addpeer(peerid, host, port)
# do recursive depth first search to add more peers
resp = self.connectandsend(host, port, LISTPEERS, '',
pid=peerid)
if len(resp) > 1:
resp.reverse()
resp.pop() # get rid of header count reply
while len(resp):
nextpid, host, port = resp.pop()[1].split()
if nextpid != self.myid:
self.buildpeers(host, port, hops - 1)
except:
if self.debug:
traceback.print_exc()
self.removepeer(peerid)
# --------------------------------------------------------------------------
def addlocalfile(self, filename):
# --------------------------------------------------------------------------
""" Registers a locally-stored file with the peer. """
self.files[filename] = None
self.__debug("Added local file %s" % filename)