Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

make notification of signatures work with workers #6254

Merged
merged 6 commits into from
Nov 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6254.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make notification of cross-signing signatures work with workers.
13 changes: 11 additions & 2 deletions synapse/replication/slave/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
from synapse.storage.data_stores.main.devices import DeviceWorkerStore
from synapse.storage.data_stores.main.end_to_end_keys import EndToEndKeyWorkerStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
Expand Down Expand Up @@ -42,14 +43,22 @@ def __init__(self, db_conn, hs):

def stream_positions(self):
result = super(SlavedDeviceStore, self).stream_positions()
result["device_lists"] = self._device_list_id_gen.get_current_token()
# The user signature stream uses the same stream ID generator as the
# device list stream, so set them both to the device list ID
# generator's current token.
current_token = self._device_list_id_gen.get_current_token()
result[DeviceListsStream.NAME] = current_token
result[UserSignatureStream.NAME] = current_token
return result

def process_replication_rows(self, stream_name, token, rows):
if stream_name == "device_lists":
if stream_name == DeviceListsStream.NAME:
self._device_list_id_gen.advance(token)
for row in rows:
self._invalidate_caches_for_devices(token, row.user_id, row.destination)
elif stream_name == UserSignatureStream.NAME:
for row in rows:
self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
return super(SlavedDeviceStore, self).process_replication_rows(
stream_name, token, rows
)
Expand Down
1 change: 1 addition & 0 deletions synapse/replication/tcp/streams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@
_base.TagAccountDataStream,
_base.AccountDataStream,
_base.GroupServerStream,
_base.UserSignatureStream,
)
}
18 changes: 18 additions & 0 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
"GroupsStreamRow",
("group_id", "user_id", "type", "content"), # str # str # str # dict
)
UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str


class Stream(object):
Expand Down Expand Up @@ -438,3 +439,20 @@ def __init__(self, hs):
self.update_function = store.get_all_groups_changes

super(GroupServerStream, self).__init__(hs)


class UserSignatureStream(Stream):
"""A user has signed their own device with their user-signing key
"""

NAME = "user_signature"
_LIMITED = False
ROW_TYPE = UserSignatureStreamRow

def __init__(self, hs):
store = hs.get_datastore()

self.current_token = store.get_device_stream_token
self.update_function = store.get_all_user_signature_changes_for_remotes

super(UserSignatureStream, self).__init__(hs)
5 changes: 4 additions & 1 deletion synapse/storage/data_stores/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ def __init__(self, db_conn, hs):
db_conn, "public_room_list_stream", "stream_id"
)
self._device_list_id_gen = StreamIdGenerator(
db_conn, "device_lists_stream", "stream_id"
db_conn,
"device_lists_stream",
"stream_id",
extra_tables=[("user_signature_stream", "stream_id")],
)
self._cross_signing_id_gen = StreamIdGenerator(
db_conn, "e2e_cross_signing_keys", "stream_id"
Expand Down
24 changes: 24 additions & 0 deletions synapse/storage/data_stores/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,30 @@ def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None):
from_user_id,
)

def get_all_user_signature_changes_for_remotes(self, from_key, to_key):
"""Return a list of changes from the user signature stream to notify remotes.
Note that the user signature stream represents when a user signs their
device with their user-signing key, which is not published to other
users or servers, so no `destination` is needed in the returned
list. However, this is needed to poke workers.

Args:
from_key (int): the stream ID to start at (exclusive)
to_key (int): the stream ID to end at (inclusive)

Returns:
Deferred[list[(int,str)]] a list of `(stream_id, user_id)`
"""
sql = """
SELECT MAX(stream_id) AS stream_id, from_user_id AS user_id
FROM user_signature_stream
WHERE ? < stream_id AND stream_id <= ?
GROUP BY user_id
"""
return self._execute(
"get_all_user_signature_changes_for_remotes", None, sql, from_key, to_key
)


class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
Expand Down