diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 3457402596e2..081e7cce5998 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -35,7 +35,7 @@ from synapse.server import HomeServer -from twisted.internet import reactor, task, defer +from twisted.internet import reactor, defer from twisted.application import service from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.static import File @@ -53,7 +53,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.util.logcontext import LoggingContext, PreserveLoggingContext -from synapse.metrics import register_memory_metrics, get_metrics_for +from synapse.metrics import register_memory_metrics from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.federation.transport.server import TransportLayerServer @@ -398,7 +398,8 @@ def profiled(*args, **kargs): ThreadPool._worker = profile(ThreadPool._worker) reactor.run = profile(reactor.run) - start_time = hs.get_clock().time() + clock = hs.get_clock() + start_time = clock.time() stats = {} @@ -410,41 +411,23 @@ def phone_stats_home(): if uptime < 0: uptime = 0 - # If the stats directory is empty then this is the first time we've - # reported stats. - first_time = not stats - stats["homeserver"] = hs.config.server_name stats["timestamp"] = now stats["uptime_seconds"] = uptime stats["total_users"] = yield hs.get_datastore().count_all_users() + total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users() + stats["total_nonbridged_users"] = total_nonbridged_users + room_count = yield hs.get_datastore().get_room_count() stats["total_room_count"] = room_count stats["daily_active_users"] = yield hs.get_datastore().count_daily_users() - daily_messages = yield hs.get_datastore().count_daily_messages() - if daily_messages is not None: - stats["daily_messages"] = daily_messages - else: - stats.pop("daily_messages", None) - - if first_time: - # Add callbacks to report the synapse stats as metrics whenever - # prometheus requests them, typically every 30s. - # As some of the stats are expensive to calculate we only update - # them when synapse phones home to matrix.org every 24 hours. - metrics = get_metrics_for("synapse.usage") - metrics.add_callback("timestamp", lambda: stats["timestamp"]) - metrics.add_callback("uptime_seconds", lambda: stats["uptime_seconds"]) - metrics.add_callback("total_users", lambda: stats["total_users"]) - metrics.add_callback("total_room_count", lambda: stats["total_room_count"]) - metrics.add_callback( - "daily_active_users", lambda: stats["daily_active_users"] - ) - metrics.add_callback( - "daily_messages", lambda: stats.get("daily_messages", 0) - ) + stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms() + stats["daily_messages"] = yield hs.get_datastore().count_daily_messages() + + daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages() + stats["daily_sent_messages"] = daily_sent_messages logger.info("Reporting stats to matrix.org: %s" % (stats,)) try: @@ -456,9 +439,12 @@ def phone_stats_home(): logger.warn("Error reporting stats: %s", e) if hs.config.report_stats: - phone_home_task = task.LoopingCall(phone_stats_home) - logger.info("Scheduling stats reporting for 24 hour intervals") - phone_home_task.start(60 * 60 * 24, now=False) + logger.info("Scheduling stats reporting for 3 hour intervals") + clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000) + + # We wait 5 minutes to send the first set of stats as the server can + # be quite busy the first few minutes + clock.call_later(5 * 60, phone_stats_home) def in_thread(): # Uncomment to enable tracing of log context changes. diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 5e72985cdaf0..f119c5a758d4 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -246,7 +246,7 @@ def __init__(self, db_conn, hs): cur.close() self.find_stream_orderings_looping_call = self._clock.looping_call( - self._find_stream_orderings_for_times, 60 * 60 * 1000 + self._find_stream_orderings_for_times, 10 * 60 * 1000 ) self._stream_order_on_start = self.get_room_max_stream_ordering() @@ -287,17 +287,19 @@ def count_daily_users(self): Counts the number of users who used this homeserver in the last 24 hours. """ def _count_users(txn): - txn.execute( - "SELECT COUNT(DISTINCT user_id) AS users" - " FROM user_ips" - " WHERE last_seen > ?", - # This is close enough to a day for our purposes. - (int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),) - ) - rows = self.cursor_to_dict(txn) - if rows: - return rows[0]["users"] - return 0 + yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24), + + sql = """ + SELECT COALESCE(count(*), 0) FROM ( + SELECT user_id FROM user_ips + WHERE last_seen > ? + GROUP BY user_id + ) u + """ + + txn.execute(sql, (yesterday,)) + count, = txn.fetchone() + return count ret = yield self.runInteraction("count_users", _count_users) defer.returnValue(ret) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 72ce84b0b8e7..f60ed889d54c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -38,7 +38,6 @@ import synapse.metrics import logging -import math import ujson as json # these are only included to make the type annotations work @@ -1625,66 +1624,52 @@ def count_daily_messages(self): call to this function, it will return None. """ def _count_messages(txn): - now = self.hs.get_clock().time() + sql = """ + SELECT COALESCE(COUNT(*), 0) FROM events + WHERE type = 'm.room.message' + AND stream_ordering > ? + """ + txn.execute(sql, (self.stream_ordering_day_ago,)) + count, = txn.fetchone() + return count - txn.execute( - "SELECT reported_stream_token, reported_time FROM stats_reporting" - ) - last_reported = self.cursor_to_dict(txn) - - txn.execute( - "SELECT stream_ordering" - " FROM events" - " ORDER BY stream_ordering DESC" - " LIMIT 1" - ) - now_reporting = self.cursor_to_dict(txn) - if not now_reporting: - logger.info("Calculating daily messages skipped; no now_reporting") - return None - now_reporting = now_reporting[0]["stream_ordering"] - - txn.execute("DELETE FROM stats_reporting") - txn.execute( - "INSERT INTO stats_reporting" - " (reported_stream_token, reported_time)" - " VALUES (?, ?)", - (now_reporting, now,) - ) + ret = yield self.runInteraction("count_messages", _count_messages) + defer.returnValue(ret) - if not last_reported: - logger.info("Calculating daily messages skipped; no last_reported") - return None - - # Close enough to correct for our purposes. - yesterday = (now - 24 * 60 * 60) - since_yesterday_seconds = yesterday - last_reported[0]["reported_time"] - any_since_yesterday = math.fabs(since_yesterday_seconds) > 60 * 60 - if any_since_yesterday: - logger.info( - "Calculating daily messages skipped; since_yesterday_seconds: %d" % - (since_yesterday_seconds,) - ) - return None - - txn.execute( - "SELECT COUNT(*) as messages" - " FROM events NATURAL JOIN event_json" - " WHERE json like '%m.room.message%'" - " AND stream_ordering > ?" - " AND stream_ordering <= ?", - ( - last_reported[0]["reported_stream_token"], - now_reporting, - ) - ) - rows = self.cursor_to_dict(txn) - if not rows: - logger.info("Calculating daily messages skipped; messages count missing") - return None - return rows[0]["messages"] + @defer.inlineCallbacks + def count_daily_sent_messages(self): + def _count_messages(txn): + # This is good enough as if you have silly characters in your own + # hostname then thats your own fault. + like_clause = "%:" + self.hs.hostname + + sql = """ + SELECT COALESCE(COUNT(*), 0) FROM events + WHERE type = 'm.room.message' + AND sender LIKE ? + AND stream_ordering > ? + """ + + txn.execute(sql, (like_clause, self.stream_ordering_day_ago,)) + count, = txn.fetchone() + return count + + ret = yield self.runInteraction("count_daily_sent_messages", _count_messages) + defer.returnValue(ret) - ret = yield self.runInteraction("count_messages", _count_messages) + @defer.inlineCallbacks + def count_daily_active_rooms(self): + def _count(txn): + sql = """ + SELECT COALESCE(COUNT(DISTINCT room_id), 0) FROM events + WHERE type = 'm.room.message' + AND stream_ordering > ? + """ + txn.execute(sql, (self.stream_ordering_day_ago,)) + count, = txn.fetchone() + return count + + ret = yield self.runInteraction("count_daily_active_rooms", _count) defer.returnValue(ret) @defer.inlineCallbacks diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index ec2c52ab939f..20acd58fcfa3 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -437,6 +437,19 @@ def _count_users(txn): ret = yield self.runInteraction("count_users", _count_users) defer.returnValue(ret) + @defer.inlineCallbacks + def count_nonbridged_users(self): + def _count_users(txn): + txn.execute(""" + SELECT COALESCE(COUNT(*), 0) FROM users + WHERE appservice_id IS NULL + """) + count, = txn.fetchone() + return count + + ret = yield self.runInteraction("count_users", _count_users) + defer.returnValue(ret) + @defer.inlineCallbacks def find_next_generated_user_id_localpart(self): """ diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py deleted file mode 100644 index 14443b53bcfb..000000000000 --- a/tests/storage/test_events.py +++ /dev/null @@ -1,115 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015, 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from mock import Mock -from synapse.types import RoomID, UserID - -from tests import unittest -from twisted.internet import defer -from tests.storage.event_injector import EventInjector - -from tests.utils import setup_test_homeserver - - -class EventsStoreTestCase(unittest.TestCase): - - @defer.inlineCallbacks - def setUp(self): - self.hs = yield setup_test_homeserver( - resource_for_federation=Mock(), - http_client=None, - ) - self.store = self.hs.get_datastore() - self.db_pool = self.hs.get_db_pool() - self.message_handler = self.hs.get_handlers().message_handler - self.event_injector = EventInjector(self.hs) - - @defer.inlineCallbacks - def test_count_daily_messages(self): - yield self.db_pool.runQuery("DELETE FROM stats_reporting") - - self.hs.clock.now = 100 - - # Never reported before, and nothing which could be reported - count = yield self.store.count_daily_messages() - self.assertIsNone(count) - count = yield self.db_pool.runQuery("SELECT COUNT(*) FROM stats_reporting") - self.assertEqual([(0,)], count) - - # Create something to report - room = RoomID.from_string("!abc123:test") - user = UserID.from_string("@raccoonlover:test") - yield self.event_injector.create_room(room, user) - - self.base_event = yield self._get_last_stream_token() - - yield self.event_injector.inject_message(room, user, "Raccoons are really cute") - - # Never reported before, something could be reported, but isn't because - # it isn't old enough. - count = yield self.store.count_daily_messages() - self.assertIsNone(count) - yield self._assert_stats_reporting(1, self.hs.clock.now) - - # Already reported yesterday, two new events from today. - yield self.event_injector.inject_message(room, user, "Yeah they are!") - yield self.event_injector.inject_message(room, user, "Incredibly!") - self.hs.clock.now += 60 * 60 * 24 - count = yield self.store.count_daily_messages() - self.assertEqual(2, count) # 2 since yesterday - yield self._assert_stats_reporting(3, self.hs.clock.now) # 3 ever - - # Last reported too recently. - yield self.event_injector.inject_message(room, user, "Who could disagree?") - self.hs.clock.now += 60 * 60 * 22 - count = yield self.store.count_daily_messages() - self.assertIsNone(count) - yield self._assert_stats_reporting(4, self.hs.clock.now) - - # Last reported too long ago - yield self.event_injector.inject_message(room, user, "No one.") - self.hs.clock.now += 60 * 60 * 26 - count = yield self.store.count_daily_messages() - self.assertIsNone(count) - yield self._assert_stats_reporting(5, self.hs.clock.now) - - # And now let's actually report something - yield self.event_injector.inject_message(room, user, "Indeed.") - yield self.event_injector.inject_message(room, user, "Indeed.") - yield self.event_injector.inject_message(room, user, "Indeed.") - # A little over 24 hours is fine :) - self.hs.clock.now += (60 * 60 * 24) + 50 - count = yield self.store.count_daily_messages() - self.assertEqual(3, count) - yield self._assert_stats_reporting(8, self.hs.clock.now) - - @defer.inlineCallbacks - def _get_last_stream_token(self): - rows = yield self.db_pool.runQuery( - "SELECT stream_ordering" - " FROM events" - " ORDER BY stream_ordering DESC" - " LIMIT 1" - ) - if not rows: - defer.returnValue(0) - else: - defer.returnValue(rows[0][0]) - - @defer.inlineCallbacks - def _assert_stats_reporting(self, messages, time): - rows = yield self.db_pool.runQuery( - "SELECT reported_stream_token, reported_time FROM stats_reporting" - ) - self.assertEqual([(self.base_event + messages, time,)], rows)