-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsession.py
90 lines (70 loc) · 2.19 KB
/
session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from typing import Any
import os
from contextvars import ContextVar
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.exc import PendingRollbackError
import traceback
from . import daemon
from .business_objects import general
from .util import collect_engine_variables
from threading import Lock
import time
session_lock = Lock()
request_id_ctx_var = ContextVar("request_id", default=None)
def get_request_id():
return request_id_ctx_var.get()
(
pool_size,
pool_max_overflow,
pool_recycle,
pool_use_lifo,
pool_pre_ping,
) = collect_engine_variables()
engine = create_engine(
os.getenv("POSTGRES"),
pool_size=pool_size,
max_overflow=pool_max_overflow,
pool_recycle=pool_recycle,
pool_use_lifo=pool_use_lifo,
pool_pre_ping=pool_pre_ping,
)
session = scoped_session(
sessionmaker(autocommit=False, autoflush=True, bind=engine),
scopefunc=get_request_id,
)
## uncomment following lines to enable db logging
""" import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
"""
def check_session_and_rollback():
try:
_ = session.connection()
except PendingRollbackError:
print("session issue detected, rollback initiated", flush=True)
print(traceback.format_exc(), flush=True)
while session.registry().in_transaction():
session.rollback()
def get_engine_dialect() -> Any:
if not engine:
return None
return engine.dialect
def start_session_cleanup_thread():
"""
Start a thread that cleans up sessions older than 5 minutes.
"""
daemon.run_without_db_token(__start_session_cleanup)
def __start_session_cleanup():
while True:
with session_lock:
sessions = general.get_session_lookup(exclude_last_x_seconds=20 * 60)
for session in sessions:
try:
general.force_remove_and_refresh_session_by_id(
session["session_id"]
)
print("Session removed", session, flush=True)
except Exception:
traceback.print_exc()
time.sleep(10)