Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
Merge pull request #33 from praekelt/feature/reduce-database-calls-for-delivering-schedules
Browse files Browse the repository at this point in the history

Reduce database calls for delivering schedules
  • Loading branch information
rudigiesler committed May 18, 2018
2 parents e3f9789 + 68deef3 commit fceede6
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 48 deletions.
7 changes: 6 additions & 1 deletion scheduler/management/commands/trigger_deliver_tasks.py
Expand Up @@ -177,7 +177,12 @@ def confirm(prompt):
else:
self.stdout.write('%s' % (schedule,))
DeliverTask.apply_async(
kwargs={"schedule_id": str(schedule.id)})
kwargs={
"schedule_id": str(schedule.id),
"auth_token": schedule.auth_token,
"endpoint": schedule.endpoint,
"payload": schedule.payload,
})

def get_addresses(self, identity, default_addr_type):
details = identity.get('details', {})
Expand Down
1 change: 1 addition & 0 deletions scheduler/serializers.py
Expand Up @@ -39,6 +39,7 @@ class HookSerializer(serializers.ModelSerializer):
class Meta:
model = Hook
read_only_fields = ('user',)
fields = '__all__'


class ScheduleFailureSerializer(serializers.HyperlinkedModelSerializer):
Expand Down
52 changes: 16 additions & 36 deletions scheduler/tasks.py
Expand Up @@ -5,7 +5,6 @@
from celery.task import Task
from celery.utils.log import get_task_logger
from django.conf import settings
from django.db import connection, transaction
from django.utils.timezone import now
from djcelery.models import CrontabSchedule, IntervalSchedule
from seed_services_client.metrics import MetricsApiClient
Expand Down Expand Up @@ -55,7 +54,7 @@ class DeliverTask(Task):
default_retry_delay = 5
max_retries = 5

def run(self, schedule_id, **kwargs):
def run(self, schedule_id, auth_token, endpoint, payload, **kwargs):
"""
Runs an instance of a scheduled task
"""
Expand All @@ -66,21 +65,20 @@ def run(self, schedule_id, **kwargs):
else:
retry_delay = self.default_retry_delay

schedule = Schedule.objects.get(id=schedule_id)
headers = {"Content-Type": "application/json"}
if schedule.auth_token is not None:
headers["Authorization"] = "Token %s" % schedule.auth_token
if auth_token is not None:
headers["Authorization"] = "Token %s" % auth_token
try:
response = requests.post(
url=schedule.endpoint,
data=json.dumps(schedule.payload),
url=endpoint,
data=json.dumps(payload),
headers=headers,
timeout=settings.DEFAULT_REQUEST_TIMEOUT
)
# Expecting a 201, raise for errors.
response.raise_for_status()
except requests_exceptions.ConnectionError as exc:
l.info('Connection Error to endpoint: %s' % schedule.endpoint)
l.info('Connection Error to endpoint: %s' % endpoint)
fire_metric.delay('scheduler.deliver_task.connection_error.sum', 1)
self.retry(exc=exc, countdown=retry_delay)
except requests_exceptions.HTTPError as exc:
Expand Down Expand Up @@ -169,32 +167,11 @@ def run(self, schedule_type, lookup_id, **kwargs):
task_run.save()
# create tasks for each active schedule
queued = 0
schedules = schedules.values('id')
with transaction.atomic(), connection.cursor() as cur:
# A named cursor is declared here to make psycopg2 use a server
# side cursor. The SSC prevents the entire result set from being
# loaded into memory.
# NOTE: this can be replaced with just a call to a queryset's
# iterator() method in Django 1.11 as that directly supports using
# a SSC.
query = str(schedules.query)
cursor_name = '_cur_queue_tasks_{uuid}'.format(uuid=uuid4().hex)
cur.execute(
"DECLARE {cursor_name} CURSOR FOR {query}".format(
cursor_name=cursor_name,
query=query
),
{'lookup_id': lookup_id}
)
while True:
cur.execute("FETCH 10000 FROM {0}".format(cursor_name))
chunk = cur.fetchall()
if not chunk:
break
for row in chunk:
DeliverTask.apply_async(
kwargs={"schedule_id": str(row[0])})
queued += 1
schedules = schedules.values('id', 'auth_token', 'endpoint', 'payload')
for schedule in schedules.iterator():
schedule['schedule_id'] = str(schedule.pop('id'))
DeliverTask.apply_async(kwargs=schedule)
queued += 1

