Skip to content

Commit

Permalink
task events filter
Browse files Browse the repository at this point in the history
update cleanup task
  • Loading branch information
kidig committed Jul 15, 2016
1 parent 58b0523 commit db7dec5
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 27 deletions.
20 changes: 19 additions & 1 deletion robust/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,28 @@
from .utils import unwrap_payload


class TaskEventsFilter(admin.SimpleListFilter):
title = parameter_name = 'events'

def lookups(self, request, model_admin):
return [
('succeed', 'Succeed'),
('troubled', 'Troubled'),
]

def queryset(self, request, queryset):
if self.value() == 'troubled':
queryset = queryset.with_fails()
elif self.value() == 'succeed':
queryset = queryset.without_fails()
return queryset


@admin.register(Task)
class TaskAdmin(BaseDjangoObjectActions, admin.ModelAdmin):
list_display = ('name', 'payload_unwraped', 'status', 'created_at', 'updated_at')
fields = readonly_fields = ('status', 'name', 'payload', 'tags', 'eta', 'traceback')
list_filter = ('status',)
list_filter = (TaskEventsFilter, 'status')
search_fields = ('name',)
change_actions = actions = ('retry',)
change_form_template = 'admin/robust/task/change_form.html'
Expand All @@ -19,6 +36,7 @@ def payload_unwraped(self, obj):
return unwrap_payload(obj.payload)

payload_unwraped.short_description = 'payload'
payload_unwraped.admin_order_field = 'payload'

def get_actions(self, request):
actions = super(TaskAdmin, self).get_actions(request)
Expand Down
15 changes: 15 additions & 0 deletions robust/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,26 @@
from .exceptions import TaskTransactionError


class TaskQuerySet(models.QuerySet):
def __init__(self, *args, **kwargs):
super(TaskQuerySet, self).__init__(*args, **kwargs)
self._filter_fails = dict(events__status__in=[Task.RETRY, Task.FAILED])

def with_fails(self):
return self.filter(**self._filter_fails).distinct()

def without_fails(self):
return self.filter(~models.Q(**self._filter_fails)).distinct()


class TaskManager(models.Manager):
_query_cache_lock = threading.Lock()
_query_cache = None
_query_limits = None

def get_queryset(self):
return TaskQuerySet(self.model, using=self._db)

@classmethod
def reset_query_cache(cls):
with cls._query_cache_lock:
Expand Down
1 change: 0 additions & 1 deletion robust/templates/admin/robust/change_list.html
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{% extends "admin/change_list.html" %}


{% block object-tools-items %}
{% for tool in objectactions %}
<li class="objectaction-item" data-tool-name="{{ tool.name }}">
Expand Down
75 changes: 53 additions & 22 deletions robust/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from django.utils import timezone

from .exceptions import TaskTransactionError, Retry
from .models import Task, RateLimitRun
from .models import Task, RateLimitRun, TaskEvent
from .runners import SimpleRunner
from .utils import task, TaskWrapper, PayloadProcessor, cleanup

Expand Down Expand Up @@ -397,33 +397,64 @@ def test_kwargs_non_json_serializable(self):
class CleanupTest(TransactionTestCase):
def setUp(self):
self.now = datetime.now()
self.succ_task = Task.objects.create(name='test_succeed', status=Task.SUCCEED)
self.succ_task_exp = Task.objects.create(name='test_succeed_exp', status=Task.SUCCEED)
Task.objects.filter(pk=self.succ_task_exp.pk).update(updated_at=self.now - timedelta(hours=1, seconds=30))

self.fail_task = Task.objects.create(name='test_failed', status=Task.FAILED)
self.fail_task_exp = Task.objects.create(name='test_failed_exp', status=Task.FAILED)
Task.objects.filter(pk=self.fail_task_exp.pk).update(updated_at=self.now - timedelta(weeks=1, seconds=30))
def tearDown(self):
Task.objects.all().delete()
super().tearDown()

def _create_task(self, name, status, updated_timedelta=None):
task = Task.objects.create(name=name, status=status)
if updated_timedelta:
self._set_update_at(task.pk, updated_timedelta)
return task

