Reduce federation replication traffic #2115
| + continue | ||
| + | ||
| + host = get_domain_from_id(user_id) | ||
| + hosts_and_states.append(([host], local_states)) |
| + state.user_id: state for state in states | ||
| + }) | ||
| + | ||
| + preserve_fn(self._attempt_new_transaction)(destination) |
| @@ -224,17 +227,95 @@ def _send_pdu(self, pdu, destinations): | ||
| self._attempt_new_transaction, destination | ||
| ) | ||
| - def send_presence(self, destination, states): | ||
| - if not self.can_send_to(destination): | ||
| + @preserve_fn |
richvdh
Apr 11, 2017
Member
I think so. I think a comment saying `# the caller should not yield on this` on this line would help.
erikjohnston requested a review from richvdh on Apr 10, 2017
erikjohnston assigned richvdh on Apr 10, 2017
richvdh requested changes on Apr 11, 2017
looks broadly plausible, but I had a bit of a nightmare trying to understand what was going on. A lot of that is because of poor comments in the existing code, so I had to go digging to understand things. Accordingly, I've made a load of requests that you comment things better; I realise that some of them aren't changing, but in the interests of trying to make work in this area less awful in the first place I'd appreciate it if we could take the opportunity to improve things.
I think moving get_interested_parties out would really help.
| + # way that can be replicated. This means that we don't have a way to | ||
| + # invalidate the cache correctly. | ||
| + # This is fine since in practice nobody uses the presence list stuff... | ||
| + get_presence_list_accepted = PresenceStore.__dict__[ |
| - states, calculate_remote_hosts=False | ||
| - ) | ||
| - room_ids_to_states, users_to_states, _ = parties | ||
| + parties = yield self._get_interested_parties(states) |
richvdh
Apr 11, 2017
Member
since we're changing the signature of _get_interested_parties anyway, it would be a good time to rename it to reflect the fact it is being used outside PresenceHandler.
see below: I think _get_interested_parties needs to move anyway.
| @@ -187,18 +190,14 @@ def send_edu(self, destination, edu_type, content, key=None): | ||
| self.notifier.on_new_replication_data() | ||
| - def send_presence(self, destination, states): | ||
| + def send_presence(self, states): | ||
| """As per TransactionQueue""" |
| - state.user_id: state | ||
| - for state in states | ||
| - }) | ||
| + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) |
richvdh
Apr 11, 2017
Member
can you comment to explain why we are filtering here. I guess it's so that we only send out presence updates for our own users, but also: why would we get as far as here with an update for someone else?
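For illustration, the filter being discussed might look like the sketch below. `UserPresenceState` is a cut-down stand-in, and `is_mine_id` is a simplified assumption (it compares the part of the ID after the first colon with a hypothetical local server name), not the real implementation. A list comprehension is used so the result is a list on Python 3 as well, where `filter` returns an iterator.

```python
from collections import namedtuple

# Cut-down stand-in for the real UserPresenceState (assumption: only these fields).
UserPresenceState = namedtuple("UserPresenceState", ["user_id", "state"])

LOCAL_SERVER_NAME = "example.org"  # hypothetical local server name


def is_mine_id(user_id):
    """Simplified assumption: an ID is local if its domain is our server name."""
    return user_id.split(":", 1)[1] == LOCAL_SERVER_NAME


def local_states_only(states):
    """Drop presence updates for remote users, so that we only ever send out
    presence for users that belong to this server."""
    return [s for s in states if is_mine_id(s.user_id)]


states = [
    UserPresenceState("@alice:example.org", "online"),
    UserPresenceState("@bob:remote.net", "online"),
]
local = local_states_only(states)
```

Here `local` contains only the state for `@alice:example.org`; the update for the remote user is discarded before anything is queued.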
| - self.presence_changed[pos] = [ | ||
| - (destination, state.user_id) for state in states | ||
| - ] | ||
| + self.presence_map.update({state.user_id: state for state in local_states}) |
richvdh
Apr 11, 2017
Member
can you document what presence_map and presence_changed mean in the constructor so I can understand if this is a sane thing?
| @@ -78,6 +78,7 @@ def __init__(self, hs): | ||
| self.pending_edus_by_dest = edus = {} | ||
| # Presence needs to be separate as we send single aggragate EDUs | ||
| + self.pending_presence = {} |
| + @preserve_fn | ||
| + @defer.inlineCallbacks | ||
| + def send_presence(self, states): | ||
| + """Send the new presence states to the appropriate destinations. |
richvdh
Apr 11, 2017
Member
"Start sending" ... "if we are not already sending updates" or something
| + # hosts in those rooms. | ||
| + room_ids_to_states = {} | ||
| + users_to_states = {} | ||
| + for state in states.itervalues(): |
richvdh
Apr 11, 2017
Member
please let's not c&p get_interested_parties here. How about moving this lump of code to the store or something?
(For super-mega-bonus points, make it a separate PR which precedes this one...)
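As a rough idea of what a shared helper could look like once it is moved out, here is a synchronous sketch. The function name and the two lookup callables (`rooms_for_user`, `hosts_in_room`) are hypothetical; the real code is deferred-based and also handles presence lists, which are omitted here.

```python
from collections import namedtuple

# Cut-down stand-in for the real UserPresenceState (assumption: only these fields).
UserPresenceState = namedtuple("UserPresenceState", ["user_id", "state"])


def interested_hosts_and_states(states, rooms_for_user, hosts_in_room):
    """Return a list of (hosts, states) pairs: each list of states should be
    sent to each host in the paired list of hosts.

    rooms_for_user and hosts_in_room are hypothetical lookups standing in
    for whatever the store would provide.
    """
    # First group the states by the rooms their users are in.
    room_ids_to_states = {}
    for state in states:
        for room_id in rooms_for_user(state.user_id):
            room_ids_to_states.setdefault(room_id, []).append(state)

    # Then pair each room's states with the hosts in that room.
    hosts_and_states = []
    for room_id, room_states in room_ids_to_states.items():
        hosts_and_states.append((hosts_in_room(room_id), room_states))
    return hosts_and_states
```

Both the presence handler and the transaction queue could then call the one helper rather than each carrying a copy of the loop.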
| + for u in plist: | ||
| + users_to_states.setdefault(u, []).append(state) | ||
| + | ||
| + hosts_and_states = [] |
| + | ||
| + hosts_and_states = [] | ||
| + for room_id, states in room_ids_to_states.items(): | ||
| + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) |
richvdh
Apr 11, 2017
Member
again, why are we filtering, and why would updates for remote users be getting in here? (and could we filter before deriving room_ids_to_states and users_to_states to avoid some work? happy if you want to leave this for now to avoid changing too much at once)
| @@ -615,12 +611,12 @@ def current_state_for_users(self, user_ids): | ||
| defer.returnValue(states) | ||
| @defer.inlineCallbacks | ||
| - def _get_interested_parties(self, states, calculate_remote_hosts=True): | ||
| + def _get_interested_parties(self, states): |
richvdh
Apr 11, 2017
Member
what are we expecting states to be here? A list(UserPresenceState) representing presence updates, presumably?
| @@ -615,12 +611,12 @@ def current_state_for_users(self, user_ids): | ||
| defer.returnValue(states) | ||
| @defer.inlineCallbacks | ||
| - def _get_interested_parties(self, states, calculate_remote_hosts=True): | ||
| + def _get_interested_parties(self, states): | ||
| """Given a list of states return which entities (rooms, users, servers) |
| - | ||
| - host = get_domain_from_id(user_id) | ||
| - hosts_to_states.setdefault(host, []).extend(local_states) | ||
| - | ||
| # TODO: de-dup hosts_to_states, as a single host might have multiple |
| """Sends state updates to remote servers. | ||
| Args: | ||
| - hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]` | ||
| + hosts_to_states (list): list(state) |
richvdh assigned erikjohnston and unassigned richvdh on Apr 11, 2017
erikjohnston added some commits on Apr 11, 2017
I've moved the |
erikjohnston assigned richvdh and unassigned erikjohnston on Apr 11, 2017
| @@ -78,7 +78,18 @@ def __init__(self, hs): | ||
| self.pending_edus_by_dest = edus = {} | ||
| # Presence needs to be separate as we send single aggragate EDUs |
| + # to be sent out by user_id. Entries here get processed and put in | ||
| + # pending_presence_by_dest | ||
| + self.pending_presence = {} | ||
| + # Map of destination -> user_id -> UserPresenceState of pending presence |
| + @preserve_fn | ||
| + @defer.inlineCallbacks | ||
| + def send_presence(self, states): | ||
| + """Send the new presence states to the appropriate destinations. |
| @@ -669,7 +641,7 @@ def _push_to_remotes(self, states): | ||
| """Sends state updates to remote servers. | ||
| Args: | ||
| - hosts_to_states (list): list(state) | ||
| + hosts_to_states (list(UserPresenceState)) |
| + each row the list of UserPresenceState should be sent to each | ||
| + destination | ||
| + """ | ||
| + hosts_and_states = [] # Final result to return |
richvdh
Apr 12, 2017
Member
now that you have an (excellent) description of the return value of the function, this probably doesn't really need a comment, but it's harmless enough
| + # hosts in those rooms. | ||
| + room_ids_to_states = {} | ||
| + users_to_states = {} | ||
| + for state in states.itervalues(): |
erikjohnston
Apr 12, 2017
Owner
Heh, for some reason I got it stuck in my head that they were different
Fine by me. Looks much better now.
erikjohnston commented Apr 10, 2017
This is mainly done by moving the calculation of where to send presence updates from the presence handler to the transaction queue, so we only need to send the presence event (and not the destinations) across the replication connection. Previously we were duplicating data by sending the full state across once per destination.
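To make the saving concrete, here is a toy model of the two approaches. It is only a sketch: the function names and the row shapes are assumptions for illustration, not the actual replication stream format.

```python
def rows_before(states, destinations_for):
    """Old approach (as described above): one row per destination per state,
    each row carrying the full state."""
    return [(dest, state) for state in states for dest in destinations_for(state)]


def rows_after(states):
    """New approach: one row per state; the receiving side works out the
    destinations itself."""
    return list(states)
```

With 2 presence updates that each need to reach 3 remote servers, the first function produces 6 rows and the second produces 2, so the replication traffic no longer grows with the number of destinations.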