diff --git a/.gitignore b/.gitignore index 3743bbd..429fec3 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ dist/ *.egg-info/ Makefile !docs/Makefile +version.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 71cefd3..aa77e3c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,12 +4,6 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -fPIC -Wall -Wextra -fopenmp include(GNUInstallDirs) -include_directories("./include") -include_directories("./3rd/json11") -include_directories("./3rd/spdlog/include") -include_directories("./3rd/n2/include") -include_directories("./3rd/eigen3") - set(SOURCES "./3rd/json11/json11.cpp" ) @@ -20,8 +14,16 @@ file(GLOB CFR_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/lib/algo_impl/cfr/*.cc) file(GLOB BPR_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/lib/algo_impl/bpr/*.cc) file(GLOB WARP_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/lib/algo_impl/warp/*.cc) file(GLOB W2V_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/lib/algo_impl/w2v/*.cc) -add_library(cbuffalo SHARED ${SOURCES} ${ALGO_SRCS} ${ALS_SRCS} ${CFR_SRCS} ${BPR_SRCS} ${WARP_SRCS} ${W2V_SRCS} ${MISC_SRCS}) -include_directories(cbuffalo "/usr/local/include/eigen3") +file(GLOB PLSI_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/lib/algo_impl/plsi/*.cc) +add_library(cbuffalo SHARED ${SOURCES} ${ALGO_SRCS} ${ALS_SRCS} ${CFR_SRCS} ${BPR_SRCS} ${WARP_SRCS} ${W2V_SRCS} ${PLSI_SRCS} ${MISC_SRCS}) +target_include_directories(cbuffalo +PRIVATE + ./include + ./3rd/json11 + ./3rd/spdlog/include + ./3rd/n2/include + ./3rd/eigen3 +) set_target_properties(cbuffalo PROPERTIES VERSION 0.1.0) set_target_properties(cbuffalo PROPERTIES SOVERSION 1) @@ -34,3 +36,34 @@ install(DIRECTORY "${CMAKE_SOURCE_DIR}/include/buffalo" # source directory FILES_MATCHING # install only matched files PATTERN "*.hpp" # select header files ) + +set(N2_DIR "./3rd/n2") +file(GLOB_RECURSE N2_SRC CONFIGURE_DEPENDS ${N2_DIR}/src/*.cc) +add_library(n2 SHARED ${N2_SRC}) +target_compile_options(n2 PRIVATE + ${OpenMP_CXX_FLAGS} + "-DBOOST_DISABLE_ASSERTS" +) +target_link_libraries(n2 PRIVATE pthread) +target_include_directories(n2 +PRIVATE + ${N2_DIR}/include/ + ${N2_DIR}/third_party/eigen/ + ${N2_DIR}/third_party/spdlog/include/ + ${N2_DIR}/third_party/boost/mpl/include/ + ${N2_DIR}/third_party/boost/bind/include/ + ${N2_DIR}/third_party/boost/core/include/ + ${N2_DIR}/third_party/boost/heap/include/ + ${N2_DIR}/third_party/boost/mp11/include/ + ${N2_DIR}/third_party/boost/assert/include/ + ${N2_DIR}/third_party/boost/config/include/ + ${N2_DIR}/third_party/boost/detail/include/ + ${N2_DIR}/third_party/boost/utility/include/ + ${N2_DIR}/third_party/boost/iterator/include/ + ${N2_DIR}/third_party/boost/parameter/include/ + ${N2_DIR}/third_party/boost/type_traits/include/ + ${N2_DIR}/third_party/boost/preprocessor/include/ + ${N2_DIR}/third_party/boost/concept_check/include/ + ${N2_DIR}/third_party/boost/static_assert/include/ + ${N2_DIR}/third_party/boost/throw_exception/include/ +) diff --git a/MANIFEST.in b/MANIFEST.in index 204801f..34ec7c6 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,6 +1,7 @@ include cuda_setup.py include CMakeLists.txt include requirements.txt +include pyproject.toml include tests/res/*.json recursive-exclude buffalo/ *.cpp recursive-include buffalo/ *.pyx diff --git a/NOTICE.md b/NOTICE.md index 6c5337c..7ac7e24 100644 --- a/NOTICE.md +++ b/NOTICE.md @@ -114,16 +114,6 @@ Copyright 2005-2018, NumPy Developers. 
BSD 3-Clause "New" or "Revised" License - **pathlib** - -https://bitbucket.org/pitrou/pathlib - -Copyright 2012 Antoine Pitrou - - -MIT License - - **psutil** https://github.com/giampaolo/psutil @@ -164,14 +154,14 @@ Copyright 2007-2019 by the Sphinx team (see AUTHORS file). BSD 2-Clause "Simplified" License - **TensorFlow** + **Tensorboard** -https://github.com/tensorflow/tensorflow +https://github.com/tensorflow/tensorboard -Copyright 2019 The TensorFlow Authors. +Copyright 2017 The TensorFlow Authors. -Apache License 2.0 +Apache License 2.0 **tqdm** diff --git a/benchmark/evaluate.py b/benchmark/evaluate.py index afd6270..a8f76df 100644 --- a/benchmark/evaluate.py +++ b/benchmark/evaluate.py @@ -38,7 +38,7 @@ def filter_seen_items(_topk, seen, topk): HIT += hit # ndcg, map - idcg = idcgs[min(len(_gt), len(_topk)) - 1] + idcg = idcgs[min(len(_gt), topk) - 1] dcg = 0.0 hit, miss, ap = 0.0, 0.0, 0.0 @@ -60,7 +60,7 @@ def filter_seen_items(_topk, seen, topk): ndcg = dcg / idcg NDCG += ndcg - ap /= min(len(_gt), len(_topk)) + ap /= min(len(_gt), topk) AP += ap N += 1.0 AUC += auc diff --git a/buffalo/algo/_plsi.pyx b/buffalo/algo/_plsi.pyx new file mode 100644 index 0000000..382655d --- /dev/null +++ b/buffalo/algo/_plsi.pyx @@ -0,0 +1,61 @@ +# cython: experimental_cpp_class_def=True, language_level=3 +# distutils: language=c++ +# -*- coding: utf-8 -*- +import cython +import numpy as np +cimport numpy as np + +from libcpp.string cimport string +from libcpp cimport bool as bool_t +from libc.stdint cimport int32_t, int64_t + + +cdef extern from "buffalo/algo_impl/plsi/plsi.hpp" namespace "plsi": + cdef cppclass CPLSI: + bool_t init(string) nogil except + + void release() nogil except + + void swap() nogil except + + void reset() nogil except + + void initialize_model(float*, int, float*, int) nogil except + + float partial_update(int, int, int64_t*, int32_t*, float*) nogil except + + void normalize(float, float) nogil except + + + +cdef class CyPLSI: + """CPLSI object holder""" + cdef CPLSI* obj # C-PLSI object + + def __cinit__(self): + self.obj = new CPLSI() + + def __dealloc__(self): + self.obj.release() + del self.obj + + def init(self, opt_path): + return self.obj.init(opt_path) + + def swap(self): + self.obj.swap() + + def release(self): + self.obj.release() + + def reset(self): + self.obj.reset() + + def initialize_model(self, np.ndarray[np.float32_t, ndim=2] P, + np.ndarray[np.float32_t, ndim=2] Q): + self.obj.initialize_model(&P[0, 0], P.shape[0], + &Q[0, 0], Q.shape[0]) + + def normalize(self, alpha1, alpha2): + self.obj.normalize(alpha1, alpha2) + + @cython.boundscheck(False) + @cython.wraparound(False) + def partial_update(self, int start_x, int next_x, + np.ndarray[np.int64_t, ndim=1] indptr, + np.ndarray[np.int32_t, ndim=1] keys, + np.ndarray[np.float32_t, ndim=1] vals): + return self.obj.partial_update(start_x, next_x, &indptr[0], &keys[0], &vals[0]) diff --git a/buffalo/algo/als.py b/buffalo/algo/als.py index 4cbcf06..23d8a7d 100644 --- a/buffalo/algo/als.py +++ b/buffalo/algo/als.py @@ -13,7 +13,7 @@ from buffalo.algo.options import ALSOption from buffalo.algo.optimize import Optimizable from buffalo.data.buffered_data import BufferedDataMatrix -from buffalo.algo.base import Algo, Serializable, TensorboardExtention +from buffalo.algo.base import Algo, Serializable, TensorboardExtension inited_CUALS = True try: @@ -22,7 +22,7 @@ inited_CUALS = False -class ALS(Algo, ALSOption, Evaluable, Serializable, Optimizable, TensorboardExtention): +class ALS(Algo, ALSOption, Evaluable, 
Serializable, Optimizable, TensorboardExtension): """Python implementation for C-ALS. Implementation of Collaborative Filtering for Implicit Feedback datasets. diff --git a/buffalo/algo/base.py b/buffalo/algo/base.py index f8ef1bb..03e7686 100644 --- a/buffalo/algo/base.py +++ b/buffalo/algo/base.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- import os -os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' import abc import json import pickle @@ -9,10 +8,7 @@ import datetime import numpy as np -import tensorflow as tf -tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) -from tensorflow.keras.utils import Progbar -# what the... +import tensorboard as tb import absl.logging logging.root.removeHandler(absl.logging._absl_handler) absl.logging._warn_preinit_stderr = False @@ -331,7 +327,7 @@ def instantiate(cls, cls_opt, path, data_fields): return c -class TensorboardExtention(object): +class TensorboardExtension(object): @abc.abstractmethod def get_evaluation_metrics(self): raise NotImplementedError @@ -339,10 +335,7 @@ def get_evaluation_metrics(self): def _get_initial_tensorboard_data(self): tb = aux.Option({'summary_writer': None, 'name': None, - 'metrics': {}, - 'feed_dict': {}, - 'merged_summary_op': None, - 'session': None, + 'metrics': [], 'pbar': None, 'data_root': None, 'step': 1}) @@ -352,7 +345,7 @@ def initialize_tensorboard(self, num_steps, name_prefix='', name_postfix='', met if not self.opt.tensorboard: if not hasattr(self, '_tb_setted'): self.logger.debug('Cannot find tensorboard configuration.') - self.tb_setted = False + self._tb_setted = False return name = self.opt.tensorboard.name name = name_prefix + name + name_postfix @@ -360,33 +353,20 @@ def initialize_tensorboard(self, num_steps, name_prefix='', name_postfix='', met template = self.opt.tensorboard.get('name_template', '{name}.{dtm}') self._tb = self._get_initial_tensorboard_data() self._tb.name = template.format(name=name, dtm=dtm) - if not os.path.isdir(self.opt.tensorboard.root): - os.makedirs(self.opt.tensorboard.root) + os.makedirs(self.opt.tensorboard.root, exist_ok=True) tb_dir = os.path.join(self.opt.tensorboard.root, self._tb.name) self._tb.data_root = tb_dir - self._tb.summary_writer = tf.summary.FileWriter(tb_dir) - if not metrics: - metrics = self.get_evaluation_metrics() - for m in metrics: - self._tb.metrics[m] = tf.placeholder(tf.float32) - tf.summary.scalar(m, self._tb.metrics[m]) - self._tb.feed_dict[self._tb.metrics[m]] = 0.0 - self._tb.merged_summary_op = tf.summary.merge_all() - self._tb.session = tf.Session() - self._tb.pbar = Progbar(num_steps, stateful_metrics=self._tb.metrics, verbose=0) + self._tb.summary_writer = tb.summary.Writer(tb_dir) + self._tb.metrics = metrics if metrics is not None else self.get_evaluation_metrics() self._tb_setted = True def update_tensorboard_data(self, metrics): if not self.opt.tensorboard: return - metrics = [(m, np.float32(metrics.get(m, 0.0))) - for m in self._tb.metrics.keys()] - self._tb.feed_dict = {self._tb.metrics[k]: v - for k, v in metrics} - summary = self._tb.session.run(self._tb.merged_summary_op, - feed_dict=self._tb.feed_dict) - self._tb.summary_writer.add_summary(summary, self._tb.step) - self._tb.pbar.update(self._tb.step, metrics) + for m in self._tb.metrics: + v = metrics.get(m, 0.0) + self._tb.summary_writer.add_scalar(m, v, self._tb.step) + self._tb.summary_writer.flush() self._tb.step += 1 def finalize_tensorboard(self): @@ -395,6 +375,4 @@ def finalize_tensorboard(self): with open(os.path.join(self._tb.data_root, 'opt.json'), 'w') as fout: 
fout.write(json.dumps(self.opt, indent=2)) self._tb.summary_writer.close() - self._tb.session.close() self._tb = None - tf.reset_default_graph() diff --git a/buffalo/algo/bpr.py b/buffalo/algo/bpr.py index eaccc7e..91fb207 100644 --- a/buffalo/algo/bpr.py +++ b/buffalo/algo/bpr.py @@ -15,7 +15,7 @@ from buffalo.algo.options import BPRMFOption from buffalo.algo.optimize import Optimizable from buffalo.data.buffered_data import BufferedDataMatrix -from buffalo.algo.base import Algo, Serializable, TensorboardExtention +from buffalo.algo.base import Algo, Serializable, TensorboardExtension # TODO init structure of gpu modules will be abstracted to a higher module inited_CUBPR = True @@ -25,7 +25,7 @@ inited_CUBPR = False -class BPRMF(Algo, BPRMFOption, Evaluable, Serializable, Optimizable, TensorboardExtention): +class BPRMF(Algo, BPRMFOption, Evaluable, Serializable, Optimizable, TensorboardExtension): """Python implementation for C-BPRMF. """ def __init__(self, opt_path=None, *args, **kwargs): @@ -41,7 +41,7 @@ def __init__(self, opt_path=None, *args, **kwargs): self.opt, self.opt_path = self.get_option(opt_path) if self.opt.accelerator and not inited_CUBPR: - self.logger.error(f"ImportError CuBPRMF, no cuda library exists.") + self.logger.error('ImportError CuBPRMF, no cuda library exists.') raise RuntimeError() self.obj = CuBPRMF() if self.opt.accelerator else CyBPRMF() @@ -92,11 +92,11 @@ def init_factors(self): for attr_name in ['P', 'Q', 'Qb']: setattr(self, attr_name, None) self.P = np.abs(np.random.normal(scale=1.0 / (self.opt.d ** 2), - size=(header['num_users'], self.opt.d)).astype("float32"), order='C') + size=(header['num_users'], self.opt.d)).astype('float32'), order='C') self.Q = np.abs(np.random.normal(scale=1.0 / (self.opt.d ** 2), - size=(header['num_items'], self.opt.d)).astype("float32"), order='C') + size=(header['num_items'], self.opt.d)).astype('float32'), order='C') self.Qb = np.abs(np.random.normal(scale=1.0 / (self.opt.d ** 2), - size=(header['num_items'], 1)).astype("float32"), order='C') + size=(header['num_items'], 1)).astype('float32'), order='C') if not self.opt.use_bias: self.Qb *= 0 self.obj.initialize_model(self.P, self.Q, self.Qb, self.num_nnz) @@ -200,7 +200,7 @@ def compute_loss(self): def _prepare_train(self): if self.opt.accelerator: vdim = self.obj.get_vdim() - for attr in ["P", "Q"]: + for attr in ['P', 'Q']: F = getattr(self, attr) if F.shape[1] < vdim: _F = np.empty(shape=(F.shape[0], vdim), dtype=np.float32) diff --git a/buffalo/algo/cfr.py b/buffalo/algo/cfr.py index 44ea6df..2f3fc16 100644 --- a/buffalo/algo/cfr.py +++ b/buffalo/algo/cfr.py @@ -14,10 +14,10 @@ from buffalo.algo.options import CFROption from buffalo.algo.optimize import Optimizable from buffalo.data.buffered_data import BufferedDataMatrix -from buffalo.algo.base import Algo, Serializable, TensorboardExtention +from buffalo.algo.base import Algo, Serializable, TensorboardExtension -class CFR(Algo, CFROption, Evaluable, Serializable, Optimizable, TensorboardExtention): +class CFR(Algo, CFROption, Evaluable, Serializable, Optimizable, TensorboardExtension): """Python implementation for CoFactor. 
Reference: Factorization Meets the Item Embedding: diff --git a/buffalo/algo/options.py b/buffalo/algo/options.py index c900583..198578e 100644 --- a/buffalo/algo/options.py +++ b/buffalo/algo/options.py @@ -346,7 +346,7 @@ def get_default_option(self): 'num_iters': 20, 'd': 20, 'threshold': 1.0, - 'max_trials': 300, + 'max_trials': 500, 'update_i': True, 'update_j': True, 'reg_u': 0.01, @@ -400,6 +400,7 @@ def get_default_option(self): :ivar int window: The window size. (default: 5) :ivar int min_count: The minimum required frequency of the words to use training vocabulary. (default: 5) :ivar float sample: The sampling ratio to downsample the frequent words. (default: 0.001) + :ivar int num_negative_samples: The number of negative noise words. (default: 5) :ivar float lr: The learning rate. :ivar str model_path: Where to save model. :ivar dict data_opt: This option will be used to load data if given. @@ -414,6 +415,7 @@ def get_default_option(self): 'window': 5, 'min_count': 5, 'sample': 0.001, + 'num_negative_samples': 5, 'lr': 0.025, 'min_lr': 0.0001, @@ -440,3 +442,61 @@ def get_default_optimize_option(self): } }) return Option(opt) + + +class PLSIOption(AlgoOption): + def __init__(self, *args, **kwargs): + super(PLSIOption, self).__init__(*args, **kwargs) + + def get_default_option(self): + """ Basic Options for pLSI. + + :ivar int d: The number of latent feature dimensions. (default: 20) + :ivar int num_iters: The number of iterations for training. (default: 10) + :ivar int num_workers: The number of threads. (default: 1) + :ivar float alpha1: The coefficient of the regularization term for the clustering assignment. (default: 1.0) + :ivar float alpha2: The coefficient of the regularization term for item preference in each cluster. (default: 1.0) + :ivar float eps: Epsilon for numerical stability. (default: 1e-10) + :ivar str model_path: Where to save model. (default: '') + :ivar bool save_factors: Set True to save models. (default: False) + :ivar dict data_opt: This option will be used to load data if given. (default: {}) + """ + opt = super().get_default_option() + opt.update({ + 'd': 20, + 'num_iters': 10, + 'num_workers': 1, + 'alpha1': 1.0, + 'alpha2': 1.0, + 'eps': 1e-10, + 'model_path': '', + 'save_factors': False, + 'data_opt': {}, + 'inherit_opt': {} + }) + return Option(opt) + + def get_default_optimize_option(self): + """Optimization options for pLSI. + + :ivar str loss: Target loss to optimize. + :ivar int max_trials: Maximum number of experiments for optimization. If not given, run forever. + :ivar int min_trials: Minimum number of experiments before deploying the model. (Since the best parameter may not be found after `min_trials`, the first best parameter is always deployed.) + :ivar bool deployment: Set True to train the model with the best parameter. During optimization, it tries to dump the model whenever it beats the previous best loss. + :ivar bool start_with_default_parameters: If set to True, the loss value of the default parameter is used as the starting loss to beat. + :ivar dict space: Parameter space definition. For more information, please refer to hyperopt's expression documentation. Note: since hyperopt's `randint` does not provide a lower bound, our implementation is a bit tricky. Please see optimize.py to check how we deal with `randint`.
+ """ + opt = super().get_default_optimize_option() + opt.update({ + 'loss': 'train_loss', + 'max_trials': 100, + 'min_trials': 0, + 'deployment': True, + 'start_with_default_parameters': True, + 'space': { + 'd': ['randint', ['d', 10, 30]], + 'alpha1': ['uniform', ['alpha1', 0.1, 1.2]], + 'alpha2': ['uniform', ['alpha2', 0.1, 1.2]] + } + }) + return Option(opt) diff --git a/buffalo/algo/plsi.py b/buffalo/algo/plsi.py new file mode 100644 index 0000000..244975c --- /dev/null +++ b/buffalo/algo/plsi.py @@ -0,0 +1,232 @@ +# -*- coding: utf-8 -*- +import time +import json + +import numpy as np +from hyperopt import STATUS_OK as HOPT_STATUS_OK + +import buffalo.data +from buffalo.misc import aux, log +from buffalo.data.base import Data +from buffalo.algo._plsi import CyPLSI +from buffalo.evaluate import Evaluable +from buffalo.algo.options import PLSIOption +from buffalo.algo.optimize import Optimizable +from buffalo.data.buffered_data import BufferedDataMatrix +from buffalo.algo.base import Algo, Serializable, TensorboardExtension + + +class PLSI(Algo, PLSIOption, Evaluable, Serializable, Optimizable, TensorboardExtension): + """Python implementation for pLSI.""" + def __init__(self, opt_path=None, *args, **kwargs): + Algo.__init__(self, *args, **kwargs) + PLSIOption.__init__(self, *args, **kwargs) + Evaluable.__init__(self, *args, **kwargs) + Serializable.__init__(self, *args, **kwargs) + Optimizable.__init__(self, *args, **kwargs) + if opt_path is None: + opt_path = PLSIOption().get_default_option() + + self.logger = log.get_logger('PLSI') + self.opt, self.opt_path = self.get_option(opt_path) + + self.obj = CyPLSI() + assert self.obj.init(self.opt_path.encode("utf8")), "putting parameter to cython object failed" + + self.data = None + data = kwargs.get('data') + data_opt = self.opt.get('data_opt') + data_opt = kwargs.get('data_opt', data_opt) + if data_opt: + self.data = buffalo.data.load(data_opt) + assert self.data.data_type == 'matrix' + self.data.create() + elif isinstance(data, Data): + self.data = data + self.logger.info('PLSI ({})'.format(json.dumps(self.opt, indent=2))) + if self.data: + self.logger.info(self.data.show_info()) + assert self.data.data_type in ['matrix'] + + @staticmethod + def new(path, data_fields=[]): + return PLSI.instantiate(PLSIOption, path, data_fields) + + def set_data(self, data): + assert isinstance(data, aux.data.Data), 'Wrong instance: {}'.format(type(data)) + self.data = data + + def normalize(self, group='item'): + if group == 'item': + self.Q /= (np.sum(self.Q, axis=0, keepdims=True) + self.opt.eps) + elif group == 'user': + self.P /= (np.sum(self.P, axis=1, keepdims=True) + self.opt.eps) + + def inherit(self): + def _inherit(key): + if key == 'user': + self.build_userid_map() + else: + self.build_itemid_map() + curr_idmap = self._idmanager.userid_map if key == 'user' else self._idmanager.itemid_map + prev_idmap = prev_model._idmanager.userid_map if key == 'user' else prev_model._idmanager.itemid_map + curr_obj = self.P if key == 'user' else self.Q + prev_obj = prev_model.P if key == 'user' else prev_model.Q + curr_d, prev_d = curr_obj.shape[1], prev_obj.shape[1] + assert curr_d == prev_d, f'Dimension mismatch. 
Current dimension: {curr_d} / Previous dimension: {prev_d}' + for key, curr_idx in curr_idmap.items(): + if key in prev_idmap: + prev_idx = prev_idmap[key] + curr_obj[curr_idx] = prev_obj[prev_idx] + + if not self.opt['inherit_opt']: + return + inherit_opt = self.opt.inherit_opt + prev_model = PLSI.new(inherit_opt.model_path) + if inherit_opt.get('inherit_user', False): + self.logger.info('Inherit from previous user matrix') + _inherit('user') + + if inherit_opt.get('inherit_item', False): + self.logger.info('Inherit from previous item matrix') + _inherit('item') + + def initialize(self): + super().initialize() + self.buf = BufferedDataMatrix() + self.buf.initialize(self.data) + self.buf.set_group('rowwise') + self.init_factors() + self.inherit() + + def init_factors(self): + assert self.data, 'Did not set data' + + header = self.data.get_header() + self.num_items = header['num_items'] + self.num_users = header['num_users'] + self.num_nnz = header['num_nnz'] + + for name, rows in [('P', self.num_users), ('Q', self.num_items)]: + setattr(self, name, None) + setattr(self, name, np.zeros((rows, self.opt.d), dtype="float32")) + + self.obj.initialize_model(self.P, self.Q) + + def _get_topk_recommendation(self, rows, topk, pool=None): + p = self.P[rows] + topks = super()._get_topk_recommendation( + p, self.Q, + pb=None, Qb=None, + pool=pool, topk=topk, num_workers=self.opt.num_workers) + return zip(rows, topks) + + def _get_most_similar_item(self, col, topk, pool): + return super()._get_most_similar_item(col, topk, self.Q, True, pool) + + def get_scores(self, row_col_pairs): + rets = {(r, c): self.P[r].dot(self.Q[c]) for r, c in row_col_pairs} + return rets + + def _get_scores(self, row, col): + scores = (self.P[row] * self.Q[col]).sum(axis=1) + return scores + + def _iterate(self): + self.obj.reset() + + loss_nume, loss_deno = 0.0, 0.0 + update_t, feed_t, updated = 0, 0, 0 + + with log.ProgressBar(log.DEBUG, total=self.num_nnz, mininterval=30) as pbar: + for sz in self.buf.fetch_batch(): + st = time.time() + start_x, next_x, indptr, keys, vals = self.buf.get() + feed_t += time.time() - st + + st = time.time() + _loss = self.obj.partial_update(start_x, next_x, indptr, keys, vals) + update_t += time.time() - st + + loss_deno += np.sum(vals) + loss_nume += _loss + + pbar.update(sz) + updated += sz + pbar.refresh() + + self.obj.normalize(self.opt.alpha1, self.opt.alpha2) + self.obj.swap() + + self.logger.debug( + f'updated processed({updated}) elapsed(data feed: {feed_t:0.5f} update: {update_t:0.5f})') + return loss_nume, loss_deno + + def train(self): + best_loss, loss, self.validation_result = 1e+10, None, {} + self.initialize_tensorboard(self.opt.num_iters) + self.logger.info(f'Train pLSI, K: {self.opt.d}, alpha1: {self.opt.alpha1}, ' + f'alpha2: {self.opt.alpha2}, num_workers: {self.opt.num_workers}') + for i in range(self.opt.num_iters): + start_t = time.time() + _loss_nume, _loss_deno = self._iterate() + train_t = time.time() - start_t + + loss = _loss_nume / (_loss_deno + self.opt.eps) + metrics = {'train_loss': loss} + + if self.opt.validation and \ + self.opt.evaluation_on_learning and \ + self.periodical(self.opt.evaluation_period, i): + start_t = time.time() + self.validation_result = self.get_validation_results() + vali_t = time.time() - start_t + val_str = ' '.join([f'{k}:{v:0.5f}' for k, v in self.validation_result.items()]) + self.logger.info(f'Validation: {val_str} Elapsed {vali_t:0.3f} secs') + metrics.update({'val_%s' % k: v for k, v in self.validation_result.items()}) + + 
self.logger.info('Iteration %d: Loss %.3f Elapsed %.3f secs' % (i + 1, loss, train_t)) + self.update_tensorboard_data(metrics) + best_loss = self.save_best_only(loss, best_loss, i) + if self.early_stopping(loss): + break + + ret = {'train_loss': loss} + ret.update({'val_%s' % k: v for k, v in self.validation_result.items()}) + self.finalize_tensorboard() + return ret + + def _optimize(self, params): + self._optimize_params = params + for name, value in params.items(): + assert name in self.opt, 'Unexpected parameter: {}'.format(name) + if isinstance(value, np.generic): + setattr(self.opt, name, value.item()) + else: + setattr(self.opt, name, value) + with open(self._temporary_opt_file, 'w') as fout: + json.dump(self.opt, fout, indent=2) + assert self.obj.init(bytes(self._temporary_opt_file, 'utf-8')),\ + 'cannot parse option file: %s' % self._temporary_opt_file + self.logger.info(params) + self.initialize() + loss = self.train() + loss['loss'] = loss.get(self.opt.optimize.loss) + loss['status'] = HOPT_STATUS_OK + self._optimize_loss = loss + return loss + + def _get_feature(self, index, group='item'): + if group == 'item': + return self.Q[index] + elif group == 'user': + return self.P[index] + return None + + def _get_data(self): + data = super()._get_data() + data.extend([('opt', self.opt), ('Q', self.Q), ('P', self.P)]) + return data + + def get_evaluation_metrics(self): + return ['train_loss', 'val_rmse', 'val_ndcg', 'val_map', 'val_accuracy', 'val_error'] diff --git a/buffalo/algo/tensorflow/__init__.py b/buffalo/algo/tensorflow/__init__.py deleted file mode 100644 index 40a96af..0000000 --- a/buffalo/algo/tensorflow/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# -*- coding: utf-8 -*- diff --git a/buffalo/algo/tensorflow/_als.py b/buffalo/algo/tensorflow/_als.py deleted file mode 100644 index af430b9..0000000 --- a/buffalo/algo/tensorflow/_als.py +++ /dev/null @@ -1,124 +0,0 @@ -# -*- coding: utf-8 -*- -import numpy as np -import tensorflow as tf - -from buffalo.misc import log - - -class TFALS(object): - def __init__(self, opt, name="tf_als"): - self.logger = log.get_logger("tf-als") - self.opt = opt - self.name = name - self.sess = tf.Session() - self.graph = tf.get_default_graph() - - def initialize_model(self, P, Q): - with tf.variable_scope(self.name): - self.P = tf.get_variable("P", initializer=P) - self.Q = tf.get_variable("Q", initializer=Q) - self.FF = tf.get_variable("FF", shape=(self.opt.d, self.opt.d), - initializer=tf.keras.initializers.Zeros, - dtype=tf.float32) - self.build_graph() - self.sess.run(tf.global_variables_initializer()) - - def get_variable(self, name): - return self.sess.run(getattr(self, name)) - - def precompute(self, int_group): - if int_group == 0: - self.sess.run(self.precomputeQ) - else: - self.sess.run(self.precomputeP) - - def build_graph(self): - self.start_x = tf.placeholder(dtype=tf.int32, name="start_x") - self.next_x = tf.placeholder(dtype=tf.int32, name="next_x") - self.rows = tf.placeholder(dtype=tf.int32, shape=(None, ), name="rows") - self.keys = tf.placeholder(dtype=tf.int32, shape=(None, ), name="keys") - self.vals = tf.placeholder(dtype=tf.float32, shape=(None, ), name="vals") - for int_group in [0, 1]: - self._build_graph(int_group) - - def _dot(self, X, Y): - return tf.reduce_sum(X * Y, axis=1) - - def _build_graph(self, int_group): - if int_group == 0: - P, Q, reg = self.P, self.Q, self.opt.reg_u - else: - P, Q, reg = self.Q, self.P, self.opt.reg_i - start_x, next_x, rows, keys, vals = \ - self.start_x, self.next_x, self.rows, self.keys,
self.vals - - # compute ys - Fgtr = tf.gather(Q, keys) - coeff = self.vals * self.opt.alpha - ys = tf.scatter_nd(tf.expand_dims(rows, axis=1), - Fgtr * tf.expand_dims(coeff + 1, axis=1), - shape=(next_x - start_x, self.opt.d)) - - # prepare cg - _P = P[start_x:next_x] - Axs = tf.matmul(_P, self.FF) + reg * _P - dots = self._dot(tf.gather(_P, rows), Fgtr) - Axs = tf.tensor_scatter_add(Axs, tf.expand_dims(rows, axis=1), - Fgtr * tf.expand_dims(dots * coeff, axis=1)) - rs = ys - Axs - ps = rs - rss_old = tf.reduce_sum(tf.square(rs), axis=1) - - # iterate cg steps - for i in range(self.opt.num_cg_max_iters): - Aps = tf.matmul(ps, self.FF) + ps * reg - _dots = coeff * self._dot(tf.gather(ps, rows), Fgtr) - Aps = tf.tensor_scatter_add(Aps, tf.expand_dims(rows, axis=1), - Fgtr * tf.expand_dims(_dots, axis=1)) - pAps = self._dot(Aps, ps) - alphas = rss_old / (pAps + self.opt.eps) - _P = _P + ps * tf.expand_dims(alphas, axis=1) - rs = rs - tf.expand_dims(alphas, axis=1) * Aps - rss_new = tf.reduce_sum(tf.square(rs), axis=1) - betas = rss_new / (rss_old + self.opt.eps) - ps = rs + (tf.expand_dims(betas, axis=1) * ps) - rss_old = rss_new - - if int_group == 1: - if self.opt.compute_loss_on_training: - self.err = tf.reduce_sum(tf.square(vals - dots)) - else: - self.err = tf.constant(0.0, dtype=tf.float32) - - name = "updateP" if int_group == 0 else "updateQ" - _update = P[start_x:next_x].assign(_P) - with self.graph.control_dependencies([_update]): - update = tf.constant(True) - setattr(self, name, update) - - _FF = tf.assign(self.FF, tf.matmul(P, P, transpose_a=True)) - with self.graph.control_dependencies([_FF]): - FF = tf.constant(True) - name = "precomputeP" if int_group == 0 else "precomputeQ" - setattr(self, name, FF) - - def _generate_rows(self, start_x, next_x, indptr): - ends = indptr[start_x:next_x] - begs = np.empty(next_x - start_x, dtype=np.int64) - begs[0] = 0 if start_x == 0 else indptr[start_x - 1] - begs[1:] = ends[:-1] - ret = np.arange(next_x - start_x, dtype=np.int32) - ret = np.repeat(ret, ends - begs) - return ret, len(ret) - - def partial_update(self, start_x, next_x, indptr, keys, vals, int_group): - rows, sz = self._generate_rows(start_x, next_x, indptr) - feed_dict = {self.start_x: start_x, self.next_x: next_x, - self.rows: rows, self.keys: keys[:sz], self.vals: vals[:sz]} - err = 0.0 - if int_group == 0: - _ = self.sess.run(self.updateP, feed_dict=feed_dict) - else: - _, _err = self.sess.run([self.updateQ, self.err], feed_dict=feed_dict) - err += _err - return err diff --git a/buffalo/algo/w2v.py b/buffalo/algo/w2v.py index ec7b291..4552557 100644 --- a/buffalo/algo/w2v.py +++ b/buffalo/algo/w2v.py @@ -15,10 +15,10 @@ from buffalo.algo.options import W2VOption from buffalo.algo.optimize import Optimizable from buffalo.data.buffered_data import BufferedDataStream -from buffalo.algo.base import Algo, Serializable, TensorboardExtention +from buffalo.algo.base import Algo, Serializable, TensorboardExtension -class W2V(Algo, W2VOption, Evaluable, Serializable, Optimizable, TensorboardExtention): +class W2V(Algo, W2VOption, Evaluable, Serializable, Optimizable, TensorboardExtension): """Python implementation for C-W2V """ def __init__(self, opt_path=None, *args, **kwargs): @@ -73,14 +73,13 @@ def get_index(self, key, group='item'): indexes = super().get_index(key, group) if not is_many: indexes = [indexes] - indexes = [i if i is None or self._vocab.index[i] < 1 else self._vocab.index[i] - 1 - for i in indexes] + indexes = [None if i is None or self._vocab.index[i] < 1 else 
self._vocab.index[i] - 1 for i in indexes] if not is_many: return indexes[0] - return np.array(indexes) + return indexes def _get_feature(self, index, group='item'): - if group == 'item': + if group == 'item' and index is not None: return self.L0[index] return None diff --git a/buffalo/algo/warp.py b/buffalo/algo/warp.py index 28b40d5..fc78b0c 100644 --- a/buffalo/algo/warp.py +++ b/buffalo/algo/warp.py @@ -13,10 +13,10 @@ from buffalo.algo.options import WARPOption from buffalo.algo.optimize import Optimizable from buffalo.data.buffered_data import BufferedDataMatrix -from buffalo.algo.base import Algo, Serializable, TensorboardExtention +from buffalo.algo.base import Algo, Serializable, TensorboardExtension -class WARP(Algo, WARPOption, Evaluable, Serializable, Optimizable, TensorboardExtention): +class WARP(Algo, WARPOption, Evaluable, Serializable, Optimizable, TensorboardExtension): """Python implementation for C-WARP. """ def __init__(self, opt_path=None, *args, **kwargs): diff --git a/buffalo/data/base.py b/buffalo/data/base.py index 6dcf06b..ce253b0 100644 --- a/buffalo/data/base.py +++ b/buffalo/data/base.py @@ -162,11 +162,15 @@ def get(self, index, axis='rowwise') -> [int, int, float]: def close(self): if self.handle: + self.handle.close() self.handle = None self.header = None def _create_database(self, path, **kwargs): # Create database structure + if os.path.exists(path): + self.logger.info(f'File {path} exists. It will be deleted to build a new database.') + os.remove(path) f = h5py.File(path, 'w') self.path = path num_users, num_items, num_nnz = kwargs['num_users'], kwargs['num_items'], kwargs['num_nnz'] @@ -397,6 +401,7 @@ def _build_data(self, fin.seek(0, 2) approximated_data_mb = db.attrs['num_nnz'] * 3 * 4 / 1024 / 1024 buffer_mb = int(max(1024, available_mb * 0.75)) + disk_based = self.opt.data.get('disk_based', False) # for each sides for group, sep_idx, max_key in [('rowwise', 0, db.attrs['num_users']), ('colwise', 1, db.attrs['num_items'])]: @@ -405,7 +410,7 @@ def _build_data(self, self.logger.info(f'Building compressed triplets for {group}...') self.logger.info('Preprocessing...') self.prepro.pre(db) - if approximated_data_mb * 1.2 < available_mb: + if approximated_data_mb * 1.2 < available_mb and not disk_based: self.logger.info('In-memory Compressing ...') job_files = self._sort_and_compressed_binarization( working_data_path, @@ -443,6 +448,9 @@ class DataOption(object): def is_valid_option(self, opt) -> bool: """General type/logic checking""" + assert hasattr(opt['data'], 'disk_based'), 'disk_based not defined on data' + assert isinstance(opt['data']['disk_based'], bool), 'invalid type for data.disk_based' + if 'validation' in opt['data']: assert opt['data']['validation']['name'] in ['sample', 'newest'], 'Unknown validation.name.'
if opt['data']['validation']['name'] == 'sample': diff --git a/buffalo/data/mm.py b/buffalo/data/mm.py index 9f70ec8..11f6c0a 100644 --- a/buffalo/data/mm.py +++ b/buffalo/data/mm.py @@ -31,7 +31,8 @@ def get_default_option(self) -> aux.Option: 'batch_mb': 1024, 'use_cache': False, 'tmp_dir': '/tmp/', - 'path': './mm.h5py' + 'path': './mm.h5py', + 'disk_based': False # use disk-based data compression } } return aux.Option(opt) @@ -265,13 +266,11 @@ def create(self) -> h5py.File: db.attrs['completed'] = 1 db.close() self.handle = h5py.File(data_path, 'r') - self.path = data_path except Exception as e: self.logger.error('Cannot create db: %s' % (str(e))) self.logger.error(traceback.format_exc().splitlines()) - raise - finally: - if hasattr(self, 'patr'): + if hasattr(self, 'path'): if os.path.isfile(self.path): os.remove(self.path) + raise self.logger.info('DB built on %s' % data_path) diff --git a/buffalo/data/stream.py b/buffalo/data/stream.py index 400431f..60b06ee 100644 --- a/buffalo/data/stream.py +++ b/buffalo/data/stream.py @@ -26,6 +26,8 @@ class StreamOptions(DataOption): validation: See validation section. batch_mb: Internal batch size. Generally, the larger size, faster. use_cache: Set True to use already built data, otherwise building new one every time. + disk_based: Set True to use disk-based data compression with low memory usage. + Otherwise, the system decides on its own based on the approximated data size. tmp_dir: Where temporary files goes on. path: Output path of Stream. internal_data_type: "stream" or "matrix" @@ -58,7 +60,8 @@ def get_default_option(self) -> aux.Option: 'use_cache': False, 'tmp_dir': '/tmp/', 'path': './stream.h5py', - 'internal_data_type': 'stream' # if set to 'matrix', internal data stored as like matrix market format + 'internal_data_type': 'stream', # if set to 'matrix', internal data stored as like matrix market format + 'disk_based': False } } return aux.Option(opt) @@ -299,13 +302,11 @@ def create(self) -> h5py.File: db.attrs['completed'] = 1 db.close() self.handle = h5py.File(data_path, 'r') - self.path = data_path except Exception as e: self.logger.error('Cannot create db: %s' % (str(e))) self.logger.error(traceback.format_exc()) - raise - finally: - if hasattr(self, 'patr'): + if hasattr(self, 'path'): if os.path.isfile(self.path): os.remove(self.path) + raise self.logger.info('DB built on %s' % data_path) diff --git a/buffalo/evaluate/base.py b/buffalo/evaluate/base.py index 1f5f100..0533c2a 100644 --- a/buffalo/evaluate/base.py +++ b/buffalo/evaluate/base.py @@ -94,7 +94,7 @@ def filter_seen_items(_topk, seen, topk): HIT += hit # ndcg, map - idcg = idcgs[min(len(_gt), len(_topk)) - 1] + idcg = idcgs[min(len(_gt), topk) - 1] dcg = 0.0 hit, miss, ap = 0.0, 0.0, 0.0 @@ -116,7 +116,7 @@ def filter_seen_items(_topk, seen, topk): ndcg = dcg / idcg NDCG += ndcg - ap /= min(len(_gt), len(_topk)) + ap /= min(len(_gt), topk) AP += ap N += 1.0 AUC += auc diff --git a/buffalo/misc/aux.py b/buffalo/misc/aux.py index 40fa3e2..42f54b2 100644 --- a/buffalo/misc/aux.py +++ b/buffalo/misc/aux.py @@ -15,9 +15,11 @@ class Option(dict): def __init__(self, *args, **kwargs): - import json - args = [arg if isinstance(arg, dict) else json.loads(open(arg).read()) - for arg in args] + def read(fname): + with open(fname) as fin: + return json.load(fin) + + args = [arg if isinstance(arg, dict) else read(arg) for arg in args] super(Option, self).__init__(*args, **kwargs) for arg in args: if isinstance(arg, dict): diff --git a/buffalo/misc/log.py b/buffalo/misc/log.py index
613914f..97f27f4 100644 --- a/buffalo/misc/log.py +++ b/buffalo/misc/log.py @@ -102,7 +102,7 @@ def initialize(self, **kwargs): self.s_t = time.time() self.t = time.time() self.desc = kwargs.get('desc', 'PROGRESS') - self.period_secs = kwargs.get('mininteral', 1) + self.period_secs = kwargs.get('mininterval', 1) self.total = kwargs.get('total', -1) self.step = 0 diff --git a/docs/algo.rst b/docs/algo.rst index 0cc8cb8..8f53c29 100644 --- a/docs/algo.rst +++ b/docs/algo.rst @@ -8,7 +8,7 @@ Buffalo provides the following algorithm implementations: - Word2Vec - CoFactors -All algorithms inherit common parent classes such as Algo, Serializable, TensorboardExtention, Optimizable, Evaluable. +All algorithms inherit common parent classes such as Algo, Serializable, TensorboardExtension, Optimizable, Evaluable. Algo @@ -27,9 +27,9 @@ Serializable :show-inheritance: :undoc-members: -TensorboardExtention +TensorboardExtension ```````````````````` -.. autoclass:: buffalo.algo.base.TensorboardExtention +.. autoclass:: buffalo.algo.base.TensorboardExtension :members: :exclude-members: :show-inheritance: @@ -122,3 +122,18 @@ Word2Vec :exclude-members: :show-inheritance: :undoc-members: + + +pLSI +-------------------------------------------------- +.. autoclass:: buffalo.algo.plsi.PLSI + :members: + :exclude-members: get_evaluation_metrics, init_factors, set_data + :show-inheritance: + :undoc-members: + +.. autoclass:: buffalo.algo.options.PLSIOption + :members: + :exclude-members: + :show-inheritance: + :undoc-members: diff --git a/docs/conf.py b/docs/conf.py index 537ed1b..65f5602 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -18,7 +18,7 @@ 'buffalo.parallel._core', 'buffalo.algo._als', 'buffalo.algo._bpr', 'buffalo.algo._cfr', 'buffalo.algo._w2v', - 'buffalo.algo._warp'] + 'buffalo.algo._warp', 'buffalo.algo._plsi'] for mod_name in MOCK_MODULES: sys.modules[mod_name] = mock.Mock() diff --git a/docs/intro.rst b/docs/intro.rst index 8e129c2..05989f3 100644 --- a/docs/intro.rst +++ b/docs/intro.rst @@ -5,10 +5,14 @@ Buffalo is a fast and scalable production-ready open source project for recommen Buffalo provides the following algorithms: - - Alternating Least Squares - - Bayesian Personalized Ranking Matrix Factorization - - Word2Vec - - CoFactors + - Alternating Least Squares (ALS) + - Bayesian Personalized Ranking Matrix Factorization (BPR) + - Word2Vec (W2V) + - CoFactors (CFR) + - Weighted Approximate Rank Pairwise (WARP) + - Probabilistic Latent Semantic Indexing (pLSI) + +ALS is one of the most famous matrix factorization models; it decomposes the observed user-item interaction matrix into user and item latent factors. BPR and WARP are more ranking-oriented models. W2V and CFR mainly focus on item co-occurrence data. Unlike the other models, pLSI (a.k.a. probabilistic latent semantic analysis) is a soft clustering module that performs a low-rank approximation of the user-item matrix on the basis of interaction frequencies. All algorithms are optimized for multi-threading, and some support GPU accelerators.
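Note on the intro paragraph above: the per-observation update behind pLSI's soft clustering lives in CPLSI::partial_update further down in this diff. As a rough reference only, the fused E/M pass over observed (user, item, value) triplets can be sketched in NumPy as follows; this is a minimal sketch for review purposes, and `plsi_em_pass` with its dense-triplet arguments is an illustrative name rather than part of this change:

import numpy as np

def plsi_em_pass(P, Q, rows, cols, vals, eps=1e-10):
    # P: (num_users, d) user-to-cluster weights; Q: (num_items, d) cluster-to-item weights.
    P_new, Q_new, loss = np.zeros_like(P), np.zeros_like(Q), 0.0
    for u, i, v in zip(rows, cols, vals):
        latent = np.maximum(P[u] * Q[i], eps)  # E-step: unnormalized posterior over clusters
        norm = latent.sum()
        loss -= np.log(norm) * v               # same log-likelihood term the C++ loop accumulates
        latent /= norm                         # posterior p(z | u, i)
        P_new[u] += latent * v                 # M-step sufficient statistics
        Q_new[i] += latent * v
    return P_new, Q_new, loss

After a full pass, CPLSI::normalize applies the alpha1/alpha2 smoothing and renormalizes P row-wise and Q column-wise, and swap() publishes the new factors.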
diff --git a/docs/requirements.txt b/docs/requirements.txt index e928df4..c0ac5da 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,9 +1,7 @@ setuptools>=1.3.2 fire -pathlib numpy psutil h5py hyperopt -tensorflow==1.15.2 pytest diff --git a/include/buffalo/algo_impl/plsi/plsi.hpp b/include/buffalo/algo_impl/plsi/plsi.hpp new file mode 100644 index 0000000..d35239a --- /dev/null +++ b/include/buffalo/algo_impl/plsi/plsi.hpp @@ -0,0 +1,39 @@ +#pragma once +#include <string> +#include <stdint.h> + +#include <Eigen/Core> +#include <Eigen/Dense> +#include <Eigen/SparseCore> + +#include "buffalo/algo.hpp" + +using namespace std; +using namespace Eigen; + + +namespace plsi { + + +class CPLSI : public Algorithm { +public: + CPLSI(); + ~CPLSI(); + + bool init(string opt_path); + bool parse_option(string opt_path); + void swap(); + void reset(); + void release(); + void normalize(float alpha1, float alpha2); + void initialize_model(float* P, int P_rows, float* Q, int Q_rows); + float partial_update(int start_x, int next_x, int64_t* indptrs, int32_t* keys, float* vals); + +private: + Json opt_; + Map<MatrixType> P_, Q_; + MatrixType P, Q; + int d_, num_workers_, seed_; +}; + +} diff --git a/lib/algo_impl/plsi/plsi.cc b/lib/algo_impl/plsi/plsi.cc new file mode 100644 index 0000000..1858443 --- /dev/null +++ b/lib/algo_impl/plsi/plsi.cc @@ -0,0 +1,133 @@ +#include <random> +#include <numeric> +#include <xmmintrin.h> + +#include "json11.hpp" +#include "buffalo/misc/log.hpp" +#include "buffalo/algo_impl/plsi/plsi.hpp" + + +namespace plsi { + + +CPLSI::CPLSI(): + P_(nullptr, 0, 0), Q_(nullptr, 0, 0) {} + +CPLSI::~CPLSI() { + new (&P_) Map<MatrixType>(nullptr, 0, 0); + new (&Q_) Map<MatrixType>(nullptr, 0, 0); +} + +bool CPLSI::init(string opt_path) { + bool ok = true; + ok = ok & parse_option(opt_path); + if (ok) { + d_ = opt_["d"].int_value(); + seed_ = opt_["random_seed"].int_value(); + num_workers_ = opt_["num_workers"].int_value(); + } + return ok; +} + +bool CPLSI::parse_option(string opt_path) { + return Algorithm::parse_option(opt_path, opt_); +} + +void CPLSI::release() { + P.resize(0, 0); Q.resize(0, 0); +} + +void CPLSI::reset() { + P.setZero(); Q.setZero(); +} + +void CPLSI::initialize_model(float* init_P, int P_rows, + float* init_Q, int Q_rows) { + new (&P_) Map<MatrixType>(init_P, P_rows, d_); + new (&Q_) Map<MatrixType>(init_Q, Q_rows, d_); + P.resize(P_rows, d_); + Q.resize(Q_rows, d_); + + mt19937 RNG(seed_); + normal_distribution<float> dist(0, 1.0/d_); + + #pragma omp parallel for + for (int u = 0; u < P_rows; ++u) { + _mm_prefetch((char*)P_.row(u).data(), _MM_HINT_T0); + for (int d = 0; d < d_; ++d) + P_(u, d) = fabs(dist(RNG)); + P_.row(u) /= P_.row(u).sum(); + } + + #pragma omp parallel for + for (int k = 0; k < d_; ++k) { + _mm_prefetch((char*)Q_.col(k).data(), _MM_HINT_T0); + for (int i = 0; i < Q_rows; ++i) + Q_(i, k) = fabs(dist(RNG)); + Q_.col(k) /= Q_.col(k).sum(); + } + + DEBUG("Set P({} x {}) Q({} x {})", + P.rows(), P.cols(), Q.rows(), Q.cols()); +} + +float CPLSI::partial_update(int start_x, int next_x, int64_t* indptr, + int32_t* keys, float* vals) { + omp_set_num_threads(num_workers_); + + vector<float> losses(num_workers_, 0.0); + size_t job_size = next_x - start_x; + const int64_t shifted = start_x == 0 ? 0 : indptr[start_x - 1]; + + #pragma omp parallel for schedule(dynamic, 4) + for (size_t i = 0; i < job_size; ++i) { + const int x = start_x + i; + const int64_t& beg = x == 0 ?
0 : indptr[x - 1]; + const int64_t& end = indptr[x]; + const size_t data_size = end - beg; + if (data_size == 0) { + TRACE("No data exists for {}", x); + continue; + } + + for (int64_t j = beg; j < end; ++j) { + const int& c = keys[j - shifted]; + const float& v = vals[j - shifted]; + VectorXf latent = P_.row(x).array() * Q_.row(c).array(); + latent.noalias() = latent.cwiseMax(1e-10); + float norm = latent.sum(); + losses[omp_get_thread_num()] -= log(norm) * v; + latent.array() /= norm; + P.row(x) += latent * v; + Q.row(c) += latent * v; + } + } + + float loss = accumulate(losses.begin(), losses.end(), 0.0); + return loss; +} + +void CPLSI::normalize(float alpha1, float alpha2) { + omp_set_num_threads(num_workers_); + size_t num_users = P.rows(); + size_t num_items = Q.rows(); + alpha1 /= static_cast<float>(d_); + alpha2 /= static_cast<float>(num_items); + + #pragma omp parallel for schedule(static) + for (size_t i = 0; i < num_users; ++i) { + P.row(i).array() += alpha1; + P.row(i).array() /= P.row(i).sum(); + } + #pragma omp parallel for schedule(static) + for (int i = 0; i < d_; ++i) { + Q.col(i).array() += alpha2; + Q.col(i).array() /= Q.col(i).sum(); + } +} + +void CPLSI::swap() { + P_ = P; + Q_ = Q; +} } // namespace plsi diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..f7cd891 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,2 @@ +[build-system] +requires = ["setuptools>=1.3.2", "wheel", "numpy", "cython"] diff --git a/requirements.txt b/requirements.txt index e93dc27..3e03f5d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,6 @@ -setuptools>=1.3.2 fire -pathlib numpy psutil h5py hyperopt -tensorflow==1.15.2 -cython -n2==0.1.6 -pytest +tensorboard==2.9.1 diff --git a/setup.py b/setup.py index 89296f4..ca1de25 100644 --- a/setup.py +++ b/setup.py @@ -5,28 +5,25 @@ import os import sys -import pathlib import platform import sysconfig import subprocess -from setuptools import setup + +import numpy as np +from setuptools import setup, Extension + from cuda_setup import CUDA, build_ext -from distutils.extension import Extension -import n2 -import numpy # TODO: Python3 Support if sys.version_info[:3] < (3, 6): raise RuntimeError("Python version 3.6 or later required.") assert platform.system() == 'Linux' # TODO: MacOS -numpy_include_dirs = os.path.split(numpy.__file__)[0] + '/core/include' -n2_shared_object = n2.__file__ MAJOR = 1 MINOR = 2 -MICRO = 1 +MICRO = 2 Release = True STAGE = {True: '', False: 'b'}.get(Release) VERSION = f'{MAJOR}.{MINOR}.{MICRO}{STAGE}' @@ -41,12 +38,14 @@ Operating System :: Unix Operating System :: MacOS License :: OSI Approved :: Apache Software License""".format(status=STATUS.get(Release)) + +CLIB_DIR = os.path.join(sysconfig.get_path('purelib'), 'buffalo') +numpy_include_dirs = np.get_include() +LIBRARY_DIRS = [CLIB_DIR] EXTRA_INCLUDE_DIRS = [numpy_include_dirs, '3rd/json11', '3rd/spdlog/include', '3rd/eigen3'] -CLIB_DIR = os.path.join(sysconfig.get_path('purelib'), 'buffalo') -LIBRARY_DIRS = [CLIB_DIR] def get_extend_compile_flags(): @@ -66,65 +65,80 @@ def __init__(self, name, **kwargs): extensions = [ CMakeExtension(name="cbuffalo"), Extension(name="buffalo.algo._als", - sources=['buffalo/algo/_als.cpp'], + sources=['buffalo/algo/_als.pyx'], + language='c++', include_dirs=['./include'] + EXTRA_INCLUDE_DIRS, libraries=['gomp', 'cbuffalo'], library_dirs=LIBRARY_DIRS, runtime_library_dirs=LIBRARY_DIRS, extra_compile_args=['-fopenmp', '-std=c++14', '-ggdb', '-O3'] + extend_compile_flags), Extension(name="buffalo.algo._cfr",
sources=['buffalo/algo/_cfr.cpp'], + sources=['buffalo/algo/_cfr.pyx'], + language='c++', include_dirs=['./include'] + EXTRA_INCLUDE_DIRS, libraries=['gomp', 'cbuffalo'], library_dirs=LIBRARY_DIRS, runtime_library_dirs=LIBRARY_DIRS, extra_compile_args=['-fopenmp', '-std=c++14', '-ggdb', '-O3'] + extend_compile_flags), Extension(name="buffalo.algo._bpr", - sources=['buffalo/algo/_bpr.cpp'], + sources=['buffalo/algo/_bpr.pyx'], + language='c++', + include_dirs=['./include'] + EXTRA_INCLUDE_DIRS, + libraries=['gomp', 'cbuffalo'], + library_dirs=LIBRARY_DIRS, + runtime_library_dirs=LIBRARY_DIRS, + extra_compile_args=['-fopenmp', '-std=c++14', '-ggdb', '-O3'] + extend_compile_flags), + Extension(name="buffalo.algo._plsi", + sources=['buffalo/algo/_plsi.pyx'], + language='c++', include_dirs=['./include'] + EXTRA_INCLUDE_DIRS, libraries=['gomp', 'cbuffalo'], library_dirs=LIBRARY_DIRS, runtime_library_dirs=LIBRARY_DIRS, extra_compile_args=['-fopenmp', '-std=c++14', '-ggdb', '-O3'] + extend_compile_flags), Extension(name="buffalo.algo._warp", - sources=['buffalo/algo/_warp.cpp'], + sources=['buffalo/algo/_warp.pyx'], + language='c++', include_dirs=['./include'] + EXTRA_INCLUDE_DIRS, libraries=['gomp', 'cbuffalo'], library_dirs=LIBRARY_DIRS, runtime_library_dirs=LIBRARY_DIRS, extra_compile_args=['-fopenmp', '-std=c++14', '-ggdb', '-O3'] + extend_compile_flags), Extension(name="buffalo.algo._w2v", - sources=['buffalo/algo/_w2v.cpp'], + sources=['buffalo/algo/_w2v.pyx'], + language='c++', include_dirs=['./include'] + EXTRA_INCLUDE_DIRS, libraries=['gomp', 'cbuffalo'], library_dirs=LIBRARY_DIRS, runtime_library_dirs=LIBRARY_DIRS, extra_compile_args=['-fopenmp', '-std=c++14', '-ggdb', '-O3'] + extend_compile_flags), Extension(name="buffalo.misc._log", - sources=['buffalo/misc/_log.cpp'], + sources=['buffalo/misc/_log.pyx'], + language='c++', include_dirs=['./include'] + EXTRA_INCLUDE_DIRS, libraries=['gomp', 'cbuffalo'], library_dirs=LIBRARY_DIRS, runtime_library_dirs=LIBRARY_DIRS, extra_compile_args=['-fopenmp', '-std=c++14', '-ggdb', '-O3'] + extend_compile_flags), Extension(name="buffalo.data.fileio", - sources=['buffalo/data/fileio.cpp'], + sources=['buffalo/data/fileio.pyx'], + language='c++', libraries=['gomp'], extra_compile_args=['-fopenmp', '-std=c++14', '-ggdb', '-O3'] + extend_compile_flags), Extension(name="buffalo.parallel._core", - sources=['buffalo/parallel/_core.cpp'], - libraries=['gomp'], + sources=['buffalo/parallel/_core.pyx'], + language='c++', + libraries=['gomp', 'n2'], include_dirs=EXTRA_INCLUDE_DIRS + ['./3rd/n2/include', './3rd/'], library_dirs=LIBRARY_DIRS, runtime_library_dirs=LIBRARY_DIRS, - extra_objects=[n2_shared_object], extra_compile_args=['-fopenmp', '-std=c++14', '-ggdb', '-O3'] + extend_compile_flags), ] if CUDA: extra_compile_args = ['-std=c++14', '-ggdb', '-O3'] + extend_compile_flags extensions.append(Extension("buffalo.algo.cuda._als", - sources=["buffalo/algo/cuda/_als.cpp", + sources=["buffalo/algo/cuda/_als.pyx", "lib/cuda/als/als.cu", "./3rd/json11/json11.cpp", "lib/misc/log.cc"], @@ -136,7 +150,7 @@ def __init__(self, name, **kwargs): CUDA['include'], "./3rd/json11", "./3rd/spdlog/include"])) extensions.append(Extension("buffalo.algo.cuda._bpr", - sources=["buffalo/algo/cuda/_bpr.cpp", + sources=["buffalo/algo/cuda/_bpr.pyx", "lib/cuda/bpr/bpr.cu", "./3rd/json11/json11.cpp", "lib/misc/log.cc"], @@ -188,29 +202,11 @@ def run(self): for ext in self.extensions: if hasattr(ext, 'extension_type') and ext.extension_type == 'cmake': self.cmake(ext) - 
self.cythonize() super(BuildExtension, self).run() - def cythonize(self): - ext_files = ['buffalo/algo/_als.pyx', - 'buffalo/algo/cuda/_als.pyx', - 'buffalo/algo/_bpr.pyx', - 'buffalo/algo/cuda/_bpr.pyx', - 'buffalo/algo/_warp.pyx', - 'buffalo/algo/_w2v.pyx', - 'buffalo/misc/_log.pyx', - 'buffalo/algo/_cfr.pyx', - 'buffalo/parallel/_core.pyx', - 'buffalo/data/fileio.pyx'] - for path in ext_files: - from Cython.Build import cythonize - cythonize(path) - def cmake(self, ext): - cwd = pathlib.Path().absolute() - - build_temp = pathlib.Path(self.build_temp) - build_temp.mkdir(parents=True, exist_ok=True) + cwd = os.path.abspath(os.getcwd()) + os.makedirs(self.build_temp, exist_ok=True) build_type = 'Debug' if self.debug else 'Release' @@ -221,7 +217,7 @@ def cmake(self, ext): build_args = [] - os.chdir(str(build_temp)) + os.chdir(self.build_temp) self.spawn(['cmake', str(cwd)] + cmake_args) if not self.dry_run: self.spawn(['cmake', '--build', '.'] + build_args) @@ -234,6 +230,9 @@ def setup_package(): 'build_ext': BuildExtension } + with open('requirements.txt', 'r') as fin: + install_requires = [line.strip() for line in fin] + metadata = dict( name='buffalo', maintainer="lucas kwangseob kim", @@ -246,7 +245,6 @@ def setup_package(): license='Apache2', packages=['buffalo/algo/', 'buffalo/algo/cuda', - 'buffalo/algo/tensorflow', 'buffalo/data/', 'buffalo/evaluate/', 'buffalo/parallel/', @@ -262,6 +260,7 @@ def setup_package(): ] }, python_requires='>=3.6', + install_requires=install_requires, ) metadata['version'] = VERSION diff --git a/tests/algo/base.py b/tests/algo/base.py index c8c9a70..1ce60f5 100644 --- a/tests/algo/base.py +++ b/tests/algo/base.py @@ -3,7 +3,6 @@ import time import logging import unittest -logging.getLogger('tensorflow').disabled = True import numpy as np from hyperopt import STATUS_OK @@ -14,14 +13,14 @@ from buffalo.algo.options import ALSOption from buffalo.algo.optimize import Optimizable from buffalo.data.mm import MatrixMarketOptions -from buffalo.algo.base import TensorboardExtention +from buffalo.algo.base import TensorboardExtension -class MockAlgo(Algo, Optimizable, TensorboardExtention): +class MockAlgo(Algo, Optimizable, TensorboardExtension): def __init__(self, *args, **kwargs): Algo.__init__(self, *args, **kwargs) Optimizable.__init__(self, *args, **kwargs) - TensorboardExtention.__init__(self, *args, **kwargs) + TensorboardExtension.__init__(self, *args, **kwargs) self.logger = log.get_logger('MockAlgo') option = ALSOption().get_default_option() optimize_option = ALSOption().get_default_optimize_option() @@ -238,7 +237,7 @@ def _test_most_similar(self, model, q1, q2, q3): ret_a = model.most_similar(q1, pool=pool) indexes = model.get_index(pool) self.assertEqual(len(indexes), 2) - ret_b = model.most_similar(q1, pool=indexes) + ret_b = model.most_similar(q1, pool=np.array(indexes)) self.assertEqual(ret_a, ret_b) keys = [k[0] for k in model.most_similar(q1, topk=100)] diff --git a/tests/algo/test_plsi.py b/tests/algo/test_plsi.py new file mode 100644 index 0000000..29a6d0a --- /dev/null +++ b/tests/algo/test_plsi.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +import os +import unittest + +import numpy as np + +import buffalo.data +from buffalo.misc import aux +from buffalo.algo.plsi import PLSI +from buffalo.algo.options import PLSIOption +from buffalo.data.mm import MatrixMarketOptions + +from .base import TestBase + + +class TestPLSI(TestBase): + def test00_get_default_option(self): + PLSIOption().get_default_option() + self.assertTrue(True) + + def 
test01_is_valid_option(self): + opt = PLSIOption().get_default_option() + self.assertTrue(PLSIOption().is_valid_option(opt)) + opt['save_best'] = 1 + self.assertRaises(RuntimeError, PLSIOption().is_valid_option, opt) + opt['save_best'] = False + self.assertTrue(PLSIOption().is_valid_option(opt)) + + def test02_init_with_dict(self): + opt = PLSIOption().get_default_option() + PLSI(opt) + self.assertTrue(True) + + def test03_init(self): + opt = PLSIOption().get_default_option() + self._test3_init(PLSI, opt) + + def test04_train(self): + opt = PLSIOption().get_default_option() + self._test4_train(PLSI, opt) + + def test05_validation(self): + opt = PLSIOption().get_default_option() + opt.validation = aux.Option({'topk': 10}) + self._test5_validation(PLSI, opt, ndcg=0.03, map=0.02) + + def test06_topk(self): + opt = PLSIOption().get_default_option() + opt.d = 10 + self.maxDiff = None + self._test6_topk(PLSI, opt) + + def test07_train_ml_20m(self): + opt = PLSIOption().get_default_option() + opt.num_workers = 8 + self._test7_train_ml_20m(PLSI, opt) + + def test08_serialization(self): + opt = PLSIOption().get_default_option() + opt.d = 10 + self._test8_serialization(PLSI, opt) + + def test09_compact_serialization(self): + opt = PLSIOption().get_default_option() + opt.d = 10 + self._test9_compact_serialization(PLSI, opt) + + def test10_inheritance(self): + uid1 = np.arange(0, 100).astype(str) + iid1 = np.arange(200, 300).astype(str) + main1 = np.random.binomial(1, 0.1, size=(100, 100)) + data_opt1 = MatrixMarketOptions().get_default_option() + data_opt1.input = aux.Option({'main': main1, 'uid': uid1, 'iid': iid1}) + data1 = buffalo.data.load(data_opt1) + data1.create() + plsi_opt1 = PLSIOption().get_default_option() + model1 = PLSI(plsi_opt1, data=data1) + model1.initialize() + model1.train() + model1.save('inherit_model.plsi') + os.remove('mm.h5py') + + uid2 = np.arange(80, 150).astype(str) + iid2 = np.arange(280, 350).astype(str) + main2 = np.random.binomial(1, 0.1, size=(70, 70)) + data_opt2 = MatrixMarketOptions().get_default_option() + data_opt2.input = aux.Option({'main': main2, 'uid': uid2, 'iid': iid2}) + data2 = buffalo.data.load(data_opt2) + data2.create() + plsi_opt2 = PLSIOption().get_default_option() + plsi_opt2.inherit_opt = aux.Option({'model_path': 'inherit_model.plsi', 'inherit_user': True, 'inherit_item': True}) # noqa: E501 + model2 = PLSI(plsi_opt2, data=data2) + model2.initialize() + os.remove('mm.h5py') + os.remove('inherit_model.plsi') + + prev_p, curr_p = model1.P[-20:], model2.P[:20] + prev_q, curr_q = model1.Q[-20:], model2.Q[:20] + self.assertTrue(np.allclose(prev_p, curr_p)) + self.assertTrue(np.allclose(prev_q, curr_q)) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/algo/test_w2v.py b/tests/algo/test_w2v.py index 95972dd..b38a716 100644 --- a/tests/algo/test_w2v.py +++ b/tests/algo/test_w2v.py @@ -2,6 +2,8 @@ import os import unittest +import numpy as np +from tempfile import NamedTemporaryFile from buffalo.algo.w2v import W2V from buffalo.misc.log import set_log_level @@ -20,9 +22,9 @@ def load_text8_model(self): set_log_level(3) opt = W2VOption().get_default_option() opt.num_workers = 12 - opt.d = 40 + opt.d = 100 opt.min_count = 4 - opt.num_iters = 10 + opt.num_iters = 20 opt.model_path = 'text8.w2v.bin' data_opt = StreamOptions().get_default_option() data_opt.input.main = self.text8 + 'main' @@ -63,25 +65,7 @@ def test04_train(self): def test05_text8_accuracy(self): set_log_level(2) - opt = W2VOption().get_default_option() - opt.num_workers = 
12 - opt.d = 200 - opt.num_iters = 15 - opt.min_count = 4 - data_opt = StreamOptions().get_default_option() - data_opt.input.main = self.text8 + 'main' - data_opt.data.path = './text8.h5py' - data_opt.data.use_cache = True - data_opt.data.validation = {} - - model_path = 'text8.accuracy.w2v.bin' - w = W2V(opt, data_opt=data_opt) - if os.path.isfile(model_path): - w.load(model_path) - else: - w.initialize() - w.train() - w.build_itemid_map() + w = self.load_text8_model() with open('./ext/text8/questions-words.txt') as fin: questions = fin.read().strip().split('\n') @@ -123,6 +107,31 @@ def test06_most_similar(self): q1, q2, q3 = 'apple', 'macintosh', 'microsoft' self._test_most_similar(w, q1, q2, q3) + def test07_oov_by_mincut(self): + opt = W2VOption().get_default_option() + opt.num_iters = 5 + opt.num_workers = 1 + opt.d = 10 + opt.min_count = 2 + data_opt = StreamOptions().get_default_option() + with NamedTemporaryFile('w', delete=False) as fout_main,\ + NamedTemporaryFile('w', delete=False) as fout_iid: + fout_main.write('1 2 1 2 1 2 1 2\n3\n') + fout_iid.write('1\n2\n3\n') + data_opt.input.main = fout_main.name + data_opt.input.iid = fout_iid.name + model = W2V(opt, data_opt=data_opt) + model.initialize() + model.train() + for k in ['1', '2', '3']: + vec = model.get_feature(k) + if k == '3': + self.assertIsNone(vec) + else: + self.assertIsInstance(vec, np.ndarray) + os.remove(data_opt.input.main) + os.remove(data_opt.input.iid) + if __name__ == '__main__': unittest.main() diff --git a/tests/algo/test_warp.py b/tests/algo/test_warp.py index 9c62d91..4315de7 100644 --- a/tests/algo/test_warp.py +++ b/tests/algo/test_warp.py @@ -63,6 +63,7 @@ def test07_train_ml_20m(self): def test08_serialization(self): opt = WARPOption().get_default_option() opt.d = 10 + opt.max_trials = 500 opt.validation = aux.Option({'topk': 10}) self._test8_serialization(WARP, opt) diff --git a/tests/data/test_mm.py b/tests/data/test_mm.py index 11d8d38..585d9f9 100644 --- a/tests/data/test_mm.py +++ b/tests/data/test_mm.py @@ -104,6 +104,11 @@ def setUpClass(cls): cls.mm_dense = np.random.rand(32, 4) cls.temp_files = [] + @classmethod + def tearDownClass(cls): + for path in cls.temp_files: + os.remove(path) + def test1_sparse(self): opt = MatrixMarketOptions().get_default_option() opt.input.main = self.mm_sparse @@ -122,8 +127,10 @@ def test3_list(self): opt = MatrixMarketOptions().get_default_option() opt.input.main = [[10, 123], [1, 2]] mm = MatrixMarket(opt) - self.assertRaises(RuntimeError, opt.is_valid_option) - self.assertRaises(RuntimeError, mm.create) + with self.assertRaises((AssertionError, RuntimeError)): + MatrixMarketOptions().is_valid_option(opt) + with self.assertRaises((RuntimeError, TypeError)): + mm.create() def test3_id_list(self): opt = MatrixMarketOptions().get_default_option() @@ -132,6 +139,7 @@ def test3_id_list(self): opt.input.iid = np.array(['1', 'a']) mm = MatrixMarket(opt) mm.create() + self.temp_files.append(opt.data.path) self.assertTrue(True) def test3_id_list_except(self): @@ -139,7 +147,9 @@ def test3_id_list_except(self): opt.input.main = np.array([[1, 2], [1, 2], [2, 1]]) opt.input.uid = [1, 2.0] # size should be 3 mm = MatrixMarket(opt) - self.assertRaises(TypeError, mm.create) + with self.assertRaises(TypeError): + mm.create() + if __name__ == '__main__': unittest.main()
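For reviewers who want to exercise the new module end to end, the following mirrors the fixture used in test10_inheritance above. This is a minimal sketch: the random binomial matrix and id ranges are taken from the test, and the most_similar query id is chosen purely for illustration (most_similar itself comes from the shared Algo API exercised in tests/algo/base.py):

import numpy as np

import buffalo.data
from buffalo.misc import aux
from buffalo.algo.plsi import PLSI
from buffalo.algo.options import PLSIOption
from buffalo.data.mm import MatrixMarketOptions

# Build a tiny random user-item matrix, exactly as the inheritance test does.
data_opt = MatrixMarketOptions().get_default_option()
data_opt.input = aux.Option({
    'main': np.random.binomial(1, 0.1, size=(100, 100)),
    'uid': np.arange(0, 100).astype(str),
    'iid': np.arange(200, 300).astype(str),
})
data = buffalo.data.load(data_opt)
data.create()

opt = PLSIOption().get_default_option()
opt.d = 10          # number of latent clusters
opt.num_workers = 4

model = PLSI(opt, data=data)
model.initialize()  # allocates P/Q and applies inherit_opt when configured
model.train()
print(model.most_similar('210', topk=5))  # nearest items to item id '210'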