Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Make synchrotron accept /events
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Aug 12, 2016
1 parent 866a532 commit 4e1cebd
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 18 deletions.
36 changes: 34 additions & 2 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from synapse.http.server import JsonResource
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.rest.client.v2_alpha import sync
from synapse.rest.client.v1 import events
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
Expand Down Expand Up @@ -89,17 +90,23 @@ class SynchrotronSlavedStore(
get_presence_list_accepted = PresenceStore.__dict__[
"get_presence_list_accepted"
]
get_presence_list_observers_accepted = PresenceStore.__dict__[
"get_presence_list_observers_accepted"
]


UPDATE_SYNCING_USERS_MS = 10 * 1000


class SynchrotronPresence(object):
def __init__(self, hs):
self.is_mine_id = hs.is_mine_id
self.http_client = hs.get_simple_http_client()
self.store = hs.get_datastore()
self.user_to_num_current_syncs = {}
self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()

active_presence = self.store.take_presence_startup_info()
self.user_to_current_state = {
Expand All @@ -124,6 +131,8 @@ def set_state(self, user, state, ignore_status_msg=False):
pass

get_states = PresenceHandler.get_states.__func__
get_state = PresenceHandler.get_state.__func__
_get_interested_parties = PresenceHandler._get_interested_parties.__func__
current_state_for_users = PresenceHandler.current_state_for_users.__func__

@defer.inlineCallbacks
Expand Down Expand Up @@ -194,19 +203,39 @@ def _send_syncing_users_now(self):
self._need_to_send_sync = False
yield self._send_syncing_users_now()

@defer.inlineCallbacks
def notify_from_replication(self, states, stream_id):
parties = yield self._get_interested_parties(
states, calculate_remote_hosts=False
)
room_ids_to_states, users_to_states, _ = parties

self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),
users=users_to_states.keys()
)

@defer.inlineCallbacks
def process_replication(self, result):
stream = result.get("presence", {"rows": []})
states = []
for row in stream["rows"]:
(
position, user_id, state, last_active_ts,
last_federation_update_ts, last_user_sync_ts, status_msg,
currently_active
) = row
self.user_to_current_state[user_id] = UserPresenceState(
state = UserPresenceState(
user_id, state, last_active_ts,
last_federation_update_ts, last_user_sync_ts, status_msg,
currently_active
)
self.user_to_current_state[user_id] = state
states.append(state)

if states and "position" in stream:
stream_id = int(stream["position"])
yield self.notify_from_replication(states, stream_id)


class SynchrotronTyping(object):
Expand Down Expand Up @@ -266,10 +295,12 @@ def _listen_http(self, listener_config):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
sync.register_servlets(self, resource)
events.register_servlets(self, resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
"/_matrix/client/v2_alpha": resource,
"/_matrix/client/api/v1": resource,
})

root_resource = create_resource_tree(resources, Resource())
Expand Down Expand Up @@ -315,6 +346,7 @@ def replicate(self):
def expire_broken_caches():
store.who_forgot_in_room.invalidate_all()
store.get_presence_list_accepted.invalidate_all()
store.get_presence_list_observers_accepted.invalidate_all()

def notify_from_stream(
result, stream_name, stream_key, room=None, user=None
Expand Down Expand Up @@ -392,7 +424,7 @@ def notify(result):
)
yield store.process_replication(result)
typing_handler.process_replication(result)
presence_handler.process_replication(result)
yield presence_handler.process_replication(result)
notify(result)
except:
logger.exception("Error replicating from %r", replication_url)
Expand Down
3 changes: 0 additions & 3 deletions synapse/handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
)
from .room_member import RoomMemberHandler
from .message import MessageHandler
from .events import EventStreamHandler, EventHandler
from .federation import FederationHandler
from .profile import ProfileHandler
from .directory import DirectoryHandler
Expand Down Expand Up @@ -53,8 +52,6 @@ def __init__(self, hs):
self.message_handler = MessageHandler(hs)
self.room_creation_handler = RoomCreationHandler(hs)
self.room_member_handler = RoomMemberHandler(hs)
self.event_stream_handler = EventStreamHandler(hs)
self.event_handler = EventHandler(hs)
self.federation_handler = FederationHandler(hs)
self.profile_handler = ProfileHandler(hs)
self.directory_handler = DirectoryHandler(hs)
Expand Down
27 changes: 19 additions & 8 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ def current_state_for_users(self, user_ids):
defer.returnValue(states)

