Remove time ranges, refactor listen fetching and improve listen count/timestamp caching (#1390)

* Create an improved listen_count_week view that will perform better for our data.
Start removing time_range arguments for fetching listens, and start using
listen_count_week to fetch listens accurately.

* Committing some notes for timescale tweaking

* Interim check-in

* Remove time range fetching mechanism from front-end

* Remove erroneously committed change

* Small cleanup

* Commit more notes after more tinkering

* Improve the listen_count continuous aggregate, add indexes to the materialized view,
and drop the min/max continuous aggregates, using the listen_count indexes instead.

* Tests for updated listen fetching now pass

* Finished fixing last bugs

* Make timestamps accurate -- it's slower, but really a must

* CHERRYPICK: This PR renames the listen_count agg to listen_count_5day so that we may
test this code on the test server. We may be able to rename indexes, in which
case we will need to undo this commit. This makes it clear what this PR is for.

* Fix an off-by-one error

* listen count fixes

* Add some documentation to this complex function

* Add timing for the timestamp and fetch listens functions

* Improve timing output

* CHERRYPICK: This commit undoes the fancy listen_count table footwork meant to optimize the lookup
of listens, which actually became slower. :(

* Interim check-in

* Finish debugging the exponential backoff problem

* Peppy and fix a comment

* Mostly a made up commit so I can test the new testing setup. Failures ensured!

* Refactor timestamps to not expire and use them to bound query lookups (see the sketch after this commit list).

* Undo having user creation set timestamps; it needs to be done elsewhere. Also fix another test.

* Fix an invocation to fetch listens

* Fixing more tests

* Refactor timestamps code

* Fix a constant

* Fix last failing tests... I hope

* One final test fixed

* More cache key cleanup and set timestamps for newly created user.

* Query timestamps directly

* Make cache calls consistent

* More timestamp cleanup

* Update snapshot

* Fix the incorrect listen count

* Cleanup cache in redis listenstore as well

* Minor fixes

* Fix upper end check

* Peppy

* Minor cleanup

* Update snapshot

After merging master

* Start migrating to TS 2.2

* Update for Timescale 2.2.0. Remove code to create an index by hand, since it can be done with static SQL now. However, tests all fail for some mysterious reason.

* Use simpler group-by

* Pin 2.2.0-pg11 image tag (#1426)

* Fix tests

* Update snapshot

* Just kidding: even though the continuous aggregate is now a materialized view, and
it is possible to create indexes on materialized views in general, it's not possible to do so on *these* views.

* Fix tests

* Fix some tests

some issues after a bad merge

* Improve timing related debug output

* Don't load feedback if no current user

We had a bad assertion when the current user was undefined: undefined === undefined evaluates to true.

* Fix recentlistens tests

Don't connect to the websocket server by default, and modify some of the test props (latestListenTs).

* Fix debug statement

* PR feedback

* Listenstore PR feedback improvements

* Remove superfluous order/group by

* Recalculate all user data (#1396)

* Add tabulation script

* If an account has no listens, don't update anything.

* Move recalculate_all_user_data to a separate module

* Remove function from listenstore

* Pepperson
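
The timestamp and backoff commits above describe one pattern: each user's min/max listened_at timestamps are cached without expiry and used to bound listen queries, and when a bounded window comes back empty the window is widened exponentially instead of scanning the whole table. The sketch below illustrates that idea only; it is not the actual ListenBrainz implementation, and the function and parameter names are hypothetical.

from typing import Callable, List

def fetch_listens_bounded(query_window: Callable[[int, int], List[dict]],
                          min_ts: int, max_ts: int, to_ts: int,
                          limit: int) -> List[dict]:
    """ Fetch up to `limit` listens older than `to_ts`, bounding the search with
        the user's cached min/max timestamps and doubling the window on each pass.
    """
    listens: List[dict] = []
    window = 86400                      # start with a one-day window
    upper = min(to_ts, max_ts + 1)      # never search past the newest known listen
    while len(listens) < limit and upper > min_ts:
        lower = max(upper - window, min_ts)
        listens.extend(query_window(lower, upper))  # one bounded hit on the listen table
        upper = lower                   # slide the window further into the past
        window *= 2                     # exponential backoff for sparse listen histories
    return listens[:limit]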

Co-authored-by: Monkey Do <contact@monkeydo.digital>
Co-authored-by: Kartik Ohri <kartikohri13@gmail.com>
3 people committed May 7, 2021
1 parent 2af210b commit 498577f
Showing 38 changed files with 1,657 additions and 2,884 deletions.
2 changes: 2 additions & 0 deletions admin/timescale/create_indexes.sql
@@ -3,6 +3,8 @@ BEGIN;
 CREATE INDEX listened_at_user_name_ndx_listen ON listen (listened_at DESC, user_name);
 CREATE UNIQUE INDEX listened_at_track_name_user_name_ndx_listen ON listen (listened_at DESC, track_name, user_name);
 
+-- View indexes are created in listenbrainz/db/timescale.py
+
 -- Playlists
 
 CREATE UNIQUE INDEX mbid_playlist ON playlist.playlist (mbid);
20 changes: 2 additions & 18 deletions admin/timescale/create_views.sql
@@ -1,18 +1,2 @@
-BEGIN;
-
-CREATE VIEW listen_count
-WITH (timescaledb.continuous, timescaledb.refresh_lag=43200, timescaledb.refresh_interval=3600)
-AS SELECT time_bucket(bigint '86400', listened_at) AS listened_at_bucket, user_name, count(listen)
-FROM listen group by time_bucket(bigint '86400', listened_at), user_name;
-
-CREATE VIEW listened_at_max
-WITH (timescaledb.continuous, timescaledb.refresh_lag=43200, timescaledb.refresh_interval=3600)
-AS SELECT time_bucket(bigint '86400', listened_at) AS listened_at_bucket, user_name, max(listened_at) AS max_value
-FROM listen group by time_bucket(bigint '86400', listened_at), user_name;
-
-CREATE VIEW listened_at_min
-WITH (timescaledb.continuous, timescaledb.refresh_lag=43200, timescaledb.refresh_interval=3600)
-AS SELECT time_bucket(bigint '86400', listened_at) AS listened_at_bucket, user_name, min(listened_at) AS min_value
-FROM listen group by time_bucket(bigint '86400', listened_at), user_name;
-
-COMMIT;
+-- 2592000 is number of seconds in 30 days
+CREATE MATERIALIZED VIEW listen_count_30day WITH (timescaledb.continuous) AS SELECT time_bucket(bigint '2592000', listened_at) AS listened_at_bucket, user_name, count(listen) FROM listen GROUP BY listened_at_bucket, user_name;
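
With the single 30-day continuous aggregate replacing the old per-day listen_count and min/max views, an exact per-user count can be assembled by summing the materialized bucket counts and then directly counting only the listens newer than the point up to which the aggregate has been materialized. A minimal sketch of that idea, assuming a SQLAlchemy engine; the DSN, helper name, and the `materialized_until` watermark parameter are hypothetical, not the actual listenstore code.

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://lb@localhost/listenbrainz_ts")  # hypothetical DSN

def exact_listen_count(user_name: str, materialized_until: int) -> int:
    """ Sum the 30-day buckets from the aggregate, then count the unmaterialized tail. """
    with engine.connect() as connection:
        bucketed = connection.execute(text(
            "SELECT COALESCE(SUM(count), 0) FROM listen_count_30day WHERE user_name = :user"
        ), user=user_name).scalar()
        tail = connection.execute(text(
            "SELECT COUNT(*) FROM listen WHERE user_name = :user AND listened_at >= :ts"
        ), user=user_name, ts=materialized_until).scalar()
    return int(bucketed) + int(tail)
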
2 changes: 1 addition & 1 deletion admin/timescale/drop_views.sql
@@ -1,5 +1,5 @@
 BEGIN;
 
-DROP VIEW listen_count CASCADE;
+DROP VIEW listen_count_30day CASCADE;
 
 COMMIT;
2 changes: 1 addition & 1 deletion docker/docker-compose.integration.yml
@@ -7,7 +7,7 @@ version: "3.4"
 services:
 
   db:
-    image: timescale/timescaledb:1.7.4-pg12
+    image: timescale/timescaledb:2.2.0-pg11
     command: postgres -F
     environment:
       POSTGRES_PASSWORD: 'postgres'
2 changes: 1 addition & 1 deletion docker/docker-compose.test.yml
@@ -7,7 +7,7 @@ version: "3.4"
 services:
 
   db:
-    image: timescale/timescaledb:1.7.4-pg12
+    image: timescale/timescaledb:2.2.0-pg11
     command: postgres -F
     environment:
       POSTGRES_PASSWORD: 'postgres'
2 changes: 1 addition & 1 deletion docker/docker-compose.yml
@@ -19,7 +19,7 @@ services:
       - redis:/data:z
 
   db:
-    image: timescale/timescaledb:1.7.4-pg12
+    image: timescale/timescaledb:2.2.0-pg11
     volumes:
       - timescaledb:/var/lib/postgresql/data:z
     environment:
2 changes: 1 addition & 1 deletion listenbrainz/db/lastfm_user.py
@@ -70,4 +70,4 @@ def get_play_count(user_id, listenstore):
     """ Get playcount from the given user name.
     """
     user = User.load_by_id(user_id)
-    return listenstore.get_listen_count_for_user(user.name, need_exact=False)
+    return listenstore.get_listen_count_for_user(user.name)
3 changes: 2 additions & 1 deletion listenbrainz/db/testing.py
@@ -68,8 +68,9 @@ def reset_timescale_db(self):
         ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_schemas.sql'))
         ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_tables.sql'))
         ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_functions.sql'))
-        ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_views.sql'))
+        ts.run_sql_script_without_transaction(os.path.join(TIMESCALE_SQL_DIR, 'create_views.sql'))
         ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_indexes.sql'))
+        ts.create_view_indexes()
         ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_primary_keys.sql'))
         ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_foreign_keys.sql'))
         ts.engine.dispose()
42 changes: 42 additions & 0 deletions listenbrainz/db/timescale.py
@@ -5,6 +5,8 @@
 import time
 import psycopg2
 
+from listenbrainz import config
+
 # This value must be incremented after schema changes on replicated tables!
 SCHEMA_VERSION = 5
 
@@ -68,3 +70,43 @@ def run_sql_script_without_transaction(sql_file_path):
         connection.connection.set_isolation_level(1)
         connection.close()
         return True
+
+
+def create_view_indexes():
+    """ This function is needed since we need to create an index on a materialized view, whose
+        name we need to query -- it's not a constant, thus it cannot be done in pure SQL.
+    """
+
+    admin_engine = create_engine(
+        config.TIMESCALE_ADMIN_LB_URI, poolclass=NullPool)
+    with admin_engine.connect() as connection:
+        query = """SELECT materialization_hypertable_schema, materialization_hypertable_name
+                     FROM timescaledb_information.continuous_aggregates
+                    WHERE view_name = 'listen_count_30day'"""
+        curs = connection.execute(query)
+        row = curs.fetchone()
+        if row is None:
+            raise RuntimeError(
+                "Cannot find materialized view name for listen_count view.")
+
+        view_schema = row[0]
+        view_name = row[1]
+        if not view_name:
+            raise RuntimeError(
+                "Cannot find materialized view name for listen_count view.")
+
+        query = """CREATE INDEX listened_at_bucket_user_name_ndx_listen_count_30day
+                       ON %s.%s (listened_at_bucket, user_name)""" % (view_schema, view_name)
+        try:
+            connection.execute(query)
+        except Exception as err:
+            raise RuntimeError(
+                "Cannot create index on materialized view of listen_count_30day")
+
+        query = """CREATE INDEX user_name_ndx_listen_count_30day
+                       ON %s.%s (user_name)""" % (view_schema, view_name)
+        try:
+            connection.execute(query)
+        except Exception as err:
+            raise RuntimeError(
+                "Cannot create index on materialized view of listen_count_30day")
1 change: 1 addition & 0 deletions listenbrainz/db/user.py
@@ -36,6 +36,7 @@ def create(musicbrainz_row_id: int, musicbrainz_id: str) -> int:
             "token": str(uuid.uuid4()),
             "mb_row_id": musicbrainz_row_id,
         })
+
     return result.fetchone()["id"]
3 changes: 0 additions & 3 deletions listenbrainz/listenstore/__init__.py
@@ -7,9 +7,6 @@
 
 LISTENS_DUMP_SCHEMA_VERSION = 1
 
-REDIS_USER_TIMESTAMPS = "user.%s.timestamps"  # substitute user_name
-USER_CACHE_TIME = 3600  # in seconds. 1 hour
-
 from listenbrainz.listenstore import listenstore
 import listenbrainz.listen as listen
 ListenStore = listenstore.ListenStore
16 changes: 3 additions & 13 deletions listenbrainz/listenstore/listenstore.py
@@ -26,13 +26,11 @@ def get_total_listen_count(self):
         """ Return the total number of listens stored in the ListenStore """
         raise NotImplementedError()
 
-    def get_listen_count_for_user(self, user_name, need_exact):
+    def get_listen_count_for_user(self, user_name):
         """ Override this method in ListenStore implementation class
             Args:
                 user_name: the user to get listens for
-                need_exact: if True, get an exact number of listens directly from the ListenStore
-                    otherwise, can get from a cache also
         """
         raise NotImplementedError()
 
@@ -58,23 +56,15 @@ def import_listens_dump(self, archive_path, threads=None):
         """
         raise NotImplementedError()
 
-    def fetch_listens(self, user_name, from_ts=None, to_ts=None, limit=DEFAULT_LISTENS_PER_FETCH, time_range=None):
+    def fetch_listens(self, user_name, from_ts=None, to_ts=None, limit=DEFAULT_LISTENS_PER_FETCH):
         """ Check from_ts, to_ts, and limit for fetching listens
             and set them to default values if not given.
         """
         if from_ts and to_ts and from_ts >= to_ts:
             raise ValueError("from_ts should be less than to_ts")
         if from_ts is None and to_ts is None:
             raise ValueError("You must specify either from_ts or to_ts.")
         if from_ts:
             order = ORDER_ASC
         else:
             order = ORDER_DESC
 
-        if time_range:
-            try:
-                time_range = int(time_range)
-            except ValueError:
-                raise ValueError("time_range must be an int value")
-
-        return self.fetch_listens_from_storage(user_name, from_ts, to_ts, limit, order, time_range)
+        return self.fetch_listens_from_storage(user_name, from_ts, to_ts, limit, order)
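
After this change the direction of a fetch is implied by which bound the caller passes: from_ts alone pages forward in ascending order, to_ts alone pages backward in descending order, and omitting both raises a ValueError. A brief usage sketch of the new call shapes; the `store` argument is assumed to be an initialized ListenStore implementation.

from time import time

def show_fetch_usage(store, user_name: str):
    """ Illustrates calls against the new fetch_listens signature. """
    now = int(time())

    # Newest listens first: everything older than `now`, returned in descending order.
    recent = store.fetch_listens(user_name, to_ts=now, limit=25)

    # Paging toward newer listens: everything after a known timestamp, ascending order.
    newer = store.fetch_listens(user_name, from_ts=now - 86400, limit=25)

    return recent, newer
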
53 changes: 26 additions & 27 deletions listenbrainz/listenstore/redis_listenstore.py
@@ -1,28 +1,33 @@
 # coding=utf-8
+from datetime import datetime
+from time import time
 
-import ujson
 import redis
-from time import time
 from redis import Redis
 from typing import Optional
+import ujson
 
 from listenbrainz.listen import Listen
 from listenbrainz.listenstore import ListenStore
-from datetime import datetime
 from brainzutils import cache
 from listenbrainz.utils import create_path, init_cache
 
 
 class RedisListenStore(ListenStore):
 
-    RECENT_LISTENS_KEY = "lb_recent_sorted"
+    RECENT_LISTENS_KEY = "rl-"
     RECENT_LISTENS_MAX = 100
+    PLAYING_NOW_KEY = "pn."
     LISTEN_COUNT_PER_DAY_EXPIRY_TIME = 3 * 24 * 60 * 60  # 3 days in seconds
-    LISTEN_COUNT_PER_DAY_KEY_FORMAT = "lb_listen_count_for_day_{}"
+    LISTEN_COUNT_PER_DAY_KEY = "lc-day-"
 
     def __init__(self, log, conf):
        super(RedisListenStore, self).__init__(log)
-        self.redis = Redis(host=conf['REDIS_HOST'], port=conf['REDIS_PORT'], decode_responses=True)
-        self.ns = conf['REDIS_NAMESPACE']
+
+        # Initialize brainzutils cache
+        init_cache(host=conf['REDIS_HOST'], port=conf['REDIS_PORT'],
+                   namespace=conf['REDIS_NAMESPACE'])
+        # This is used in tests. Leave for cleanup in LB-879
+        self.redis = cache._r
 
     def get_playing_now(self, user_id):
         """ Return the current playing song of the user
@@ -34,7 +39,7 @@ def get_playing_now(self, user_id):
                 Listen object which is the currently playing song of the user
         """
-        data = self.redis.get(self.ns + 'playing_now:{}'.format(user_id))
+        data = cache.get(self.PLAYING_NOW_KEY + str(user_id))
         if not data:
             return None
         data = ujson.loads(data)
@@ -49,16 +54,12 @@ def put_playing_now(self, user_id, listen, expire_time):
                 listen (dict): the listen data
                 expire_time (int): the time in seconds in which the `playing_now` listen should expire
         """
-        self.redis.setex(
-            self.ns + 'playing_now:{}'.format(user_id),
-            time=expire_time,
-            value=ujson.dumps(listen).encode('utf-8')
-        )
+        cache.set(self.PLAYING_NOW_KEY + str(user_id), ujson.dumps(listen).encode('utf-8'), time=expire_time)
 
     def check_connection(self):
         """ Pings the redis server to check if the connection works or not """
         try:
-            self.redis.ping()
+            cache._r.ping()
         except redis.exceptions.ConnectionError as e:
             self.log.error("Redis ping didn't work: {}".format(str(e)))
             raise
@@ -76,20 +77,20 @@ def update_recent_listens(self, unique):
 
         # Don't take this very seriously -- if it fails, really no big deal. Let is go.
         if recent:
-            self.redis.zadd(self.ns + self.RECENT_LISTENS_KEY, recent, nx=True)
+            cache._r.zadd(cache._prep_key(self.RECENT_LISTENS_KEY), recent, nx=True)
 
         # Don't prune the sorted list each time, but only when it reaches twice the desired size
-        count = self.redis.zcard(self.ns + self.RECENT_LISTENS_KEY)
+        count = cache._r.zcard(cache._prep_key(self.RECENT_LISTENS_KEY))
         if count > (self.RECENT_LISTENS_MAX * 2):
-            self.redis.zpopmin(self.ns + self.RECENT_LISTENS_KEY, count - self.RECENT_LISTENS_MAX - 1)
+            cache._r.zpopmin(cache._prep_key(self.RECENT_LISTENS_KEY), count - self.RECENT_LISTENS_MAX - 1)
 
     def get_recent_listens(self, max = RECENT_LISTENS_MAX):
         """
            Get the max number of most recent listens
         """
         recent = []
-        for listen in self.redis.zrevrange(self.ns + self.RECENT_LISTENS_KEY, 0, max - 1):
+        for listen in cache._r.zrevrange(cache._prep_key(self.RECENT_LISTENS_KEY), 0, max - 1):
             recent.append(Listen.from_json(ujson.loads(listen)))
 
         return recent
@@ -98,17 +99,15 @@ def increment_listen_count_for_day(self, day: datetime, count: int):
         """ Increment the number of listens submitted on the day `day`
            by `count`.
        """
-        key = self.LISTEN_COUNT_PER_DAY_KEY_FORMAT.format(day.strftime('%Y%m%d'))
-        if self.redis.exists(key):
-            self.redis.incrby(key, count)
-        else:
-            self.redis.setex(key, self.LISTEN_COUNT_PER_DAY_EXPIRY_TIME, count)
+        key = cache._prep_key(self.LISTEN_COUNT_PER_DAY_KEY + day.strftime('%Y%m%d'))
+        cache._r.incrby(key, count)
+        cache._r.expire(key, self.LISTEN_COUNT_PER_DAY_EXPIRY_TIME)
 
     def get_listen_count_for_day(self, day: datetime) -> Optional[int]:
         """ Get the number of listens submitted for day `day`, return None if not available.
         """
-        key = self.LISTEN_COUNT_PER_DAY_KEY_FORMAT.format(day.strftime('%Y%m%d'))
-        listen_count = self.redis.get(key)
+        key = self.LISTEN_COUNT_PER_DAY_KEY + day.strftime('%Y%m%d')
+        listen_count = cache.get(key, decode=False)
         if listen_count:
             return int(listen_count)
         return None
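
The rewritten per-day counters also close a small gap: the old exists()/setex() branch could drop a concurrent increment or leave a counter without a TTL between the check and the write, while INCRBY atomically creates a missing key at zero and the unconditional EXPIRE simply refreshes the three-day TTL. A standalone sketch of the same pattern against a plain redis client; the client setup and key prefix here are illustrative, not the brainzutils cache wiring.

from datetime import datetime
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
DAY_KEY_PREFIX = "lc-day-"        # mirrors LISTEN_COUNT_PER_DAY_KEY above
DAY_EXPIRY = 3 * 24 * 60 * 60     # three days, mirrors the class constant

def increment_day_count(day: datetime, count: int) -> None:
    key = DAY_KEY_PREFIX + day.strftime("%Y%m%d")
    r.incrby(key, count)          # atomically initializes the key to 0 if missing
    r.expire(key, DAY_EXPIRY)     # refresh the TTL on every write
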
1 change: 1 addition & 0 deletions listenbrainz/listenstore/tests/test_redislistenstore.py
@@ -8,6 +8,7 @@
 from dateutil.relativedelta import relativedelta
 from redis.connection import Connection
 
+from brainzutils import cache
 import listenbrainz.db.user as db_user
 from listenbrainz.db.testing import DatabaseTestCase
 from listenbrainz import config
4 changes: 2 additions & 2 deletions listenbrainz/listenstore/tests/test_test.py
@@ -20,7 +20,6 @@
 from listenbrainz.webserver.timescale_connection import init_timescale_connection
 from listenbrainz.db.dump import SchemaMismatchException
 from listenbrainz.listenstore import LISTENS_DUMP_SCHEMA_VERSION
-from listenbrainz.listenstore.timescale_listenstore import REDIS_TIMESCALE_USER_LISTEN_COUNT
 from brainzutils import cache
 
 TIMESCALE_SQL_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..', '..', 'admin', 'timescale')
@@ -43,8 +42,9 @@ def reset_timescale_db(self):
         ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_schemas.sql'))
         ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_tables.sql'))
         ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_functions.sql'))
-        ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_views.sql'))
+        ts.run_sql_script_without_transaction(os.path.join(TIMESCALE_SQL_DIR, 'create_views.sql'))
         ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_indexes.sql'))
+        ts.create_view_indexes()
         ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_primary_keys.sql'))
         ts.run_sql_script(os.path.join(TIMESCALE_SQL_DIR, 'create_foreign_keys.sql'))
         ts.engine.dispose()
