""" This file contains handler functions for rabbitmq messages we
receive from the Spark cluster.
"""
import listenbrainz.db.user as db_user
import listenbrainz.db.stats as db_stats
import listenbrainz.db.recommendations_cf_recording as db_recommendations_cf_recording
from flask import current_app, render_template
from brainzutils.mail import send_mail
from datetime import datetime, timezone, timedelta
TIME_TO_CONSIDER_STATS_AS_OLD = 20 # minutes
TIME_TO_CONSIDER_RECOMMENDATIONS_AS_OLD = 7 # days
def is_new_user_stats_batch():
    """ Returns True if this batch of user stats is new, False otherwise

    User stats come in as multiple rabbitmq messages. We only wish to send an email once per batch.
    So, we check the database and see if the difference between the last time stats were updated
    and right now is greater than TIME_TO_CONSIDER_STATS_AS_OLD minutes.
    """
    last_update_ts = db_stats.get_timestamp_for_last_user_stats_update()
    if last_update_ts is None:
        # Stats have never been updated: substitute the earliest representable
        # aware datetime so the age comparison below treats the batch as new.
        last_update_ts = datetime.min.replace(tzinfo=timezone.utc)
    return datetime.now(timezone.utc) - last_update_ts > timedelta(minutes=TIME_TO_CONSIDER_STATS_AS_OLD)
def is_new_cf_recording_recommendation_batch():
    """ Returns True if this batch of recommendations is new, False otherwise

    A batch is considered new when no recommendations have ever been recorded,
    or when the last recorded batch is older than
    TIME_TO_CONSIDER_RECOMMENDATIONS_AS_OLD days.
    """
    last_recommended = db_recommendations_cf_recording.get_timestamp_for_last_recording_recommended()
    if last_recommended is None:
        return True
    age = datetime.now(timezone.utc) - last_recommended
    return age > timedelta(days=TIME_TO_CONSIDER_RECOMMENDATIONS_AS_OLD)
def notify_cf_recording_recommendations_update():
    """ Send an email to notify recommendations are being written into db.

    No-op when the app is running with TESTING config enabled.
    """
    if current_app.config['TESTING']:
        return
    # Use an aware UTC timestamp: datetime.utcnow() is naive (and deprecated),
    # and the rest of this module already uses datetime.now(timezone.utc).
    send_mail(
        subject="Recommendations being written into the DB - ListenBrainz",
        text=render_template('emails/cf_recording_recommendation_notification.txt',
                             now=str(datetime.now(timezone.utc))),
        recipients=['listenbrainz-observability@metabrainz.org'],
        from_name='ListenBrainz',
        from_addr='noreply@' + current_app.config['MAIL_FROM_DOMAIN']
    )
def notify_user_stats_update(stat_type):
    """ Send an email to notify that new user stats are being written into the db.

    Args:
        stat_type: identifier for the kind of stats in this batch; interpolated
            into the notification email template.

    No-op when the app is running with TESTING config enabled.
    """
    # Guard-clause early return, consistent with the other notify/handle
    # functions in this module.
    if current_app.config['TESTING']:
        return
    # Aware UTC timestamp: datetime.utcnow() is naive (and deprecated).
    send_mail(
        subject="New user stats are being written into the DB - ListenBrainz",
        text=render_template('emails/user_stats_notification.txt',
                             now=str(datetime.now(timezone.utc)), stat_type=stat_type),
        recipients=['listenbrainz-observability@metabrainz.org'],
        from_name='ListenBrainz',
        from_addr='noreply@' + current_app.config['MAIL_FROM_DOMAIN']
    )
def handle_user_artist(data):
    """ Take artist stats for a user and save it in the database.

    Looks up the user by their MusicBrainz id, sends a one-per-batch
    notification email if this is a new batch, then strips the message-level
    keys and inserts the remaining payload as the user's stats.
    """
    musicbrainz_id = data['musicbrainz_id']
    user = db_user.get_by_mb_id(musicbrainz_id)
    if not user:
        current_app.logger.critical("Calculated stats for a user that doesn't exist in the Postgres database: %s", musicbrainz_id)
        return

    # send a notification if this is a new batch of stats
    if is_new_user_stats_batch():
        notify_user_stats_update(stat_type=data.get('type', ''))
    current_app.logger.debug("inserting stats for user %s", musicbrainz_id)

    # Drop message metadata; everything else is the stats payload.
    stats_payload = {key: value for key, value in data.items() if key not in ('musicbrainz_id', 'type')}
    db_stats.insert_user_stats(user['id'], stats_payload, {}, {})
