Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #32 from VMDX/rss-feed

RSS Feed for Events in Stashboard
  • Loading branch information...
commit 5adfbba5415d7210fb51372b528c77f7cd3da2eb 2 parents 3de1724 + 1a427f1
@kyleconroy kyleconroy authored
View
4 requirements.txt
@@ -2,5 +2,5 @@ requests==0.7.4
mock==0.7.2
nose==1.1.2
sphinx
--e git+git@github.com:derferman/webtest.git#egg=WebTest
--e git+git@github.com:derferman/python-oauth2.git#egg=oauth
+-e git+git@github.com:kyleconroy/webtest.git#egg=WebTest
+-e git+git@github.com:kyleconroy/python-oauth2.git#egg=oauth
View
64 stashboard/handlers/site.py
@@ -42,6 +42,7 @@
from django.utils import simplejson as json
from time import mktime
from models import List, Status, Service, Event, Profile
+import xml.etree.ElementTree as et
from utils import authorized
from wsgiref.handlers import format_date_time
@@ -359,3 +360,66 @@ class CredentialsRedirectHandler(BaseHandler):
def get(self):
self.redirect("/admin/credentials")
+
+class RSSHandler(BaseHandler):
+ """ Feed of the last settings.RSS_NUM_EVENTS_TO_FETCH events """
+
+ def get(self):
+ self.response.headers['Content-Type'] = "application/rss+xml; charset=utf-8"
+
+ host = self.request.headers.get('host', 'nohost')
+ base_url = self.request.scheme + "://" + host
+
+ events = []
+ query = Event.all().order("-start")
+
+ # Filter query by requested services, if specified in the 'service' URL parameter.
+ service_list = []
+ for service_arg in self.request.get_all('services'):
+ service_list.extend(service_arg.split(','))
+ service_list = map(lambda serv_slug: Service.get_by_slug(serv_slug), service_list)
+ # filter out any non-existent services
+ service_list = filter(lambda service: not service is None, service_list)
+
+ service_string = 'all services'
+ if len(service_list) > 0:
+ query.filter('service IN', service_list)
+ if len(service_list) == 1:
+ service_string = 'the %s service' % service_list[0].name
+ elif len(service_list) == 2:
+ service_string = 'the %s and %s services' % (service_list[0].name, service_list[1].name)
+ else:
+ service_string = 'the %s, and %s services' % (', '.join([service.name for service in service_list[:-1]]), service_list[-1].name)
+
+ # Create the root 'rss' element
+ rss_xml = et.Element('rss')
+ rss_xml.set('version', '2.0')
+
+ # Create the channel element and its metadata elements
+ channel = et.SubElement(rss_xml, 'channel')
+ title = et.SubElement(channel, 'title')
+ title.text = '%s Service Events' % settings.SITE_NAME
+ description = et.SubElement(channel, 'description')
+ description.text = 'This feed shows the last %d events on %s on %s.' % (settings.RSS_NUM_EVENTS_TO_FETCH, service_string, settings.SITE_NAME)
+ link = et.SubElement(channel, 'link')
+ link.text = base_url
+
+ # Create each of the feed events.
+ item_subelements = {
+ 'title': lambda(event): '[%s - %s] %s' % (event.service.name, event.status.name, unicode(event.message)),
+ 'description': lambda(event): '%s' % unicode(event.message),
+ 'link': lambda(event): '%s/services/%s' % (base_url, event.service.slug),
+ 'category': lambda(event): event.service.name,
+ 'pubDate': lambda(event): format_date_time(mktime(event.start.timetuple())),
+ 'guid': lambda(event): '%s/api/v1/services/%s/events/%s' % (base_url, event.service.slug, unicode(event.key()))
+ }
+
+ for event in query.fetch(settings.RSS_NUM_EVENTS_TO_FETCH):
+ item = et.SubElement(channel, 'item')
+ for tag, text_func in item_subelements.iteritems():
+ subelement = et.SubElement(item, tag)
+ subelement.text = text_func(event)
+
+ self.response.out.write('<?xml version="1.0" encoding="UTF-8"?>\n')
+ self.response.out.write(et.tostring(rss_xml))
+
View
1  stashboard/main.py
@@ -65,6 +65,7 @@
(r'/documentation/credentials', site.CredentialsRedirectHandler),
(r'/documentation/(.+)', site.DocumentationHandler),
(r'/documentation', site.BaseDocumentationHandler),
+ (r'/rss', site.RSSHandler),
]
ADMIN = [
View
3  stashboard/settings.py
@@ -7,6 +7,9 @@
SITE_URL = "http://stashbooard.appspot.com"
REPORT_URL = "mailto:help@stashboard.org"
+# RSS Feed settings
+RSS_NUM_EVENTS_TO_FETCH = 50
+
# OAuth Consumer Credentials
CONSUMER_KEY = 'anonymous'
CONSUMER_SECRET = 'anonymous'
View
4 stashboard/static/css/style.css
@@ -485,5 +485,7 @@ form.admin ul.images li img {
font-weight: bold;
}
-
+.service-rss {
+ float: right;
+}
View
BIN  stashboard/static/images/feed-icon-14x14.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
View
1  stashboard/templates/base.html
@@ -13,6 +13,7 @@
<div id="wrapper">
<header>
<div class="actions button-container">
+ <a href="/rss" class="button pill big" title="Subscribe to the RSS feed"><img src="/images/feed-icon-14x14.png"/></a>
{% if admin %}
<a class="button pill big" href="/admin">Admin</a>
{% endif %}
View
3  stashboard/templates/service.html
@@ -2,8 +2,7 @@
{% block content %}
-<h2>{{ service.name }}</h2>
-
+<h2>{{ service.name }} <a class="button pill service-rss" href="/rss?services={{ service.slug }}"><img src="/images/feed-icon-14x14.png"/></a></h2>
<p id="serviceDescription"> {{ service.description }}</p>
{% if start_date %}
View
82 tests/test_rss.py
@@ -0,0 +1,82 @@
+import json
+import datetime
+import random
+import xml.etree.ElementTree
+
+import models
+import settings
+from test_api import StashboardTest
+
+class RSSFeedTest(StashboardTest):
+
+ def setUp(self):
+ super(RSSFeedTest, self).setUp()
+ self.services = []
+ self.statuses = []
+ for name in ['web', 'billing', 'notifications']:
+ service = models.Service(name=name.title(),
+ slug=name,
+ description="test service")
+ service.put()
+ self.services.append(service)
+ for name in ['up', 'down', 'warning']:
+ status = models.Status(name=name.title(),
+ slug=name,
+ description="test status",
+ image="test image")
+ status.put()
+ self.statuses.append(status)
+
+ def test_empty_feed(self):
+ response = self.get("/rss")
+ self.assertEquals(response.headers["Content-Type"], "application/rss+xml")
+ self.assertEquals(response.status_code, 200)
+
+ def test_feed_with_events(self):
+ event = models.Event(start=datetime.datetime.now(), message="test hello",
+ service=random.choice(self.services),
+ status=random.choice(self.statuses))
+ event.put()
+ response = self.get("/rss")
+ self.assertEquals(response.headers["Content-Type"], "application/rss+xml")
+ self.assertEquals(response.status_code, 200)
+
+ result = xml.etree.ElementTree.fromstring(response.content)
+ self.assertEquals(len(result), 1)
+ self.assertEquals(result[0].tag, 'channel')
+ channel = result[0]
+
+ # assert that response has 1 event.
+ self.assertEquals(len(channel.findall('item')), 1)
+
+ # assert existence of title, description, and link elements in channel.
+ for tag in ['title', 'description', 'link']:
+ self.assertEquals(len(channel.findall(tag)), 1)
+
+ item = channel.find('item')
+ # assert existence of title, description, link, category, pubDate, and guid
+ # elements in channel.
+ for tag in ['title', 'description', 'link', 'category', 'pubDate', 'guid']:
+ self.assertEquals(len(item.findall(tag)), 1)
+
+
+ def test_feed_does_not_exceed_max(self):
+ for _ in xrange(settings.RSS_NUM_EVENTS_TO_FETCH + 20):
+ event = models.Event(start=datetime.datetime.now(), message="test hello",
+ service=random.choice(self.services),
+ status=random.choice(self.statuses))
+ event.put()
+
+ response = self.get("/rss")
+
+ self.assertEquals(response.headers["Content-Type"], "application/rss+xml")
+ self.assertEquals(response.status_code, 200)
+
+ result = xml.etree.ElementTree.fromstring(response.content)
+ self.assertEquals(len(result), 1)
+ self.assertEquals(result[0].tag, 'channel')
+ channel = result[0]
+
+ # assert that response does not have more than RSS_NUM_EVENTS_TO_FETCH events.
+ self.assertEquals(len(channel.findall('item')), settings.RSS_NUM_EVENTS_TO_FETCH)
+
Please sign in to comment.
Something went wrong with that request. Please try again.