Navigation Menu

Skip to content

Commit

Permalink
Request consumer: cf_recording recommendations from spark cluster (#867)
Browse files Browse the repository at this point in the history
* request recommendation from spark cluster

* send recommendation engine data to lemmy

* PEP-8 fixes

* change col type to jsonb to enable recommendation update

* add tests for cf recording recommendations request

* add 'mbids' to var names
  • Loading branch information
vansika committed May 21, 2020
1 parent 9c973b7 commit 26402f1
Show file tree
Hide file tree
Showing 21 changed files with 608 additions and 60 deletions.
4 changes: 2 additions & 2 deletions admin/sql/create_tables.sql
Expand Up @@ -53,10 +53,10 @@ ALTER TABLE follow_list ADD CONSTRAINT follow_list_name_creator_key UNIQUE (name
CREATE TABLE recommendation.cf_recording (
id SERIAL, -- PK
user_id INTEGER NOT NULL, --FK to "user".id
recording_mbid UUID NOT NULL,
type cf_recording_type NOT NULL,
recording_mbid JSONB NOT NULL,
created TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL
);
ALTER TABLE recommendation.cf_recording ADD CONSTRAINT user_id_unique UNIQUE (user_id);

CREATE TABLE recommendation.recommender (
id SERIAL, --PK
Expand Down
@@ -0,0 +1,8 @@
-- Schema update for recommendation.cf_recording: recreate recording_mbid as a
-- JSONB column and allow at most one row per user.
BEGIN;

-- The column is dropped and re-added rather than altered in place, so any
-- values previously stored in recording_mbid are discarded.
-- NOTE(review): PostgreSQL rejects ADD COLUMN ... NOT NULL without a DEFAULT
-- when the table already contains rows -- confirm the table is empty (or
-- truncated) before this script runs.
ALTER TABLE recommendation.cf_recording DROP recording_mbid;
ALTER TABLE recommendation.cf_recording ADD COLUMN recording_mbid JSONB NOT NULL;

-- Unique constraint on user_id: a user can have only one row in this table.
ALTER TABLE recommendation.cf_recording ADD CONSTRAINT user_id_unique UNIQUE (user_id);

COMMIT;
@@ -0,0 +1,5 @@
-- Schema update for recommendation.cf_recording: remove the "type" column.
BEGIN;

-- In ALTER TABLE ... DROP <name> the COLUMN keyword is optional, so this
-- statement drops the table column called "type"; it does not drop a data type.
-- NOTE(review): presumably the cf_recording_type type itself is left in place -- confirm
-- whether it should be removed separately.
ALTER TABLE recommendation.cf_recording DROP TYPE;

COMMIT;
103 changes: 103 additions & 0 deletions listenbrainz/db/recommendations_cf_recording.py
@@ -0,0 +1,103 @@
"""This module contains functions to insert and retrieve recommendations
generated from Apache Spark into the database.
"""

# listenbrainz-server - Server for the ListenBrainz project.
#
# Copyright (C) 2020 MetaBrainz Foundation Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA


import ujson
import sqlalchemy

from listenbrainz import db
from flask import current_app


def get_timestamp_for_last_recording_recommended():
    """ Fetch the most recent `created` value stored in recommendation.cf_recording.

        Returns:
            The value of MAX(created) from the table, or None if the query
            produced no row.
    """
    latest_created_query = sqlalchemy.text("""
SELECT MAX(created) as created_ts
FROM recommendation.cf_recording
""")
    with db.engine.connect() as connection:
        row = connection.execute(latest_created_query).fetchone()
        if not row:
            return None
        return row['created_ts']


def insert_user_recommendation(user_id, top_artist_recording_mbids, similar_artist_recording_mbids):
    """ Store the recommended recordings of a user, replacing any earlier entry.

        Both lists are packed into one JSON document that is written to the
        `recording_mbid` column. If the user already has a row, that row is
        overwritten and its `created` timestamp is reset to NOW().

        Args:
            user_id (int): row id of the user.
            top_artist_recording_mbids (list): recommended recording mbids that belong to
                                               top artists listened to by the user.
            similar_artist_recording_mbids (list): recommended recording mbids that belong to
                                                   artists similar to top artists listened to
                                                   by the user.
    """
    serialized_recommendation = ujson.dumps({
        'top_artist': top_artist_recording_mbids,
        'similar_artist': similar_artist_recording_mbids,
    })
    upsert_query = sqlalchemy.text("""
INSERT INTO recommendation.cf_recording (user_id, recording_mbid)
VALUES (:user_id, :recommendation)
ON CONFLICT (user_id)
DO UPDATE SET user_id = :user_id,
recording_mbid = :recommendation,
created = NOW()
""")
    query_params = {
        'user_id': user_id,
        'recommendation': serialized_recommendation,
    }

    with db.engine.connect() as connection:
        connection.execute(upsert_query, query_params)


def get_user_recommendation(user_id):
    """ Look up the stored recommendations of the user with the given row ID.

        Args:
            user_id (int): the row ID of the user in the DB.

        Returns:
            None if the user has no row in recommendation.cf_recording,
            otherwise a dict built from the selected columns:
            {
                'user_id' (int): the row ID of the user in the DB,
                'recording_mbid' (dict): recommended recording mbids,
                'created' (datetime): when the recommendation for this
                                      user was last updated.
            }

            Rows written by insert_user_recommendation() store recording_mbid as
            {
                'top_artist': [],
                'similar_artist': []
            }
    """
    lookup_query = sqlalchemy.text("""
SELECT user_id, recording_mbid, created
FROM recommendation.cf_recording
WHERE user_id = :user_id
""")
    with db.engine.connect() as connection:
        row = connection.execute(lookup_query, {'user_id': user_id}).fetchone()
        if not row:
            return None
        return dict(row)
55 changes: 55 additions & 0 deletions listenbrainz/db/tests/test_recommendations_cf_recording.py
@@ -0,0 +1,55 @@
import listenbrainz.db.user as db_user
import listenbrainz.db.recommendations_cf_recording as db_recommendations_cf_recording

from datetime import datetime, timezone
from listenbrainz.db.testing import DatabaseTestCase


class CFRecordingRecommendationDatabaseTestCase(DatabaseTestCase):
    """ Database tests for listenbrainz.db.recommendations_cf_recording. """

    def setUp(self):
        DatabaseTestCase.setUp(self)
        self.user = db_user.get_or_create(1, 'vansika')

    def test_insert_user_recommendation(self):
        """ Inserted recommendations can be read back unchanged. """
        # Two separate mbids per list, so that multi-element lists are
        # actually round-tripped through the JSONB column.
        top_artist_recording_mbids = [
            'a36d6fc9-49d0-4789-a7dd-a2b72369ca45',
            'b36d6fc9-49d0-4789-a7dd-a2b72369ca45',
        ]
        similar_artist_recording_mbids = [
            'c36d6fc9-49d0-4789-a7dd-a2b72369ca45',
            'd36d6fc9-49d0-4789-a7dd-a2b72369ca45',
        ]

        db_recommendations_cf_recording.insert_user_recommendation(
            user_id=self.user['id'],
            top_artist_recording_mbids=top_artist_recording_mbids,
            similar_artist_recording_mbids=similar_artist_recording_mbids
        )

        result = db_recommendations_cf_recording.get_user_recommendation(self.user['id'])
        self.assertEqual(result['recording_mbid']['top_artist'], top_artist_recording_mbids)
        self.assertEqual(result['recording_mbid']['similar_artist'], similar_artist_recording_mbids)
        # datetime.timestamp() is portable and honours tzinfo, unlike the
        # platform-specific strftime('%s') directive.
        self.assertGreater(result['created'].timestamp(), 0)

    def insert_test_data(self):
        """ Insert a fixed recommendation for self.user.

            Returns:
                A dict holding the two mbid lists that were inserted, under the keys
                'top_artist_recording_mbids' and 'similar_artist_recording_mbids'.
        """
        top_artist_recording_mbids = [
            'x36d6fc9-49d0-4789-a7dd-a2b72369ca45',
            'h36d6fc9-49d0-4789-a7dd-a2b72369ca45',
        ]
        similar_artist_recording_mbids = [
            'v36d6fc9-49d0-4789-a7dd-a2b72369ca45',
            'i36d6fc9-49d0-4789-a7dd-a2b72369ca45',
        ]

        db_recommendations_cf_recording.insert_user_recommendation(
            user_id=self.user['id'],
            top_artist_recording_mbids=top_artist_recording_mbids,
            similar_artist_recording_mbids=similar_artist_recording_mbids
        )

        return {
            'top_artist_recording_mbids': top_artist_recording_mbids,
            'similar_artist_recording_mbids': similar_artist_recording_mbids,
        }

    def test_get_user_recommendation(self):
        """ get_user_recommendation() returns what insert_test_data() stored. """
        data_inserted = self.insert_test_data()

        data_received = db_recommendations_cf_recording.get_user_recommendation(self.user['id'])
        self.assertEqual(data_received['recording_mbid']['top_artist'], data_inserted['top_artist_recording_mbids'])
        self.assertEqual(data_received['recording_mbid']['similar_artist'], data_inserted['similar_artist_recording_mbids'])

    def test_get_timestamp_for_last_recording_recommended(self):
        """ The last-recommended timestamp is not older than the moment before the insert. """
        ts = datetime.now(timezone.utc)
        self.insert_test_data()
        received_ts = db_recommendations_cf_recording.get_timestamp_for_last_recording_recommended()
        self.assertGreaterEqual(received_ts, ts)
109 changes: 109 additions & 0 deletions listenbrainz/spark/handlers.py
Expand Up @@ -3,12 +3,15 @@
"""
import listenbrainz.db.user as db_user
import listenbrainz.db.stats as db_stats
import listenbrainz.db.recommendations_cf_recording as db_recommendations_cf_recording

from flask import current_app, render_template
from brainzutils.mail import send_mail
from datetime import datetime, timezone, timedelta


TIME_TO_CONSIDER_STATS_AS_OLD = 20 # minutes
TIME_TO_CONSIDER_RECOMMENDATIONS_AS_OLD = 7 # days


def is_new_user_stats_batch():
Expand All @@ -25,6 +28,31 @@ def is_new_user_stats_batch():
return datetime.now(timezone.utc) - last_update_ts > timedelta(minutes=TIME_TO_CONSIDER_STATS_AS_OLD)


def is_new_cf_recording_recommendation_batch():
    """ Decide whether incoming recommendations start a new batch.

        Returns:
            True if no last-recommended timestamp is available, or if that timestamp
            is more than TIME_TO_CONSIDER_RECOMMENDATIONS_AS_OLD days in the past;
            False otherwise.
    """
    last_created = db_recommendations_cf_recording.get_timestamp_for_last_recording_recommended()
    if last_created is None:
        return True

    age = datetime.now(timezone.utc) - last_created
    staleness_threshold = timedelta(days=TIME_TO_CONSIDER_RECOMMENDATIONS_AS_OLD)
    return age > staleness_threshold


def notify_cf_recording_recommendations_update():
    """ Email the observability list that recommendations are being written into the db.

        No mail is sent when the app config has TESTING set.
    """
    if current_app.config['TESTING']:
        return

    message_body = render_template(
        'emails/cf_recording_recommendation_notification.txt',
        now=str(datetime.utcnow()),
    )
    sender_address = 'noreply@'+current_app.config['MAIL_FROM_DOMAIN']
    send_mail(
        subject="Recommendations being written into the DB - ListenBrainz",
        text=message_body,
        recipients=['listenbrainz-observability@metabrainz.org'],
        from_name='ListenBrainz',
        from_addr=sender_address,
    )


def notify_user_stats_update(stat_type):
if not current_app.config['TESTING']:
send_mail(
Expand Down Expand Up @@ -73,3 +101,84 @@ def handle_dump_imported(data):
from_name='ListenBrainz',
from_addr='noreply@'+current_app.config['MAIL_FROM_DOMAIN'],
)


def handle_dataframes(data):
    """ Email the observability list once dataframes have been created and uploaded to HDFS.

        No mail is sent when the app config has TESTING set.

        Args:
            data (dict): message that must contain the keys 'dataframe_upload_time',
                         'total_time', 'from_date' and 'to_date'.
    """
    if current_app.config['TESTING']:
        return

    upload_time = data['dataframe_upload_time']
    creation_time = data['total_time']
    range_start = data['from_date']
    range_end = data['to_date']

    message_body = render_template(
        'emails/cf_recording_dataframes_upload_notification.txt',
        time_to_upload=upload_time,
        from_date=range_start,
        to_date=range_end,
        total_time=creation_time,
    )
    sender_address = 'noreply@'+current_app.config['MAIL_FROM_DOMAIN']
    send_mail(
        subject='Dataframes have been uploaded to HDFS',
        text=message_body,
        recipients=['listenbrainz-observability@metabrainz.org'],
        from_name='ListenBrainz',
        from_addr=sender_address,
    )


def handle_model(data):
    """ Email the observability list once the trained model has been uploaded to HDFS.

        No mail is sent when the app config has TESTING set.

        Args:
            data (dict): message that must contain the keys 'model_upload_time'
                         and 'total_time'.
    """
    if current_app.config['TESTING']:
        return

    upload_time = data['model_upload_time']
    creation_time = data['total_time']

    message_body = render_template(
        'emails/cf_recording_model_upload_notification.txt',
        time_to_upload=upload_time,
        total_time=creation_time,
    )
    sender_address = 'noreply@'+current_app.config['MAIL_FROM_DOMAIN']
    send_mail(
        subject='Model created and successfully uploaded to HDFS',
        text=message_body,
        recipients=['listenbrainz-observability@metabrainz.org'],
        from_name='ListenBrainz',
        from_addr=sender_address,
    )


def handle_candidate_sets(data):
    """ Email the observability list once candidate sets have been uploaded to HDFS.

        No mail is sent when the app config has TESTING set.

        Args:
            data (dict): message that must contain the keys 'candidate_sets_upload_time',
                         'total_time', 'from_date' and 'to_date'.
    """
    if current_app.config['TESTING']:
        return

    upload_time = data['candidate_sets_upload_time']
    creation_time = data['total_time']
    range_start = data['from_date']
    range_end = data['to_date']

    message_body = render_template(
        'emails/cf_candidate_sets_upload_notification.txt',
        time_to_upload=upload_time,
        from_date=range_start,
        to_date=range_end,
        total_time=creation_time,
    )
    sender_address = 'noreply@'+current_app.config['MAIL_FROM_DOMAIN']
    send_mail(
        subject='Candidate sets created and successfully uploaded to HDFS',
        text=message_body,
        recipients=['listenbrainz-observability@metabrainz.org'],
        from_name='ListenBrainz',
        from_addr=sender_address,
    )


def handle_recommendations(data):
    """ Save the recordings recommended for a user in the db.

        If the user named in the message is not found, a critical log entry is
        written and nothing is stored. When the message starts a new batch of
        recommendations, a notification is sent before the insert.

        Args:
            data (dict): message that must contain the keys 'musicbrainz_id',
                         'top_artist' and 'similar_artist'.
    """
    user_name = data['musicbrainz_id']
    user = db_user.get_by_mb_id(user_name)
    if not user:
        current_app.logger.critical(
            "Generated recommendations for a user that doesn't exist in the Postgres database: {}".format(user_name)
        )
        return

    if is_new_cf_recording_recommendation_batch():
        notify_cf_recording_recommendations_update()

    current_app.logger.debug("inserting recommendation for {}".format(user_name))
    top_artist_mbids = data['top_artist']
    similar_artist_mbids = data['similar_artist']

    db_recommendations_cf_recording.insert_user_recommendation(
        user_id=user['id'],
        top_artist_recording_mbids=top_artist_mbids,
        similar_artist_recording_mbids=similar_artist_mbids,
    )

    current_app.logger.debug("recommendation for {} inserted".format(user_name))
29 changes: 28 additions & 1 deletion listenbrainz/spark/request_manage.py
Expand Up @@ -47,7 +47,6 @@ def _prepare_query_message(query, params=None):
raise InvalidSparkRequestError(query)

message = {'query': possible_queries[query]['name']}

required_params = set(possible_queries[query]['params'])
given_params = set(params.keys())
if required_params != given_params:
Expand Down Expand Up @@ -119,3 +118,31 @@ def request_import_new_full_dump():
""" Send the cluster a request to import a new full data dump
"""
send_request_to_spark_cluster(_prepare_query_message('import.dump.full'))


@cli.command(name="request_dataframes")
def request_dataframes():
    """ Send the cluster a request to create dataframes.
    """
    # Docstring left verbatim: presumably it is shown as the command's help text -- verify.
    message = _prepare_query_message('cf_recording.recommendations.create_dataframes')
    send_request_to_spark_cluster(message)


@cli.command(name='request_model')
def request_model():
    """ Send the cluster a request to train the model.
    """
    # Docstring left verbatim: presumably it is shown as the command's help text -- verify.
    message = _prepare_query_message('cf_recording.recommendations.train_model')
    send_request_to_spark_cluster(message)


@cli.command(name='request_candidate_sets')
def request_candidate_sets():
    """ Send the cluster a request to generate candidate sets.
    """
    # Docstring left verbatim: presumably it is shown as the command's help text -- verify.
    message = _prepare_query_message('cf_recording.recommendations.candidate_sets')
    send_request_to_spark_cluster(message)


@cli.command(name='request_recommendations')
def request_recommendations():
    """ Send the cluster a request to generate recommendations.
    """
    # Docstring left verbatim: presumably it is shown as the command's help text -- verify.
    message = _prepare_query_message('cf_recording.recommendations.recommend')
    send_request_to_spark_cluster(message)

0 comments on commit 26402f1

Please sign in to comment.