-
Notifications
You must be signed in to change notification settings - Fork 0
/
PicoRendezvous.py
83 lines (73 loc) · 2.44 KB
/
PicoRendezvous.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#!/usr/bin/env python
"""Minimalist Rendezvous Client for Python"""
__version__ = "0.2"
__author__ = "Rui Carmo (http://the.taoofmac.com)"
__copyright__ = "(C) 2004 Rui Carmo. Code under BSD License."
import string, os, socket, struct
from SocketServer import *
# IPv4 multicast group that queries are sent to and that server_bind joins
# via IP_ADD_MEMBERSHIP.
_MDNS_ADDR = '224.0.0.251'
# UDP port used both for the local bind and as the destination of queries.
_MDNS_PORT = 5353
class PicoRendezvous(UDPServer):
    """Tiny multicast-DNS client built on top of ``UDPServer``.

    ``query()`` multicasts a PTR question and then services incoming
    datagrams for a short while; the request handler records the source
    address of every datagram in ``self.replies``.
    """

    allow_reuse_address = True

    def __init__(self):
        # Each instance owns its reply list. A class-level list would be
        # shared by every instance and would keep growing across queries.
        self.replies = []
        # The address given here is not used for binding: server_bind()
        # below binds to ('', _MDNS_PORT) on its own socket.
        UDPServer.__init__(self, ("localhost", _MDNS_PORT), _ReplyHandler)

    @staticmethod
    def _build_query(proto):
        """Return the wire-format DNS question asking for PTR records of *proto*.

        *proto* is a dotted name such as ``'_growl._tcp'``. Empty labels
        (a trailing dot, or doubled dots) are skipped so they cannot
        terminate the encoded name early.

        Raises ``ValueError`` if a label is longer than 63 bytes once
        UTF-8 encoded, since a DNS length octet cannot express that.
        """
        # Header: id=0, flags=0, one question, no answer/authority/additional.
        chunks = [struct.pack("!HHHHHH", 0, 0, 1, 0, 0, 0)]
        for label in proto.split('.'):
            if not label:
                continue
            raw = label.encode('utf-8')
            if len(raw) > 63:
                raise ValueError("DNS label too long: %r" % (label,))
            chunks.append(struct.pack("B", len(raw)))
            chunks.append(raw)
        # Zero-length root label ends the name.
        chunks.append(struct.pack("B", 0))
        # Question type 12 (PTR), class 1.
        chunks.append(struct.pack("!HH", 12, 1))
        return b"".join(chunks)

    def query(self, proto):
        """Multicast a PTR query for *proto* and return the responders' IPs.

        The packet sent is kept in ``self.data``. After sending,
        ``handle_request()`` is called 20 times; the socket timeout
        configured in ``server_bind`` keeps each call short. The returned
        list holds the source IP of every datagram handled during that
        window and is started afresh on each call.
        """
        # Start from a clean slate so earlier queries do not leak into this one.
        self.replies = []
        self.data = self._build_query(proto)
        try:
            self.socket.sendto(self.data, 0, (_MDNS_ADDR, _MDNS_PORT))
        except socket.error:
            # Best effort: a failed send simply yields no replies below.
            pass
        for _ in range(20):
            self.handle_request()
        return self.replies

    def server_bind(self):
        """Create and configure the multicast socket used by this client.

        Replaces the socket created by the base class with one that allows
        address/port reuse where available, is bound to ``('', _MDNS_PORT)``,
        has joined the ``_MDNS_ADDR`` group and uses a 0.1 second timeout.
        """
        self.group = ('', _MDNS_PORT)
        # The base-class constructor has already opened a socket; close it
        # before replacing it so the descriptor is not leaked.
        previous = getattr(self, 'socket', None)
        if previous is not None:
            previous.close()
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except (AttributeError, socket.error):
            # SO_REUSEPORT is missing or refused on some platforms; optional.
            pass
        self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 255)
        self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1)
        try:
            self.socket.bind(self.group)
        except socket.error:
            # Best effort, as before: carry on even if the bind is refused.
            pass
        hostname = socket.gethostname() + '.local'
        hostip = socket.gethostbyname(hostname)
        host = socket.inet_aton(hostip)
        # NOTE(review): the 8-byte payload (address followed by 0.0.0.0) is
        # kept exactly as it was; confirm how each target platform reads it.
        self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF,
                               host + socket.inet_aton('0.0.0.0'))
        self.socket.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP,
                               socket.inet_aton(_MDNS_ADDR) + socket.inet_aton('0.0.0.0'))
        self.socket.settimeout(0.1)
# end class
class _ReplyHandler(DatagramRequestHandler):
    """Request handler that remembers who sent each incoming datagram."""

    def handle(self):
        """Record the sender's IP in ``self.server.replies`` if it is new."""
        known = self.server.replies
        sender = self.client_address[0]
        if sender in known:
            return
        known.append(sender)
# end class
if __name__ == '__main__':
    # Manual smoke test: browse for Growl services and show who answered.
    for banner in ("Starting Unit Test",
                   " - please make sure Growl is listening for network notifications"):
        print(banner)
    browser = PicoRendezvous()
    responders = browser.query('_growl._tcp')
    print(responders)