This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Opentrace e2e keys #5855

Merged: 20 commits merged on Aug 22, 2019
1 change: 1 addition & 0 deletions changelog.d/5855.misc
@@ -0,0 +1 @@
Opentracing for room and e2e keys.
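The changes below instrument the federation, e2e key, and room key code paths with the OpenTracing helpers from synapse.logging.opentracing: @trace wraps a handler method in a span, log_kv attaches structured events to the active span, and set_tag / tag_args record request parameters on it. As a rough illustration of the pattern, here is a minimal sketch of a trace-style decorator built on the plain opentracing package; it is not Synapse's actual implementation, which also has to cooperate with Twisted Deferreds and Synapse's log contexts, and the decorated function is a hypothetical stand-in.

import functools

import opentracing


def trace(func):
    # Start a span named after the wrapped function for the duration of the call.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # opentracing.tracer is a no-op tracer until a real backend (e.g. Jaeger)
        # is configured, so this sketch is harmless to run without tracing set up.
        with opentracing.tracer.start_active_span(func.__name__):
            return func(*args, **kwargs)

    return wrapper


@trace
def claim_one_time_keys(query):
    # Hypothetical handler standing in for the decorated Synapse methods.
    return {"one_time_keys": {}, "failures": {}}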
3 changes: 3 additions & 0 deletions synapse/federation/federation_server.py
@@ -43,6 +43,7 @@
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import log_kv, trace
from synapse.logging.utils import log_function
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -507,6 +508,7 @@ def on_query_client_keys(self, origin, content):
def on_query_user_devices(self, origin, user_id):
return self.on_query_request("user_devices", user_id)

@trace
@defer.inlineCallbacks
@log_function
def on_claim_client_keys(self, origin, content):
@@ -515,6 +517,7 @@ def on_claim_client_keys(self, origin, content):
for device_id, algorithm in device_keys.items():
query.append((user_id, device_id, algorithm))

log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
results = yield self.store.claim_e2e_one_time_keys(query)

json_result = {}
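on_claim_client_keys now records the (user, device, algorithm) triples it is about to claim via log_kv. Conceptually, log_kv hangs a structured event off whichever span is currently active. A minimal sketch of that behaviour with the plain opentracing API follows; the real helper lives in synapse.logging.opentracing and is a no-op when tracing is disabled, and the user and device values below are made up for illustration.

import opentracing


def log_kv(key_values):
    # Attach a structured event to the active span, if there is one.
    span = opentracing.tracer.active_span
    if span is not None:
        span.log_kv(key_values)


# Hypothetical query, mirroring the shape logged in on_claim_client_keys.
log_kv(
    {
        "message": "Claiming one time keys.",
        "user, device pairs": [("@alice:example.com", "DEVICEID", "signed_curve25519")],
    }
)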
52 changes: 51 additions & 1 deletion synapse/handlers/e2e_keys.py
@@ -24,6 +24,7 @@

from synapse.api.errors import CodeMessageException, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
from synapse.types import UserID, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.retryutils import NotRetryingDestination
@@ -46,6 +47,7 @@ def __init__(self, hs):
"client_keys", self.on_federation_query_client_keys
)

@trace
@defer.inlineCallbacks
def query_devices(self, query_body, timeout):
""" Handle a device key query from a client
@@ -81,6 +83,9 @@ def query_devices(self, query_body, timeout):
else:
remote_queries[user_id] = device_ids

set_tag("local_key_query", local_query)
set_tag("remote_key_query", remote_queries)

# First get local devices.
failures = {}
results = {}
@@ -121,6 +126,7 @@ def query_devices(self, query_body, timeout):
r[user_id] = remote_queries[user_id]

# Now fetch any devices that we don't have in our cache
@trace
@defer.inlineCallbacks
def do_remote_query(destination):
"""This is called when we are querying the device list of a user on
@@ -185,6 +191,8 @@ def do_remote_query(destination):
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", failure)

yield make_deferred_yieldable(
defer.gatherResults(
@@ -198,6 +206,7 @@ def do_remote_query(destination):

return {"device_keys": results, "failures": failures}

@trace
@defer.inlineCallbacks
def query_local_devices(self, query):
"""Get E2E device keys for local users
@@ -210,13 +219,22 @@ def query_local_devices(self, query):
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
map from user_id -> device_id -> device details
"""
set_tag("local_query", query)
local_query = []

result_dict = {}
for user_id, device_ids in query.items():
# we use UserID.from_string to catch invalid user ids
if not self.is_mine(UserID.from_string(user_id)):
logger.warning("Request for keys for non-local user %s", user_id)
log_kv(
{
"message": "Requested a local key for a user which"
" was not local to the homeserver",
"user_id": user_id,
}
)
set_tag("error", True)
raise SynapseError(400, "Not a user here")

if not device_ids:
@@ -241,6 +259,7 @@ def query_local_devices(self, query):
r["unsigned"]["device_display_name"] = display_name
result_dict[user_id][device_id] = r

log_kv(results)
return result_dict

@defer.inlineCallbacks
@@ -251,6 +270,7 @@ def on_federation_query_client_keys(self, query_body):
res = yield self.query_local_devices(device_keys_query)
return {"device_keys": res}

@trace
@defer.inlineCallbacks
def claim_one_time_keys(self, query, timeout):
local_query = []
@@ -265,6 +285,9 @@ def claim_one_time_keys(self, query, timeout):
domain = get_domain_from_id(user_id)
remote_queries.setdefault(domain, {})[user_id] = device_keys

set_tag("local_key_query", local_query)
set_tag("remote_key_query", remote_queries)

results = yield self.store.claim_e2e_one_time_keys(local_query)

json_result = {}
@@ -276,8 +299,10 @@ def claim_one_time_keys(self, query, timeout):
key_id: json.loads(json_bytes)
}

@trace
@defer.inlineCallbacks
def claim_client_keys(destination):
set_tag("destination", destination)
device_keys = remote_queries[destination]
try:
remote_result = yield self.federation.claim_client_keys(
@@ -290,6 +315,8 @@ def claim_client_keys(destination):
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", failure)

yield make_deferred_yieldable(
defer.gatherResults(
@@ -313,9 +340,11 @@ def claim_client_keys(destination):
),
)

log_kv({"one_time_keys": json_result, "failures": failures})
return {"one_time_keys": json_result, "failures": failures}

@defer.inlineCallbacks
@tag_args
def upload_keys_for_user(self, user_id, device_id, keys):

time_now = self.clock.time_msec()
@@ -329,19 +358,38 @@ def upload_keys_for_user(self, user_id, device_id, keys):
user_id,
time_now,
)
log_kv(
{
"message": "Updating device_keys for user.",
"user_id": user_id,
"device_id": device_id,
}
)
# TODO: Sign the JSON with the server key
changed = yield self.store.set_e2e_device_keys(
user_id, device_id, time_now, device_keys
)
if changed:
# Only notify about device updates *if* the keys actually changed
yield self.device_handler.notify_device_update(user_id, [device_id])

else:
log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
one_time_keys = keys.get("one_time_keys", None)
if one_time_keys:
log_kv(
{
"message": "Updating one_time_keys for device.",
"user_id": user_id,
"device_id": device_id,
}
)
yield self._upload_one_time_keys_for_user(
user_id, device_id, time_now, one_time_keys
)
else:
log_kv(
{"message": "Did not update one_time_keys", "reason": "no keys given"}
)

# the device should have been registered already, but it may have been
# deleted due to a race with a DELETE request. Or we may be using an
@@ -352,6 +400,7 @@ def upload_keys_for_user(self, user_id, device_id, keys):

result = yield self.store.count_e2e_one_time_keys(user_id, device_id)

set_tag("one_time_key_counts", result)
return {"one_time_key_counts": result}

@defer.inlineCallbacks
@@ -395,6 +444,7 @@ def _upload_one_time_keys_for_user(
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
)

log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)


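The e2e_keys handler also uses set_tag to label spans with the local and remote key queries, tag_args to record a method's arguments, and the error=True / reason convention when a remote query or claim fails. Below is a sketch of those three pieces using the plain opentracing package rather than Synapse's own helpers; do_claim is a hypothetical callable used only for illustration.

import functools
import inspect

import opentracing


def set_tag(key, value):
    # Set a tag on the currently active span, if any.
    span = opentracing.tracer.active_span
    if span is not None:
        span.set_tag(key, value)


def tag_args(func):
    # Record the wrapped function's call arguments as tags on the active span.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = inspect.signature(func).bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            set_tag("args." + name, str(value))
        return func(*args, **kwargs)

    return wrapper


@tag_args
def claim_client_keys(destination, do_claim):
    # Mirrors the error-reporting convention above: tag error=True plus a
    # reason instead of letting the exception escape.
    try:
        return do_claim(destination)
    except Exception as e:
        set_tag("error", True)
        set_tag("reason", str(e))
        return None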
28 changes: 26 additions & 2 deletions synapse/handlers/e2e_room_keys.py
@@ -26,6 +26,7 @@
StoreError,
SynapseError,
)
from synapse.logging.opentracing import log_kv, trace
from synapse.util.async_helpers import Linearizer

logger = logging.getLogger(__name__)
@@ -49,6 +50,7 @@ def __init__(self, hs):
# changed.
self._upload_linearizer = Linearizer("upload_room_keys_lock")

@trace
@defer.inlineCallbacks
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ def get_room_keys(self, user_id, version, room_id=None, session_id=None):
user_id, version, room_id, session_id
)

log_kv(results)
return results

@trace
@defer.inlineCallbacks
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
with (yield self._upload_linearizer.queue(user_id)):
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)

@trace
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
"""Bulk upload a list of room keys into a given backup version, asserting
@@ -186,7 +191,14 @@ def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
session_id(str): the session whose room_key we're setting
room_key(dict): the room_key being set
"""

log_kv(
{
"message": "Trying to upload room key",
"room_id": room_id,
"session_id": session_id,
"user_id": user_id,
}
)
# get the room_key for this particular row
current_room_key = None
try:
@@ -195,14 +207,23 @@ def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
)
except StoreError as e:
if e.code == 404:
pass
log_kv(
{
"message": "Room key not found.",
"room_id": room_id,
"user_id": user_id,
}
)
else:
raise

if self._should_replace_room_key(current_room_key, room_key):
log_kv({"message": "Replacing room key."})
yield self.store.set_e2e_room_key(
user_id, version, room_id, session_id, room_key
)
else:
log_kv({"message": "Not replacing room_key."})

@staticmethod
def _should_replace_room_key(current_room_key, room_key):
@@ -236,6 +257,7 @@ def _should_replace_room_key(current_room_key, room_key):
return False
return True

@trace
@defer.inlineCallbacks
def create_version(self, user_id, version_info):
"""Create a new backup version. This automatically becomes the new
@@ -294,6 +316,7 @@ def get_version_info(self, user_id, version=None):
raise
return res

@trace
@defer.inlineCallbacks
def delete_version(self, user_id, version=None):
"""Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +337,7 @@ def delete_version(self, user_id, version=None):
else:
raise

@trace
@defer.inlineCallbacks
def update_version(self, user_id, version, version_info):
"""Update the info about a given version of the user's backup
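In the room keys handler, _upload_room_key now logs the outcome of every branch: the attempted upload, a missing existing key, and whether the key was replaced. A compact stand-in showing that decision-trail style follows; it is illustrative only, reusing the log_kv sketch from above and a plain dict in place of the store.

import opentracing


def log_kv(key_values):
    # Same helper as sketched earlier: log onto the active span, if any.
    span = opentracing.tracer.active_span
    if span is not None:
        span.log_kv(key_values)


def upload_room_key(existing_keys, room_id, session_id, room_key, should_replace):
    # Hypothetical stand-in for _upload_room_key, logging each decision point.
    log_kv({"message": "Trying to upload room key", "room_id": room_id, "session_id": session_id})
    current = existing_keys.get((room_id, session_id))
    if current is None:
        log_kv({"message": "Room key not found.", "room_id": room_id})
    if should_replace(current, room_key):
        log_kv({"message": "Replacing room key."})
        existing_keys[(room_id, session_id)] = room_key
    else:
        log_kv({"message": "Not replacing room_key."})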
13 changes: 12 additions & 1 deletion synapse/rest/client/v2_alpha/keys.py
@@ -24,6 +24,7 @@
parse_json_object_from_request,
parse_string,
)
from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name
from synapse.types import StreamToken

from ._base import client_patterns
@@ -68,6 +69,7 @@ def __init__(self, hs):
self.auth = hs.get_auth()
self.e2e_keys_handler = hs.get_e2e_keys_handler()

@trace_using_operation_name("upload_keys")
@defer.inlineCallbacks
def on_POST(self, request, device_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
@@ -78,6 +80,14 @@ def on_POST(self, request, device_id):
# passing the device_id here is deprecated; however, we allow it
# for now for compatibility with older clients.
if requester.device_id is not None and device_id != requester.device_id:
set_tag("error", True)
log_kv(
{
"message": "Client uploading keys for a different device",
"logged_in_id": requester.device_id,
"key_being_uploaded": device_id,
}
)
logger.warning(
"Client uploading keys for a different device "
"(logged in as %s, uploading for %s)",
@@ -178,10 +188,11 @@ def on_GET(self, request):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)

from_token_string = parse_string(request, "from")
set_tag("from", from_token_string)

# We want to enforce they do pass us one, but we ignore it and return
# changes after the "to" as well as before.
parse_string(request, "to")
set_tag("to", parse_string(request, "to"))

from_token = StreamToken.from_string(from_token_string)

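Finally, the client-facing key upload servlet is traced with trace_using_operation_name("upload_keys"), which behaves like @trace but names the span explicitly instead of after the wrapped function. A sketch of such a decorator factory with the plain opentracing API follows; the real helper in synapse.logging.opentracing also has to work with inlineCallbacks-decorated servlet methods, which this sketch ignores.

import functools

import opentracing


def trace_using_operation_name(operation_name):
    # Decorator factory: like @trace, but with an explicit span name.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with opentracing.tracer.start_active_span(operation_name):
                return func(*args, **kwargs)

        return wrapper

    return decorator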