Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Opentrace device lists #5853

Merged
merged 36 commits into from
Sep 3, 2019
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
bdf8e51
Newsfile
erikjohnston Aug 14, 2019
8629a68
Fixup changelog and remove debug logging
erikjohnston Aug 16, 2019
9043403
Stylish imports
JorikSchellekens Aug 14, 2019
98afb14
Trace devices
JorikSchellekens Jun 28, 2019
b0d2e26
Trace device messages.
JorikSchellekens Jun 28, 2019
0b3d088
Update to new access pattern
JorikSchellekens Jul 2, 2019
34f6214
How did that half of the statement get deleted?
JorikSchellekens Jul 3, 2019
e358f00
These functions were not deferreds!
JorikSchellekens Jul 4, 2019
6a2ac72
The great logging/ migration
JorikSchellekens Jul 4, 2019
09e40a0
Some tracing
JorikSchellekens Jul 8, 2019
78a0123
Nicer tracing
JorikSchellekens Jul 11, 2019
b9db310
A little extra device_list tracing
JorikSchellekens Jul 15, 2019
6068b69
Use better decorator names.
JorikSchellekens Jul 17, 2019
e5155ee
Use unified trace method
JorikSchellekens Jul 22, 2019
c35f9d9
Refactor return value so we don't create identical lists each time.
JorikSchellekens Aug 5, 2019
303fcce
String concatenation without the '+'
JorikSchellekens Aug 5, 2019
d7d8492
Use underscores.
JorikSchellekens Aug 5, 2019
68d4c94
isort
JorikSchellekens Aug 14, 2019
8869422
Bad return type
JorikSchellekens Aug 14, 2019
08787f4
newsfile
JorikSchellekens Aug 14, 2019
42c2acd
Use the import style.
JorikSchellekens Aug 14, 2019
547f125
Remove astray indent
JorikSchellekens Aug 15, 2019
ff86fa6
Import style
JorikSchellekens Aug 16, 2019
395ee6a
Missing import
JorikSchellekens Aug 16, 2019
83a011f
Unused import
JorikSchellekens Aug 16, 2019
8769aa3
Feature and simplification
JorikSchellekens Aug 20, 2019
656e3b7
Underscores
JorikSchellekens Aug 20, 2019
fb87863
Allow the passing of operation_name to trace
JorikSchellekens Aug 22, 2019
1f5b9b0
Old import
JorikSchellekens Aug 23, 2019
47f8a59
Remove unused import
JorikSchellekens Aug 27, 2019
071b04d
Use the the keyword in the trace method
JorikSchellekens Aug 27, 2019
4d68ac0
Cleanup trace method
JorikSchellekens Sep 2, 2019
bce66ae
Merge branch 'develop' into joriks/opentraicng_devices
JorikSchellekens Sep 2, 2019
1c86773
Merge was not black
JorikSchellekens Sep 2, 2019
fa17018
opname not operation_name
JorikSchellekens Sep 3, 2019
1b5c891
Remove comment
JorikSchellekens Sep 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5853.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Opentracing for device list updates.
65 changes: 63 additions & 2 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
HttpResponseException,
RequestSendFailed,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
Expand All @@ -45,6 +46,7 @@ def __init__(self, hs):
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()

@trace
@defer.inlineCallbacks
def get_devices_by_user(self, user_id):
"""
Expand All @@ -56,6 +58,7 @@ def get_devices_by_user(self, user_id):
defer.Deferred: list[dict[str, X]]: info on each device
"""

set_tag("user_id", user_id)
device_map = yield self.store.get_devices_by_user(user_id)

ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
Expand All @@ -64,8 +67,10 @@ def get_devices_by_user(self, user_id):
for device in devices:
_update_device_from_client_ips(device, ips)

log_kv(device_map)
return devices

