Skip to content

Commit

Permalink
Merge pull request #482 from lsst/tickets/DM-26302
Browse files Browse the repository at this point in the history
DM-26302: Avoid long-lived connections in Database
  • Loading branch information
andy-slac committed Mar 2, 2021
2 parents 1b946d6 + 683ceb2 commit c7341d5
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 240 deletions.
12 changes: 12 additions & 0 deletions python/lsst/daf/butler/registry/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,18 @@ def transaction(self, *, savepoint: bool = False) -> Iterator[None]:
self._managers.dimensions.clearCaches()
raise

def resetConnectionPool(self) -> None:
"""Reset SQLAlchemy connection pool for registry database.
This operation is useful when using registry with fork-based
multiprocessing. To use registry across fork boundary one has to make
sure that there are no currently active connections (no session or
transaction is in progress) and connection pool is reset using this
method. This method should be called by the child process immediately
after the fork.
"""
self._db._engine.dispose()

def registerOpaqueTable(self, tableName: str, spec: ddl.TableSpec) -> None:
"""Add an opaque (to the `Registry`) table for use by a `Datastore` or
other data repository client.
Expand Down
43 changes: 23 additions & 20 deletions python/lsst/daf/butler/registry/databases/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,35 +63,38 @@ class PostgresqlDatabase(Database):
on the database being connected to; this is checked at connection time.
"""

