Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add type hints to schema deltas #15497

Merged
merged 12 commits into from Apr 27, 2023
1 change: 1 addition & 0 deletions changelog.d/15496.misc
@@ -0,0 +1 @@
Improve type hints.
3 changes: 0 additions & 3 deletions mypy.ini
Expand Up @@ -28,13 +28,10 @@ files =
# https://docs.python.org/3/library/re.html#re.X
exclude = (?x)
^(
|synapse/storage/databases/__init__.py
|synapse/storage/databases/main/cache.py
|synapse/storage/schema/
)$

[mypy-synapse.metrics._reactor_metrics]
disallow_untyped_defs = False
# This module imports select.epoll. That exists on Linux, but doesn't on macOS.
# See https://github.com/matrix-org/synapse/pull/11771.
warn_unused_ignores = False
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/__init__.py
Expand Up @@ -95,7 +95,7 @@ def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"):
# If we're on a process that can persist events also
# instantiate a `PersistEventsStore`
if hs.get_instance_name() in hs.config.worker.writers.events:
persist_events = PersistEventsStore(hs, database, main, db_conn)
persist_events = PersistEventsStore(hs, database, main, db_conn) # type: ignore[arg-type]

if "state" in database_config.databases:
logger.info(
Expand Down Expand Up @@ -133,6 +133,6 @@ def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"):

# We use local variables here to ensure that the databases do not have
# optional types.
self.main = main
self.main = main # type: ignore[assignment]
self.state = state
self.persist_events = persist_events
16 changes: 9 additions & 7 deletions synapse/storage/databases/main/cache.py
Expand Up @@ -205,13 +205,13 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
)
elif row.type == EventsStreamCurrentStateRow.TypeId:
assert isinstance(data, EventsStreamCurrentStateRow)
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token)
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]

if data.type == EventTypes.Member:
self.get_rooms_for_user_with_stream_ordering.invalidate(
self.get_rooms_for_user_with_stream_ordering.invalidate( # type: ignore[attr-defined]
(data.state_key,)
)
self.get_rooms_for_user.invalidate((data.state_key,))
self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined]
else:
raise Exception("Unknown events stream row type %s" % (row.type,))

Expand All @@ -229,7 +229,7 @@ def _invalidate_caches_for_event(
# This invalidates any local in-memory cached event objects, the original
# process triggering the invalidation is responsible for clearing any external
# cached objects.
self._invalidate_local_get_event_cache(event_id)
self._invalidate_local_get_event_cache(event_id) # type: ignore[attr-defined]

self._attempt_to_invalidate_cache("have_seen_event", (room_id, event_id))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
Expand All @@ -242,10 +242,10 @@ def _invalidate_caches_for_event(
self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,))

if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined]

if redacts:
self._invalidate_local_get_event_cache(redacts)
self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined]
# Caches which might leak edits must be invalidated for the event being
# redacted.
self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,))
Expand All @@ -254,7 +254,7 @@ def _invalidate_caches_for_event(
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", (redacts,))

if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering) # type: ignore[attr-defined]
self._attempt_to_invalidate_cache(
"get_invited_rooms_for_local_user", (state_key,)
)
Expand Down Expand Up @@ -378,6 +378,8 @@ def _send_invalidation_to_replication(
)

if isinstance(self.database_engine, PostgresEngine):
assert self._cache_id_gen is not None

# get_next() returns a context manager which is designed to wrap
# the transaction. However, we want to only get an ID when we want
# to use it, here, so we need to call __enter__ manually, and have
Expand Down
4 changes: 0 additions & 4 deletions synapse/storage/schema/main/delta/20/pushers.py
Expand Up @@ -81,7 +81,3 @@ def run_create(cur, database_engine, *args, **kwargs):
cur.execute("DROP TABLE pushers")
cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
logger.info("Moved %d pushers to new table", count)


def run_upgrade(*args, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is safe to remove, because we do a hasattr check:

if not is_empty and hasattr(module, "run_upgrade"):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should have called that out, sorry. 👍

pass
4 changes: 0 additions & 4 deletions synapse/storage/schema/main/delta/25/fts.py
Expand Up @@ -72,7 +72,3 @@ def run_create(cur, database_engine, *args, **kwargs):
)

cur.execute(sql, ("event_search", progress_json))


def run_upgrade(*args, **kwargs):
pass
4 changes: 0 additions & 4 deletions synapse/storage/schema/main/delta/27/ts.py
Expand Up @@ -51,7 +51,3 @@ def run_create(cur, database_engine, *args, **kwargs):
)

cur.execute(sql, ("event_origin_server_ts", progress_json))


def run_upgrade(*args, **kwargs):
pass
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of the way mypy maps Python files to module names (it essentially walks up the directory tree for as long as it keeps finding an `__init__.py`), we need the file names of all Python schema deltas to be unique.

This was the only conflict; I played with some of the options for changing this behavior (namespace packages, etc.) but it seemed easiest to just rename the single offender.

