Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Revert the bugfix on email pushers & incorporate feedback from PR review
Browse files Browse the repository at this point in the history
  • Loading branch information
sandhose committed Mar 23, 2023
1 parent 6d9fba6 commit 0c57662
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 24 deletions.
1 change: 0 additions & 1 deletion changelog.d/15280.bugfix

This file was deleted.

4 changes: 2 additions & 2 deletions synapse/handlers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -1507,7 +1507,7 @@ async def delete_access_token(self, access_token: str) -> None:
# XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
# background update completes.
if token.token_id is not None:
await self.hs.get_pusherpool().remove_http_pushers_by_access_tokens(
await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
token.user_id, (token.token_id,)
)

Expand Down Expand Up @@ -1539,7 +1539,7 @@ async def delete_access_tokens_for_user(
# delete pushers associated with the access tokens
# XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
# background update completes.
await self.hs.get_pusherpool().remove_http_pushers_by_access_tokens(
await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
user_id, (token_id for _, token_id, _ in tokens_and_devices)
)

Expand Down
22 changes: 9 additions & 13 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.push.pusher import PusherFactory
from synapse.replication.http.push import ReplicationRemovePusherRestServlet
from synapse.types import JsonDict, RoomStreamToken
from synapse.types import JsonDict, RoomStreamToken, StrCollection
from synapse.util.async_helpers import concurrently_execute
from synapse.util.threepids import canonicalise_email

Expand Down Expand Up @@ -128,9 +128,9 @@ async def add_or_update_pusher(
last_stream_ordering = self.store.get_room_max_stream_ordering()

# Before we actually persist the pusher, we check if the user already has one
# this app ID and pushkey. If so, we want to keep the access token and device ID
# in place, since this could be one device modifying (e.g. enabling/disabling)
# another device's pusher.
# for this app ID and pushkey. If so, we want to keep the access token and
# device ID in place, since this could be one device modifying
# (e.g. enabling/disabling) another device's pusher.
# XXX(quenting): Even though we're not persisting the access_token_id for new
# pushers anymore, we still need to copy existing access_token_ids over when
# updating a pusher, in case the "set_device_id_for_pushers" background update
Expand Down Expand Up @@ -203,7 +203,7 @@ async def remove_pushers_by_app_id_and_pushkey_not_user(
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)

async def remove_http_pushers_by_access_tokens(
async def remove_pushers_by_access_tokens(
self, user_id: str, access_tokens: Iterable[int]
) -> None:
"""Remove the HTTP pushers for a given user corresponding to a set of
Expand All @@ -217,7 +217,7 @@ async def remove_http_pushers_by_access_tokens(
# background update finishes
tokens = set(access_tokens)
for p in await self.store.get_pushers_by_user_id(user_id):
if p.kind == "http" and p.access_token in tokens:
if p.access_token in tokens:
logger.info(
"Removing pusher for app id %s, pushkey %s, user %s",
p.app_id,
Expand All @@ -226,8 +226,8 @@ async def remove_http_pushers_by_access_tokens(
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)

async def remove_http_pushers_by_devices(
self, user_id: str, devices: Iterable[str]
async def remove_pushers_by_devices(
self, user_id: str, devices: StrCollection
) -> None:
"""Remove the HTTP pushers for a given user corresponding to a set of devices
Expand All @@ -237,11 +237,7 @@ async def remove_http_pushers_by_devices(
"""
device_ids = set(devices)
for p in await self.store.get_pushers_by_user_id(user_id):
if (
p.kind == "http"
and p.device_id is not None
and p.device_id in device_ids
):
if p.device_id in device_ids:
logger.info(
"Removing pusher for app id %s, pushkey %s, user %s",
p.app_id,
Expand Down
29 changes: 22 additions & 7 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,20 +509,24 @@ def __init__(
async def _set_device_id_for_pushers(
self, progress: JsonDict, batch_size: int
) -> int:
"""Background update to populate the device_id column and clear the access_token
column for the pushers table."""
"""
Background update to populate the device_id column and clear the access_token
column for the pushers table.
"""
last_pusher_id = progress.get("pusher_id", 0)

def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
txn.execute(
"""
SELECT p.id, at.device_id
SELECT
p.id AS pusher_id,
p.device_id AS pusher_device_id,
at.device_id AS token_device_id
FROM pushers AS p
INNER JOIN access_tokens AS at
LEFT JOIN access_tokens AS at
ON p.access_token = at.id
WHERE
p.access_token IS NOT NULL
AND at.device_id IS NOT NULL
AND p.id > ?
ORDER BY p.id
LIMIT ?
Expand All @@ -534,13 +538,24 @@ def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
if len(rows) == 0:
return 0

# The reason we're clearing the access_token column here is a bit subtle.
# When a user logs out, we:
# (1) delete the access token
# (2) delete the device
#
# Ideally, we would delete the pushers only via its link to the device
# during (2), but since this background update might not have fully run yet,
# we're still deleting the pushers via the access token during (1).
self.db_pool.simple_update_many_txn(
txn=txn,
table="pushers",
key_names=("id",),
key_values=[(row["id"],) for row in rows],
key_values=[(row["pusher_id"],) for row in rows],
value_names=("device_id", "access_token"),
value_values=[(row["device_id"], None) for row in rows],
# If there was already a device_id on the pusher, we only want to clear
# the access_token column, so we keep the existing device_id. Otherwise,
# we set the device_id we got from joining the access_tokens table.
value_values=[(row["pusher_device_id"] or row["token_device_id"], None) for row in rows],
)

self.db_pool.updates._background_update_progress_txn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* limitations under the License.
*/

-- Triggers the background update to set the device_id for pushers that don't have one.
-- Triggers the background update to set the device_id for pushers
-- that don't have one, and clear the access_token column.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7402, 'set_device_id_for_pushers', '{}');

0 comments on commit 0c57662

Please sign in to comment.