Skip to content

Commit 5353b13

Browse files
committed
Add example for server and non server usage
1 parent c2c8d20 commit 5353b13

File tree

3 files changed

+173
-0
lines changed

3 files changed

+173
-0
lines changed

examples/global_pool/README.md

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
2+
This example is about how to use asyncpg conveniently in both server environment and non-server environment. Users from pymongo will be comfortable for demo provided. The demo does not require to write data type schema like norm orm do. The process is adapted for non-async datadb users: first to get db handler, then table handler ,and then run code.
3+
4+
The demo code provide PG class,which save connection pool in global dictionary with database name as key. Each database has one and only one pool. Different pg instances can share same db pool.
5+
6+
# Non Server Environment
7+
```build
8+
from pg import PG
9+
10+
# define your data process logic here
11+
async def run_many_sqls_in_transaction(table):
12+
sqls = [
13+
"sql1",
14+
"sql2",
15+
"sql3",
16+
]
17+
await table.trans(sqls)
18+
async def run_one_sql(table):
19+
sql="INSERT INTO ......"
20+
await table.execute(sql)
21+
async def select(table):
22+
sql="select ......"
23+
b = await table.select(sql)
24+
print(11, b)
25+
26+
pgdb = PG("host", "port", "user", "password", "database")
27+
table = pgdb['test']
28+
table.run(run_many_sqls_in_transaction)
29+
table.run(run_one_sql)
30+
table.run(select)
31+
32+
33+
```
34+
35+
In non-server environment, data process logic is written in async function. Run this function in PG class method "run" will guarantee connection pool will be terminated.
36+
37+
# Server Environment
38+
```build
39+
# example by fastapi
40+
41+
from pg import PG
42+
from fastapi import FastAPI
43+
from fastapi.testclient import TestClient
44+
45+
app = FastAPI()
46+
47+
# Do not need init connection pool here,pg class will init automatically when pool is none.
48+
@app.on_event("startup")
49+
async def startup_event():
50+
pass
51+
52+
@app.on_event("shutdown")
53+
async def shutdown_event():
54+
pgdb = PG("host", "port", "user", "password", "database")
55+
pgdb.terminate_pool()
56+
57+
58+
@app.get("/xxx")
59+
async def handlers():
60+
pgdb = PG("host", "port", "user", "password", "database")
61+
table = pgdb['test']
62+
data= await table.select("select ....")
63+
return data
64+
65+
66+
```
67+
The PG class will create pool with first call. The pool will be terminated during server lifetime shutdown phase.

examples/global_pool/pg.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import asyncio
2+
3+
import asyncpg
4+
5+
DB = {}
6+
7+
8+
class PG:
9+
def __init__(self, host, port, user, pwd, dbName):
10+
self.user = user
11+
self.pwd = pwd
12+
self.host = host
13+
self.port = port
14+
self.dbName = dbName
15+
self.db = None
16+
self.table = None
17+
18+
def __getitem__(self, tb):
19+
self.table = tb
20+
return self
21+
22+
async def db_pool(self):
23+
global DB
24+
if DB.get(self.dbName, None) is None:
25+
DB[self.dbName] = await asyncpg.create_pool(
26+
host=self.host,
27+
port=self.port,
28+
user=self.user,
29+
password=self.pwd,
30+
database=self.dbName,
31+
)
32+
33+
self.db = DB[self.dbName]
34+
print("DB", self.dbName, self.db)
35+
return self
36+
37+
# 如果测算程序不调用,切数据库会出现pgadmin看不到表格的问题.
38+
def terminate_pool(self):
39+
print("will terminate", self.db)
40+
if self.db is not None:
41+
self.db.terminate()
42+
DB.pop(self.dbName, None)
43+
44+
async def check(self):
45+
if self.dbName is None:
46+
raise "no db name"
47+
if self.table is None:
48+
raise "no table name"
49+
if self.db is None:
50+
await self.db_pool()
51+
52+
async def execute(self, sql):
53+
print("sql===", sql)
54+
await self.check()
55+
async with self.db.acquire() as conn:
56+
await conn.execute(sql)
57+
58+
async def trans(self, sqls):
59+
await self.check()
60+
async with self.db.acquire() as conn:
61+
async with conn.transaction():
62+
for sql in sqls:
63+
print("sql===", sql)
64+
await conn.execute(sql)
65+
66+
async def select(self, sql):
67+
await self.check()
68+
async with self.db.acquire() as conn:
69+
q = await conn.fetch(sql)
70+
return [dict(i) for i in q]
71+
72+
def run(self, f):
73+
print(self.dbName, self.table)
74+
asyncio.get_event_loop().run_until_complete(f(self))
75+
self.terminate_pool()
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from pg import PG
2+
3+
'''
4+
PG class will save connection pool in global dictionary DB with database name as dic key. Each database has one and only one pool.
5+
Different pg instances can share db pool.
6+
'''
7+
8+
9+
10+
async def run_many_sqls_in_transaction(table):
11+
sqls = [
12+
"sql1",
13+
"sql2",
14+
"sql3",
15+
]
16+
await table.trans(sqls)
17+
18+
19+
async def run_one_sql(table):
20+
sql="INSERT INTO ......"
21+
await table.execute(sql)
22+
async def select(table):
23+
sql="select ......"
24+
b = await table.select(sql)
25+
print(11, b)
26+
27+
pgdb = PG("host", "port", "user", "password", "database")
28+
table = pgdb['test']
29+
table.run(run_many_sqls_in_transaction)
30+
table.run(run_one_sql)
31+
table.run(select)

0 commit comments

Comments
 (0)