Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix background updates to handle redactions/rejections #5352

Merged
merged 6 commits into from
Jun 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5352.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix room stats and presence background updates to correctly handle missing events.
11 changes: 7 additions & 4 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,14 +828,17 @@ def _handle_state_delta(self, deltas):
# joins.
continue

event = yield self.store.get_event(event_id)
if event.content.get("membership") != Membership.JOIN:
event = yield self.store.get_event(event_id, allow_none=True)
if not event or event.content.get("membership") != Membership.JOIN:
# We only care about joins
continue

if prev_event_id:
prev_event = yield self.store.get_event(prev_event_id)
if prev_event.content.get("membership") == Membership.JOIN:
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if (
prev_event
and prev_event.content.get("membership") == Membership.JOIN
):
# Ignore changes to join events.
continue

Expand Down
18 changes: 13 additions & 5 deletions synapse/handlers/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def _handle_deltas(self, deltas):
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]
stream_pos = delta["stream_id"]

logger.debug("Handling: %r %r, %s", typ, state_key, event_id)

Expand All @@ -136,10 +137,15 @@ def _handle_deltas(self, deltas):
event_content = {}

if event_id is not None:
event_content = (yield self.store.get_event(event_id)).content or {}
event = yield self.store.get_event(event_id, allow_none=True)
if event:
event_content = event.content or {}

# We use stream_pos here rather than fetch by event_id as event_id
# may be None
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)

# quantise time to the nearest bucket
now = yield self.store.get_received_ts(event_id)
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size

if typ == EventTypes.Member:
Expand All @@ -149,9 +155,11 @@ def _handle_deltas(self, deltas):
# compare them.
prev_event_content = {}
if prev_event_id is not None:
prev_event_content = (
yield self.store.get_event(prev_event_id)
).content
prev_event = yield self.store.get_event(
prev_event_id, allow_none=True,
)
if prev_event:
prev_event_content = prev_event.content

membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
Expand Down
37 changes: 37 additions & 0 deletions synapse/storage/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,43 @@ def get_received_ts(self, event_id):
desc="get_received_ts",
)

def get_received_ts_by_stream_pos(self, stream_ordering):
"""Given a stream ordering get an approximate timestamp of when it
happened.

This is done by simply taking the received ts of the first event that
has a stream ordering greater than or equal to the given stream pos.
If none exists returns the current time, on the assumption that it must
have happened recently.

Args:
stream_ordering (int)

Returns:
Deferred[int]
"""

def _get_approximate_received_ts_txn(txn):
sql = """
SELECT received_ts FROM events
WHERE stream_ordering >= ?
LIMIT 1
"""

txn.execute(sql, (stream_ordering,))
row = txn.fetchone()
if row and row[0]:
ts = row[0]
else:
ts = self.clock.time_msec()

return ts

return self.runInteraction(
"get_approximate_received_ts",
_get_approximate_received_ts_txn,
)

@defer.inlineCallbacks
def get_event(
self,
Expand Down
62 changes: 59 additions & 3 deletions tests/handlers/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def test_incorrect_state_transition(self):
"a2": {"membership": "not a real thing"},
}

def get_event(event_id):
def get_event(event_id, allow_none=True):
m = Mock()
m.content = events[event_id]
d = defer.Deferred()
Expand All @@ -224,7 +224,7 @@ def get_received_ts(event_id):
"room_id": "room",
"event_id": "a1",
"prev_event_id": "a2",
"stream_id": "bleb",
"stream_id": 60,
}
]

Expand All @@ -241,11 +241,67 @@ def get_received_ts(event_id):
"room_id": "room",
"event_id": "a2",
"prev_event_id": "a1",
"stream_id": "bleb",
"stream_id": 100,
}
]

f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
self.assertEqual(
f.value.args[0], "'not a real thing' is not a valid membership"
)

def test_redacted_prev_event(self):
"""
If the prev_event does not exist, then it is assumed to be a LEAVE.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")

room_1 = self.helper.create_room_as(u1, tok=u1_token)

# Do the initial population of the user directory via the background update
self._add_background_updates()

while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)

events = {
"a1": None,
"a2": {"membership": Membership.JOIN},
}

def get_event(event_id, allow_none=True):
if events.get(event_id):
m = Mock()
m.content = events[event_id]
else:
m = None
d = defer.Deferred()
self.reactor.callLater(0.0, d.callback, m)
return d

def get_received_ts(event_id):
return defer.succeed(1)

self.store.get_received_ts = get_received_ts
self.store.get_event = get_event

deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user:test",
"room_id": room_1,
"event_id": "a2",
"prev_event_id": "a1",
"stream_id": 100,
}
]

# Handle our fake deltas, which has a user going from LEAVE -> JOIN.
self.get_success(self.handler._handle_deltas(deltas))

# One delta, with two joined members -- the room creator, and our fake
# user.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["joined_members"], 2)