Skip to content

Commit

Permalink
Use with engine.connect() context manager
Browse files Browse the repository at this point in the history
Consolidate autocommit across drivers that support it, xref.
#494 and
fix #486 .
  • Loading branch information
nsoranzo committed Mar 18, 2021
1 parent b3d75c3 commit 4f52578
Showing 1 changed file with 49 additions and 58 deletions.
107 changes: 49 additions & 58 deletions sqlalchemy_utils/functions/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,23 +539,25 @@ def create_database(url, encoding='utf8', template=None):

url = copy(make_url(url))
database = url.database
dialect_name = url.get_dialect().name
dialect_driver = url.get_dialect().driver

if url.drivername.startswith('postgres'):
if dialect_name == 'postgres':
url = _set_url_database(url, database="postgres")
elif url.drivername.startswith('mssql'):
elif dialect_name == 'mssql':
url = _set_url_database(url, database="master")
elif not url.drivername.startswith('sqlite'):
elif not dialect_name == 'sqlite':
url = _set_url_database(url, database=None)

if url.drivername == 'mssql+pyodbc':
if dialect_name == 'mssql' and dialect_driver in {'pymssql', 'pyodbc'}:
engine = sa.create_engine(url, connect_args={'autocommit': True})
elif url.drivername == 'postgresql+pg8000':
elif dialect_name == 'postgresql' and dialect_driver in {
'asyncpg', 'pg8000', 'psycopg2', 'psycopg2cffi'}:
engine = sa.create_engine(url, isolation_level='AUTOCOMMIT')
else:
engine = sa.create_engine(url)
result_proxy = None

if engine.dialect.name == 'postgresql':
if dialect_name == 'postgresql':
if not template:
template = 'template1'

Expand All @@ -565,31 +567,28 @@ def create_database(url, encoding='utf8', template=None):
quote(engine, template)
)

if engine.driver == 'psycopg2cffi' or engine.driver == "psycopg2":
with engine.connect().execution_options(
isolation_level="AUTOCOMMIT") as connection:
connection.execute(text)
else:
result_proxy = engine.execute(text)
with engine.connect() as connection:
connection.execute(text)

elif engine.dialect.name == 'mysql':
elif dialect_name == 'mysql':
text = "CREATE DATABASE {0} CHARACTER SET = '{1}'".format(
quote(engine, database),
encoding
)
result_proxy = engine.execute(text)
with engine.connect() as connection:
connection.execute(text)

elif engine.dialect.name == 'sqlite' and database != ':memory:':
elif dialect_name == 'sqlite' and database != ':memory:':
if database:
engine.execute("CREATE TABLE DB(id int);")
engine.execute("DROP TABLE DB;")
with engine.connect() as connection:
connection.execute("CREATE TABLE DB(id int);")
connection.execute("DROP TABLE DB;")

else:
text = 'CREATE DATABASE {0}'.format(quote(engine, database))
result_proxy = engine.execute(text)
with engine.connect() as connection:
connection.execute(text)

if result_proxy is not None:
result_proxy.close()
engine.dispose()


Expand All @@ -607,57 +606,49 @@ def drop_database(url):
"""

url = copy(make_url(url))

database = url.database
dialect_name = url.get_dialect().name
dialect_driver = url.get_dialect().driver

if url.drivername.startswith('postgres'):
if dialect_name == 'postgres':
url = _set_url_database(url, database="postgres")
elif url.drivername.startswith('mssql'):
elif dialect_name == 'mssql':
url = _set_url_database(url, database="master")
elif not url.drivername.startswith('sqlite'):
elif not dialect_name == 'sqlite':
url = _set_url_database(url, database=None)

if url.drivername == 'mssql+pyodbc':
if dialect_name == 'mssql' and dialect_driver in {'pymssql', 'pyodbc'}:
engine = sa.create_engine(url, connect_args={'autocommit': True})
elif url.drivername == 'postgresql+pg8000':
elif dialect_name == 'postgresql' and dialect_driver in {
'asyncpg', 'pg8000', 'psycopg2', 'psycopg2cffi'}:
engine = sa.create_engine(url, isolation_level='AUTOCOMMIT')
else:
engine = sa.create_engine(url)
conn_resource = None

if engine.dialect.name == 'sqlite' and database != ':memory:':
if dialect_name == 'sqlite' and database != ':memory:':
if database:
os.remove(database)

elif (
engine.dialect.name == 'postgresql' and
engine.driver in {'psycopg2', 'psycopg2cffi'}
):
connection = engine.connect().execution_options(
isolation_level="AUTOCOMMIT"
)

# Disconnect all users from the database we are dropping.
version = connection.dialect.server_version_info
pid_column = (
'pid' if (version >= (9, 2)) else 'procpid'
)
text = '''
SELECT pg_terminate_backend(pg_stat_activity.%(pid_column)s)
FROM pg_stat_activity
WHERE pg_stat_activity.datname = '%(database)s'
AND %(pid_column)s <> pg_backend_pid();
''' % {'pid_column': pid_column, 'database': database}
connection.execute(text)

# Drop the database.
text = 'DROP DATABASE {0}'.format(quote(connection, database))
connection.execute(text)
conn_resource = connection
elif dialect_name == 'postgresql':
with engine.connect() as connection:
# Disconnect all users from the database we are dropping.
version = connection.dialect.server_version_info
pid_column = (
'pid' if (version >= (9, 2)) else 'procpid'
)
text = '''
SELECT pg_terminate_backend(pg_stat_activity.%(pid_column)s)
FROM pg_stat_activity
WHERE pg_stat_activity.datname = '%(database)s'
AND %(pid_column)s <> pg_backend_pid();
''' % {'pid_column': pid_column, 'database': database}
connection.execute(text)

# Drop the database.
text = 'DROP DATABASE {0}'.format(quote(connection, database))
connection.execute(text)
else:
text = 'DROP DATABASE {0}'.format(quote(engine, database))
conn_resource = engine.execute(text)
with engine.connect() as connection:
connection.execute(text)

if conn_resource is not None:
conn_resource.close()
engine.dispose()

0 comments on commit 4f52578

Please sign in to comment.