diff --git a/admin/sql/create_tables.sql b/admin/sql/create_tables.sql index 73f75d601a..30b7ac1b84 100644 --- a/admin/sql/create_tables.sql +++ b/admin/sql/create_tables.sql @@ -53,10 +53,10 @@ ALTER TABLE follow_list ADD CONSTRAINT follow_list_name_creator_key UNIQUE (name CREATE TABLE recommendation.cf_recording ( id SERIAL, -- PK user_id INTEGER NOT NULL, --FK to "user".id - recording_mbid UUID NOT NULL, - type cf_recording_type NOT NULL, + recording_mbid JSONB NOT NULL, created TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL ); +ALTER TABLE recommendation.cf_recording ADD CONSTRAINT user_id_unique UNIQUE (user_id); CREATE TABLE recommendation.recommender ( id SERIAL, --PK diff --git a/admin/sql/updates/2020-05-20-change-recommendation-cf-recording-col-type.sql b/admin/sql/updates/2020-05-20-change-recommendation-cf-recording-col-type.sql new file mode 100644 index 0000000000..9239bdcafc --- /dev/null +++ b/admin/sql/updates/2020-05-20-change-recommendation-cf-recording-col-type.sql @@ -0,0 +1,8 @@ +BEGIN; + +ALTER TABLE recommendation.cf_recording DROP recording_mbid; +ALTER TABLE recommendation.cf_recording ADD COLUMN recording_mbid JSONB NOT NULL; + +ALTER TABLE recommendation.cf_recording ADD CONSTRAINT user_id_unique UNIQUE (user_id); + +COMMIT; diff --git a/admin/sql/updates/2020-05-20-drop-recommendation-cf-recording-col.sql b/admin/sql/updates/2020-05-20-drop-recommendation-cf-recording-col.sql new file mode 100644 index 0000000000..021a717799 --- /dev/null +++ b/admin/sql/updates/2020-05-20-drop-recommendation-cf-recording-col.sql @@ -0,0 +1,5 @@ +BEGIN; + +ALTER TABLE recommendation.cf_recording DROP COLUMN type; + +COMMIT; diff --git a/listenbrainz/db/recommendations_cf_recording.py b/listenbrainz/db/recommendations_cf_recording.py new file mode 100644 index 0000000000..26bdd97f47 --- /dev/null +++ b/listenbrainz/db/recommendations_cf_recording.py @@ -0,0 +1,103 @@ +"""This module contains functions to insert and retrieve recommendations + generated from 
Apache Spark into the database. +""" + +# listenbrainz-server - Server for the ListenBrainz project. +# +# Copyright (C) 2020 MetaBrainz Foundation Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + + +import ujson +import sqlalchemy + +from listenbrainz import db +from flask import current_app + + +def get_timestamp_for_last_recording_recommended(): + """ Get the time when recommendation_cf_recording table was last updated + """ + with db.engine.connect() as connection: + result = connection.execute(sqlalchemy.text(""" + SELECT MAX(created) as created_ts + FROM recommendation.cf_recording + """) + ) + row = result.fetchone() + return row['created_ts'] if row else None + + +def insert_user_recommendation(user_id, top_artist_recording_mbids, similar_artist_recording_mbids): + """ Insert recommended recording for a user in the db. + + Args: + user_id (int): row id of the user. + top_artist_recording_mbids (list): recommended recording mbids that belong to top artists listened to by the user. + similar_artist_recording_mbids (list): recommended recording mbids that belong to artists similar to top artists + listened to by the user. 
+ """ + recommendation = { + 'top_artist': top_artist_recording_mbids, + 'similar_artist': similar_artist_recording_mbids, + } + + with db.engine.connect() as connection: + connection.execute(sqlalchemy.text(""" + INSERT INTO recommendation.cf_recording (user_id, recording_mbid) + VALUES (:user_id, :recommendation) + ON CONFLICT (user_id) + DO UPDATE SET user_id = :user_id, + recording_mbid = :recommendation, + created = NOW() + """), { + 'user_id': user_id, + 'recommendation': ujson.dumps(recommendation), + } + ) + + +def get_user_recommendation(user_id): + """ Get recommendations for a user with the given row ID. + + Args: + user_id (int): the row ID of the user in the DB + + Returns: + A dict of the following format + { + 'user_id' (int): the row ID of the user in the DB, + 'recording_mbid' (dict): recommended recording mbids + 'created' (datetime): datetime object representing when + the recommendation for this user was last updated. + } + + recording_mbid = { + 'top_artist_recording': [], + 'similar_artist_recording': [] + } + """ + with db.engine.connect() as connection: + result = connection.execute(sqlalchemy.text(""" + SELECT user_id, recording_mbid, created + FROM recommendation.cf_recording + WHERE user_id = :user_id + """), { + 'user_id': user_id + } + ) + row = result.fetchone() + return dict(row) if row else None diff --git a/listenbrainz/db/tests/test_recommendations_cf_recording.py b/listenbrainz/db/tests/test_recommendations_cf_recording.py new file mode 100644 index 0000000000..6cafb8571c --- /dev/null +++ b/listenbrainz/db/tests/test_recommendations_cf_recording.py @@ -0,0 +1,55 @@ +import listenbrainz.db.user as db_user +import listenbrainz.db.recommendations_cf_recording as db_recommendations_cf_recording + +from datetime import datetime, timezone +from listenbrainz.db.testing import DatabaseTestCase + + +class CFRecordingRecommendationDatabaseTestCase(DatabaseTestCase): + + def setUp(self): + DatabaseTestCase.setUp(self) + self.user = 
db_user.get_or_create(1, 'vansika') + + def test_insert_user_recommendation(self): + top_artist_recording_mbids = ['a36d6fc9-49d0-4789-a7dd-a2b72369ca45', 'b36d6fc9-49d0-4789-a7dd-a2b72369ca45'] + similar_artist_recording_mbids = ['c36d6fc9-49d0-4789-a7dd-a2b72369ca45', 'd36d6fc9-49d0-4789-a7dd-a2b72369ca45'] + + db_recommendations_cf_recording.insert_user_recommendation( + user_id=self.user['id'], + top_artist_recording_mbids=top_artist_recording_mbids, + similar_artist_recording_mbids=similar_artist_recording_mbids + ) + + result = db_recommendations_cf_recording.get_user_recommendation(self.user['id']) + self.assertEqual(result['recording_mbid']['top_artist'], top_artist_recording_mbids) + self.assertEqual(result['recording_mbid']['similar_artist'], similar_artist_recording_mbids) + self.assertGreater(int(result['created'].strftime('%s')), 0) + + def insert_test_data(self): + top_artist_recording_mbids = ['x36d6fc9-49d0-4789-a7dd-a2b72369ca45', 'h36d6fc9-49d0-4789-a7dd-a2b72369ca45'] + similar_artist_recording_mbids = ['v36d6fc9-49d0-4789-a7dd-a2b72369ca45', 'i36d6fc9-49d0-4789-a7dd-a2b72369ca45'] + + db_recommendations_cf_recording.insert_user_recommendation( + user_id=self.user['id'], + top_artist_recording_mbids=top_artist_recording_mbids, + similar_artist_recording_mbids=similar_artist_recording_mbids + ) + + return { + 'top_artist_recording_mbids': top_artist_recording_mbids, + 'similar_artist_recording_mbids': similar_artist_recording_mbids, + } + + def test_get_user_recommendation(self): + data_inserted = self.insert_test_data() + + data_received = db_recommendations_cf_recording.get_user_recommendation(self.user['id']) + self.assertEqual(data_received['recording_mbid']['top_artist'], data_inserted['top_artist_recording_mbids']) + self.assertEqual(data_received['recording_mbid']['similar_artist'], data_inserted['similar_artist_recording_mbids']) + + def test_get_timestamp_for_last_recording_recommended(self): + ts = datetime.now(timezone.utc) + 
self.insert_test_data() + received_ts = db_recommendations_cf_recording.get_timestamp_for_last_recording_recommended() + self.assertGreaterEqual(received_ts, ts) diff --git a/listenbrainz/spark/handlers.py b/listenbrainz/spark/handlers.py index d463e518b1..28a867e399 100644 --- a/listenbrainz/spark/handlers.py +++ b/listenbrainz/spark/handlers.py @@ -3,12 +3,15 @@ """ import listenbrainz.db.user as db_user import listenbrainz.db.stats as db_stats +import listenbrainz.db.recommendations_cf_recording as db_recommendations_cf_recording from flask import current_app, render_template from brainzutils.mail import send_mail from datetime import datetime, timezone, timedelta + TIME_TO_CONSIDER_STATS_AS_OLD = 20 # minutes +TIME_TO_CONSIDER_RECOMMENDATIONS_AS_OLD = 7 # days def is_new_user_stats_batch(): @@ -25,6 +28,31 @@ def is_new_user_stats_batch(): return datetime.now(timezone.utc) - last_update_ts > timedelta(minutes=TIME_TO_CONSIDER_STATS_AS_OLD) +def is_new_cf_recording_recommendation_batch(): + """ Returns True if this batch of recommendations is new, False otherwise + """ + create_ts = db_recommendations_cf_recording.get_timestamp_for_last_recording_recommended() + if create_ts is None: + return True + + return datetime.now(timezone.utc) - create_ts > timedelta(days=TIME_TO_CONSIDER_RECOMMENDATIONS_AS_OLD) + + +def notify_cf_recording_recommendations_update(): + """ Send an email to notify recommendations are being written into db. 
+ """ + if current_app.config['TESTING']: + return + + send_mail( + subject="Recommendations being written into the DB - ListenBrainz", + text=render_template('emails/cf_recording_recommendation_notification.txt', now=str(datetime.utcnow())), + recipients=['listenbrainz-observability@metabrainz.org'], + from_name='ListenBrainz', + from_addr='noreply@'+current_app.config['MAIL_FROM_DOMAIN'] + ) + + def notify_user_stats_update(stat_type): if not current_app.config['TESTING']: send_mail( @@ -73,3 +101,84 @@ def handle_dump_imported(data): from_name='ListenBrainz', from_addr='noreply@'+current_app.config['MAIL_FROM_DOMAIN'], ) + + +def handle_dataframes(data): + """ Send an email after dataframes have been successfully created and uploaded to HDFS. + """ + if current_app.config['TESTING']: + return + + dataframe_upload_time = data['dataframe_upload_time'] + dataframe_creation_time = data['total_time'] + from_date = data['from_date'] + to_date = data['to_date'] + send_mail( + subject='Dataframes have been uploaded to HDFS', + text=render_template('emails/cf_recording_dataframes_upload_notification.txt', time_to_upload=dataframe_upload_time, + from_date=from_date, to_date=to_date, total_time=dataframe_creation_time), + recipients=['listenbrainz-observability@metabrainz.org'], + from_name='ListenBrainz', + from_addr='noreply@'+current_app.config['MAIL_FROM_DOMAIN'], + ) + + +def handle_model(data): + """ Send an email after trained data (model) has been successfully uploaded to HDFS. 
+ """ + if current_app.config['TESTING']: + return + + model_upload_time = data['model_upload_time'] + model_creation_time = data['total_time'] + send_mail( + subject='Model created and successfully uploaded to HDFS', + text=render_template('emails/cf_recording_model_upload_notification.txt', time_to_upload=model_upload_time, + total_time=model_creation_time), + recipients=['listenbrainz-observability@metabrainz.org'], + from_name='ListenBrainz', + from_addr='noreply@'+current_app.config['MAIL_FROM_DOMAIN'], + ) + + +def handle_candidate_sets(data): + """ Send an email after candidate sets have been successfully uploaded to HDFS. + """ + if current_app.config['TESTING']: + return + + candidate_sets_upload_time = data['candidate_sets_upload_time'] + candidate_set_creation_time = data['total_time'] + from_date = data['from_date'] + to_date = data['to_date'] + send_mail( + subject='Candidate sets created and successfully uploaded to HDFS', + text=render_template('emails/cf_candidate_sets_upload_notification.txt', time_to_upload=candidate_sets_upload_time, + from_date=from_date, to_date=to_date, total_time=candidate_set_creation_time), + recipients=['listenbrainz-observability@metabrainz.org'], + from_name='ListenBrainz', + from_addr='noreply@'+current_app.config['MAIL_FROM_DOMAIN'], + ) + + +def handle_recommendations(data): + """ Take recommended recordings for a user and save it in the db. 
+ """ + musicbrainz_id = data['musicbrainz_id'] + user = db_user.get_by_mb_id(musicbrainz_id) + if not user: + current_app.logger.critical("Generated recommendations for a user that doesn't exist in the Postgres database: {}" + .format(musicbrainz_id)) + return + + if is_new_cf_recording_recommendation_batch(): + notify_cf_recording_recommendations_update() + + current_app.logger.debug("inserting recommendation for {}".format(musicbrainz_id)) + top_artist_recording_mbids = data['top_artist'] + similar_artist_recording_mbids = data['similar_artist'] + + db_recommendations_cf_recording.insert_user_recommendation(user['id'], top_artist_recording_mbids, + similar_artist_recording_mbids) + + current_app.logger.debug("recommendation for {} inserted".format(musicbrainz_id)) diff --git a/listenbrainz/spark/request_manage.py b/listenbrainz/spark/request_manage.py index b2d5221933..c3dec649f0 100644 --- a/listenbrainz/spark/request_manage.py +++ b/listenbrainz/spark/request_manage.py @@ -47,7 +47,6 @@ def _prepare_query_message(query, params=None): raise InvalidSparkRequestError(query) message = {'query': possible_queries[query]['name']} - required_params = set(possible_queries[query]['params']) given_params = set(params.keys()) if required_params != given_params: @@ -119,3 +118,31 @@ def request_import_new_full_dump(): """ Send the cluster a request to import a new full data dump """ send_request_to_spark_cluster(_prepare_query_message('import.dump.full')) + + +@cli.command(name="request_dataframes") +def request_dataframes(): + """ Send the cluster a request to create dataframes. + """ + send_request_to_spark_cluster(_prepare_query_message('cf_recording.recommendations.create_dataframes')) + + +@cli.command(name='request_model') +def request_model(): + """ Send the cluster a request to train the model. 
+ """ + send_request_to_spark_cluster(_prepare_query_message('cf_recording.recommendations.train_model')) + + +@cli.command(name='request_candidate_sets') +def request_candidate_sets(): + """ Send the cluster a request to generate candidate sets. + """ + send_request_to_spark_cluster(_prepare_query_message('cf_recording.recommendations.candidate_sets')) + + +@cli.command(name='request_recommendations') +def request_recommendations(): + """ Send the cluster a request to generate recommendations. + """ + send_request_to_spark_cluster(_prepare_query_message('cf_recording.recommendations.recommend')) diff --git a/listenbrainz/spark/request_queries.json b/listenbrainz/spark/request_queries.json index caf03a497f..082b1560dc 100644 --- a/listenbrainz/spark/request_queries.json +++ b/listenbrainz/spark/request_queries.json @@ -1,32 +1,57 @@ { - "stats.user.all": { - "name": "stats.user.all", - "description": "All user statistics that ListenBrainz calculates for all ListenBrainz users", - "params": [] - }, - "stats.user.artist.week": { - "name": "stats.user.artist.week", - "description": "Artist statistics for all users in the last week", - "params": [] - }, - "stats.user.artist.month": { - "name": "stats.user.artist.month", - "description": "Artist statistics for all users in the last month", - "params": [] - }, - "stats.user.artist.year": { - "name": "stats.user.artist.year", - "description": "Artist statistics for all users in the last year", - "params": [] - }, - "stats.user.artist.all_time": { - "name": "stats.user.artist.all_time", - "description": "All time artist statistics for all users", - "params": [] - }, - "import.dump.full": { - "name": "import.dump.full", - "description": "Import a new full dump into the spark cluster", - "params": [] - } + "stats.user.all": { + "name": "stats.user.all", + "description": "All user statistics that ListenBrainz calculates for all ListenBrainz users", + "params": [] + }, + "stats.user.for_user": { + "name": 
"stats.user.for_user", + "description": "All user statistics that ListenBrainz calculates for the specified user", + "params": ["musicbrainz_id"] + }, + "stats.user.artist.week": { + "name": "stats.user.artist.week", + "description": "Artist statistics for all users in the last week", + "params": [] + }, + "stats.user.artist.month": { + "name": "stats.user.artist.month", + "description": "Artist statistics for all users in the last month", + "params": [] + }, + "stats.user.artist.year": { + "name": "stats.user.artist.year", + "description": "Artist statistics for all users in the last year", + "params": [] + }, + "stats.user.artist.all_time": { + "name": "stats.user.artist.all_time", + "description": "All time artist statistics for all users", + "params": [] + }, + "import.dump.full": { + "name": "import.dump.full", + "description": "Import a new full dump into the spark cluster", + "params": [] + }, + "cf_recording.recommendations.create_dataframes": { + "name": "cf_recording.recommendations.create_dataframes", + "description": "Create dataframes to prepare trainable data.", + "params": [] + }, + "cf_recording.recommendations.train_model": { + "name": "cf_recording.recommendations.train_model", + "description": "Train data to yield a model.", + "params": [] + }, + "cf_recording.recommendations.candidate_sets": { + "name": "cf_recording.recommendations.candidate_sets", + "description": "Create candidate sets to generate recommendations", + "params": [] + }, + "cf_recording.recommendations.recommend":{ + "name": "cf_recording.recommendations.recommend", + "description": "Generate recommendations for all active ListenBrainz users.", + "params": [] + } } diff --git a/listenbrainz/spark/spark_reader.py b/listenbrainz/spark/spark_reader.py index fe77794f20..9ee2e29b4a 100644 --- a/listenbrainz/spark/spark_reader.py +++ b/listenbrainz/spark/spark_reader.py @@ -1,21 +1,25 @@ - import json import logging import pika import time import ujson +import sqlalchemy from flask 
import current_app from listenbrainz import utils from listenbrainz.db import user as db_user, stats as db_stats from listenbrainz.webserver import create_app from listenbrainz.db.exceptions import DatabaseException -from listenbrainz.spark.handlers import handle_user_artist, handle_dump_imported -import sqlalchemy +from listenbrainz.spark.handlers import handle_user_artist, handle_dump_imported, handle_dataframes, handle_model, \ + handle_candidate_sets, handle_recommendations response_handler_map = { 'user_artists': handle_user_artist, 'import_full_dump': handle_dump_imported, + 'cf_recording_dataframes': handle_dataframes, + 'cf_recording_model': handle_model, + 'cf_recording_candidate_sets': handle_candidate_sets, + 'cf_recording_recommendations': handle_recommendations, } diff --git a/listenbrainz/spark/test_handlers.py b/listenbrainz/spark/test_handlers.py index 1401182644..a87d031245 100644 --- a/listenbrainz/spark/test_handlers.py +++ b/listenbrainz/spark/test_handlers.py @@ -1,10 +1,11 @@ import unittest - -from datetime import datetime, timezone, timedelta +from unittest import mock from flask import current_app -from listenbrainz.spark.handlers import handle_user_artist, is_new_user_stats_batch, handle_dump_imported +from datetime import datetime, timezone, timedelta + from listenbrainz.webserver import create_app -from unittest import mock +from listenbrainz.spark.handlers import handle_user_artist, is_new_user_stats_batch, handle_dump_imported, handle_dataframes, \ + handle_model, handle_candidate_sets, handle_recommendations, is_new_cf_recording_recommendation_batch class HandlersTestCase(unittest.TestCase): @@ -39,6 +40,35 @@ def test_is_new_user_stats_batch(self, mock_db_get_timestamp): mock_db_get_timestamp.return_value = datetime.now(timezone.utc) - timedelta(minutes=21) self.assertTrue(is_new_user_stats_batch()) + @mock.patch('listenbrainz.spark.handlers.db_recommendations_cf_recording.insert_user_recommendation') + 
@mock.patch('listenbrainz.spark.handlers.db_user.get_by_mb_id') + @mock.patch('listenbrainz.spark.handlers.is_new_cf_recording_recommendation_batch') + @mock.patch('listenbrainz.spark.handlers.send_mail') + def test_handle_recommendations(self, mock_send_mail, mock_new_recommendation, mock_get_by_mb_id, mock_db_insert): + data = { + 'musicbrainz_id': 'vansika', + 'type': 'cf_recording_recommendations', + 'top_artist': ['a36d6fc9-49d0-4789-a7dd-a2b72369ca45'], + 'similar_artist': ['b36d6fc9-49d0-4789-a7dd-a2b72369ca45'], + } + + mock_get_by_mb_id.return_value = {'id': 1, 'musicbrainz_id': 'vansika'} + mock_new_recommendation.return_value = True + + with self.app.app_context(): + current_app.config['TESTING'] = False + handle_recommendations(data) + + mock_db_insert.assert_called_with(1, data['top_artist'], data['similar_artist']) + mock_send_mail.assert_called_once() + + @mock.patch('listenbrainz.spark.handlers.db_recommendations_cf_recording.get_timestamp_for_last_recording_recommended') + def test_is_new_cf_recording_recommendation_batch(self, mock_db_get_timestamp): + mock_db_get_timestamp.return_value = datetime.now(timezone.utc) + self.assertFalse(is_new_cf_recording_recommendation_batch()) + mock_db_get_timestamp.return_value = datetime.now(timezone.utc) - timedelta(days=8) + self.assertTrue(is_new_cf_recording_recommendation_batch()) + @mock.patch('listenbrainz.spark.handlers.send_mail') def test_handle_dump_imported(self, mock_send_mail): with self.app.app_context(): @@ -60,3 +90,74 @@ def test_handle_dump_imported(self, mock_send_mail): 'time': str(time), }) mock_send_mail.assert_called_once() + + @mock.patch('listenbrainz.spark.handlers.send_mail') + def test_handle_dataframes(self, mock_send_mail): + with self.app.app_context(): + time = datetime.utcnow() + + self.app.config['TESTING'] = True + handle_dataframes({ + 'type': 'cf_recording_dataframes', + 'dataframe_upload_time': str(time), + 'total_time': '3.1', + 'from_date': str(time.strftime('%b %Y')), + 
'to_date': str(time.strftime('%b %Y')), + }) + mock_send_mail.assert_not_called() + + self.app.config['TESTING'] = False + handle_dataframes({ + 'type': 'cf_recording_dataframes', + 'dataframe_upload_time': str(time), + 'total_time': '3.1', + 'from_date': str(time.strftime('%b %Y')), + 'to_date': str(time.strftime('%b %Y')), + }) + mock_send_mail.assert_called_once() + + @mock.patch('listenbrainz.spark.handlers.send_mail') + def test_handle_model(self, mock_send_mail): + with self.app.app_context(): + time = datetime.utcnow() + + self.app.config['TESTING'] = True + handle_model({ + 'type': 'cf_recording_model', + 'model_upload_time': str(time), + 'total_time': '3.1', + }) + mock_send_mail.assert_not_called() + + self.app.config['TESTING'] = False + handle_model({ + 'type': 'cf_recording_model', + 'model_upload_time': str(time), + 'total_time': '3.1', + }) + mock_send_mail.assert_called_once() + + @mock.patch('listenbrainz.spark.handlers.send_mail') + def test_handle_candidate_sets(self, mock_send_mail): + with self.app.app_context(): + time = datetime.utcnow() + + self.app.config['TESTING'] = True + handle_candidate_sets({ + 'type': 'cf_recording_candidate_sets', + 'candidate_sets_upload_time': str(time), + 'total_time': '3.1', + 'from_date': str(time), + 'to_date': str(time) + }) + mock_send_mail.assert_not_called() + + self.app.config['TESTING'] = False + handle_candidate_sets({ + 'type': 'cf_recording_candidate_sets', + 'candidate_sets_upload_time': str(time), + 'total_time': '3.1', + 'from_date': str(time), + 'to_date': str(time) + }) + mock_send_mail.assert_called_once() diff --git a/listenbrainz/spark/test_request_manage.py b/listenbrainz/spark/test_request_manage.py index 362cde0420..e7bb053314 100644 --- a/listenbrainz/spark/test_request_manage.py +++ b/listenbrainz/spark/test_request_manage.py @@ -80,3 +80,19 @@ def test_prepare_query_message_happy_path(self): expected_message = ujson.dumps({'query': 'stats.user.artist.all_time'}) received_message = 
request_manage._prepare_query_message('stats.user.artist.all_time') self.assertEqual(expected_message, received_message) + + expected_message = ujson.dumps({'query': 'cf_recording.recommendations.create_dataframes'}) + received_message = request_manage._prepare_query_message('cf_recording.recommendations.create_dataframes') + self.assertEqual(expected_message, received_message) + + expected_message = ujson.dumps({'query': 'cf_recording.recommendations.train_model'}) + received_message = request_manage._prepare_query_message('cf_recording.recommendations.train_model') + self.assertEqual(expected_message, received_message) + + expected_message = ujson.dumps({'query': 'cf_recording.recommendations.candidate_sets'}) + received_message = request_manage._prepare_query_message('cf_recording.recommendations.candidate_sets') + self.assertEqual(expected_message, received_message) + + expected_message = ujson.dumps({'query': 'cf_recording.recommendations.recommend'}) + received_message = request_manage._prepare_query_message('cf_recording.recommendations.recommend') + self.assertEqual(expected_message, received_message) diff --git a/listenbrainz/webserver/templates/emails/cf_candidate_sets_upload_notification.txt b/listenbrainz/webserver/templates/emails/cf_candidate_sets_upload_notification.txt new file mode 100644 index 0000000000..5141cd9bf4 --- /dev/null +++ b/listenbrainz/webserver/templates/emails/cf_candidate_sets_upload_notification.txt @@ -0,0 +1,9 @@ +Hey! 👋 + +Candidate sets have been successfully uploaded to HDFS. +listens submitted from {{ from_date }} to {{ to_date }} are used to create the candidate sets. 
+ +upload time = {{ time_to_upload }} +time taken to create candidate sets = {{ total_time }}m + +Your Friendly Neighbourhood Brainzbot 🤖 diff --git a/listenbrainz/webserver/templates/emails/cf_recording_dataframes_upload_notification.txt b/listenbrainz/webserver/templates/emails/cf_recording_dataframes_upload_notification.txt new file mode 100644 index 0000000000..2a9032b2f4 --- /dev/null +++ b/listenbrainz/webserver/templates/emails/cf_recording_dataframes_upload_notification.txt @@ -0,0 +1,9 @@ +Hey! 👋 + +Dataframes have been successfully uploaded to HDFS. +listens submitted from {{ from_date }} to {{ to_date }} are used to create the dataframes. + +upload time = {{ time_to_upload }} +time taken to create dataframes = {{ total_time }}m + +Your Friendly Neighbourhood Brainzbot 🤖 diff --git a/listenbrainz/webserver/templates/emails/cf_recording_model_upload_notification.txt b/listenbrainz/webserver/templates/emails/cf_recording_model_upload_notification.txt new file mode 100644 index 0000000000..654d571d84 --- /dev/null +++ b/listenbrainz/webserver/templates/emails/cf_recording_model_upload_notification.txt @@ -0,0 +1,8 @@ +Hey! 👋 + +Data has been successfully trained to create a model. + +upload time = {{ time_to_upload }} +time taken to create model = {{ total_time }}h + +Your Friendly Neighbourhood Brainzbot 🤖 diff --git a/listenbrainz/webserver/templates/emails/cf_recording_recommendation_notification.txt b/listenbrainz/webserver/templates/emails/cf_recording_recommendation_notification.txt new file mode 100644 index 0000000000..edb093ae06 --- /dev/null +++ b/listenbrainz/webserver/templates/emails/cf_recording_recommendation_notification.txt @@ -0,0 +1,8 @@ +Hey! 👋 + +Recommendations were received by the spark consumer +and are being written into the database. + +The first recommendation came in at {{ now }} UTC. 
+ +Your Friendly Neighbourhood Brainzbot 🤖 diff --git a/listenbrainz_spark/query_map.py b/listenbrainz_spark/query_map.py index bc9f1bd262..e3118fa5a1 100644 --- a/listenbrainz_spark/query_map.py +++ b/listenbrainz_spark/query_map.py @@ -1,6 +1,10 @@ import listenbrainz_spark.stats.user.all import listenbrainz_spark.stats.user.artist import listenbrainz_spark.request_consumer.jobs.import_dump +import listenbrainz_spark.recommendations.create_dataframes +import listenbrainz_spark.recommendations.train_models +import listenbrainz_spark.recommendations.candidate_sets +import listenbrainz_spark.recommendations.recommend functions = { 'stats.user.all': listenbrainz_spark.stats.user.all.calculate, @@ -9,6 +13,10 @@ 'stats.user.artist.year': listenbrainz_spark.stats.user.artist.get_artists_year, 'stats.user.artist.all_time': listenbrainz_spark.stats.user.artist.get_artists_all_time, 'import.dump.full': listenbrainz_spark.request_consumer.jobs.import_dump.import_newest_full_dump_handler, + 'cf_recording.recommendations.create_dataframes': listenbrainz_spark.recommendations.create_dataframes.main, + 'cf_recording.recommendations.train_model': listenbrainz_spark.recommendations.train_models.main, + 'cf_recording.recommendations.candidate_sets': listenbrainz_spark.recommendations.candidate_sets.main, + 'cf_recording.recommendations.recommend': listenbrainz_spark.recommendations.recommend.main, } diff --git a/listenbrainz_spark/recommendations/candidate_sets.py b/listenbrainz_spark/recommendations/candidate_sets.py index b4c18b1410..a30a065714 100644 --- a/listenbrainz_spark/recommendations/candidate_sets.py +++ b/listenbrainz_spark/recommendations/candidate_sets.py @@ -18,7 +18,7 @@ from flask import current_app import pyspark.sql.functions as func from pyspark.sql.window import Window -from pyspark.sql.functions import lit, col, to_timestamp, current_timestamp, date_sub, row_number +from pyspark.sql.functions import col, row_number from pyspark.sql.utils import 
AnalysisException, ParseException # Candidate Set HTML is generated if set to true. @@ -47,20 +47,36 @@ # ] -def get_listens_to_fetch_top_artists(mapped_df): +def get_dates_to_generate_candidate_sets(mapped_df): + """ Get window to fetch listens to generate candidate sets. + + Args: + mapped_df (dataframe): listens mapped with msid_mbid_mapping. Refer to candidate_sets.py + for dataframe columns. + + Returns: + from_date (datetime): Date from which start fetching listens. + to_date (datetime): Date upto which fetch listens. + """ + # get timestamp of latest listen in HDFS + to_date = mapped_df.select(func.max('listened_at').alias('listened_at')).collect()[0].listened_at + from_date = stats.adjust_days(to_date, config.RECOMMENDATION_GENERATION_WINDOW).replace(hour=0, minute=0, second=0) + return from_date, to_date + + +def get_listens_to_fetch_top_artists(mapped_df, from_date, to_date): """ Get listens of past X days to fetch top artists where X = RECOMMENDATION_GENERATION_WINDOW. Args: mapped_df (dataframe): listens mapped with msid_mbid_mapping. Refer to candidate_sets.py for dataframe columns. + from_date (datetime): Date from which start fetching listens. + to_date (datetime): Date upto which fetch listens. Returns: mapped_listens_subset (dataframe): A subset of mapped_df containing user history. """ - mapped_listens_subset = mapped_df.select('*') \ - .where((col('listened_at') >= to_timestamp(date_sub(current_timestamp(), - config.RECOMMENDATION_GENERATION_WINDOW))) & - (col('listened_at') <= current_timestamp())) + mapped_listens_subset = mapped_df.filter(mapped_df.listened_at.between(from_date, to_date)) return mapped_listens_subset @@ -219,18 +235,18 @@ def get_candidate_html_data(top_similar_artists_df): return user_data -def save_candidate_html(user_data, time_initial): +def save_candidate_html(user_data, total_time): """ Save user data to an HTML file. Args: user_data (dict): Top and similar artists associated to users. 
- time_initial (str): Timestamp when the script was invoked. + total_time (str): time taken to generate candidate_sets """ date = datetime.utcnow().strftime('%Y-%m-%d') candidate_html = 'Candidate-{}-{}.html'.format(uuid.uuid4(), date) context = { 'user_data': user_data, - 'total_time': '{:.2f}'.format((time() - time_initial) / 60), + 'total_time': total_time, } save_html(candidate_html, context, 'candidate.html') @@ -276,8 +292,10 @@ def main(): current_app.logger.error(str(err), exc_info=True) sys.exit(-1) + from_date, to_date = get_dates_to_generate_candidate_sets(mapped_df) + current_app.logger.info('Fetching listens to get top artists...') - mapped_listens_subset = get_listens_to_fetch_top_artists(mapped_df) + mapped_listens_subset = get_listens_to_fetch_top_artists(mapped_df, from_date, to_date) current_app.logger.info('Fetching top artists...') top_artists_df = get_top_artists(mapped_listens_subset) @@ -299,8 +317,20 @@ def main(): current_app.logger.error('{}\nAborting...'.format(str(err.java_exception)), exc_info=True) sys.exit(-1) + # time taken to generate candidate_sets + total_time = '{:.2f}'.format((time() - time_initial) / 60) if SAVE_CANDIDATE_HTML: user_data = get_candidate_html_data(top_similar_artists_df) current_app.logger.info('Saving HTML...') - save_candidate_html(user_data, time_initial) + save_candidate_html(user_data, total_time) current_app.logger.info('Done!') + + message = [{ + 'type': 'cf_recording_candidate_sets', + 'candidate_sets_upload_time': str(datetime.utcnow()), + 'total_time': total_time, + 'from_date': str(from_date), + 'to_date': str(to_date) + }] + + return message diff --git a/listenbrainz_spark/recommendations/create_dataframes.py b/listenbrainz_spark/recommendations/create_dataframes.py index 3d182821aa..483fec3358 100644 --- a/listenbrainz_spark/recommendations/create_dataframes.py +++ b/listenbrainz_spark/recommendations/create_dataframes.py @@ -290,12 +290,22 @@ def main(): playcounts_df = 
get_playcounts_df(listens_df, recordings_df, users_df, metadata) playcounts_df_time = '{:.2f}'.format((time() - t0) / 60) - total_time = '{:.2f}'.format((time() - ti) / 60) generate_best_model_id(metadata) save_dataframe_metadata_to_HDFS(metadata) + total_time = '{:.2f}'.format((time() - ti) / 60) if SAVE_DATAFRAME_HTML: current_app.logger.info('Saving HTML...') save_dataframe_html(users_df_time, recordings_df_time, playcounts_df_time, total_time) current_app.logger.info('Done!') + + message = [{ + 'type': 'cf_recording_dataframes', + 'dataframe_upload_time': str(datetime.utcnow()), + 'total_time': total_time, + 'from_date': str(from_date.strftime('%b %Y')), + 'to_date': str(to_date.strftime('%b %Y')), + }] + + return message diff --git a/listenbrainz_spark/recommendations/recommend.py b/listenbrainz_spark/recommendations/recommend.py index 30a376bfa3..f421e59166 100644 --- a/listenbrainz_spark/recommendations/recommend.py +++ b/listenbrainz_spark/recommendations/recommend.py @@ -1,3 +1,4 @@ +import sys import os import json import logging @@ -116,7 +117,6 @@ def get_recommendations_for_all(recordings_df, model, top_artists_candidate_set, """ messages = [] current_app.logger.info('Generating recommendations...') - # active users in the last week/month. # users for whom recommendations will be generated. 
users_df = top_artists_candidate_set.select('user_id', 'user_name').distinct() diff --git a/listenbrainz_spark/recommendations/tests/test_candidate.py b/listenbrainz_spark/recommendations/tests/test_candidate.py index f0178fdfd5..173d8dcaf9 100644 --- a/listenbrainz_spark/recommendations/tests/test_candidate.py +++ b/listenbrainz_spark/recommendations/tests/test_candidate.py @@ -40,20 +40,25 @@ def get_listens(cls): df1 = utils.create_dataframe(cls.get_listen_row(cls.date, 'vansika', 1), schema=None) shifted_date = stats.adjust_days(cls.date, config.RECOMMENDATION_GENERATION_WINDOW + 1) df2 = utils.create_dataframe(cls.get_listen_row(shifted_date, 'vansika', 1), schema=None) - shifted_date = stats.adjust_days(cls.date, 1, shift_backwards=False) + shifted_date = stats.adjust_days(cls.date, 1) df3 = utils.create_dataframe(cls.get_listen_row(shifted_date, 'rob', 2), schema=None) shifted_date = stats.adjust_days(cls.date, 2) df4 = utils.create_dataframe(cls.get_listen_row(shifted_date, 'rob', 2), schema=None) test_mapped_df = df1.union(df2).union(df3).union(df4) return test_mapped_df - def test_get_listens_for_rec_generation_window(self): + def test_get_dates_to_generate_candidate_sets(self): mapped_df = self.get_listens() - test_df = candidate_sets.get_listens_to_fetch_top_artists(mapped_df) - min_date = test_df.select(f.min('listened_at').alias('listened_at')).take(1)[0] - max_date = test_df.select(f.max('listened_at').alias('listened_at')).take(1)[0] - self.assertGreaterEqual(self.date, min_date.listened_at) - self.assertLessEqual(self.date, max_date.listened_at) + from_date, to_date = candidate_sets.get_dates_to_generate_candidate_sets(mapped_df) + self.assertEqual(to_date, self.date) + expected_date = stats.adjust_days(self.date, config.RECOMMENDATION_GENERATION_WINDOW).replace(hour=0, minute=0, second=0) + self.assertEqual(from_date, expected_date) + + def test_get_listens_to_fetch_top_artists(self): + mapped_df = self.get_listens() + from_date, to_date = 
candidate_sets.get_dates_to_generate_candidate_sets(mapped_df) + mapped_listens_subset = candidate_sets.get_listens_to_fetch_top_artists(mapped_df, from_date, to_date) + self.assertEqual(mapped_listens_subset.count(), 3) def test_get_top_artists(self): mapped_listens = self.get_mapped_listens() diff --git a/listenbrainz_spark/recommendations/train_models.py b/listenbrainz_spark/recommendations/train_models.py index dc589a9b48..b66d669297 100644 --- a/listenbrainz_spark/recommendations/train_models.py +++ b/listenbrainz_spark/recommendations/train_models.py @@ -237,3 +237,11 @@ def main(): recommendation_metadata['best_model_id'] = best_model_metadata['model_id'] with open(metadata_file_path, 'w') as f: json.dump(recommendation_metadata,f) + + message = [{ + 'type': 'cf_recording_model', + 'model_upload_time': str(datetime.utcnow()), + 'total_time': '{:.2f}'.format((time() - ti) / 3600), + }] + + return message