Skip to content

Commit

Permalink
Merge pull request #86 from sontek/broker_options
Browse files Browse the repository at this point in the history
Implement transport broker options
  • Loading branch information
sontek committed Feb 23, 2021
2 parents ea410e3 + 010125e commit de21e63
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 2 deletions.
7 changes: 7 additions & 0 deletions README.rst
Expand Up @@ -78,11 +78,18 @@ An example ini configuration looks like this:
imports = app1.tasks
app2.tasks
[celery:broker_transport_options]
visibility_timeout = 18000
max_retries = 5
[celerybeat:task1]
task = app1.tasks.Task1
type = crontab
schedule = {"minute": 0}
You'll notice the configuration options that are dictionaries or have
multiple values will be split into their own sections.

Scheduled/Periodic Tasks
-----------------------------
To use celerybeat (periodic tasks) you need to declare 1 ``celerybeat`` config
Expand Down
25 changes: 23 additions & 2 deletions pyramid_celery/loaders.py
Expand Up @@ -22,6 +22,10 @@ def timedelta(value):
'timedelta': timedelta,
'integer': int
}
BROKER_TRANSPORT_OPTIONS_MAP = {
'visibility_timeout': int,
'max_retries': int,
}


def safe_json(get, section, key):
Expand Down Expand Up @@ -127,14 +131,31 @@ def read_configuration(self, fail_silently=True):

beat_config = {}
route_config = {}
dict_settings = [
'broker_transport_options',
]

for section in self.parser.sections():
sections = section.split(':', 1)
# [celery] -> name=celery
# [celery:options] -> name=options
# [celery:options:task] -> name=options:task

name = sections[0]
if len(sections) > 1:
name = sections[1]

if section.startswith('celerybeat:'):
name = section.split(':', 1)[1]
beat_config[name] = get_beat_config(self.parser, section)
elif section.startswith('celeryroute:'):
name = section.split(':', 1)[1]
route_config[name] = get_route_config(self.parser, section)
elif name in dict_settings:
options = {}
for key, value in self.parser.items(section):
if key in BROKER_TRANSPORT_OPTIONS_MAP:
options[key] = BROKER_TRANSPORT_OPTIONS_MAP[key](value)

config_dict[name] = options

if beat_config:
config_dict['beat_schedule'] = beat_config
Expand Down
5 changes: 5 additions & 0 deletions tests/configs/dev.ini
Expand Up @@ -4,6 +4,7 @@ celery_imports = myapp.tasks
otherapp.tasks
result_backend = sqlite:///celery_results.db
result_serializer = json

accept_content =
json
xml
Expand All @@ -14,6 +15,10 @@ admins =
john,john@initrode.example
exceptions,exceptions@majortech.example

[celery:broker_transport_options]
visibility_timeout = 18000
max_retries = 5

[celerybeat:task1]
# Execute every hour
task = myapp.tasks.Task1
Expand Down
4 changes: 4 additions & 0 deletions tests/test_celery.py
Expand Up @@ -16,6 +16,10 @@ def test_includeme_custom_config():
config.configure_celery('tests/configs/dev.ini')
assert celery_app.conf['broker_url'] == 'redis://localhost:1337/0'
assert celery_app.conf['timezone'] == 'America/Los_Angeles'
assert celery_app.conf['broker_transport_options'] == {
'visibility_timeout': 18000,
'max_retries': 5,
}


@pytest.mark.unit
Expand Down

0 comments on commit de21e63

Please sign in to comment.