def __init__(self, *, connection: sqlalchemy.engine.Connection, origin: int,
def __init__(self, *, engine: sqlalchemy.engine.Engine, origin: int,
namespace: Optional[str] = None, writeable: bool = True):
super().__init__(origin=origin, connection=connection, namespace=namespace)
dbapi = connection.connection
try:
dsn = dbapi.get_dsn_parameters()
except (AttributeError, KeyError) as err:
raise RuntimeError("Only the psycopg2 driver for PostgreSQL is supported.") from err
if namespace is None:
namespace = connection.execute("SELECT current_schema();").scalar()
if not connection.execute("SELECT COUNT(*) FROM pg_extension WHERE extname='btree_gist';").scalar():
raise RuntimeError(
"The Butler PostgreSQL backend requires the btree_gist extension. "
"As extensions are enabled per-database, this may require an administrator to run "
"`CREATE EXTENSION btree_gist;` in a database before a butler client for it is initialized."
)
super().__init__(origin=origin, engine=engine, namespace=namespace)
with engine.connect() as connection:
dbapi = connection.connection
try:
dsn = dbapi.get_dsn_parameters()
except (AttributeError, KeyError) as err:
raise RuntimeError("Only the psycopg2 driver for PostgreSQL is supported.") from err
if namespace is None:
namespace = connection.execute("SELECT current_schema();").scalar()
query = "SELECT COUNT(*) FROM pg_extension WHERE extname='btree_gist';"
if not connection.execute(query).scalar():
raise RuntimeError(
"The Butler PostgreSQL backend requires the btree_gist extension. "
"As extensions are enabled per-database, this may require an administrator to run "
"`CREATE EXTENSION btree_gist;` in a database before a butler client for it is "
" initialized."
)
self.namespace = namespace
self.dbname = dsn.get("dbname")
self._writeable = writeable
self._shrinker = NameShrinker(connection.engine.dialect.max_identifier_length)

@classmethod
def connect(cls, uri: str, *, writeable: bool = True) -> sqlalchemy.engine.Connection:
return sqlalchemy.engine.create_engine(uri, poolclass=sqlalchemy.pool.NullPool).connect()
def makeEngine(cls, uri: str, *, writeable: bool = True) -> sqlalchemy.engine.Engine:
return sqlalchemy.engine.create_engine(uri)

@classmethod
def fromConnection(cls, connection: sqlalchemy.engine.Connection, *, origin: int,
namespace: Optional[str] = None, writeable: bool = True) -> Database:
return cls(connection=connection, origin=origin, namespace=namespace, writeable=writeable)
def fromEngine(cls, engine: sqlalchemy.engine.Engine, *, origin: int,
namespace: Optional[str] = None, writeable: bool = True) -> Database:
return cls(engine=engine, origin=origin, namespace=namespace, writeable=writeable)

@contextmanager
def transaction(self, *, interrupting: bool = False, savepoint: bool = False,
Expand Down
34 changes: 15 additions & 19 deletions python/lsst/daf/butler/registry/databases/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,13 @@ class SqliteDatabase(Database):
across databases well enough to define it.
"""

def __init__(self, *, connection: sqlalchemy.engine.Connection, origin: int,
def __init__(self, *, engine: sqlalchemy.engine.Engine, origin: int,
namespace: Optional[str] = None, writeable: bool = True):
super().__init__(origin=origin, connection=connection, namespace=namespace)
super().__init__(origin=origin, engine=engine, namespace=namespace)
# Get the filename from a call to 'PRAGMA database_list'.
with closing(connection.connection.cursor()) as cursor:
dbList = list(cursor.execute("PRAGMA database_list").fetchall())
with engine.connect() as connection:
with closing(connection.connection.cursor()) as cursor:
dbList = list(cursor.execute("PRAGMA database_list").fetchall())
if len(dbList) == 0:
raise RuntimeError("No database in connection.")
if namespace is None:
Expand All @@ -176,9 +177,9 @@ def makeDefaultUri(cls, root: str) -> Optional[str]:
return "sqlite:///" + os.path.join(root, "gen3.sqlite3")

@classmethod
def connect(cls, uri: Optional[str] = None, *, filename: Optional[str] = None,
writeable: bool = True) -> sqlalchemy.engine.Connection:
"""Create a `sqlalchemy.engine.Connection` from a SQLAlchemy URI or
def makeEngine(cls, uri: Optional[str] = None, *, filename: Optional[str] = None,
writeable: bool = True) -> sqlalchemy.engine.Engine:
"""Create a `sqlalchemy.engine.Engine` from a SQLAlchemy URI or
filename.
Parameters
Expand All @@ -188,18 +189,14 @@ def connect(cls, uri: Optional[str] = None, *, filename: Optional[str] = None,
filename : `str`
Name of the SQLite database file, or `None` to use an in-memory
database. Ignored if ``uri is not None``.
origin : `int`
An integer ID that should be used as the default for any datasets,
quanta, or other entities that use a (autoincrement, origin)
compound primary key.
writeable : `bool`, optional
If `True`, allow write operations on the database, including
``CREATE TABLE``.
Returns
-------
cs : `sqlalchemy.engine.Connection`
A database connection and transaction state.
engine : `sqlalchemy.engine.Engine`
A database engine.
"""
# In order to be able to tell SQLite that we want a read-only or
# read-write connection, we need to make the SQLite DBAPI connection
Expand Down Expand Up @@ -248,21 +245,20 @@ def connect(cls, uri: Optional[str] = None, *, filename: Optional[str] = None,
def creator() -> sqlite3.Connection:
return sqlite3.connect(target, check_same_thread=False, uri=True)

engine = sqlalchemy.engine.create_engine(uri, poolclass=sqlalchemy.pool.NullPool,
creator=creator)
engine = sqlalchemy.engine.create_engine(uri, creator=creator)

sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)
sqlalchemy.event.listen(engine, "begin", _onSqlite3Begin)
try:
return engine.connect()
return engine
except sqlalchemy.exc.OperationalError as err:
raise RuntimeError(f"Error creating connection with uri='{uri}', filename='{filename}', "
f"target={target}.") from err

@classmethod
def fromConnection(cls, connection: sqlalchemy.engine.Connection, *, origin: int,
namespace: Optional[str] = None, writeable: bool = True) -> Database:
return cls(connection=connection, origin=origin, writeable=writeable, namespace=namespace)
def fromEngine(cls, engine: sqlalchemy.engine.Engine, *, origin: int,
namespace: Optional[str] = None, writeable: bool = True) -> Database:
return cls(engine=engine, origin=origin, writeable=writeable, namespace=namespace)

def isWriteable(self) -> bool:
return self._writeable
Expand Down

0 comments on commit c7341d5

Please sign in to comment.