This repository has been archived by the owner on Nov 14, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
tests.py
195 lines (139 loc) · 4.9 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import peewee as pw
import pytest
from curio.task import ContextTask
# Print all queries: attach a stream handler (stderr by default) to the
# 'peewee' logger and lower its threshold to DEBUG.
import logging
logger = logging.getLogger('peewee')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
@pytest.fixture(
    autouse=True,
    params=[
        'asyncio',
        'trio',
        pytest.param(('curio', {'taskcls': ContextTask}), id='curio'),
    ],
)
def aiolib(request):
    """Return the current backend spec for this parametrized run.

    The fixture is autouse, so every test in the module is run once per
    entry in ``params``: the plain names ``'asyncio'`` and ``'trio'``, and
    a ``('curio', {'taskcls': ContextTask})`` tuple whose test id is
    ``'curio'``.
    """
    return request.param
@pytest.fixture
def User():
    """Build and return a fresh peewee model class named ``'Model'``.

    The class has a required ``username`` char field and a nullable
    ``name`` char field. No database is bound here; tests attach one.
    """
    # Fields are instantiated in declaration order: username, then name.
    attrs = {
        'username': pw.CharField(),
        'name': pw.CharField(null=True),
    }
    bases = (pw.Model,)
    return type('Model', bases, attrs)
def test_connect():
    """Check that every ``+async`` URL scheme maps to the expected backend.

    Each URL is passed to ``db_url.connect`` and the result must be an
    instance of the matching peewee / playhouse database class. The
    ``*ext`` schemes are only checked for presence in ``db_url.schemes``.
    """
    from aiopeewee import db_url
    from playhouse import cockroachdb as crdb

    # (connection URL, database class the result must be an instance of)
    expected_backends = (
        ('cockroachdb+async://', crdb.CockroachDatabase),
        ('crdb+async://', crdb.CockroachDatabase),
        ('cockroachdb+pool+async://', crdb.CockroachDatabase),
        ('crdb+pool+async://', crdb.CockroachDatabase),
        ('mysql+async://', pw.MySQLDatabase),
        ('mysql+pool+async://', pw.MySQLDatabase),
        ('postgres+async://', pw.PostgresqlDatabase),
        ('postgresql+async://', pw.PostgresqlDatabase),
        ('postgres+pool+async://', pw.PostgresqlDatabase),
        ('postgresql+pool+async://', pw.PostgresqlDatabase),
        ('sqlite+async://', pw.SqliteDatabase),
        ('sqlite+pool+async://', pw.SqliteDatabase),
    )
    for url, backend_cls in expected_backends:
        db = db_url.connect(url)
        assert isinstance(db, backend_cls)

    # Extension backends: only verify that the schemes are registered.
    registered_only = (
        'postgresext+async',
        'postgresext+pool+async',
        'sqliteext+async',
        'sqliteext+pool+async',
    )
    for scheme in registered_only:
        assert db_url.schemes[scheme]
async def test_basic():
    """Exercise connect / close on an in-memory async SQLite database.

    Asserts that two ``connect_async(True)`` calls return the same
    connection object, that ``close()`` leaves ``db._state.conn`` falsy,
    and that ``async with db`` then yields a connection that equals the
    state's current one and differs from the earlier one.
    """
    from aiopeewee import SqliteDatabaseAsync, _AsyncConnectionState, db_url

    db = db_url.connect('sqlite+async:///:memory:')
    assert db
    assert isinstance(db, SqliteDatabaseAsync)
    assert isinstance(db._state, _AsyncConnectionState)

    # The positional True is forwarded as connect_async's first argument.
    first = await db.connect_async(True)
    second = await db.connect_async(True)
    assert first is second
    assert db._state.conn

    # Synchronous close must drop the connection held in the state.
    db.close()
    assert not db._state.conn

    async with db as conn:
        assert conn == db._state.conn
        assert conn != first
