Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit cache invalidation replication line length #4748

Merged
merged 4 commits into from Feb 27, 2019
Merged
Diff settings

Always

Just for now

Copy path View file
@@ -0,0 +1 @@
Improve replication performance by reducing cache invalidation traffic.
@@ -268,7 +268,17 @@ def send_command(self, cmd, do_buffer=True):
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)

self.sendLine(string.encode("utf-8"))
encoded_string = string.encode("utf-8")

if len(encoded_string) > self.MAX_LENGTH:
raise Exception(
"Failed to send command %s as too long (%d > %d)" % (
cmd.NAME,
len(encoded_string), self.MAX_LENGTH,
)
)

self.sendLine(encoded_string)

self.last_sent_command = self.clock.time_msec()

@@ -361,6 +371,11 @@ def __str__(self):
def id(self):
    """Return an identifier string of the form ``<name>-<conn_id>``.

    Both ``self.name`` and ``self.conn_id`` are rendered with ``%s``
    formatting, so non-string values are converted with ``str()``.
    """
    id_parts = (self.name, self.conn_id)
    return "%s-%s" % id_parts

def lineLengthExceeded(self, line):
    """Handle an inbound line that is longer than the maximum line length.

    Args:
        line: the oversized line that was received. It is not inspected;
            only a fixed error message is passed to ``self.send_error``.
    """
    error_message = "Line length exceeded"
    self.send_error(error_message)


class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS
Copy path View file
@@ -1327,10 +1327,15 @@ def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
"""
txn.call_after(self._invalidate_state_caches, room_id, members_changed)

keys = itertools.chain([room_id], members_changed)
self._send_invalidation_to_replication(
txn, _CURRENT_STATE_CACHE_NAME, keys,
)
# We need to be careful that the size of the `members_changed` list
# isn't so large that it causes problems sending over replication, so we
# send them in chunks.
members_changed = list(members_changed)

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 26, 2019

Member

I think you want synapse.util.batch_iter here.

for i in range(0, len(members_changed), 100):

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 26, 2019

Member

Given that mxids have a maximum length of 255 chars, a chunk of 100 of them (roughly 25K) can still easily overflow the max line length of 16K.

keys = itertools.chain([room_id], members_changed[i:i + 100])
self._send_invalidation_to_replication(
txn, _CURRENT_STATE_CACHE_NAME, keys,
)

def _invalidate_state_caches(self, room_id, members_changed):
"""Invalidates caches that are based on the current state, but does
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.