From 84a8007504a4c3b52628b9756d9e95e24d774281 Mon Sep 17 00:00:00 2001
From: pik
Date: Fri, 18 Nov 2016 18:07:38 -0300
Subject: [PATCH 1/3] Aggregations Backend initial

---
 synapse/app/homeserver.py                           |   1 +
 synapse/handlers/__init__.py                        |   2 +
 synapse/handlers/aggregation.py                     | 295 ++++++++++++++++++
 synapse/handlers/message.py                         |   4 +
 synapse/rest/__init__.py                            |   2 +
 synapse/rest/client/v2_alpha/aggregation.py         |  62 ++++
 synapse/server.py                                   |   5 +
 synapse/server.pyi                                  |   3 +
 synapse/storage/__init__.py                         |   2 +
 synapse/storage/aggregation.py                      | 165 ++++++++++
 .../38/create_aggregation_entries_table.sql         |   9 +
 .../38/create_aggregation_tasks_table.sql           |   7 +
 12 files changed, 557 insertions(+)
 create mode 100644 synapse/handlers/aggregation.py
 create mode 100644 synapse/rest/client/v2_alpha/aggregation.py
 create mode 100644 synapse/storage/aggregation.py
 create mode 100644 synapse/storage/schema/38/create_aggregation_entries_table.sql
 create mode 100644 synapse/storage/schema/38/create_aggregation_tasks_table.sql

diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 54f35900f81d..ea9d9787db20 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -335,6 +335,7 @@ def start():
             hs.get_datastore().start_profiling()
             hs.get_datastore().start_doing_background_updates()
             hs.get_replication_layer().start_get_pdu_cache()
+            hs.get_aggregation_handler().run_aggregation_events()
 
             register_memory_metrics(hs)
 
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 63d05f25310b..2b6d45ddd45d 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -26,6 +26,7 @@ from .identity import IdentityHandler
 from .receipts import ReceiptsHandler
 from .search import SearchHandler
+from .aggregation import AggregationHandler
 
 
 class Handlers(object):
@@ -60,3 +61,4 @@ def __init__(self, hs):
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
         self.room_context_handler = RoomContextHandler(hs)
+        self.aggregation_handler = AggregationHandler(hs)
diff --git a/synapse/handlers/aggregation.py b/synapse/handlers/aggregation.py
new file mode 100644
index 000000000000..60c66b1263c5
--- /dev/null
+++ b/synapse/handlers/aggregation.py
@@ -0,0 +1,295 @@
+import logging
+from twisted.internet import defer
+from ._base import BaseHandler
+
+import jsonschema
+import re
+import json
+from itertools import groupby
+from collections import defaultdict
+
+AGGREGATION_TYPE = 'm.room._aggregation'
+PRUNE_AGGREGATION_EVENTS = False
+
+# Note: field definitions live under 'properties' and fixed value sets use
+# 'enum' ('oneOf' expects subschemas, not literals), so that validation
+# actually enforces the spec shape.
+AGGREGATION_SCHEMA = {
+    'type': 'object',
+    'properties': {
+        'aggregation_field_names': {
+            'type': 'array',
+            'items': {
+                'type': 'string'
+            },
+        },
+        'aggregation_event_schema': {
+            'type': 'object'
+        },
+        'aggregation_type': {'type': 'string', 'enum': ['append', 'replace']},
+        'constraints': {
+            'type': 'array',
+            'items': {
+                'type': 'object',
+                'properties': {
+                    'op': {
+                        'type': 'string',
+                        'enum': ['equal', 'greaterThan', 'lessThan', 'lessThanOrEqualTo', 'greaterThanOrEqualTo', 'notEqual']
+                    },
+                    'conditions': {
+                        'type': 'array',
+                        'items': {
+                            'type': 'string'
+                        }
+                    }
+                }
+            }
+        },
+        'aggregation_event_name': {'type': 'string'},
+    },
+    'required': ['aggregation_field_names', 'aggregation_event_schema', 'aggregation_type', 'aggregation_event_name']
+}
+
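+# A spec accepted by AGGREGATION_SCHEMA looks roughly like the example in
+# rest/client/v2_alpha/aggregation.py (values here are illustrative only):
+#
+#     {
+#         'aggregation_field_names': ['emoticon'],
+#         'aggregation_event_name': 'm.room.experimental.emoticon',
+#         'aggregation_type': 'append',
+#         'aggregation_event_schema': {'type': 'object'},
+#         'constraints': [],
+#     }
+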
+class AggregationTask:
+    def __init__(self, store, room_id, aggregation_spec):
+        self.store = store
+        self.room_id = room_id
+        self.aggregation_spec = aggregation_spec
+        self.constraints = aggregation_spec.get('constraints', [])
+        self.aggregation_field_names = aggregation_spec['aggregation_field_names']
+        self.aggregation_type = aggregation_spec['aggregation_type']
+        self.aggregation_event_name = aggregation_spec['aggregation_event_name']
+        self.aggregation_event_schema = aggregation_spec['aggregation_event_schema']
+
+    def interpolate_params(self, params, event, target, user):
+        param_literals = []
+        for param in params:
+            if param.startswith('$user'):
+                param_literal = user
+            elif param.startswith('$self'):
+                param_literal = event
+            elif param.startswith('$target'):
+                param_literal = target
+            else:
+                continue
+            paths = param.split('.')
+            for path in paths[1:]:
+                param_literal = param_literal.get(path) or param_literal.__dict__.get(path)
+            param_literals.append(param_literal)
+        return param_literals
+
+    def check_aggregation_event_constraints(self, event, target, user):
+        for constraint in self.constraints:
+            param_literals = self.interpolate_params(constraint['params'], event, target, user)
+            check_constraint(constraint['op'], param_literals)
+
+    def content_for_aggregate_replace(self, group, target):
+        for event in reversed(list(group)):
+            event_content = event['content']
+            try:
+                jsonschema.validate(
+                    event_content,
+                    self.aggregation_event_schema
+                )
+            except jsonschema.ValidationError:
+                logger.warn('Invalid Schema: Skipping Aggregation for Event %s' % event['event_id'])
+                continue
+            aggregate_entry = {field_name: event_content[field_name] for field_name in self.aggregation_field_names}
+            aggregate_entry['event_id'] = event['event_id']
+            return aggregate_entry
+
+    def content_for_aggregate_append(self, group, target):
+        aggregate_entries = []
+        # import pdb; pdb.set_trace()
+        for event in group:
+            event_content = event['content']
+            try:
+                jsonschema.validate(
+                    event_content,
+                    self.aggregation_event_schema
+                )
+            except jsonschema.ValidationError:
+                logger.warn('Invalid Schema: Skipping Aggregation for Event %s' % event['event_id'])
+                continue
+            aggregate_entry = {field_name: event_content[field_name] for field_name in self.aggregation_field_names}
+            aggregate_entry['event_id'] = event['event_id']
+            aggregate_entry['sender'] = event['sender']
+            aggregate_entries.append(aggregate_entry)
+        return aggregate_entries
+
+    @defer.inlineCallbacks
+    def run(self, events):
+        def get_aggregation_event_target(event):
+            # Although content is a JSON blob it's always stored as Text.
+            # Would be nicer to cast this with the psycopg cursor tracer than
+            # here..
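+            # For illustration (assumed shape, nothing below enforces it): a
+            # decoded aggregation event content looks roughly like
+            #     {'target_id': '$some_event_id:hs', 'emoticon': ':+1:'}
+            # where 'emoticon' stands in for one of aggregation_field_names.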
+            content = event.get('content')
+            if content and not isinstance(content, dict):
+                event['content'] = content = json.loads(content)
+                return content.get('target_id')
+            else:
+                event['content'] = {}
+
+        event_groups = groupby(events, get_aggregation_event_target)
+        backlog = []
+        for (target_id, group) in event_groups:
+            target_event = yield self.store.get_event(target_id, check_redacted=False, get_prev_content=False, allow_rejected=False, allow_none=True)
+            if not target_event:
+                # TODO backlogging
+                backlog.append((target_id, group))
+                continue
+
+            if self.aggregation_type == 'replace':
+                aggregate_entry = self.content_for_aggregate_replace(group, target_event)
+                # Don't bother writing to DB if all entries were invalid
+                if aggregate_entry:
+                    self.store.replace_aggregate_entry(
+                        self.room_id, target_id,
+                        self.aggregation_event_name,
+                        aggregate_entry['event_id'], aggregate_entry
+                    )
+
+            elif self.aggregation_type == 'append':
+                aggregate_entries = self.content_for_aggregate_append(group, target_event)
+                # Don't bother writing to DB if all entries were invalid
+                if len(aggregate_entries):
+                    latest_event_id = max(entry['event_id'] for entry in aggregate_entries)
+
+                    self.store.append_aggregate_entries(
+                        self.room_id, target_id,
+                        self.aggregation_event_name,
+                        latest_event_id, aggregate_entries
+                    )
+            # Pruning events is not atomic with updating aggregation_entries,
+            # but since the client will always receive some unaggregated
+            # events it is up to them to check latest_event_id on the
+            # aggregation_entry for a target
+            if PRUNE_AGGREGATION_EVENTS:
+                ids_to_prune = [event.get('stream_ordering') for event in group]
+                sql = '''
+                    DELETE FROM events WHERE stream_ordering = ANY(%s)
+                '''
+                yield self.store.runInteraction(
+                    'prune_aggregation_events',
+                    self.store._simple_run_txn,
+                    sql, (ids_to_prune,)
+                )
+
+
+class AggregationHandler(BaseHandler):
+    BACKGROUND_UPDATE_INTERVAL_MS = 5000
+    BACKGROUND_UPDATE_DURATION_MS = 100  # UNUSED
+
+    def __init__(self, hs):
+        super(AggregationHandler, self).__init__(hs)
+        self.pending_events = defaultdict(list)
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def process_aggregation_events(self, desired_duration_ms):
+        sql = '''
+            SELECT MAX(latest_event_id) AS latest_event_id, event_name FROM aggregation_entries
+            GROUP BY event_name;
+        '''
+        latest_entries = yield self.store.runInteraction(
+            'get_latest_aggregation_entries',
+            self.store._simple_select_txn,
+            sql
+        )
+        # Convert to a dict for easy lookup
+        latest_entries = {row['event_name']: row['latest_event_id']
+                          for row in latest_entries}
+
+        sql = '''
+            SELECT MAX(event_id) AS event_id, type, room_id FROM events WHERE type LIKE 'm.room._aggregation%' GROUP BY type, room_id;
+        '''
+
+        latest_aggregation_events = yield self.store.runInteraction(
+            'get_latest_aggregation_events',
+            self.store._simple_select_txn,
+            sql
+        )
+
+        needs_catchup = []
+        for event in latest_aggregation_events:
+            event_type = event['type']
+            # Needs catchup unless latest_event_id in aggregation_entries
+            # matches the event_id in the events table
+            if event_type not in latest_entries:
+                # '$0' as a floor for comparing event_id strings
+                event['latest_event_id'] = '$0'
+                needs_catchup.append(event)
+            elif latest_entries[event_type] != event['event_id']:
+                event['latest_event_id'] = latest_entries[event_type]
+                needs_catchup.append(event)
+
+        sql = '''
+            SELECT * FROM events WHERE type = '%s' AND event_id > '%s'
+        '''
+        for entry in needs_catchup:
+            params = (entry['type'], entry['latest_event_id'])
+
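+            # Catch-up: fetch every aggregation event of this type newer than
+            # the last one folded into aggregation_entries (event_ids compare
+            # lexically, hence the '$0' floor above).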
+            events_for_aggregation = yield self.store.runInteraction(
+                'get_events_for_aggregation',
+                self.store._simple_select_txn,
+                sql, params
+            )
+
+            if not len(events_for_aggregation):
+                continue
+            task = yield self.get_task_for_event(entry['room_id'], entry['type'])
+            # import pdb; pdb.set_trace()
+            task.run(events_for_aggregation)
+
+    def get_aggregation_key(self, event):
+        if event.type.startswith('m.room._aggregation'):
+            return (event.room_id, event.type)
+
+    def is_aggregation_event(self, event):
+        if event.type.startswith('m.room._aggregation'):
+            return True
+
+    # Currently unused
+    def on_new_event(self, event, _context):
+        aggregation_key = self.get_aggregation_key(event)
+        if aggregation_key:
+            self.pending_events[aggregation_key].append(event)
+        # if self.is_aggregation_event(event):
+        #     self.pending_events.append(event)
+
+    @defer.inlineCallbacks
+    def run_aggregation_events(self):
+        while True:
+            sleep = defer.Deferred()
+            self._background_update_timer = self.clock.call_later(
+                self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
+            )
+            try:
+                yield sleep
+            finally:
+                self._background_update_timer = None
+
+            yield self.process_aggregation_events(self.BACKGROUND_UPDATE_DURATION_MS)
+
+    # @defer.inlineCallbacks
+    # def process_aggregation_events(self, desired_duration_ms):
+    #     for ((room_id, aggregation_event_name), event_group) in self.pending_events.items():
+    #         task = yield self.get_task_for_event(room_id, aggregation_event_name)
+    #         self.pending_events[(room_id, aggregation_event_name)] = []
+    #         task.run(event_group)
+    #     event = self.pending_events.pop_left()
+    #     task = yield self.get_task_for_event(event.room_id, event.type)
+    #     task.run(event)
+
+    @defer.inlineCallbacks
+    def get_task_for_event(self, room_id, aggregation_event_name):
+        aggregation_info = (yield self.store.get_aggregation_tasks(room_id, aggregation_event_name))[0]
+        defer.returnValue(AggregationTask(
+            self.store,
+            aggregation_info['room_id'],
+            aggregation_info['aggregation_spec']
+        ))
+
+    def upsert_aggregation(self, room_id, aggregation_spec):
+        return self.store.upsert_aggregation(room_id, aggregation_spec)
+
+    def validate(self, aggregation_event_spec):
+        try:
+            jsonschema.validate(aggregation_event_spec, AGGREGATION_SCHEMA)
+            return True
+        except jsonschema.ValidationError:
+            return False
+
+    def is_room_creator(self, user, room_id):
+        return self.store.is_room_creator(user, room_id)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index abfa8c65a472..53c17bbdd31a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -574,6 +574,10 @@ def is_inviter_member_event(e):
             event, context
         )
 
+        # try:
+        self.hs.get_aggregation_handler().on_new_event(event, context)
+        # except Exception:
+
         (event_stream_id, max_stream_id) = yield self.store.persist_event(
             event, context=context
         )
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index f9f5a3e0771c..ea7faa9d85ee 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -50,6 +50,7 @@
     devices,
     thirdparty,
     sendtodevice,
+    aggregation,
 )
 
 from synapse.http.server import JsonResource
@@ -98,3 +99,4 @@ def register_servlets(client_resource, hs):
         devices.register_servlets(hs, client_resource)
         thirdparty.register_servlets(hs, client_resource)
         sendtodevice.register_servlets(hs, client_resource)
+        aggregation.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v2_alpha/aggregation.py b/synapse/rest/client/v2_alpha/aggregation.py
new file mode 100644
index 000000000000..f25a079bdc75
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/aggregation.py
@@ -0,0 +1,62 @@
+from ._base import client_v2_patterns
+from twisted.internet import defer
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.api.errors import AuthError, SynapseError, StoreError, Codes
+import logging
+
+
+class AggregationRestServlet(RestServlet):
+
+    PATTERNS = client_v2_patterns("/room/(?P<room_id>[^/]+)/aggregation$")
+
+    def __init__(self, hs):
+        super(AggregationRestServlet, self).__init__()
+        self.auth = hs.get_auth()
+        self.sync_handler = hs.get_sync_handler()
+        self.clock = hs.get_clock()
+        self.filtering = hs.get_filtering()
+        self.presence_handler = hs.get_presence_handler()
+        self.aggregation_handler = hs.get_handlers().aggregation_handler
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, room_id):
+        requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+        aggregations = yield self.aggregation_handler.get_aggregations(room_id)
+        defer.returnValue((200, aggregations))
+
+    @defer.inlineCallbacks
+    def on_POST(self, request, room_id):
+        '''
+        Available Special (Interpolated) values are: $user $target $self
+        $user - the message author
+        $target - event specified by target_id: in the message
+        $self - the message body
+
+        Example Post Body:
+        {
+            'aggregation_field_names': ['emoticon'],
+            'aggregation_event_name': 'm.room.experimental.emoticon',
+            'aggregation_type': 'append',
+            'aggregation_event_schema': {
+                'type': 'object',
+                'emoticon': { 'type': 'string' },
+                'required': ['emoticon']
+            },
+            'constraints': [],
+        }
+        '''
+        requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+
+        aggregation_spec = parse_json_object_from_request(request)
+
+        is_room_creator = yield self.aggregation_handler.is_room_creator(requester.user, room_id)
+
+        if not is_room_creator:
+            raise AuthError(403, 'Only Room Creator Can Modify Aggregations')
+
+        if not self.aggregation_handler.validate(aggregation_spec):
+            raise SynapseError(400, 'Invalid Aggregation Event Spec')
+
+        yield self.aggregation_handler.upsert_aggregation(room_id, aggregation_spec)
+        defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+    AggregationRestServlet(hs).register(http_server)
diff --git a/synapse/server.py b/synapse/server.py
index 374124a147fd..c1831a699878 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -55,6 +55,8 @@ from synapse.util import Clock
 from synapse.util.distributor import Distributor
 
+from synapse.handlers.aggregation import AggregationHandler
+
 logger = logging.getLogger(__name__)
 
@@ -124,6 +126,7 @@ def build_DEPENDENCY(self)
         'http_client_context_factory',
         'simple_http_client',
         'media_repository',
+        'aggregation_handler',
     ]
 
     def __init__(self, hostname, **kwargs):
@@ -268,6 +271,8 @@ def build_media_repository(self):
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
+    def build_aggregation_handler(self):
+        return AggregationHandler(self)
+
 def _make_dependency_method(depname):
     def _get(hs):
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 9570df5537f7..a9386e515702 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -27,3 +27,6 @@ class HomeServer(object):
 
     def get_state_handler(self) -> synapse.state.StateHandler:
         pass
+
+    def get_aggregation_handler(self) -> synapse.handlers.aggregation.AggregationHandler:
+        pass
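With the servlet and wiring above in place, registering an aggregation spec is a
plain authenticated POST. A minimal client sketch, not part of the patch; the
homeserver URL, access token, room id, and the /_matrix/client/unstable prefix
(whatever client_v2_patterns actually expands to) are all assumptions:

    import json
    import requests  # third-party HTTP client, assumed available

    HS = 'http://localhost:8008'    # assumed homeserver client port
    ROOM_ID = '!abc123:localhost'   # assumed room; the caller must be its creator
    TOKEN = 'syt_example_token'     # assumed access token

    spec = {
        'aggregation_field_names': ['emoticon'],
        'aggregation_event_name': 'm.room.experimental.emoticon',
        'aggregation_type': 'append',
        'aggregation_event_schema': {'type': 'object'},
        'constraints': [],
    }

    # POST /room/{roomId}/aggregation: 403 unless the sender created the room,
    # 400 if the spec fails AGGREGATION_SCHEMA validation, else 200.
    resp = requests.post(
        '%s/_matrix/client/unstable/room/%s/aggregation' % (HS, ROOM_ID),
        params={'access_token': TOKEN},
        data=json.dumps(spec),
    )
    print(resp.status_code, resp.json())
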
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 9996f195a0a4..c180c303c3fa 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -37,6 +37,7 @@
 from .rejections import RejectionsStore
 from .event_push_actions import EventPushActionsStore
 from .deviceinbox import DeviceInboxStore
+from .aggregation import AggregationStore
 from .state import StateStore
 from .signatures import SignatureStore
@@ -86,6 +87,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 ClientIpStore,
                 DeviceStore,
                 DeviceInboxStore,
+                AggregationStore,
                 ):
 
     def __init__(self, db_conn, hs):
diff --git a/synapse/storage/aggregation.py b/synapse/storage/aggregation.py
new file mode 100644
index 000000000000..c9568ed6e961
--- /dev/null
+++ b/synapse/storage/aggregation.py
@@ -0,0 +1,165 @@
+from twisted.internet import defer
+
+from synapse.storage import background_updates
+import json
+
+
+class AggregationStore(background_updates.BackgroundUpdateStore):
+    def __init__(self, hs):
+        super(AggregationStore, self).__init__(hs)
+
+    @defer.inlineCallbacks
+    def is_room_creator(self, user, room_id):
+        result = yield self._simple_select_one_onecol(
+            table="rooms",
+            keyvalues={
+                "creator": user.to_string(),
+                "room_id": room_id
+            },
+            retcol="creator",
+            allow_none=True,
+            desc="is_room_creator",
+        )
+        defer.returnValue(True if result else False)
+
+    @defer.inlineCallbacks
+    def upsert_aggregation(self, room_id, aggregation_spec):
+        result = yield self._simple_upsert(
+            'aggregation_tasks',
+            {
+                'aggregation_event_name': aggregation_spec['aggregation_event_name'],
+                'room_id': room_id
+            },
+            {
+                'aggregation_spec': json.dumps(aggregation_spec)
+            },
+            desc='upsert_aggregation'
+        )
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def get_aggregation_tasks(self, room_id=None, event_name=None):
+        where_params = {}
+        if room_id:
+            where_params['room_id'] = room_id
+        if event_name:
+            where_params['aggregation_event_name'] = event_name
+        result = yield self._simple_select_list(
+            'aggregation_tasks',
+            where_params,
+            ('room_id', 'aggregation_event_name', 'aggregation_spec'),
+            desc="get_aggregation_for_room"
+        )
+        defer.returnValue(result)
+
+    # @defer.inlineCallbacks
+    # def replace_aggregate_entry(self, room_id, target_id, event_name, aggregate_entry):
+    #     sql = '''
+    #         INSERT INTO aggregation_entries(room_id, target_id)
+    #         VALUES(%s, %s)
+    #         ON CONFLICT DO NOTHING;
+    #         UPDATE aggregation_entries
+    #         SET aggregation_data=jsonb_set(
+    #             aggregation_data, '{{{event_name}}}', %s
+    #         )
+    #         WHERE target_id=%s'''.format(event_name=event_name)
+    #     params = (room_id, json.dumps(aggregate_entry), target_id)
+    #     return self.runInteraction(
+    #         'replace_aggrregate_entry',
+    #         self._simple_run_txn,
+    #         sql, params,
+    #     )
+
+    def replace_aggregate_entry(self, room_id, target_id, event_name, latest_event_id, aggregate_entry):
+        sql = '''
+            INSERT INTO aggregation_entries(
+                room_id,
+                target_id,
+                event_name,
+                latest_event_id,
+                aggregation_data
+            )
+            VALUES (%s, %s, %s, %s, %s)
+            ON CONFLICT (target_id, event_name) DO UPDATE
+            SET latest_event_id=EXCLUDED.latest_event_id,
+                aggregation_data=EXCLUDED.aggregation_data
+        '''
+        params = (
+            room_id, target_id,
+            event_name, latest_event_id,
+            json.dumps(aggregate_entry)
+        )
+        return self.runInteraction(
+            'replace_aggregate_entry',
+            self._simple_run_txn,
+            sql, params,
+        )
+
+    @staticmethod
+    def _simple_run_txn(txn, sql, params):
+        return txn.execute(sql, params)
+
+    @classmethod
+    def _simple_select_txn(cls, txn, sql, params=()):
+        try:
+            if len(params):
+                sql = sql % params
+            txn.execute(sql)
+        except Exception:
+            import traceback, sys
+            traceback.print_exc(file=sys.stdout)
+        return cls.cursor_to_dict(txn)
+
+    # def append_aggregate_entries(self, room_id, target_id, event_name, aggregate_entries):
+    #     sql = '''
+    #         INSERT INTO aggregation_entries(room_id, target_id)
+    #         VALUES(%s, %s)
+    #         ON CONFLICT DO NOTHING;
+    #         UPDATE aggregation_entries
+    #         SET aggregation_data=jsonb_set(
+    #             aggregation_data,
+    #             '{{{event_name}}}',
+    #             to_jsonb(
+    #                 ARRAY(SELECT jsonb_array_elements_text(
+    #                     aggregation_data->'{event_name}'
+    #                 )) || %s::text[]
+    #             )
+    #         )
+    #         WHERE target_id=%s
+    #     '''.format(event_name=event_name)
+    #     params = (
+    #         room_id,
+    #         target_id,
+    #         [json.dumps(entry) for entry in aggregate_entries],
+    #         target_id
+    #     )
+    #     return self.runInteraction(
+    #         'append_aggregate_entries',
+    #         self._simple_run_txn,
+    #         sql, params,
+    #     )
+
+    def append_aggregate_entries(self, room_id, target_id, event_name, latest_event_id, aggregate_entries):
+        sql = '''
+            INSERT INTO aggregation_entries(room_id, target_id, event_name, latest_event_id)
+            VALUES(%s, %s, %s, %s)
+            ON CONFLICT DO NOTHING;
+            UPDATE aggregation_entries
+            SET latest_event_id=%s,
+                aggregation_data=to_jsonb(
+                    ARRAY(SELECT jsonb_array_elements_text(
+                        aggregation_data
+                    )) || %s::text[]
+                )
+            WHERE target_id=%s AND event_name=%s
+        '''
+        params = (
+            room_id,
+            target_id,
+            event_name,
+            latest_event_id,
+            latest_event_id,
+            [json.dumps(entry) for entry in aggregate_entries],
+            target_id,
+            event_name,
+        )
+
+        return self.runInteraction(
+            'append_aggregate_entries',
+            self._simple_run_txn,
+            sql, params,
+        )
diff --git a/synapse/storage/schema/38/create_aggregation_entries_table.sql b/synapse/storage/schema/38/create_aggregation_entries_table.sql
new file mode 100644
index 000000000000..21177067790e
--- /dev/null
+++ b/synapse/storage/schema/38/create_aggregation_entries_table.sql
@@ -0,0 +1,9 @@
+CREATE TABLE IF NOT EXISTS aggregation_entries(
+    target_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    event_name TEXT NOT NULL,
+    latest_event_id TEXT NOT NULL,
+    aggregation_data JSONB NOT NULL DEFAULT '[]'::jsonb
+);
+
+CREATE UNIQUE INDEX aggregation_entries_target_id_event_name ON aggregation_entries(target_id, event_name);
diff --git a/synapse/storage/schema/38/create_aggregation_tasks_table.sql b/synapse/storage/schema/38/create_aggregation_tasks_table.sql
new file mode 100644
index 000000000000..86f98c118d4b
--- /dev/null
+++ b/synapse/storage/schema/38/create_aggregation_tasks_table.sql
@@ -0,0 +1,7 @@
+CREATE TABLE IF NOT EXISTS aggregation_tasks(
+    room_id TEXT NOT NULL,
+    aggregation_event_name TEXT NOT NULL,
+    aggregation_spec JSONB NOT NULL
+);
+
+CREATE INDEX aggregation_tasks_room_id ON aggregation_tasks(room_id);

From c7edc8f8d8a6cafa8cf93258714546fe4bc16738 Mon Sep 17 00:00:00 2001
From: pik
Date: Sun, 20 Nov 2016 10:13:55 -0300
Subject: [PATCH 2/3] Merge aggregation_data into events in storage.events
 methods

* API now returns aggregation_data
* removed unused code
* Note that aggregation_entries is now one per (event_type, target_id)
  rather than one per (target_id) - so they are manually merged
  server-side to yield a single aggregation_data blob on the event.
* TODO: make aggregation code less invasive on Synapse structure --- synapse/events/__init__.py | 7 +++-- synapse/events/utils.py | 3 ++ synapse/handlers/aggregation.py | 13 +++++++-- synapse/rest/client/v1/events.py | 1 - synapse/storage/aggregation.py | 47 -------------------------------- synapse/storage/events.py | 45 +++++++++++++++++++++++++++--- 6 files changed, 58 insertions(+), 58 deletions(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index bcb8f33a5899..a3e731b3b586 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -56,7 +56,7 @@ def delete(self): class EventBase(object): def __init__(self, event_dict, signatures={}, unsigned={}, - internal_metadata_dict={}, rejected_reason=None): + internal_metadata_dict={}, aggregation_data=None, rejected_reason=None): self.signatures = signatures self.unsigned = unsigned self.rejected_reason = rejected_reason @@ -66,6 +66,7 @@ def __init__(self, event_dict, signatures={}, unsigned={}, self.internal_metadata = _EventInternalMetadata( internal_metadata_dict ) + self.aggregation_data = aggregation_data auth_events = _event_dict_property("auth_events") depth = _event_dict_property("depth") @@ -132,9 +133,8 @@ def items(self): class FrozenEvent(EventBase): - def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None): + def __init__(self, event_dict, internal_metadata_dict={}, aggregation_data=None, rejected_reason=None): event_dict = dict(event_dict) - # Signatures is a dict of dicts, and this is faster than doing a # copy.deepcopy signatures = { @@ -159,6 +159,7 @@ def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None): unsigned=unsigned, internal_metadata_dict=internal_metadata_dict, rejected_reason=rejected_reason, + aggregation_data=aggregation_data, ) @staticmethod diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 0e9fd902af69..5a24111343b2 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -157,6 +157,9 @@ def serialize_event(e, time_now_ms, as_client_event=True, event_format=event_format ) + if "aggregation_data" in e.__dict__: + d["aggregation_data"] = e.aggregation_data + if token_id is not None: if token_id == getattr(e.internal_metadata, "token_id", None): txn_id = getattr(e.internal_metadata, "txn_id", None) diff --git a/synapse/handlers/aggregation.py b/synapse/handlers/aggregation.py index 60c66b1263c5..8b419a10e200 100644 --- a/synapse/handlers/aggregation.py +++ b/synapse/handlers/aggregation.py @@ -8,6 +8,8 @@ from itertools import groupby from collections import defaultdict +logger = logging.getLogger(__name__) + AGGREGATION_TYPE = 'm.room._aggregation' PRUNE_AGGREGATION_EVENTS = False @@ -91,7 +93,6 @@ def content_for_aggregate_replace(self, group, target): def content_for_aggregate_append(self, group, target): aggregate_entries = [] - # import pdb; pdb.set_trace() for event in group: event_content = event['content'] try: @@ -228,7 +229,8 @@ def process_aggregation_events(self, desired_duration_ms): if not len(events_for_aggregation): continue task = yield self.get_task_for_event(entry['room_id'], entry['type']) - # import pdb; pdb.set_trace() + if not task: + continue task.run(events_for_aggregation) def get_aggregation_key(self, event): @@ -274,7 +276,12 @@ def run_aggregation_events(self): @defer.inlineCallbacks def get_task_for_event(self, room_id, aggregation_event_name): - aggregation_info = (yield self.store.get_aggregation_tasks(room_id, aggregation_event_name))[0] + try: + 
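+            # get_aggregation_tasks returns a list of matching specs; taking
+            # [0] raises IndexError when no spec is registered for this
+            # (room_id, aggregation_event_name), which is handled below.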
aggregation_info = (yield self.store.get_aggregation_tasks(room_id, aggregation_event_name))[0] + except IndexError: + logger.warn('Could not find task for (room_id, aggregation_event_type): (%s, %s)', + room_id, aggregation_event_name) + defer.returnValue(None) defer.returnValue(AggregationTask( self.store, aggregation_info['room_id'], diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py index 701b6f549ba5..cc8bad7e0ab1 100644 --- a/synapse/rest/client/v1/events.py +++ b/synapse/rest/client/v1/events.py @@ -96,7 +96,6 @@ def on_GET(self, request, event_id): else: defer.returnValue((404, "Event not found.")) - def register_servlets(hs, http_server): EventStreamRestServlet(hs).register(http_server) EventRestServlet(hs).register(http_server) diff --git a/synapse/storage/aggregation.py b/synapse/storage/aggregation.py index c9568ed6e961..57d7c90131f8 100644 --- a/synapse/storage/aggregation.py +++ b/synapse/storage/aggregation.py @@ -50,24 +50,6 @@ def get_aggregation_tasks(self, room_id=None, event_name=None): ) defer.returnValue(result) - # @defer.inlineCallbacks - # def replace_aggregate_entry(self, room_id, target_id, event_name, aggregate_entry): - # sql = ''' - # INSERT INTO aggregation_entries(room_id, target_id) - # VALUES(%s, %s) - # ON CONFLICT DO NOTHING; - # UPDATE aggregation_entries - # SET aggregation_data=jsonb_set( - # aggregation_data, '{{{event_name}}}', %s - # ) - # WHERE target_id=%s'''.format(event_name=event_name) - # params = (room_id, json.dumps(aggregate_entry), target_id) - # return self.runInteraction( - # 'replace_aggrregate_entry', - # self._simple_run_txn, - # sql, params, - # ) - def replace_aggregate_entry(self, room_id, target_id, event_name, latest_event_id, aggregate_entry): sql = ''' INSERT INTO aggregation_entries( @@ -105,35 +87,6 @@ def _simple_select_txn(cls, txn, sql, params=()): traceback.print_exc(file=sys.stdout) return cls.cursor_to_dict(txn) - # def append_aggregate_entries(self, room_id, target_id, event_name, aggregate_entries): - # sql = ''' - # INSERT INTO aggregation_entries(room_id, target_id) - # VALUES(%s, %s) - # ON CONFLICT DO NOTHING; - # UPDATE aggregation_entries - # SET aggregation_data=jsonb_set( - # aggregation_data, - # '{{{event_name}}}', - # to_jsonb( - # ARRAY(SELECT jsonb_array_elements_text( - # aggregation_data->'{event_name}' - # )) || %s::text[] - # ) - # ) - # WHERE target_id=%s - # '''.format(event_name=event_name) - # params = ( - # room_id, - # target_id, - # [json.dumps(entry) for entry in aggregate_entries], - # target_id - # ) - # return self.runInteraction( - # 'append_aggregate_entries', - # self._simple_run_txn, - # sql, params, - # ) - def append_aggregate_entries(self, room_id, target_id, event_name, latest_event_id, aggregate_entries): sql = ''' INSERT INTO aggregation_entries(room_id, target_id, event_name, latest_event_id) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 49aeb953bd01..f1da003e6150 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -839,15 +839,20 @@ def _add_to_cache(self, txn, events_and_contexts): "SELECT " " e.event_id as event_id, " " r.redacts as redacts," - " rej.event_id as rejects " + " rej.event_id as rejects, " + " agg.event_name as aggregation_event_type," + " agg.aggregation_data as aggregation_data," + " agg.latest_event_id as latest_aggregation_event_id " " FROM events as e" " LEFT JOIN rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " LEFT JOIN 
aggregation_entries AS agg ON e.event_id = agg.target_id" " WHERE e.event_id IN (%s)" ) % (",".join(["?"] * len(ev_map)),) txn.execute(sql, ev_map.keys()) rows = self.cursor_to_dict(txn) + rows = self._fold_aggregation_data(rows) for row in rows: event = ev_map[row["event_id"]] if not row["rejects"] and not row["redacts"]: @@ -1093,6 +1098,7 @@ def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], + aggregation_data=row["aggregation_data"], rejected_reason=row["rejects"], ) for row in rows @@ -1105,6 +1111,32 @@ def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): for e in res if e }) + # Aggregation_entries contains an entry per (aggregation_event_type, target_id) + # This means that on a JOIN the same event row will be returned once per corresponding + # aggregation entry, fold the aggregation_entries into a single key-value namespaced + # JSONB + def _fold_aggregation_data(self, rows): + rows_dict = {} + for row in rows: + try: + aggregation_data = row.pop('aggregation_data') + latest_event_id = row.pop('latest_aggregation_event_id') + aggregation_event_type = row.pop('aggregation_event_type') + aggregation_data = { 'aggregation_event_type': { + 'aggregation_data': aggregation_data, + 'latest_event_id': latest_event_id + } + } + unified_row = rows_dict.get(row['event_id']) + if unified_row: + unified_row['aggregation_data'].update(aggregation_data) + else: + row['aggregation_data'] = aggregation_data + rows_dict[row['event_id']] = row + except KeyError: + rows_dict[row['event_id']] = row + return rows_dict.values() + def _fetch_event_rows(self, txn, events): rows = [] N = 200 @@ -1119,20 +1151,24 @@ def _fetch_event_rows(self, txn, events): " e.internal_metadata," " e.json," " r.redacts as redacts," - " rej.event_id as rejects " + " rej.event_id as rejects," + " agg.event_name as aggregation_event_type," + " agg.aggregation_data as aggregation_data," + " agg.latest_event_id as latest_aggregation_event_id " " FROM event_json as e" " LEFT JOIN rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " LEFT JOIN aggregation_entries AS agg ON e.event_id = agg.target_id" " WHERE e.event_id IN (%s)" ) % (",".join(["?"] * len(evs)),) txn.execute(sql, evs) rows.extend(self.cursor_to_dict(txn)) - + rows = self._fold_aggregation_data(rows) return rows @defer.inlineCallbacks - def _get_event_from_row(self, internal_metadata, js, redacted, + def _get_event_from_row(self, internal_metadata, js, redacted, aggregation_data=None, rejected_reason=None): with Measure(self._clock, "_get_event_from_row"): d = json.loads(js) @@ -1149,6 +1185,7 @@ def _get_event_from_row(self, internal_metadata, js, redacted, original_ev = FrozenEvent( d, internal_metadata_dict=internal_metadata, + aggregation_data=aggregation_data, rejected_reason=rejected_reason, ) From 673b1d182ce52730a68b00647fbd22a4b7c159fa Mon Sep 17 00:00:00 2001 From: pik Date: Mon, 21 Nov 2016 14:53:37 -0300 Subject: [PATCH 3/3] Fix typo in _fold_aggregation_data --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f1da003e6150..c2559c7622ef 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1122,7 +1122,7 @@ def _fold_aggregation_data(self, rows): aggregation_data = row.pop('aggregation_data') latest_event_id = 
row.pop('latest_aggregation_event_id') aggregation_event_type = row.pop('aggregation_event_type') - aggregation_data = { 'aggregation_event_type': { + aggregation_data = { aggregation_event_type: { 'aggregation_data': aggregation_data, 'latest_event_id': latest_event_id }
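
For reference, a minimal sketch of the fold that _fold_aggregation_data performs
once this typo fix is applied (all identifiers below are made-up sample data,
not output from the patch):

    # Two aggregation_entries rows LEFT JOINed onto the same target event:
    rows = [
        {'event_id': '$evt1:hs',
         'aggregation_event_type': 'm.room.experimental.emoticon',
         'aggregation_data': [{'emoticon': ':+1:', 'sender': '@a:hs', 'event_id': '$e7:hs'}],
         'latest_aggregation_event_id': '$e7:hs'},
        {'event_id': '$evt1:hs',
         'aggregation_event_type': 'm.room.experimental.tag',
         'aggregation_data': [{'tag': 'funny', 'sender': '@b:hs', 'event_id': '$e9:hs'}],
         'latest_aggregation_event_id': '$e9:hs'},
    ]

    # ...fold into one row per event, with aggregation_data keyed by the
    # aggregation event type (the blob the client sees on the serialized event):
    folded = {
        'event_id': '$evt1:hs',
        'aggregation_data': {
            'm.room.experimental.emoticon': {
                'aggregation_data': [{'emoticon': ':+1:', 'sender': '@a:hs', 'event_id': '$e7:hs'}],
                'latest_event_id': '$e7:hs',
            },
            'm.room.experimental.tag': {
                'aggregation_data': [{'tag': 'funny', 'sender': '@b:hs', 'event_id': '$e9:hs'}],
                'latest_event_id': '$e9:hs',
            },
        },
    }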