Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Make cleaning up pushers depend on the device_id instead of the token_id
Browse files Browse the repository at this point in the history
  • Loading branch information
sandhose committed Mar 17, 2023
1 parent 3d70cc3 commit b2df07f
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 57 deletions.
1 change: 1 addition & 0 deletions changelog.d/15280.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make the pushers rely on the `device_id` instead of the `access_token_id` for various operations.
11 changes: 0 additions & 11 deletions synapse/handlers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -1503,12 +1503,6 @@ async def delete_access_token(self, access_token: str) -> None:
access_token=access_token,
)

# delete pushers associated with this access token
if token.token_id is not None:
await self.hs.get_pusherpool().remove_pushers_by_access_token(
token.user_id, (token.token_id,)
)

async def delete_access_tokens_for_user(
self,
user_id: str,
Expand All @@ -1534,11 +1528,6 @@ async def delete_access_tokens_for_user(
user_id=user_id, device_id=device_id, access_token=token
)

# delete pushers associated with the access tokens
await self.hs.get_pusherpool().remove_pushers_by_access_token(
user_id, (token_id for _, token_id, _ in tokens_and_devices)
)

async def add_threepid(
self, user_id: str, medium: str, address: str, validated_at: int
) -> None:
Expand Down
2 changes: 2 additions & 0 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
else:
raise

await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)