def _set_update_at(self, task_id, updated_timedelta):
Task.objects.filter(pk=task_id).update(updated_at=self.now - updated_timedelta)

def test_succeed(self):
task = self._create_task('test_succeed', Task.SUCCEED)

self.assertEqual(Task.objects.count(), 1)
cleanup()
self.assertEqual(Task.objects.count(), 1)

self._set_update_at(task.pk, timedelta(hours=1, seconds=10))
cleanup()
self.assertEqual(Task.objects.count(), 0)

self.pend_task = Task.objects.create(name='test_pending', status=Task.PENDING)
self.pend_task2 = Task.objects.create(name='test_pending', status=Task.PENDING)
Task.objects.filter(pk=self.pend_task2.pk).update(updated_at=self.now - timedelta(weeks=2))
def test_succeed_with_fails(self):
task = self._create_task('test_succeed', Task.SUCCEED, timedelta(hours=1, seconds=10))
TaskEvent.objects.create(task=task, status=Task.RETRY, created_at=self.now)

self.retry_task = Task.objects.create(name='test_retry', status=Task.RETRY)
self.retry_task2 = Task.objects.create(name='test_retry', status=Task.RETRY)
Task.objects.filter(pk=self.retry_task2.pk).update(updated_at=self.now - timedelta(weeks=2))
task2 = self._create_task('test_succeed', Task.SUCCEED, timedelta(hours=1, seconds=10))
TaskEvent.objects.create(task=task2, status=Task.FAILED, created_at=self.now)

def test_cleanup(self):
self.assertEqual(Task.objects.count(), 8)
self.assertEqual(Task.objects.count(), 2)
cleanup()
self.assertEqual(Task.objects.count(), 2)

self._set_update_at(task.pk, timedelta(weeks=1, seconds=10))
self._set_update_at(task2.pk, timedelta(weeks=1, seconds=10))
cleanup()

self.assertEqual(Task.objects.count(), 0)

def test_expired(self):
tasks = []
for status, name in Task.STATUS_CHOICES:
tasks.append(self._create_task(name, status))

task = self._create_task('test_succeed_with_fails', Task.SUCCEED)
TaskEvent.objects.create(task=task, status=Task.FAILED, created_at=self.now)
tasks.append(task)

self.assertEqual(Task.objects.count(), 5)
cleanup()
self.assertEqual(Task.objects.count(), 5)

Task.objects.all().update(updated_at=self.now-timedelta(weeks=1, seconds=10))
cleanup()
self.assertEqual(Task.objects.count(), 6)
self.assertEqual(Task.objects.filter(name='test_succeed').count(), 1)
self.assertEqual(Task.objects.filter(name='test_succeed_exp').count(), 0)
self.assertEqual(Task.objects.filter(name='test_failed').count(), 1)
self.assertEqual(Task.objects.filter(name='test_failed_exp').count(), 0)
self.assertEqual(Task.objects.filter(name='test_retry').count(), 2)
self.assertEqual(Task.objects.filter(name='test_pending').count(), 2)
self.assertEqual(Task.objects.count(), 0)


class AdminTest(TransactionTestCase):
Expand Down
8 changes: 5 additions & 3 deletions robust/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,14 @@ def decorator(fn):

@task()
def cleanup():
from .models import Task
from .models import Task, TaskEvent
now = datetime.now()
succeed_task_expire = now - getattr(settings, 'ROBUST_SUCCEED_TASK_EXPIRE', timedelta(hours=1))
failed_task_expire = now - getattr(settings, 'ROBUST_FAILED_TASK_EXPIRE', timedelta(weeks=1))

troubled_pks = TaskEvent.objects.filter(status__in=[Task.RETRY, Task.FAILED]).values_list('task_id', flat=True)

Task.objects.filter(
Q(status=Task.SUCCEED, updated_at__lte=succeed_task_expire) |
Q(status=Task.FAILED, updated_at__lte=failed_task_expire)
~Q(pk__in=troubled_pks) & Q(updated_at__lte=succeed_task_expire) |
Q(updated_at__lte=failed_task_expire)
).delete()

0 comments on commit db7dec5

Please sign in to comment.