task_run.completed_at = now()
task_run.save()
Expand Down Expand Up @@ -247,10 +224,13 @@ def run(self, **kwargs):
l.info("Attempting to requeue <%s> failed schedules" %
failures.count())
for failure in failures.iterator():
schedule_id = str(failure.schedule_id)
schedule = Schedule.objects.values(
'id', 'auth_token', 'endpoint', 'payload')
schedule = schedule.get(id=failure.schedule_id)
schedule['schedule_id'] = str(schedule.pop('id'))
# Cleanup the failure before requeueing it.
failure.delete()
deliver_task.delay(schedule_id)
DeliverTask.apply_async(kwargs=schedule)


requeue_failed_tasks = RequeueFailedTasks()
6 changes: 5 additions & 1 deletion scheduler/tests.py
Expand Up @@ -431,7 +431,11 @@ def test_deliver_task(self):

# Execute
result = deliver_task.apply_async(kwargs={
"schedule_id": str(schedule.id)})
"schedule_id": str(schedule.id),
"auth_token": schedule.auth_token,
"endpoint": schedule.endpoint,
"payload": schedule.payload,
})

# Check
self.assertEqual(result.get(), True)
Expand Down
4 changes: 3 additions & 1 deletion seed_scheduler/settings.py
Expand Up @@ -145,7 +145,9 @@
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.IsAuthenticated',
),
'DEFAULT_FILTER_BACKENDS': ('rest_framework.filters.DjangoFilterBackend',)
'DEFAULT_FILTER_BACKENDS': (
'django_filters.rest_framework.DjangoFilterBackend',
),
}

# Webhook event definition
Expand Down
11 changes: 5 additions & 6 deletions seed_scheduler/urls.py
@@ -1,20 +1,19 @@
import os
from django.conf.urls import patterns, include, url
from django.conf.urls import include, url
from django.contrib import admin
from scheduler import views
from rest_framework.authtoken.views import obtain_auth_token

admin.site.site_header = os.environ.get('SCHEDULER_TITLE', 'Scheduler Admin')


urlpatterns = patterns(
'',
urlpatterns = [
url(r'^admin/', include(admin.site.urls)),
url(r'^api/auth/',
include('rest_framework.urls', namespace='rest_framework')),
url(r'^api/token-auth/',
'rest_framework.authtoken.views.obtain_auth_token'),
url(r'^api/token-auth/', obtain_auth_token),
url(r'^api/metrics/', views.MetricsView.as_view()),
url(r'^api/health/', views.HealthcheckView.as_view()),
url(r'^', include('scheduler.urls')),
url(r'^docs/', include('rest_framework_docs.urls')),
)
]
6 changes: 3 additions & 3 deletions setup.py
Expand Up @@ -36,12 +36,12 @@ def get_version(package):
packages=find_packages(),
include_package_data=True,
install_requires=[
'Django==1.9.12',
'djangorestframework==3.3.2',
'Django==1.11.13',
'djangorestframework==3.7.7',
'dj-database-url==0.3.0',
'psycopg2==2.7.1',
'raven==5.32.0',
'django-filter==0.12.0',
'django-filter==1.0.2',
'whitenoise==2.0.6',
'celery==3.1.24',
'django-celery==3.1.17',
Expand Down

0 comments on commit fceede6

Please sign in to comment.