Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-26302: Avoid long-lived connections in Database #482

Merged
merged 3 commits into from
Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions python/lsst/daf/butler/registry/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,18 @@ def transaction(self, *, savepoint: bool = False) -> Iterator[None]:
self._managers.dimensions.clearCaches()
raise

def resetConnectionPool(self) -> None:
    """Dispose of the SQLAlchemy connection pool of the registry database.

    Intended for fork-based multiprocessing: a registry can only be used
    on both sides of a fork if no connection is active at that moment
    (no session or transaction in progress) and the pool is reset with
    this method. The child process should call it immediately after the
    fork.
    """
    engine = self._db._engine
    engine.dispose()

def registerOpaqueTable(self, tableName: str, spec: ddl.TableSpec) -> None:
"""Add an opaque (to the `Registry`) table for use by a `Datastore` or
other data repository client.
Expand Down
43 changes: 23 additions & 20 deletions python/lsst/daf/butler/registry/databases/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,35 +63,38 @@ class PostgresqlDatabase(Database):
on the database being connected to; this is checked at connection time.
"""

def __init__(self, *, engine: sqlalchemy.engine.Engine, origin: int,
             namespace: Optional[str] = None, writeable: bool = True):
    """Construct a database wrapper around an existing engine.

    Parameters
    ----------
    engine : `sqlalchemy.engine.Engine`
        Engine used to reach the PostgreSQL database.
    origin : `int`
        Passed through to the base class constructor.
    namespace : `str`, optional
        Schema name. If `None`, the result of ``SELECT current_schema()``
        on the database is used.
    writeable : `bool`, optional
        Stored as the instance's writeable flag.

    Raises
    ------
    RuntimeError
        Raised if the DBAPI connection does not provide
        ``get_dsn_parameters`` (i.e. the driver is not psycopg2), or if
        the ``btree_gist`` extension is not present in the database.
    """
    super().__init__(origin=origin, engine=engine, namespace=namespace)
    # A connection is borrowed only for the start-up checks and is released
    # when the ``with`` block exits; nothing below may rely on it.
    with engine.connect() as connection:
        dbapi = connection.connection
        try:
            dsn = dbapi.get_dsn_parameters()
        except (AttributeError, KeyError) as err:
            raise RuntimeError("Only the psycopg2 driver for PostgreSQL is supported.") from err
        if namespace is None:
            namespace = connection.execute("SELECT current_schema();").scalar()
        query = "SELECT COUNT(*) FROM pg_extension WHERE extname='btree_gist';"
        if not connection.execute(query).scalar():
            # Adjacent literals are concatenated verbatim, so each fragment
            # carries exactly one separating space.
            raise RuntimeError(
                "The Butler PostgreSQL backend requires the btree_gist extension. "
                "As extensions are enabled per-database, this may require an administrator to run "
                "`CREATE EXTENSION btree_gist;` in a database before a butler client for it is "
                "initialized."
            )
    self.namespace = namespace
    self.dbname = dsn.get("dbname")
    self._writeable = writeable
    # Read the dialect from the engine itself rather than through the
    # connection that has already been closed above.
    self._shrinker = NameShrinker(engine.dialect.max_identifier_length)

@classmethod
def makeEngine(cls, uri: str, *, writeable: bool = True) -> sqlalchemy.engine.Engine:
    """Create a `sqlalchemy.engine.Engine` from a SQLAlchemy URI.

    Parameters
    ----------
    uri : `str`
        A SQLAlchemy URI connection string.
    writeable : `bool`, optional
        Accepted for signature compatibility; this implementation does
        not use it when creating the engine.

    Returns
    -------
    engine : `sqlalchemy.engine.Engine`
        A database engine created with SQLAlchemy's default pooling.
    """
    return sqlalchemy.engine.create_engine(uri)

@classmethod
def fromEngine(cls, engine: sqlalchemy.engine.Engine, *, origin: int,
               namespace: Optional[str] = None, writeable: bool = True) -> Database:
    """Create a new database instance from an existing engine.

    Parameters
    ----------
    engine : `sqlalchemy.engine.Engine`
        Engine to wrap.
    origin : `int`
        Forwarded to the constructor.
    namespace : `str`, optional
        Forwarded to the constructor.
    writeable : `bool`, optional
        Forwarded to the constructor.

    Returns
    -------
    db : `Database`
        A new instance of ``cls``.
    """
    return cls(engine=engine, origin=origin, namespace=namespace, writeable=writeable)

