Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Fetching contributors…

Cannot retrieve contributors at this time

531 lines (470 sloc) 22.786 kb
#
# django-atompub by James Tauber <http://jtauber.com/>
# http://code.google.com/p/django-atompub/
# An implementation of the Atom format and protocol for Django
#
# For instructions on how to use this module to generate Atom feeds,
# see http://code.google.com/p/django-atompub/wiki/UserGuide
#
#
# Copyright (c) 2007, James Tauber
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
import time
from datetime import datetime
from xml.sax.saxutils import XMLGenerator

try:
    import urlparse
except ImportError:
    # Python 3 moved this module to urllib.parse; keep the old name bound.
    from urllib import parse as urlparse
# Text content of the <generator> element written by AtomFeed.write().
GENERATOR_TEXT = 'django-atompub'

# Attributes of that same <generator> element.
GENERATOR_ATTR = {
    'uri': 'http://code.google.com/p/django-atompub/',
    'version': 'r33',
}
# based on django.utils.xmlutils.SimplerXMLGenerator
class SimplerXMLGenerator(XMLGenerator):
    """XMLGenerator extended with a one-call helper for leaf elements."""

    def addQuickElement(self, name, contents=None, attrs=None):
        """
        Write a complete element that has no child elements.

        name     -- element name.
        contents -- character data for the element; nothing is written
                    between the tags when this is None.
        attrs    -- mapping of attribute names to values; None means no
                    attributes.
        """
        attributes = {} if attrs is None else attrs
        self.startElement(name, attributes)
        if contents is not None:
            self.characters(contents)
        self.endElement(name)
# based on django.utils.feedgenerator.rfc3339_date
def rfc3339_date(date):
    """
    Format a date or datetime as an RFC 3339 timestamp.

    Naive values, and aware values whose UTC offset is zero, are written
    with the ``Z`` suffix exactly as before.  Aware datetimes with a
    non-zero offset are written with their real ``+HH:MM`` / ``-HH:MM``
    offset; previously they were stamped ``Z`` as well, which mislabelled
    the instant they denote.
    """
    stamp = date.strftime('%Y-%m-%dT%H:%M:%S')
    # Plain ``date`` objects have no utcoffset(); treat them like naive values.
    utcoffset = getattr(date, 'utcoffset', None)
    offset = None
    if utcoffset is not None:
        offset = utcoffset()
    if not offset:
        # Either naive (None) or exactly UTC (zero timedelta).
        return stamp + 'Z'
    total_minutes = offset.days * 24 * 60 + offset.seconds // 60
    sign = '+'
    if total_minutes < 0:
        sign = '-'
    hours, minutes = divmod(abs(total_minutes), 60)
    return '%s%s%02d:%02d' % (stamp, sign, hours, minutes)
# based on django.utils.feedgenerator.get_tag_uri
def get_tag_uri(url, date):
    """
    Build a ``tag:`` URI from *url* and an optional *date*.

    The result is ``tag:<host>[,<YYYY-MM-DD>:]<path>/<fragment>``; the dated
    part is left out entirely when *date* is None.
    See http://diveintomark.org/archives/2004/05/28/howto-atom-id
    """
    pieces = urlparse.urlparse(url)
    if date is None:
        dated = ""
    else:
        dated = ",%s:" % date.strftime("%Y-%m-%d")
    fields = (pieces.hostname, dated, pieces.path, pieces.fragment)
    return "tag:%s%s%s/%s" % fields
