Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Add v2 state resolution algorithm #4040

Merged
merged 9 commits into from Oct 24, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3786.misc
@@ -0,0 +1 @@
Add initial implementation of new state resolution algorithm
2 changes: 1 addition & 1 deletion synapse/event_auth.py
Expand Up @@ -690,7 +690,7 @@ def auth_types_for_event(event):
auth_types = []

auth_types.append((EventTypes.PowerLevels, "", ))
auth_types.append((EventTypes.Member, event.user_id, ))
auth_types.append((EventTypes.Member, event.sender, ))
auth_types.append((EventTypes.Create, "", ))

if event.type == EventTypes.Member:
Expand Down
23 changes: 10 additions & 13 deletions synapse/handlers/federation.py
Expand Up @@ -53,7 +53,7 @@
ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import resolve_events_with_factory
from synapse.state import StateResolutionStore, resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async_helpers import Linearizer
Expand Down Expand Up @@ -384,21 +384,18 @@ def on_receive_pdu(
for x in remote_state:
event_map[x.event_id] = x

# Resolve any conflicting state
@defer.inlineCallbacks
def fetch(ev_ids):
fetched = yield self.store.get_events(
ev_ids, get_prev_content=False, check_redacted=False,
)
# add any events we fetch here to the `event_map` so that we
# can use them to build the state event list below.
event_map.update(fetched)
defer.returnValue(fetched)

room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_factory(
room_version, state_maps, event_map, fetch,
room_version, state_maps, event_map,
state_res_store=StateResolutionStore(self.store),
)

evs = yield self.store.get_events(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could do with a comment (now that the one at line 393 has gone)

list(state_map.values()),
get_prev_content=False,
check_redacted=False,
)
event_map.update(evs)

# we need to give _process_received_pdu the actual state events
# rather than event ids, so generate that now.
Expand Down
87 changes: 69 additions & 18 deletions synapse/state/__init__.py
Expand Up @@ -19,13 +19,14 @@

from six import iteritems, itervalues

import attr
from frozendict import frozendict

from twisted.internet import defer

from synapse.api.constants import EventTypes, RoomVersions
from synapse.events.snapshot import EventContext
from synapse.state import v1
from synapse.state import v1, v2
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.expiringcache import ExpiringCache
Expand Down Expand Up @@ -372,15 +373,10 @@ def resolve_state_groups_for_events(self, room_id, event_ids):

result = yield self._state_resolution_handler.resolve_state_groups(
room_id, room_version, state_groups_ids, None,
self._state_map_factory,
state_res_store=StateResolutionStore(self.store),
)
defer.returnValue(result)

def _state_map_factory(self, ev_ids):
return self.store.get_events(
ev_ids, get_prev_content=False, check_redacted=False,
)

@defer.inlineCallbacks
def resolve_events(self, room_version, state_sets, event):
logger.info(
Expand All @@ -401,7 +397,7 @@ def resolve_events(self, room_version, state_sets, event):
new_state = yield resolve_events_with_factory(
room_version, state_set_ids,
event_map=state_map,
state_map_factory=self._state_map_factory
state_res_store=StateResolutionStore(self.store),
)

new_state = {
Expand Down Expand Up @@ -436,7 +432,7 @@ def __init__(self, hs):
@defer.inlineCallbacks
@log_function
def resolve_state_groups(
self, room_id, room_version, state_groups_ids, event_map, state_map_factory,
self, room_id, room_version, state_groups_ids, event_map, state_res_store,
):
"""Resolves conflicts between a set of state groups

Expand All @@ -454,9 +450,11 @@ def resolve_state_groups(
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
events will be requested via state_map_factory.
events will be requested via state_res_store.

If None, all events will be fetched via state_res_store.

If None, all events will be fetched via state_map_factory.
state_res_store (StateResolutionStore)

Returns:
Deferred[_StateCacheEntry]: resolved state
Expand Down Expand Up @@ -502,7 +500,7 @@ def resolve_state_groups(
room_version,
list(itervalues(state_groups_ids)),
event_map=event_map,
state_map_factory=state_map_factory,
state_res_store=state_res_store,
)

# if the new state matches any of the input state groups, we can
Expand Down Expand Up @@ -583,7 +581,7 @@ def _make_state_cache_entry(
)


def resolve_events_with_factory(room_version, state_sets, event_map, state_map_factory):
def resolve_events_with_factory(room_version, state_sets, event_map, state_res_store):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we rename this now?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and the v2 impl)

"""
Args:
room_version(str): Version of the room
Expand All @@ -599,21 +597,74 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f

If None, all events will be fetched via state_map_factory.

state_map_factory(func): will be called
with a list of event_ids that are needed, and should return with
a Deferred of dict of event_id to event.
state_res_store (StateResolutionStore)

Returns
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
if room_version in (RoomVersions.V1, RoomVersions.VDH_TEST,):
if room_version == RoomVersions.V1:
return v1.resolve_events_with_factory(
state_sets, event_map, state_map_factory,
state_sets, event_map, state_res_store.get_events,
)
elif room_version == RoomVersions.VDH_TEST:
return v2.resolve_events_with_factory(
state_sets, event_map, state_res_store,
)
else:
# This should only happen if we added a version but forgot to add it to
# the list above.
raise Exception(
"No state resolution algorithm defined for version %r" % (room_version,)
)


@attr.s
class StateResolutionStore(object):
"""Interface that allows state resolution algorithms to access the database
in well defined way.

Args:
store (DataStore)
"""

store = attr.ib()

def get_events(self, event_ids, allow_rejected=False):
"""Get events from the database

Args:
event_ids (list): The event_ids of the events to fetch
allow_rejected (bool): If True return rejected events.

Returns:
Deferred[dict[str, FrozenEvent]]: Dict from event_id to event.
"""

return self.store.get_events(
event_ids,
check_redacted=False,
get_prev_content=False,
allow_rejected=allow_rejected,
)

def get_auth_chain(self, event_ids):
"""Gets the full auth chain for a set of events (including rejected
events).

Includes the given event IDs in the result.

Note that:
1. All events must be state events.
2. For v1 rooms this may not have the full auth chain in the
presence of rejected events

Args:
event_ids (list): The event IDs of the events to fetch the auth
chain for. Must be state events.

Returns:
Deferred[list[str]]: List of event IDs of the auth chain.
"""

return self.store.get_auth_chain_ids(event_ids, include_given=True)