Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Move more encryption endpoints off master (#9068)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Jan 11, 2021
1 parent 00c62b9 commit 108ad0f
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 46 deletions.
1 change: 1 addition & 0 deletions changelog.d/9068.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for handling `/keys/claim` and `/room_keys` APIs on worker processes.
12 changes: 10 additions & 2 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,18 @@
)
from synapse.rest.client.v1.push_rule import PushRuleRestServlet
from synapse.rest.client.v1.voip import VoipRestServlet
from synapse.rest.client.v2_alpha import groups, sync, user_directory
from synapse.rest.client.v2_alpha import groups, room_keys, sync, user_directory
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.account_data import (
AccountDataServlet,
RoomAccountDataServlet,
)
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.keys import (
KeyChangesServlet,
KeyQueryServlet,
OneTimeKeyServlet,
)
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet
from synapse.rest.client.versions import VersionsRestServlet
Expand All @@ -115,6 +119,7 @@
from synapse.server import HomeServer, cache_in_self
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
Expand Down Expand Up @@ -446,6 +451,7 @@ class GenericWorkerSlavedStore(
UserDirectoryStore,
StatsStore,
UIAuthWorkerStore,
EndToEndRoomKeyStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedReceiptsStore,
Expand Down Expand Up @@ -503,6 +509,7 @@ def _listen_http(self, listener_config: ListenerConfig):
LoginRestServlet(self).register(resource)
ThreepidRestServlet(self).register(resource)
KeyQueryServlet(self).register(resource)
OneTimeKeyServlet(self).register(resource)
KeyChangesServlet(self).register(resource)
VoipRestServlet(self).register(resource)
PushRuleRestServlet(self).register(resource)
Expand All @@ -520,6 +527,7 @@ def _listen_http(self, listener_config: ListenerConfig):
room.register_servlets(self, resource, True)
room.register_deprecated_servlets(self, resource)
InitialSyncRestServlet(self).register(resource)
room_keys.register_servlets(self, resource)

SendToDeviceRestServlet(self).register(resource)

Expand Down
88 changes: 44 additions & 44 deletions synapse/storage/databases/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,50 +707,6 @@ def get_device_stream_token(self) -> int:
"""Get the current stream id from the _device_list_id_gen"""
...


class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
async def set_e2e_device_keys(
self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict
) -> bool:
"""Stores device keys for a device. Returns whether there was a change
or the keys were already in the database.
"""

def _set_e2e_device_keys_txn(txn):
set_tag("user_id", user_id)
set_tag("device_id", device_id)
set_tag("time_now", time_now)
set_tag("device_keys", device_keys)

old_key_json = self.db_pool.simple_select_one_onecol_txn(
txn,
table="e2e_device_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
retcol="key_json",
allow_none=True,
)

# In py3 we need old_key_json to match new_key_json type. The DB
# returns unicode while encode_canonical_json returns bytes.
new_key_json = encode_canonical_json(device_keys).decode("utf-8")

if old_key_json == new_key_json:
log_kv({"Message": "Device key already stored."})
return False

self.db_pool.simple_upsert_txn(
txn,
table="e2e_device_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
values={"ts_added_ms": time_now, "key_json": new_key_json},
)
log_kv({"message": "Device keys stored."})
return True

return await self.db_pool.runInteraction(
"set_e2e_device_keys", _set_e2e_device_keys_txn
)

async def claim_e2e_one_time_keys(
self, query_list: Iterable[Tuple[str, str, str]]
) -> Dict[str, Dict[str, Dict[str, bytes]]]:
Expand Down Expand Up @@ -840,6 +796,50 @@ def _claim_e2e_one_time_keys(txn):
"claim_e2e_one_time_keys", _claim_e2e_one_time_keys
)


class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
async def set_e2e_device_keys(
self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict
) -> bool:
"""Stores device keys for a device. Returns whether there was a change
or the keys were already in the database.
"""

def _set_e2e_device_keys_txn(txn):
set_tag("user_id", user_id)
set_tag("device_id", device_id)
set_tag("time_now", time_now)
set_tag("device_keys", device_keys)

old_key_json = self.db_pool.simple_select_one_onecol_txn(
txn,
table="e2e_device_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
retcol="key_json",
allow_none=True,
)

# In py3 we need old_key_json to match new_key_json type. The DB
# returns unicode while encode_canonical_json returns bytes.
new_key_json = encode_canonical_json(device_keys).decode("utf-8")

if old_key_json == new_key_json:
log_kv({"Message": "Device key already stored."})
return False

self.db_pool.simple_upsert_txn(
txn,
table="e2e_device_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
values={"ts_added_ms": time_now, "key_json": new_key_json},
)
log_kv({"message": "Device keys stored."})
return True

return await self.db_pool.runInteraction(
"set_e2e_device_keys", _set_e2e_device_keys_txn
)

async def delete_e2e_keys_by_device(self, user_id: str, device_id: str) -> None:
def delete_e2e_keys_by_device_txn(txn):
log_kv(
Expand Down

0 comments on commit 108ad0f

Please sign in to comment.