-
-
Notifications
You must be signed in to change notification settings - Fork 210
/
__init__.py
59 lines (47 loc) · 1.86 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool
import time
import psycopg2
# This value must be incremented after schema changes on replicated tables!
SCHEMA_VERSION = 4
engine = None
DUMP_DEFAULT_THREAD_COUNT = 4
def init_db_connection(connect_str):
"""Initializes database connection using the specified Flask app.
Configuration file must contain `SQLALCHEMY_DATABASE_URI` key. See
https://pythonhosted.org/Flask-SQLAlchemy/config.html#configuration-keys
for more info.
"""
global engine
while True:
try:
engine = create_engine(connect_str, poolclass=NullPool)
print("Connection to db established!")
break
except psycopg2.OperationalError as e:
print("Couldn't establish connection to db: {}".format(str(e)))
print("Sleeping 2 seconds and trying again...")
time.sleep(2)
def run_sql_script(sql_file_path):
with open(sql_file_path) as sql:
with engine.connect() as connection:
connection.execute(sql.read())
def run_sql_script_without_transaction(sql_file_path):
with open(sql_file_path) as sql:
connection = engine.connect()
connection.connection.set_isolation_level(0)
lines = sql.read().splitlines()
try:
for line in lines:
# TODO: Not a great way of removing comments. The alternative is to catch
# the exception sqlalchemy.exc.ProgrammingError "can't execute an empty query"
if line and not line.startswith("--"):
connection.execute(line)
except sqlalchemy.exc.ProgrammingError as e:
print("Error: {}".format(e))
return False
finally:
connection.connection.set_isolation_level(1)
connection.close()
return True