This repository has been archived by the owner on Oct 5, 2021. It is now read-only.

Commit

restore fire_th to leave the choice to use Celery or not + Fix Celery config for crontab
Olivier Demah committed Jun 15, 2015
1 parent 1db7432 commit 1b9e1ea
Showing 7 changed files with 293 additions and 69 deletions.
72 changes: 60 additions & 12 deletions README.rst
@@ -186,6 +186,59 @@ For each TriggerHappy component, define one cache like below
},
CELERY
~~~~~~
Celery handles the tasks itself: it populates the cache from the provider services, then exploits it to publish the data to the expected consumer services.
From Settings
-------------
Define the broker, then the scheduler :
.. code:: python

    from celery.schedules import crontab

    BROKER_URL = 'redis://localhost:6379/0'

    CELERYBEAT_SCHEDULE = {
        'add-read-data': {
            'task': 'django_th.tasks.read_data',
            'schedule': crontab(minute='*/27'),
        },
        'add-publish-data': {
            'task': 'django_th.tasks.publish_data',
            'schedule': crontab(minute='*/59'),
        },
    }
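Note that cron step syntax expands within a field's range, so ``minute='*/59'`` fires at minutes 0 and 59 of each hour rather than "every 59 minutes". A minimal pure-Python sketch of that expansion (an illustration only, not Celery's actual parser):

```python
def expand_step(expr, max_value):
    """Expand a cron '*/n' field into the set of matching values."""
    if expr.startswith('*/'):
        # a step pattern matches every n-th value starting from 0
        return set(range(0, max_value, int(expr[2:])))
    return {int(expr)}

print(sorted(expand_step('*/27', 60)))  # [0, 27, 54] -> three runs per hour
print(sorted(expand_step('*/59', 60)))  # [0, 59] -> not "every 59 minutes"
```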
From SUPERVISORD
----------------
.. code:: ini

    [program:django_th_worker]
    user = foxmask
    directory=/home/projects/trigger-happy/th
    command=/home/projects/trigger-happy/bin/celery -A th worker --autoscale=10,3 -l info
    autostart=true
    autorestart=true
    redirect_stderr=true
    stdout_logfile=/home/projects/trigger-happy/logs/trigger-happy.log
    stderr_logfile=/home/projects/trigger-happy/logs/trigger-happy-err.log

    [program:django_th_beat]
    user = foxmask
    directory=/home/projects/trigger-happy/th
    command=/home/projects/trigger-happy/bin/celery -A th beat -l info
    autostart=true
    autorestart=true
    redirect_stderr=true
    stdout_logfile=/home/projects/trigger-happy/logs/trigger-happy.log
    stderr_logfile=/home/projects/trigger-happy/logs/trigger-happy-err.log
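The ``-A th`` option above points Celery at an app module. A minimal sketch of what that module typically contains, mirroring the ``django_th/celery.py`` changed by this commit (the module and settings names are this project's, not general requirements):

```python
# th/celery.py -- minimal sketch of the app module loaded by `celery -A th worker`
from __future__ import absolute_import
import os

from celery import Celery
from django.conf import settings

# point Celery at the Django settings before the app is created
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_th.settings')

app = Celery('th')
# read BROKER_URL, CELERYBEAT_SCHEDULE, etc. from the Django settings
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
```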
Setting up : Administration
===========================
@@ -228,26 +281,21 @@ For example :
* page 3 : the user gives a description
Fire the Triggers :
===================
Fire the Triggers by hand :
===========================
Here are the available management commands :
Here are the available management commands to use if you don't plan to use Celery :
.. code:: python

    Available subcommands:

    [django_th]
        fire_th
To start handling the queue of triggers you/your users configured, just set the management command fire_th in a crontab or any other scheduler solution of your choice.
.. code:: python

    fire_th # will read cache and publish data
    fire_read_data # will put data in cache
    manage.py fire_th # will chain both read and publish
    manage.py fire_read_data # will get the data from any service and put them in cache
    manage.py fire_publish_data # will read the data from the cache and put them to another service
To start handling the queue of triggers you/your users configured, just set those management commands in a crontab or any other scheduler solution of your choice, if you don't want to use Celery.
Also : keep in mind not to set too short a duration between two runs, to avoid being blocked by the rate limits of the external services you/your users want to reach.
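To make that advice concrete, a hypothetical guard (the helper name and signature are invented for illustration) that a scheduler wrapper could use to skip a run when the previous one is too recent:

```python
from datetime import datetime, timedelta

def should_run(last_run, min_interval_minutes, now=None):
    """Return True when enough time has passed since the previous run."""
    now = now or datetime.now()
    if last_run is None:
        return True  # never ran before
    return now - last_run >= timedelta(minutes=min_interval_minutes)

last = datetime(2015, 6, 15, 12, 0)
print(should_run(last, 27, now=datetime(2015, 6, 15, 12, 10)))  # False: only 10 min
print(should_run(last, 27, now=datetime(2015, 6, 15, 12, 40)))  # True
```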
23 changes: 5 additions & 18 deletions django_th/celery.py
@@ -1,28 +1,15 @@
from __future__ import absolute_import

from __future__ import absolute_import
import os

from celery import Celery
from celery.schedules import crontab
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_th.settings')

app = Celery('django_th')
app = Celery('th')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# will start the 2 tasks,
# one each 41 minutes
# the other one each hour
CELERYBEAT_SCHEDULE = {
'add-read-data': {
'task': 'tasks.read_data',
'schedule': crontab(minute='*/41'),
},
'add-publish-data': {
'task': 'tasks.publish_data',
'schedule': crontab(hour='*/1'),
},
}
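The schedule removed from `celery.py` here moves into the Django settings (see `local_settings_sample.py` below); that works because `app.config_from_object('django.conf:settings')` picks up the uppercase attributes of the settings object. A rough mimic of that behaviour (an illustration, not Celery's actual implementation):

```python
class FakeSettings:
    """Stand-in for django.conf.settings, with invented values."""
    BROKER_URL = 'redis://localhost:6379/0'
    CELERYBEAT_SCHEDULE = {'add-read-data': {'task': 'django_th.tasks.read_data'}}
    debug_helper = 'ignored'  # lowercase names are not configuration

def config_from_object(obj):
    """Copy only the uppercase attributes, the way Celery reads Django settings."""
    return {name: getattr(obj, name) for name in dir(obj) if name.isupper()}

conf = config_from_object(FakeSettings)
print(sorted(conf))  # ['BROKER_URL', 'CELERYBEAT_SCHEDULE']
```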
11 changes: 11 additions & 0 deletions django_th/local_settings_sample.py
@@ -28,5 +28,16 @@
# CELERY
BROKER_URL = 'redis://localhost:6379/0'

CELERYBEAT_SCHEDULE = {
'add-read-data': {
'task': 'django_th.tasks.read_data',
'schedule': crontab(minute='*/27'),
},
'add-publish-data': {
'task': 'django_th.tasks.publish_data',
'schedule': crontab(minute='*/59'),
},
}

# REDISBOARD
REDISBOARD_DETAIL_FILTERS = ['.*']
22 changes: 0 additions & 22 deletions django_th/management/commands/fire_publish_data.py

This file was deleted.

71 changes: 63 additions & 8 deletions django_th/management/commands/fire_read_data.py
@@ -1,22 +1,77 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# coding: utf-8
from __future__ import unicode_literals

from django.core.management.base import BaseCommand
from django.utils.log import getLogger

from django_th.tasks import read_data

from django_th.services import default_provider
from django_th.models import TriggerService
from django.utils.log import getLogger
# create logger
logger = getLogger('django_th.trigger_happy')


class Command(BaseCommand):

help = 'Trigger all the services and put them in cache'
help = 'Trigger all the services to put them in cache'

def handle(self, *args, **options):
"""
get all the triggers that need to be handled
run the main process
"""
read_data.delay()
default_provider.load_services()
trigger = TriggerService.objects.filter(status=True).select_related(
'consumer__name', 'provider__name')

for service in trigger:
        self.put_in_cache(service)

def put_in_cache(self, service):
# flag to know if we have to update
to_update = False
# flag to get the status of a service
status = False
        # provider - the service that offers the data
service_name = str(service.provider.name.name)
service_provider = default_provider.get_service(service_name)

service_name = str(service.consumer.name.name)

# check if the service has already been triggered
# if date_triggered is None, then it's the first run
if service.date_triggered is None:
logger.debug("first run for %s => %s " % (
str(service.provider.name), str(service.consumer.name.name)))
to_update = True
status = True
# run run run
else:
# 1) get the data from the provider service
# get a timestamp of the last triggered of the service
datas = getattr(service_provider, 'read_data')(
service.provider.token,
service.id,
service.date_triggered)
if len(datas) > 0:
to_update = True
status = True

# update the date of the trigger at the end of the loop
sentence = "user: {} - provider: {} - {}"
if to_update:
if status:
logger.info((sentence + " - {} data put in cache").format(
service.user,
service.provider.name.name,
service.description,
len(datas)))
else:
                logger.info((sentence + " AN ERROR OCCURRED").format(
service.user,
service.provider.name.name,
service.description))
else:
logger.info((sentence + " nothing new").format(
service.user,
service.provider.name.name,
service.description))

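The update/status bookkeeping in `put_in_cache` boils down to a small decision: first run always updates, later runs update only when the provider returned new data. A pure-function summary of that flow (an illustration, not code from the commit):

```python
def decide(date_triggered, datas):
    """First run always updates; later runs update only when new data arrived."""
    if date_triggered is None:
        return True  # first run for this trigger
    return len(datas) > 0

print(decide(None, []))           # True  -> first run
print(decide('2015-06-15', []))   # False -> nothing new
print(decide('2015-06-15', [1]))  # True  -> data to put in cache
```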