Find file
Fetching contributors…
Cannot retrieve contributors at this time
309 lines (253 sloc) 11.6 KB
# Licensed under the MIT license
# Copyright 2005, Tim Potter <>
# Copyright 2006 John-Mark Gurney <>
# Copyright (C) 2006 Fluendo, S.A. (
# Copyright 2006,2007 Frank Scholz <>
# Implementation of a SSDP server under Twisted Python.
import random
import string
import sys
import time
import socket
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor, error
from twisted.internet import task
from coherence import log, SERVER_ID
import coherence.extern.louie as louie
SSDP_PORT = 1900
class SSDPServer(DatagramProtocol, log.Loggable):
"""A class implementing a SSDP server. The notifyReceived and
searchReceived methods are called when the appropriate type of
datagram is received by the server."""
logCategory = 'ssdp'
known = {}
_callbacks = {}
def __init__(self,test=False):
# Create SSDP server
self.test = test
if self.test == False:
self.port = reactor.listenMulticast(SSDP_PORT, self, listenMultiple=True)
self.resend_notify_loop = task.LoopingCall(self.resendNotify)
self.resend_notify_loop.start(777.0, now=False)
self.check_valid_loop = task.LoopingCall(self.check_valid)
self.check_valid_loop.start(333.0, now=False)
except error.CannotListenError, err:
self.warning("There seems to be already a SSDP server running on this host, no need starting a second one.")
self.active_calls = []
def shutdown(self):
for call in reactor.getDelayedCalls():
if call.func == self.send_it:
if self.test == False:
if self.resend_notify_loop.running:
if self.check_valid_loop.running:
'''Make sure we send out the byebye notifications.'''
for st in self.known:
if self.known[st]['MANIFESTATION'] == 'local':
def datagramReceived(self, data, (host, port)):
"""Handle a received multicast datagram."""
header, payload = data.split('\r\n\r\n')[:2]
except ValueError, err:
print err
print 'Arggg,', data
import pdb; pdb.set_trace()
lines = header.split('\r\n')
cmd = string.split(lines[0], ' ')
lines = map(lambda x: x.replace(': ', ':', 1), lines[1:])
lines = filter(lambda x: len(x) > 0, lines)
headers = [string.split(x, ':', 1) for x in lines]
headers = dict(map(lambda x: (x[0].lower(), x[1]), headers))
self.msg('SSDP command %s %s - from %s:%d' % (cmd[0], cmd[1], host, port))
self.debug('with headers:', headers)
if cmd[0] == 'M-SEARCH' and cmd[1] == '*':
# SSDP discovery
self.discoveryRequest(headers, (host, port))
elif cmd[0] == 'NOTIFY' and cmd[1] == '*':
# SSDP presence
self.notifyReceived(headers, (host, port))
self.warning('Unknown SSDP command %s %s' % (cmd[0], cmd[1]))
# make raw data available
# send out the signal after we had a chance to register the device
louie.send('UPnP.SSDP.datagram_received', None, data, host, port)
def register(self, manifestation, usn, st, location,
"""Register a service or device that this SSDP server will
respond to."""'Registering %s (%s)' % (st, location))
self.known[usn] = {}
self.known[usn]['USN'] = usn
self.known[usn]['LOCATION'] = location
self.known[usn]['ST'] = st
self.known[usn]['EXT'] = ''
self.known[usn]['SERVER'] = server
self.known[usn]['CACHE-CONTROL'] = cache_control
self.known[usn]['MANIFESTATION'] = manifestation
self.known[usn]['SILENT'] = silent
self.known[usn]['HOST'] = host
self.known[usn]['last-seen'] = time.time()
if manifestation == 'local':
if st == 'upnp:rootdevice':
louie.send('Coherence.UPnP.SSDP.new_device', None, device_type=st, infos=self.known[usn])
#self.callback("new_device", st, self.known[usn])
def unRegister(self, usn):
self.msg("Un-registering %s" % usn)
st = self.known[usn]['ST']
if st == 'upnp:rootdevice':
louie.send('Coherence.UPnP.SSDP.removed_device', None, device_type=st, infos=self.known[usn])
#self.callback("removed_device", st, self.known[usn])
del self.known[usn]
def isKnown(self, usn):
return self.known.has_key(usn)
def notifyReceived(self, headers, (host, port)):
"""Process a presence announcement. We just remember the
details of the SSDP service announced."""'Notification from (%s,%d) for %s' % (host, port, headers['nt']))
self.debug('Notification headers:', headers)
if headers['nts'] == 'ssdp:alive':
self.known[headers['usn']]['last-seen'] = time.time()
self.debug('updating last-seen for %r' % headers['usn'])
except KeyError:
self.register('remote', headers['usn'], headers['nt'], headers['location'],
headers['server'], headers['cache-control'], host=host)
elif headers['nts'] == 'ssdp:byebye':
if self.isKnown(headers['usn']):
self.warning('Unknown subtype %s for notification type %s' %
(headers['nts'], headers['nt']))
louie.send('Coherence.UPnP.Log', None, 'SSDP', host, 'Notify %s for %s' % (headers['nts'], headers['usn']))
def send_it(self,response,destination,delay,usn):'send discovery response delayed by %ds for %s to %r' % (delay,usn,destination))
except (AttributeError,socket.error), msg:"failure sending out byebye notification: %r" % msg)
def discoveryRequest(self, headers, (host, port)):
"""Process a discovery request. The response must be sent to
the address specified by (host, port)."""'Discovery request from (%s,%d) for %s' % (host, port, headers['st']))'Discovery request for %s' % headers['st'])
louie.send('Coherence.UPnP.Log', None, 'SSDP', host, 'M-Search for %s' % headers['st'])
# Do we know about this service?
for i in self.known.values():
if i['MANIFESTATION'] == 'remote':
if(headers['st'] == 'ssdp:all' and
i['SILENT'] == True):
if( i['ST'] == headers['st'] or
headers['st'] == 'ssdp:all'):
response = []
response.append('HTTP/1.1 200 OK')
for k, v in i.items():
if k == 'USN':
usn = v
if k not in ('MANIFESTATION','SILENT','HOST'):
response.append('%s: %s' % (k, v))
response.append('DATE: %s' % time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()))
response.extend(('', ''))
delay = random.randint(0, int(headers['mx']))
reactor.callLater(delay, self.send_it,
'\r\n'.join(response), (host, port), delay, usn)
def doNotify(self, usn):
"""Do notification"""
if self.known[usn]['SILENT'] == True:
return'Sending alive notification for %s' % usn)
resp = [ 'NOTIFY * HTTP/1.1',
'NTS: ssdp:alive',
stcpy = dict(self.known[usn].iteritems())
stcpy['NT'] = stcpy['ST']
del stcpy['ST']
del stcpy['MANIFESTATION']
del stcpy['SILENT']
del stcpy['HOST']
del stcpy['last-seen']
resp.extend(map(lambda x: ': '.join(x), stcpy.iteritems()))
resp.extend(('', ''))
self.debug('doNotify content', resp)
self.transport.write('\r\n'.join(resp), (SSDP_ADDR, SSDP_PORT))
self.transport.write('\r\n'.join(resp), (SSDP_ADDR, SSDP_PORT))
except (AttributeError,socket.error), msg:"failure sending out alive notification: %r" % msg)
def doByebye(self, usn):
"""Do byebye"""'Sending byebye notification for %s' % usn)
resp = [ 'NOTIFY * HTTP/1.1',
'NTS: ssdp:byebye',
stcpy = dict(self.known[usn].iteritems())
stcpy['NT'] = stcpy['ST']
del stcpy['ST']
del stcpy['MANIFESTATION']
del stcpy['SILENT']
del stcpy['HOST']
del stcpy['last-seen']
resp.extend(map(lambda x: ': '.join(x), stcpy.iteritems()))
resp.extend(('', ''))
self.debug('doByebye content', resp)
if self.transport:
self.transport.write('\r\n'.join(resp), (SSDP_ADDR, SSDP_PORT))
except (AttributeError,socket.error), msg:"failure sending out byebye notification: %r" % msg)
except KeyError, msg:
self.debug("error building byebye notification: %r" % msg)
def resendNotify( self):
for usn in self.known:
if self.known[usn]['MANIFESTATION'] == 'local':
def check_valid(self):
""" check if the discovered devices are still ok, or
if we haven't received a new discovery response
self.debug("Checking devices/services are still valid")
removable = []
for usn in self.known:
if self.known[usn]['MANIFESTATION'] != 'local':
_,expiry = self.known[usn]['CACHE-CONTROL'].split('=')
expiry = int(expiry)
now = time.time()
last_seen = self.known[usn]['last-seen']
self.debug("Checking if %r is still valid - last seen %d (+%d), now %d" % (self.known[usn]['USN'],last_seen,expiry,now))
if last_seen + expiry + 30 < now:
self.debug("Expiring: %r" % self.known[usn])
if self.known[usn]['ST'] == 'upnp:rootdevice':
louie.send('Coherence.UPnP.SSDP.removed_device', None, device_type=self.known[usn]['ST'], infos=self.known[usn])
while len(removable) > 0:
usn = removable.pop(0)
del self.known[usn]
def subscribe(self, name, callback):
def unsubscribe(self, name, callback):
callbacks = self._callbacks.get(name,[])
if callback in callbacks:
self._callbacks[name] = callbacks
def callback(self, name, *args):
for callback in self._callbacks.get(name,[]):