Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Remove manys calls to cursor_to_dict (#16431)
Browse files Browse the repository at this point in the history
This avoids calling cursor_to_dict and then immediately
unpacking the values in the dict for other users. By not
creating the intermediate dictionary we can avoid allocating
the dictionary and strings for the keys, which should generally
be more performant.

Additionally this improves type hints by avoid Dict[str, Any]
dictionaries coming out of the database layer.
  • Loading branch information
clokep committed Oct 5, 2023
1 parent 4e302b3 commit fa90702
Show file tree
Hide file tree
Showing 16 changed files with 320 additions and 228 deletions.
2 changes: 1 addition & 1 deletion changelog.d/16429.misc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Reduce the size of each replication command instance.
Reduce memory allocations.
1 change: 1 addition & 0 deletions changelog.d/16431.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce memory allocations.
2 changes: 1 addition & 1 deletion synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
class PusherConfig:
"""Parameters necessary to configure a pusher."""

id: Optional[str]
id: Optional[int]
user_name: str

profile_tag: str
Expand Down
11 changes: 5 additions & 6 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ def get_global_account_data_for_user(
sql += " AND content != '{}'"

txn.execute(sql, (user_id,))
rows = self.db_pool.cursor_to_dict(txn)

return {
row["account_data_type"]: db_to_json(row["content"]) for row in rows
account_data_type: db_to_json(content)
for account_data_type, content in txn
}

return await self.db_pool.runInteraction(
Expand Down Expand Up @@ -196,13 +196,12 @@ def get_room_account_data_for_user_txn(
sql += " AND content != '{}'"

txn.execute(sql, (user_id,))
rows = self.db_pool.cursor_to_dict(txn)

by_room: Dict[str, Dict[str, JsonDict]] = {}
for row in rows:
room_data = by_room.setdefault(row["room_id"], {})
for room_id, account_data_type, content in txn:
room_data = by_room.setdefault(room_id, {})

room_data[row["account_data_type"]] = db_to_json(row["content"])
room_data[account_data_type] = db_to_json(content)

return by_room

Expand Down
29 changes: 7 additions & 22 deletions synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,7 @@
# limitations under the License.
import logging
import re
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Optional,
Pattern,
Sequence,
Tuple,
cast,
)
from typing import TYPE_CHECKING, List, Optional, Pattern, Sequence, Tuple, cast

from synapse.appservice import (
ApplicationService,
Expand Down Expand Up @@ -353,21 +343,15 @@ async def get_oldest_unsent_txn(

def _get_oldest_unsent_txn(
txn: LoggingTransaction,
) -> Optional[Dict[str, Any]]:
) -> Optional[Tuple[int, str]]:
# Monotonically increasing txn ids, so just select the smallest
# one in the txns table (we delete them when they are sent)
txn.execute(
"SELECT * FROM application_services_txns WHERE as_id=?"
"SELECT txn_id, event_ids FROM application_services_txns WHERE as_id=?"
" ORDER BY txn_id ASC LIMIT 1",
(service.id,),
)
rows = self.db_pool.cursor_to_dict(txn)
if not rows:
return None

entry = rows[0]

return entry
return cast(Optional[Tuple[int, str]], txn.fetchone())

entry = await self.db_pool.runInteraction(
"get_oldest_unsent_appservice_txn", _get_oldest_unsent_txn
Expand All @@ -376,16 +360,17 @@ def _get_oldest_unsent_txn(
if not entry:
return None

event_ids = db_to_json(entry["event_ids"])
txn_id, event_ids_str = entry

event_ids = db_to_json(event_ids_str)
events = await self.get_events_as_list(event_ids)

# TODO: to-device messages, one-time key counts, device list summaries and unused
# fallback keys are not yet populated for catch-up transactions.
# We likely want to populate those for reliability.
return AppServiceTransaction(
service=service,
id=entry["txn_id"],
id=txn_id,
events=events,
ephemeral=[],
to_device_messages=[],
Expand Down
12 changes: 6 additions & 6 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1413,25 +1413,25 @@ async def get_local_devices_not_accessed_since(

def get_devices_not_accessed_since_txn(
txn: LoggingTransaction,
) -> List[Dict[str, str]]:
) -> List[Tuple[str, str]]:
sql = """
SELECT user_id, device_id
FROM devices WHERE last_seen < ? AND hidden = FALSE
"""
txn.execute(sql, (since_ms,))
return self.db_pool.cursor_to_dict(txn)
return cast(List[Tuple[str, str]], txn.fetchall())

rows = await self.db_pool.runInteraction(
"get_devices_not_accessed_since",
get_devices_not_accessed_since_txn,
)

devices: Dict[str, List[str]] = {}
for row in rows:
for user_id, device_id in rows:
# Remote devices are never stale from our point of view.
if self.hs.is_mine_id(row["user_id"]):
user_devices = devices.setdefault(row["user_id"], [])
user_devices.append(row["device_id"])
if self.hs.is_mine_id(user_id):
user_devices = devices.setdefault(user_id, [])
user_devices.append(device_id)

return devices

Expand Down
22 changes: 6 additions & 16 deletions synapse/storage/databases/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,14 +921,10 @@ def _get_bare_e2e_cross_signing_keys_bulk_txn(
}

txn.execute(sql, params)
rows = self.db_pool.cursor_to_dict(txn)

for row in rows:
user_id = row["user_id"]
key_type = row["keytype"]
key = db_to_json(row["keydata"])
for user_id, key_type, key_data, _ in txn:
user_keys = result.setdefault(user_id, {})
user_keys[key_type] = key
user_keys[key_type] = db_to_json(key_data)

return result

Expand Down Expand Up @@ -988,13 +984,9 @@ def _get_e2e_cross_signing_signatures_txn(
query_params.extend(item)

txn.execute(sql, query_params)
rows = self.db_pool.cursor_to_dict(txn)

# and add the signatures to the appropriate keys
for row in rows:
key_id: str = row["key_id"]
target_user_id: str = row["target_user_id"]
target_device_id: str = row["target_device_id"]
for target_user_id, target_device_id, key_id, signature in txn:
key_type = devices[(target_user_id, target_device_id)]
# We need to copy everything, because the result may have come
# from the cache. dict.copy only does a shallow copy, so we
Expand All @@ -1012,13 +1004,11 @@ def _get_e2e_cross_signing_signatures_txn(
].copy()
if from_user_id in signatures:
user_sigs = signatures[from_user_id] = signatures[from_user_id]
user_sigs[key_id] = row["signature"]
user_sigs[key_id] = signature
else:
signatures[from_user_id] = {key_id: row["signature"]}
signatures[from_user_id] = {key_id: signature}
else:
target_user_key["signatures"] = {
from_user_id: {key_id: row["signature"]}
}
target_user_key["signatures"] = {from_user_id: {key_id: signature}}

return keys

Expand Down
9 changes: 3 additions & 6 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1654,8 +1654,6 @@ def _add_to_cache(
) -> None:
to_prefill = []

rows = []

ev_map = {e.event_id: e for e, _ in events_and_contexts}
if not ev_map:
return
Expand All @@ -1676,10 +1674,9 @@ def _add_to_cache(
)

txn.execute(sql + clause, args)
rows = self.db_pool.cursor_to_dict(txn)
for row in rows:
event = ev_map[row["event_id"]]
if not row["rejects"] and not row["redacts"]:
for event_id, redacts, rejects in txn:
event = ev_map[event_id]
if not rejects and not redacts:
to_prefill.append(EventCacheEntry(event=event, redacted_event=None))

async def external_prefill() -> None:
Expand Down
18 changes: 13 additions & 5 deletions synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,13 +434,21 @@ def _get_active_presence(self, db_conn: Connection) -> List[UserPresenceState]:

txn = db_conn.cursor()
txn.execute(sql, (PresenceState.OFFLINE,))
rows = self.db_pool.cursor_to_dict(txn)
rows = txn.fetchall()
txn.close()

for row in rows:
row["currently_active"] = bool(row["currently_active"])

return [UserPresenceState(**row) for row in rows]
return [
UserPresenceState(
user_id=user_id,
state=state,
last_active_ts=last_active_ts,
last_federation_update_ts=last_federation_update_ts,
last_user_sync_ts=last_user_sync_ts,
status_msg=status_msg,
currently_active=bool(currently_active),
)
for user_id, state, last_active_ts, last_federation_update_ts, last_user_sync_ts, status_msg, currently_active in rows
]

def take_presence_startup_info(self) -> List[UserPresenceState]:
active_on_startup = self._presence_on_startup
Expand Down
Loading

0 comments on commit fa90702

Please sign in to comment.