def handle_dump_imported(data):
    """ Process the response that the cluster sends after importing a new full dump

    We don't really need to _do_ anything, just send an email over for observability.
    No-op when the app is running with TESTING config enabled.
    """
    if current_app.config['TESTING']:
        return

    body = render_template(
        'emails/dump_import_notification.txt',
        dump_name=data['imported_dump'],
        time=data['time'],
    )
    send_mail(
        subject='A full data dump has been imported into the Spark cluster',
        text=body,
        recipients=['listenbrainz-observability@metabrainz.org'],
        from_name='ListenBrainz',
        from_addr='noreply@' + current_app.config['MAIL_FROM_DOMAIN'],
    )
def handle_dataframes(data):
    """ Send an email after dataframes have been successfully created and uploaded to HDFS.

    No-op when the app is running with TESTING config enabled.
    """
    if current_app.config['TESTING']:
        return

    body = render_template(
        'emails/cf_recording_dataframes_upload_notification.txt',
        time_to_upload=data['dataframe_upload_time'],
        from_date=data['from_date'],
        to_date=data['to_date'],
        total_time=data['total_time'],
    )
    send_mail(
        subject='Dataframes have been uploaded to HDFS',
        text=body,
        recipients=['listenbrainz-observability@metabrainz.org'],
        from_name='ListenBrainz',
        from_addr='noreply@' + current_app.config['MAIL_FROM_DOMAIN'],
    )
def handle_model(data):
    """ Send an email after trained data (model) has been successfully uploaded to HDFS.

    No-op when the app is running with TESTING config enabled.
    """
    if current_app.config['TESTING']:
        return

    body = render_template(
        'emails/cf_recording_model_upload_notification.txt',
        time_to_upload=data['model_upload_time'],
        total_time=data['total_time'],
    )
    send_mail(
        subject='Model created and successfully uploaded to HDFS',
        text=body,
        recipients=['listenbrainz-observability@metabrainz.org'],
        from_name='ListenBrainz',
        from_addr='noreply@' + current_app.config['MAIL_FROM_DOMAIN'],
    )
def handle_candidate_sets(data):
    """ Send an email after candidate sets have been successfully uploaded to HDFS.

    No-op when the app is running with TESTING config enabled.
    """
    if current_app.config['TESTING']:
        return

    body = render_template(
        'emails/cf_candidate_sets_upload_notification.txt',
        time_to_upload=data['candidate_sets_upload_time'],
        from_date=data['from_date'],
        to_date=data['to_date'],
        total_time=data['total_time'],
    )
    send_mail(
        subject='Candidate sets created and successfully uploaded to HDFS',
        text=body,
        recipients=['listenbrainz-observability@metabrainz.org'],
        from_name='ListenBrainz',
        from_addr='noreply@' + current_app.config['MAIL_FROM_DOMAIN'],
    )
def handle_recommendations(data):
    """ Take recommended recordings for a user and save it in the db.

    Looks up the user by their MusicBrainz id, sends a one-per-batch
    notification email if this is a new batch, then stores the top-artist
    and similar-artist recording recommendations.
    """
    musicbrainz_id = data['musicbrainz_id']
    user = db_user.get_by_mb_id(musicbrainz_id)
    if not user:
        # Lazy %-style logger args, consistent with handle_user_artist
        # (avoids formatting the message when the level is filtered out).
        current_app.logger.critical("Generated recommendations for a user that doesn't exist in the Postgres database: %s",
                                    musicbrainz_id)
        return

    # send a notification if this is a new batch of recommendations
    if is_new_cf_recording_recommendation_batch():
        notify_cf_recording_recommendations_update()
    current_app.logger.debug("inserting recommendation for %s", musicbrainz_id)

    top_artist_recording_mbids = data['top_artist']
    similar_artist_recording_mbids = data['similar_artist']
    db_recommendations_cf_recording.insert_user_recommendation(user['id'], top_artist_recording_mbids,
                                                               similar_artist_recording_mbids)
    current_app.logger.debug("recommendation for %s inserted", musicbrainz_id)