# Delete data specific to each device. Not optimised as it is not
# considered as part of a critical path.
for device_id in device_ids:
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,11 +1013,11 @@ async def _register_email_threepid(
user_tuple = await self.store.get_user_by_access_token(token)
# The token better still exist.
assert user_tuple
token_id = user_tuple.token_id
device_id = user_tuple.device_id

await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
Expand Down
7 changes: 6 additions & 1 deletion synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class PusherConfig:

id: Optional[str]
user_name: str
access_token: Optional[int]

profile_tag: str
kind: str
app_id: str
Expand All @@ -119,6 +119,11 @@ class PusherConfig:
enabled: bool
device_id: Optional[str]

# XXX(quenting): The access_token is not persisted anymore for new pushers, but we
# keep it when reading from the database, so that we don't get stale pushers
# while the "set_device_id_for_pushers" background update is running.
access_token: Optional[int]

def as_dict(self) -> Dict[str, Any]:
"""Information that can be retrieved about a pusher after creation."""
return {
Expand Down
53 changes: 38 additions & 15 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ def start(self) -> None:
async def add_or_update_pusher(
self,
user_id: str,
access_token: Optional[int],
kind: str,
app_id: str,
app_display_name: str,
Expand Down Expand Up @@ -128,6 +127,21 @@ async def add_or_update_pusher(
# stream ordering, so it will process pushes from this point onwards.
last_stream_ordering = self.store.get_room_max_stream_ordering()

# Before we actually persist the pusher, we check if the user already has one
# this app ID and pushkey. If so, we want to keep the access token and device ID
# in place, since this could be one device modifying (e.g. enabling/disabling)
# another device's pusher.
# Even though we're not persisting the access_token_id for new pushers anymore,
# we still need to copy existing access_token_ids over when updating a pusher,
# in case the "set_device_id_for_pushers" background update hasn't run yet.
access_token_id = None
existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)
if existing_config:
device_id = existing_config.device_id
access_token_id = existing_config.access_token

# we try to create the pusher just to validate the config: it
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
Expand All @@ -136,7 +150,6 @@ async def add_or_update_pusher(
PusherConfig(
id=None,
user_name=user_id,
access_token=access_token,
profile_tag=profile_tag,
kind=kind,
app_id=app_id,
Expand All @@ -151,23 +164,12 @@ async def add_or_update_pusher(
failing_since=None,
enabled=enabled,
device_id=device_id,
access_token=access_token_id,
)
)

# Before we actually persist the pusher, we check if the user already has one
# this app ID and pushkey. If so, we want to keep the access token and device ID
# in place, since this could be one device modifying (e.g. enabling/disabling)
# another device's pusher.
existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)
if existing_config:
access_token = existing_config.access_token
device_id = existing_config.device_id

await self.store.add_pusher(
user_id=user_id,
access_token=access_token,
kind=kind,
app_id=app_id,
app_display_name=app_display_name,
Expand All @@ -180,6 +182,7 @@ async def add_or_update_pusher(
profile_tag=profile_tag,
enabled=enabled,
device_id=device_id,
access_token_id=access_token_id,
)
pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)

Expand All @@ -199,7 +202,7 @@ async def remove_pushers_by_app_id_and_pushkey_not_user(
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)

async def remove_pushers_by_access_token(
async def remove_pushers_by_access_tokens(
self, user_id: str, access_tokens: Iterable[int]
) -> None:
"""Remove the pushers for a given user corresponding to a set of
Expand All @@ -220,6 +223,26 @@ async def remove_pushers_by_access_token(
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)

async def remove_pushers_by_devices(
self, user_id: str, devices: Iterable[str]
) -> None:
"""Remove the pushers for a given user corresponding to a set of devices
Args:
user_id: user to remove pushers for
devices: device IDs to remove pushers for
"""
device_ids = set(devices)
for p in await self.store.get_pushers_by_user_id(user_id):
if p.device_id is not None and p.device_id in device_ids:
logger.info(
"Removing pusher for app id %s, pushkey %s, user %s",
p.app_id,
p.pushkey,
p.user_name,
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)

def on_new_notifications(self, max_token: RoomStreamToken) -> None:
if not self.pushers:
# nothing to do here.
Expand Down
1 change: 0 additions & 1 deletion synapse/rest/admin/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,6 @@ async def on_PUT(
):
await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
access_token=None,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
Expand Down
1 change: 0 additions & 1 deletion synapse/rest/client/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
try:
await self.pusher_pool.add_or_update_pusher(
user_id=user.to_string(),
access_token=requester.access_token_id,
kind=content["kind"],
app_id=content["app_id"],
app_display_name=content["app_display_name"],
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
async def add_pusher(
self,
user_id: str,
access_token: Optional[int],
kind: str,
app_id: str,
app_display_name: str,
Expand All @@ -581,13 +580,13 @@ async def add_pusher(
profile_tag: str = "",
enabled: bool = True,
device_id: Optional[str] = None,
access_token_id: Optional[int] = None,
) -> None:
async with self._pushers_id_gen.get_next() as stream_id:
await self.db_pool.simple_upsert(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
values={
"access_token": access_token,
"kind": kind,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
Expand All @@ -599,6 +598,7 @@ async def add_pusher(
"id": stream_id,
"enabled": enabled,
"device_id": device_id,
"access_token": access_token_id,
},
desc="add_pusher",
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Triggers the background update to set the device_id for pushers that don't have one.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7402, 'set_device_id_for_pushers', '{}');
6 changes: 3 additions & 3 deletions tests/push/test_email.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.hs.get_datastores().main.get_user_by_access_token(self.access_token)
)
assert user_tuple is not None
self.token_id = user_tuple.token_id
self.device_id = user_tuple.device_id

# We need to add email to account before we can create a pusher.
self.get_success(
Expand All @@ -117,7 +117,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
pusher = self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=self.user_id,
access_token=self.token_id,
device_id=self.device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
Expand All @@ -141,7 +141,7 @@ def test_need_validated_email(self) -> None:
self.get_success_or_raise(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=self.user_id,
access_token=self.token_id,
device_id=self.device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
Expand Down
33 changes: 16 additions & 17 deletions tests/push/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ def test_invalid_configuration(self) -> None:
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id

def test_data(data: Any) -> None:
self.get_failure(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
Expand Down Expand Up @@ -114,12 +114,12 @@ def test_sends_http(self) -> None:
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id

self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
Expand Down Expand Up @@ -235,12 +235,12 @@ def test_sends_high_priority_for_encrypted(self) -> None:
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id

self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
Expand Down Expand Up @@ -356,12 +356,12 @@ def test_sends_high_priority_for_one_to_one_only(self) -> None:
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id

self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
Expand Down Expand Up @@ -443,12 +443,12 @@ def test_sends_high_priority_for_mention(self) -> None:
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id

self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
Expand Down Expand Up @@ -521,12 +521,12 @@ def test_sends_high_priority_for_atroom(self) -> None:
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id

self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
Expand Down Expand Up @@ -628,12 +628,12 @@ def _test_push_unread_count(self) -> None:
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id

self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
Expand Down Expand Up @@ -764,12 +764,12 @@ def _set_pusher(self, user_id: str, access_token: str, enabled: bool) -> None:
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id

self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
Expand All @@ -778,7 +778,6 @@ def _set_pusher(self, user_id: str, access_token: str, enabled: bool) -> None:
lang=None,
data={"url": "http://example.com/_matrix/push/v1/notify"},
enabled=enabled,
device_id=user_tuple.device_id,
)
)

Expand Down
Loading

0 comments on commit b2df07f

Please sign in to comment.