This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use attempt_to_set_autocommit everywhere. #16615

Merged: 4 commits, Nov 9, 2023
1 change: 1 addition & 0 deletions changelog.d/16615.misc
@@ -0,0 +1 @@
+Use more generic database methods.
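
The whole PR follows one pattern: call sites that reached into psycopg2 directly via conn.set_session(autocommit=...) now go through an engine-level method, so they work against any database engine. A minimal sketch of what the two engine implementations can look like, inferred from the call sites in this diff rather than copied from Synapse:

import sqlite3

import psycopg2.extensions


class PostgresEngine:
    def attempt_to_set_autocommit(
        self, conn: psycopg2.extensions.connection, autocommit: bool
    ) -> None:
        # psycopg2 exposes autocommit through set_session(); this is the
        # exact call the old code sites made directly.
        conn.set_session(autocommit=autocommit)


class Sqlite3Engine:
    def attempt_to_set_autocommit(
        self, conn: sqlite3.Connection, autocommit: bool
    ) -> None:
        # SQLite has no session flag worth flipping here, so the "attempt"
        # is a no-op -- hence the method name.
        pass

With this shape, conn.engine.attempt_to_set_autocommit(conn.conn, True) is safe on either backend, and the "# type: ignore" comments on the old direct calls can go away.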
18 changes: 12 additions & 6 deletions synapse/storage/background_updates.py
@@ -49,7 +49,11 @@

 if TYPE_CHECKING:
     from synapse.server import HomeServer
-    from synapse.storage.database import DatabasePool, LoggingTransaction
+    from synapse.storage.database import (
+        DatabasePool,
+        LoggingDatabaseConnection,
+        LoggingTransaction,
+    )

 logger = logging.getLogger(__name__)

@@ -746,10 +750,10 @@ async def create_index_in_background(
         The named index will be dropped upon completion of the new index.
         """

-        def create_index_psql(conn: Connection) -> None:
+        def create_index_psql(conn: "LoggingDatabaseConnection") -> None:
             conn.rollback()
             # postgres insists on autocommit for the index
-            conn.set_session(autocommit=True)  # type: ignore
+            conn.engine.attempt_to_set_autocommit(conn.conn, True)

             try:
                 c = conn.cursor()
@@ -793,9 +797,9 @@ def create_index_psql(conn: Connection) -> None:
                 undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
                 conn.cursor().execute(undo_timeout_sql)

-            conn.set_session(autocommit=False)  # type: ignore
+            conn.engine.attempt_to_set_autocommit(conn.conn, False)

-        def create_index_sqlite(conn: Connection) -> None:
+        def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
             # Sqlite doesn't support concurrent creation of indexes.
             #
             # We assume that sqlite doesn't give us invalid indices; however
@@ -825,7 +829,9 @@ def create_index_sqlite(conn: Connection) -> None:
                 c.execute(sql)

         if isinstance(self.db_pool.engine, engines.PostgresEngine):
-            runner: Optional[Callable[[Connection], None]] = create_index_psql
+            runner: Optional[
+                Callable[[LoggingDatabaseConnection], None]
+            ] = create_index_psql
         elif psql_only:
             runner = None
         else:
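
The widened annotation above exists because the runner can be absent: on SQLite with psql_only=True there is simply nothing to execute. A small self-contained sketch of the dispatch shape (FakeConnection and pick_runner are illustrative names, not Synapse APIs):

from typing import Callable, Optional


class FakeConnection: ...


def create_index_psql(conn: FakeConnection) -> None:
    print("postgres path: CREATE INDEX CONCURRENTLY under autocommit")


def create_index_sqlite(conn: FakeConnection) -> None:
    print("sqlite path: plain CREATE INDEX")


def pick_runner(
    is_postgres: bool, psql_only: bool
) -> Optional[Callable[[FakeConnection], None]]:
    # Mirrors the if/elif/else in create_index_in_background: the runner
    # is None when a postgres-only index is requested on SQLite.
    if is_postgres:
        return create_index_psql
    elif psql_only:
        return None
    else:
        return create_index_sqlite


runner = pick_runner(is_postgres=False, psql_only=True)
assert runner is None  # nothing to do: the index is postgres-only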
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/search.py
@@ -275,7 +275,7 @@ def create_index(conn: LoggingDatabaseConnection) -> None:

             # we have to set autocommit, because postgres refuses to
             # CREATE INDEX CONCURRENTLY without it.
-            conn.set_session(autocommit=True)
+            conn.engine.attempt_to_set_autocommit(conn.conn, True)

             try:
                 c = conn.cursor()
@@ -301,7 +301,7 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
                 # we should now be able to delete the GIST index.
                 c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
             finally:
-                conn.set_session(autocommit=False)
+                conn.engine.attempt_to_set_autocommit(conn.conn, False)

         if isinstance(self.database_engine, PostgresEngine):
             await self.db_pool.runWithConnection(create_index)
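
The autocommit dance exists because PostgreSQL rejects CREATE INDEX CONCURRENTLY inside a transaction block, and psycopg2 starts a transaction implicitly on the first statement. A sketch of the failure and the fix with bare psycopg2 (the DSN, some_table and tmp_idx are placeholders):

import psycopg2
import psycopg2.errors

# Placeholder DSN -- substitute real credentials to try this.
conn = psycopg2.connect("dbname=synapse_test")

cur = conn.cursor()
try:
    # psycopg2 opens a transaction implicitly on the first statement, so
    # the server rejects this with "CREATE INDEX CONCURRENTLY cannot run
    # inside a transaction block".
    cur.execute("CREATE INDEX CONCURRENTLY tmp_idx ON some_table (col)")
except psycopg2.errors.ActiveSqlTransaction:
    conn.rollback()
    # The fix used throughout this PR: leave transaction mode first.
    conn.set_session(autocommit=True)
    conn.cursor().execute("CREATE INDEX CONCURRENTLY tmp_idx ON some_table (col)")
    conn.set_session(autocommit=False)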
@@ -323,7 +323,7 @@ async def _background_reindex_search_order(

         def create_index(conn: LoggingDatabaseConnection) -> None:
             conn.rollback()
-            conn.set_session(autocommit=True)
+            conn.engine.attempt_to_set_autocommit(conn.conn, True)
             c = conn.cursor()

             # We create with NULLS FIRST so that when we search *backwards*
@@ -340,7 +340,7 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
                 ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
                 """
             )
-            conn.set_session(autocommit=False)
+            conn.engine.attempt_to_set_autocommit(conn.conn, False)

         await self.db_pool.runWithConnection(create_index)

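The NULLS FIRST choice above is about scan direction: an index ascending with NULLs first, read backwards, yields the rows with a real origin_server_ts before the NULL ones. The effect can be seen with a plain ORDER BY; a sketch using the sqlite3 stdlib module (needs SQLite 3.30+ for the NULLS FIRST clause; the table is a toy stand-in for event_search):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event_search (origin_server_ts INTEGER, stream_ordering INTEGER)"
)
conn.executemany(
    "INSERT INTO event_search VALUES (?, ?)",
    [(None, None), (10, 1), (20, 2)],
)

# Ascending with NULLS FIRST puts the NULL row first...
rows = conn.execute(
    "SELECT * FROM event_search"
    " ORDER BY origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST"
).fetchall()
print(rows)  # [(None, None), (10, 1), (20, 2)]

# ...so a backwards (descending) scan of the same ordering sees the rows
# with a real origin_server_ts first, which is what the comment is after.
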
4 changes: 2 additions & 2 deletions synapse/storage/databases/state/bg_updates.py
@@ -492,7 +492,7 @@ def reindex_txn(conn: LoggingDatabaseConnection) -> None:
             conn.rollback()
             if isinstance(self.database_engine, PostgresEngine):
                 # postgres insists on autocommit for the index
-                conn.set_session(autocommit=True)
+                conn.engine.attempt_to_set_autocommit(conn.conn, True)
                 try:
                     txn = conn.cursor()
                     txn.execute(
@@ -501,7 +501,7 @@ def reindex_txn(conn: LoggingDatabaseConnection) -> None:
                     )
                     txn.execute("DROP INDEX IF EXISTS state_groups_state_id")
                 finally:
-                    conn.set_session(autocommit=False)
+                    conn.engine.attempt_to_set_autocommit(conn.conn, False)
             else:
                 txn = conn.cursor()
                 txn.execute(
15 changes: 5 additions & 10 deletions tests/server.py
@@ -88,7 +88,7 @@
 from synapse.server import HomeServer
 from synapse.storage import DataStore
 from synapse.storage.database import LoggingDatabaseConnection
-from synapse.storage.engines import PostgresEngine, create_engine
+from synapse.storage.engines import create_engine
 from synapse.storage.prepare_database import prepare_database
 from synapse.types import ISynapseReactor, JsonDict
 from synapse.util import Clock
@@ -1029,18 +1029,15 @@ def setup_test_homeserver(

     # Create the database before we actually try and connect to it, based off
     # the template database we generate in setupdb()
-    if isinstance(db_engine, PostgresEngine):
-        import psycopg2.extensions
-
+    if USE_POSTGRES_FOR_TESTS:
Review thread on the changed line:

Contributor (author):

This change is, weirdly, so that mypy treats db_engine as a BaseDatabaseEngine and not a PostgresEngine; otherwise it dislikes that db_conn is a Connection (protocol) instead of psycopg2.extensions.connection.

Contributor:

Probably related to

class PostgresEngine(
    BaseDatabaseEngine[psycopg2.extensions.connection, psycopg2.extensions.cursor]
):

which was my way of trying to teach mypy which connection and cursor type to expect when we're using the PostgresEngine.

Contributor:

(Although if we had to do the type-assert on line -1042 then it clearly wasn't working.)

Contributor (author):

Right -- from those, mypy is able to deduce that PostgresEngine creates a psycopg2.extensions.connection; but the API expects a Connection protocol, and it doesn't seem to understand that psycopg2.extensions.connection matches Connection?

I wonder if we should make LoggingTransaction etc. generic and pass these through, so that the type checking knows the real types of things? I guess you still need a generic protocol type for use with the BaseDatabaseEngine class, though.
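
A minimal sketch of the generic idea being floated here, with illustrative class bodies rather than Synapse's real ones: parameterise the engine (and, per the suggestion, the connection wrapper) over the concrete connection type, so call sites keep the precise psycopg2 type instead of the wide protocol:

from typing import Generic, Protocol, TypeVar


class Connection(Protocol):
    # Slimmed-down stand-in for synapse.storage.types.Connection.
    def cursor(self) -> object: ...
    def rollback(self) -> None: ...


ConnectionType = TypeVar("ConnectionType", bound=Connection)


class BaseDatabaseEngine(Generic[ConnectionType]):
    def attempt_to_set_autocommit(
        self, conn: ConnectionType, autocommit: bool
    ) -> None:
        raise NotImplementedError


class LoggingDatabaseConnection(Generic[ConnectionType]):
    # If the wrapper carried the type parameter through, .conn would keep
    # the concrete psycopg2 type and the error quoted below would not arise.
    def __init__(
        self, conn: ConnectionType, engine: BaseDatabaseEngine[ConnectionType]
    ) -> None:
        self.conn = conn
        self.engine = engine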

Contributor:

> it doesn't seem to understand that psycopg2.extensions.connection matches Connection?

That's a surprise to me:

2023-11-09 21:04:54 ✔  $ cat temp.py
import psycopg2.extensions
import typing
import synapse.storage.types

pg_conn: psycopg2.extensions.connection
generic_conn: synapse.storage.types.Connection

generic_conn = pg_conn


dmr on titan in synapse on  develop is 📦 v1.96.0rc1 via 🐍 v3.11.6 (matrix-synapse-py3.11) via 🦀 v1.68.0 
2023-11-09 21:05:01 ✔  $ mypy temp.py 
Success: no issues found in 1 source file

Contributor (author):
tests/server.py:1085: error: Argument 1 to "attempt_to_set_autocommit" of "PostgresEngine" has incompatible type "Connection"; expected "connection"  [arg-type]

More than happy to do another PR changing it back though?
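
The two mypy results above are consistent, because the assignment only works in one direction: temp.py assigns a concrete connection to a protocol-typed variable (fine), while the failing call passes a protocol-typed value where the concrete class is demanded (rejected). A self-contained sketch of the same asymmetry:

from typing import Protocol


class Connection(Protocol):
    def cursor(self) -> object: ...


class ConcreteConnection:
    # Structurally satisfies Connection.
    def cursor(self) -> object:
        return object()


def needs_concrete(conn: ConcreteConnection) -> None: ...


generic_conn: Connection = ConcreteConnection()  # OK: concrete -> protocol

# mypy: error: Argument 1 to "needs_concrete" has incompatible type
# "Connection"; expected "ConcreteConnection"  [arg-type]
needs_concrete(generic_conn)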

Contributor:

🤷 No strong opinions here!

         db_conn = db_engine.module.connect(
             database=POSTGRES_BASE_DB,
             user=POSTGRES_USER,
             host=POSTGRES_HOST,
             port=POSTGRES_PORT,
             password=POSTGRES_PASSWORD,
         )
-        assert isinstance(db_conn, psycopg2.extensions.connection)
-        db_conn.autocommit = True
+        db_engine.attempt_to_set_autocommit(db_conn, True)
         cur = db_conn.cursor()
         cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,))
         cur.execute(
@@ -1065,13 +1062,12 @@ def setup_test_homeserver(

     hs.setup()

-    if isinstance(db_engine, PostgresEngine):
+    if USE_POSTGRES_FOR_TESTS:
         database_pool = hs.get_datastores().databases[0]

         # We need to do cleanup on PostgreSQL
         def cleanup() -> None:
             import psycopg2
-            import psycopg2.extensions

             # Close all the db pools
             database_pool._db_pool.close()
@@ -1086,8 +1082,7 @@ def cleanup() -> None:
                 port=POSTGRES_PORT,
                 password=POSTGRES_PASSWORD,
             )
-            assert isinstance(db_conn, psycopg2.extensions.connection)
-            db_conn.autocommit = True
+            db_engine.attempt_to_set_autocommit(db_conn, True)
             cur = db_conn.cursor()

             # Try a few times to drop the DB. Some things may hold on to the
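A closing note on why the tests/server.py change works: every engine exposes its DB-API driver module as .module, so the test setup can connect and toggle autocommit without ever naming psycopg2. A sketch of the pattern with the sqlite3 driver standing in (names mirror the diff but the class body is simplified):

import sqlite3
from types import ModuleType


class Sqlite3Engine:
    # Engines wrap a DB-API module; tests only touch this interface.
    module: ModuleType = sqlite3

    def attempt_to_set_autocommit(self, conn: object, autocommit: bool) -> None:
        pass  # no-op for SQLite, as in the sketch near the top


db_engine = Sqlite3Engine()

# The same two lines work unchanged for a postgres engine, which is the
# point of the refactor: no isinstance() check, no psycopg2 import.
db_conn = db_engine.module.connect(":memory:")
db_engine.attempt_to_set_autocommit(db_conn, True)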