# based on django.contrib.syndication.feeds.Feed
class Feed(object):
    """
    Declarative description of an Atom feed.

    Subclasses describe a feed through attributes that get_feed() looks up
    by name: ``feed_id``, ``feed_title``, ``feed_updated``, ``feed_icon``,
    ``feed_logo``, ``feed_rights``, ``feed_subtitle``, ``feed_authors``,
    ``feed_categories``, ``feed_contributors``, ``feed_links``,
    ``feed_extra_attrs``, ``hide_generator`` and ``items`` for the feed, and
    the matching ``item_*`` names (``item_id``, ``item_title``,
    ``item_updated``, ``item_content``, ...) for each entry.

    Every such attribute may be a plain value, a callable taking only
    ``self``, or a callable taking ``self`` plus one object: the result of
    ``get_object()`` for feed-level names, the item for ``item_*`` names.
    """

    # When true, get_feed() calls validate() on the feed before returning it.
    VALIDATE = True

    def __init__(self, slug, feed_url):
        # @@@ slug and feed_url are not used yet
        pass

    @staticmethod
    def __code_object(func):
        """Return the code object of *func*, or None if it has none.

        ``__code__`` is tried first (Python 2.6+ and Python 3) and the
        Python-2-only ``func_code`` spelling second.
        """
        code = getattr(func, '__code__', None)
        if code is None:
            code = getattr(func, 'func_code', None)
        return code

    def __get_dynamic_attr(self, attname, obj, default=None):
        """
        Resolve the feed attribute *attname*.

        Returns *default* when the attribute does not exist, the attribute
        itself when it is not callable, ``attr(obj)`` when it is a callable
        whose code takes exactly two positional arguments (``self`` and one
        more), and ``attr()`` otherwise.
        """
        try:
            attr = getattr(self, attname)
        except AttributeError:
            return default
        if not callable(attr):
            return attr
        # Inspect co_argcount rather than calling and catching TypeError,
        # because something inside the callable may itself raise TypeError.
        code = self.__code_object(attr)
        if code is None:
            # Callable instance: look at its __call__ method instead.
            code = self.__code_object(attr.__call__)
        if code is not None and code.co_argcount == 2:  # one argument is 'self'
            return attr(obj)
        # Callables without an inspectable code object are called bare.
        return attr()

    def get_feed(self, extra_params=None):
        """
        Build and return the AtomFeed described by this object.

        extra_params -- optional '/'-separated string; when given it is split
                        and passed to ``self.get_object()`` and the result is
                        handed to the one-argument feed-level callables.

        Raises LookupError when ``get_object`` is missing or raises
        LookupError, or when there is no ``items`` attribute.  When VALIDATE
        is true the feed's validate() is called before it is returned.
        """
        if extra_params:
            try:
                obj = self.get_object(extra_params.split('/'))
            except (AttributeError, LookupError):
                raise LookupError('Feed does not exist')
        else:
            obj = None

        lookup = self.__get_dynamic_attr
        feed = AtomFeed(
            atom_id=lookup('feed_id', obj),
            title=lookup('feed_title', obj),
            updated=lookup('feed_updated', obj),
            icon=lookup('feed_icon', obj),
            logo=lookup('feed_logo', obj),
            rights=lookup('feed_rights', obj),
            subtitle=lookup('feed_subtitle', obj),
            authors=lookup('feed_authors', obj, default=[]),
            categories=lookup('feed_categories', obj, default=[]),
            contributors=lookup('feed_contributors', obj, default=[]),
            links=lookup('feed_links', obj, default=[]),
            extra_attrs=lookup('feed_extra_attrs', obj),
            hide_generator=lookup('hide_generator', obj, default=False)
        )

        items = lookup('items', obj)
        if items is None:
            raise LookupError('Feed has no items field')

        for item in items:
            feed.add_item(
                atom_id=lookup('item_id', item),
                title=lookup('item_title', item),
                updated=lookup('item_updated', item),
                content=lookup('item_content', item),
                published=lookup('item_published', item),
                rights=lookup('item_rights', item),
                source=lookup('item_source', item),
                summary=lookup('item_summary', item),
                authors=lookup('item_authors', item, default=[]),
                categories=lookup('item_categories', item, default=[]),
                contributors=lookup('item_contributors', item, default=[]),
                links=lookup('item_links', item, default=[]),
                # Resolved against the item like every other item_* hook
                # (this used to be looked up with None).
                extra_attrs=lookup('item_extra_attrs', item, default={}),
            )

        if self.VALIDATE:
            feed.validate()
        return feed
class ValidationError(Exception):
    """Raised by AtomFeed.validate() when a feed fails one of its checks."""
