Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Handle replication commands synchronously where possible #7876

Merged
merged 13 commits into from
Jul 27, 2020
42 changes: 28 additions & 14 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):

On receiving a new command it calls `on_<COMMAND_NAME>` with the parsed
command before delegating to `ReplicationCommandHandler.on_<COMMAND_NAME>`.
`ReplicationCommandHandler.on_<COMMAND_NAME>` can optionally return a coroutine;
if so, that will get run as a background process.

It also sends `PING` periodically, and correctly times out remote connections
(if they send a `PING` command)
Expand Down Expand Up @@ -232,18 +234,17 @@ def lineReceived(self, line: bytes):

tcp_inbound_commands_counter.labels(cmd.NAME, self.name).inc()

# Now lets try and call on_<CMD_NAME> function
run_as_background_process(
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
)
self.handle_command(cmd)

async def handle_command(self, cmd: Command):
def handle_command(self, cmd: Command) -> None:
"""Handle a command we have received over the replication stream.

First calls `self.on_<COMMAND>` if it exists, then calls
`self.command_handler.on_<COMMAND>` if it exists. This allows for
protocol level handling of commands (e.g. PINGs), before delegating to
the handler.
`self.command_handler.on_<COMMAND>` if it exists (which can optionally
return an Awaitable).

This allows for protocol level handling of commands (e.g. PINGs), before
delegating to the handler.

Args:
cmd: received command
Expand All @@ -254,13 +255,26 @@ async def handle_command(self, cmd: Command):
# specific handling.
cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(cmd)
cmd_func(cmd)
handled = True

# Then call out to the handler.
cmd_func = getattr(self.command_handler, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(self, cmd)
res = cmd_func(self, cmd)

# the handler might be a coroutine: fire it off as a background process
# if so.

if hasattr(res, "__await__"):
richvdh marked this conversation as resolved.
Show resolved Hide resolved

async def handle_command():
await res

run_as_background_process(
"replication-" + cmd.get_logcontext_id(), handle_command
richvdh marked this conversation as resolved.
Show resolved Hide resolved
)

handled = True

if not handled:
Expand Down Expand Up @@ -336,10 +350,10 @@ def _send_pending_commands(self):
for cmd in pending:
self.send_command(cmd)

async def on_PING(self, line):
def on_PING(self, line):
self.received_ping = True

async def on_ERROR(self, cmd):
def on_ERROR(self, cmd):
logger.error("[%s] Remote reported error: %r", self.id(), cmd.data)

def pauseProducing(self):
Expand Down Expand Up @@ -431,7 +445,7 @@ def connectionMade(self):
self.send_command(ServerCommand(self.server_name))
super().connectionMade()

async def on_NAME(self, cmd):
def on_NAME(self, cmd):
logger.info("[%s] Renamed to %r", self.id(), cmd.data)
self.name = cmd.data

Expand Down Expand Up @@ -460,7 +474,7 @@ def connectionMade(self):
# Once we've connected subscribe to the necessary streams
self.replicate()

async def on_SERVER(self, cmd):
def on_SERVER(self, cmd):
if cmd.data != self.server_name:
logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
self.send_error("Wrong remote")
Expand Down
40 changes: 20 additions & 20 deletions synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,36 +109,36 @@ def messageReceived(self, pattern: str, channel: str, message: str):
# remote instances.
tcp_inbound_commands_counter.labels(cmd.NAME, "redis").inc()

# Now lets try and call on_<CMD_NAME> function
run_as_background_process(
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
)
self.handle_command(cmd)

async def handle_command(self, cmd: Command):
def handle_command(self, cmd: Command) -> None:
"""Handle a command we have received over the replication stream.

By default delegates to on_<COMMAND>, which should return an awaitable.
Delegates to `self.handler.on_<COMMAND>` (which can optionally return an
Awaitable).

Args:
cmd: received command
"""
handled = False

# First call any command handlers on this instance. These are for redis
# specific handling.
cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(cmd)
handled = True

# Then call out to the handler.
cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(self, cmd)
handled = True

if not handled:
if not cmd_func:
logger.warning("Unhandled command: %r", cmd)
return

res = cmd_func(self, cmd)

# the handler might be a coroutine: fire it off as a background process
# if so.

if hasattr(res, "__await__"):
richvdh marked this conversation as resolved.
Show resolved Hide resolved

async def handle_command():
await res

run_as_background_process(
"replication-" + cmd.get_logcontext_id(), handle_command
)

def connectionLost(self, reason):
logger.info("Lost connection to redis")
Expand Down