@defer.inlineCallbacks
def _get_interested_parties(self, states):
def _get_interested_parties(self, states, calculate_remote_hosts=True):
"""Given a list of states return which entities (rooms, users, servers)
are interested in the given states.
Expand All @@ -526,14 +526,15 @@ def _get_interested_parties(self, states):
users_to_states.setdefault(state.user_id, []).append(state)

hosts_to_states = {}
for room_id, states in room_ids_to_states.items():
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states:
continue
if calculate_remote_hosts:
for room_id, states in room_ids_to_states.items():
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states:
continue

hosts = yield self.store.get_joined_hosts_for_room(room_id)
for host in hosts:
hosts_to_states.setdefault(host, []).extend(local_states)
hosts = yield self.store.get_joined_hosts_for_room(room_id)
for host in hosts:
hosts_to_states.setdefault(host, []).extend(local_states)

for user_id, states in users_to_states.items():
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
Expand Down Expand Up @@ -565,6 +566,16 @@ def _persist_and_notify(self, states):

self._push_to_remotes(hosts_to_states)

@defer.inlineCallbacks
def notify_for_states(self, state, stream_id):
parties = yield self._get_interested_parties([state])
room_ids_to_states, users_to_states, hosts_to_states = parties

self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),
users=[UserID.from_string(u) for u in users_to_states.keys()]
)

def _push_to_remotes(self, hosts_to_states):
"""Sends state updates to remote servers.
Expand Down
9 changes: 4 additions & 5 deletions synapse/rest/client/v1/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class EventStreamRestServlet(ClientV1RestServlet):

def __init__(self, hs):
super(EventStreamRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers()
self.event_stream_handler = hs.get_event_stream_handler()

@defer.inlineCallbacks
def on_GET(self, request):
Expand All @@ -50,7 +50,6 @@ def on_GET(self, request):
if "room_id" in request.args:
room_id = request.args["room_id"][0]

handler = self.handlers.event_stream_handler
pagin_config = PaginationConfig.from_request(request)
timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
if "timeout" in request.args:
Expand All @@ -61,7 +60,7 @@ def on_GET(self, request):

as_client_event = "raw" not in request.args

chunk = yield handler.get_stream(
chunk = yield self.event_stream_handler.get_stream(
requester.user.to_string(),
pagin_config,
timeout=timeout,
Expand All @@ -84,12 +83,12 @@ class EventRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(EventRestServlet, self).__init__(hs)
self.clock = hs.get_clock()
self.event_handler = hs.get_event_handler()

@defer.inlineCallbacks
def on_GET(self, request, event_id):
requester = yield self.auth.get_user_by_req(request)
handler = self.handlers.event_handler
event = yield handler.get_event(requester.user, event_id)
event = yield self.event_handler.get_event(requester.user, event_id)

time_now = self.clock.time_msec()
if event:
Expand Down
9 changes: 9 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from synapse.handlers.room import RoomListHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler
from synapse.handlers.events import EventHandler, EventStreamHandler
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.notifier import Notifier
Expand Down Expand Up @@ -94,6 +95,8 @@ def build_DEPENDENCY(self)
'auth_handler',
'device_handler',
'e2e_keys_handler',
'event_handler',
'event_stream_handler',
'application_service_api',
'application_service_scheduler',
'application_service_handler',
Expand Down Expand Up @@ -214,6 +217,12 @@ def build_application_service_scheduler(self):
def build_application_service_handler(self):
return ApplicationServicesHandler(self)

def build_event_handler(self):
return EventHandler(self)

def build_event_stream_handler(self):
return EventStreamHandler(self)

def build_event_sources(self):
return EventSources(self)

Expand Down

0 comments on commit 4e1cebd

Please sign in to comment.