This repository has been archived by the owner on Aug 26, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 682
/
test_indexes.py
105 lines (84 loc) · 3.42 KB
/
test_indexes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from nose.tools import eq_, ok_
from django.conf import settings
from elasticsearch.exceptions import RequestError
from kuma.wiki.models import Document
from search.models import Index, DocumentType
from search.tests import ElasticTestCase
from search.index import get_indexing_es, get_indexes
class TestIndexes(ElasticTestCase):
fixtures = ['test_users.json', 'wiki/documents.json']
def tearDown(self):
super(TestIndexes, self).tearDown()
Index.objects.all().delete()
def test_get_current(self):
eq_(Index.objects.get_current().prefixed_name,
'%s-main_index' % settings.ES_INDEX_PREFIX)
def test_add_newindex(self):
index = Index.objects.create()
ok_(not index.populated)
index.populate()
index = Index.objects.get(pk=index.pk) # reload the index again
ok_(index.populated)
index.delete()
def test_promote_index(self):
index = Index.objects.create()
index.populate()
index = Index.objects.get(pk=index.pk) # reload the index again
ok_(index.populated)
index.promote()
ok_(index.promoted)
eq_(Index.objects.get_current().prefixed_name, index.prefixed_name)
index.demote()
ok_(not index.promoted)
main_name = '%s-main_index' % settings.ES_INDEX_PREFIX
eq_(Index.objects.get_current().prefixed_name, main_name)
index.delete()
def test_outdated(self):
# first create and populate an index
main_index = Index.objects.create()
main_index.populate()
main_index = Index.objects.get(pk=main_index.pk)
ok_(main_index.populated)
main_index.promote()
eq_(Index.objects.get_current().prefixed_name,
main_index.prefixed_name)
# then create a successor and render a document against the old index
successor_index = Index.objects.create()
doc = Document.objects.get(pk=1)
doc.render()
eq_(successor_index.outdated_objects.count(), 1)
# then populate the successor and see if we still have outdated objects
successor_index.populate()
successor_index = Index.objects.get(pk=successor_index.pk)
# check if the newly created index is empty
indexes_dict = dict(get_indexes())
eq_(indexes_dict[successor_index.prefixed_name], 0)
successor_index.promote()
eq_(successor_index.outdated_objects.count(), 0)
self.refresh() # refresh to make sure the index has the results ready
indexes_dict = dict(get_indexes())
# many due to translations
eq_(indexes_dict[successor_index.prefixed_name], 14)
S = DocumentType.search
eq_(S().all().count(), 7)
eq_(S().query(content__match='an article title')[0].slug,
'article-title')
def test_delete_index(self):
# first create and populate the index
index = Index.objects.create()
index.populate()
# then create it again and see if it blows up
es = get_indexing_es()
try:
es.indices.create(index.prefixed_name)
except RequestError:
pass
else:
assert False
# then delete it and check if recreating works without blowing up
index.delete()
try:
es.indices.create(index.prefixed_name)
except RequestError:
assert False
es.indices.delete(index.prefixed_name)