# based on django.utils.feedgenerator.SyndicationFeed and django.utils.feedgenerator.Atom1Feed
class AtomFeed(object):
    """
    In-memory Atom feed that can be validated and serialised to XML.

    Data conventions used by the methods below:

    * text constructs (title, subtitle, rights, summary) are either a plain
      value or a ``(type, text)`` tuple where type is 'text', 'html' or
      'xhtml';
    * entry content is either a plain value or an ``(attrs_dict, text)``
      tuple;
    * persons are dicts with a 'name' key and optional 'uri' / 'email';
    * links and categories are dicts of XML attributes.
    """

    mime_type = 'application/atom+xml'
    ns = u'http://www.w3.org/2005/Atom'

    @staticmethod
    def _fresh(value, factory):
        """Return *value*, or a new ``factory()`` object when it is None."""
        if value is None:
            return factory()
        return value

    def __init__(self, atom_id, title, updated=None, icon=None, logo=None, rights=None,
                 subtitle=None, authors=None, categories=None, contributors=None,
                 links=None, extra_attrs=None, hide_generator=False):
        """
        Store the feed-level metadata.

        Raises LookupError when *atom_id* or *title* is None.  Collection
        arguments left as None become new empty containers, so feeds never
        share a default list or dict.
        """
        if atom_id is None:
            raise LookupError('Feed has no feed_id field')
        if title is None:
            raise LookupError('Feed has no feed_title field')
        # if updated == None, we'll calculate it
        self.feed = {
            'id': atom_id,
            'title': title,
            'updated': updated,
            'icon': icon,
            'logo': logo,
            'rights': rights,
            'subtitle': subtitle,
            'authors': self._fresh(authors, list),
            'categories': self._fresh(categories, list),
            'contributors': self._fresh(contributors, list),
            'links': self._fresh(links, list),
            'extra_attrs': self._fresh(extra_attrs, dict),
            'hide_generator': hide_generator,
        }
        self.items = []

    def add_item(self, atom_id, title, updated, content=None, published=None, rights=None,
                 source=None, summary=None, authors=None, categories=None,
                 contributors=None, links=None, extra_attrs=None):
        """
        Append one entry to the feed.

        Raises LookupError when *atom_id*, *title* or *updated* is None.
        Collection arguments left as None become new empty containers.
        """
        if atom_id is None:
            raise LookupError('Feed has no item_id method')
        if title is None:
            raise LookupError('Feed has no item_title method')
        if updated is None:
            raise LookupError('Feed has no item_updated method')
        self.items.append({
            'id': atom_id,
            'title': title,
            'updated': updated,
            'content': content,
            'published': published,
            'rights': rights,
            'source': source,
            'summary': summary,
            'authors': self._fresh(authors, list),
            'categories': self._fresh(categories, list),
            'contributors': self._fresh(contributors, list),
            'links': self._fresh(links, list),
            'extra_attrs': self._fresh(extra_attrs, dict),
        })

    def latest_updated(self):
        """
        Returns the latest item's updated or the current time if there are no items.

        The fallback is the current time in UTC (naive, whole seconds),
        because rfc3339_date() writes naive values with a 'Z' suffix; a local
        time would be labelled as UTC.
        """
        if self.items:
            return max(item['updated'] for item in self.items)
        # @@@ really we should allow a feed to define its "start" for this case
        return datetime(*time.gmtime()[:6])

    def write_text_construct(self, handler, element_name, data):
        """Write *data* as the text construct *element_name*.

        A ``(type, text)`` tuple produces a ``type`` attribute; 'xhtml' text
        is written unescaped.  Any other value is written as escaped text
        with no attribute.
        """
        if isinstance(data, tuple):
            text_type, text = data
            if text_type == 'xhtml':
                handler.startElement(element_name, {'type': text_type})
                handler._write(text)  # write unescaped -- it had better be well-formed XML
                handler.endElement(element_name)
            else:
                handler.addQuickElement(element_name, text, {'type': text_type})
        else:
            handler.addQuickElement(element_name, data)

    def write_person_construct(self, handler, element_name, person):
        """Write an author/contributor element from the *person* dict.

        'name' is required; 'uri' and 'email' are written when present.
        """
        handler.startElement(element_name, {})
        handler.addQuickElement(u'name', person['name'])
        if 'uri' in person:
            handler.addQuickElement(u'uri', person['uri'])
        if 'email' in person:
            handler.addQuickElement(u'email', person['email'])
        handler.endElement(element_name)

    def write_link_construct(self, handler, link):
        """Write a <link> element whose attributes are the *link* dict.

        'length' is converted to a string on a copy, so the caller's dict is
        left untouched.
        """
        attrs = dict(link)
        if 'length' in attrs:
            attrs['length'] = str(attrs['length'])
        handler.addQuickElement(u'link', None, attrs)

    def write_category_construct(self, handler, category):
        """Write a <category> element whose attributes are the *category* dict."""
        handler.addQuickElement(u'category', None, category)

    def write_source(self, handler, data):
        """Write a <source> element from the *data* dict.

        Only keys that are present and truthy are written.
        """
        handler.startElement(u'source', {})
        if data.get('id'):
            handler.addQuickElement(u'id', data['id'])
        if data.get('title'):
            self.write_text_construct(handler, u'title', data['title'])
        if data.get('subtitle'):
            self.write_text_construct(handler, u'subtitle', data['subtitle'])
        if data.get('icon'):
            handler.addQuickElement(u'icon', data['icon'])
        if data.get('logo'):
            handler.addQuickElement(u'logo', data['logo'])
        if data.get('updated'):
            handler.addQuickElement(u'updated', rfc3339_date(data['updated']))
        for category in data.get('categories', []):
            self.write_category_construct(handler, category)
        for link in data.get('links', []):
            self.write_link_construct(handler, link)
        for author in data.get('authors', []):
            self.write_person_construct(handler, u'author', author)
        for contributor in data.get('contributors', []):
            self.write_person_construct(handler, u'contributor', contributor)
        if data.get('rights'):
            self.write_text_construct(handler, u'rights', data['rights'])
        handler.endElement(u'source')

    def write_content(self, handler, data):
        """Write a <content> element.

        An ``(attrs_dict, text)`` tuple supplies the element's attributes;
        when its 'type' is 'xhtml' the text is written unescaped.  Any other
        value is written as escaped text with no attributes.
        """
        if isinstance(data, tuple):
            content_dict, text = data
            if content_dict.get('type') == 'xhtml':
                handler.startElement(u'content', content_dict)
                handler._write(text)  # write unescaped -- it had better be well-formed XML
                handler.endElement(u'content')
            else:
                handler.addQuickElement(u'content', text, content_dict)
        else:
            handler.addQuickElement(u'content', data)

    def write(self, outfile, encoding):
        """Serialise the whole feed as XML to *outfile* using *encoding*."""
        handler = SimplerXMLGenerator(outfile, encoding)
        handler.startDocument()
        feed_attrs = {u'xmlns': self.ns}
        if self.feed.get('extra_attrs'):
            feed_attrs.update(self.feed['extra_attrs'])
        handler.startElement(u'feed', feed_attrs)
        handler.addQuickElement(u'id', self.feed['id'])
        self.write_text_construct(handler, u'title', self.feed['title'])
        if self.feed.get('subtitle'):
            self.write_text_construct(handler, u'subtitle', self.feed['subtitle'])
        if self.feed.get('icon'):
            handler.addQuickElement(u'icon', self.feed['icon'])
        if self.feed.get('logo'):
            handler.addQuickElement(u'logo', self.feed['logo'])
        if self.feed['updated']:
            handler.addQuickElement(u'updated', rfc3339_date(self.feed['updated']))
        else:
            # No explicit feed timestamp: derive it from the entries.
            handler.addQuickElement(u'updated', rfc3339_date(self.latest_updated()))
        for category in self.feed['categories']:
            self.write_category_construct(handler, category)
        for link in self.feed['links']:
            self.write_link_construct(handler, link)
        for author in self.feed['authors']:
            self.write_person_construct(handler, u'author', author)
        for contributor in self.feed['contributors']:
            self.write_person_construct(handler, u'contributor', contributor)
        if self.feed.get('rights'):
            self.write_text_construct(handler, u'rights', self.feed['rights'])
        if not self.feed.get('hide_generator'):
            handler.addQuickElement(u'generator', GENERATOR_TEXT, GENERATOR_ATTR)
        self.write_items(handler)
        handler.endElement(u'feed')

    def write_items(self, handler):
        """Write one <entry> element per stored item."""
        for item in self.items:
            # 'or {}' guards against an explicit None, which startElement
            # cannot iterate.
            entry_attrs = item.get('extra_attrs') or {}
            handler.startElement(u'entry', entry_attrs)
            handler.addQuickElement(u'id', item['id'])
            self.write_text_construct(handler, u'title', item['title'])
            handler.addQuickElement(u'updated', rfc3339_date(item['updated']))
            if item.get('published'):
                handler.addQuickElement(u'published', rfc3339_date(item['published']))
            if item.get('rights'):
                self.write_text_construct(handler, u'rights', item['rights'])
            if item.get('source'):
                self.write_source(handler, item['source'])
            for author in item['authors']:
                self.write_person_construct(handler, u'author', author)
            for contributor in item['contributors']:
                self.write_person_construct(handler, u'contributor', contributor)
            for category in item['categories']:
                self.write_category_construct(handler, category)
            for link in item['links']:
                self.write_link_construct(handler, link)
            if item.get('summary'):
                self.write_text_construct(handler, u'summary', item['summary'])
            if item.get('content'):
                self.write_content(handler, item['content'])
            handler.endElement(u'entry')

    @staticmethod
    def _is_text_construct(obj):
        """Return False only for a tuple whose type is not text/html/xhtml."""
        if isinstance(obj, tuple):
            if obj[0] not in ['text', 'html', 'xhtml']:
                return False
            # @@@ no validation is done that 'html' text constructs are valid HTML
            # @@@ no validation is done that 'xhtml' text constructs are well-formed XML or valid
            # XHTML
        return True

    def _check_text_fields(self, data, fields, label):
        """Raise ValidationError for any truthy *fields* of *data* that are bad text constructs."""
        for field in fields:
            value = data.get(field)
            if value and not self._is_text_construct(value):
                raise ValidationError('%s %s has invalid type' % (label, field))

    @staticmethod
    def _alternate_links(links):
        """Return the alternate links keyed by (type, hreflang); raise on duplicates.

        A link counts as alternate when its 'rel' is 'alternate' or absent.
        """
        alternates = {}
        for link in links or []:
            if link.get('rel') == 'alternate' or link.get('rel') is None:
                key = (link.get('type'), link.get('hreflang'))
                if key in alternates:
                    raise ValidationError('alternate links must have unique type/hreflang')
                alternates[key] = link
        return alternates

    def _validate_content(self, item, content):
        """Check the ``(attrs_dict, text)`` content tuple of one entry."""
        content_dict, text = content[0], content[1]
        content_type = content_dict.get('type')
        if content_dict.get('src'):
            if text:
                raise ValidationError('content with src should be empty')
            if not item.get('summary'):
                raise ValidationError('content with src requires a summary too')
            if content_type in ['text', 'html', 'xhtml']:
                raise ValidationError('content with src cannot have type of text, html or '
                                      'xhtml')
        if not content_type:
            return
        is_media_type = '/' in content_type
        is_textual = (
            content_type.startswith('text/') or
            content_type.endswith('/xml') or
            content_type.endswith('+xml') or
            content_type in ['application/xml-external-parsed-entity',
                             'application/xml-dtd']
        )
        if is_media_type and not is_textual:
            # Any other media type is treated as Base64 content.
            # @@@ check content is Base64
            if not item.get('summary'):
                raise ValidationError('content in Base64 requires a summary too')
        if content_type not in ['text', 'html', 'xhtml'] and not is_media_type:
            raise ValidationError('content type does not appear to be valid')
        # @@@ no validation is done that 'html' text constructs are valid HTML
        # @@@ no validation is done that 'xhtml' text constructs are well-formed XML or
        # valid XHTML

    def _validate_item(self, item, feed_has_author):
        """Run every per-entry check on *item*."""
        source = item.get('source')
        if not feed_has_author and not item.get('authors'):
            if not (source and source.get('authors')):
                raise ValidationError('if no feed author, all entries must have author '
                                      '(possibly in source)')
        if not self._is_text_construct(item['title']):
            raise ValidationError('entry title has invalid type')
        self._check_text_fields(item, ('rights', 'summary'), 'entry')
        if source:
            self._check_text_fields(source, ('title', 'subtitle', 'rights'), 'source')
        alternate_links = self._alternate_links(item.get('links'))
        content = item.get('content')
        if not content:
            if not alternate_links:
                raise ValidationError('if no content, entry must have alternate link')
            return
        if isinstance(content, tuple):
            self._validate_content(item, content)

    def validate(self):
        """
        Check the feed and every entry; raise ValidationError on the first problem.

        Every entry is checked.  The previous code appears to have returned
        from inside the entry loop after the first entry with typed content
        (judging by its two consecutive ``return`` statements), which would
        leave later entries unchecked.  Returns None when everything passes.
        """
        if not self._is_text_construct(self.feed['title']):
            raise ValidationError('feed title has invalid type')
        self._check_text_fields(self.feed, ('subtitle', 'rights'), 'feed')
        self._alternate_links(self.feed.get('links'))
        feed_has_author = bool(self.feed.get('authors'))
        for item in self.items:
            self._validate_item(item, feed_has_author)
        return
