-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.py
44 lines (35 loc) · 1.74 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from contextvars import ContextVar
from cors import add_cors_headers
from options import setup_options
from routes.addresses import bp_address
from routes.emails import bp_email
from routes.maps import bp_address_type, bp_email_type, bp_gender, bp_membership_fee_category, bp_phone_type, \
bp_person_mapping, bp_organization_mapping, bp_mapping
from routes.memberships import bp_memberships
from routes.organizations import bp_organization
from routes.people import bp_person
from routes.phones import bp_phone
from sanic import Sanic
from sanic.request import Request
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
# The Sanic application object; the middleware, blueprints and listeners
# registered further down in this module all attach to it.
app = Sanic("MembershipManagementSystem")

# Async SQLAlchemy engine pointing at the local SQLite file ``dev.db`` through
# the aiosqlite driver. ``echo=True`` turns on SQLAlchemy's statement logging.
bind = create_async_engine(
    "sqlite+aiosqlite:///dev.db",
    echo=True,
)

# Context variable that holds the session opened for the request currently
# being handled; it is set by ``inject_session`` and reset by ``close_session``.
_base_model_session_ctx = ContextVar("session")
@app.middleware("request")
async def inject_session(request: Request) -> None:
    """Open a database session for the incoming request.

    A new ``AsyncSession`` bound to the module-level engine is created
    (with ``expire_on_commit=False``), stored on ``request.ctx.session``,
    and published through ``_base_model_session_ctx``. The token returned
    by ``ContextVar.set`` is kept on ``request.ctx.session_ctx_token`` so the
    response middleware can undo the binding later.

    Args:
        request: The Sanic request whose ``ctx`` receives the session and
            the context-variable token.
    """
    session_factory = sessionmaker(bind, AsyncSession, expire_on_commit=False)
    session = session_factory()

    ctx = request.ctx
    ctx.session = session
    ctx.session_ctx_token = _base_model_session_ctx.set(session)
@app.middleware("response")
async def close_session(request: Request, response) -> None:
    """Unbind and close the database session attached to the request.

    Does nothing when ``request.ctx`` carries no ``session_ctx_token`` (the
    token is stored by ``inject_session``). Otherwise the context variable is
    restored to its previous value and the session is closed.

    Args:
        request: The Sanic request whose ``ctx`` may hold ``session`` and
            ``session_ctx_token``.
        response: The outgoing response; not used here, but part of the
            response-middleware signature.

    Raises:
        ValueError, RuntimeError: Propagated from ``ContextVar.reset`` if the
            token is not valid for this context variable/context or has
            already been used. The session is still closed in that case.
    """
    # No token means no session was bound for this request; nothing to undo.
    if not hasattr(request.ctx, "session_ctx_token"):
        return

    try:
        _base_model_session_ctx.reset(request.ctx.session_ctx_token)
    finally:
        # Close the session even when reset() raises, so a failed unbind
        # cannot leak the underlying database session.
        await request.ctx.session.close()
# Attach every route blueprint to the application. The order below is the
# original registration order and is kept unchanged.
app.blueprint(
    [
        bp_gender,
        bp_membership_fee_category,
        bp_address_type,
        bp_phone_type,
        bp_email_type,
        bp_person,
        bp_organization,
        bp_address,
        bp_email,
        bp_phone,
        bp_memberships,
        bp_person_mapping,
        bp_organization_mapping,
        bp_mapping,
    ]
)

# Add OPTIONS handlers to any route that is missing one. This runs as a
# "before_server_start" listener, i.e. after the blueprints above are attached.
app.register_listener(setup_options, "before_server_start")

# Fill in CORS headers on outgoing responses.
app.register_middleware(add_cors_headers, "response")