@trace
@defer.inlineCallbacks
def get_device(self, user_id, device_id):
""" Retrieve the given device
Expand All @@ -85,9 +90,14 @@ def get_device(self, user_id, device_id):
raise errors.NotFoundError
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips)

set_tag("device", device)
set_tag("ips", ips)

return device

@measure_func("device.get_user_ids_changed")
@trace
@defer.inlineCallbacks
def get_user_ids_changed(self, user_id, from_token):
"""Get list of users that have had the devices updated, or have newly
Expand All @@ -97,6 +107,9 @@ def get_user_ids_changed(self, user_id, from_token):
user_id (str)
from_token (StreamToken)
"""

set_tag("user_id", user_id)
set_tag("from_token", from_token)
now_room_key = yield self.store.get_room_events_max_id()

room_ids = yield self.store.get_rooms_for_user(user_id)
Expand Down Expand Up @@ -148,6 +161,9 @@ def get_user_ids_changed(self, user_id, from_token):
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
log_kv(
{"event": "encountered empty previous state", "room_id": room_id}
)
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
Expand Down Expand Up @@ -200,7 +216,11 @@ def get_user_ids_changed(self, user_id, from_token):
possibly_joined = []
possibly_left = []

return {"changed": list(possibly_joined), "left": list(possibly_left)}
result = {"changed": list(possibly_joined), "left": list(possibly_left)}

log_kv(result)

return result


class DeviceHandler(DeviceWorkerHandler):
Expand Down Expand Up @@ -267,6 +287,7 @@ def check_device_registered(

raise errors.StoreError(500, "Couldn't generate a device ID.")

@trace
@defer.inlineCallbacks
def delete_device(self, user_id, device_id):
""" Delete the given device
Expand All @@ -284,6 +305,10 @@ def delete_device(self, user_id, device_id):
except errors.StoreError as e:
if e.code == 404:
# no match
set_tag("error", True)
log_kv(
{"reason": "User doesn't have device id.", "device_id": device_id}
)
pass
else:
raise
Expand All @@ -296,6 +321,7 @@ def delete_device(self, user_id, device_id):

yield self.notify_device_update(user_id, [device_id])

@trace
@defer.inlineCallbacks
def delete_all_devices_for_user(self, user_id, except_device_id=None):
"""Delete all of the user's devices
Expand Down Expand Up @@ -331,6 +357,8 @@ def delete_devices(self, user_id, device_ids):
except errors.StoreError as e:
if e.code == 404:
# no match
set_tag("error", True)
set_tag("reason", "User doesn't have that device id.")
pass
else:
raise
Expand Down Expand Up @@ -371,6 +399,7 @@ def update_device(self, user_id, device_id, content):
else:
raise

@trace
@measure_func("notify_device_update")
@defer.inlineCallbacks
def notify_device_update(self, user_id, device_ids):
Expand All @@ -386,6 +415,8 @@ def notify_device_update(self, user_id, device_ids):
hosts.update(get_domain_from_id(u) for u in users_who_share_room)
hosts.discard(self.server_name)

set_tag("target_hosts", hosts)

position = yield self.store.add_device_change_to_streams(
user_id, device_ids, list(hosts)
)
Expand All @@ -405,6 +436,7 @@ def notify_device_update(self, user_id, device_ids):
)
for host in hosts:
self.federation_sender.send_device_messages(host)
log_kv({"message": "sent device update to host", "host": host})

@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
Expand Down Expand Up @@ -451,12 +483,15 @@ def __init__(self, hs, device_handler):
iterable=True,
)

@trace
@defer.inlineCallbacks
def incoming_device_list_update(self, origin, edu_content):
"""Called on incoming device list update from federation. Responsible
for parsing the EDU and adding to pending updates list.
"""

set_tag("origin", origin)
set_tag("edu_content", edu_content)
user_id = edu_content.pop("user_id")
device_id = edu_content.pop("device_id")
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
Expand All @@ -471,12 +506,30 @@ def incoming_device_list_update(self, origin, edu_content):
device_id,
origin,
)

set_tag("error", True)
log_kv(
{
"message": "Got a device list update edu from a user and "
"device which does not match the origin of the request.",
"user_id": user_id,
"device_id": device_id,
}
)
return

room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
set_tag("error", True)
log_kv(
{
"message": "Got an update from a user for which "
"we don't share any rooms",
"other user_id": user_id,
}
)
logger.warning(
"Got device list update edu for %r/%r, but don't share a room",
user_id,
Expand Down Expand Up @@ -578,6 +631,7 @@ def user_device_resync(self, user_id):
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
"""
log_kv({"message": "Doing resync to update device list."})
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
try:
Expand All @@ -594,13 +648,20 @@ def user_device_resync(self, user_id):
# eventually become consistent.
return
except FederationDeniedError as e:
set_tag("error", True)
log_kv({"reason": "FederationDeniedError"})
logger.info(e)
return
except Exception:
except Exception as e:
# TODO: Remember that we are now out of sync and try again
# later
set_tag("error", True)
log_kv(
{"message": "Exception raised by federation request", "exception": e}
)
logger.exception("Failed to handle device list update for %s", user_id)
return
log_kv({"result": result})
stream_id = result["stream_id"]
devices = result["devices"]

