-
-
Notifications
You must be signed in to change notification settings - Fork 210
/
similar_users.py
157 lines (137 loc) · 6.72 KB
/
similar_users.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from operator import itemgetter
import time
import sqlalchemy
import psycopg2
from psycopg2.errors import OperationalError
from psycopg2.extras import execute_values
import ujson
from flask import current_app
from listenbrainz import db
# Number of (user, similar_users) rows buffered by import_user_similarities()
# before they are flushed to the import table with a single execute_values() call.
ROWS_PER_BATCH = 1000
def _similar_user_import_failed(conn, description, err):
    """ Roll back the open transaction, log the failure and build the error
        tuple returned by import_user_similarities().
    """
    conn.rollback()
    message = "Error: %s: %s" % (description, str(err))
    current_app.logger.error(message)
    return (0, 0.0, message)


def import_user_similarities(data):
    """ Import the user similarities into the DB by inserting the data into a new table
        and then rotating the table into place atomically.

        Args:
            data: mapping of user id -> similar-users mapping for that user. Each
                  value is serialized with ujson and stored in a JSONB column.

        Returns a tuple of three values:
            (user_count, avg_similar_users_per_user, error)

        If a database OperationalError occurs in any step, the current transaction
        is rolled back, error will be a non-empty string and the user count values
        will be 0. Upon success error will be empty and the user count values will
        be set accordingly. An empty ``data`` mapping yields (0, 0.0, "").

        The raw DB connection obtained for the import is always closed (released)
        before returning.
    """
    user_count = 0
    target_user_count = 0

    conn = db.engine.raw_connection()
    try:
        # Start by importing the data into an import table
        try:
            with conn.cursor() as curs:
                curs.execute(
                    """DROP TABLE IF EXISTS recommendation.similar_user_import""")
                curs.execute("""CREATE TABLE recommendation.similar_user_import (
                                    user_id       INTEGER NOT NULL,
                                    similar_users JSONB)""")
                query = "INSERT INTO recommendation.similar_user_import VALUES %s"
                values = []
                for user, similar in data.items():
                    values.append((user, ujson.dumps(similar)))
                    user_count += 1
                    target_user_count += len(similar)
                    if len(values) == ROWS_PER_BATCH:
                        execute_values(curs, query, values, template=None)
                        values = []
                # Flush the final, partially filled batch (if any).
                if values:
                    execute_values(curs, query, values, template=None)
            conn.commit()
        except psycopg2.errors.OperationalError as err:
            return _similar_user_import_failed(
                conn, "Cannot import user similarites", err)

        # Next lookup user names and insert them into the new similar_users table
        try:
            # Give each index/constraint a unique name so that we don't have to
            # deal with PITA constraint renaming when the table is rotated.
            suffix = int(time.time())
            with conn.cursor() as curs:
                curs.execute(
                    """DROP TABLE IF EXISTS recommendation.tmp_similar_user""")
                curs.execute("""CREATE TABLE recommendation.tmp_similar_user
                                (LIKE recommendation.similar_user
                                 EXCLUDING INDEXES
                                 EXCLUDING CONSTRAINTS
                                 INCLUDING DEFAULTS)""")
                # The JOIN against "user" drops rows whose user_id has no
                # matching user record.
                curs.execute("""INSERT INTO recommendation.tmp_similar_user
                                SELECT id AS user_id, similar_users
                                  FROM recommendation.similar_user_import
                                  JOIN "user"
                                    ON user_id = "user".id""")
                curs.execute("""DROP TABLE recommendation.similar_user_import""")
                curs.execute("""CREATE UNIQUE INDEX user_id_ndx_similar_user_%s
                                    ON recommendation.tmp_similar_user (user_id)""" % suffix)
                curs.execute("""ALTER TABLE recommendation.tmp_similar_user
                                ADD CONSTRAINT similar_user_user_id_foreign_key_%s
                                FOREIGN KEY (user_id)
                                REFERENCES "user" (id)
                                ON DELETE CASCADE""" % suffix)
            conn.commit()
        except psycopg2.errors.OperationalError as err:
            return _similar_user_import_failed(
                conn, "Cannot correlate user similarity user name", err)

        # Finally rotate the table into place; both renames commit together.
        try:
            with conn.cursor() as curs:
                curs.execute("""ALTER TABLE recommendation.similar_user
                                  RENAME TO delete_similar_user""")
                curs.execute("""ALTER TABLE recommendation.tmp_similar_user
                                  RENAME TO similar_user""")
            conn.commit()
        except psycopg2.errors.OperationalError as err:
            return _similar_user_import_failed(
                conn, "Failed to rotate similar_users table into place", err)

        # Last, delete the old table
        try:
            with conn.cursor() as curs:
                curs.execute(
                    """DROP TABLE recommendation.delete_similar_user CASCADE""")
            conn.commit()
        except psycopg2.errors.OperationalError as err:
            return _similar_user_import_failed(
                conn, "Failed to clean up old similar user table", err)
    finally:
        # Runs on every exit path, including the early error returns above.
        conn.close()

    # Guard against an empty import: there is no average to compute.
    if user_count == 0:
        return (0, 0.0, "")
    return (user_count, target_user_count / user_count, "")
def get_top_similar_users(connection, count: int = 200):
    """
        Fetch the top similar user pairs, most similar first.

        Args:
            connection: DB connection whose execute() returns a result that
                        supports fetchone() and rows indexable by column name.
            count: maximum number of pairs to return.

        Returns:
            A list of at most ``count`` tuples (user1, user2, similarity), where
            user1 < user2 and similarity is a string formatted with three decimals
            (expected range 0.0-1.0). Each unordered pair appears once; if both
            directions are present in the table, the last row read wins.
            An empty list is returned if the query fails with an OperationalError.

        NOTE(review): the query reads ``value->1``, i.e. the element at zero-based
        index 1 of each JSONB array. Per the inline SQL comment that element is the
        global similarity -- confirm this is the intended one.
    """
    # Keyed by the ordered (user1, user2) tuple so A/B and B/A collapse into one
    # entry. A tuple is used instead of concatenated names because concatenation
    # maps distinct pairs such as ("ab", "c") and ("a", "bc") to the same key.
    similar_users = {}
    try:
        result = connection.execute("""
            SELECT u.musicbrainz_id AS user_name
                 , ou.musicbrainz_id AS other_user_name
                 , value->1 AS similarity -- first element of array is similarity, second is global_similarity
              FROM recommendation.similar_user r
              JOIN jsonb_each(r.similar_users) j
                ON TRUE
              JOIN "user" ou
                ON j.key::int = ou.id -- user_name of other user stored in jsonb
              JOIN "user" u
                ON r.user_id = u.id -- user_name of the user_id stored directly in column
        """)
        while True:
            row = result.fetchone()
            if not row:
                break

            user = row["user_name"]
            other_user = row["other_user_name"]
            similarity = "%.3f" % row["similarity"]
            pair = (user, other_user) if user < other_user else (other_user, user)
            similar_users[pair] = pair + (similarity,)
    except psycopg2.errors.OperationalError as err:
        current_app.logger.error("Error: Failed to fetch top similar users %s" % str(err))
        return []

    # Sort on the numeric value of the formatted score: comparing the strings
    # themselves would rank "9.000" above "10.000".
    return sorted(similar_users.values(),
                  key=lambda entry: float(entry[2]),
                  reverse=True)[:count]