async def test_pool():
    """Exercise the pooled async SQLite database created with max_connections=3.

    First checks that ``connect_async(True)`` after a sync ``connect()``
    returns the already-open connection. Then runs five borrow/release
    coroutines through ``aio_wait`` and asserts that exactly three distinct
    connection objects were handed out and no waiters remain.
    """
    from aiopeewee import PooledSqliteDatabaseAsync, db_url
    from aiopeewee._compat import aio_sleep, aio_wait

    db = db_url.connect(
        'sqlite+pool+async:///:memory:',
        max_connections=3,
        timeout=.1,
    )
    assert db
    assert isinstance(db, PooledSqliteDatabaseAsync)

    db.connect()
    sync_conn = db.connection()
    async_conn = await db.connect_async(True)
    assert async_conn is sync_conn
    db.close_all()

    async def borrow():
        # Hold the connection briefly so the borrowers overlap.
        conn = await db.connect_async()
        await aio_sleep(.02)
        await db.close_async()
        return conn

    # Five borrowers against a pool created with max_connections=3.
    results = await aio_wait(*[borrow() for _ in range(5)])
    assert all(results)
    assert len(set(results)) == 3
    assert not db._waiters
async def test_sqlite():
    """Run ``select 42`` inside ``async with db`` and check the value read back."""
    from aiopeewee import db_url

    db = db_url.connect('sqlite+async:///:memory:')

    async def middleware():
        # The database is used as an async context manager around the query.
        async with db:
            cursor = db.execute_sql('select 42')
            return cursor.fetchone()

    row = await middleware()
    # The row must contain exactly one column.
    (value,) = row
    assert value == 42
async def test_transactions(User):
    """Check nested rollback behaviour for sync and async atomic blocks.

    Both halves follow the same scenario: create 'charlie' in an outer
    atomic block, create 'huey' in a nested block and roll the nested block
    back, assert that only 'charlie' is left, then roll the outer block
    back as well.

    Args:
        User: model class supplied by the ``User`` fixture; it is bound to
            the in-memory database created here.
    """
    from aiopeewee import db_url

    # Test sync transactions
    db = db_url.connect('sqlite+async:///:memory:')
    User._meta.database = db
    User.create_table()

    with db.atomic() as txn:
        User.create(username='charlie')

        with db.atomic() as txn2:
            User.create(username='huey')
            txn2.rollback()
            assert User.select().count() == 1
            assert User.get().username == 'charlie'

        # Undo 'charlie' too, so the async half starts from an empty table.
        txn.rollback()

    # Test async transactions
    async with db.atomic_async() as txn:
        User.create(username='charlie')

        # The nested block is entered with `async with`, like the outer one,
        # so the async enter/exit path is exercised at both nesting levels.
        async with db.atomic_async() as txn2:
            User.create(username='huey')
            txn2.rollback()
            assert User.select().count() == 1
            assert User.get().username == 'charlie'

        txn.rollback()
async def test_asgi():
    """Drive the ASGI plugin's middleware through an asgi-tools test client.

    Registers ``db.middleware`` on the app, exposes a route that selects
    the ``num`` query parameter via SQL, and asserts that a request with
    ``num=42`` returns status 200 and a JSON body equal to 42.
    """
    from aiopeewee import PeeweeASGIPlugin
    from asgi_tools import App
    from asgi_tools.tests import ASGITestClient

    app = App(debug=True)
    client = ASGITestClient(app)

    db = PeeweeASGIPlugin(url='sqlite+async:///:memory:')
    app.middleware(db.middleware)

    @app.route('/')
    async def sql(request):
        # The query value is interpolated directly into the SQL text.
        statement = f"select { request.url.query['num'] }"
        row = db.execute_sql(statement).fetchone()
        (result,) = row
        return result

    response = await client.get('/', query={'num': 42})
    assert response.status_code == 200
    assert await response.json() == 42
# TODO: transactions, context