Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 56 additions & 12 deletions matrix_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import logging
import sys

logger = logging.getLogger(__name__)


class MatrixClient(object):
"""
Expand Down Expand Up @@ -82,8 +84,6 @@ def __init__(self, base_url, token=None, user_id=None, valid_cert_check=True):
self.sync_token = None
self.sync_filter = None

self.logger = logging.getLogger("matrix_client")

""" Time to wait before attempting a /sync request after failing."""
self.bad_sync_timeout_limit = 60 * 60
self.rooms = {
Expand Down Expand Up @@ -196,14 +196,20 @@ def get_rooms(self):
"""
return self.rooms

def add_listener(self, callback):
def add_listener(self, callback, event_type=None):
""" Add a listener that will send a callback when the client recieves
an event.

Args:
callback (func(roomchunk)): Callback called when an event arrives.
event_type (str): The event_type to filter for.
"""
self.listeners.append(callback)
self.listeners.append(
{
'callback': callback,
'event_type': event_type
}
)

def listen_for_events(self, timeout_ms=30000):
"""Deprecated. sync now pulls events from the request.
Expand All @@ -228,17 +234,17 @@ def listen_forever(self, timeout_ms=30000):
self._sync(timeout_ms)
bad_sync_timeout = 5
except MatrixRequestError as e:
self.logger.warning("A MatrixRequestError occured during sync.")
logger.warning("A MatrixRequestError occured during sync.")
if e.code >= 500:
self.logger.warning("Problem occured serverside. Waiting %i seconds",
bad_sync_timeout)
logger.warning("Problem occured serverside. Waiting %i seconds",
bad_sync_timeout)
sleep(bad_sync_timeout)
bad_sync_timeout = min(bad_sync_timeout * 2,
self.bad_sync_timeout_limit)
else:
raise e
except Exception as e:
self.logger.error("Exception thrown during sync\n %s", str(e))
logger.exception("Exception thrown during sync")

def start_listener_thread(self, timeout_ms=30000):
""" Start a listener thread to listen for events in the background.
Expand All @@ -253,7 +259,7 @@ def start_listener_thread(self, timeout_ms=30000):
thread.start()
except:
e = sys.exc_info()[0]
self.logger.error("Error: unable to start thread. %s", str(e))
logger.error("Error: unable to start thread. %s", str(e))

def upload(self, content, content_type):
""" Upload content to the home server and recieve a MXC url.
Expand Down Expand Up @@ -296,6 +302,13 @@ def _process_state_event(self, state_event, current_room):
elif etype == "m.room.aliases":
current_room.aliases = state_event["content"].get("aliases", None)

for listener in current_room.state_listeners:
if (
listener['event_type'] is None or
listener['event_type'] == state_event['type']
):
listener['callback'](state_event)

def _sync(self, timeout_ms=30000):
# TODO: Deal with presence
# TODO: Deal with left rooms
Expand All @@ -312,6 +325,14 @@ def _sync(self, timeout_ms=30000):
for event in sync_room["timeline"]["events"]:
room._put_event(event)

# Dispatch for client (global) listeners
for listener in self.listeners:
if (
listener['event_type'] is None or
listener['event_type'] == event['type']
):
listener['callback'](event)

def get_user(self, user_id):
""" Return a User by their id.

Expand Down Expand Up @@ -345,6 +366,7 @@ def __init__(self, client, room_id):
self.room_id = room_id
self.client = client
self.listeners = []
self.state_listeners = []
self.events = []
self.event_history_limit = 20
self.name = None
Expand Down Expand Up @@ -387,21 +409,43 @@ def send_image(self, url, name, **imageinfo):
extra_information=imageinfo
)

def add_listener(self, callback):
def add_listener(self, callback, event_type=None):
""" Add a callback handler for events going to this room.

Args:
callback (func(roomchunk)): Callback called when an event arrives.
event_type (str): The event_type to filter for.
"""
self.listeners.append(
{
'callback': callback,
'event_type': event_type
}
)

def add_state_listener(self, callback, event_type=None):
""" Add a callback handler for state events going to this room.

Args:
callback (func(roomchunk)): Callback called when an event arrives.
event_type (str): The event_type to filter for.
"""
self.listeners.append(callback)
self.state_listeners.append(
{
'callback': callback,
'event_type': event_type
}
)

def _put_event(self, event):
self.events.append(event)
if len(self.events) > self.event_history_limit:
self.events.pop(0)

# Dispatch for room-specific listeners
for listener in self.listeners:
listener(self, event)
if listener['event_type'] is None or listener['event_type'] == event['type']:
listener['callback'](self, event)

def get_events(self):
""" Get the most recent events for this room.
Expand Down