-
Notifications
You must be signed in to change notification settings - Fork 147
/
multi_db.py
139 lines (119 loc) · 5.54 KB
/
multi_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from unittest import skipIf
from django import VERSION as DJANGO_VERSION
from django.conf import settings
from django.db import DEFAULT_DB_ALIAS, connections, transaction
from django.test import TransactionTestCase
from .models import Test
@skipIf(len(settings.DATABASES) < 2,
        'We can’t change the DB used since there’s only one configured')
class MultiDatabaseTestCase(TransactionTestCase):
    """
    Checks that query caching and cache invalidation are tracked
    separately for each configured database.

    Every test reads and writes ``Test`` rows through both the default
    alias and a second alias, and uses ``assertNumQueries`` to tell whether
    a read was served without hitting the database (0 queries) or not.
    The whole case is skipped unless at least two databases are configured.
    """

    # Legacy opt-in (Django < 2.2) allowing the test to query every database.
    multi_db = True
    # Replacement for ``multi_db``, which Django 2.2 deprecated and
    # Django 3.1 removed. Without it, recent Django versions only allow
    # queries on the default alias. Older versions ignore this attribute.
    databases = '__all__'

    def setUp(self):
        """
        Creates two rows in the default database and picks the first
        non-default alias as the second database under test.
        """
        self.t1 = Test.objects.create(name='test1')
        self.t2 = Test.objects.create(name='test2')
        # The skip condition on the class guarantees that at least one
        # non-default alias exists, so ``next`` cannot run dry here.
        self.db_alias2 = next(alias for alias in settings.DATABASES
                              if alias != DEFAULT_DB_ALIAS)
        connection2 = connections[self.db_alias2]
        self.is_sqlite2 = connection2.vendor == 'sqlite'
        self.is_mysql2 = connection2.vendor == 'mysql'
        if connection2.vendor in ('mysql', 'postgresql'):
            # We need to reopen the connection or Django
            # will execute an extra SQL request below.
            connection2.cursor()

    def is_django_21_below_and_sqlite2(self):
        """
        Note: See test_utils.py with this function name
        Checks if Django 2.1 or below and SQLite2

        Returns ``True`` only when the second database is SQLite and the
        running Django version is older than 2.2. The tests below expect
        one extra query for an insert in that configuration.
        """
        # Comparing the (major, minor) prefix as a tuple covers every
        # release before 2.2, including the 0.x and 1.x series.
        return self.is_sqlite2 and DJANGO_VERSION[:2] < (2, 2)

    def test_read(self):
        """
        Tests that reads on each database are cached independently.
        """
        with self.assertNumQueries(1):
            data1 = list(Test.objects.all())
        self.assertListEqual(data1, [self.t1, self.t2])

        # The second database holds no rows, and the read on the default
        # database must not have been reused for it: one query is expected.
        with self.assertNumQueries(1, using=self.db_alias2):
            data2 = list(Test.objects.using(self.db_alias2))
        self.assertListEqual(data2, [])

        # Same read again on the second database: no query expected.
        with self.assertNumQueries(0, using=self.db_alias2):
            data3 = list(Test.objects.using(self.db_alias2))
        self.assertListEqual(data3, [])

    def test_invalidate_other_db(self):
        """
        Tests if the non-default database is invalidated when modified.
        """
        with self.assertNumQueries(1, using=self.db_alias2):
            data1 = list(Test.objects.using(self.db_alias2))
        self.assertListEqual(data1, [])

        insert_queries = 2 if self.is_django_21_below_and_sqlite2() else 1
        with self.assertNumQueries(insert_queries, using=self.db_alias2):
            t3 = Test.objects.using(self.db_alias2).create(name='test3')

        # The write must force the next read to hit the database again.
        with self.assertNumQueries(1, using=self.db_alias2):
            data2 = list(Test.objects.using(self.db_alias2))
        self.assertListEqual(data2, [t3])

    def test_invalidation_independence(self):
        """
        Tests if invalidation doesn’t affect the unmodified databases.
        """
        with self.assertNumQueries(1):
            data1 = list(Test.objects.all())
        self.assertListEqual(data1, [self.t1, self.t2])

        insert_queries = 2 if self.is_django_21_below_and_sqlite2() else 1
        with self.assertNumQueries(insert_queries, using=self.db_alias2):
            Test.objects.using(self.db_alias2).create(name='test3')

        # Writing to the second database must leave the default database's
        # cached read usable: no query expected.
        with self.assertNumQueries(0):
            data2 = list(Test.objects.all())
        self.assertListEqual(data2, [self.t1, self.t2])

    def test_heterogeneous_atomics(self):
        """
        Checks that an atomic block for a database nested inside
        another atomic block for another database has no impact on their
        caching.
        """
        with transaction.atomic():
            with transaction.atomic(self.db_alias2):
                with self.assertNumQueries(1):
                    data1 = list(Test.objects.all())
                self.assertListEqual(data1, [self.t1, self.t2])

                with self.assertNumQueries(1, using=self.db_alias2):
                    data2 = list(Test.objects.using(self.db_alias2))
                self.assertListEqual(data2, [])

                t3 = Test.objects.using(self.db_alias2).create(name='test3')

                # The insert only concerns the second database, so only
                # its read has to be executed again.
                with self.assertNumQueries(1, using=self.db_alias2):
                    data3 = list(Test.objects.using(self.db_alias2))
                self.assertListEqual(data3, [t3])

                with self.assertNumQueries(0):
                    data4 = list(Test.objects.all())
                self.assertListEqual(data4, [self.t1, self.t2])

                # A query not issued before still runs on the default
                # database, where the row created above does not exist.
                with self.assertNumQueries(1):
                    data5 = list(Test.objects.filter(name='test3'))
                self.assertListEqual(data5, [])

    def test_heterogeneous_atomics_independence(self):
        """
        Checks that interrupting an atomic block after the commit of another
        atomic block for another database nested inside it
        correctly invalidates the cache for the committed transaction.
        """
        with self.assertNumQueries(1, using=self.db_alias2):
            data1 = list(Test.objects.using(self.db_alias2))
        self.assertListEqual(data1, [])

        try:
            with transaction.atomic():
                with transaction.atomic(self.db_alias2):
                    t3 = Test.objects.using(
                        self.db_alias2).create(name='test3')
                # Raised once the inner block has exited, so only the
                # outer (default database) atomic block is interrupted.
                raise ZeroDivisionError
        except ZeroDivisionError:
            # Deliberate: the exception only serves to abort the outer block.
            pass

        with self.assertNumQueries(1, using=self.db_alias2):
            data2 = list(Test.objects.using(self.db_alias2))
        self.assertListEqual(data2, [t3])