Change pushers to use the event_actions table #705

Merged
merged 22 commits into develop from dbkr/pushers_use_event_actions on Apr 11, 2016

2 participants
Member

dbkr commented Apr 7, 2016

Pushers no longer run as separate components that each listen on an event stream. Instead, they expose hooks that are called from the message- and receipt-sending code paths, from which they query the event_push_actions table for new notifications.

This eliminates the runtime overhead of the old-style pushers, but the main motivation is to pave the way for implementing email notifications in the same way.
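The flow described above can be sketched as follows. This is an illustrative stand-in, not Synapse's actual implementation: the real code is asynchronous Twisted code, and `EventActionStore` here is a hypothetical in-memory substitute for the `event_push_actions` table.

```python
# Illustrative sketch of the hook-based pusher design: the sending code
# path calls on_new_notifications, and the pusher pulls its work from
# the event-actions store rather than listening on an event stream.

class EventActionStore:
    """Stand-in for the event_push_actions table: (stream_ordering, user_id) rows."""
    def __init__(self):
        self.rows = []

    def add_push_action(self, stream_ordering, user_id):
        self.rows.append((stream_ordering, user_id))

    def get_unread_push_actions_for_user_in_range(self, user_id, min_so, max_so):
        return [
            (so, u) for so, u in self.rows
            if u == user_id and min_so < so <= max_so
        ]


class SketchPusher:
    def __init__(self, store, user_id):
        self.store = store
        self.user_id = user_id
        self.last_stream_ordering = 0
        self.sent = []  # stream orderings we have pushed for

    def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
        # Hook called from the message-sending code path: query the table
        # for anything not yet pushed, instead of re-running push rules
        # over an event stream.
        unprocessed = self.store.get_unread_push_actions_for_user_in_range(
            self.user_id, self.last_stream_ordering, max_stream_ordering
        )
        for stream_ordering, _user in unprocessed:
            self.sent.append(stream_ordering)
            self.last_stream_ordering = stream_ordering
```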

dbkr added some commits Apr 6, 2016

Make pushers use the event_push_actions table instead of listening on an event stream & running the rules again. Sytest passes, but remaining to do:

 * Make badges work again
 * Remove old, unused code
Send badge count pushes.
Also fix bugs with retrying.

@dbkr dbkr commented on the diff Apr 7, 2016

synapse/push/httppusher.py
d = {
'notification': {
- 'id': event['event_id'],
- 'room_id': event['room_id'],
- 'type': event['type'],
- 'sender': event['user_id'],
+ 'id': event.event_id, # deprecated: remove soon

dbkr added some commits Apr 7, 2016

Add comments on min_stream_id
saying that the min stream id won't be completely accurate all the time

@erikjohnston erikjohnston commented on an outdated diff Apr 7, 2016

synapse/push/push_tools.py
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+
+@defer.inlineCallbacks
+def get_badge_count(hs, user_id):
@erikjohnston

erikjohnston Apr 7, 2016

Owner

Given we only use hs.get_datastore(), it might be nice if this took a datastore instead?

@erikjohnston erikjohnston commented on an outdated diff Apr 7, 2016

synapse/push/push_tools.py
+
+ for r in joins:
+ if r.room_id in my_receipts_by_room:
+ last_unread_event_id = my_receipts_by_room[r.room_id]
+
+ notifs = yield (
+ hs.get_datastore().get_unread_event_push_actions_by_room_for_user(
+ r.room_id, user_id, last_unread_event_id
+ )
+ )
+ badge += notifs["notify_count"]
+ defer.returnValue(badge)
+
+
+@defer.inlineCallbacks
+def get_context_for_event(hs, ev):

@erikjohnston erikjohnston commented on an outdated diff Apr 7, 2016

synapse/push/pusherpool.py
)
- else:
- raise PusherConfigException(
- "Unknown pusher type '%s' for user %s" %
- (pusherdict['kind'], pusherdict['user_name'])
+ for u in users_affected:
+ if u in self.pushers:
+ for p in self.pushers[u].values():
+ yield p.on_new_notifications(min_stream_id, max_stream_id)
@erikjohnston

erikjohnston Apr 7, 2016

Owner

Possibly don't yield here and do a defer.gatherResults? Or maybe we don't even need to yield?

Also might be nice to surround in try/except to ensure an exception in one doesn't break another?
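The two suggestions combined might look like the sketch below. This is plain Python for illustration; in the real Twisted code the loop body would collect Deferreds into `defer.gatherResults` instead of calling synchronously, and `notify_pushers` is a hypothetical name.

```python
# Sketch of the reviewer's suggestion: fan notifications out to every
# pusher for the affected users, catching exceptions per pusher so that
# one broken pusher doesn't prevent the others from being notified.

import logging

logger = logging.getLogger(__name__)


def notify_pushers(pushers_by_user, users_affected, min_stream_id, max_stream_id):
    failed_users = []
    for u in users_affected:
        for p in pushers_by_user.get(u, {}).values():
            try:
                p.on_new_notifications(min_stream_id, max_stream_id)
            except Exception:
                # Log and carry on: an exception in one pusher must not
                # break delivery to the rest.
                logger.exception("Exception in pusher for user %s", u)
                failed_users.append(u)
    return failed_users
```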

@erikjohnston erikjohnston commented on an outdated diff Apr 7, 2016

synapse/push/pusherpool.py
)
+ # This returns a tuple, user_id is at index 3
+ users_affected = set([r[3] for r in updated_receipts])
+ for u in users_affected:
+ if u in self.pushers:
+ for p in self.pushers[u].values():
+ yield p.on_new_receipts(min_stream_id, max_stream_id)

@erikjohnston erikjohnston and 1 other commented on an outdated diff Apr 7, 2016

synapse/storage/event_push_actions.py
@@ -100,6 +100,54 @@ def _get_unread_event_push_actions_by_room(txn):
)
defer.returnValue(ret)
+ @defer.inlineCallbacks
+ def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
+ def f(txn):
+ sql = (
+ "SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
+ " stream_ordering >= ? AND stream_ordering >= ?"
@erikjohnston

erikjohnston Apr 7, 2016

Owner

You might want to switch around some of those equalities?

@erikjohnston

erikjohnston Apr 7, 2016

Owner

Also, event_push_actions doesn't currently have an index on stream ordering? Might want to add a (stream_ordering, user_id) index?

@dbkr

dbkr Apr 7, 2016

Member

Meh, my brain gets much less confused this way, but sure :)

@dbkr

dbkr Apr 7, 2016

Member

Turns out you were pointing out it was actually broken. Fixed.
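The bug under discussion was the duplicated lower bound (`stream_ordering >= ? AND stream_ordering >= ?`), which meant the upper bound was never applied. A corrected range query, plus the suggested `(stream_ordering, user_id)` index, might look like this (sqlite used purely for illustration):

```python
# Illustrative fix for the query reviewed above: correct upper bound on
# the range, and a covering index on (stream_ordering, user_id).

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event_push_actions (stream_ordering INTEGER, user_id TEXT)"
)
conn.execute(
    "CREATE INDEX event_push_actions_stream_ordering"
    " ON event_push_actions (stream_ordering, user_id)"
)
conn.executemany(
    "INSERT INTO event_push_actions VALUES (?, ?)",
    [(1, "@a:example.org"), (2, "@b:example.org"),
     (5, "@a:example.org"), (9, "@c:example.org")],
)


def get_push_action_users_in_range(min_stream_ordering, max_stream_ordering):
    rows = conn.execute(
        # Second comparison is <=, not >= as in the broken version.
        "SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
        " stream_ordering >= ? AND stream_ordering <= ?",
        (min_stream_ordering, max_stream_ordering),
    )
    return sorted(r[0] for r in rows)
```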

dbkr added some commits Apr 7, 2016

@erikjohnston erikjohnston commented on an outdated diff Apr 7, 2016

synapse/storage/pusher.py
- kind=kind,
- app_display_name=app_display_name,
- device_display_name=device_display_name,
- ts=pushkey_ts,
- lang=lang,
- data=encode_canonical_json(data),
- profile_tag=profile_tag,
- id=stream_id,
- ),
- desc="add_pusher",
- )
+ pushkey, pushkey_ts, lang, data, last_stream_ordering,
+ profile_tag=""):
+ def f(txn):
+ txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+ with self._pushers_id_gen.get_next() as stream_id:
@erikjohnston

erikjohnston Apr 7, 2016

Owner

This should be done on the main thread (outside the txn)?

@erikjohnston erikjohnston commented on an outdated diff Apr 7, 2016

synapse/storage/pusher.py
- lang=lang,
- data=encode_canonical_json(data),
- profile_tag=profile_tag,
- id=stream_id,
- ),
- desc="add_pusher",
- )
+ pushkey, pushkey_ts, lang, data, last_stream_ordering,
+ profile_tag=""):
+ def f(txn):
+ txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+ with self._pushers_id_gen.get_next() as stream_id:
+ return self._simple_upsert_txn(
+ txn,
+ "pushers",
+ dict(
@erikjohnston

erikjohnston Apr 7, 2016

Owner

We tend to prefer dict literals

@erikjohnston erikjohnston commented on the diff Apr 7, 2016

synapse/storage/schema/delta/31/pushers.py
+ FROM pushers
+ """)
+ count = 0
+ for row in cur.fetchall():
+ row = list(row)
+ row[12] = token_to_stream_ordering(row[12])
+ cur.execute(database_engine.convert_param_style("""
+ INSERT into pushers2 (
+ id, user_name, access_token, profile_tag, kind,
+ app_id, app_display_name, device_display_name,
+ pushkey, ts, lang, data, last_stream_ordering, last_success,
+ failing_since
+ ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
+ row
+ )
+ count += 1
@erikjohnston

erikjohnston Apr 7, 2016

Owner

Would prefer if this was done as an insert many, but probably doesn't matter in this case.

@dbkr

dbkr Apr 7, 2016

Member

Ah yeah, unlikely to be many rows there though.
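The "insert many" version of the migration loop above would collect the converted rows and hand them to `executemany` in one call. A minimal sketch, with the table and `token_to_stream_ordering` abbreviated to hypothetical stand-ins:

```python
# Sketch of the insert-many suggestion: convert all rows first, then
# issue a single executemany instead of one INSERT per row.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pushers2 (id INTEGER, user_name TEXT,"
    " last_stream_ordering INTEGER)"
)


def token_to_stream_ordering(token):
    # Hypothetical converter: strip a leading "s" off a stream token.
    return int(token[1:])


rows = [(1, "@a:example.org", "s10"), (2, "@b:example.org", "s20")]

converted = []
for row in rows:
    row = list(row)
    row[2] = token_to_stream_ordering(row[2])
    converted.append(row)

conn.executemany(
    "INSERT INTO pushers2 (id, user_name, last_stream_ordering)"
    " VALUES (?, ?, ?)",
    converted,
)
```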

dbkr added some commits Apr 7, 2016

@erikjohnston erikjohnston commented on an outdated diff Apr 8, 2016

synapse/push/httppusher.py
- ctx = yield self.get_context_for_event(event)
+ def on_stop(self):
+ if self.timed_call:
+ self.timed_call.cancel()
+
+ @defer.inlineCallbacks
+ def _process(self):
+ try:
+ self.processing = True
+ yield self._unsafe_process()
+ finally:
+ self.processing = False
+
+ @defer.inlineCallbacks
+ def _unsafe_process(self):
+ unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
@erikjohnston

erikjohnston Apr 8, 2016

Owner

Can you document what _unsafe_process does? Or at least what is unsafe about, what it assumes?

@erikjohnston

erikjohnston Apr 8, 2016

Owner

Should _unsafe_process call itself if self.max_stream_ordering has been updated?

@erikjohnston erikjohnston commented on an outdated diff Apr 8, 2016

synapse/push/httppusher.py
self.data_minus_url = {}
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
@defer.inlineCallbacks
- def _build_notification_dict(self, event, tweaks, badge):
- # we probably do not want to push for every presence update
- # (we may want to be able to set up notifications when specific
- # people sign in, but we'd want to only deliver the pertinent ones)
- # Actually, presence events will not get this far now because we
- # need to filter them out in the main Pusher code.
- if 'event_id' not in event:
- defer.returnValue(None)
+ def on_started(self):
+ yield self._process()
+
+ @defer.inlineCallbacks
+ def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
+ with Measure(self.clock, "push.on_new_notifications"):
+ self.max_stream_ordering = max_stream_ordering
@erikjohnston

erikjohnston Apr 8, 2016

Owner

Might be worth taking the max here to ensure it is monotonic.
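The monotonicity suggestion amounts to one line: never let a late-arriving notification move the high-water mark backwards. A tiny stand-in class to illustrate:

```python
# Taking the max keeps max_stream_ordering monotonic even if
# on_new_notifications calls arrive out of order.

class PusherSketch:
    def __init__(self):
        self.max_stream_ordering = 0

    def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
        self.max_stream_ordering = max(
            max_stream_ordering, self.max_stream_ordering
        )
```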

dbkr added some commits Apr 8, 2016

Fix invite pushes
 * If the event is an invite event, add the invitee to the list of users we run push rules for (if they have a pusher etc)
 * Move invite_for_me to be higher prio than member events, otherwise member events match them
 * Spell override right

@dbkr dbkr referenced this pull request in matrix-org/sytest Apr 8, 2016

Merged

Test unread counts in pushes #224

Member

dbkr commented Apr 8, 2016

I've also fixed broken invite pushing and added a test for it: https://github.com/matrix-org/sytest/compare/dbkr/test_invites_pushed

ptal

@erikjohnston erikjohnston and 1 other commented on an outdated diff Apr 8, 2016

synapse/push/httppusher.py
- ctx = yield self.get_context_for_event(event)
+ def on_stop(self):
+ if self.timed_call:
+ self.timed_call.cancel()
+
+ @defer.inlineCallbacks
+ def _process(self):
+ if self.processing:
+ return
+ try:
+ self.processing = True
+ yield self._unsafe_process()
+ finally:
+ self.processing = False
@erikjohnston

erikjohnston Apr 8, 2016

Owner

Do we want to check if max_stream_ordering has increased and if so call _unsafe_process again?

@dbkr

dbkr Apr 8, 2016

Member

I did this within _unsafe_process

@erikjohnston

erikjohnston Apr 11, 2016

Owner

Oh yes, so you did

@erikjohnston erikjohnston and 1 other commented on an outdated diff Apr 11, 2016

synapse/push/httppusher.py
+ )
+
+ self.failing_since = None
+ yield self.store.update_pusher_failing_since(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.failing_since
+ )
+ else:
+ logger.info("Push failed: delaying for %ds", self.backoff_delay)
+ self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
+ self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
+ break
+ if self.max_stream_ordering != starting_max_ordering:
+ self._unsafe_process()
@erikjohnston

erikjohnston Apr 11, 2016

Owner

Might be worth sticking this in a try... finally?

@erikjohnston

erikjohnston Apr 11, 2016

Owner

Will this have a logcontext at this point?

@dbkr

dbkr Apr 11, 2016

Member

I've moved this to _process to try & keep all the try/finally stuff in one place and the actual logic in another. Hopefully this means the logcontexts are fine too since it's now just the same place calling it.
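The resulting pattern (re-entry guard and try/finally in `_process`, looping until caught up) can be sketched in plain Python; the class and attribute names are illustrative, and the real code is asynchronous:

```python
# Sketch of the final shape: _process guards against re-entry and wraps
# the unsafe body in try/finally; the loop picks up any notifications
# that arrived while a batch was being processed.

class LoopingPusher:
    def __init__(self):
        self.processing = False
        self.max_stream_ordering = 0
        self.last_processed = 0
        self.batches = 0

    def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
        self.max_stream_ordering = max(
            max_stream_ordering, self.max_stream_ordering
        )
        self._process()

    def _process(self):
        if self.processing:
            # Already working; the loop below will see the new max.
            return
        try:
            self.processing = True
            # Run until we've caught up, including anything that came in
            # while a previous iteration was running.
            while self.last_processed < self.max_stream_ordering:
                self._unsafe_process()
        finally:
            self.processing = False

    def _unsafe_process(self):
        # "Unsafe" because it assumes it never runs concurrently with
        # itself; _process provides that guarantee.
        self.last_processed = self.max_stream_ordering
        self.batches += 1
```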

Run unsafe process in a loop until we've caught up
and wrap unsafe process in a try block
Owner

erikjohnston commented Apr 11, 2016

LGTM

@dbkr dbkr merged commit 2547dff into develop Apr 11, 2016

8 checks passed

Flake8 + Packaging (Commit): Build #378 origin/dbkr/pushers_use_event_actions succeeded in 29 sec
Flake8 + Packaging (Merged PR): Build finished.
Sytest Postgres (Commit): Build #369 origin/dbkr/pushers_use_event_actions succeeded in 5 min 27 sec
Sytest Postgres (Merged PR): Build finished.
Sytest SQLite (Commit): Build #374 origin/dbkr/pushers_use_event_actions succeeded in 4 min 33 sec
Sytest SQLite (Merged PR): Build finished.
Unit Tests (Commit): Build #422 origin/dbkr/pushers_use_event_actions succeeded in 1 min 17 sec
Unit Tests (Merged PR): Build finished.

@richvdh richvdh deleted the dbkr/pushers_use_event_actions branch Dec 1, 2016