class LegacySyndicationFeed(AtomFeed):
    """
    Provides an SyndicationFeed-compatible interface in its __init__ and
    add_item but is really a new AtomFeed object.
    """

    @staticmethod
    def _legacy_authors(name, uri, email):
        """Build the Atom author list from the legacy author_* arguments.

        Returns an empty list when none of the three values is set, so no
        empty <author> element is invented.  Otherwise returns a single
        person dict that always has 'name' and has 'uri' / 'email' only when
        those values are truthy.
        """
        if not (name or uri or email):
            return []
        person = {'name': name}
        if uri:
            person['uri'] = uri
        if email:
            person['email'] = email
        return [person]

    def __init__(self, title, link, description, language=None, author_email=None,
                 author_name=None, author_link=None, subtitle=None, categories=None,
                 feed_url=None, feed_copyright=None):
        """
        Map SyndicationFeed-style arguments onto AtomFeed.__init__.

        *link* serves as both the feed id and its alternate link; *feed_url*
        becomes a rel="self" link; *language* becomes an ``xml:lang``
        attribute; *categories* is an iterable of terms.  The feed's
        ``updated`` is left as None so it is calculated from the entries.
        """
        links = [{'rel': 'alternate', 'href': link}]
        if feed_url:
            links.append({'rel': 'self', 'href': feed_url})
        if language:
            extra_attrs = {'xml:lang': language}
        else:
            extra_attrs = {}
        # A new list is built every time, so no default list is ever shared.
        category_dicts = [{'term': term} for term in (categories or [])]
        # description ignored (as with Atom1Feed)
        AtomFeed.__init__(self, link, title, None, rights=feed_copyright, subtitle=subtitle,
                          authors=self._legacy_authors(author_name, author_link, author_email),
                          categories=category_dicts, links=links,
                          extra_attrs=extra_attrs)

    def add_item(self, title, link, description, author_email=None, author_name=None,
                 author_link=None, pubdate=None, comments=None, unique_id=None, enclosure=None,
                 categories=None, item_copyright=None):
        """
        Map SyndicationFeed-style item arguments onto AtomFeed.add_item.

        The entry id is *unique_id* when given, otherwise a tag URI derived
        from *link* and *pubdate*.  *pubdate* becomes the entry's ``updated``
        value, *description* becomes an 'html' summary, and *enclosure* (an
        object with ``url``, ``length`` and ``mime_type`` attributes) becomes
        a rel="enclosure" link.  *comments* is accepted but not used.
        """
        if unique_id:
            atom_id = unique_id
        else:
            atom_id = get_tag_uri(link, pubdate)
        rights = item_copyright or None
        summary = None
        if description:
            summary = ('html', description)
        links = [{'rel': 'alternate', 'href': link}]
        if enclosure:
            links.append({'rel': 'enclosure', 'href': enclosure.url,
                          'length': enclosure.length, 'type': enclosure.mime_type})
        AtomFeed.add_item(self, atom_id, title, pubdate, rights=rights, summary=summary,
                          authors=self._legacy_authors(author_name, author_link, author_email),
                          categories=[{'term': term} for term in (categories or [])],
                          links=links)
Jump to Line
Something went wrong with that request. Please try again.