
Commit

HTML files corresponding to recommendation generation
vansika committed May 18, 2019
1 parent ddb752d commit c93cc7a
Showing 6 changed files with 443 additions and 86 deletions.
60 changes: 34 additions & 26 deletions create_dataframes.py
@@ -8,7 +8,7 @@
from pyspark.sql import Row, SparkSession
from datetime import datetime
from listenbrainz_spark.stats import run_query
from listenbrainz_spark.recommendations import train_models, recommend
from listenbrainz_spark.recommendations import train_models, recommend, utils
from time import sleep

def prepare_user_data(table):
@@ -18,10 +18,9 @@ def prepare_user_data(table):
, row_number() over (ORDER BY "user_name") as user_id
From (SELECT DISTINCT user_name FROM %s)
""" % (table))
print("Number of rows in users df: ",users_df.count())
users_count = users_df.count()
t = "%.2f" % (time.time() - t0)
print("Users data prepared in %ss" % (t))
return users_df
return users_df, t, users_count

def prepare_listen_data(table):
t0 = time.time()
@@ -32,10 +31,9 @@ def prepare_listen_data(table):
, user_name
From %s
""" % (table))
print("Number of rows in listens df:",listens_df.count())
listens_count = listens_df.count()
t = "%.2f" % (time.time() - t0)
print("Listens data prepared in %ss" % (t))
return listens_df
return listens_df, t, listens_count

def prepare_recording_data(table):
t0 = time.time()
@@ -47,12 +45,12 @@ def prepare_recording_data(table):
, release_name
, release_msid
, row_number() over (ORDER BY "recording_msid") AS recording_id
From (SELECT DISTINCT recording_msid, track_name, artist_name, artist_msid, release_name, release_msid FROM %s)
From (SELECT DISTINCT recording_msid, track_name, artist_name, artist_msid,
release_name, release_msid FROM %s)
""" % (table))
print("Number of rows in recording df:",recordings_df.count())
recordings_count = recordings_df.count()
t = "%.2f" % (time.time() - t0)
print("Recording data prepared in %ss" % (t))
return recordings_df
return recordings_df, t, recordings_count

def get_playcounts_data(listens_df, users_df, recordings_df):
t0 = time.time()
@@ -71,14 +69,13 @@ def get_playcounts_data(listens_df, users_df, recordings_df):
GROUP BY user_id, recording_id
ORDER BY user_id
""")
print("Number of rows in playcounts df:",playcounts_df.count())
playcounts_count = playcounts_df.count()
t = "%.2f" % (time.time() - t0)
print("Playcount data prepared in %ss" % (t))
return playcounts_df
return playcounts_df, t, playcounts_count

if __name__ == '__main__':

t0 = time.time()
ti = time.time()
listenbrainz_spark.init_spark_session('Create_Dataframe')
df = None
for y in range(config.starting_year, config.ending_year + 1):
@@ -90,31 +87,42 @@ def get_playcounts_data(listens_df, users_df, recordings_df):
logging.error("Cannot read files from HDFS: %s / %s. Aborting." % (type(err).__name__, str(err)))
continue

df.printSchema()
print("Registering Dataframe...")
print("\nRegistering Dataframe...")
date = datetime.utcnow()
table = 'df_to_train_{}'.format(datetime.strftime(date, '%Y_%m_%d'))
df.createOrReplaceTempView(table)
t = "%.2f" % (time.time() - t0)
t = "%.2f" % (time.time() - ti)
print("Dataframe registered in %ss" % (t))

print("Preparing user data...")
users_df = prepare_user_data(table)
users_df, users_time, users_count = prepare_user_data(table)
print("Load data dump...")
listens_df = prepare_listen_data(table)
listens_df, listens_time, listens_count = prepare_listen_data(table)
print("Prepare recording dump...")
recordings_df = prepare_recording_data(table)
recordings_df, recordings_time, recordings_count = prepare_recording_data(table)
print("Get playcounts...")
playcounts_df = get_playcounts_data(listens_df, users_df, recordings_df)
lb_dump_time_window = ("{}-{}".format(config.starting_year, "%02d" % config.starting_month), "{}-{}".format(config.ending_year, "%02d" % config.ending_month))
playcounts_df, playcounts_time, playcounts_count = get_playcounts_data(listens_df, users_df, recordings_df)
lb_dump_time_window = ("{}-{}".format(config.starting_year, "%02d" % config.starting_month),
"{}-{}".format(config.ending_year, "%02d" % config.ending_month))

for attempt in range(config.MAX_RETRIES):
try:
train_models.main(playcounts_df, lb_dump_time_window)
bestmodel_id = train_models.main(playcounts_df)
break
except Exception as err:
sleep(config.TIME_BEFORE_RETRIES)
if attempt == config.MAX_RETRIES - 1:
raise SystemExit("%s. Aborting..." % (str(err)))
logging.error("Unable to train the model: %s. Retrying in %ss." % (type(err).__name__, config.TIME_BEFORE_RETRIES))
recommend.main(users_df, playcounts_df, recordings_df, t0)
logging.error("Unable to train the model: %s. Retrying in %ss." % (str(err), config.TIME_BEFORE_RETRIES))
recommend.main(users_df, playcounts_df, recordings_df, ti, bestmodel_id)

outputfile = 'Queries-%s.html' % (datetime.utcnow().strftime("%Y-%m-%d"))
context = {
'user' : {'time' : users_time, 'count' : users_count, 'schema' : users_df.schema.names},
'listen' : {'time' : listens_time, 'count' : listens_count},
'recording' : {'time' : recordings_time, 'count' : recordings_count},
'playcount' : {'time' : playcounts_time, 'count' : playcounts_count},
'lb_dump_time_window' : lb_dump_time_window,
'link' : 'Model-Info-%s.html' % (datetime.utcnow().strftime("%Y-%m-%d")),
}
utils.save_html(outputfile, context, 'queries.html')
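
The body of utils.save_html is not part of this diff; for reference, here is a minimal sketch of what such a helper might look like, assuming Jinja2 templates loaded from the package's templates/ directory (both the template location and the implementation are assumptions, not code from this commit):

    import os
    from jinja2 import Environment, FileSystemLoader

    # Hypothetical template location, e.g. listenbrainz_spark/recommendations/templates/.
    TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')

    def save_html(outputfile, context, template_name):
        """Render template_name with context and write the result to outputfile."""
        env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
        template = env.get_template(template_name)
        with open(outputfile, 'w') as f:
            f.write(template.render(context))
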
59 changes: 44 additions & 15 deletions listenbrainz_spark/recommendations/recommend.py
@@ -24,7 +24,9 @@ def get_user_id(user_name):
""" % user_name)
return result.first()['user_id']

def recommend_user(user_name, model, recordings_map):
def recommend_user(user_name, model, recordings_map, all_recordings):
user_info = {}
t0 = time.time()
user_id = get_user_id(user_name)
user_playcounts = run_query("""
SELECT user_id,
@@ -33,48 +35,75 @@ def recommend_user(user_name, model, recordings_map):
FROM playcount
WHERE user_id = %d
""" % user_id)
t = "%.2f" % (time.time() - t0)
user_info['user-playcounts-time'] = t

t0 = time.time()
user_recordings = user_playcounts.rdd.map(lambda r: r['recording_id'])
user_recordings.count()
all_recordings = recordings_map.keys()
all_recordings.count()
t = "%.2f" % (time.time() - t0)
user_info['user-recordings-time'] = t

t0 = time.time()
candidate_recordings = all_recordings.subtract(user_recordings)
candidate_recordings.count()
t = "%.2f" % (time.time() - t0)
user_info['candidate-recordings-time'] = t

t0 = time.time()
recommendations = model.predictAll(candidate_recordings.map(lambda recording: (user_id, recording))).takeOrdered(50, lambda product: -product.rating)
t = "%.2f" % (time.time() - t0)
user_info['recommendations-time'] = t

t0 = time.time()
recommended_recordings = [recordings_map.lookup(recommendations[i].product)[0] for i in range(len(recommendations))]
t = "%.2f" % (time.time() - t0)
return recommended_recordings, t
user_info['lookup-time'] = t
user_info['recordings'] = recommended_recordings
return user_info

def main(users_df, playcounts_df, recordings_df, t0):
def main(users_df, playcounts_df, recordings_df, ti, bestmodel_id):
time_info = {}
users_df.createOrReplaceTempView('user')
playcounts_df.createOrReplaceTempView('playcount')
recordings_map = recordings_df.rdd.map(lambda r: (r['recording_id'], [r['track_name'], r['recording_msid'], r['artist_name'], r['artist_msid'], r['release_name'], r["release_msid"]]))
t0 = time.time()
recordings_map = recordings_df.rdd.map(lambda r: (r['recording_id'], [r['track_name'], r['recording_msid'],
r['artist_name'], r['artist_msid'], r['release_name'], r["release_msid"]]))
recordings_map.count()
t = "%.2f" % (time.time() - t0)
time_info['recordings_map'] = t

t0 = time.time()
all_recordings = recordings_map.keys()
all_recordings.count()
t = "%.2f" % (time.time() - t0)
time_info['all_recordings'] = t
date = datetime.utcnow().strftime("%Y-%m-%d")
path = os.path.join('/', 'data', 'listenbrainz', 'listenbrainz-recommendation-mode-{}'.format(date))
path = os.path.join('/', 'data', 'listenbrainz', '{}'.format(bestmodel_id))

print("Loading model...")
for attempt in range(config.MAX_RETRIES):
try:
t0 = time.time()
model = load_model(config.HDFS_CLUSTER_URI + path)
t = "%.2f" % (time.time() - t0)
time_info['load_model'] = t
break
except Exception as err:
sleep(config.TIME_BEFORE_RETRIES)
if attempt == config.MAX_RETRIES - 1:
raise SystemExit("%s. Aborting..." % (str(err)))
logging.error("Unable to load model: %s. Retrying in %ss." % (type(err).__name__, config.TIME_BEFORE_RETRIES))
logging.error("Unable to load model: %s. Retrying in %ss." % (str(err), config.TIME_BEFORE_RETRIES))

path = os.path.join(os.path.dirname(os.path.abspath(__file__)),'users.json')
with open(path) as f:
users = json.load(f)
num_users = len(users['user_name'])
recommendations = []
recommendations = {}
for user_name in users['user_name']:
try:
recommended_recordings, t = recommend_user(user_name, model, recordings_map)
user_info = recommend_user(user_name, model, recordings_map, all_recordings)
print("Recommendations for %s generated" % (user_name))
recommendations.append((user_name, t, recommended_recordings))
recommendations[user_name] = user_info
except TypeError as err:
logging.error("%s: Invalid user name. User \"%s\" does not exist." % (type(err).__name__,user_name))
except Exception as err:
@@ -83,11 +112,11 @@ def main(users_df, playcounts_df, recordings_df, t0):
column = ['Track Name', 'Recording MSID', 'Artist Name', 'Artist MSID', 'Release Name', 'Release MSID']
outputfile = 'Recommendations-%s.html' % (date)
context = {
'num_users' : num_users,
'recommendations' : recommendations,
'column' : column,
'total_time' : int(time.time() - t0),
'date' : date,
'total_time' : int(time.time() - ti),
'time' : time_info,
'best_model' : bestmodel_id,
}
utils.save_html(outputfile, context, 'recommend.html')

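
For context, the load_model call above is expected to return the ALS model that train_models persisted under /data/listenbrainz/<bestmodel_id>. A minimal sketch, assuming it wraps pyspark.mllib's MatrixFactorizationModel.load and that listenbrainz_spark.context holds the active SparkContext (both assumptions, not code from this commit):

    from pyspark.mllib.recommendation import MatrixFactorizationModel

    import listenbrainz_spark  # assumed to expose the active SparkContext

    def load_model(path):
        """Load a persisted ALS model, e.g. from
        hdfs://<cluster>/data/listenbrainz/<bestmodel_id>."""
        return MatrixFactorizationModel.load(listenbrainz_spark.context, path)
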
39 changes: 23 additions & 16 deletions listenbrainz_spark/recommendations/templates/model.html
@@ -2,7 +2,7 @@
<html>
<head>
<meta charset="utf-8"/>
<title>Model Information</title>
<title>Data preprocessing and model training</title>
<style>
table {
font-family: arial, sans-serif;
@@ -15,44 +15,51 @@
text-align: left;
padding: 8px;
}

tr:nth-child(even) {
background-color: #dddddd;
}
</style>
</head>
<body>
<center>
<h2>Model Information</h2>
<h2>Data preprocessing and model training</h2>
</center>
<p>Collaborative filtering has been used to generate recommendations for users. From the ListenBrainz data dump, <b>{{ total_listens }}</b> listens from <b>{{ lb_dump_time_window[0] }}</b> to <b>{{ lb_dump_time_window[1] }}</b> have been used to train, validate and test the model. Approximately 65% of the data has been used to train the model, and the remaining data has been used to validate and test it.</p>
<p>Spark's built-in training function takes an RDD of 'implicit preferences' given by users to some products, in the form of (userID (Int), productID (Int), preference (Double)) pairs. Here userID ~ user_id, productID ~ recording_id and preference ~ count, as represented by rows of the playcounts dataframe.</p>
<p>The playcounts dataframe is converted to an RDD and each row is mapped to an object of the Rating class using <blockquote>Rating(user_id, recording_id, count)</blockquote></p>
<p>Preprocessing of the playcounts dataframe takes <b>{{ time.preprocessing }}s</b>. Of the preprocessed data, approximately 66% ({{ num_training }}) of the listens have been used as training data, 17% ({{ num_validation }}) as validation data and 17% ({{ num_test }}) as test data. After preprocessing, the training phase starts. Of the models trained, the best one is selected to generate recommendations.</p>


<h4>Following are the parameters required to train the model</h4>
<ul>
<li><b>rank</b></li><p>This refers to the number of factors in our ALS model, that is, the number of hidden features in our low-rank approximation matrices.</p>
<li><b>rank</b></li><p>This refers to the number of factors in our ALS model, that is, the number of hidden features in our low-rank approximation matrices. A rank in the range of 10 to 200 is usually reasonable.</p>
<li><b>lmbda</b></li><p>This parameter controls the regularization of our model; thus, lambda controls overfitting.</p>
<li><b>iterations</b></li><p>This refers to the number of iterations to run.</p>
<li><b>iterations</b></li><p>This refers to the number of iterations to run (around 10 is often a good default).</p>
<li><b>alpha</b></li><p>The alpha parameter controls the baseline level of confidence weighting applied. A higher level of alpha tends to make the model more confident that missing data equates to no preference for the relevant user-item pair.</p>
</ul>
<p>Value of alpha used is <b>3.0</b></p>
<p>The Mean Squared Error (MSE) is a direct measure of the reconstruction error of the user-item rating matrix. It is defined as the sum of the squared errors divided by the number of observations. The squared error, in turn, is the square of the difference between the predicted rating for a given user-item pair and the actual rating.</p>
<p>Ratings are predicted for all the (user_id, recording_id) pairs in the validation data; the predicted ratings are then subtracted from the actual ratings and the RMSE is calculated.</p>
<p>The following table gives information about the parameters fed to the model in every iteration</p>
<p><b>Note</b>: <i>Here, iteration does not refer to the parameter "iteration", but to the number of times the whole process of training is carried out.</i></p>
<p><b>Note</b>: <i>Here, iteration does not refer to the parameter "iteration", but to the number of times the whole process of training is carried out, i.e. the number of models trained.</i></p>
<table style="width:100%">
<tr>
<th>model ID</th>
<th>model training time (sec)</th>
<th>rank</th>
<th>lmbda</th>
<th>iterations</th>
<th>validation rmse</th>
<th>RMSE</th>
<th>RMSE computation time (sec)</th>
</tr>
{% for row in model.training_metadata -%}
{% for model in models -%}
<tr>
{% for i in row -%}
{% for i in model -%}
<td>{{ i }}</td>
{% endfor -%}
</tr>
{% endfor -%}
</table>
<p>Value of alpha used is <b>3.0</b></p>
<p>Best model has error = <b>{{ model.best_model.error }}</b>, rank = <b>{{ model.best_model.rank }}</b>, lmbda = <b>{{ model.best_model.lmbda }}</b>, iteration = <b>{{ model.best_model.iteration}}</b>.</p>
<p>Best model trained in <b>{{ time }}</b> seconds</p>
<p>All the models listed above were trained in <b>{{ time.models }}s</b></p>
<p>The best model was trained in <b>{{ best_model.time }}s</b></p>
<p>The best model has error = <b>{{ best_model.error }}</b>, rank = <b>{{ best_model.rank }}</b>, lmbda = <b>{{ best_model.lmbda }}</b>, iteration = <b>{{ best_model.iteration }}</b>, model ID = <b>{{ best_model.model_id }}</b>.</p>
<p>The best model was saved in <b>{{ time.save_model }}s</b></p>
<p><i>The final step is to generate recommendations using the best model: </i><a href="{{ link }}">click here</a></p>
</body>
</html>
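
The template above describes the training procedure only in prose. As an illustration, here is a minimal sketch of the loop it implies: playcount rows are mapped to Rating objects, ALS.trainImplicit fits one model per parameter combination (with alpha fixed at 3.0, as stated above), and validation RMSE, i.e. sqrt(sum((predicted - actual)^2) / n), selects the best model. The parameter grid and the 66/17/17 split are illustrative assumptions, not the values used by train_models:

    import math
    from pyspark.mllib.recommendation import ALS, Rating

    def compute_rmse(model, data, n):
        """Root mean squared error of model on an RDD of Rating objects of size n."""
        predictions = model.predictAll(data.map(lambda r: (r.user, r.product)))
        pred_pairs = predictions.map(lambda r: ((r.user, r.product), r.rating))
        actual_pairs = data.map(lambda r: ((r.user, r.product), r.rating))
        squared_errors = pred_pairs.join(actual_pairs).values() \
                                   .map(lambda pa: (pa[0] - pa[1]) ** 2)
        return math.sqrt(squared_errors.sum() / float(n))

    def train(playcounts_df):
        # Each playcounts row becomes Rating(user_id, recording_id, count).
        ratings = playcounts_df.rdd.map(
            lambda r: Rating(r['user_id'], r['recording_id'], r['count']))
        training, validation, test = ratings.randomSplit([0.66, 0.17, 0.17])
        num_validation = validation.count()
        best_model, best_rmse = None, float('inf')
        for rank in (8, 12):             # illustrative grid, not the real one
            for lmbda in (0.1, 10.0):
                for iterations in (5, 10):
                    model = ALS.trainImplicit(training, rank, iterations=iterations,
                                              lambda_=lmbda, alpha=3.0)
                    rmse = compute_rmse(model, validation, num_validation)
                    if rmse < best_rmse:
                        best_model, best_rmse = model, rmse
        return best_model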