Expand Down
6 changes: 5 additions & 1 deletion synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from synapse.api.errors import SynapseError
from synapse.logging.opentracing import (
get_active_span_text_map,
log_kv,
set_tag,
start_active_span,
whitelisted_homeserver,
Expand Down Expand Up @@ -86,7 +87,8 @@ def on_direct_to_device_edu(self, origin, content):

@defer.inlineCallbacks
def send_device_message(self, sender_user_id, message_type, messages):

set_tag("number_of_messages", len(messages))
set_tag("sender", sender_user_id)
local_messages = {}
remote_messages = {}
for user_id, by_device in messages.items():
Expand Down Expand Up @@ -124,6 +126,7 @@ def send_device_message(self, sender_user_id, message_type, messages):
else None,
}

log_kv({"local_messages": local_messages})
stream_id = yield self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)
Expand All @@ -132,6 +135,7 @@ def send_device_message(self, sender_user_id, message_type, messages):
"to_device_key", stream_id, users=local_messages.keys()
)

log_kv({"remote_messages": remote_messages})
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
Expand Down
70 changes: 18 additions & 52 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ def interesting_function(*args, **kwargs):
return something_usual_and_useful


Operation names can be explicitly set for functions by using
``trace_using_operation_name``
Operation names can be explicitly set for a function by passing the
operation name to ``trace``

.. code-block:: python

from synapse.logging.opentracing import trace_using_operation_name
from synapse.logging.opentracing import trace

@trace_using_operation_name("A *much* better operation name")
@trace(operation_name="a_better_operation_name")
JorikSchellekens marked this conversation as resolved.
Show resolved Hide resolved
def interesting_badly_named_function(*args, **kwargs):
# Does all kinds of cool and expected things
return something_usual_and_useful
Expand Down Expand Up @@ -641,60 +641,22 @@ def extract_text_map(carrier):
# Tracing decorators


def trace(func):
def trace(func=None, opname=None):
"""
Decorator to trace a function.
Sets the operation name to that of the function's.
Sets the operation name to that of the function's or that given
as operation_name. See the module's doc string for usage
examples.
"""
if opentracing is None:
return func

@wraps(func)
def _trace_inner(self, *args, **kwargs):
if opentracing is None:
return func(self, *args, **kwargs)

scope = start_active_span(func.__name__)
scope.__enter__()

try:
result = func(self, *args, **kwargs)
if isinstance(result, defer.Deferred):

def call_back(result):
scope.__exit__(None, None, None)
return result

def err_back(result):
scope.span.set_tag(tags.ERROR, True)
scope.__exit__(None, None, None)
return result

result.addCallbacks(call_back, err_back)

else:
scope.__exit__(None, None, None)

return result

except Exception as e:
scope.__exit__(type(e), None, e.__traceback__)
raise

return _trace_inner


def trace_using_operation_name(operation_name):
"""Decorator to trace a function. Explicitely sets the operation_name."""

def trace(func):
"""
Decorator to trace a function.
Sets the operation name to that of the function's.
"""
def decorator(func):
if opentracing is None:
return func

# Doing this weird assignment thing to get around local variable
# referenced before assignment 'bug' raised by checkstyle
JorikSchellekens marked this conversation as resolved.
Show resolved Hide resolved
operation_name = opname if opname else func.__name__

@wraps(func)
def _trace_inner(self, *args, **kwargs):
if opentracing is None:
Expand All @@ -717,6 +679,7 @@ def err_back(result):
return result

result.addCallbacks(call_back, err_back)

else:
scope.__exit__(None, None, None)

Expand All @@ -728,7 +691,10 @@ def err_back(result):

return _trace_inner

return trace
if func:
return decorator(func)
else:
return decorator


def tag_args(func):
Expand Down
4 changes: 2 additions & 2 deletions synapse/rest/client/v2_alpha/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
parse_json_object_from_request,
parse_string,
)
from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.types import StreamToken

from ._base import client_patterns
Expand Down Expand Up @@ -69,7 +69,7 @@ def __init__(self, hs):
self.auth = hs.get_auth()
self.e2e_keys_handler = hs.get_e2e_keys_handler()

@trace_using_operation_name("upload_keys")
@trace(opname="upload_keys")
@defer.inlineCallbacks
def on_POST(self, request, device_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
Expand Down
Loading