Skip to content

Commit

Permalink
Merge 887d44d into 7cec7da
Browse files Browse the repository at this point in the history
  • Loading branch information
macro1 committed Aug 1, 2014
2 parents 7cec7da + 887d44d commit ae7ebcd
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 25 deletions.
48 changes: 28 additions & 20 deletions simple_history/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,35 @@ def most_recent(self):
return self.instance.__class__(*values)

def as_of(self, date):
"""
Returns an instance of the original model with all the attributes set
according to what was present on the object on the date provided.
"""Get a snapshot as of a specific date.
Returns an instance, or an iterable of the instances, of the
original model with all the attributes set according to what
was present on the object on the date provided.
"""
if not self.instance:
raise TypeError("Can't use as_of() without a %s instance." %
self.model._meta.object_name)
tmp = []
for field in self.instance._meta.fields:
if isinstance(field, models.ForeignKey):
tmp.append(field.name + "_id")
else:
tmp.append(field.name)
fields = tuple(tmp)
qs = self.filter(history_date__lte=date)
return self._as_of_set(date)
queryset = self.filter(history_date__lte=date)
try:
values = qs.values_list('history_type', *fields)[0]
history_obj = queryset[0]
except IndexError:
raise self.instance.DoesNotExist("%s had not yet been created." %
self.instance._meta.object_name)
if values[0] == '-':
raise self.instance.DoesNotExist("%s had already been deleted." %
self.instance._meta.object_name)
return self.instance.__class__(*values[1:])
raise self.instance.DoesNotExist(
"%s had not yet been created." %
self.instance._meta.object_name)
if history_obj.history_type == '-':
raise self.instance.DoesNotExist(
"%s had already been deleted." %
self.instance._meta.object_name)
return history_obj.instance

def _as_of_set(self, date):
model = type(self.model().instance) # a bit of a hack to get the model
pk_attr = model._meta.pk.name
queryset = self.filter(history_date__lte=date)
for original_pk in set(
queryset.order_by().values_list(pk_attr, flat=True)):
changes = queryset.filter(**{pk_attr: original_pk})
last_change = changes.latest('history_date')
if changes.filter(history_date=last_change.history_date, history_type='-').exists():
continue
yield last_change.instance
2 changes: 2 additions & 0 deletions simple_history/tests/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from .test_models import *
from .test_admin import *
from .test_commands import *
from .test_manager import *

81 changes: 81 additions & 0 deletions simple_history/tests/tests/test_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from datetime import datetime, timedelta
from django.test import TestCase
try:
from django.contrib.auth import get_user_model
except ImportError:
from django.contrib.auth.models import User
else:
User = get_user_model()

from .. import models


class AsOfTest(TestCase):
model = models.Document

def setUp(self):
user = User.objects.create_user("tester", "tester@example.com")
self.now = datetime.now()
self.yesterday = self.now - timedelta(days=1)
self.obj = self.model.objects.create()
self.obj.changed_by = user
self.obj.save()
self.model.objects.all().delete() # allows us to leave PK on instance
self.delete_history, self.change_history, self.create_history = (
self.model.history.all())
self.create_history.history_date = self.now - timedelta(days=2)
self.create_history.save()
self.change_history.history_date = self.now - timedelta(days=1)
self.change_history.save()
self.delete_history.history_date = self.now
self.delete_history.save()

def test_created_after(self):
"""An object created after the 'as of' date should not be
included.
"""
as_of_list = list(
self.model.history.as_of(self.now - timedelta(days=5)))
self.assertFalse(as_of_list)

def test_deleted_before(self):
"""An object deleted before the 'as of' date should not be
included.
"""
as_of_list = list(
self.model.history.as_of(self.now + timedelta(days=1)))
self.assertFalse(as_of_list)

def test_deleted_after(self):
"""An object created before, but deleted after the 'as of'
date should be included.
"""
as_of_list = list(
self.model.history.as_of(self.now - timedelta(days=1)))
self.assertEqual(len(as_of_list), 1)
self.assertEqual(as_of_list[0].pk, self.obj.pk)

def test_modified(self):
"""An object modified before the 'as of' date should reflect
the last version.
"""
as_of_list = list(
self.model.history.as_of(self.now - timedelta(days=1)))
self.assertEqual(as_of_list[0].changed_by, self.obj.changed_by)


class AsOfAdditionalTestCase(TestCase):

def test_create_and_delete(self):
now = datetime.now()
document = models.Document.objects.create()
document.delete()
for doc_change in models.Document.history.all():
doc_change.history_date = now
doc_change.save()
docs_as_of_tmw = models.Document.history.as_of(now + timedelta(days=1))
self.assertFalse(list(docs_as_of_tmw))
5 changes: 0 additions & 5 deletions simple_history/tests/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,6 @@ def test_as_of(self):
self.assertEqual(question_as_of(times[1]), "how's it going?")
self.assertEqual(question_as_of(times[2]), "what's up?")

def test_as_of_on_model_class(self):
Poll.objects.create(question="what's up?", pub_date=today)
time = Poll.history.all()[0].history_date
self.assertRaises(TypeError, Poll.history.as_of, time)

def test_as_of_nonexistant(self):
# Unsaved poll
poll = Poll(question="what's up?", pub_date=today)
Expand Down

0 comments on commit ae7ebcd

Please sign in to comment.