forked from MagicStack/asyncpg
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_execute.py
153 lines (115 loc) · 4.77 KB
/
test_execute.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
import asyncio
import asyncpg
from asyncpg import _testbase as tb
class TestExecuteScript(tb.ConnectedTestCase):
async def test_execute_script_1(self):
self.assertEqual(self.con._protocol.queries_count, 0)
status = await self.con.execute('''
SELECT 1;
SELECT true FROM pg_type WHERE false = true;
SELECT generate_series(0, 9);
''')
self.assertEqual(self.con._protocol.queries_count, 1)
self.assertEqual(status, 'SELECT 10')
async def test_execute_script_2(self):
status = await self.con.execute('''
CREATE TABLE mytab (a int);
''')
self.assertEqual(status, 'CREATE TABLE')
try:
status = await self.con.execute('''
INSERT INTO mytab (a) VALUES ($1), ($2)
''', 10, 20)
self.assertEqual(status, 'INSERT 0 2')
finally:
await self.con.execute('DROP TABLE mytab')
async def test_execute_script_3(self):
with self.assertRaisesRegex(asyncpg.PostgresSyntaxError,
'cannot insert multiple commands'):
await self.con.execute('''
CREATE TABLE mytab (a int);
INSERT INTO mytab (a) VALUES ($1), ($2);
''', 10, 20)
async def test_execute_script_check_transactionality(self):
with self.assertRaises(asyncpg.PostgresError):
await self.con.execute('''
CREATE TABLE mytab (a int);
SELECT * FROM mytab WHERE 1 / 0 = 1;
''')
with self.assertRaisesRegex(asyncpg.PostgresError,
'"mytab" does not exist'):
await self.con.prepare('''
SELECT * FROM mytab
''')
async def test_execute_exceptions_1(self):
with self.assertRaisesRegex(asyncpg.PostgresError,
'relation "__dne__" does not exist'):
await self.con.execute('select * from __dne__')
async def test_execute_script_interrupted_close(self):
fut = self.loop.create_task(
self.con.execute('''SELECT pg_sleep(10)'''))
await asyncio.sleep(0.2)
self.assertFalse(self.con.is_closed())
await self.con.close()
self.assertTrue(self.con.is_closed())
with self.assertRaises(asyncpg.QueryCanceledError):
await fut
async def test_execute_script_interrupted_terminate(self):
fut = self.loop.create_task(
self.con.execute('''SELECT pg_sleep(10)'''))
await asyncio.sleep(0.2)
self.assertFalse(self.con.is_closed())
self.con.terminate()
self.assertTrue(self.con.is_closed())
with self.assertRaisesRegex(asyncpg.ConnectionDoesNotExistError,
'closed in the middle'):
await fut
self.con.terminate()
async def test_execute_many_1(self):
await self.con.execute('CREATE TEMP TABLE exmany (a text, b int)')
try:
result = await self.con.executemany('''
INSERT INTO exmany VALUES($1, $2)
''', [
('a', 1), ('b', 2), ('c', 3), ('d', 4)
])
self.assertIsNone(result)
result = await self.con.fetch('''
SELECT * FROM exmany
''')
self.assertEqual(result, [
('a', 1), ('b', 2), ('c', 3), ('d', 4)
])
# Empty set
result = await self.con.executemany('''
INSERT INTO exmany VALUES($1, $2)
''', ())
result = await self.con.fetch('''
SELECT * FROM exmany
''')
self.assertEqual(result, [
('a', 1), ('b', 2), ('c', 3), ('d', 4)
])
finally:
await self.con.execute('DROP TABLE exmany')
async def test_execute_many_2(self):
await self.con.execute('CREATE TEMP TABLE exmany (b int)')
try:
bad_data = ([1 / 0] for v in range(10))
with self.assertRaises(ZeroDivisionError):
async with self.con.transaction():
await self.con.executemany('''
INSERT INTO exmany VALUES($1)
''', bad_data)
good_data = ([v] for v in range(10))
async with self.con.transaction():
await self.con.executemany('''
INSERT INTO exmany VALUES($1)
''', good_data)
finally:
await self.con.execute('DROP TABLE exmany')