@contextmanager
def transaction(self, *, interrupting: bool = False, savepoint: bool = False,
Expand Down
34 changes: 15 additions & 19 deletions python/lsst/daf/butler/registry/databases/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,13 @@ class SqliteDatabase(Database):
across databases well enough to define it.
"""

def __init__(self, *, connection: sqlalchemy.engine.Connection, origin: int,
def __init__(self, *, engine: sqlalchemy.engine.Engine, origin: int,
namespace: Optional[str] = None, writeable: bool = True):
super().__init__(origin=origin, connection=connection, namespace=namespace)
super().__init__(origin=origin, engine=engine, namespace=namespace)
# Get the filename from a call to 'PRAGMA database_list'.
with closing(connection.connection.cursor()) as cursor:
dbList = list(cursor.execute("PRAGMA database_list").fetchall())
with engine.connect() as connection:
with closing(connection.connection.cursor()) as cursor:
dbList = list(cursor.execute("PRAGMA database_list").fetchall())
if len(dbList) == 0:
raise RuntimeError("No database in connection.")
if namespace is None:
Expand All @@ -176,9 +177,9 @@ def makeDefaultUri(cls, root: str) -> Optional[str]:
return "sqlite:///" + os.path.join(root, "gen3.sqlite3")

@classmethod
def connect(cls, uri: Optional[str] = None, *, filename: Optional[str] = None,
writeable: bool = True) -> sqlalchemy.engine.Connection:
"""Create a `sqlalchemy.engine.Connection` from a SQLAlchemy URI or
def makeEngine(cls, uri: Optional[str] = None, *, filename: Optional[str] = None,
writeable: bool = True) -> sqlalchemy.engine.Engine:
"""Create a `sqlalchemy.engine.Engine` from a SQLAlchemy URI or
filename.

Parameters
Expand All @@ -188,18 +189,14 @@ def connect(cls, uri: Optional[str] = None, *, filename: Optional[str] = None,
filename : `str`
Name of the SQLite database file, or `None` to use an in-memory
database. Ignored if ``uri is not None``.
origin : `int`
An integer ID that should be used as the default for any datasets,
quanta, or other entities that use a (autoincrement, origin)
compound primary key.
writeable : `bool`, optional
If `True`, allow write operations on the database, including
``CREATE TABLE``.

Returns
-------
cs : `sqlalchemy.engine.Connection`
A database connection and transaction state.
engine : `sqlalchemy.engine.Engine`
A database engine.
"""
# In order to be able to tell SQLite that we want a read-only or
# read-write connection, we need to make the SQLite DBAPI connection
Expand Down Expand Up @@ -248,21 +245,20 @@ def connect(cls, uri: Optional[str] = None, *, filename: Optional[str] = None,
def creator() -> sqlite3.Connection:
return sqlite3.connect(target, check_same_thread=False, uri=True)

engine = sqlalchemy.engine.create_engine(uri, poolclass=sqlalchemy.pool.NullPool,
creator=creator)
engine = sqlalchemy.engine.create_engine(uri, creator=creator)

sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)
sqlalchemy.event.listen(engine, "begin", _onSqlite3Begin)
try:
return engine.connect()
return engine
except sqlalchemy.exc.OperationalError as err:
raise RuntimeError(f"Error creating connection with uri='{uri}', filename='{filename}', "
f"target={target}.") from err

@classmethod
def fromEngine(cls, engine: sqlalchemy.engine.Engine, *, origin: int,
               namespace: Optional[str] = None, writeable: bool = True) -> Database:
    """Create a new database instance from an existing engine.

    Parameters
    ----------
    engine : `sqlalchemy.engine.Engine`
        Engine to wrap.
    origin : `int`
        Forwarded to the constructor.
    namespace : `str`, optional
        Forwarded to the constructor.
    writeable : `bool`, optional
        Forwarded to the constructor.

    Returns
    -------
    db : `Database`
        A new instance of ``cls``.
    """
    return cls(engine=engine, origin=origin, writeable=writeable, namespace=namespace)

def isWriteable(self) -> bool:
    """Return the writeable flag stored on this instance (``_writeable``).
    """
    writeable = self._writeable
    return writeable
Expand Down