
Combine the CurrentStateDeltaStream into the EventStream #4955

Merged 10 commits on Mar 28, 2019
2 changes: 2 additions & 0 deletions changelog.d/4953.misc
@@ -0,0 +1,2 @@
Split synapse.replication.tcp.streams into smaller files.

1 change: 1 addition & 0 deletions changelog.d/4954.misc
@@ -0,0 +1 @@
Refactor replication row generation/parsing.
1 change: 1 addition & 0 deletions changelog.d/4955.bugfix
@@ -0,0 +1 @@
Fix sync bug which made accepting invites unreliable in worker-mode synapses.
2 changes: 1 addition & 1 deletion synapse/app/federation_sender.py
@@ -38,7 +38,7 @@
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams import ReceiptsStream
+from synapse.replication.tcp.streams._base import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
5 changes: 4 additions & 1 deletion synapse/app/synchrotron.py
@@ -48,6 +48,7 @@
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import EventsStreamEventRow
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
@@ -369,7 +370,9 @@ def process_and_notify(self, stream_name, token, rows):
            # We shouldn't get multiple rows per token for events stream, so
            # we don't need to optimise this for multiple rows.
            for row in rows:
-                event = yield self.store.get_event(row.event_id)
+                if row.type != EventsStreamEventRow.TypeId:
+                    continue
+                event = yield self.store.get_event(row.data.event_id)
                extra_users = ()
                if event.type == EventTypes.Member:
                    extra_users = (event.state_key,)
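With the CurrentStateDeltaStream folded into the events stream, consumers now receive heterogeneous rows and must skip the types they do not handle, as the synchrotron does above. A minimal runnable sketch of that TypeId-dispatch pattern, using stand-in row classes rather than the real ones from synapse.replication.tcp.streams.events ("ev" matches the RDATA example later in this diff; "state" is an assumed tag):

```python
from collections import namedtuple

# Stand-in row types for illustration only; the real classes live in
# synapse.replication.tcp.streams.events.
EventsStreamRow = namedtuple("EventsStreamRow", ("type", "data"))

EventRowData = namedtuple("EventRowData", ("event_id", "room_id"))
EventRowData.TypeId = "ev"  # matches the RDATA wire example in protocol.py

StateRowData = namedtuple("StateRowData", ("room_id", "event_id"))
StateRowData.TypeId = "state"  # assumed tag for current-state rows

def process_rows(rows):
    # Handle only event rows; silently skip any other row type.
    for row in rows:
        if row.type != EventRowData.TypeId:
            continue
        print("notify clients about event", row.data.event_id)

process_rows([
    EventsStreamRow("ev", EventRowData("$abc:example.com", "!room:example.com")),
    EventsStreamRow("state", StateRowData("!room:example.com", "$def:example.com")),
])
```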
17 changes: 10 additions & 7 deletions synapse/app/user_dir.py
@@ -36,6 +36,10 @@
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import (
+    EventsStream,
+    EventsStreamCurrentStateRow,
+)
from synapse.rest.client.v2_alpha import user_directory
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -73,19 +77,18 @@ def __init__(self, db_conn, hs):
            prefilled_cache=curr_state_delta_prefill,
        )

-        self._current_state_delta_pos = events_max
-
    def stream_positions(self):
        result = super(UserDirectorySlaveStore, self).stream_positions()
-        result["current_state_deltas"] = self._current_state_delta_pos
        return result

    def process_replication_rows(self, stream_name, token, rows):
-        if stream_name == "current_state_deltas":
-            self._current_state_delta_pos = token
+        if stream_name == EventsStream.NAME:
+            self._stream_id_gen.advance(token)
            for row in rows:
+                if row.type != EventsStreamCurrentStateRow.TypeId:
+                    continue
                self._curr_state_delta_stream_cache.entity_has_changed(
-                    row.room_id, token
+                    row.data.room_id, token
                )
        return super(UserDirectorySlaveStore, self).process_replication_rows(
            stream_name, token, rows
@@ -170,7 +173,7 @@ def on_rdata(self, stream_name, token, rows):
        yield super(UserDirectoryReplicationHandler, self).on_rdata(
            stream_name, token, rows
        )
-        if stream_name == "current_state_deltas":
+        if stream_name == EventsStream.NAME:
            run_in_background(self._notify_directory)

    @defer.inlineCallbacks
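The only state the user-directory worker derives from these rows is which rooms' current state changed, and at what stream token. As a hedged illustration of the entity_has_changed bookkeeping above, here is a dict-based stand-in for synapse's StreamChangeCache (the real class lives in synapse.util.caches.stream_change_cache; this sketch is not its implementation):

```python
# Simplified stand-in for StreamChangeCache: tracks the last stream token
# at which each entity (here, a room) changed.
class ChangeCache(object):
    def __init__(self):
        self._last_changed = {}  # room_id -> last token it changed at

    def entity_has_changed(self, room_id, token):
        # Record the change, keeping the highest token seen for the room.
        current = self._last_changed.get(room_id, 0)
        self._last_changed[room_id] = max(current, token)

    def has_entity_changed(self, room_id, since_token):
        # Rooms we have never heard of are conservatively reported as changed.
        if room_id not in self._last_changed:
            return True
        return self._last_changed[room_id] > since_token

cache = ChangeCache()
cache.entity_has_changed("!room:example.com", 42)
assert cache.has_entity_changed("!room:example.com", since_token=41)
assert not cache.has_entity_changed("!room:example.com", since_token=42)
```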
8 changes: 6 additions & 2 deletions synapse/replication/slave/storage/events.py
@@ -16,6 +16,7 @@
import logging

from synapse.api.constants import EventTypes
+from synapse.replication.tcp.streams.events import EventsStreamEventRow
from synapse.storage.event_federation import EventFederationWorkerStore
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.events_worker import EventsWorkerStore
@@ -79,9 +80,12 @@ def process_replication_rows(self, stream_name, token, rows):
        if stream_name == "events":
            self._stream_id_gen.advance(token)
            for row in rows:
+                if row.type != EventsStreamEventRow.TypeId:
+                    continue
+                data = row.data
                self.invalidate_caches_for_event(
-                    token, row.event_id, row.room_id, row.type, row.state_key,
-                    row.redacts,
+                    token, data.event_id, data.room_id, data.type, data.state_key,
+                    data.redacts,
                    backfilled=False,
                )
        elif stream_name == "backfill":
5 changes: 3 additions & 2 deletions synapse/replication/tcp/client.py
@@ -105,13 +105,14 @@ def start_replication(self, hs):
    def on_rdata(self, stream_name, token, rows):
        """Called to handle a batch of replication data with a given stream token.

-        By default this just pokes the slave store. Can be overriden in subclasses to
+        By default this just pokes the slave store. Can be overridden in subclasses to
        handle more.

        Args:
            stream_name (str): name of the replication stream for this batch of rows
            token (int): stream token for this batch of rows
-            rows (list): a list of Stream.ROW_TYPE objects.
+            rows (list): a list of Stream.ROW_TYPE objects as returned by
+                Stream.parse_row.

        Returns:
            Deferred|None
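The docstring above is the contract subclasses rely on. A small self-contained sketch of the override pattern it describes, where BaseHandler stands in for ReplicationClientHandler and the counting body is purely illustrative:

```python
# Hypothetical handler: call the base implementation (which pokes the slave
# store), then react to the streams this worker cares about.
class BaseHandler(object):
    def on_rdata(self, stream_name, token, rows):
        pass  # the real base class pokes the slave store here

class MyWorkerHandler(BaseHandler):
    def __init__(self):
        self.seen_event_rows = 0

    def on_rdata(self, stream_name, token, rows):
        result = super(MyWorkerHandler, self).on_rdata(stream_name, token, rows)
        if stream_name == "events":
            # rows have already been parsed by Stream.parse_row
            self.seen_event_rows += len(rows)
        return result

handler = MyWorkerHandler()
handler.on_rdata("events", 14, [("ev", None)])
assert handler.seen_event_rows == 1
```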
6 changes: 3 additions & 3 deletions synapse/replication/tcp/protocol.py
@@ -42,8 +42,8 @@
    > POSITION backfill 1
    > POSITION caches 1
    > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
-    > RDATA events 14 ["$149019767112vOHxz:localhost:8823",
-        "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
+    > RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823",
+        "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]]
    < PING 1490197675618
    > ERROR server stopping
    * connection closed by server *
@@ -605,7 +605,7 @@ def on_RDATA(self, cmd):
        inbound_rdata_count.labels(stream_name).inc()

        try:
-            row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
+            row = STREAMS_MAP[stream_name].parse_row(cmd.row)
        except Exception:
            logger.exception(
                "[%s] Failed to parse RDATA: %r %r",
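The one-line change above is the heart of the refactor: deserialisation moves from a fixed ROW_TYPE(*row) call into a per-stream parse_row hook, which lets the events stream carry a (type, data) pair as in the RDATA example earlier. A hedged sketch with illustrative classes (not the real synapse ones):

```python
from collections import namedtuple

SimpleRow = namedtuple("SimpleRow", ("entity",))
TypedRow = namedtuple("TypedRow", ("type", "data"))
EventData = namedtuple(
    "EventData", ("event_id", "room_id", "type", "state_key", "redacts")
)

class SimpleStream(object):
    ROW_TYPE = SimpleRow

    @classmethod
    def parse_row(cls, row):
        # Default behaviour: the decoded json row is a flat list of
        # constructor arguments for ROW_TYPE.
        return cls.ROW_TYPE(*row)

class EventsLikeStream(SimpleStream):
    @classmethod
    def parse_row(cls, row):
        # An events-style stream sends ["ev", [...]]: a type tag plus a
        # payload, so it overrides the default parsing.
        type_id, data = row
        return TypedRow(type_id, EventData(*data))

row = EventsLikeStream.parse_row(
    ["ev", ["$149019767112vOHxz:localhost:8823",
            "!AFDCvgApUmpdfVjIXm:localhost:8823", "m.room.guest_access", "", None]]
)
assert row.type == "ev"
assert row.data.room_id == "!AFDCvgApUmpdfVjIXm:localhost:8823"
```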
3 changes: 2 additions & 1 deletion synapse/replication/tcp/resource.py
@@ -30,7 +30,8 @@
from synapse.util.metrics import Measure, measure_func

from .protocol import ServerReplicationStreamProtocol
-from .streams import STREAMS_MAP, FederationStream
+from .streams import STREAMS_MAP
+from .streams.federation import FederationStream

stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
                                 "", ["stream_name"])
49 changes: 49 additions & 0 deletions synapse/replication/tcp/streams/__init__.py
@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Defines all the valid streams that clients can subscribe to, and the format
of the rows returned by each stream.

Each stream is defined by the following information:

    stream name:        The name of the stream
    row type:           The type that is used to serialise/deserialise the row
    current_token:      The function that returns the current token for the stream
    update_function:    The function that returns a list of updates between two tokens
"""

from . import _base, events, federation

STREAMS_MAP = {
    stream.NAME: stream
    for stream in (
        events.EventsStream,
        _base.BackfillStream,
        _base.PresenceStream,
        _base.TypingStream,
        _base.ReceiptsStream,
        _base.PushRulesStream,
        _base.PushersStream,
        _base.CachesStream,
        _base.PublicRoomsStream,
        _base.DeviceListsStream,
        _base.ToDeviceStream,
        federation.FederationStream,
        _base.TagAccountDataStream,
        _base.AccountDataStream,
        _base.GroupServerStream,
    )
}
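To make the contract from the docstring at the top of this module concrete, here is a hedged, self-contained sketch of a toy stream exposing the pieces listed there (name, row type, current_token, update_function). It is not a real synapse stream; the push method exists only to drive the example:

```python
from collections import namedtuple

PingRow = namedtuple("PingRow", ("message",))

class PingStream(object):
    NAME = "ping"
    ROW_TYPE = PingRow

    def __init__(self):
        self._token = 0
        self._updates = []  # list of (token, row-data list) pairs

    def current_token(self):
        # The current token for the stream.
        return self._token

    def push(self, message):
        # Record an update and advance the token (illustrative helper).
        self._token += 1
        self._updates.append((self._token, [message]))

    def update_function(self, from_token, current_token, limit=None):
        # All updates between the two tokens, oldest first.
        updates = [
            (token, row) for token, row in self._updates
            if from_token < token <= current_token
        ]
        return updates[:limit]

stream = PingStream()
stream.push("hello")
assert stream.update_function(0, stream.current_token()) == [(1, ["hello"])]
# The default Stream.parse_row simply splats the row into ROW_TYPE:
assert PingStream.ROW_TYPE(*["hello"]).message == "hello"
```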
synapse/replication/tcp/{streams.py → streams/_base.py}
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,16 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-"""Defines all the valid streams that clients can subscribe to, and the format
-of the rows returned by each stream.
-
-Each stream is defined by the following information:
-
-    stream name:        The name of the stream
-    row type:           The type that is used to serialise/deserialise the row
-    current_token:      The function that returns the current token for the stream
-    update_function:    The function that returns a list of updates between two tokens
-"""
import itertools
import logging
from collections import namedtuple
@@ -34,14 +26,6 @@

MAX_EVENTS_BEHIND = 10000

-
-EventStreamRow = namedtuple("EventStreamRow", (
-    "event_id",   # str
-    "room_id",    # str
-    "type",       # str
-    "state_key",  # str, optional
-    "redacts",    # str, optional
-))
BackfillStreamRow = namedtuple("BackfillStreamRow", (
    "event_id",  # str
    "room_id",   # str
@@ -96,10 +80,6 @@
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
    "entity",  # str
))
-FederationStreamRow = namedtuple("FederationStreamRow", (
-    "type",  # str, the type of data as defined in the BaseFederationRows
-    "data",  # dict, serialization of a federation.send_queue.BaseFederationRow
-))
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
    "user_id",  # str
    "room_id",  # str
@@ -111,12 +91,6 @@
    "data_type",  # str
    "data",       # dict
))
-CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
-    "room_id",    # str
-    "type",       # str
-    "state_key",  # str
-    "event_id",   # str, optional
-))
GroupsStreamRow = namedtuple("GroupsStreamRow", (
    "group_id",  # str
    "user_id",   # str
@@ -132,9 +106,24 @@ class Stream(object):
    time it was called up until the point `advance_current_token` was called.
    """
    NAME = None  # The name of the stream
-    ROW_TYPE = None  # The type of the row
+    ROW_TYPE = None  # The type of the row. Used by the default impl of parse_row.
    _LIMITED = True  # Whether the update function takes a limit

+    @classmethod
+    def parse_row(cls, row):
+        """Parse a row received over replication
+
+        By default, assumes that the row data is an array object and passes its contents
+        to the constructor of the ROW_TYPE for this stream.
+
+        Args:
+            row: row data from the incoming RDATA command, after json decoding
+
+        Returns:
+            ROW_TYPE object for this stream
+        """
+        return cls.ROW_TYPE(*row)
+
    def __init__(self, hs):
        # The token from which we last asked for updates
        self.last_token = self.current_token()
@@ -206,7 +195,7 @@ def get_updates_since(self, from_token):
            from_token, current_token,
        )

-        updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows]
+        updates = [(row[0], row[1:]) for row in rows]

        # check we didn't get more rows than the limit.
        # doing it like this allows the update_function to be a generator.
@@ -236,20 +225,6 @@ def update_function(self, from_token, current_token, limit=None):
        raise NotImplementedError()


-class EventsStream(Stream):
-    """We received a new event, or an event went from being an outlier to not
-    """
-    NAME = "events"
-    ROW_TYPE = EventStreamRow
-
-    def __init__(self, hs):
-        store = hs.get_datastore()
-        self.current_token = store.get_current_events_token
-        self.update_function = store.get_all_new_forward_event_rows
-
-        super(EventsStream, self).__init__(hs)
-
-
class BackfillStream(Stream):
    """We fetched some old events and either we had never seen that event before
    or it went from being an outlier to not.
@@ -404,22 +379,6 @@ def __init__(self, hs):
        super(ToDeviceStream, self).__init__(hs)


-class FederationStream(Stream):
-    """Data to be sent over federation. Only available when master has federation
-    sending disabled.
-    """
-    NAME = "federation"
-    ROW_TYPE = FederationStreamRow
-
-    def __init__(self, hs):
-        federation_sender = hs.get_federation_sender()
-
-        self.current_token = federation_sender.get_current_token
-        self.update_function = federation_sender.get_replication_rows
-
-        super(FederationStream, self).__init__(hs)
-
-
class TagAccountDataStream(Stream):
    """Someone added/removed a tag for a room
    """
@@ -463,21 +422,6 @@ def update_function(self, from_token, to_token, limit):
        defer.returnValue(results)


-class CurrentStateDeltaStream(Stream):
-    """Current state for a room was changed
-    """
-    NAME = "current_state_deltas"
-    ROW_TYPE = CurrentStateDeltaStreamRow
-
-    def __init__(self, hs):
-        store = hs.get_datastore()
-
-        self.current_token = store.get_max_current_state_delta_stream_id
-        self.update_function = store.get_all_updated_current_state_deltas
-
-        super(CurrentStateDeltaStream, self).__init__(hs)
-
-
class GroupServerStream(Stream):
    NAME = "groups"
    ROW_TYPE = GroupsStreamRow
@@ -489,26 +433,3 @@ def __init__(self, hs):
        self.update_function = store.get_all_groups_changes

        super(GroupServerStream, self).__init__(hs)
-
-
-STREAMS_MAP = {
-    stream.NAME: stream
-    for stream in (
-        EventsStream,
-        BackfillStream,
-        PresenceStream,
-        TypingStream,
-        ReceiptsStream,
-        PushRulesStream,
-        PushersStream,
-        CachesStream,
-        PublicRoomsStream,
-        DeviceListsStream,
-        ToDeviceStream,
-        FederationStream,
-        TagAccountDataStream,
-        AccountDataStream,
-        CurrentStateDeltaStream,
-        GroupServerStream,
-    )
-}
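With CurrentStateDeltaStream and its STREAMS_MAP entry gone, downstream consumers migrate as in the user_dir.py diff above: filter by row type on the combined events stream instead of by stream name. A hedged before/after sketch of that migration (the row classes and the "state" type tag for EventsStreamCurrentStateRow.TypeId are illustrative assumptions; "ev" matches the wire example earlier):

```python
from collections import namedtuple

Row = namedtuple("Row", ("type", "data"))          # (type tag, payload)
StateData = namedtuple("StateData", ("room_id",))  # illustrative payload

def note_state_change(room_id, token):
    print("current state of %s changed at token %d" % (room_id, token))

# Before this PR: current-state deltas arrived on their own stream.
def process_rows_old(stream_name, token, rows):
    if stream_name == "current_state_deltas":
        for row in rows:
            note_state_change(row.room_id, token)

# After this PR: they ride the combined events stream as typed rows.
def process_rows_new(stream_name, token, rows):
    if stream_name == "events":
        for row in rows:
            if row.type != "state":
                continue
            note_state_change(row.data.room_id, token)

process_rows_new("events", 7, [Row("state", StateData("!room:example.com"))])
```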