Merge pull request #52 from lenskit/feature/oneshot
Allow individual multi jobs to be run
mdekstrand committed Dec 21, 2018
2 parents fdf2d6e + 9ef6c72 commit 6f04018
Showing 5 changed files with 292 additions and 20 deletions.
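
The feature added here lets a MultiEval experiment be split into individually runnable jobs. Below is a minimal usage sketch (not part of the commit), assuming the existing add_datasets/add_algorithms helpers, the ItemItem algorithm, and a MovieLens-style ratings file; paths and parameters are hypothetical:

import pandas as pd
from lenskit import crossfold as xf
from lenskit.algorithms import item_knn
from lenskit.batch import MultiEval

# hypothetical ratings frame with user/item/rating columns
ratings = pd.read_csv('ml-100k/u.data', sep='\t',
                      names=['user', 'item', 'rating', 'timestamp'])

# combine=False keeps each run's output in its own files, so runs can be selected later
mev = MultiEval('my-eval', recommend=20, combine=False)
mev.add_datasets(xf.partition_users(ratings, 5, xf.SampleFrac(0.2)), name='ML-100K')
mev.add_algorithms(item_knn.ItemItem(20), name='II')

# persist the train/test splits so the data sets are flat and the runs are countable
mev.persist_data()
print('total runs:', mev.run_count())

# execute only the third job; run IDs are 1-based
mev.run(runs={3})

Once every job has finished, collect_results() (added in _multi.py below) merges the per-run files into the combined runs, predictions, and recommendations outputs.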
11 changes: 10 additions & 1 deletion lenskit/algorithms/implicit.py
@@ -63,7 +63,16 @@ def recommend(self, model: ImplicitModel, user, n=None, candidates=None, ratings
return rec_df.loc[:, ['item', 'score']]

def __getattr__(self, name):
return self.algo_kwargs[name]
return self.__dict__['algo_kwargs'][name]

def __getstate__(self):
return (self.algo_class, self.algo_args, self.algo_kwargs)

def __setstate__(self, rec):
cls, args, kwargs = rec
self.algo_class = cls
self.algo_args = args
self.algo_kwargs = kwargs

def __str__(self):
return 'Implicit({}, {}, {})'.format(self.algo_class.__name__, self.algo_args, self.algo_kwargs)
115 changes: 99 additions & 16 deletions lenskit/batch/_multi.py
@@ -2,6 +2,7 @@
import pathlib
import collections
import warnings
import json

import pandas as pd

@@ -45,17 +46,23 @@ class MultiEval:
the default candidate set generator for recommendations. It should take the
training data and return a candidate generator, itself a function mapping user
IDs to candidate sets.
combine(bool):
whether to combine output; if ``False``, output will be left in separate files; if
``True``, it will be in a single set of files (runs, recommendations, and predictions).
"""

def __init__(self, path, predict=True,
recommend=100, candidates=topn.UnratedCandidates, nprocs=None):
recommend=100, candidates=topn.UnratedCandidates,
nprocs=None, combine=True):
self.workdir = pathlib.Path(path)
self.predict = predict
self.recommend = recommend
self.candidate_generator = candidates
self.algorithms = []
self.datasets = []
self.nprocs = nprocs
self.combine_output = combine
self._is_flat = True

@property
def run_csv(self):
@@ -125,6 +132,9 @@ def add_datasets(self, data, name=None, candidates=None, **kwargs):
attrs['DataSet'] = name
attrs.update(kwargs)

if not isinstance(data, tuple):
self._is_flat = False

self.datasets.append(_DSRec(data, candidates, attrs))

def persist_data(self):
@@ -148,6 +158,7 @@ def persist_data(self):
test = fn
ds2.append(((train, test), cand_f, ds_attrs))
self.datasets = ds2
self._is_flat = True

def _normalize_ds_entry(self, entry):
# normalize data set to be an iterable of tuples
@@ -176,11 +187,32 @@ def _flat_runs(self):
for arec in self.algorithms:
yield (dse, arec)

def run(self):
def run_count(self):
"Get the number of runs in this evaluation."
if self._is_flat:
nds = len(self.datasets)
else:
_logger.warning('attempting to count runs in a non-flattened evaluation')
nds = len(list(self._flat_datasets()))
return nds * len(self.algorithms)

def run(self, runs=None):
"""
Run the evaluation.
Args:
runs(int or set-like):
If provided, a specific set of runs to execute. Useful for splitting
an experiment into individual jobs. This is a set of 1-based run
IDs, not 0-based indexes.
"""

if runs is not None and self.combine_output:
raise ValueError('Cannot select runs with combined output')

if runs is not None and not isinstance(runs, collections.Iterable):
runs = [runs]

self.workdir.mkdir(parents=True, exist_ok=True)

run_id = 0
@@ -190,6 +222,9 @@ def run(self):

for i, (dsrec, arec) in enumerate(self._flat_runs()):
run_id = i + 1
if runs is not None and run_id not in runs:
_logger.info('skipping deselected run %d', run_id)
continue

ds, cand_f, ds_attrs = dsrec
if cand_f is None:
@@ -208,10 +243,7 @@ def run(self):
_logger.info('finished run %d: %s on %s:%s', run_id, arec.algorithm,
ds_name, ds_part)
run_data.append(run)
run_df = pd.DataFrame(run_data)
# overwrite files to show progress
run_df.to_csv(self.run_csv, index=False)
run_df.to_parquet(self.run_file, compression=None)
self._write_run(run, run_data)

def _run_algo(self, run_id, arec, data):
train, test, dsp_attrs, cand = data
@@ -225,11 +257,11 @@ def _run_algo(self, run_id, arec, data):

preds, pred_time = self._predict(run_id, arec.algorithm, model, test)
run['PredTime'] = pred_time
self._write_results(self.preds_file, preds, append=run_id > 1)
self._write_results('predictions', preds, run_id)

recs, rec_time = self._recommend(run_id, arec.algorithm, model, test, cand)
run['RecTime'] = rec_time
self._write_results(self.recs_file, recs, append=run_id > 1)
self._write_results('recommendations', recs, run_id)

return run

@@ -270,15 +302,66 @@ def _recommend(self, rid, algo, model, test, candidates):
recs['RunId'] = rid
return recs, watch.elapsed()

def _write_results(self, file, df, append=True):
def _write_run(self, run, run_data):
if self.combine_output:
run_df = pd.DataFrame(run_data)
# overwrite files to show progress
run_df.to_csv(self.run_csv, index=False)
run_df.to_parquet(self.run_file, compression=None)
else:
rf = self.workdir / 'run-{}.json'.format(run['RunId'])
with rf.open('w') as f:
json.dump(run, f)

def _write_results(self, name, df, run_id):
if df is None:
return

if fastparquet is not None:
fastparquet.write(str(file), df, append=append, compression='snappy')
elif append and file.exists():
warnings.warn('fastparquet not available, appending is slow')
odf = pd.read_parquet(str(file))
pd.concat([odf, df]).to_parquet(str(file))
if self.combine_output:
out = self.workdir / '{}.parquet'.format(name)
_logger.info('run %d: writing predictions to %s', run_id, out)
append = run_id > 1
util.write_parquet(out, df, append=append)
else:
df.to_parquet(str(file))
out = self.workdir / '{}-{}.parquet'.format(name, run_id)
_logger.info('run %d: writing predictions to %s', run_id, out)
df.to_parquet(out)

def collect_results(self):
"""
Collect the results from non-combined runs into combined output files.
"""

oc = self.combine_output
try:
self.combine_output = True
n = self.run_count()
runs = (self._read_json('run-{}.json', i+1) for i in range(n))
runs = pd.DataFrame.from_records(runs)
runs.to_parquet(self.run_file)
runs.to_csv(self.run_csv, index=False)

for i in range(n):
preds = self._read_parquet('predictions-{}.parquet', i+1)
self._write_results('predictions', preds, i+1)
recs = self._read_parquet('recommendations-{}.parquet', i+1)
self._write_results('recommendations', recs, i+1)
finally:
self.combine_output = oc

def _read_parquet(self, name, *args):
fn = self.workdir / name.format(*args)
if not fn.exists():
_logger.warning('file %s does not exist', fn)
return None

return pd.read_parquet(fn)

def _read_json(self, name, *args):
fn = self.workdir / name.format(*args)
if not fn.exists():
_logger.warning('file %s does not exist', fn)
return {}

with fn.open('r') as f:
return json.load(f)
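
Because non-combined mode writes run-<N>.json, predictions-<N>.parquet, and recommendations-<N>.parquet for each run, one way to use it is a per-job driver, e.g. one invocation per cluster task. This is a hypothetical sketch, not part of the commit; the experiment definition is elided:

# run_one.py -- hypothetical driver: execute one MultiEval job by its 1-based ID
import sys
from lenskit.batch import MultiEval

def build_eval():
    # rebuild the same experiment definition in every job;
    # combine=False is required when selecting individual runs
    mev = MultiEval('my-eval', combine=False)
    # ... add_datasets(...) and add_algorithms(...) as in the experiment setup
    return mev

if __name__ == '__main__':
    mev = build_eval()
    if sys.argv[1] == 'collect':
        # after all jobs have finished: merge run-*.json and per-run parquet files
        mev.collect_results()
    else:
        # a plain int is wrapped into a one-element list by run()
        mev.run(runs=int(sys.argv[1]))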
49 changes: 48 additions & 1 deletion lenskit/util.py
@@ -2,13 +2,24 @@
Miscellaneous utility functions.
"""

import os
import os.path
import time
import pathlib
import warnings

from numba import jitclass, njit, int32, double
import numpy as np
import pandas as pd

try:
import fastparquet
except ImportError:
fastparquet = None


__os_fp = getattr(os, 'fspath', None)


@njit
def _ind_downheap(pos: int, size, keys, values):
@@ -122,8 +133,23 @@ def __str__(self):
elapsed = self.elapsed()
if elapsed < 1:
return "{: 0.0f}ms".format(elapsed * 1000)
elif elapsed > 60 * 60:
h, m = divmod(elapsed, 60 * 60)
m, s = divmod(m, 60)
return "{:0.0f}h{:0.0f}m{:0.2f}s".format(h, m, s)
elif elapsed > 60:
m, s = divmod(elapsed, 60)
return "{:0.0f}m{:0.2f}s".format(m, s)
else:
return "{: 0.2f}s".format(elapsed)
return "{:0.2f}s".format(elapsed)


def fspath(path):
"Backport of :py:fun:`os.fspath` function for Python 3.5."
if __os_fp:
return __os_fp(path)
else:
return str(path)


def npz_path(path):
@@ -161,6 +187,27 @@ def read_df_detect(path):
return pd.read_parquet(path)


def write_parquet(path, frame, append=False):
"""
Write a Parquet file.
Args:
path(pathlib.Path): The path of the Parquet file to write.
frame(pandas.DataFrame): The data to write.
append(bool): Whether to append to the file or overwrite it.
"""
fn = fspath(path)
append = append and os.path.exists(fn)
if fastparquet is not None:
fastparquet.write(fn, frame, append=append, compression='snappy')
elif append:
warnings.warn('fastparquet not available, appending is slow')
odf = pd.read_parquet(fn)
pd.concat([odf, frame], ignore_index=True).to_parquet(fn)
else:
frame.to_parquet(fn)


class LastMemo:
def __init__(self, func):
self.function = func
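
A small sketch of the new write_parquet helper in lenskit.util (the file name and frames are hypothetical): the first call creates the file, and later calls with append=True extend it, using fastparquet when available and falling back to a read-concatenate-rewrite cycle otherwise.

import pandas as pd
from lenskit import util

# hypothetical per-run prediction frames
run1 = pd.DataFrame({'user': [1, 2], 'item': [10, 20], 'prediction': [3.5, 4.0]})
run2 = pd.DataFrame({'user': [3], 'item': [30], 'prediction': [2.5]})

util.write_parquet('predictions.parquet', run1)               # creates the file
util.write_parquet('predictions.parquet', run2, append=True)  # appends to it

assert len(pd.read_parquet('predictions.parquet')) == 3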