This is from a really really old schema so I think renaming it won't break anything, but I'm not 100% confident on that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only breakage I can see: someone who is still on schema 31 might end up trying to apply the renamed migration after it has already been run, if they upgrade directly to a Synapse version with this change.

This looks to be around 7 years old now. I think we should be blasé about breaking setups which are that old and suffer from known security problems.

Expand Up @@ -80,7 +80,3 @@ def run_create(cur, database_engine, *args, **kwargs):
cur.execute("DROP TABLE pushers")
cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
logger.info("Moved %d pushers to new table", count)


def run_upgrade(cur, database_engine, *args, **kwargs):
pass
4 changes: 0 additions & 4 deletions synapse/storage/schema/main/delta/31/search_update.py
Expand Up @@ -56,7 +56,3 @@ def run_create(cur, database_engine, *args, **kwargs):
)

cur.execute(sql, ("event_search_order", progress_json))


def run_upgrade(cur, database_engine, *args, **kwargs):
pass
4 changes: 0 additions & 4 deletions synapse/storage/schema/main/delta/33/event_fields.py
Expand Up @@ -51,7 +51,3 @@ def run_create(cur, database_engine, *args, **kwargs):
)

cur.execute(sql, ("event_fields_sender_url", progress_json))


def run_upgrade(cur, database_engine, *args, **kwargs):
pass
4 changes: 0 additions & 4 deletions synapse/storage/schema/main/delta/34/cache_stream.py
Expand Up @@ -40,7 +40,3 @@ def run_create(cur, database_engine, *args, **kwargs):

for statement in get_statements(CREATE_TABLE.splitlines()):
cur.execute(statement)


def run_upgrade(cur, database_engine, *args, **kwargs):
pass
4 changes: 0 additions & 4 deletions synapse/storage/schema/main/delta/34/received_txn_purge.py
Expand Up @@ -26,7 +26,3 @@ def run_create(cur, database_engine, *args, **kwargs):
cur.execute("DELETE FROM received_transactions")

cur.execute("CREATE INDEX received_transactions_ts ON received_transactions(ts)")


def run_upgrade(cur, database_engine, *args, **kwargs):
pass
4 changes: 0 additions & 4 deletions synapse/storage/schema/main/delta/37/remove_auth_idx.py
Expand Up @@ -79,7 +79,3 @@ def run_create(cur, database_engine, *args, **kwargs):

for statement in get_statements(drop_constraint.splitlines()):
cur.execute(statement)


def run_upgrade(cur, database_engine, *args, **kwargs):
pass
4 changes: 0 additions & 4 deletions synapse/storage/schema/main/delta/42/user_dir.py
Expand Up @@ -78,7 +78,3 @@ def run_create(cur, database_engine, *args, **kwargs):
cur.execute(statement)
else:
raise Exception("Unrecognized database engine")


def run_upgrade(*args, **kwargs):
pass
Expand Up @@ -57,7 +57,3 @@ def run_create(cur, database_engine, *args, **kwargs):

for statement in get_statements(FIX_INDEXES.splitlines()):
cur.execute(statement)


def run_upgrade(*args, **kwargs):
pass
Expand Up @@ -16,10 +16,6 @@
"""


def run_upgrade(cur, database_engine, *args, **kwargs):
pass


def run_create(cur, database_engine, *args, **kwargs):
if isinstance(database_engine, PostgresEngine):
select_clause = """
Expand Down
4 changes: 0 additions & 4 deletions synapse/storage/schema/main/delta/58/06dlols_unique_idx.py
Expand Up @@ -27,10 +27,6 @@
logger = logging.getLogger(__name__)


def run_upgrade(*args, **kwargs):
pass


def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
# some instances might already have this index, in which case we can skip this
if isinstance(database_engine, PostgresEngine):
Expand Down
4 changes: 0 additions & 4 deletions synapse/storage/schema/main/delta/58/11user_id_seq.py
Expand Up @@ -28,7 +28,3 @@ def run_create(cur, database_engine, *args, **kwargs):

next_id = find_max_generated_user_id_localpart(cur) + 1
cur.execute("CREATE SEQUENCE user_id_seq START WITH %s", (next_id,))


def run_upgrade(*args, **kwargs):
pass
4 changes: 0 additions & 4 deletions synapse/storage/schema/main/delta/59/01ignored_user.py
Expand Up @@ -27,10 +27,6 @@
logger = logging.getLogger(__name__)


def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
pass


def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
logger.info("Creating ignored_users table")
execute_statements_from_stream(cur, StringIO(_create_commands))
Expand Down
Expand Up @@ -64,7 +64,3 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs
(6103, 'replace_room_depth_min_depth', '{}', 'populate_room_depth2')
"""
)


def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
pass
4 changes: 0 additions & 4 deletions synapse/storage/schema/state/delta/47/state_group_seq.py
Expand Up @@ -28,7 +28,3 @@ def run_create(cur, database_engine, *args, **kwargs):
start_val = row[0] + 1

cur.execute("CREATE SEQUENCE state_group_id_seq START WITH %s", (start_val,))


def run_upgrade(*args, **kwargs):
pass