Commit ddb752d
script as per the guide
vansika committed Apr 23, 2019
1 parent a1d3a37 commit ddb752d
Showing 13 changed files with 439 additions and 139 deletions.
120 changes: 120 additions & 0 deletions create_dataframes.py
@@ -0,0 +1,120 @@
import logging
import time
from datetime import datetime
from time import sleep

import listenbrainz_spark
from listenbrainz_spark import config
from listenbrainz_spark.recommendations import train_models, recommend
from listenbrainz_spark.stats import run_query

def prepare_user_data(table):
    """Map each distinct user_name in the listens table to a numeric user_id."""
    t0 = time.time()
    users_df = run_query("""
        SELECT user_name
             , row_number() OVER (ORDER BY user_name) AS user_id
          FROM (SELECT DISTINCT user_name FROM %s)
    """ % table)
    print("Number of rows in users df:", users_df.count())
    t = "%.2f" % (time.time() - t0)
    print("Users data prepared in %ss" % t)
    return users_df

def prepare_listen_data(table):
    """Select the listen fields needed downstream from the listens table."""
    t0 = time.time()
    listens_df = run_query("""
        SELECT listened_at
             , track_name
             , recording_msid
             , user_name
          FROM %s
    """ % table)
    print("Number of rows in listens df:", listens_df.count())
    t = "%.2f" % (time.time() - t0)
    print("Listens data prepared in %ss" % t)
    return listens_df

def prepare_recording_data(table):
    """Map each distinct recording to a numeric recording_id."""
    t0 = time.time()
    recordings_df = run_query("""
        SELECT track_name
             , recording_msid
             , artist_name
             , artist_msid
             , release_name
             , release_msid
             , row_number() OVER (ORDER BY recording_msid) AS recording_id
          FROM (SELECT DISTINCT recording_msid, track_name, artist_name, artist_msid, release_name, release_msid FROM %s)
    """ % table)
    print("Number of rows in recording df:", recordings_df.count())
    t = "%.2f" % (time.time() - t0)
    print("Recording data prepared in %ss" % t)
    return recordings_df

def get_playcounts_data(listens_df, users_df, recordings_df):
    """Join listens with the user and recording IDs and count plays per pair."""
    t0 = time.time()
    listens_df.createOrReplaceTempView('listen')
    users_df.createOrReplaceTempView('user')
    recordings_df.createOrReplaceTempView('recording')
    playcounts_df = run_query("""
        SELECT user_id,
               recording_id,
               count(recording_id) AS count
          FROM listen
    INNER JOIN user
            ON listen.user_name = user.user_name
    INNER JOIN recording
            ON recording.recording_msid = listen.recording_msid
      GROUP BY user_id, recording_id
      ORDER BY user_id
    """)
    print("Number of rows in playcounts df:", playcounts_df.count())
    t = "%.2f" % (time.time() - t0)
    print("Playcount data prepared in %ss" % t)
    return playcounts_df

if __name__ == '__main__':

    t0 = time.time()
    listenbrainz_spark.init_spark_session('Create_Dataframe')
    df = None
    # The same month range is applied to every year in the window; months
    # with no parquet file on HDFS are skipped.
    for y in range(config.starting_year, config.ending_year + 1):
        for m in range(config.starting_month, config.ending_month + 1):
            try:
                month = listenbrainz_spark.sql_context.read.parquet('{}/data/listenbrainz/{}/{}.parquet'.format(config.HDFS_CLUSTER_URI, y, m))
                df = df.union(month) if df else month
            except Exception as err:
                logging.error("Cannot read files from HDFS: %s / %s. Skipping." % (type(err).__name__, str(err)))

    if df is None:
        raise SystemExit("No listens could be loaded from HDFS. Aborting...")

    df.printSchema()
    print("Registering Dataframe...")
    date = datetime.utcnow()
    table = 'df_to_train_{}'.format(datetime.strftime(date, '%Y_%m_%d'))
    df.createOrReplaceTempView(table)
    t = "%.2f" % (time.time() - t0)
    print("Dataframe registered in %ss" % t)

    print("Preparing user data...")
    users_df = prepare_user_data(table)
    print("Preparing listen data...")
    listens_df = prepare_listen_data(table)
    print("Preparing recording data...")
    recordings_df = prepare_recording_data(table)
    print("Preparing playcount data...")
    playcounts_df = get_playcounts_data(listens_df, users_df, recordings_df)
    lb_dump_time_window = ("{}-{:02d}".format(config.starting_year, config.starting_month), "{}-{:02d}".format(config.ending_year, config.ending_month))

    for attempt in range(config.MAX_RETRIES):
        try:
            train_models.main(playcounts_df, lb_dump_time_window)
            break
        except Exception as err:
            if attempt == config.MAX_RETRIES - 1:
                raise SystemExit("%s. Aborting..." % str(err))
            logging.error("Unable to train the model: %s. Retrying in %ss." % (type(err).__name__, config.TIME_BEFORE_RETRIES))
            sleep(config.TIME_BEFORE_RETRIES)
    recommend.main(users_df, playcounts_df, recordings_df, t0)
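
Note: train_models.py is not shown in this excerpt. As a rough sketch of the handoff, the playcounts dataframe built above presumably maps each (user_id, recording_id, count) row onto a pyspark.mllib Rating before ALS training; the function name and the rank/iterations/lambda values below are illustrative assumptions, not the committed code.

# Hypothetical sketch: feeding playcounts_df into Spark's implicit-feedback ALS.
from pyspark.mllib.recommendation import ALS, Rating

def train_sketch(playcounts_df):
    # Each playcount row becomes an implicit-feedback rating.
    ratings = playcounts_df.rdd.map(
        lambda row: Rating(row['user_id'], row['recording_id'], row['count']))
    # trainImplicit interprets the count as a confidence value, not a score.
    return ALS.trainImplicit(ratings, rank=8, iterations=10, lambda_=0.1, alpha=3.0)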
8 changes: 8 additions & 0 deletions listenbrainz_spark/config.py.sample
@@ -1,3 +1,11 @@
HDFS_HTTP_URI = 'http://hadoop-master:9870' # the URI of the http webclient for HDFS

HDFS_CLUSTER_URI = 'hdfs://hadoop-master:9000' # the URI to be used with Spark

MAX_RETRIES = 5 # number of times to retry training or loading the model before giving up
TIME_BEFORE_RETRIES = 4 # seconds to wait between retries

starting_month = 1 # listens used for training span starting_year-starting_month
ending_month = 12 # to ending_year-ending_month (the month range applies to every year)
starting_year = 2017
ending_year = 2019
95 changes: 95 additions & 0 deletions listenbrainz_spark/recommendations/recommend.py
@@ -0,0 +1,95 @@
import json
import logging
import os
import time
from datetime import datetime
from time import sleep

import listenbrainz_spark
from listenbrainz_spark import config
from listenbrainz_spark.recommendations import utils
from listenbrainz_spark.stats import run_query
from pyspark.mllib.recommendation import MatrixFactorizationModel

def load_model(path):
    return MatrixFactorizationModel.load(listenbrainz_spark.context, path)

def get_user_id(user_name):
    # Raises TypeError for an unknown user, since first() returns None
    # when the query matches no rows.
    result = run_query("""
        SELECT user_id
          FROM user
         WHERE user_name = '%s'
    """ % user_name)
    return result.first()['user_id']

def recommend_user(user_name, model, recordings_map):
    user_id = get_user_id(user_name)
    user_playcounts = run_query("""
        SELECT user_id,
               recording_id,
               count
          FROM playcount
         WHERE user_id = %d
    """ % user_id)

    # The bare count() calls force evaluation of each RDD before prediction.
    user_recordings = user_playcounts.rdd.map(lambda r: r['recording_id'])
    user_recordings.count()
    all_recordings = recordings_map.keys()
    all_recordings.count()
    # Only recommend recordings the user has not already listened to.
    candidate_recordings = all_recordings.subtract(user_recordings)
    candidate_recordings.count()
    t0 = time.time()
    recommendations = model.predictAll(candidate_recordings.map(lambda recording: (user_id, recording))).takeOrdered(50, lambda product: -product.rating)
    recommended_recordings = [recordings_map.lookup(recommendation.product)[0] for recommendation in recommendations]
    t = "%.2f" % (time.time() - t0)
    return recommended_recordings, t

def main(users_df, playcounts_df, recordings_df, t0):
    users_df.createOrReplaceTempView('user')
    playcounts_df.createOrReplaceTempView('playcount')
    # Map recording_id -> recording metadata so recommendations can be
    # translated back into human-readable rows.
    recordings_map = recordings_df.rdd.map(lambda r: (r['recording_id'], [r['track_name'], r['recording_msid'], r['artist_name'], r['artist_msid'], r['release_name'], r['release_msid']]))
    recordings_map.count()
    date = datetime.utcnow().strftime("%Y-%m-%d")
    # This path must match the one train_models uses when saving the model.
    path = os.path.join('/', 'data', 'listenbrainz', 'listenbrainz-recommendation-model-{}'.format(date))

    print("Loading model...")
    for attempt in range(config.MAX_RETRIES):
        try:
            model = load_model(config.HDFS_CLUSTER_URI + path)
            break
        except Exception as err:
            if attempt == config.MAX_RETRIES - 1:
                raise SystemExit("%s. Aborting..." % str(err))
            logging.error("Unable to load model: %s. Retrying in %ss." % (type(err).__name__, config.TIME_BEFORE_RETRIES))
            sleep(config.TIME_BEFORE_RETRIES)

    users_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'users.json')
    with open(users_path) as f:
        users = json.load(f)
    num_users = len(users['user_name'])
    recommendations = []
    for user_name in users['user_name']:
        try:
            recommended_recordings, t = recommend_user(user_name, model, recordings_map)
            print("Recommendations for %s generated" % user_name)
            recommendations.append((user_name, t, recommended_recordings))
        except TypeError as err:
            logging.error("%s: Invalid user name. User \"%s\" does not exist." % (type(err).__name__, user_name))
        except Exception as err:
            logging.error("Recommendations for \"%s\" not generated. %s" % (user_name, str(err)))

    column = ['Track Name', 'Recording MSID', 'Artist Name', 'Artist MSID', 'Release Name', 'Release MSID']
    outputfile = 'Recommendations-%s.html' % date
    context = {
        'num_users': num_users,
        'recommendations': recommendations,
        'column': column,
        'total_time': int(time.time() - t0),
        'date': date,
    }
utils.save_html(outputfile, context, 'recommend.html')
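
Note: utils.save_html is referenced above but is not included in this diff. A minimal sketch of what such a helper might look like, assuming a Jinja2 environment pointed at the templates directory shown below; the details are assumptions, not the committed implementation.

# Hypothetical sketch of utils.save_html, assuming Jinja2 templating.
import os
from jinja2 import Environment, FileSystemLoader

TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')

def save_html(outputfile, context, template_name):
    env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
    rendered = env.get_template(template_name).render(context)
    with open(outputfile, 'w') as f:
        f.write(rendered)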



Empty file.
58 changes: 58 additions & 0 deletions listenbrainz_spark/recommendations/templates/model.html
@@ -0,0 +1,58 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8"/>
<title>Model Information</title>
<style>
table {
font-family: arial, sans-serif;
border-collapse: collapse;
width: 100%;
}

td, th {
border: 1px solid #dddddd;
text-align: left;
padding: 8px;
}

tr:nth-child(even) {
background-color: #dddddd;
}
</style>
</head>
<body>
<center>
<h2>Model Information</h2>
</center>
<p>Collaborative filtering has been used to generate recommendations for users. From the ListenBrainz data dump, <b>{{ total_listens }}</b> listens from <b>{{ lb_dump_time_window[0] }}</b> to <b>{{ lb_dump_time_window[1] }}</b> have been used to train, validate and test the model. Approximately 65% of the data has been used to train the model, and the remaining data has been used to validate and test it.</p>

<h4>The following parameters are required to train the model</h4>
<ul>
<li><b>rank</b><p>The number of factors in our ALS model, that is, the number of hidden features in our low-rank approximation matrices.</p></li>
<li><b>lmbda</b><p>This parameter controls the regularization of our model and thus guards against overfitting.</p></li>
<li><b>iterations</b><p>The number of iterations of ALS to run.</p></li>
<li><b>alpha</b><p>The alpha parameter controls the baseline level of confidence weighting applied. A higher alpha makes the model more confident that missing data implies no preference for the relevant user-item pair.</p></li>
</ul>
<p>The following table gives the parameters fed to the model in every iteration.</p>
<p><b>Note</b>: <i>Here, "iteration" does not refer to the parameter "iterations", but to the number of times the whole training process is carried out.</i></p>
<table style="width:100%">
<tr>
<th>rank</th>
<th>lmbda</th>
<th>iterations</th>
<th>validation rmse</th>
</tr>
{% for row in model.training_metadata -%}
<tr>
{% for i in row -%}
<td>{{ i }}</td>
{% endfor -%}
</tr>
{% endfor -%}
</table>
<p>The value of alpha used is <b>3.0</b>.</p>
<p>The best model has error = <b>{{ model.best_model.error }}</b>, rank = <b>{{ model.best_model.rank }}</b>, lmbda = <b>{{ model.best_model.lmbda }}</b>, iteration = <b>{{ model.best_model.iteration }}</b>.</p>
<p>The best model was trained in <b>{{ time }}</b> seconds.</p>
</body>
</html>
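
Note: the template above describes a train/validation/test split and a grid of (rank, lmbda, iterations) values scored by validation RMSE, but train_models.py itself is not in this excerpt. Below is a minimal sketch of such a search under the stated 65% training split; the validation/test ratios, the parameter grid, and the function name are assumptions, not the committed code.

# Hypothetical sketch of the parameter search described in model.html.
import itertools
from math import sqrt

from pyspark.mllib.recommendation import ALS

def search_sketch(ratings):
    # Roughly 65% of the data trains the model; the rest validates and tests it.
    training, validation, test = ratings.randomSplit([0.65, 0.175, 0.175])
    validation_pairs = validation.map(lambda r: (r.user, r.product))
    best_model, best_rmse = None, float('inf')
    for rank, lmbda, iterations in itertools.product([8, 12], [0.1, 10.0], [5, 10]):
        model = ALS.trainImplicit(training, rank, iterations, lambda_=lmbda, alpha=3.0)
        predicted = model.predictAll(validation_pairs).map(
            lambda r: ((r.user, r.product), r.rating))
        actual = validation.map(lambda r: ((r.user, r.product), r.rating))
        rmse = sqrt(predicted.join(actual).map(
            lambda pair: (pair[1][0] - pair[1][1]) ** 2).mean())
        if rmse < best_rmse:
            best_model, best_rmse = model, rmse
    return best_model, best_rmse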
47 changes: 47 additions & 0 deletions listenbrainz_spark/recommendations/templates/recommend.html
@@ -0,0 +1,47 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8"/>
<title>Recommendations</title>
<style>
table {
font-family: arial, sans-serif;
border-collapse: collapse;
width: 100%;
}

td, th {
border: 1px solid #dddddd;
text-align: left;
padding: 8px;
}

tr:nth-child(even) {
background-color: #dddddd;
}
</style>
</head>
<body>
<center>
<h1>Recommendations generated for {{ num_users }} users on {{ date }}</h1>
</center>
<p><i>Total time elapsed in training the model and generating recommendations: <b>{{ total_time }}s</b></i></p>
{% for row in recommendations -%}
<h4>Recommendations for {{ row[0] }} generated in {{ row[1] }}s</h4>
<table style="width:100%">
<tr>
{% for col in column -%}
<th>{{ col }}</th>
{% endfor -%}
</tr>
{% for recommended_recordings in row[2] -%}
<tr>
{% for entity in recommended_recordings -%}
<td>{{ entity }}</td>
{% endfor -%}
</tr>
{% endfor -%}
</table>
{% endfor -%}
</body>
</html>
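
For reference, recommend.html consumes the context built at the end of recommend.py: recommendations is a list of (user_name, time_taken, rows) tuples, where each row follows the order of column. An illustrative context is shown below; every value in it is invented.

# Illustrative context for recommend.html; all values are made-up examples.
context = {
    'num_users': 1,
    'recommendations': [
        ('some_user', '1.42', [
            ['Track A', 'recording-msid-1', 'Artist A', 'artist-msid-1', 'Release A', 'release-msid-1'],
        ]),
    ],
    'column': ['Track Name', 'Recording MSID', 'Artist Name',
               'Artist MSID', 'Release Name', 'Release MSID'],
    'total_time': 512,
    'date': '2019-04-23',
}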
