Merge pull request #44 from lenskit/feature/sharing
Add multi-process recommendation on Linux/macOS.
mdekstrand committed Nov 22, 2018
2 parents 08f9551 + 5d7325d commit 0380d3b
Showing 15 changed files with 192 additions and 89 deletions.
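The headline change is an optional nprocs argument on lenskit.batch.predict and lenskit.batch.recommend (and a matching option on MultiEval), which spreads per-user work over a multiprocessing pool when the fork start method is available (Linux/macOS). A minimal usage sketch under stated assumptions: the ML-100K data path and the choice of the Bias algorithm are illustrative only, not part of this commit.

import pandas as pd
from lenskit import batch, topn
from lenskit.algorithms import basic

# illustrative data load; ML-100K's u.data is tab-separated
ratings = pd.read_csv('ml-100k/u.data', sep='\t',
                      names=['user', 'item', 'rating', 'timestamp'])

algo = basic.Bias()
model = algo.train(ratings)

users = ratings.user.unique()
candidates = topn.UnratedCandidates(ratings)  # maps a user ID to candidate items

# nprocs is the new option: on Linux/macOS (fork start method) the per-user
# work is spread over 4 worker processes; elsewhere the sequential path runs
recs = batch.recommend(algo, model, users, 10, candidates, nprocs=4)
preds = batch.predict(algo, ratings[['user', 'item']], model, nprocs=4)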
13 changes: 6 additions & 7 deletions azure-pipelines.yml
@@ -45,7 +45,7 @@ jobs:
displayName: 'Build LKPY'
- script: |
python3 setup.py test
python3 -m pytest
displayName: 'Test LKPY'
- script: |
@@ -107,7 +107,7 @@ jobs:
displayName: 'Download ML-100K'
- script: |
python3 setup.py test
python3 -m pytest
displayName: 'pytest'
- script: |
@@ -151,7 +151,7 @@ jobs:
displayName: 'Install partial extra deps'
- script: |
python3 setup.py test
python3 -m pytest
displayName: 'pytest'
- script: |
@@ -198,7 +198,7 @@ jobs:
displayName: 'Install dependencies'
- script: |
python setup.py test
python -m pytest
displayName: 'pytest'
- job: 'WindowsConda'
@@ -225,8 +225,7 @@ jobs:
displayName: 'Build LKPY'
- script: |
python -V
python setup.py test
python -m pytest
displayName: 'Test LKPY'
- job: 'MacConda'
@@ -260,5 +259,5 @@ jobs:
displayName: 'Build LKPY'
- script: |
python3 setup.py test
python3 -m pytest
displayName: 'Test LKPY'
2 changes: 1 addition & 1 deletion lenskit/algorithms/__init__.py
@@ -98,7 +98,7 @@ def train(self, ratings):
Returns:
the trained model (of an implementation-defined type).
"""
raise NotImplemented()
raise NotImplementedError()

def save_model(self, model, path):
"""
6 changes: 6 additions & 0 deletions lenskit/algorithms/als.py
@@ -97,6 +97,12 @@ class BiasedMF(Predictor, Trainable):
iterations(int): the number of iterations to train
reg(double): the regularization factor
damping(double): damping factor for the underlying mean
Attributes:
features(int): the number of features.
iterations(int): the number of training iterations.
regularization(double): the regularization factor.
damping(double): the mean damping.
"""
timer = None

16 changes: 14 additions & 2 deletions lenskit/algorithms/basic.py
@@ -8,14 +8,20 @@

import pandas as pd

from .. import util as lku
from .. import check
from . import Predictor, Trainable, Recommender

_logger = logging.getLogger(__name__)

BiasModel = namedtuple('BiasModel', ['mean', 'items', 'users'])
BiasModel.__doc__ = "Trained model for the :py:class:`Bias` algorithm."
BiasModel.__doc__ = '''
Trained model for the :py:class:`Bias` algorithm.
Attributes:
mean(double): the global mean.
items(pandas.Series): the item means.
users(pandas.Series): the user means.
'''


class Bias(Predictor, Trainable):
@@ -288,6 +294,9 @@ def recommend(self, model, user, n=None, candidates=None, ratings=None):
scores.index.name = 'item'
return scores.reset_index()

def __str__(self):
return 'TN/' + str(self.predictor)


class _TrainableTopN(TopN, Trainable):
"""
@@ -302,3 +311,6 @@ def save_model(self, model, path):

def load_model(self, path):
return self.predictor.load_model(path)

def __str__(self):
return 'TTN/' + str(self.predictor)
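For reference, the expanded BiasModel docstring above corresponds to plain attribute access on the trained model. A small sketch, assuming Bias's default constructor; the toy ratings frame is invented for illustration.

import pandas as pd
from lenskit.algorithms.basic import Bias

# toy ratings frame with the user/item/rating columns lenskit expects
ratings = pd.DataFrame({'user': [1, 1, 2], 'item': [10, 20, 10],
                        'rating': [4.0, 3.0, 5.0]})

algo = Bias()
model = algo.train(ratings)

model.mean   # global mean rating (double)
model.items  # item means (pandas.Series)
model.users  # user means (pandas.Series)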
28 changes: 8 additions & 20 deletions lenskit/algorithms/item_knn.py
@@ -338,7 +338,6 @@ def predict(self, model, user, items, ratings=None):
iscore = np.full(len(model.items), np.nan, dtype=np.float_)

# now compute the predictions
_logger.debug('aggregating with function %s', self._predict_agg)
iscore = self._predict_agg(model.sim_matrix,
len(model.items),
(self.min_neighbors, self.max_neighbors),
@@ -363,14 +362,12 @@ def save_model(self, model, path):
path = pathlib.Path(path)
_logger.info('saving I-I model to %s', path)

np.savez_compressed(path, items=model.items.values, users=model.users.values,
means=model.means,
s_rows=matrix.csr_rowinds(model.sim_matrix),
s_cols=model.sim_matrix.colinds,
s_vals=model.sim_matrix.values,
r_rows=matrix.csr_rowinds(model.rating_matrix),
r_cols=model.rating_matrix.colinds,
r_vals=model.rating_matrix.values)
data = dict(items=model.items.values, users=model.users.values,
means=model.means)
data.update(matrix.csr_save(model.sim_matrix, 's_'))
data.update(matrix.csr_save(model.rating_matrix, 'r_'))

np.savez_compressed(path, **data)

def load_model(self, path):
path = pathlib.Path(path)
@@ -381,25 +378,16 @@ def load_model(self, path):
items = npz['items']
users = npz['users']
means = npz['means']
s_rows = npz['s_rows']
s_cols = npz['s_cols']
s_vals = npz['s_vals']
r_rows = npz['r_rows']
r_cols = npz['r_cols']
r_vals = npz['r_vals']
s_mat = matrix.csr_load(npz, 's_')
r_mat = matrix.csr_load(npz, 'r_')

if means.dtype == np.object:
means = None
if r_vals.dtype == np.object:
r_vals = None

items = pd.Index(items, name='item')
users = pd.Index(users, name='user')
nitems = len(items)
nusers = len(users)

s_mat = matrix.csr_from_coo(s_rows, s_cols, s_vals, shape=(nitems, nitems))
r_mat = matrix.csr_from_coo(r_rows, r_cols, r_vals, shape=(nusers, nitems))
s_mat.sort_values()

_logger.info('read %d similarities for %d items', s_mat.nnz, nitems)
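The save/load refactor above routes both matrices through prefix-keyed helpers instead of hand-packing COO arrays. A condensed sketch of the round-trip, using only the calls that appear in the diff and assuming matrix is lenskit's matrix utility module as imported by item_knn.py; treat the exact return type of csr_save as an assumption.

import numpy as np
from lenskit import matrix

def save_csr(path, mat):
    # csr_save is assumed to return prefixed key/array pairs suitable for np.savez
    data = dict(matrix.csr_save(mat, 's_'))
    np.savez_compressed(path, **data)

def load_csr(path):
    with np.load(path) as npz:
        # csr_load reassembles the matrix from the prefixed arrays
        return matrix.csr_load(npz, 's_')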
4 changes: 2 additions & 2 deletions lenskit/algorithms/mf_common.py
@@ -131,8 +131,8 @@ def load(cls, path):
path = util.npz_path(path)

with np.load(path) as npz:
users = pd.Index(npz['users'])
items = pd.Index(npz['items'])
users = pd.Index(npz['users'], name='user')
items = pd.Index(npz['items'], name='item')
umat = npz['umat']
imat = npz['imat']
return cls(users, items, umat, imat)
140 changes: 105 additions & 35 deletions lenskit/batch.py
@@ -5,9 +5,9 @@
import logging
import pathlib
import collections
from time import perf_counter
from functools import partial
import warnings
import multiprocessing as mp
from multiprocessing.pool import Pool

import pandas as pd
import numpy as np
@@ -23,7 +23,31 @@
_logger = logging.getLogger(__name__)


def predict(algo, pairs, model=None):
def __mp_init_data(algo, model, candidates, size):
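# Shared state lives in module globals; with the 'fork' start method the pool
# workers inherit it without re-pickling the model.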
global __rec_model, __rec_algo, __rec_candidates, __rec_size

__rec_algo = algo
__rec_model = model
__rec_candidates = candidates
__rec_size = size


def _predict_user(algo, model, user, udf):
watch = util.Stopwatch()
res = algo.predict(model, user, udf['item'])
res = pd.DataFrame({'user': user, 'item': res.index, 'prediction': res.values})
_logger.debug('%s produced %d/%d predictions for %s in %s',
algo, res.prediction.notna().sum(), len(udf), user, watch)
return res


def _predict_worker(job):
user, udf = job
res = _predict_user(__rec_algo, __rec_model, user, udf)
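# serialize via msgpack so the result frame can be shipped back from the worker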
return res.to_msgpack()


def predict(algo, pairs, model=None, nprocs=None):
"""
Generate predictions for user-item pairs. The provided algorithm should be a
:py:class:`algorithms.Predictor` or a function of two arguments: the user ID and
@@ -45,23 +69,60 @@ def predict(algo, pairs, model=None):
result will also contain a `rating` column.
"""

if isinstance(algo, Predictor):
pfun = partial(algo.predict, model)
else:
pfun = None
if not isinstance(algo, Predictor):
_logger.warning('non-Predictor deprecated')
nprocs = None
pfun = algo

def run(user, udf):
res = pfun(user, udf.item)
return pd.DataFrame({'user': user, 'item': res.index, 'prediction': res.values})

ures = (run(user, udf) for (user, udf) in pairs.groupby('user'))
res = pd.concat(ures)
if nprocs and nprocs > 1 and mp.get_start_method() == 'fork':
__mp_init_data(algo, model, None, None)
_logger.info('starting predict process with %d workers', nprocs)
with Pool(nprocs) as pool:
results = pool.map(_predict_worker, pairs.groupby('user'))
results = [pd.read_msgpack(r) for r in results]
else:
results = []
for user, udf in pairs.groupby('user'):
if pfun:
res = pfun(user, udf['item'])
res = pd.DataFrame({'user': user, 'item': res.index, 'prediction': res.values})
else:
res = _predict_user(algo, model, user, udf)
results.append(res)

results = pd.concat(results)
if 'rating' in pairs:
return pairs.join(res.set_index(['user', 'item']), on=('user', 'item'))
return res
return pairs.join(results.set_index(['user', 'item']), on=('user', 'item'))
return results


def _recommend_user(algo, model, user, n, candidates):
_logger.debug('generating recommendations for %s', user)
watch = util.Stopwatch()
res = algo.recommend(model, user, n, candidates)
_logger.debug('%s recommended %d/%d items for %s in %s', algo, len(res), n, user, watch)
iddf = pd.DataFrame({'user': user, 'rank': np.arange(1, len(res) + 1)})
return pd.concat([iddf, res], axis='columns')

def recommend(algo, model, users, n, candidates, ratings=None):

def _recommend_seq(algo, model, users, n, candidates):
if isinstance(candidates, dict):
candidates = candidates.get
algo = Recommender.adapt(algo)
results = [_recommend_user(algo, model, user, n, candidates(user))
for user in users]
return results


def _recommend_worker(user):
candidates = __rec_candidates(user)
algo = Recommender.adapt(__rec_algo)
res = _recommend_user(algo, __rec_model, user, __rec_size, candidates)
return res.to_msgpack()


def recommend(algo, model, users, n, candidates, ratings=None, nprocs=None):
"""
Batch-recommend for multiple users. The provided algorithm should be a
:py:class:`algorithms.Recommender` or :py:class:`algorithms.Predictor` (which
@@ -85,20 +146,19 @@ def recommend(algo, model, users, n, candidates, ratings=None):
``score``, and any other columns returned by the recommender.
"""

if isinstance(candidates, dict):
candidates = candidates.get
algo = Recommender.adapt(algo)

results = []
for user in users:
_logger.debug('generating recommendations for %s', user)
ucand = candidates(user)
res = algo.recommend(model, user, n, ucand)
iddf = pd.DataFrame({'user': user, 'rank': np.arange(1, len(res) + 1)})
results.append(pd.concat([iddf, res], axis='columns'))
if nprocs and nprocs > 1 and mp.get_start_method() == 'fork':
__mp_init_data(algo, model, candidates, n)
_logger.info('starting recommend process with %d workers', nprocs)
with Pool(nprocs) as pool:
results = pool.map(_recommend_worker, users)
results = [pd.read_msgpack(r) for r in results]
else:
_logger.info('starting sequential recommend process')
results = _recommend_seq(algo, model, users, n, candidates)

results = pd.concat(results, ignore_index=True)
if ratings is not None:

if ratings is not None and 'rating' in ratings.columns:
# combine with test ratings for relevance data
results = pd.merge(results, ratings, how='left', on=('user', 'item'))
# fill in missing 0s
@@ -127,18 +187,25 @@ class MultiEval:
path(str or :py:class:`pathlib.Path`):
the working directory for this evaluation.
It will be created if it does not exist.
nrecs(int):
predict(bool):
whether to generate rating predictions.
recommend(int):
the number of recommendations to generate per user (None to disable top-N).
candidates:
the default candidate set generator.
candidates(function):
the default candidate set generator for recommendations. It should take the
training data and return a candidate generator, itself a function mapping user
IDs to candidate sets.
"""

def __init__(self, path, nrecs=100, candidates=topn.UnratedCandidates):
def __init__(self, path, predict=True,
recommend=100, candidates=topn.UnratedCandidates, nprocs=None):
self.workdir = pathlib.Path(path)
self.n_recs = nrecs
self.predict = predict
self.recommend = recommend
self.candidate_generator = candidates
self.algorithms = []
self.datasets = []
self.nprocs = nprocs

@property
def run_csv(self):
@@ -276,26 +343,29 @@ def _train_algo(self, algo, train):
return model, watch.elapsed()

def _predict(self, rid, algo, model, test):
if not self.predict:
return None, None
if not isinstance(algo, Predictor):
return None, None

watch = util.Stopwatch()
_logger.info('generating %d predictions for %s', len(test), algo)
preds = predict(algo, test, model)
preds = predict(algo, test, model, nprocs=self.nprocs)
watch.stop()
_logger.info('generated predictions in %s', watch)
preds['RunId'] = rid
preds = preds[['RunId', 'user', 'item', 'rating', 'prediction']]
return preds, watch.elapsed()

def _recommend(self, rid, algo, model, test, candidates):
if self.n_recs is None:
if self.recommend is None:
return None, None

watch = util.Stopwatch()
users = test.user.unique()
_logger.info('generating recommendations for %d users for %s', len(users), algo)
recs = recommend(algo, model, users, self.n_recs, candidates, test)
recs = recommend(algo, model, users, self.recommend, candidates, test,
nprocs=self.nprocs)
watch.stop()
_logger.info('generated recommendations in %s', watch)
recs['RunId'] = rid
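Taken together, the MultiEval changes replace the old nrecs option with separate predict and recommend switches and thread the new nprocs option through to the batch helpers. A constructor-level sketch using the signature shown in the diff; the working directory name is hypothetical and the rest of the experiment setup is omitted.

from lenskit import batch, topn

# predict=False skips rating prediction, recommend=20 asks for 20 recs per
# user, and nprocs is forwarded to batch.predict/batch.recommend, which only
# use it where the 'fork' start method is available (Linux/macOS)
meval = batch.MultiEval('my-eval', predict=False, recommend=20,
                        candidates=topn.UnratedCandidates, nprocs=4)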
