Merge pull request #89 from mollyproject/batch-to-celery

Batch processing replaced with Celery tasks.
commit 7e5ae8ffd25a19a7fab80549d6f2e5effdf32a53 (2 parents: 3d7fcb6 + e8042f1)
Authored by @davbo
Showing with 418 additions and 727 deletions.
  1. +24 −0 docs/source/getting-started/configuring.txt
  2. +23 −4 docs/source/getting-started/installing.txt
  3. +0 −13 docs/source/ref/batch-processing.txt
  4. +39 −10 docs/source/topics/application-framework.txt
  5. +3 −2 molly/apps/contact/providers/__init__.py
  6. +3 −1 molly/apps/feeds/providers/__init__.py
  7. +7 −17 molly/apps/feeds/providers/ical.py
  8. +7 −18 molly/apps/feeds/providers/rss.py
  9. +7 −17 molly/apps/feeds/providers/talks_cam.py
  10. +3 −4 molly/apps/home/views.py
  11. +5 −3 molly/apps/library/providers/__init__.py
  12. +5 −2 molly/apps/places/providers/__init__.py
  13. +8 −13 molly/apps/places/providers/acislive.py
  14. +6 −6 molly/apps/places/providers/atcocif.py
  15. +4 −3 molly/apps/places/providers/bbc_tpeg.py
  16. +4 −5 molly/apps/places/providers/cif.py
  17. +5 −4 molly/apps/places/providers/naptan.py
  18. +12 −10 molly/apps/places/providers/osm.py
  19. +4 −3 molly/apps/places/providers/postcodes.py
  20. +4 −2 molly/apps/podcasts/providers/__init__.py
  21. +5 −9 molly/apps/podcasts/providers/opml.py
  22. +4 −4 molly/apps/podcasts/providers/pp.py
  23. +6 −5 molly/apps/podcasts/providers/rss.py
  24. +2 −2 molly/apps/podcasts/tests.py
  25. +3 −2 molly/apps/search/providers/__init__.py
  26. +2 −1  molly/apps/service_status/providers/rss_module.py
  27. +5 −2 molly/apps/transport/providers/__init__.py
  28. +4 −7 molly/apps/weather/providers/bbc.py
  29. +0 −222 molly/batch_processing/__init__.py
  30. +0 −23 molly/batch_processing/admin.py
  31. 0  molly/batch_processing/management/__init__.py
  32. 0  molly/batch_processing/management/commands/__init__.py
  33. +0 −70 molly/batch_processing/management/commands/create_crontab.py
  34. +0 −11 molly/batch_processing/management/commands/run_batch.py
  35. +0 −53 molly/batch_processing/migrations/0001_initial.py
  36. +0 −40 molly/batch_processing/migrations/0002_auto__add_field_batch_last_run_failed.py
  37. 0  molly/batch_processing/migrations/__init__.py
  38. +0 −98 molly/batch_processing/models.py
  39. +0 −8 molly/batch_processing/scripts/run_batch.py
  40. +15 −3 molly/commands/site_template/settings.py
  41. +18 −0 molly/conf/admin.py
  42. +32 −0 molly/conf/celery_util.py
  43. +1 −1  molly/conf/default_settings.py
  44. +128 −0 molly/conf/provider.py
  45. +6 −15 molly/conf/settings.py
  46. +5 −2 molly/geolocation/providers/__init__.py
  47. +4 −0 molly/urls.py
  48. +2 −11 molly/utils/management/commands/deploy.py
  49. +3 −1 setup.py
24 docs/source/getting-started/configuring.txt
@@ -262,6 +262,30 @@ The following settings will make this "just work" with Molly::
PIPELINE_CSS_COMPRESSOR = 'molly.utils.compress.MollyCSSFilter'
PIPELINE_JS_COMPRESSOR = 'pipeline.compressors.jsmin.JSMinCompressor'
+Celery settings
+^^^^^^^^^^^^^^^
+.. versionadded:: 1.4
+
+We include a few sane defaults for running Celery. These are::
+
+ BROKER_URL = "amqp://molly:molly@localhost:5672//"
+ CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
+ CELERYD_CONCURRENCY = 1
+ CELERY_RETRY_DELAY = 3 * 60
+ CELERY_MAX_RETRIES = 3
+
+The only setting you should need to worry about here is `BROKER_URL`. This
+setting is passed from Celery to its transport library, Kombu, which has
+excellent `documentation for the possible broker options`__ and their
+limitations. The default we provide attempts to connect to RabbitMQ on
+`localhost` as user `molly`, using the vhost `molly`.
+
+__ http://ask.github.com/kombu/userguide/connections.html#urls
+
+Remaining options are best explained in the `Celery documentation`__.
+
+__ http://docs.celeryproject.org/en/2.4/configuration.html
+
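For illustration, a minimal sketch of pointing Molly at a different broker in
your site's ``settings.py`` (this assumes a local Redis instance; any
Kombu-supported URL works the same way)::

    # Hypothetical override -- format: transport://user:password@host:port/vhost
    BROKER_URL = "redis://localhost:6379/0"

    # Retry failed tasks every two minutes, at most five times
    CELERY_RETRY_DELAY = 2 * 60
    CELERY_MAX_RETRIES = 5
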
Molly settings
^^^^^^^^^^^^^^
27 docs/source/getting-started/installing.txt
@@ -16,14 +16,14 @@ In order to install Molly, there are several non-Python dependencies which are n
Fedora
""""""
-.. code-block ::
+.. code-block:: none
su -c 'yum install python-pip libxml-devel libxslt-devel python-devel postgresql-devel openldap-devel openssl-devel gdal-python proj geos libjpeg-devel imagemagick gcc make git libyaml'
CentOS
""""""
-.. code-block ::
+.. code-block:: none
su -c 'yum install python-pip python26 python-setuptools python26-devel binutils libxslt-devel cyrus-sasl-devel openldap-devel ImageMagick proj proj-devel postgresql-devel postgresql-contrib geos-3.1.0 geos-devel-3.1.0 gdal libjpeg-devel make gcc openssl-devel libyaml-devel'
@@ -34,13 +34,13 @@ Ubuntu/Debian
of libgeos, libgdal and postgresql which changes the package name. The
version below are for Ubuntu 10.04.
-.. code-block ::
+.. code-block:: none
sudo apt-get install python-pip build-essential python-gdal proj libgeos-3.1.0 binutils libgdal1-1.6.0 postgresql-server-dev-8.4 python-setuptools python-dev libxslt-dev libldap2-dev libsasl2-dev libjpeg-dev imagemagick libyaml
.. note:: Versions below are for Ubuntu 10.11
-.. code-block ::
+.. code-block:: none
sudo apt-get install python-pip build-essential python-gdal proj libgeos-3.2.2 binutils libgdal1-1.7.0 postgresql-server-dev-8.4 python-setuptools python-dev libxslt1-dev libldap2-dev libsasl2-dev libjpeg62-dev imagemagick python-yaml
@@ -219,6 +219,25 @@ You will also have a ``compiled_media`` folder, which should be ignored, and a
``site_media`` folder, which is where you should put any overrides for media on
your site.
+Running Celery
+--------------
+
+.. versionadded:: 1.4
+
+Molly now runs its periodic tasks (e.g. importing map data from OpenStreetMap)
+using the popular distributed task queue, `Celery <http://celeryproject.org>`_.
+
+Celery requires us to install a *message broker*; the most popular choice here is
+`RabbitMQ <http://www.rabbitmq.com>`_. There are other brokers available; as always,
+we recommend reviewing the excellent Celery documentation to learn more.
+
+Molly's installation will have set up the Celery worker, ``celeryd``, and the task
+scheduler, ``celerybeat``, for us. We just have to start them::
+
+ python manage.py celeryd
+ python manage.py celerybeat
+
+
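Before starting the daemons, it can be worth checking that the broker is
actually reachable. A sketch (assuming ``DJANGO_SETTINGS_MODULE`` is set, so
the ``BROKER_URL`` above is picked up)::

    # Hypothetical connectivity check, run from a Django shell
    from celery.app import current_app

    # connect() raises if the broker in BROKER_URL is unreachable
    conn = current_app().broker_connection()
    conn.connect()
    print "Connected to broker on", conn.hostname
    conn.release()
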
Deploying Molly
---------------
13 docs/source/ref/batch-processing.txt
@@ -1,13 +0,0 @@
-:mod:`molly.batch_processing` -- Batch Processing
-=================================================
-
-.. module :: molly.batch_processing
-
-This allows for other applications to set up jobs to run on a schedule.
-
-Configuration
--------------
-
-This application has no configuration, and is not added as an ``Application``
-to the ``APPLICATIONS`` list in ``settings.py``, it should be added to the
-``INSTALLED_APPS`` list like a standard Django app
49 docs/source/topics/application-framework.txt
@@ -204,20 +204,49 @@ A provider maps an external interface onto the model used by the application.
Most applications provide a ``providers.BaseProvider`` class which specifies
an interface to be implemented by a provider for that application.
-Extra base classes
-------------------
+.. versionadded:: 1.4
+ Providers now all subclass ``molly.conf.provider.Provider``
+.. Extra base classes
+.. ------------------ TODO?
+Task Processing
+---------------
+.. versionadded:: 1.4
-Batch jobs
-----------
+Celery is used to provide asynchronous task processing. For an introduction to
+the basics of Celery we recommend you take a look at the `"Getting Started with
+Celery"`__ guide.
-A provider can annotate methods to be included in a crontab using the
-:meth:`molly.conf.settings.batch` decorator::
+__ http://docs.celeryproject.org/en/latest/getting-started/index.html
- @batch('%d 9 * * mon' % random.randint(0, 59))
- def import_data(self, metadata, output):
- # do stuff
+Molly uses a modified version of the Celery task decorator, located at
+``molly.conf.provider.task``. This should be used in a similar way to the previous
+``@batch`` decorator, to identify any methods on a provider to run asynchronously via Celery.
+
+See this (simplified) example from ``molly.apps.feeds.providers.rss``::
+
+ @task(run_every=timedelta(minutes=60))
+ def import_data(self, **metadata):
+ """
+ Pulls RSS feeds
+ """
+
+ from molly.apps.feeds.models import Feed
+ for feed in Feed.objects.filter(provider=self.class_path):
+ logger.debug("Importing: %s - %s" % (feed.title, feed.rss_url))
+ self.import_feed.delay(feed)
return metadata
-For more information, see :doc:`../ref/batch-processing`.
+ # Override CELERY_RETRY_DELAY and CELERY_MAX_RETRIES
+ @task(default_retry_delay=5, max_retries=2)
+ def import_feed(self, feed):
+ from molly.apps.feeds.models import Item
+ feed_data = feedparser.parse(feed.rss_url)
+ # Do stuff with feed_data
+
+We can iterate through all feeds and launch tasks to import them asynchronously
+using ``task.delay()``. This convention has been applied throughout the
+standard providers packaged with Molly. Note that ``default_retry_delay`` and
+``max_retries`` are overridden on ``import_feed``; this means each feed import
+will be retried at most 2 times, with 5 seconds between each of those retries.
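Behind the scenes, ``Provider.register_tasks`` replaces each decorated method
with a Celery ``Task`` instance, so the usual Celery calling conventions apply.
A minimal sketch (``ExampleProvider`` is hypothetical)::

    from datetime import timedelta
    from molly.conf.provider import Provider, task

    class ExampleProvider(Provider):
        @task(run_every=timedelta(minutes=30))
        def import_data(self, **metadata):
            # the dict returned here is cached as the next run's metadata
            return metadata

    # register_tasks() instantiates the provider and registers its tasks
    provider = ExampleProvider.register_tasks()
    provider.import_data.delay()  # place the task on the queue now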
5 molly/apps/contact/providers/__init__.py
@@ -1,7 +1,8 @@
+from molly.conf.provider import Provider
from molly.apps.contact.forms import GenericContactForm
from django.utils.translation import ugettext as _
-class BaseContactProvider(object):
+class BaseContactProvider(Provider):
class NoSuchResult(KeyError):
pass
@@ -88,4 +89,4 @@ class TooManyResults(Exception):
pass
else:
del ldap
- from mit import LDAPContactProvider
+ from mit import LDAPContactProvider
4 molly/apps/feeds/providers/__init__.py
@@ -1,4 +1,6 @@
-class BaseFeedsProvider(object):
+from molly.conf.provider import Provider
+
+class BaseFeedsProvider(Provider):
pass
from rss import RSSFeedsProvider
24 molly/apps/feeds/providers/ical.py
@@ -1,7 +1,5 @@
-from datetime import datetime
+from datetime import datetime, timedelta
import urllib2
-import random
-import traceback
import logging
from icalendar import Calendar
from icalendar.prop import vDatetime, vDate, vText
@@ -9,7 +7,7 @@
socket.setdefaulttimeout(5)
from molly.external_media import sanitise_html
-from molly.conf.settings import batch
+from molly.conf.provider import task
from molly.apps.feeds.providers import BaseFeedsProvider
@@ -26,26 +24,18 @@ class ICalFeedsProvider(BaseFeedsProvider):
"""
verbose_name = 'iCal'
- @batch('%d * * * *' % random.randint(0, 59))
- def import_data(self, metadata, output):
+ @task(run_every=timedelta(minutes=60))
+ def import_data(self, **metadata):
"""
Pulls iCal feeds
"""
-
from molly.apps.feeds.models import Feed
for feed in Feed.objects.filter(provider=self.class_path):
- output.write("Importing %s\n" % feed.title)
- try:
- self.import_feed(feed)
- except Exception, e:
- output.write("Error importing %s\n" % feed.title)
- traceback.print_exc(file=output)
- output.write('\n')
- logger.warn("Error importing feed %r" % feed.title,
- exc_info=True, extra={'url': feed.rss_url})
-
+ logger.debug("Importing: %s - %s" % (feed.title, feed.rss_url))
+ self.import_feed.delay(feed)
return metadata
+ @task()
def import_feed(self, feed):
from molly.apps.feeds.models import Item, vCard
25 molly/apps/feeds/providers/rss.py
@@ -1,14 +1,12 @@
-from datetime import datetime
+from datetime import datetime, timedelta
import feedparser
import time
-import random
-import traceback
import logging
import socket
socket.setdefaulttimeout(5)
from molly.external_media import sanitise_html
-from molly.conf.settings import batch
+from molly.conf.provider import task
from molly.apps.feeds.providers import BaseFeedsProvider
@@ -28,26 +26,19 @@ def struct_to_datetime(s):
class RSSFeedsProvider(BaseFeedsProvider):
verbose_name = 'RSS'
- @batch('%d * * * *' % random.randint(0, 59))
- def import_data(self, metadata, output):
+ @task(run_every=timedelta(minutes=60))
+ def import_data(self, **metadata):
"""
Pulls RSS feeds
"""
from molly.apps.feeds.models import Feed
for feed in Feed.objects.filter(provider=self.class_path):
- output.write("Importing %s\n" % feed.title)
- try:
- self.import_feed(feed)
- except Exception, e:
- output.write("Error importing %s\n" % feed.title)
- traceback.print_exc(file=output)
- output.write('\n')
- logger.warn("Error importing feed %r" % feed.title,
- exc_info=True, extra={'url': feed.rss_url})
-
+ logger.debug("Importing: %s - %s" % (feed.title, feed.rss_url))
+ self.import_feed.delay(feed)
return metadata
+ @task(default_retry_delay=5, max_retries=3)
def import_feed(self, feed):
from molly.apps.feeds.models import Item
@@ -99,5 +90,3 @@ def import_feed(self, feed):
for item in Item.objects.filter(feed=feed):
if item not in items:
item.delete()
-
- return items
24 molly/apps/feeds/providers/talks_cam.py
@@ -1,14 +1,12 @@
-from datetime import datetime
+from datetime import datetime, timedelta
from lxml import etree
import urllib2
-import random
-import traceback
import logging
import socket
socket.setdefaulttimeout(5)
from molly.external_media import sanitise_html
-from molly.conf.settings import batch
+from molly.conf.provider import task
from molly.apps.feeds.providers import BaseFeedsProvider
@@ -20,26 +18,18 @@
class TalksCamFeedsProvider(BaseFeedsProvider):
verbose_name = 'TalksCam'
- @batch('%d * * * *' % random.randint(0, 59))
- def import_data(self, metadata, output):
+ @task(run_every=timedelta(minutes=60))
+ def import_data(self, **metadata):
"""
Pulls TalksCam feeds
"""
-
from molly.apps.feeds.models import Feed
for feed in Feed.objects.filter(provider=self.class_path):
- output.write("Importing %s\n" % feed.title)
- try:
- self.import_feed(feed)
- except Exception, e:
- output.write("Error importing %s\n" % feed.title)
- traceback.print_exc(file=output)
- output.write('\n')
- logger.warn("Error importing feed %r" % feed.title,
- exc_info=True, extra={'url': feed.rss_url})
-
+ logger.debug("Importing: %s - %s" % (feed.title, feed.rss_url))
+ self.import_feed.delay(feed)
return metadata
+ @task()
def import_feed(self, feed):
from molly.apps.feeds.models import Item, vCard
7 molly/apps/home/views.py
@@ -12,8 +12,7 @@
from molly.utils.breadcrumbs import *
from molly.favourites import get_favourites
from molly.wurfl import device_parents
-from molly import conf
-from molly.conf.applications import app_by_application_name, has_app_by_application_name
+from molly.conf.applications import app_by_application_name, has_app_by_application_name, has_app, all_apps
from molly.apps.weather.models import Weather
from models import UserMessage
@@ -47,7 +46,7 @@ def handle_GET(self, request, context):
and not request.GET.get('preview') == 'true'
and not internal_referer
and not settings.DEBUG
- and conf.has_app('molly.apps.desktop')
+ and has_app('molly.apps.desktop')
and request.REQUEST.get('format') is None):
return self.redirect(reverse('desktop:index'), request)
@@ -83,7 +82,7 @@ def handle_GET(self, request, context):
'url': reverse('%s:index' % app.local_name) \
if app.has_urlconf else None,
'display_to_user': app.display_to_user,
- } for app in conf.all_apps()]
+ } for app in all_apps()]
# Add accesskeys to the first 9 apps to be displayed to the user
for i, app in enumerate(
8 molly/apps/library/providers/__init__.py
@@ -1,4 +1,6 @@
-class BaseLibrarySearchProvider(object):
+from molly.conf.provider import Provider
+
+class BaseLibrarySearchProvider(Provider):
"""
Abstract class implementing the interface for a provider for the library app
"""
@@ -23,7 +25,7 @@ def control_number_search(self, control_number):
from z3950 import Z3950
-class BaseMetadataProvider(object):
+class BaseMetadataProvider(Provider):
"""
Abstract class implementing the interface for a provider which fetches
book covers
@@ -37,4 +39,4 @@ def annotate(self, books):
"""
pass
-from google import GoogleBooksProvider
+from google import GoogleBooksProvider
7 molly/apps/places/providers/__init__.py
@@ -1,4 +1,7 @@
-class BaseMapsProvider(object):
+from molly.conf.provider import Provider
+
+
+class BaseMapsProvider(Provider):
def import_data(self):
pass
@@ -16,4 +19,4 @@ def augment_metadata(self, entities, **kwargs):
from bbc_tpeg import BBCTPEGPlacesProvider
from tfl import TubeRealtimeProvider
from atcocif import AtcoCifTimetableProvider
-from timetables import TimetableAnnotationProvider
+from timetables import TimetableAnnotationProvider
21 molly/apps/places/providers/acislive.py
@@ -1,5 +1,5 @@
import threading
-from datetime import datetime
+from datetime import datetime, timedelta
from lxml import etree
import re
import logging
@@ -7,7 +7,6 @@
import socket
socket.setdefaulttimeout(5)
from urllib2 import urlopen
-import random
from django.db import transaction, reset_queries, connection
from django.http import Http404
@@ -17,7 +16,7 @@
from molly.apps.places import get_entity
from molly.apps.places.providers.naptan import NaptanMapsProvider
from molly.utils.i18n import set_name_in_language
-from molly.conf.settings import batch
+from molly.conf.provider import task
logger = logging.getLogger(__name__)
@@ -254,12 +253,8 @@ def __init__(self, urls=None):
urls = [instance[0] for instance in ACISLiveMapsProvider.ACISLIVE_URLS.items()]
self.urls = urls
- @batch('%d 10 * * sat' % random.randint(0, 59))
- def import_data(self, metadata, output):
-
- self._output = output
-
-
+ @task(run_every=timedelta(days=7))
+ def import_data(self, **metadata):
# Searching can flag up the same results again and again, so store
# which ones we've found
found_routes = set()
@@ -269,7 +264,7 @@ def import_data(self, metadata, output):
# Try and find all bus routes in the system
for term in list(ascii_lowercase) + map(str, range(0,9)):
found_routes = self._scrape_search(
- url, self.SEARCH_PAGE % (url, term), found_routes, output)
+ url, self.SEARCH_PAGE % (url, term), found_routes)
# Now try and find buses that don't exist on that system any more
for route in Route.objects.filter(external_ref__startswith=url):
@@ -277,7 +272,7 @@ def import_data(self, metadata, output):
logger.info('Removed route not found on system: %s', route)
route.delete()
- def _scrape_search(self, url, search_page, found_routes, output):
+ def _scrape_search(self, url, search_page, found_routes):
results = etree.parse(urlopen(search_page), parser = etree.HTMLParser())
for tr in results.find('.//table').findall('tr')[1:]:
reset_queries()
@@ -306,11 +301,11 @@ def _scrape_search(self, url, search_page, found_routes, output):
route.operator = operator
route.service_name = destination
route.save()
- self._scrape(route, link, output)
+ self._scrape(route, link)
return found_routes
- def _scrape(self, route, url, output):
+ def _scrape(self, route, url):
url += '&showall=1'
service = etree.parse(urlopen(url), parser = etree.HTMLParser())
route.stops.clear()
12 molly/apps/places/providers/atcocif.py
@@ -15,7 +15,7 @@
from molly.apps.places.models import (Entity, Route, StopOnRoute, Source,
Journey, ScheduledStop)
from molly.apps.places.providers import BaseMapsProvider, NaptanMapsProvider
-from molly.conf.settings import batch
+from molly.conf.provider import task
from molly.utils.i18n import set_name_in_language
logger = getLogger(__name__)
@@ -37,15 +37,15 @@ def __init__(self, url):
self._cache = EntityCache()
self._entity_type = NaptanMapsProvider(None)._get_entity_types()['BCT'][0]
- @batch('%d 10 * * wed' % random.randint(0, 59))
- def import_data(self, metadata, output):
+ @task(run_every=timedelta(days=7))
+ def import_data(self, **metadata):
deleted_routes = set(Route.objects.filter(external_ref__startswith=self._url).values_list('external_ref'))
archive = ZipFile(StringIO(urlopen(self._url).read()))
for file in archive.namelist():
- output.write(file)
+ logger.info(file)
routes = self._import_cif(archive.open(file))
- output.write(': %d routes in file\n' % len(routes))
+ logger.info(': %d routes in file\n' % len(routes))
self._import_routes(routes)
deleted_routes -= set(self._url + route['id'] for route in routes)
archive.close()
@@ -276,4 +276,4 @@ def _get_source(self):
if __name__ == '__main__':
- AtcoCifTimetableProvider('http://store.datagm.org.uk/sets/TfGM/GMPTE_CIF.zip').import_data({}, sys.stdout)
+ AtcoCifTimetableProvider('http://store.datagm.org.uk/sets/TfGM/GMPTE_CIF.zip').import_data()
7 molly/apps/places/providers/bbc_tpeg.py
@@ -1,5 +1,6 @@
import os, os.path, urllib, random, zipfile, tempfile
from lxml import etree
+from datetime import timedelta
from django.contrib.gis.geos import Point, LineString
from django.conf import settings
@@ -8,7 +9,7 @@
from molly.apps.places.providers import BaseMapsProvider
from molly.apps.places.models import Source, Entity, EntityType, EntityTypeCategory
-from molly.conf.settings import batch
+from molly.conf.provider import task
from molly.utils.i18n import override, set_name_in_language
class BBCTPEGResolver(etree.Resolver):
@@ -50,8 +51,8 @@ class BBCTPEGPlacesProvider(BaseMapsProvider):
def __init__(self, url=_TPEG_URL):
self._tpeg_url = url
- @batch('%d-59/3 * * * *' % random.randint(0, 2))
- def import_data(self, metadata, output):
+ @task(run_every=timedelta(minutes=3))
+ def import_data(self, **metadata):
source, entity_type = self._get_source(), self._get_entity_type()
parser = etree.XMLParser(load_dtd=True)
9 molly/apps/places/providers/cif.py
@@ -7,8 +7,7 @@
It is dissimilar, but related to, the ATCO-CIF standard
"""
-from datetime import date, time
-import random
+from datetime import date, time, timedelta
from string import capwords
from django.core.exceptions import ObjectDoesNotExist
@@ -18,7 +17,7 @@
from molly.apps.places.models import (Entity, EntityType, EntityTypeCategory,
Source, Route, Journey, ScheduledStop)
from molly.apps.places.providers import BaseMapsProvider
-from molly.conf.settings import batch
+from molly.conf.provider import task
class CifTimetableProvider(BaseMapsProvider):
@@ -188,8 +187,8 @@ def import_from_string(self, cif):
with transaction.commit_on_success():
self._save_stops(self._save_journey(self._save_route()))
- @batch('%d 15 * * mon' % random.randint(0, 59))
- def import_from_file(self, metadata, output):
+ @task(run_every=timedelta(days=7))
+ def import_from_file(self, **metadata):
with open(self._filename) as file:
for line in file:
self._handle_line(line)
9 molly/apps/places/providers/naptan.py
@@ -8,6 +8,7 @@
import csv
from warnings import warn
from collections import defaultdict
+from datetime import timedelta
try:
from cStringIO import StringIO
except:
@@ -24,7 +25,7 @@
from molly.apps.places.providers import BaseMapsProvider
from molly.apps.places.models import EntityType, Entity, EntityGroup, Source, EntityTypeCategory
-from molly.conf.settings import batch
+from molly.conf.provider import task
from molly.utils.i18n import override, set_name_in_language
class NaptanContentHandler(ContentHandler):
@@ -673,8 +674,8 @@ def __init__(self, method=None, areas=None, username=None, password=None):
areas += ('910',)
self._areas = areas
- @batch('%d 10 * * mon' % random.randint(0, 59))
- def import_data(self, metadata, output):
+ @task(run_every=timedelta(days=7))
+ def import_data(self, **metadata):
username, password = self._username, self._password
self._source = self._get_source()
@@ -775,4 +776,4 @@ def _get_source(self):
else:
if __name__ == '__main__':
p = NaptanMapsProvider(method='ftp', username=SECRETS.journeyweb[0], password=SECRETS.journeyweb[1], areas=('340',))
- p.import_data(None, None)
+ p.import_data()
22 molly/apps/places/providers/osm.py
@@ -6,8 +6,10 @@
import random
import os
import yaml
+import logging
from xml.sax import saxutils, handler, make_parser
+from datetime import timedelta
from django.db import reset_queries
from django.contrib.gis.geos import Point, LineString, LinearRing
@@ -23,7 +25,10 @@
from molly.utils.misc import AnyMethodRequest
from molly.utils.i18n import override, set_name_in_language
from molly.geolocation import reverse_geocode
-from molly.conf.settings import batch
+from molly.conf.provider import task
+
+
+logger = logging.getLogger(__name__)
def node_id(id):
return "N%d" % int(id)
@@ -31,12 +36,11 @@ def way_id(id):
return "W%d" % int(id)
class OSMHandler(handler.ContentHandler):
- def __init__(self, source, entity_types, find_types, output, lat_north=None,
+ def __init__(self, source, entity_types, find_types, lat_north=None,
lat_south=None, lon_west=None, lon_east=None, identities={}):
self.source = source
self.entity_types = entity_types
self.find_types = find_types
- self.output = output
self._lat_north = lat_north
self._lat_south = lat_south
self._lon_west = lon_west
@@ -213,7 +217,7 @@ def endDocument(self):
entity.delete()
self.delete_count += 1
- self.output.write("""\
+ logger.info("""\
Complete
Created: %6d
Modified: %6d
@@ -301,8 +305,8 @@ def to_tuple(tag):
else:
self.identities = {}
- @batch('%d 9 * * mon' % random.randint(0, 59))
- def import_data(self, metadata, output):
+ @task(run_every=timedelta(days=7))
+ def import_data(self, **metadata):
"Imports places data from OpenStreetMap"
old_etag = metadata.get('etag', '')
@@ -310,17 +314,15 @@ def import_data(self, metadata, output):
request = AnyMethodRequest(self._url, method='HEAD')
response = urllib2.urlopen(request)
new_etag = response.headers['ETag'][1:-1]
- self.output = output
if not settings.DEBUG and new_etag == old_etag:
- output.write('OSM data not updated. Not updating.\n')
+ logger.info('OSM data not updated. Not updating.\n')
return
parser = make_parser(['xml.sax.xmlreader.IncrementalParser'])
parser.setContentHandler(OSMHandler(self._get_source(),
self._get_entity_types(),
lambda tags, type_list=None: self._find_types(tags, self._osm_tags if type_list is None else type_list),
- output,
self._lat_north,
self._lat_south,
self._lon_west,
@@ -442,7 +444,7 @@ def disambiguate_titles(self, source):
else:
title = inferred_name
except:
- self.output.write("Couldn't geocode for %s\n" % inferred_name)
+ logger.info("Couldn't geocode for %s\n" % inferred_name)
title = inferred_name
try:
name = entity.names.get(language_code=lang_code)
7 molly/apps/places/providers/postcodes.py
@@ -8,6 +8,7 @@
import os.path
import re
+from datetime import timedelta
from django.db import transaction, reset_queries
from django.conf import settings
from django.contrib.gis.geos import Point
@@ -18,7 +19,7 @@
from molly.apps.places.models import Entity, EntityType, Source, EntityTypeCategory
from molly.utils.i18n import override, set_name_in_language
-from molly.conf.settings import batch
+from molly.conf.provider import task
class PostcodesMapsProvider(BaseMapsProvider):
def __init__(self, codepoint_path, import_areas=None):
@@ -35,8 +36,8 @@ def _download_codepoint_open(self):
archive_file.write(archive_url.read())
archive_file.close()
- @batch('%d 12 1 1 *' % random.randint(0, 59))
- def import_data(self, metadata, output):
+ @task(run_every=timedelta(days=365))
+ def import_data(self, **metadata):
entity_type, source = self._get_entity_type(), self._get_source()
6 molly/apps/podcasts/providers/__init__.py
@@ -1,6 +1,8 @@
-class BasePodcastsProvider(object):
+from molly.conf.provider import Provider
+
+class BasePodcastsProvider(Provider):
pass
from rss import RSSPodcastsProvider
from pp import PodcastProducerPodcastsProvider
-from opml import OPMLPodcastsProvider
+from opml import OPMLPodcastsProvider
14 molly/apps/podcasts/providers/opml.py
@@ -1,5 +1,5 @@
from xml.etree import ElementTree as ET
-from datetime import datetime
+from datetime import datetime, timedelta
import urllib
import re
import email
@@ -10,7 +10,7 @@
from django.template.defaultfilters import slugify
from django.utils.translation import ugettext_noop as _
-from molly.conf.settings import batch
+from molly.conf.provider import task
from molly.utils.i18n import set_name_in_language
from molly.apps.podcasts.providers import BasePodcastsProvider
from molly.apps.podcasts.models import Podcast, PodcastItem, PodcastCategory, PodcastEnclosure
@@ -70,8 +70,8 @@ def parse_outline(self, outline):
self.update_podcast(podcast)
- @batch('%d * * * *' % random.randint(0, 59))
- def import_data(self, metadata, output):
+ @task(run_every=timedelta(minutes=60))
+ def import_data(self, **metadata):
self._category = None
@@ -89,8 +89,6 @@ def import_data(self, metadata, output):
self.parse_outline(outline)
rss_urls.append(outline.attrib['xmlUrl'])
except Exception, e:
- output.write("Update of podcast %r failed." % outline.attrib['xmlUrl'])
- traceback.print_exc(file=output)
if not failure_logged:
logger.exception("Update of podcast %r failed.", outline.attrib['xmlUrl'])
failure_logged = True
@@ -103,8 +101,6 @@ def import_data(self, metadata, output):
self.parse_outline(outline)
rss_urls.append(outline.attrib['xmlUrl'])
except Exception, e:
- output.write("Update of podcast %r failed." % outline.attrib['xmlUrl'])
- traceback.print_exc(file=output)
if not failure_logged:
logger.exception("Update of podcast %r failed.", outline.attrib['xmlUrl'])
failure_logged = True
@@ -114,4 +110,4 @@ def import_data(self, metadata, output):
if not podcast.rss_url in rss_urls:
podcast.delete()
- return metadata
+ return metadata
8 molly/apps/podcasts/providers/pp.py
@@ -1,5 +1,5 @@
from lxml import etree
-from datetime import datetime
+from datetime import datetime, timedelta
import urllib
import re
import email
@@ -7,7 +7,7 @@
from django.conf import settings
-from molly.conf.settings import batch
+from molly.conf.provider import task
from molly.apps.podcasts.providers import BasePodcastsProvider
from molly.apps.podcasts.models import Podcast, PodcastItem, PodcastCategory, PodcastEnclosure
from molly.utils.i18n import set_name_in_language
@@ -18,8 +18,8 @@ class PodcastProducerPodcastsProvider(RSSPodcastsProvider):
def __init__(self, url):
self.url = url
- @batch('%d * * * *' % random.randint(0, 59))
- def import_data(self, metadata, output):
+ @task(run_every=timedelta(minutes=60))
+ def import_data(self, **metadata):
atom = self.atom
xml = etree.parse(urllib.urlopen(self.url))
11 molly/apps/podcasts/providers/rss.py
@@ -1,8 +1,8 @@
import random, urllib, email
from lxml import etree
-from datetime import datetime
+from datetime import datetime, timedelta
-from molly.conf.settings import batch
+from molly.conf.provider import task
import dateutil.parser
from molly.apps.podcasts.providers import BasePodcastsProvider
@@ -36,8 +36,8 @@ def __init__(self, podcasts, medium=None):
def atom(self):
return Namespace('http://www.w3.org/2005/Atom')
- @batch('%d * * * *' % random.randint(0, 59))
- def import_data(self, metadata, output):
+ @task(run_every=timedelta(minutes=60))
+ def import_data(self, **metadata):
for slug, url in self.podcasts:
podcast, url = Podcast.objects.get_or_create(
provider=self.class_path,
@@ -47,7 +47,7 @@ def import_data(self, metadata, output):
podcast.medium = self.medium
podcast.slug = slug
- self.update_podcast(podcast)
+ self.update_podcast.delay(podcast)
def determine_license(self, o):
license = o.find('{http://purl.org/dc/terms/}license') or \
@@ -55,6 +55,7 @@ def determine_license(self, o):
return getattr(license, 'text', None)
+ @task()
def update_podcast(self, podcast):
atom = self.atom
def gct(node, names):
4 molly/apps/podcasts/tests.py
@@ -13,7 +13,7 @@ def setUp(self):
opml = OPMLPodcastsProvider(url = 'http://www.bbc.co.uk/radio/opml/bbc_podcast_opml_v2.xml',
rss_re = r'http://downloads.bbc.co.uk/podcasts/(.+)/rss.xml')
opml.class_path = 'molly.providers.apps.podcasts.opml.OPMLPodcastsProvider'
- opml.import_data({}, sys.stdout)
+ opml.import_data({})
def testPodcasts(self):
podcasts = Podcast.objects.all()
@@ -23,4 +23,4 @@ def testPodcasts(self):
r = c.get('/podcasts/%s/' % podcast.category.slug)
r = c.get('/podcasts/%s/%s/' % (podcast.category.slug, podcast.slug))
- self.assertTrue(r.context['podcast'].podcastitem_set.count() > 0)
+ self.assertTrue(r.context['podcast'].podcastitem_set.count() > 0)
5 molly/apps/search/providers/__init__.py
@@ -1,5 +1,6 @@
import logging
+from molly.conf.provider import Provider
from django.core.urlresolvers import resolve, Resolver404
from django.http import Http404
from django.views.generic.simple import redirect_to
@@ -7,7 +8,7 @@
logger = logging.getLogger(__name__)
-class BaseSearchProvider(object):
+class BaseSearchProvider(Provider):
def perform_search(self, request, query, application=None):
"""
@@ -103,4 +104,4 @@ def _load_query_expansion_terms(self):
return {}
from google_search_appliance import GSASearchProvider
-from application_search import ApplicationSearchProvider
+from application_search import ApplicationSearchProvider
3  molly/apps/service_status/providers/rss_module.py
@@ -1,8 +1,9 @@
+from molly.conf.provider import Provider
from datetime import datetime
import dateutil.parser
import feedparser
-class RSSModuleServiceStatusProvider(object):
+class RSSModuleServiceStatusProvider(Provider):
def __init__(self, name, slug, url):
self.name, self.slug, self.url = name, slug, url
7 molly/apps/transport/providers/__init__.py
@@ -1,4 +1,7 @@
-class BaseTransitLineStatusProvider():
+from molly.conf.provider import Provider
+
+
+class BaseTransitLineStatusProvider(Provider):
def get_status(self):
# Return a dictionary with a key of 'service_name', which is the human
@@ -8,4 +11,4 @@ def get_status(self):
# "disruption_reason"
return {}
-from molly.apps.transport.providers.tfl import TubeStatusProvider
+from molly.apps.transport.providers.tfl import TubeStatusProvider
11 molly/apps/weather/providers/bbc.py
@@ -12,7 +12,7 @@
from django.contrib.gis.geos import Point
from django.utils.translation import ugettext_lazy as _
-from molly.conf.settings import batch
+from molly.conf.provider import Provider, task
from molly.apps.weather.models import (
Weather, OUTLOOK_CHOICES, VISIBILITY_CHOICES, PRESSURE_STATE_CHOICES,
SCALE_CHOICES, PTYPE_OBSERVATION, PTYPE_FORECAST
@@ -20,7 +20,7 @@
logger = logging.getLogger(__name__)
-class BBCWeatherProvider(object):
+class BBCWeatherProvider(Provider):
"""
Scrapes BBC RSS feeds to obtain weather information
"""
@@ -103,8 +103,8 @@ def _find_choice_match(choices, verbose):
+ r'(Latest Observations|Forecast) for (?P<name>.+)'
)
- @batch('%d-%d/15 * * * *' % (lambda x:(x, x+45))(random.randint(0, 14)))
- def import_data(self, metadata, output):
+ @task(run_every=timedelta(minutes=15))
+ def import_data(self, **metadata):
"""
Pulls weather data from the BBC
"""
@@ -113,9 +113,6 @@ def import_data(self, metadata, output):
observations = self.get_observations_data()
forecasts = self.get_forecast_data()
except Exception as e:
- output.write("Error importing weather data from BBC\n")
- traceback.print_exc(file=output)
- output.write('\n')
logger.exception("Error importing weather data from BBC")
return metadata
222 molly/batch_processing/__init__.py
@@ -1,222 +0,0 @@
-import getpass
-import simplejson
-import os
-import os.path
-import imp
-import sys
-from subprocess import call, check_call
-from datetime import datetime, timedelta
-
-from django.conf import settings
-
-from molly.conf import all_apps, app_by_local_name, app_by_application_name
-from molly.batch_processing.models import Batch
-from molly.utils.misc import get_norm_sys_path
-
-def load_batches():
- batch_details = []
- for app in all_apps():
- for provider in app.providers:
- for method_name in dir(provider):
- method = getattr(provider, method_name)
- if not getattr(method, 'is_batch', False):
- continue
-
- batch_details.append({
- 'title': method.__doc__ or provider.class_path,
- 'local_name': app.local_name,
- 'provider_name': provider.class_path,
- 'method_name': method_name,
- 'cron_stmt': method.cron_stmt,
- 'initial_metadata': method.initial_metadata,
- })
-
- batches = set()
- for batch_detail in batch_details:
- batch, created = Batch.objects.get_or_create(
- local_name = batch_detail['local_name'],
- provider_name = batch_detail['provider_name'],
- method_name = batch_detail['method_name'],
- defaults = {'title': batch_detail['title'],
- 'cron_stmt': batch_detail['cron_stmt'],
- '_metadata': simplejson.dumps(batch_detail['initial_metadata'])})
- batches.add(batch)
- for batch in Batch.objects.all():
- if not batch in batches:
- batch.delete()
-
-def run_batch(local_name, provider_name, method_name, tee_to_stdout=True):
- # This will force the loading of the molly.utils app, attaching its log
- # handler lest the batch logs anything that needs e-mailing.
- app_by_application_name('molly.utils')
-
- batch = Batch.objects.get(
- local_name=local_name,
- provider_name=provider_name,
- method_name=method_name)
-
- batch.run(tee_to_stdout)
-
- return batch.log
-
-def _escape(s):
- return s.replace('\\', '\\\\').replace('"', '\\"')
-
-if os.name == 'nt':
- def create_crontab(filename):
- def next_hour(minute):
- if datetime.now().time().minute < int(minute):
- return datetime.now().hour
- else:
- return str((datetime.now() + timedelta(hours=1)).hour).rjust(2, '0')
-
- load_batches()
-
- # Use the Windows task scheduler to create tasks
- # http://support.microsoft.com/kb/814596
- for batch in Batch.objects.all():
- # Delete any old scheduling of this task
- call(['schtasks', '/delete', '/f', '/tn', batch.provider_name])
-
- # Try and convert a cron statement into a Windows one...
- minute, hour, day, month, dow = batch.cron_stmt.split()
-
- # This doesn't capture all cases, but it does capture the ones
- # that ship with Molly, and most common ones. Please report any
- # bugs that affect you at issues.mollyproject.org
- if month != '*':
- args = ['/sc', 'MONTHLY']
- args += ['/m', ','.join(map(lambda mon: {
- '1': 'JAN',
- '2': 'FEB',
- '3': 'MAR',
- '4': 'APR',
- '5': 'MAY',
- '6': 'JUN',
- '7': 'JUL',
- '8': 'AUG',
- '9': 'SEP',
- '10': 'OCT',
- '11': 'NOV',
- '12': 'DEC',
- }.get(mon, mon), month.split(',')))]
- args += ['/d', day,
- '/st', '%s:%s:00' % (hour.rjust(2, '0'), minute.rjust(2, '0'))]
-
- elif day != '*':
- args = ['/sc', 'MONTHLY',
- '/d', day,
- '/st', '%s:%s:00' % (hour.rjust(2, '0'), minute.rjust(2, '0'))]
-
- elif dow != '*':
- args = ['/sc', 'WEEKLY',
- '/d', {
- '0': 'SUN',
- '1': 'MON',
- '2': 'TUE',
- '3': 'WED',
- '4': 'THU',
- '5': 'FRI',
- '6': 'SAT',
- '7': 'SUN',
- }.get(dow, dow.upper()),
- '/st', '%s:%s:00' % (hour.rjust(2, '0'), minute.rjust(2, '0'))]
-
- elif hour != '*':
- if '/' in hour or ',' in hour:
- if '/' in hour:
- times, frequency = hour.split('/')
- times = times.split('-')[0]
- else:
- times, second = hour.split(',')[:2]
- frequency = str(int(second) - int(times))
-
- args = ['/sc', 'HOURLY',
- '/mo', frequency,
- '/st', '%s:%s:00' % (next_hour(minute), minute.rjust(2, '0'))]
-
- else:
- args = ['/sc', 'DAILY',
- '/st', '%s:%s:00' % (hour.rjust(2, '0'), minute.rjust(2, '0'))]
-
- elif minute != '*':
- if '/' in minute or ',' in minute:
- if '/' in minute:
- times, frequency = minute.split('/')
- times = times.split('-')[0]
- else:
- times, second = minute.split(',')[:2]
- frequency = str(int(second) - int(times))
-
- args = ['/sc', 'MINUTE',
- '/mo', frequency,
- '/st', '%s:%s:00' % (next_hour(times), times.rjust(2, '0'))]
-
- else:
- # Can't guarantee when on the hour this runs, from what I
- # can see?
- args = ['/sc', 'HOURLY',
- '/st', '%s:%s:00' % (next_hour(minute), minute.rjust(2, '0'))]
-
- else:
- args = ['/sc', 'MINUTE']
-
- try:
- project_path = imp.find_module(os.environ['DJANGO_SETTINGS_MODULE'].split('.')[0])[1]
- except ImportError:
- project_path = os.path.dirname(imp.find_module('settings')[1])
-
- command = "'%s' '%s' run_batch '%s' '%s' '%s'" % (
- os.path.join(os.path.dirname(sys.executable), 'pythonw.exe'),
- os.path.abspath(os.path.join(project_path, 'manage.py')),
- batch.local_name,
- batch.provider_name,
- batch.method_name,
- )
-
- # Create a new task
- check_call(['schtasks', '/create',
- '/tn', batch.provider_name, # Task name
- '/tr', command, # Command to run
- ] + args)
-
-else:
-
- def create_crontab(filename, include_user=False):
- """
- If include_user is True, generates a cron file with a user column,
- suitable for use in /etc/cron.d
- """
- load_batches()
-
- sys_path = get_norm_sys_path()
-
- f = open(filename, 'w') if isinstance(filename, basestring) else filename
- f.write("# Generated by Molly. Do not edit by hand, or else your changes\n")
- f.write("# will be overwritten.\n\n")
- f.write('MAILTO="%s"\n' % ','.join(l[1] for l in settings.ADMINS))
- f.write("DJANGO_SETTINGS_MODULE=%s\n" % os.environ['DJANGO_SETTINGS_MODULE'])
- f.write("PYTHONPATH=%s\n\n" % ':'.join(sys_path))
-
- for batch in Batch.objects.all():
- if not batch.enabled:
- continue
-
- line_args = {
- 'time': batch.cron_stmt.ljust(20),
- 'user': '',
- 'python': sys.executable,
- 'run_batch': os.path.abspath(os.path.join(os.path.dirname(__file__), 'scripts', 'run_batch.py')),
- 'batch_local_name': _escape(batch.local_name),
- 'batch_provider_name': _escape(batch.provider_name),
- 'batch_method_name': _escape(batch.method_name),
- }
- if include_user:
- line_args['user'] = getpass.getuser()
-
- f.write(
- '%(time)s %(user)s %(python)s %(run_batch)s '
- '"%(batch_local_name)s" "%(batch_provider_name)s" '
- '"%(batch_method_name)s"\n' % line_args)
-
- f.close()
23 molly/batch_processing/admin.py
@@ -1,23 +0,0 @@
-import threading
-
-from models import Batch
-from django.contrib import admin
-from django.utils.translation import ugettext as _
-
-def run_batch(modeladmin, request, queryset):
- for batch in queryset:
- if not batch.currently_running:
- batch.pending = True
- batch.save()
- thread = threading.Thread(target=batch.run)
- thread.daemon = True
- thread.start()
-run_batch.short__description = _("Run selected batch jobs")
-
-class BatchAdmin(admin.ModelAdmin):
- list_display = ['title', 'cron_stmt', 'enabled', 'last_run',
- 'last_run_failed', 'pending', 'currently_running']
- ordering = ['title']
- actions = [run_batch]
-
-admin.site.register(Batch, BatchAdmin)
0  molly/batch_processing/management/__init__.py
No changes.
0  molly/batch_processing/management/commands/__init__.py
No changes.
70 molly/batch_processing/management/commands/create_crontab.py
@@ -1,70 +0,0 @@
-from optparse import make_option
-from StringIO import StringIO
-from subprocess import Popen, PIPE, call
-import errno
-import os
-import sys
-
-from django.core.management.base import NoArgsCommand
-
-from molly.batch_processing import create_crontab
-
-class Command(NoArgsCommand):
- help = "Generates a crontab"
-
- option_list = NoArgsCommand.option_list + (
- make_option('--pipe-to-crontab',
- action='store_true',
- dest='pipe_to_crontab',
- default=False,
- help='Pipe the cron output directly to the crontab command'),
- make_option('--write-system-cron',
- action='store_true',
- dest='write_system_cron',
- default=False,
- help=(
- "Write a system cronfile and link it to /etc/cron.d. "
- "This option and --pipe-to-crontab are mutually "
- "exclusive, so don't call both")),
- )
-
- def handle(self, *args, **options):
- if options['pipe_to_crontab'] and options['write_system_cron']:
- raise RuntimeError("Can't use --pipe-to-crontab and --write-system-cron together")
- if options['pipe_to_crontab']:
- f = StringIO()
- elif options['write_system_cron']:
- # Write a cronfile to ~/.cron
- homedir = os.getenv('USERPROFILE') or os.getenv('HOME')
- outdir = os.path.join(homedir, '.cron')
- try:
- os.mkdir(outdir)
- except OSError as e:
- # Just ignore if the directory already exists
- if e.errno != errno.EEXIST:
- raise e
- filename = os.path.join(outdir, 'molly.cron')
- if os.path.exists(filename):
- os.remove(filename)
- f = open(filename, 'w')
-
- # Try to make our cron file owned by root and symlink it
- print "Attempting to give root ownership of %(filename)s and symlink it to /etc/cron.d/" % { 'filename': filename }
- cmd = "sudo sh -c 'chown root %(filename)s && ln -sf %(filename)s /etc/cron.d/'" % { 'filename': filename }
- returncode = call(cmd, shell=True)
- if returncode:
- print("Couldn't install your cronfile. File written to %(filename)s, try running:\n\n\t%(cmd)s" % {
- 'filename': filename,
- 'cmd': cmd,
- })
-
- elif len(args) == 0:
- f = sys.stdout
- else:
- f = open(args[0], 'w')
-
- create_crontab(f, include_user=options['write_system_cron'])
-
- if options['pipe_to_crontab']:
- cron = Popen('crontab', stdin=PIPE)
- cron.communicate(input=f.getvalue())
11 molly/batch_processing/management/commands/run_batch.py
@@ -1,11 +0,0 @@
-import sys
-
-from django.core.management.base import NoArgsCommand
-
-from molly.batch_processing import run_batch
-
-class Command(NoArgsCommand):
- help = "Runs a batch job"
-
- def handle(self, *args, **options):
- run_batch(*args[0:3])
53 molly/batch_processing/migrations/0001_initial.py
@@ -1,53 +0,0 @@
-# encoding: utf-8
-import datetime
-from south.db import db
-from south.v2 import SchemaMigration
-from django.db import models
-
-class Migration(SchemaMigration):
-
- def forwards(self, orm):
-
- # Adding model 'Batch'
- db.create_table('batch_processing_batch', (
- ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
- ('title', self.gf('django.db.models.fields.TextField')()),
- ('local_name', self.gf('django.db.models.fields.TextField')()),
- ('provider_name', self.gf('django.db.models.fields.TextField')()),
- ('method_name', self.gf('django.db.models.fields.TextField')()),
- ('cron_stmt', self.gf('django.db.models.fields.TextField')()),
- ('enabled', self.gf('django.db.models.fields.BooleanField')(default=True)),
- ('_metadata', self.gf('django.db.models.fields.TextField')(default='null')),
- ('last_run', self.gf('django.db.models.fields.DateTimeField')(null=True, blank=True)),
- ('pending', self.gf('django.db.models.fields.BooleanField')(default=False)),
- ('currently_running', self.gf('django.db.models.fields.BooleanField')(default=False)),
- ('log', self.gf('django.db.models.fields.TextField')(blank=True)),
- ))
- db.send_create_signal('batch_processing', ['Batch'])
-
-
- def backwards(self, orm):
-
- # Deleting model 'Batch'
- db.delete_table('batch_processing_batch')
-
-
- models = {
- 'batch_processing.batch': {
- 'Meta': {'object_name': 'Batch'},
- '_metadata': ('django.db.models.fields.TextField', [], {'default': "'null'"}),
- 'cron_stmt': ('django.db.models.fields.TextField', [], {}),
- 'currently_running': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
- 'enabled': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
- 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
- 'last_run': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}),
- 'local_name': ('django.db.models.fields.TextField', [], {}),
- 'log': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
- 'method_name': ('django.db.models.fields.TextField', [], {}),
- 'pending': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
- 'provider_name': ('django.db.models.fields.TextField', [], {}),
- 'title': ('django.db.models.fields.TextField', [], {})
- }
- }
-
- complete_apps = ['batch_processing']
40 molly/batch_processing/migrations/0002_auto__add_field_batch_last_run_failed.py
@@ -1,40 +0,0 @@
-# encoding: utf-8
-import datetime
-from south.db import db
-from south.v2 import SchemaMigration
-from django.db import models
-
-class Migration(SchemaMigration):
-
- def forwards(self, orm):
-
- # Adding field 'Batch.last_run_failed'
- db.add_column('batch_processing_batch', 'last_run_failed', self.gf('django.db.models.fields.BooleanField')(default=False), keep_default=False)
-
-
- def backwards(self, orm):
-
- # Deleting field 'Batch.last_run_failed'
- db.delete_column('batch_processing_batch', 'last_run_failed')
-
-
- models = {
- 'batch_processing.batch': {
- 'Meta': {'object_name': 'Batch'},
- '_metadata': ('django.db.models.fields.TextField', [], {'default': "'null'"}),
- 'cron_stmt': ('django.db.models.fields.TextField', [], {}),
- 'currently_running': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
- 'enabled': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
- 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
- 'last_run': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}),
- 'last_run_failed': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
- 'local_name': ('django.db.models.fields.TextField', [], {}),
- 'log': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
- 'method_name': ('django.db.models.fields.TextField', [], {}),
- 'pending': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
- 'provider_name': ('django.db.models.fields.TextField', [], {}),
- 'title': ('django.db.models.fields.TextField', [], {})
- }
- }
-
- complete_apps = ['batch_processing']
0  molly/batch_processing/migrations/__init__.py
No changes.
98 molly/batch_processing/models.py
@@ -1,98 +0,0 @@
-import simplejson
-import traceback
-import sys
-import logging
-from datetime import datetime
-from StringIO import StringIO
-
-from django.db import models, DatabaseError, transaction
-
-from molly.conf import all_apps, app_by_local_name
-
-logger = logging.getLogger(__name__)
-
-class TeeStringIO(StringIO):
- def __init__(self, *args, **kwargs):
- self.other = kwargs.pop('other')
- StringIO.__init__(self, *args, **kwargs)
-
- def write(self, *args, **kwargs):
- self.other.write(*args, **kwargs)
- StringIO.write(self, *args, **kwargs)
-
-class Batch(models.Model):
- title = models.TextField()
- local_name = models.TextField()
- provider_name = models.TextField()
- method_name = models.TextField()
- cron_stmt = models.TextField()
- enabled = models.BooleanField(default=True)
-
- _metadata = models.TextField(default='null')
- last_run = models.DateTimeField(null=True, blank=True)
- pending = models.BooleanField(default=False)
- currently_running = models.BooleanField(default=False)
- last_run_failed = models.BooleanField(default=False)
- log = models.TextField(blank=True)
-
- def get_metadata(self):
- try:
- return self.__metadata
- except AttributeError:
- self.__metadata = simplejson.loads(self._metadata)
- return self.__metadata
- def set_metadata(self, metadata):
- self.__metadata = metadata
- metadata = property(get_metadata, set_metadata)
-
- def save(self, *args, **kwargs):
- try:
- self._metadata = simplejson.dumps(self.__metadata)
- except AttributeError:
- pass
- super(Batch, self).save(*args, **kwargs)
-
- def run(self, tee_to_stdout=False):
- if self.currently_running:
- return
-
- try:
- output = TeeStringIO(other=sys.stdout) if tee_to_stdout else StringIO()
-
- self.currently_running = True
- self.pending = False
- self.save()
-
- providers = app_by_local_name(self.local_name).providers
- for provider in providers:
- if provider.class_path == self.provider_name:
- break
- else:
- raise AssertionError
-
- method = getattr(provider, self.method_name)
-
- self.metadata = method(self.metadata, output)
- except Exception as e:
- if isinstance(e, DatabaseError):
- transaction.rollback()
- if output.getvalue():
- output.write("\n\n")
- traceback.print_exc(file=output)
- if self.last_run_failed:
- log = logger.info('Batch %r threw an uncaught exception (repeat failure)' % self.title,
- exc_info=True)
- else:
- logger.exception('Batch %r threw an uncaught exception' % self.title)
- self.last_run_failed = True
- else:
- self.last_run_failed = False
- finally:
- self.log = output.getvalue()
- self.last_run = datetime.utcnow()
- self.currently_running = False
- self.save()
-
- def __unicode__(self):
- return self.title
-
8 molly/batch_processing/scripts/run_batch.py
@@ -1,8 +0,0 @@
-#!/usr/bin/env python
-
-import sys
-
-from molly.batch_processing import run_batch
-
-if __name__ == '__main__':
- run_batch(sys.argv[1], sys.argv[2], sys.argv[3], False)
18 molly/commands/site_template/settings.py
@@ -12,12 +12,24 @@
# This next block of code is preamble that is used later on, you can safely
# skip it:
-from oauth.oauth import OAuthSignatureMethod_PLAINTEXT
-import os, os.path, imp
-from molly.conf.settings import Application, extract_installed_apps, Authentication, ExtraBase, Provider
+import os
+import imp
+from molly.conf.settings import Application, extract_installed_apps
+from molly.conf.settings import ProviderConf as Provider
+from molly.conf.celery_util import prepare_celery
from molly.conf.default_settings import *
from molly.utils.media import get_compress_groups
+# Celery configuration
+BROKER_URL = "amqp://molly:molly@localhost:5672//"
+CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
+CELERYD_CONCURRENCY = 1
+CELERY_RETRY_DELAY = 3 * 60
+CELERY_MAX_RETRIES = 3
+
+# Register Django-Celery and initialise our providers.
+prepare_celery()
+
# The following import and mimetypes.add_types correct the - possibly wrong - mime type of svg files
# in certain versions of Django.
import mimetypes
18 molly/conf/admin.py
@@ -0,0 +1,18 @@
+from celery.app import current_app
+from molly.conf.celery_util import init_providers
+from djcelery.admin import PeriodicTaskAdmin
+
+
+def run_now(modeladmin, request, queryset):
+ app = current_app()
+ for pt in queryset:
+ try:
+ app.tasks[pt.task].apply_async()
+ except KeyError:
+ init_providers()
+ app.tasks[pt.task].apply_async()
+run_now.short_description = "Place this task on the queue to run now."
+
+
+class RunnablePeriodicTaskAdmin(PeriodicTaskAdmin):
+ actions = [run_now]
32 molly/conf/celery_util.py
@@ -0,0 +1,32 @@
+import djcelery
+
+from molly.conf.applications import all_apps
+from celery.signals import beat_init, worker_init
+
+
+def init_providers():
+ """Calls all the providers which in turns registers any Celery tasks
+ attached to that provider.
+ """
+ for app in all_apps():
+ for p in app.providers:
+ p()
+
+
+def celeryd_discover_tasks(sender=None, conf=None, **kwargs):
+ init_providers()
+
+
+def beat_update_schedule(sender=None, conf=None, **kwargs):
+ """Calling get_schedule will sync the schedule with any discovered tasks."""
+ celeryd_discover_tasks()
+ sender.get_scheduler().get_schedule()
+
+
+def prepare_celery():
+ """Runs the djcelery loader which is required to set the correct backend/scheduler
+ Then adds some signal callbacks to ensure all tasks are registered correctly.
+ """
+ djcelery.setup_loader()
+ beat_init.connect(beat_update_schedule)
+ worker_init.connect(celeryd_discover_tasks)
2  molly/conf/default_settings.py
@@ -91,10 +91,10 @@
'django.contrib.sites', # Django's sites API, this is a prerequisite for the comments API
'django.contrib.gis', # Geodjango - this is required
'django.contrib.comments', # Django's comments API - used in the feature vote app
- 'molly.batch_processing', # This is a part of Molly that handles the batch jobs
'django.contrib.staticfiles', # Staticfiles handles media for Molly
'pipeline', # Pipeline is an external library that minifies JS and CSS
'south', # South handles changes to database schema
+ 'djcelery', # Celery tasks run our periodic batch processing
)
# Don't do South migrations when running unit tests - just do a syncdb
128 molly/conf/provider.py
@@ -0,0 +1,128 @@
+try:
+ import json
+except ImportError:
+ import simplejson as json
+
+from django.conf import settings
+from djcelery.models import PeriodicTask as PeriodicTaskModel
+from celery.task import PeriodicTask, Task
+
+
+class Provider(object):
+
+ def __call__(self):
+ return self
+
+ @classmethod
+ def register_tasks(cls, *args, **kwargs):
+ """Constructor, looks for decorated tasks and registers them with Celery,
+ this is done in a non-standard way as Celery tasks can only be functions, not methods.
+
+ Tasks with the run_every option set are subclasses of BatchTask which
+ handles storing a cache of metadata between each task execution.
+ """
+ ins = cls(*args, **kwargs)
+ for attr_name in dir(cls):
+ if callable(getattr(cls, attr_name)):
+ fun = getattr(cls, attr_name)
+ else:
+ continue
+ if hasattr(fun, 'task'):
+ # This is a decorated method
+ periodic_task = 'run_every' in fun.task
+ name = "%s.%s.%s" % (cls.__module__, cls.__name__, attr_name)
+ new_attr_name = '__task_%s' % attr_name
+ if periodic_task:
+ base = BatchTask
+
+ def run(self, **kwargs):
+ meth = getattr(self.provider, self.true_method)
+ metadata = self.get_metadata()
+ try:
+ return meth(**metadata)
+ except Exception, exc:
+ self.get_logger().warning(
+ "Exception raised, retrying: %s" % exc)
+ self.retry(exc=exc, countdown=self.default_retry_delay,
+ max_retries=self.max_retries)
+ else:
+ base = Task
+
+ def run(self, *args, **kwargs):
+ meth = getattr(self.provider, self.true_method)
+ try:
+ return meth(*args)
+ except Exception, exc:
+ self.get_logger().warning(
+ "Exception raised, retrying: %s" % exc)
+ self.retry(exc=exc, countdown=self.default_retry_delay,
+ max_retries=self.max_retries)
+
+ def __init__(self, provider=ins, base=base, kwargs=fun.task):
+ self.provider = provider
+ self.metadata = kwargs.get('initial_metadata', dict())
+ self.run_every = kwargs.get('run_every', None)
+ base.__init__(self) # Only 1 base class, so this is safe.
+ self.default_retry_delay = kwargs.get('default_retry_delay',
+ settings.CELERY_RETRY_DELAY)
+ self.max_retries = kwargs.get('max_retries',
+ settings.CELERY_MAX_RETRIES)
+ t = type(name, (base,), {'__init__': __init__,
+ '__module__': cls.__module__,
+ 'run': run,
+ 'name': name,
+ 'true_method': new_attr_name,
+ })
+ # We rename the decorated method to __task_<method_name>
+ # and set the old method to the Task instance.
+ setattr(ins, new_attr_name, getattr(ins, attr_name))
+ setattr(ins, attr_name, t())
+ return ins
+
+
+class BatchTask(PeriodicTask):
+ """Subclass of Celery's PeriodicTask which handles a local
+ cache of metadata. This metadata is stored in the kwargs field on the
+ djcelery.PeriodicTask model as JSON encoded kwargs.
+
+ Our task metadata represents the return values from each task execution.
+ This means you can cache a value (for example an ETag - see the OSM provider) between
+ task runs and save making unnecessary calls. Metadata is provided as the
+ keyword arguments to all BatchTasks.
+
+ Decorated tasks which don't specify 'run_every' cannot store metadata.
+ """
+ abstract = True
+
+ def get_metadata(self):
+ """Metadata getter, the null value for metadata is an empty dict"""
+ pt = PeriodicTaskModel.objects.get(task=self.name)
+ metadata = pt.kwargs or self.metadata
+ if metadata:
+ return json.loads(metadata)
+ else:
+ return {}
+
+ def set_metadata(self, metadata, expires=None):
+ PeriodicTaskModel.objects.filter(task=self.name).update(
+ kwargs=json.dumps(metadata))
+
+ def after_return(self, status, value, *args, **kwargs):
+ if value and isinstance(value, dict):
+ try:
+ self.set_metadata(value)
+ except:
+ self.get_logger().exception("Unable to store metadata.")
+
+
+def task(**kwargs):
+ """Sets a .task attribute on each function decorated, this indictes
+ this function should be registered as a task with Celery
+
+ TODO: Extend this functionality to implement a wrapping function to
+ capture the kwargs passed through by celery.
+ """
+ def dec(fun):
+ fun.task = kwargs
+ return fun
+ return dec
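In practice, the metadata round-trip described in the ``BatchTask`` docstring
lets a provider skip work when nothing has changed. A simplified sketch modelled
on the OSM provider's ETag caching (``fetch_etag`` and ``do_import`` are
hypothetical helpers)::

    @task(run_every=timedelta(days=7))
    def import_data(self, **metadata):
        # kwargs hold the dict returned by the previous run
        old_etag = metadata.get('etag', '')
        new_etag = fetch_etag()
        if new_etag == old_etag:
            return metadata     # nothing changed; keep the cached value
        do_import()
        return {'etag': new_etag}  # persisted by BatchTask.after_return()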
21 molly/conf/settings.py
@@ -31,7 +31,6 @@ def __init__(self, application_name, local_name, title, **kwargs):
self.extra_bases = kwargs.pop('extra_bases', ())
self.urlconf = kwargs.pop('urlconf', application_name+'.urls')
self.kwargs = kwargs
- self.batches = []
self.conf = None
kwargs['display_to_user'] = kwargs.get('display_to_user', True)
@@ -47,10 +46,10 @@ def get(self):
providers = []
for provider in self.providers:
- if isinstance(provider, Provider):
+ if isinstance(provider, ProviderConf):
providers.append(provider())
else:
- providers.append(Provider(provider)())
+ providers.append(ProviderConf(provider)())
bases = tuple(base() for base in self.extra_bases)
if self.secure:
@@ -88,8 +87,8 @@ def get(self):
for key in self.kwargs:
if key != 'provider' and key.endswith('provider'):
provider = self.kwargs[key]
- if not isinstance(provider, Provider):
- provider = Provider(provider)
+ if not isinstance(provider, ProviderConf):
+ provider = ProviderConf(provider)
providers.append(provider())
self.kwargs[key] = provider()
self.conf = type(self.local_name.capitalize()+'Conf', (ApplicationConf,), self.kwargs)()
@@ -206,7 +205,7 @@ def __call__(self):
def extract_installed_apps(applications):
return tuple(app.application_name for app in applications)
-class Provider(object):
+class ProviderConf(object):
def __init__(self, klass, **kwargs):
self.klass, self.kwargs = klass, kwargs
@@ -217,14 +216,6 @@ def __call__(self):
mod_name, cls_name = self.klass.rsplit('.', 1)
module = import_module(mod_name)
klass = getattr(module, cls_name)
- self._provider = klass(**self.kwargs)
+ self._provider = klass.register_tasks(**self.kwargs)
self._provider.class_path = self.klass
return self._provider
-
-def batch(cron_stmt, initial_metadata={}):
- def g(f):
- f.is_batch = True
- f.cron_stmt = cron_stmt
- f.initial_metadata = initial_metadata
- return f
- return g
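For context, a provider class is wired in (and its tasks registered on first
instantiation) through a site's ``settings.py``; a sketch using Molly's
``Application``/``Provider`` conventions (the exact ``APPLICATIONS`` entry is
illustrative)::

    from molly.conf.settings import Application
    from molly.conf.settings import ProviderConf as Provider

    Application('molly.apps.feeds', 'news', 'News',
        providers=[
            Provider('molly.apps.feeds.providers.rss.RSSFeedsProvider'),
        ],
    )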
7 molly/geolocation/providers/__init__.py
@@ -1,4 +1,7 @@
-class BaseGeolocationProvider(object):
+from molly.conf.provider import Provider
+
+
+class BaseGeolocationProvider(Provider):
def reverse_geocode(self, lon, lat):
@@ -9,4 +12,4 @@ def geocode(self, query):
return []
from cloudmade import CloudmadeGeolocationProvider
-from places import PlacesGeolocationProvider
+from places import PlacesGeolocationProvider
4 molly/urls.py
@@ -1,13 +1,17 @@
from django.conf.urls.defaults import *
from django.conf import settings
from django.contrib import admin
+from djcelery.models import PeriodicTask
+from molly.conf.admin import RunnablePeriodicTaskAdmin
from molly.conf import applications, all_apps
from molly.utils.views import ReverseView
from molly.utils.i18n import SetLanguageView, javascript_catalog
# Admin
admin.autodiscover()
+admin.site.unregister(PeriodicTask)
+admin.site.register(PeriodicTask, RunnablePeriodicTaskAdmin)
urlpatterns = patterns('',
(r'^adm/', include(admin.site.urls)), # Admin site
13 molly/utils/management/commands/deploy.py
@@ -12,15 +12,9 @@ class Command(NoArgsCommand):
dest='develop',
default=False,
help='Create symlinks, rather than copy, existing media, then start the dev server'),
- ) + (
- make_option('--skip-cron',
- action='store_true',
- dest='skip_cron',
- default=False,
- help='Skip creating a crontab'),
)
-
- def handle_noargs(self, skip_cron, develop, **options):
+
+ def handle_noargs(self, develop, **options):
call_command('sync_and_migrate')
try:
from molly.wurfl import wurfl_data
@@ -36,8 +30,5 @@ def handle_noargs(self, skip_cron, develop, **options):
# have been changed...
call_command('synccompress')
call_command('synccompress', force=True)
- if not skip_cron:
- call_command('create_crontab', write_system_cron=(os.name != 'nt'))
if develop:
call_command('runserver')
-
4 setup.py
@@ -103,7 +103,9 @@
"django-slimmer",
'pyyaml',
'icalendar==2.2',
- 'mockito'
+ 'mockito',
+ 'celery==2.5.3',
+ 'django-celery==2.5.5',
],
scripts = [
'molly/commands/molly-admin'