From 53ba58a655f186053bee7b968b38f0b2a92fd1db Mon Sep 17 00:00:00 2001 From: Rudi Giesler Date: Fri, 18 May 2018 09:20:35 +0200 Subject: [PATCH 1/4] Upgrade to django 1.11 --- scheduler/serializers.py | 1 + seed_scheduler/settings.py | 4 +++- seed_scheduler/urls.py | 11 +++++------ setup.py | 6 +++--- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/scheduler/serializers.py b/scheduler/serializers.py index 9b626c6..1f875ed 100644 --- a/scheduler/serializers.py +++ b/scheduler/serializers.py @@ -39,6 +39,7 @@ class HookSerializer(serializers.ModelSerializer): class Meta: model = Hook read_only_fields = ('user',) + fields = '__all__' class ScheduleFailureSerializer(serializers.HyperlinkedModelSerializer): diff --git a/seed_scheduler/settings.py b/seed_scheduler/settings.py index 9606088..c5b6d95 100644 --- a/seed_scheduler/settings.py +++ b/seed_scheduler/settings.py @@ -145,7 +145,9 @@ 'DEFAULT_PERMISSION_CLASSES': ( 'rest_framework.permissions.IsAuthenticated', ), - 'DEFAULT_FILTER_BACKENDS': ('rest_framework.filters.DjangoFilterBackend',) + 'DEFAULT_FILTER_BACKENDS': ( + 'django_filters.rest_framework.DjangoFilterBackend', + ), } # Webhook event definition diff --git a/seed_scheduler/urls.py b/seed_scheduler/urls.py index d0b3117..a4d325b 100644 --- a/seed_scheduler/urls.py +++ b/seed_scheduler/urls.py @@ -1,20 +1,19 @@ import os -from django.conf.urls import patterns, include, url +from django.conf.urls import include, url from django.contrib import admin from scheduler import views +from rest_framework.authtoken.views import obtain_auth_token admin.site.site_header = os.environ.get('SCHEDULER_TITLE', 'Scheduler Admin') -urlpatterns = patterns( - '', +urlpatterns = [ url(r'^admin/', include(admin.site.urls)), url(r'^api/auth/', include('rest_framework.urls', namespace='rest_framework')), - url(r'^api/token-auth/', - 'rest_framework.authtoken.views.obtain_auth_token'), + url(r'^api/token-auth/', obtain_auth_token), url(r'^api/metrics/', views.MetricsView.as_view()), url(r'^api/health/', views.HealthcheckView.as_view()), url(r'^', include('scheduler.urls')), url(r'^docs/', include('rest_framework_docs.urls')), -) +] diff --git a/setup.py b/setup.py index bf15964..2055347 100644 --- a/setup.py +++ b/setup.py @@ -36,12 +36,12 @@ def get_version(package): packages=find_packages(), include_package_data=True, install_requires=[ - 'Django==1.9.12', - 'djangorestframework==3.3.2', + 'Django==1.11.13', + 'djangorestframework==3.7.7', 'dj-database-url==0.3.0', 'psycopg2==2.7.1', 'raven==5.32.0', - 'django-filter==0.12.0', + 'django-filter==1.0.2', 'whitenoise==2.0.6', 'celery==3.1.24', 'django-celery==3.1.17', From 055a7aad841782a3af2b403bbe1dd1c665ed1a14 Mon Sep 17 00:00:00 2001 From: Rudi Giesler Date: Fri, 18 May 2018 09:21:57 +0200 Subject: [PATCH 2/4] Switch to django 1.11 SSE iterator --- scheduler/tasks.py | 31 +++++-------------------------- 1 file changed, 5 insertions(+), 26 deletions(-) diff --git a/scheduler/tasks.py b/scheduler/tasks.py index 8952698..a5414a9 100644 --- a/scheduler/tasks.py +++ b/scheduler/tasks.py @@ -169,32 +169,11 @@ def run(self, schedule_type, lookup_id, **kwargs): task_run.save() # create tasks for each active schedule queued = 0 - schedules = schedules.values('id') - with transaction.atomic(), connection.cursor() as cur: - # A named cursor is declared here to make psycopg2 use a server - # side cursor. The SSC prevents the entire result set from being - # loaded into memory. - # NOTE: this can be replaced with just a call to a queryset's - # iterator() method in Django 1.11 as that directly supports using - # a SSC. - query = str(schedules.query) - cursor_name = '_cur_queue_tasks_{uuid}'.format(uuid=uuid4().hex) - cur.execute( - "DECLARE {cursor_name} CURSOR FOR {query}".format( - cursor_name=cursor_name, - query=query - ), - {'lookup_id': lookup_id} - ) - while True: - cur.execute("FETCH 10000 FROM {0}".format(cursor_name)) - chunk = cur.fetchall() - if not chunk: - break - for row in chunk: - DeliverTask.apply_async( - kwargs={"schedule_id": str(row[0])}) - queued += 1 + schedules = schedules.values('id', 'auth_token', 'endpoint', 'payload') + for schedule in schedules.iterator(): + schedule['schedule_id'] = str(schedule.pop('id')) + DeliverTask.apply_async(kwargs=schedule) + queued += 1 task_run.completed_at = now() task_run.save() From 4147f338237164c6118bfcb724e8743d91bff724 Mon Sep 17 00:00:00 2001 From: Rudi Giesler Date: Fri, 18 May 2018 09:22:24 +0200 Subject: [PATCH 3/4] Change DeliverTask to not require database calls --- .../commands/trigger_deliver_tasks.py | 7 ++++++- scheduler/tasks.py | 20 ++++++++++--------- scheduler/tests.py | 6 +++++- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/scheduler/management/commands/trigger_deliver_tasks.py b/scheduler/management/commands/trigger_deliver_tasks.py index 9665490..ee27ede 100644 --- a/scheduler/management/commands/trigger_deliver_tasks.py +++ b/scheduler/management/commands/trigger_deliver_tasks.py @@ -177,7 +177,12 @@ def confirm(prompt): else: self.stdout.write('%s' % (schedule,)) DeliverTask.apply_async( - kwargs={"schedule_id": str(schedule.id)}) + kwargs={ + "schedule_id": str(schedule.id), + "auth_token": schedule.auth_token, + "endpoint": schedule.endpoint, + "payload": schedule.payload, + }) def get_addresses(self, identity, default_addr_type): details = identity.get('details', {}) diff --git a/scheduler/tasks.py b/scheduler/tasks.py index a5414a9..3526a47 100644 --- a/scheduler/tasks.py +++ b/scheduler/tasks.py @@ -55,7 +55,7 @@ class DeliverTask(Task): default_retry_delay = 5 max_retries = 5 - def run(self, schedule_id, **kwargs): + def run(self, schedule_id, auth_token, endpoint, payload, **kwargs): """ Runs an instance of a scheduled task """ @@ -66,21 +66,20 @@ def run(self, schedule_id, **kwargs): else: retry_delay = self.default_retry_delay - schedule = Schedule.objects.get(id=schedule_id) headers = {"Content-Type": "application/json"} - if schedule.auth_token is not None: - headers["Authorization"] = "Token %s" % schedule.auth_token + if auth_token is not None: + headers["Authorization"] = "Token %s" % auth_token try: response = requests.post( - url=schedule.endpoint, - data=json.dumps(schedule.payload), + url=endpoint, + data=json.dumps(payload), headers=headers, timeout=settings.DEFAULT_REQUEST_TIMEOUT ) # Expecting a 201, raise for errors. response.raise_for_status() except requests_exceptions.ConnectionError as exc: - l.info('Connection Error to endpoint: %s' % schedule.endpoint) + l.info('Connection Error to endpoint: %s' % endpoint) fire_metric.delay('scheduler.deliver_task.connection_error.sum', 1) self.retry(exc=exc, countdown=retry_delay) except requests_exceptions.HTTPError as exc: @@ -226,10 +225,13 @@ def run(self, **kwargs): l.info("Attempting to requeue <%s> failed schedules" % failures.count()) for failure in failures.iterator(): - schedule_id = str(failure.schedule_id) + schedule = Schedule.objects.values( + 'id', 'auth_token', 'endpoint', 'payload') + schedule = schedule.get(id=failure.schedule_id) + schedule['schedule_id'] = str(schedule.pop('id')) # Cleanup the failure before requeueing it. failure.delete() - deliver_task.delay(schedule_id) + DeliverTask.apply_async(kwargs=schedule) requeue_failed_tasks = RequeueFailedTasks() diff --git a/scheduler/tests.py b/scheduler/tests.py index d276e38..bf9dfdf 100644 --- a/scheduler/tests.py +++ b/scheduler/tests.py @@ -431,7 +431,11 @@ def test_deliver_task(self): # Execute result = deliver_task.apply_async(kwargs={ - "schedule_id": str(schedule.id)}) + "schedule_id": str(schedule.id), + "auth_token": schedule.auth_token, + "endpoint": schedule.endpoint, + "payload": schedule.payload, + }) # Check self.assertEqual(result.get(), True) From 68deef32d2c58d68d9fb875193675e1685940036 Mon Sep 17 00:00:00 2001 From: Rudi Giesler Date: Fri, 18 May 2018 10:29:23 +0200 Subject: [PATCH 4/4] Remove unnecessary imports --- scheduler/tasks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/scheduler/tasks.py b/scheduler/tasks.py index 3526a47..875a297 100644 --- a/scheduler/tasks.py +++ b/scheduler/tasks.py @@ -5,7 +5,6 @@ from celery.task import Task from celery.utils.log import get_task_logger from django.conf import settings -from django.db import connection, transaction from django.utils.timezone import now from djcelery.models import CrontabSchedule, IntervalSchedule from seed_services_client.metrics import MetricsApiClient