Skip to content


Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Fetching contributors…

Cannot retrieve contributors at this time

363 lines (320 sloc) 12.797 kb
"""feedfinder: Find the Web feed for a Web page
feed(uri) - returns feed found for a URI
feeds(uri) - returns all feeds found for a URI
>>> import feedfinder
>>> feedfinder.feed('')
>>> feedfinder.feeds('')
Can also use from the command line. Feeds are returned one per line:
$ python
How it works:
0. At every step, feeds are minimally verified to make sure they are really feeds.
1. If the URI points to a feed, it is simply returned; otherwise
the page is downloaded and the real fun begins.
2. Feeds pointed to by LINK tags in the header of the page (autodiscovery)
3. <A> links to feeds on the same server ending in ".rss", ".rdf", ".xml", or
4. <A> links to feeds on the same server containing "rss", "rdf", "xml", or "atom"
5. <A> links to feeds on external servers ending in ".rss", ".rdf", ".xml", or
6. <A> links to feeds on external servers containing "rss", "rdf", "xml", or "atom"
7. Try some guesses about common places for feeds (index.xml, atom.xml, etc.).
8. As a last ditch effort, we search Syndic8 for feeds matching the URI
__version__ = "1.36"
__date__ = "2006-04-24"
__maintainer__ = "Aaron Swartz ("
__author__ = "Mark Pilgrim ("
__copyright__ = "Copyright 2002-4, Mark Pilgrim; 2006 Aaron Swartz"
__license__ = "Python"
__credits__ = """Abe Fettig for a patch to sort Syndic8 feeds by popularity
Also Jason Diamond, Brian Lalor for bug reporting and patches"""
_debug = 0
import sgmllib, urllib, urlparse, re, sys, robotparser
import threading
class TimeoutError(Exception): pass
def timelimit(timeout):
def internal(function):
def internal2(*args, **kw):
class Calculator(threading.Thread):
def __init__(self):
self.result = None
self.error = None
def run(self):
self.result = function(*args, **kw)
self.error = sys.exc_info()
c = Calculator()
c.setDaemon(True) # don't hold up exiting
if c.isAlive():
raise TimeoutError
if c.error:
raise c.error[0], c.error[1]
return c.result
return internal2
return internal
# XML-RPC support allows feedfinder to query Syndic8 for possible matches.
# Python 2.3 now comes with this module by default, otherwise you can download it
import xmlrpclib #
except ImportError:
xmlrpclib = None
if not dict:
def dict(aList):
rc = {}
for k, v in aList:
rc[k] = v
return rc
def _debuglog(message):
if _debug: print message
class URLGatekeeper:
"""a class to track robots.txt rules across multiple servers"""
def __init__(self):
self.rpcache = {} # a dictionary of RobotFileParser objects, by domain
self.urlopener = urllib.FancyURLopener()
self.urlopener.version = "feedfinder/" + __version__ + " " + self.urlopener.version + " +"
self.urlopener.addheaders = [('User-agent', self.urlopener.version)]
robotparser.URLopener.version = self.urlopener.version
robotparser.URLopener.addheaders = self.urlopener.addheaders
def _getrp(self, url):
protocol, domain = urlparse.urlparse(url)[:2]
if self.rpcache.has_key(domain):
return self.rpcache[domain]
baseurl = '%s://%s' % (protocol, domain)
robotsurl = urlparse.urljoin(baseurl, 'robots.txt')
_debuglog('fetching %s' % robotsurl)
rp = robotparser.RobotFileParser(robotsurl)
self.rpcache[domain] = rp
return rp
def can_fetch(self, url):
rp = self._getrp(url)
allow = rp.can_fetch(self.urlopener.version, url)
_debuglog("gatekeeper of %s says %s" % (url, allow))
return allow
def get(self, url):
if not self.can_fetch(url): return ''
return ''
_gatekeeper = URLGatekeeper()
class BaseParser(sgmllib.SGMLParser):
def __init__(self, baseuri):
self.links = []
self.baseuri = baseuri
def normalize_attrs(self, attrs):
def cleanattr(v):
v = sgmllib.charref.sub(lambda m: unichr(int(m.groups()[0])), v)
v = v.strip()
v = v.replace('&lt;', '<').replace('&gt;', '>').replace('&apos;', "'").replace('&quot;', '"').replace('&amp;', '&')
return v
attrs = [(k.lower(), cleanattr(v)) for k, v in attrs]
attrs = [(k, k in ('rel','type') and v.lower() or v) for k, v in attrs]
return attrs
def do_base(self, attrs):
attrsD = dict(self.normalize_attrs(attrs))
if not attrsD.has_key('href'): return
self.baseuri = attrsD['href']
def error(self, *a, **kw): pass # we're not picky
class LinkParser(BaseParser):
FEED_TYPES = ('application/rss+xml',
def do_link(self, attrs):
attrsD = dict(self.normalize_attrs(attrs))
if not attrsD.has_key('rel'): return
rels = attrsD['rel'].split()
if 'alternate' not in rels: return
if attrsD.get('type') not in self.FEED_TYPES: return
if not attrsD.has_key('href'): return
self.links.append(urlparse.urljoin(self.baseuri, attrsD['href']))
class ALinkParser(BaseParser):
def start_a(self, attrs):
attrsD = dict(self.normalize_attrs(attrs))
if not attrsD.has_key('href'): return
self.links.append(urlparse.urljoin(self.baseuri, attrsD['href']))
def makeFullURI(uri):
if uri.startswith('feed://'):
uri = 'http://' + uri.split('feed://', 1).pop()
for x in ['http', 'https']:
if uri.startswith('%s://' % x):
return uri
return 'http://%s' % uri
def getLinks(data, baseuri):
p = LinkParser(baseuri)
return p.links
def getALinks(data, baseuri):
p = ALinkParser(baseuri)
return p.links
def getLocalLinks(links, baseuri):
baseuri = baseuri.lower()
urilen = len(baseuri)
return [l for l in links if l.lower().startswith(baseuri)]
def isFeedLink(link):
if link.startswith(''): return True
if link.endswith('/feeds/posts/default'): return True
return link[-4:].lower() in ('.rss', '.rdf', '.xml', '.atom', 'atom/',
'/atom', '/feed')
def isXMLRelatedLink(link):
link = link.lower()
return link.count('rss') + link.count('rdf') + link.count('xml') + link.count('atom') + link.count('feed')
def couldBeFeedData(data):
data = data.lower()
except TimeoutError: # server down, give up
return []
if data.count('<html'): return 0
return data.count('<rss') + data.count('<rdf') + data.count('<feed')
def isFeed(uri):
_debuglog('seeing if %s is a feed' % uri)
protocol = urlparse.urlparse(uri)
if protocol[0] not in ('http', 'https'): return 0
data = _gatekeeper.get(uri)
return couldBeFeedData(data)
def sortFeeds(feed1Info, feed2Info):
return cmp(feed2Info['headlines_rank'], feed1Info['headlines_rank'])
def getFeedsFromSyndic8(uri):
feeds = []
server = xmlrpclib.Server('')
feedids = server.syndic8.FindFeeds(uri)
infolist = server.syndic8.GetFeedInfo(feedids, ['headlines_rank','status','dataurl'])
feeds = [f['dataurl'] for f in infolist if f['status']=='Syndicated']
_debuglog('found %s feeds through Syndic8' % len(feeds))
return feeds
def feeds(uri, all=False, querySyndic8=False):
fulluri = makeFullURI(uri)
data = _gatekeeper.get(fulluri)
return []
# is this already a feed?
if couldBeFeedData(data):
return [fulluri]
# nope, it's a page, try LINK tags first
_debuglog('looking for LINK tags')
feeds = getLinks(data, fulluri)
feeds = []
_debuglog('found %s feeds through LINK tags' % len(feeds))
feeds = filter(isFeed, feeds)
if all or not feeds:
# no LINK tags, look for regular <A> links that point to feeds
_debuglog('no LINK tags, looking at A tags')
links = getALinks(data, fulluri)
links = []
locallinks = getLocalLinks(links, fulluri)
# look for obvious feed links on the same server
feeds.extend(filter(isFeed, filter(isFeedLink, locallinks)))
if all or not feeds:
# look harder for feed links on the same server
feeds.extend(filter(isFeed, filter(isXMLRelatedLink, locallinks)))
if all or not feeds:
# look for obvious feed links on another server
feeds.extend(filter(isFeed, filter(isFeedLink, links)))
if all or not feeds:
# look harder for feed links on another server
feeds.extend(filter(isFeed, filter(isXMLRelatedLink, links)))
if all or not feeds:
_debuglog('no A tags, guessing')
suffixes = [ # filenames used by popular software:
'atom.xml', # blogger, TypePad
'index.atom', # MT, apparently
'index.rdf', # MT
'rss.xml', # Dave Winer/Manila
'index.xml', # MT
'index.rss' # Slash
feeds.extend(filter(isFeed, [urlparse.urljoin(fulluri, x) for x in suffixes]))
if (all or not feeds) and querySyndic8:
# still no luck, search Syndic8 for feeds (requires xmlrpclib)
_debuglog('still no luck, searching Syndic8')
if hasattr(__builtins__, 'set') or __builtins__.has_key('set'):
feeds = list(set(feeds))
return feeds
getFeeds = feeds # backwards-compatibility
def feed(uri):
#todo: give preference to certain feed formats
feedlist = feeds(uri)
if feedlist:
return feedlist[0]
return None
##### test harness ######
def test():
uri = ''
failed = []
count = 0
while 1:
data = _gatekeeper.get(uri)
if data.find('Atom autodiscovery test') == -1: break
count += 1
links = getLinks(data, uri)
if not links:
print '\n*** FAILED ***', uri, 'could not find link'
elif len(links) > 1:
print '\n*** FAILED ***', uri, 'found too many links'
atomdata = urllib.urlopen(links[0]).read()
if atomdata.find('<link rel="alternate"') == -1:
print '\n*** FAILED ***', uri, 'retrieved something that is not a feed'
backlink = atomdata.split('href="').pop().split('"')[0]
if backlink != uri:
print '\n*** FAILED ***', uri, 'retrieved wrong feed'
if data.find('<link rel="next" href="') == -1: break
uri = urlparse.urljoin(uri, data.split('<link rel="next" href="').pop().split('"')[0])
print count, 'tests executed,', len(failed), 'failed'
if __name__ == '__main__':
args = sys.argv[1:]
if args and args[0] == '--debug':
_debug = 1
if args:
uri = args[0]
uri = ''
if uri == 'test':
print "\n".join(getFeeds(uri))
Jump to Line
Something went wrong with that request. Please try again.