From 0e38e537691ba0872a940f74f6dde3ed3b4da6ba Mon Sep 17 00:00:00 2001 From: js1010 Date: Sat, 6 Feb 2021 08:32:19 +0000 Subject: [PATCH 01/14] add setup --- cusim/aux.py | 119 ++++++++++++++++++++++++ cusim/ioutils/__init__.py | 6 ++ cusim/ioutils/bindings.cc | 40 ++++++++ cusim/ioutils/pyioutils.py | 18 ++++ cusim/proto/config.proto | 27 ++++++ requirements.txt | 5 + setup.py | 186 +++++++++++++++++++++++++++++++++++++ 7 files changed, 401 insertions(+) create mode 100644 cusim/aux.py create mode 100644 cusim/ioutils/__init__.py create mode 100644 cusim/ioutils/bindings.cc create mode 100644 cusim/ioutils/pyioutils.py create mode 100644 cusim/proto/config.proto create mode 100644 requirements.txt create mode 100644 setup.py diff --git a/cusim/aux.py b/cusim/aux.py new file mode 100644 index 0000000..0538a04 --- /dev/null +++ b/cusim/aux.py @@ -0,0 +1,119 @@ +# Copyright (c) 2021 Jisang Yoon +# All rights reserved. +# +# This source code is licensed under the Apache 2.0 license found in the +# LICENSE file in the root directory of this source tree. +import re +import json +import logging +import logging.handlers + +import jsmin +from google.protobuf.json_format import Parse, MessageToDict +from cusim.config_pb2 import ConfigProto + +# get_logger and Option refer to +# https://github.com/kakao/buffalo/blob/ +# 5f571c2c7d8227e6625c6e538da929e4db11b66d/buffalo/misc/aux.py +def get_logger(name=__file__, level=2): + if level == 1: + level = logging.WARNING + elif level == 2: + level = logging.INFO + elif level == 3: + level = logging.DEBUG + logger = logging.getLogger(name) + if logger.handlers: + return logger + logger.setLevel(level) + sh0 = logging.StreamHandler() + sh0.setLevel(level) + formatter = logging.Formatter('[%(levelname)-8s] %(asctime)s ' + '[%(filename)s] [%(funcName)s:%(lineno)d]' + '%(message)s', '%Y-%m-%d %H:%M:%S') + sh0.setFormatter(formatter) + logger.addHandler(sh0) + return logger + +# This function helps you to read non-standard json strings. +# - Handles json string with c++ style inline comments +# - Handles json string with trailing commas. +def load_json_string(cont): + # (1) Removes comment. + # Refer to https://plus.google.com/+DouglasCrockfordEsq/posts/RK8qyGVaGSr + cont = jsmin.jsmin(cont) + + # (2) Removes trailing comma. + cont = re.sub(",[ \t\r\n]*}", "}", cont) + cont = re.sub(",[ \t\r\n]*" + r"\]", "]", cont) + + return json.loads(cont) + + +# function read json file from filename +def load_json_file(fname): + with open(fname, "r") as fin: + ret = load_json_string(fin.read()) + return ret + +# use protobuf to restrict field and types +def get_opt_as_proto(raw, proto_type=ConfigProto): + proto = proto_type() + # convert raw to proto + Parse(json.dumps(Option(raw)), proto) + err = [] + assert proto.IsInitialized(err), \ + f"some required fields are missing in proto {err}\n {proto}" + return proto + +def proto_to_dict(proto): + return MessageToDict(proto, \ + including_default_value_fields=True, \ + preserving_proto_field_name=True) + +def copy_proto(proto): + newproto = type(proto)() + Parse(json.dumps(proto_to_dict(proto)), newproto) + return newproto + +class Option(dict): + def __init__(self, *args, **kwargs): + args = [arg if isinstance(arg, dict) + else load_json_file(arg) for arg in args] + super().__init__(*args, **kwargs) + for arg in args: + if isinstance(arg, dict): + for k, val in arg.items(): + if isinstance(val, dict): + self[k] = Option(val) + else: + self[k] = val + if kwargs: + for k, val in kwargs.items(): + if isinstance(val, dict): + self[k] = Option(val) + else: + self[k] = val + + def __getattr__(self, attr): + return self.get(attr) + + def __setattr__(self, key, value): + self.__setitem__(key, value) + + def __setitem__(self, key, value): + super().__setitem__(key, value) + self.__dict__.update({key: value}) + + def __delattr__(self, item): + self.__delitem__(item) + + def __delitem__(self, key): + super().__delitem__(key) + del self.__dict__[key] + + def __getstate__(self): + return vars(self) + + def __setstate__(self, state): + vars(self).update(state) diff --git a/cusim/ioutils/__init__.py b/cusim/ioutils/__init__.py new file mode 100644 index 0000000..61fc1fe --- /dev/null +++ b/cusim/ioutils/__init__.py @@ -0,0 +1,6 @@ +# Copyright (c) 2021 Jisang Yoon +# All rights reserved. +# +# This source code is licensed under the Apache 2.0 license found in the +# LICENSE file in the root directory of this source tree. +from cusim.ioutils.pyioutils import IoUtils diff --git a/cusim/ioutils/bindings.cc b/cusim/ioutils/bindings.cc new file mode 100644 index 0000000..b81141c --- /dev/null +++ b/cusim/ioutils/bindings.cc @@ -0,0 +1,40 @@ +// Copyright (c) 2021 Jisang Yoon +// All rights reserved. +// +// This source code is licensed under the Apache 2.0 license found in the +// LICENSE file in the root directory of this source tree. +#include +#include +#include + +#include +#include "ioutils.hpp" + +namespace py = pybind11; + +typedef py::array_t float_array; +typedef py::array_t int_array; + +class IoUtilsBind { + public: + IoUtilsBind() {} + LoadGensimVocab(std::string filepath, int min_count) { + obj_.LoadGensimVocab(filepath, min_count); + } + private: + cusim::IoUtils obj_; +}; + +PYBIND11_PLUGIN(ioutils_bind) { + py::module m("IoUtilsBind"); + + py::class_(m, "IoUtilsBind") + .def(py::init()) + .def("load_gensim_vocab", &IoUtilsBind::Init, py::arg("filepath"), py::arg("min_count")) + .def("__repr__", + [](const IoUtilsBind &a) { + return ""; + } + ); + return m.ptr(); +} diff --git a/cusim/ioutils/pyioutils.py b/cusim/ioutils/pyioutils.py new file mode 100644 index 0000000..d79cef2 --- /dev/null +++ b/cusim/ioutils/pyioutils.py @@ -0,0 +1,18 @@ +# Copyright (c) 2021 Jisang Yoon +# All rights reserved. +# +# This source code is licensed under the Apache 2.0 license found in the +# LICENSE file in the root directory of this source tree. + +# pylint: disable=no-name-in-module,too-few-public-methods,no-member +from cusim import aux +from cusim.ioutils.ioutils_bind import IoUtilsBind + + +class IoUtils: + def __init__(self, log_level=2): + self.logger = aux.get_logger("ioutils", log_level=log_level) + self.obj = IoUtilsBind(log_level) + + def load_gensim_vocab(self, filepath, min_count): + self.obj.load_gensim_vocab(filepath, min_count) diff --git a/cusim/proto/config.proto b/cusim/proto/config.proto new file mode 100644 index 0000000..f90d1bc --- /dev/null +++ b/cusim/proto/config.proto @@ -0,0 +1,27 @@ +// Copyright (c) 2020 Jisang Yoon +// All rights reserved. +// +// This source code is licensed under the Apache 2.0 license found in the +// LICENSE file in the root directory of this source tree. + +syntax = "proto2"; + +message ConfigProto { + optional int32 seed = 1 [default = 777]; + optional int32 c_log_level = 3 [default = 2]; + optional int32 py_log_level = 4 [default = 2]; + optional int32 max_m = 5 [default = 12]; + optional int32 max_m0 = 6 [default = 24]; + optional int32 ef_construction = 7 [default = 150]; + // optional int32 ef_search = 8 [default = 50]; + optional double level_mult = 9; + optional bool save_remains = 10; + optional double hyper_threads = 11 [default = 10]; + optional int32 block_dim = 12 [default = 32]; + optional string dist_type = 13 [default = "dot"]; + optional int32 visited_table_size = 17; + optional int32 visited_list_size = 14 [default = 8192]; + optional bool nrz = 15 [default = true]; + optional bool reverse_cand = 16; + optional double heuristic_coef = 18 [default = 0.25]; +} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c268069 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +jsmin +numpy +pybind11 +protobuf==3.10.0 +grpcio-tools==1.27.1 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..de6ca72 --- /dev/null +++ b/setup.py @@ -0,0 +1,186 @@ +# Copyright (c) 2020 Jisang Yoon +# All rights reserved. +# +# This source code is licensed under the Apache 2.0 license found in the +# LICENSE file in the root directory of this source tree. + +# pylint: disable=fixme,too-few-public-methods +# reference: https://github.com/kakao/buffalo/blob/ +# 5f571c2c7d8227e6625c6e538da929e4db11b66d/setup.py +"""cusim +""" +import os +import sys +import glob +import pathlib +import platform +import sysconfig +import subprocess +from setuptools import setup, Extension + +import pybind11 +import numpy as np +from cuda_setup import CUDA, BUILDEXT + + +DOCLINES = __doc__.split("\n") + +# TODO: Python3 Support +if sys.version_info[:3] < (3, 6): + raise RuntimeError("Python version 3.6 or later required.") + +assert platform.system() == 'Linux' # TODO: MacOS + + +MAJOR = 0 +MINOR = 0 +MICRO = 0 +RELEASE = True +STAGE = {True: '', False: 'b'}.get(RELEASE) +VERSION = f'{MAJOR}.{MINOR}.{MICRO}{STAGE}' +STATUS = {False: 'Development Status :: 4 - Beta', + True: 'Development Status :: 5 - Production/Stable'} + +CLASSIFIERS = """{status} +Programming Language :: C++ +Programming Language :: Python :: 3.6 +Operating System :: POSIX :: Linux +Operating System :: Unix +Operating System :: MacOS +License :: OSI Approved :: Apache Software License""".format( \ + status=STATUS.get(RELEASE)) +CLIB_DIR = os.path.join(sysconfig.get_path('purelib'), 'cusim') +LIBRARY_DIRS = [CLIB_DIR] + + +def get_extend_compile_flags(): + flags = ['-march=native'] + return flags + + +class CMakeExtension(Extension): + extension_type = 'cmake' + + def __init__(self, name): + super().__init__(name, sources=[]) + + +extend_compile_flags = get_extend_compile_flags() +extra_compile_args = ['-fopenmp', '-std=c++14', '-ggdb', '-O3'] + \ + extend_compile_flags +csrcs = glob.glob("cpp/src/*.cu") + glob.glob("cpp/src/*.cc") +extensions = [ + Extension("cusim.ioutils.ioutils_bind", + sources= csrcs + [ \ + "cusim/ioutils/bindings.cc", + "3rd/json11/json11.cpp"], + language="c++", + extra_compile_args=extra_compile_args, + extra_link_args=["-fopenmp"], + library_dirs=[CUDA['lib64']], + libraries=['cudart', 'cublas', 'curand'], + extra_objects=[], + include_dirs=[ \ + "cpp/include/", np.get_include(), pybind11.get_include(), + pybind11.get_include(True), CUDA['include'], + "3rd/json11", "3rd/spdlog/include"]) +] + + +# Return the git revision as a string +def git_version(): + def _minimal_ext_cmd(cmd): + # construct minimal environment + env = {} + for k in ['SYSTEMROOT', 'PATH']: + val = os.environ.get(k) + if val is not None: + env[k] = val + out = subprocess.Popen(cmd, stdout=subprocess.PIPE, env=env). \ + communicate()[0] + return out + + try: + out = _minimal_ext_cmd(['git', 'rev-parse', 'HEAD']) + git_revision = out.strip().decode('ascii') + except OSError: + git_revision = "Unknown" + + return git_revision + + +def write_version_py(filename='cusim/version.py'): + cnt = """ +short_version = '%(version)s' +git_revision = '%(git_revision)s' +""" + git_revision = git_version() + with open(filename, 'w') as fout: + fout.write(cnt % {'version': VERSION, + 'git_revision': git_revision}) + + +class BuildExtension(BUILDEXT): + def run(self): + for ext in self.extensions: + print(ext.name) + if hasattr(ext, 'extension_type') and ext.extension_type == 'cmake': + self.cmake() + super().run() + + def cmake(self): + cwd = pathlib.Path().absolute() + + build_temp = pathlib.Path(self.build_temp) + build_temp.mkdir(parents=True, exist_ok=True) + + build_type = 'Debug' if self.debug else 'Release' + + cmake_args = [ + '-DCMAKE_BUILD_TYPE=' + build_type, + '-DCMAKE_LIBRARY_OUTPUT_DIRECTORY=' + CLIB_DIR, + ] + + build_args = [] + + os.chdir(str(build_temp)) + self.spawn(['cmake', str(cwd)] + cmake_args) + if not self.dry_run: + self.spawn(['cmake', '--build', '.'] + build_args) + os.chdir(str(cwd)) + + +def setup_package(): + write_version_py() + cmdclass = { + 'build_ext': BuildExtension + } + + metadata = dict( + name='cusim', + maintainer="Jisang Yoon", + maintainer_email="vjs10101v@gmail.com", + description=DOCLINES[0], + long_description="\n".join(DOCLINES[2:]), + url="https://github.com/js1010/cusim", + download_url="https://github.com/js1010/cusim/releases", + include_package_data=False, + license='Apac2', + packages=['cusim/'], + cmdclass=cmdclass, + classifiers=[_f for _f in CLASSIFIERS.split('\n') if _f], + platforms=['Linux', 'Mac OSX', 'Unix'], + ext_modules=extensions, + entry_points={ + 'console_scripts': [ + ] + }, + python_requires='>=3.6', + ) + + metadata['version'] = VERSION + setup(**metadata) + + +if __name__ == '__main__': + setup_package() From 052bf652c99dbd24c568514c3a6d3f0e05a54bc9 Mon Sep 17 00:00:00 2001 From: js1010 Date: Sat, 6 Feb 2021 08:33:06 +0000 Subject: [PATCH 02/14] update readme --- README.md | 134 ++++++++++++++++++++++++++++++++- cuda_setup.py | 205 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 337 insertions(+), 2 deletions(-) create mode 100644 cuda_setup.py diff --git a/README.md b/README.md index 0342692..fdb7ed0 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,132 @@ -# cusim -cuda implementaion of w2v and lda +### Introduction + +This project is to speed up HNSW algorithm by CUDA. I expect that anyone who will be interested in this project might be already familiar with the following paper and the open source project. If not, I strongly recommend that you check them first. + +- hnsw paper: https://arxiv.org/pdf/1603.09320.pdf (2016) +- hnsw implementation (cpu only) by the author of hnsw (Yury Markov): https://github.com/nmslib/hnswlib +- Approximate Nearest Neighbor (ANN) Benchmark Site: http://ann-benchmarks.com/ + +I also adapted some ideas from the following project. + +- n2 (alternative hnsw cpu implementation project): https://github.com/kakao/n2 + +By brief survey, I found there are several papers and projects to suggest to speed up ANN algorithms by GPU. + +- papers or projects related to using GPU for ANN + - paper (2020): http://research.baidu.com/Public/uploads/5f5c37aa9c37c.pdf + - paper (2017): https://arxiv.org/pdf/1702.05911.pdf + - slides (2020): https://wangzwhu.github.io/home/file/acmmm-t-part3-ann.pdf + - project (2017): https://github.com/facebookresearch/faiss + +I started this project because I was originally interested in both CUDA programming and ANN algorithms. I release this project because it achieved meaningful performance and hope to develop further by community participation. + +Literally, this package is implemented to build HNSW graphs using GPU, and to approximate nearest neighbor search through the built graphs, and the format of the model file is compatible with hnswlib. In other words, you can build a HNSW graph from this package, then save it and load it from hnswlib for search, and vice versa. + + +### How to install + + +```shell +# clone repo and submodules +git clone git@github.com:js1010/cuhnsw.git && cd cuhnsw && git submodule update --init + +# install requirements +pip install -r requirements.txt + +# generate proto +python -m grpc_tools.protoc --python_out cuhnsw/ --proto_path cuhnsw/proto/ config.proto + +# install +python setup.py install +``` + +### How to use + +- `examples/example1.py` and `examples/README.md` will be very helpful to understand the usage. +- build and save model + +```python +import h5py +from cuhnsw import CuHNSW + + +h5f = h5py.File("glove-50-angular.hdf5", "r") +data = h5f["train"][:, :].astype(np.float32) +h5f.close() +ch0 = CuHNSW(opt={}) +ch0.set_data(data) +ch0.build() +ch0.save_index("cuhnsw.index") +``` + +- load model and search + +```python +import h5py +from cuhnsw import CuHNSW + +h5f = h5py.File("glove-50-angular.hdf5", "r") +data = h5f["test"][:, :].astype(np.float32) +h5f.close() +ch0 = CuHNSW(opt={}) +ch0.load_index("cuhnsw.index") +nns, distances, found_cnt = ch0.search_knn(data, topk=10, ef_search=300) +``` + +- Option parameters (see `cuhnsw/proto/config.proto`) + - `seed`: numpy random seed (used in random levels) + - `c_log_level`: log level in cpp logging (spdlog) + - `py_log_level`: log level in python logging + - `max_m`: maximum number of links in layers higher than ground layer + - `max_m0`: maximum number of links in the ground layer + - `level_mult`: multiplier to draw levels of each element (defualt: 0 => setted as `1 / log(max_m0)` in initialization as recommended in hnsw paper) + - `save_remains`: link to remained candidates in SearchHeuristic (adapted from n2) + - `heuristic_coff`: select some closest candidates by default (also adapted from n2) + - `hyper_threads`: set the number of gpu blocks as the total number of concurrent cores exceeds the physical number of cores + - `block_dim`: block dimension (should be smaller than 32^2=1024 and should be the multiple of 32) + - `nrz`: normalize data vector if True + - `visited_table_size`: size of table to store the visited nodes in each search + - `visited_list_size`: size of list to store the visited nodes in each search (useful to reset table after each search) + - `reverse_cand`: select the candidate with the furthest distance if True (it makes the build slower but achieves better quality) + - `dist_type`: euclidean distance if "l2" and inner product distaance if "dot" + +### Performance + +- tl;dr + - cuhnsw achieved the same build quality by 8~9 times faster build time than hnswlib with 8 vcpus on certain data and parameter setup + - cuhnsw achieved the same search quality by 3 times faster search time than hnswlib with 8 vcpus instance on certain data and parameter setup +- Note1: HNSW search algorithm can be verified by exact match since it is deterministic. + - I verified it with hnswlib, in other words, cuhnsw search and hnswlib search returns exactly same results by loading the same model file and the same queries and the same ef search. +- Note2: GPU search has the advantage over CPU search only when it comes to the `Batch` search (i.e. processing large number of queries at once.) +- [AWS P3 2xlarge instance](https://aws.amazon.com/ec2/instance-types/p3/) is used to the experiment. (One Tesla V100 GPU with 8 vcpus) +- results can be reproduced by running `example/example1.py`. +- build time / quality results on glove-50-angular + - used `ef_construction`=150 for hnswlib and `ef_construction=110` for cuhnsw to achieve the same build quality + - build quality is measured by the accuracy by the same search parameter (`ef_search`=300) + +| attr | 1 vcpu | 2 vcpu | 4 vcpu | 8 vcpu | gpu | +|:--------------|-----------:|-----------:|----------:|----------:|----------:| +| build time | 343.909 | 179.836 | 89.7936 | 70.5476 | 8.2847 | +| build quality | 0.863193 | 0.863301 | 0.863238 | 0.863165 | 0.865471 | + +- search time comparison on glove-50-angular + - search time on 100k random queries + - search `quality` is guaranteed to the same (exact match) + +| attr | 1 vcpu | 2 vcpu | 4 vcpu | 8 vcpu | gpu | +|:------------|--------:|--------:|--------:|--------:|--------:| +| search time | 52.3024 | 26.5086 | 13.9146 | 10.8525 | 3.07964 | + +- the reason why the parallel efficiency significantly drops from 4 vcpu to 8 vcpu might be hyper threading (there might be only 4 "physical" cores in this instance). + +### Thoughts on Future Task + +- Considering the cost of GPU and CPU, it seems impractical yet (currently one Tesla V100 device is equivalently fast as one standard cpu server (24-56 vcores) but the cost of Tesla V100 device is quite more expensive). Therefore, there seems to be a lot to do in the future. +- The word in the parentheses shows the expected level of difficulty for each task + +1. **upload package to pypi (easy)**: the task itself is very easy but will not worth it unless this project is successful. +2. **implement parallel compilation using bazel or cmake (easy-medium)**: bazel is more preferable. compilation time is a little bit painful. +3. **achieve meaningful speed-up by using half-precision operation (medium)**: I experimented it, but only got around 10 % improvement. I am not sure if I have used the half-precision feature appropriately. +4. **support multi-device (very hard)**: it only supports single-device (gpu) yet since the graph should be shared across all the building threads. + +- contribution is always welcome diff --git a/cuda_setup.py b/cuda_setup.py new file mode 100644 index 0000000..5ff76b6 --- /dev/null +++ b/cuda_setup.py @@ -0,0 +1,205 @@ +# Copyright (c) 2020 Jisang Yoon +# All rights reserved. +# +# This source code is licensed under the Apache 2.0 license found in the +# LICENSE file in the root directory of this source tree. + +# Adapted from https://github.com/rmcgibbo/npcuda-example and +# https://github.com/cupy/cupy/blob/master/cupy_setup_build.py +# pylint: disable=fixme,access-member-before-definition +# pylint: disable=attribute-defined-outside-init,arguments-differ +import logging +import os +import sys + +from distutils import ccompiler, errors, msvccompiler, unixccompiler +from setuptools.command.build_ext import build_ext as setuptools_build_ext + +HALF_PRECISION = False + +def find_in_path(name, path): + "Find a file in a search path" + # adapted fom http://code.activestate.com/ + # recipes/52224-find-a-file-given-a-search-path/ + for _dir in path.split(os.pathsep): + binpath = os.path.join(_dir, name) + if os.path.exists(binpath): + return os.path.abspath(binpath) + return None + + +def locate_cuda(): + """Locate the CUDA environment on the system + If a valid cuda installation is found + this returns a dict with keys 'home', 'nvcc', 'include', + and 'lib64' and values giving the absolute path to each directory. + Starts by looking for the CUDAHOME env variable. + If not found, everything is based on finding + 'nvcc' in the PATH. + If nvcc can't be found, this returns None + """ + nvcc_bin = 'nvcc' + if sys.platform.startswith("win"): + nvcc_bin = 'nvcc.exe' + + # check env variables CUDA_HOME, CUDAHOME, CUDA_PATH. + found = False + for env_name in ['CUDA_PATH', 'CUDAHOME', 'CUDA_HOME']: + if env_name not in os.environ: + continue + found = True + home = os.environ[env_name] + nvcc = os.path.join(home, 'bin', nvcc_bin) + break + if not found: + # otherwise, search the PATH for NVCC + nvcc = find_in_path(nvcc_bin, os.environ['PATH']) + if nvcc is None: + logging.warning('The nvcc binary could not be located in your ' + '$PATH. Either add it to ' + 'your path, or set $CUDA_HOME to enable CUDA extensions') + return None + home = os.path.dirname(os.path.dirname(nvcc)) + + cudaconfig = {'home': home, + 'nvcc': nvcc, + 'include': os.path.join(home, 'include'), + 'lib64': os.path.join(home, 'lib64')} + post_args = [ + "-arch=sm_52", + "-gencode=arch=compute_52,code=sm_52", + "-gencode=arch=compute_60,code=sm_60", + "-gencode=arch=compute_61,code=sm_61", + "-gencode=arch=compute_70,code=sm_70", + "-gencode=arch=compute_75,code=sm_75", + "-gencode=arch=compute_80,code=sm_80", + "-gencode=arch=compute_86,code=sm_86", + "-gencode=arch=compute_86,code=compute_86", + '--ptxas-options=-v', '-O2'] + if HALF_PRECISION: + post_args = [flag for flag in post_args if "52" not in flag] + + if sys.platform == "win32": + cudaconfig['lib64'] = os.path.join(home, 'lib', 'x64') + post_args += ['-Xcompiler', '/MD', '-std=c++14', "-Xcompiler", "/openmp"] + if HALF_PRECISION: + post_args += ["-Xcompiler", "/D HALF_PRECISION"] + else: + post_args += ['-c', '--compiler-options', "'-fPIC'", + "--compiler-options", "'-std=c++14'"] + if HALF_PRECISION: + post_args += ["--compiler-options", "'-D HALF_PRECISION'"] + for k, val in cudaconfig.items(): + if not os.path.exists(val): + logging.warning('The CUDA %s path could not be located in %s', k, val) + return None + + cudaconfig['post_args'] = post_args + return cudaconfig + + +# This code to build .cu extensions with nvcc is taken from cupy: +# https://github.com/cupy/cupy/blob/master/cupy_setup_build.py +class _UnixCCompiler(unixccompiler.UnixCCompiler): + src_extensions = list(unixccompiler.UnixCCompiler.src_extensions) + src_extensions.append('.cu') + + def _compile(self, obj, src, ext, cc_args, extra_postargs, pp_opts): + # For sources other than CUDA C ones, just call the super class method. + if os.path.splitext(src)[1] != '.cu': + return unixccompiler.UnixCCompiler._compile( + self, obj, src, ext, cc_args, extra_postargs, pp_opts) + + # For CUDA C source files, compile them with NVCC. + _compiler_so = self.compiler_so + try: + nvcc_path = CUDA['nvcc'] + post_args = CUDA['post_args'] + # TODO? base_opts = build.get_compiler_base_options() + self.set_executable('compiler_so', nvcc_path) + + return unixccompiler.UnixCCompiler._compile( + self, obj, src, ext, cc_args, post_args, pp_opts) + finally: + self.compiler_so = _compiler_so + + +class _MSVCCompiler(msvccompiler.MSVCCompiler): + _cu_extensions = ['.cu'] + + src_extensions = list(unixccompiler.UnixCCompiler.src_extensions) + src_extensions.extend(_cu_extensions) + + def _compile_cu(self, sources, output_dir=None, macros=None, + include_dirs=None, debug=0, extra_preargs=None, + extra_postargs=None, depends=None): + # Compile CUDA C files, mainly derived from UnixCCompiler._compile(). + macros, objects, extra_postargs, pp_opts, _build = \ + self._setup_compile(output_dir, macros, include_dirs, sources, + depends, extra_postargs) + + compiler_so = CUDA['nvcc'] + cc_args = self._get_cc_args(pp_opts, debug, extra_preargs) + post_args = CUDA['post_args'] + + for obj in objects: + try: + src, _ = _build[obj] + except KeyError: + continue + try: + self.spawn([compiler_so] + cc_args + [src, '-o', obj] + post_args) + except errors.DistutilsExecError as e: + raise errors.CompileError(str(e)) + + return objects + + def compile(self, sources, **kwargs): + # Split CUDA C sources and others. + cu_sources = [] + other_sources = [] + for source in sources: + if os.path.splitext(source)[1] == '.cu': + cu_sources.append(source) + else: + other_sources.append(source) + + # Compile source files other than CUDA C ones. + other_objects = msvccompiler.MSVCCompiler.compile( + self, other_sources, **kwargs) + + # Compile CUDA C sources. + cu_objects = self._compile_cu(cu_sources, **kwargs) + + # Return compiled object filenames. + return other_objects + cu_objects + + +class CudaBuildExt(setuptools_build_ext): + """Custom `build_ext` command to include CUDA C source files.""" + + def run(self): + if CUDA is not None: + def wrap_new_compiler(func): + def _wrap_new_compiler(*args, **kwargs): + try: + return func(*args, **kwargs) + except errors.DistutilsPlatformError: + if sys.platform != 'win32': + CCompiler = _UnixCCompiler + else: + CCompiler = _MSVCCompiler + return CCompiler( + None, kwargs['dry_run'], kwargs['force']) + return _wrap_new_compiler + ccompiler.new_compiler = wrap_new_compiler(ccompiler.new_compiler) + # Intentionally causes DistutilsPlatformError in + # ccompiler.new_compiler() function to hook. + self.compiler = 'nvidia' + + setuptools_build_ext.run(self) + + +CUDA = locate_cuda() +assert CUDA is not None +BUILDEXT = CudaBuildExt if CUDA else setuptools_build_ext From 19923aabd052350df4074dc5c257bc0dcc7180ac Mon Sep 17 00:00:00 2001 From: js1010 Date: Sat, 6 Feb 2021 08:33:50 +0000 Subject: [PATCH 03/14] remove unnecessary parts in readme --- README.md | 120 +----------------------------------------------------- 1 file changed, 2 insertions(+), 118 deletions(-) diff --git a/README.md b/README.md index fdb7ed0..34a25c7 100644 --- a/README.md +++ b/README.md @@ -1,132 +1,16 @@ -### Introduction - -This project is to speed up HNSW algorithm by CUDA. I expect that anyone who will be interested in this project might be already familiar with the following paper and the open source project. If not, I strongly recommend that you check them first. - -- hnsw paper: https://arxiv.org/pdf/1603.09320.pdf (2016) -- hnsw implementation (cpu only) by the author of hnsw (Yury Markov): https://github.com/nmslib/hnswlib -- Approximate Nearest Neighbor (ANN) Benchmark Site: http://ann-benchmarks.com/ - -I also adapted some ideas from the following project. - -- n2 (alternative hnsw cpu implementation project): https://github.com/kakao/n2 - -By brief survey, I found there are several papers and projects to suggest to speed up ANN algorithms by GPU. - -- papers or projects related to using GPU for ANN - - paper (2020): http://research.baidu.com/Public/uploads/5f5c37aa9c37c.pdf - - paper (2017): https://arxiv.org/pdf/1702.05911.pdf - - slides (2020): https://wangzwhu.github.io/home/file/acmmm-t-part3-ann.pdf - - project (2017): https://github.com/facebookresearch/faiss - -I started this project because I was originally interested in both CUDA programming and ANN algorithms. I release this project because it achieved meaningful performance and hope to develop further by community participation. - -Literally, this package is implemented to build HNSW graphs using GPU, and to approximate nearest neighbor search through the built graphs, and the format of the model file is compatible with hnswlib. In other words, you can build a HNSW graph from this package, then save it and load it from hnswlib for search, and vice versa. - - ### How to install ```shell # clone repo and submodules -git clone git@github.com:js1010/cuhnsw.git && cd cuhnsw && git submodule update --init +git clone git@github.com:js1010/cusim.git && cd cusim && git submodule update --init # install requirements pip install -r requirements.txt # generate proto -python -m grpc_tools.protoc --python_out cuhnsw/ --proto_path cuhnsw/proto/ config.proto +python -m grpc_tools.protoc --python_out cusim/ --proto_path cusim/proto/ config.proto # install python setup.py install ``` - -### How to use - -- `examples/example1.py` and `examples/README.md` will be very helpful to understand the usage. -- build and save model - -```python -import h5py -from cuhnsw import CuHNSW - - -h5f = h5py.File("glove-50-angular.hdf5", "r") -data = h5f["train"][:, :].astype(np.float32) -h5f.close() -ch0 = CuHNSW(opt={}) -ch0.set_data(data) -ch0.build() -ch0.save_index("cuhnsw.index") -``` - -- load model and search - -```python -import h5py -from cuhnsw import CuHNSW - -h5f = h5py.File("glove-50-angular.hdf5", "r") -data = h5f["test"][:, :].astype(np.float32) -h5f.close() -ch0 = CuHNSW(opt={}) -ch0.load_index("cuhnsw.index") -nns, distances, found_cnt = ch0.search_knn(data, topk=10, ef_search=300) -``` - -- Option parameters (see `cuhnsw/proto/config.proto`) - - `seed`: numpy random seed (used in random levels) - - `c_log_level`: log level in cpp logging (spdlog) - - `py_log_level`: log level in python logging - - `max_m`: maximum number of links in layers higher than ground layer - - `max_m0`: maximum number of links in the ground layer - - `level_mult`: multiplier to draw levels of each element (defualt: 0 => setted as `1 / log(max_m0)` in initialization as recommended in hnsw paper) - - `save_remains`: link to remained candidates in SearchHeuristic (adapted from n2) - - `heuristic_coff`: select some closest candidates by default (also adapted from n2) - - `hyper_threads`: set the number of gpu blocks as the total number of concurrent cores exceeds the physical number of cores - - `block_dim`: block dimension (should be smaller than 32^2=1024 and should be the multiple of 32) - - `nrz`: normalize data vector if True - - `visited_table_size`: size of table to store the visited nodes in each search - - `visited_list_size`: size of list to store the visited nodes in each search (useful to reset table after each search) - - `reverse_cand`: select the candidate with the furthest distance if True (it makes the build slower but achieves better quality) - - `dist_type`: euclidean distance if "l2" and inner product distaance if "dot" - -### Performance - -- tl;dr - - cuhnsw achieved the same build quality by 8~9 times faster build time than hnswlib with 8 vcpus on certain data and parameter setup - - cuhnsw achieved the same search quality by 3 times faster search time than hnswlib with 8 vcpus instance on certain data and parameter setup -- Note1: HNSW search algorithm can be verified by exact match since it is deterministic. - - I verified it with hnswlib, in other words, cuhnsw search and hnswlib search returns exactly same results by loading the same model file and the same queries and the same ef search. -- Note2: GPU search has the advantage over CPU search only when it comes to the `Batch` search (i.e. processing large number of queries at once.) -- [AWS P3 2xlarge instance](https://aws.amazon.com/ec2/instance-types/p3/) is used to the experiment. (One Tesla V100 GPU with 8 vcpus) -- results can be reproduced by running `example/example1.py`. -- build time / quality results on glove-50-angular - - used `ef_construction`=150 for hnswlib and `ef_construction=110` for cuhnsw to achieve the same build quality - - build quality is measured by the accuracy by the same search parameter (`ef_search`=300) - -| attr | 1 vcpu | 2 vcpu | 4 vcpu | 8 vcpu | gpu | -|:--------------|-----------:|-----------:|----------:|----------:|----------:| -| build time | 343.909 | 179.836 | 89.7936 | 70.5476 | 8.2847 | -| build quality | 0.863193 | 0.863301 | 0.863238 | 0.863165 | 0.865471 | - -- search time comparison on glove-50-angular - - search time on 100k random queries - - search `quality` is guaranteed to the same (exact match) - -| attr | 1 vcpu | 2 vcpu | 4 vcpu | 8 vcpu | gpu | -|:------------|--------:|--------:|--------:|--------:|--------:| -| search time | 52.3024 | 26.5086 | 13.9146 | 10.8525 | 3.07964 | - -- the reason why the parallel efficiency significantly drops from 4 vcpu to 8 vcpu might be hyper threading (there might be only 4 "physical" cores in this instance). - -### Thoughts on Future Task - -- Considering the cost of GPU and CPU, it seems impractical yet (currently one Tesla V100 device is equivalently fast as one standard cpu server (24-56 vcores) but the cost of Tesla V100 device is quite more expensive). Therefore, there seems to be a lot to do in the future. -- The word in the parentheses shows the expected level of difficulty for each task - -1. **upload package to pypi (easy)**: the task itself is very easy but will not worth it unless this project is successful. -2. **implement parallel compilation using bazel or cmake (easy-medium)**: bazel is more preferable. compilation time is a little bit painful. -3. **achieve meaningful speed-up by using half-precision operation (medium)**: I experimented it, but only got around 10 % improvement. I am not sure if I have used the half-precision feature appropriately. -4. **support multi-device (very hard)**: it only supports single-device (gpu) yet since the graph should be shared across all the building threads. - -- contribution is always welcome From c6e09cea2beffb1070b256cacb8bf48106ebfe6a Mon Sep 17 00:00:00 2001 From: js1010 Date: Sat, 6 Feb 2021 08:35:16 +0000 Subject: [PATCH 04/14] add gitignore --- cusim/.gitignore | 1 + 1 file changed, 1 insertion(+) create mode 100644 cusim/.gitignore diff --git a/cusim/.gitignore b/cusim/.gitignore new file mode 100644 index 0000000..19f06b3 --- /dev/null +++ b/cusim/.gitignore @@ -0,0 +1 @@ +config_pb2.py From a292344316e96cd58e01730f219ef5308bee3027 Mon Sep 17 00:00:00 2001 From: js1010 Date: Sat, 6 Feb 2021 08:46:04 +0000 Subject: [PATCH 05/14] bug-fix --- cpp/include/culda.hpp | 2 +- cpp/include/ioutils.hpp | 6 ++++++ cpp/include/log.hpp | 2 +- cpp/src/culda.cu | 2 +- cpp/src/ioutils.cc | 2 +- cpp/src/log.cc | 4 ++-- 6 files changed, 12 insertions(+), 6 deletions(-) diff --git a/cpp/include/culda.hpp b/cpp/include/culda.hpp index fa3c58f..0c2ba61 100644 --- a/cpp/include/culda.hpp +++ b/cpp/include/culda.hpp @@ -33,7 +33,7 @@ namespace cusim { class CuLDA { - public: + public: CuLDA(); ~CuLDA(); private: diff --git a/cpp/include/ioutils.hpp b/cpp/include/ioutils.hpp index 88bc95b..235593c 100644 --- a/cpp/include/ioutils.hpp +++ b/cpp/include/ioutils.hpp @@ -20,6 +20,10 @@ #include #include +#include "json11.hpp" +#include "log.hpp" +#include "types.hpp" + namespace cusim { class IoUtils { @@ -29,6 +33,8 @@ class IoUtils { void LoadGensimVocab(std::string filepath, int min_count); private: std::vector parse_line(std::string line); + + std::shared_ptr logger_; std::unordered_map word_idmap_; std::vector word_list_; }; // class IoUtils diff --git a/cpp/include/log.hpp b/cpp/include/log.hpp index 6ec766e..05f30ec 100644 --- a/cpp/include/log.hpp +++ b/cpp/include/log.hpp @@ -39,6 +39,6 @@ class CuSimLogger { private: static int global_logging_level_; std::shared_ptr logger_; -}; // class CuHNSWLogger +}; // class CuSimLogger } // namespace cusim diff --git a/cpp/src/culda.cu b/cpp/src/culda.cu index 115d2fe..0c12182 100644 --- a/cpp/src/culda.cu +++ b/cpp/src/culda.cu @@ -3,7 +3,7 @@ // // This source code is licensed under the Apache 2.0 license found in the // LICENSE file in the root directory of this source tree. -#include "culda.cuh" +#include "culda.hpp" namespace cusim { diff --git a/cpp/src/ioutils.cc b/cpp/src/ioutils.cc index 9466d06..004fd46 100644 --- a/cpp/src/ioutils.cc +++ b/cpp/src/ioutils.cc @@ -49,7 +49,7 @@ void IoUtils::LoadGensimVocab(std::string filepath, int min_count) { word_list_.clear(); for (auto& it: word_count) { if (it.second >= min_count) { - word_idmap_[it.first] = vocab_.size(); + word_idmap_[it.first] = word_idmap_.size(); word_list_.push_back(it.first); } } diff --git a/cpp/src/log.cc b/cpp/src/log.cc index b02fa8d..ef5252b 100644 --- a/cpp/src/log.cc +++ b/cpp/src/log.cc @@ -11,12 +11,12 @@ namespace cusim { int CuSimLogger::global_logging_level_ = 2; -CuSimLogger::CuHNSWLogger() { +CuSimLogger::CuSimLogger() { spdlog::set_pattern("[%^%-8l%$] %Y-%m-%d %H:%M:%S %v"); logger_ = spdlog::default_logger(); } -std::shared_ptr& CuHNSWLogger::get_logger() { +std::shared_ptr& CuSimLogger::get_logger() { return logger_; } From b7d4034612b569ca13483cd3438375ffad396350 Mon Sep 17 00:00:00 2001 From: js1010 Date: Sat, 6 Feb 2021 08:52:57 +0000 Subject: [PATCH 06/14] compile succeed --- cusim/ioutils/bindings.cc | 5 +++-- examples/example1.py | 20 ++++++++++++++++++++ examples/requirements.txt | 1 + 3 files changed, 24 insertions(+), 2 deletions(-) create mode 100644 examples/example1.py create mode 100644 examples/requirements.txt diff --git a/cusim/ioutils/bindings.cc b/cusim/ioutils/bindings.cc index b81141c..f1b67ae 100644 --- a/cusim/ioutils/bindings.cc +++ b/cusim/ioutils/bindings.cc @@ -18,7 +18,7 @@ typedef py::array_t int_array; class IoUtilsBind { public: IoUtilsBind() {} - LoadGensimVocab(std::string filepath, int min_count) { + void LoadGensimVocab(std::string filepath, int min_count) { obj_.LoadGensimVocab(filepath, min_count); } private: @@ -30,7 +30,8 @@ PYBIND11_PLUGIN(ioutils_bind) { py::class_(m, "IoUtilsBind") .def(py::init()) - .def("load_gensim_vocab", &IoUtilsBind::Init, py::arg("filepath"), py::arg("min_count")) + .def("load_gensim_vocab", &IoUtilsBind::LoadGensimVocab, + py::arg("filepath"), py::arg("min_count")) .def("__repr__", [](const IoUtilsBind &a) { return ""; diff --git a/examples/example1.py b/examples/example1.py new file mode 100644 index 0000000..722a92d --- /dev/null +++ b/examples/example1.py @@ -0,0 +1,20 @@ +# Copyright (c) 2021 Jisang Yoon +# All rights reserved. +# +# This source code is licensed under the Apache 2.0 license found in the +# LICENSE file in the root directory of this source tree. + +# pylint: disable=no-name-in-module,logging-format-truncated +import fire +from cusim import aux +from cusim.ioutils import IoUtils + +LOGGER = aux.get_logger() + +def run(): + iou = IoUtils() + iou.load_gensim_vocab("corpora.txt", 5) + + +if __name__ == "__main__": + fire.Fire() diff --git a/examples/requirements.txt b/examples/requirements.txt new file mode 100644 index 0000000..f8e1e74 --- /dev/null +++ b/examples/requirements.txt @@ -0,0 +1 @@ +fire From 89d4220eaf679119e3ce9643af8f34a3b0367490 Mon Sep 17 00:00:00 2001 From: js1010 Date: Sat, 6 Feb 2021 09:14:11 +0000 Subject: [PATCH 07/14] make example work --- cusim/ioutils/pyioutils.py | 4 ++-- examples/example1.py | 7 ++++--- setup.py | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/cusim/ioutils/pyioutils.py b/cusim/ioutils/pyioutils.py index d79cef2..b50eb82 100644 --- a/cusim/ioutils/pyioutils.py +++ b/cusim/ioutils/pyioutils.py @@ -11,8 +11,8 @@ class IoUtils: def __init__(self, log_level=2): - self.logger = aux.get_logger("ioutils", log_level=log_level) - self.obj = IoUtilsBind(log_level) + self.logger = aux.get_logger("ioutils", level=log_level) + self.obj = IoUtilsBind() def load_gensim_vocab(self, filepath, min_count): self.obj.load_gensim_vocab(filepath, min_count) diff --git a/examples/example1.py b/examples/example1.py index 722a92d..c4254a7 100644 --- a/examples/example1.py +++ b/examples/example1.py @@ -6,14 +6,15 @@ # pylint: disable=no-name-in-module,logging-format-truncated import fire -from cusim import aux -from cusim.ioutils import IoUtils +from cusim import aux, IoUtils LOGGER = aux.get_logger() +CORPORA_PATH = "res/corpora.txt" +MIN_COUNT = 5 def run(): iou = IoUtils() - iou.load_gensim_vocab("corpora.txt", 5) + iou.load_gensim_vocab(CORPORA_PATH, MIN_COUNT) if __name__ == "__main__": diff --git a/setup.py b/setup.py index de6ca72..dacb5f3 100644 --- a/setup.py +++ b/setup.py @@ -166,7 +166,7 @@ def setup_package(): download_url="https://github.com/js1010/cusim/releases", include_package_data=False, license='Apac2', - packages=['cusim/'], + packages=['cusim/', "cusim/ioutils/"], cmdclass=cmdclass, classifiers=[_f for _f in CLASSIFIERS.split('\n') if _f], platforms=['Linux', 'Mac OSX', 'Unix'], From 3fea98aa7b0760f1fd437cbbed8a7f5111dc1b05 Mon Sep 17 00:00:00 2001 From: js1010 Date: Sat, 6 Feb 2021 09:14:27 +0000 Subject: [PATCH 08/14] add init --- cusim/__init__.py | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 cusim/__init__.py diff --git a/cusim/__init__.py b/cusim/__init__.py new file mode 100644 index 0000000..796d7b2 --- /dev/null +++ b/cusim/__init__.py @@ -0,0 +1,6 @@ +# Copyright (c) 2021 Jisang Yoon +# All rights reserved. +# +# This source code is licensed under the Apache 2.0 license found in the +# LICENSE file in the root directory of this source tree. +from cusim.ioutils import IoUtils From d7a2094b652af1391de78d2aae901273c694f897 Mon Sep 17 00:00:00 2001 From: js1010 Date: Sat, 6 Feb 2021 11:59:52 +0000 Subject: [PATCH 09/14] use progressbar in reading stream --- cpp/include/ioutils.hpp | 11 +++++-- cpp/src/ioutils.cc | 64 ++++++++++++++++++++++++++++---------- cusim/aux.py | 4 +-- cusim/ioutils/bindings.cc | 24 +++++++++++--- cusim/ioutils/pyioutils.py | 33 +++++++++++++++++--- cusim/proto/config.proto | 21 ++----------- examples/example1.py | 23 ++++++++++++-- examples/requirements.txt | 1 + 8 files changed, 130 insertions(+), 51 deletions(-) diff --git a/cpp/include/ioutils.hpp b/cpp/include/ioutils.hpp index 235593c..d56fe09 100644 --- a/cpp/include/ioutils.hpp +++ b/cpp/include/ioutils.hpp @@ -30,12 +30,17 @@ class IoUtils { public: IoUtils(); ~IoUtils(); - void LoadGensimVocab(std::string filepath, int min_count); + bool Init(std::string opt_path); + int LoadStreamFile(std::string filepath); + int ReadStreamForVocab(int num_lines); + void GetWordVocab(int min_count); private: - std::vector parse_line(std::string line); + void ParseLine(std::string line, std::vector& line_vec); + std::ifstream stream_fin_; + json11::Json opt_; std::shared_ptr logger_; - std::unordered_map word_idmap_; + std::unordered_map word_idmap_, word_count_; std::vector word_list_; }; // class IoUtils diff --git a/cpp/src/ioutils.cc b/cpp/src/ioutils.cc index 004fd46..ee525b9 100644 --- a/cpp/src/ioutils.cc +++ b/cpp/src/ioutils.cc @@ -13,9 +13,23 @@ IoUtils::IoUtils() { IoUtils::~IoUtils() {} -std::vector IoUtils::parse_line(std::string line) { +bool IoUtils::Init(std::string opt_path) { + std::ifstream in(opt_path.c_str()); + if (not in.is_open()) return false; + + std::string str((std::istreambuf_iterator(in)), + std::istreambuf_iterator()); + std::string err_cmt; + auto _opt = json11::Json::parse(str, err_cmt); + if (not err_cmt.empty()) return false; + opt_ = _opt; + CuSimLogger().set_log_level(opt_["c_log_level"].int_value()); + return true; +} + +void IoUtils::ParseLine(std::string line, std::vector& ret) { + ret.clear(); int n = line.size(); - std::vector ret; std::string element; for (int i = 0; i < n; ++i) { if (line[i] == ' ') { @@ -28,26 +42,42 @@ std::vector IoUtils::parse_line(std::string line) { if (element.size() > 0) { ret.push_back(element); } - return ret; } -void IoUtils::LoadGensimVocab(std::string filepath, int min_count) { - INFO("read gensim file to generate vocabulary: {}, min_count: {}", filepath, min_count); - std::ifstream fin(filepath.c_str()); - std::unordered_map word_count; - while (not fin.eof()) { - std::string line; - getline(fin, line); - std::vector line_vec = parse_line(line); +int IoUtils::LoadStreamFile(std::string filepath) { + INFO("read gensim file to generate vocabulary: {}", filepath); + stream_fin_.open(filepath.c_str()); + int count = 0; + std::string line; + while (getline(stream_fin_, line)) + count++; + stream_fin_.close(); + stream_fin_.open(filepath.c_str()); + word_idmap_.clear(); + word_list_.clear(); + word_count_.clear(); + return count; +} + +int IoUtils::ReadStreamForVocab(int num_lines) { + int read_cnt = 0; + std::string line; + std::vector line_vec; + while (getline(stream_fin_, line) and read_cnt < num_lines) { + ParseLine(line, line_vec); for (auto& word: line_vec) { - if (not word_count.count(word)) word_count[word] = 0; - word_count[word]++; + if (not word_count_.count(word)) word_count_[word] = 0; + word_count_[word]++; } + read_cnt++; } - INFO("number of raw words: {}", word_count.size()); - word_idmap_.clear(); - word_list_.clear(); - for (auto& it: word_count) { + if (read_cnt < num_lines) stream_fin_.close(); + return read_cnt; +} + +void IoUtils::GetWordVocab(int min_count) { + INFO("number of raw words: {}", word_count_.size()); + for (auto& it: word_count_) { if (it.second >= min_count) { word_idmap_[it.first] = word_idmap_.size(); word_list_.push_back(it.first); diff --git a/cusim/aux.py b/cusim/aux.py index 0538a04..2d9c584 100644 --- a/cusim/aux.py +++ b/cusim/aux.py @@ -10,7 +10,6 @@ import jsmin from google.protobuf.json_format import Parse, MessageToDict -from cusim.config_pb2 import ConfigProto # get_logger and Option refer to # https://github.com/kakao/buffalo/blob/ @@ -57,7 +56,8 @@ def load_json_file(fname): return ret # use protobuf to restrict field and types -def get_opt_as_proto(raw, proto_type=ConfigProto): +def get_opt_as_proto(raw, proto_type=None): + assert proto_type is not None proto = proto_type() # convert raw to proto Parse(json.dumps(Option(raw)), proto) diff --git a/cusim/ioutils/bindings.cc b/cusim/ioutils/bindings.cc index f1b67ae..5ea3d29 100644 --- a/cusim/ioutils/bindings.cc +++ b/cusim/ioutils/bindings.cc @@ -18,9 +18,23 @@ typedef py::array_t int_array; class IoUtilsBind { public: IoUtilsBind() {} - void LoadGensimVocab(std::string filepath, int min_count) { - obj_.LoadGensimVocab(filepath, min_count); + + bool Init(std::string opt_path) { + return obj_.Init(opt_path); + } + + int LoadStreamFile(std::string filepath) { + return obj_.LoadStreamFile(filepath); + } + + int ReadStreamForVocab(int num_lines) { + return obj_.ReadStreamForVocab(num_lines); + } + + void GetWordVocab(int min_count) { + return obj_.GetWordVocab(min_count); } + private: cusim::IoUtils obj_; }; @@ -30,8 +44,10 @@ PYBIND11_PLUGIN(ioutils_bind) { py::class_(m, "IoUtilsBind") .def(py::init()) - .def("load_gensim_vocab", &IoUtilsBind::LoadGensimVocab, - py::arg("filepath"), py::arg("min_count")) + .def("init", &IoUtilsBind::Init, py::arg("opt_path")) + .def("load_stream_file", &IoUtilsBind::LoadStreamFile, py::arg("filepath")) + .def("read_stream_for_vocab", &IoUtilsBind::ReadStreamForVocab, py::arg("num_lines")) + .def("get_word_vocab", &IoUtilsBind::GetWordVocab, py::arg("min_count")) .def("__repr__", [](const IoUtilsBind &a) { return ""; diff --git a/cusim/ioutils/pyioutils.py b/cusim/ioutils/pyioutils.py index b50eb82..2c0787e 100644 --- a/cusim/ioutils/pyioutils.py +++ b/cusim/ioutils/pyioutils.py @@ -5,14 +5,37 @@ # LICENSE file in the root directory of this source tree. # pylint: disable=no-name-in-module,too-few-public-methods,no-member +import os +import json +import tempfile +import tqdm + from cusim import aux from cusim.ioutils.ioutils_bind import IoUtilsBind - +from cusim.config_pb2 import IoUtilsConfigProto class IoUtils: - def __init__(self, log_level=2): - self.logger = aux.get_logger("ioutils", level=log_level) + def __init__(self, opt=None): + self.opt = aux.get_opt_as_proto(opt or {}, IoUtilsConfigProto) + self.logger = aux.get_logger("ioutils", level=self.opt.py_log_level) + + tmp = tempfile.NamedTemporaryFile(mode='w', delete=False) + opt_content = json.dumps(aux.proto_to_dict(self.opt), indent=2) + tmp.write(opt_content) + tmp.close() + + self.logger.info("opt: %s", opt_content) self.obj = IoUtilsBind() + assert self.obj.init(bytes(tmp.name, "utf8")), f"failed to load {tmp.name}" + os.remove(tmp.name) - def load_gensim_vocab(self, filepath, min_count): - self.obj.load_gensim_vocab(filepath, min_count) + def load_stream_vocab(self, filepath, min_count, chunk_lines=100000): + full_num_lines = self.obj.load_stream_file(filepath) + pbar = tqdm.trange(full_num_lines) + while True: + num_lines = self.obj.read_stream_for_vocab(chunk_lines) + pbar.update(num_lines) + if num_lines < chunk_lines: + pbar.close() + break + self.obj.get_word_vocab(min_count) diff --git a/cusim/proto/config.proto b/cusim/proto/config.proto index f90d1bc..b4ab0b8 100644 --- a/cusim/proto/config.proto +++ b/cusim/proto/config.proto @@ -6,22 +6,7 @@ syntax = "proto2"; -message ConfigProto { - optional int32 seed = 1 [default = 777]; - optional int32 c_log_level = 3 [default = 2]; - optional int32 py_log_level = 4 [default = 2]; - optional int32 max_m = 5 [default = 12]; - optional int32 max_m0 = 6 [default = 24]; - optional int32 ef_construction = 7 [default = 150]; - // optional int32 ef_search = 8 [default = 50]; - optional double level_mult = 9; - optional bool save_remains = 10; - optional double hyper_threads = 11 [default = 10]; - optional int32 block_dim = 12 [default = 32]; - optional string dist_type = 13 [default = "dot"]; - optional int32 visited_table_size = 17; - optional int32 visited_list_size = 14 [default = 8192]; - optional bool nrz = 15 [default = true]; - optional bool reverse_cand = 16; - optional double heuristic_coef = 18 [default = 0.25]; +message IoUtilsConfigProto { + optional int32 py_log_level = 1 [default = 2]; + optional int32 c_log_level = 2 [default = 2]; } diff --git a/examples/example1.py b/examples/example1.py index c4254a7..c9565e3 100644 --- a/examples/example1.py +++ b/examples/example1.py @@ -5,16 +5,35 @@ # LICENSE file in the root directory of this source tree. # pylint: disable=no-name-in-module,logging-format-truncated +import os +import subprocess import fire + +from gensim import downloader as api from cusim import aux, IoUtils LOGGER = aux.get_logger() -CORPORA_PATH = "res/corpora.txt" +DOWNLOAD_PATH = "./res" +DATASET = "wiki-english-20171001" +DATA_PATH = f"./res/{DATASET}.stream.txt" MIN_COUNT = 5 +def download(): + if os.path.exists(DATA_PATH): + LOGGER.info("%s already exists", DATA_PATH) + return + api.BASE_DIR = DOWNLOAD_PATH + filepath = api.load(DATASET, return_path=True) + LOGGER.info("filepath: %s", filepath) + cmd = ["gunzip", "-c", filepath, ">", DATA_PATH] + cmd = " ".join(cmd) + LOGGER.info("cmd: %s", cmd) + subprocess.call(cmd, shell=True) + def run(): + download() iou = IoUtils() - iou.load_gensim_vocab(CORPORA_PATH, MIN_COUNT) + iou.load_stream_vocab(DATA_PATH, 5) if __name__ == "__main__": diff --git a/examples/requirements.txt b/examples/requirements.txt index f8e1e74..728d5c2 100644 --- a/examples/requirements.txt +++ b/examples/requirements.txt @@ -1 +1,2 @@ fire +gensim From 9a49abec2c85f29a16dbfbe8ba09d5d0fab42d0a Mon Sep 17 00:00:00 2001 From: js1010 Date: Sat, 6 Feb 2021 21:40:30 +0900 Subject: [PATCH 10/14] return pair --- cpp/include/ioutils.hpp | 2 +- cpp/src/ioutils.cc | 13 +++++++++---- cusim/ioutils/bindings.cc | 2 +- cusim/ioutils/pyioutils.py | 6 +++--- examples/example1.py | 2 +- 5 files changed, 15 insertions(+), 10 deletions(-) diff --git a/cpp/include/ioutils.hpp b/cpp/include/ioutils.hpp index d56fe09..5d544b3 100644 --- a/cpp/include/ioutils.hpp +++ b/cpp/include/ioutils.hpp @@ -32,7 +32,7 @@ class IoUtils { ~IoUtils(); bool Init(std::string opt_path); int LoadStreamFile(std::string filepath); - int ReadStreamForVocab(int num_lines); + std::pair ReadStreamForVocab(int num_lines); void GetWordVocab(int min_count); private: void ParseLine(std::string line, std::vector& line_vec); diff --git a/cpp/src/ioutils.cc b/cpp/src/ioutils.cc index ee525b9..884c24e 100644 --- a/cpp/src/ioutils.cc +++ b/cpp/src/ioutils.cc @@ -59,11 +59,12 @@ int IoUtils::LoadStreamFile(std::string filepath) { return count; } -int IoUtils::ReadStreamForVocab(int num_lines) { +std::pair IoUtils::ReadStreamForVocab(int num_lines) { int read_cnt = 0; std::string line; std::vector line_vec; - while (getline(stream_fin_, line) and read_cnt < num_lines) { + while (not stream_fin_.eof() and read_cnt < num_lines) { + getline(stream_fin_, line); ParseLine(line, line_vec); for (auto& word: line_vec) { if (not word_count_.count(word)) word_count_[word] = 0; @@ -71,8 +72,12 @@ int IoUtils::ReadStreamForVocab(int num_lines) { } read_cnt++; } - if (read_cnt < num_lines) stream_fin_.close(); - return read_cnt; + bool finished = false; + if (stream_fin_.eof()) { + stream_fin_.close(); + finished = true; + } + return {read_cnt, finished}; } void IoUtils::GetWordVocab(int min_count) { diff --git a/cusim/ioutils/bindings.cc b/cusim/ioutils/bindings.cc index 5ea3d29..9f7e39b 100644 --- a/cusim/ioutils/bindings.cc +++ b/cusim/ioutils/bindings.cc @@ -27,7 +27,7 @@ class IoUtilsBind { return obj_.LoadStreamFile(filepath); } - int ReadStreamForVocab(int num_lines) { + std::pair ReadStreamForVocab(int num_lines) { return obj_.ReadStreamForVocab(num_lines); } diff --git a/cusim/ioutils/pyioutils.py b/cusim/ioutils/pyioutils.py index 2c0787e..70e826f 100644 --- a/cusim/ioutils/pyioutils.py +++ b/cusim/ioutils/pyioutils.py @@ -33,9 +33,9 @@ def load_stream_vocab(self, filepath, min_count, chunk_lines=100000): full_num_lines = self.obj.load_stream_file(filepath) pbar = tqdm.trange(full_num_lines) while True: - num_lines = self.obj.read_stream_for_vocab(chunk_lines) + num_lines, finished = self.obj.read_stream_for_vocab(chunk_lines) pbar.update(num_lines) - if num_lines < chunk_lines: - pbar.close() + if finished: break + pbar.close() self.obj.get_word_vocab(min_count) diff --git a/examples/example1.py b/examples/example1.py index c9565e3..531076c 100644 --- a/examples/example1.py +++ b/examples/example1.py @@ -33,7 +33,7 @@ def download(): def run(): download() iou = IoUtils() - iou.load_stream_vocab(DATA_PATH, 5) + iou.load_stream_vocab(DATA_PATH, 5, 10000) if __name__ == "__main__": From 27a54d6d9d82bff430dcf38f89657a3925e73bcd Mon Sep 17 00:00:00 2001 From: js1010 Date: Sat, 6 Feb 2021 22:24:23 +0900 Subject: [PATCH 11/14] parallel data load --- cpp/include/ioutils.hpp | 4 ++- cpp/src/ioutils.cc | 51 +++++++++++++++++++++++++------------- cusim/ioutils/bindings.cc | 7 +++--- cusim/ioutils/pyioutils.py | 10 +++++--- examples/example1.py | 5 ++-- 5 files changed, 50 insertions(+), 27 deletions(-) diff --git a/cpp/include/ioutils.hpp b/cpp/include/ioutils.hpp index 5d544b3..a8fc2dc 100644 --- a/cpp/include/ioutils.hpp +++ b/cpp/include/ioutils.hpp @@ -32,16 +32,18 @@ class IoUtils { ~IoUtils(); bool Init(std::string opt_path); int LoadStreamFile(std::string filepath); - std::pair ReadStreamForVocab(int num_lines); + std::pair ReadStreamForVocab(int num_lines, int num_threads); void GetWordVocab(int min_count); private: void ParseLine(std::string line, std::vector& line_vec); + std::mutex global_lock_; std::ifstream stream_fin_; json11::Json opt_; std::shared_ptr logger_; std::unordered_map word_idmap_, word_count_; std::vector word_list_; + int num_lines_, remain_lines_; }; // class IoUtils } // namespace cusim diff --git a/cpp/src/ioutils.cc b/cpp/src/ioutils.cc index 884c24e..38fbb41 100644 --- a/cpp/src/ioutils.cc +++ b/cpp/src/ioutils.cc @@ -56,28 +56,45 @@ int IoUtils::LoadStreamFile(std::string filepath) { word_idmap_.clear(); word_list_.clear(); word_count_.clear(); + num_lines_ = count; + remain_lines_ = num_lines_; return count; } -std::pair IoUtils::ReadStreamForVocab(int num_lines) { - int read_cnt = 0; - std::string line; - std::vector line_vec; - while (not stream_fin_.eof() and read_cnt < num_lines) { - getline(stream_fin_, line); - ParseLine(line, line_vec); - for (auto& word: line_vec) { - if (not word_count_.count(word)) word_count_[word] = 0; - word_count_[word]++; +std::pair IoUtils::ReadStreamForVocab(int num_lines, int num_threads) { + int read_lines = std::min(num_lines, remain_lines_); + remain_lines_ -= read_lines; + #pragma omp parallel num_threads(num_threads) + { + std::string line; + std::vector line_vec; + std::unordered_map word_count; + #pragma omp for schedule(dynamic, 4) + for (int i = 0; i < read_lines; ++i) { + // get line thread-safely + { + std::unique_lock lock(global_lock_); + getline(stream_fin_, line); + } + + // seems to bottle-neck + ParseLine(line, line_vec); + + // update private word count + for (auto& word: line_vec) { + word_count[word]++; + } + } + + // update word count to class variable + { + std::unique_lock lock(global_lock_); + for (auto& it: word_count) { + word_count_[it.first] += it.second; + } } - read_cnt++; - } - bool finished = false; - if (stream_fin_.eof()) { - stream_fin_.close(); - finished = true; } - return {read_cnt, finished}; + return {read_lines, remain_lines_}; } void IoUtils::GetWordVocab(int min_count) { diff --git a/cusim/ioutils/bindings.cc b/cusim/ioutils/bindings.cc index 9f7e39b..3142628 100644 --- a/cusim/ioutils/bindings.cc +++ b/cusim/ioutils/bindings.cc @@ -27,8 +27,8 @@ class IoUtilsBind { return obj_.LoadStreamFile(filepath); } - std::pair ReadStreamForVocab(int num_lines) { - return obj_.ReadStreamForVocab(num_lines); + std::pair ReadStreamForVocab(int num_lines, int num_threads) { + return obj_.ReadStreamForVocab(num_lines, num_threads); } void GetWordVocab(int min_count) { @@ -46,7 +46,8 @@ PYBIND11_PLUGIN(ioutils_bind) { .def(py::init()) .def("init", &IoUtilsBind::Init, py::arg("opt_path")) .def("load_stream_file", &IoUtilsBind::LoadStreamFile, py::arg("filepath")) - .def("read_stream_for_vocab", &IoUtilsBind::ReadStreamForVocab, py::arg("num_lines")) + .def("read_stream_for_vocab", &IoUtilsBind::ReadStreamForVocab, + py::arg("num_lines"), py::arg("num_threads")) .def("get_word_vocab", &IoUtilsBind::GetWordVocab, py::arg("min_count")) .def("__repr__", [](const IoUtilsBind &a) { diff --git a/cusim/ioutils/pyioutils.py b/cusim/ioutils/pyioutils.py index 70e826f..9b4c3a1 100644 --- a/cusim/ioutils/pyioutils.py +++ b/cusim/ioutils/pyioutils.py @@ -29,13 +29,15 @@ def __init__(self, opt=None): assert self.obj.init(bytes(tmp.name, "utf8")), f"failed to load {tmp.name}" os.remove(tmp.name) - def load_stream_vocab(self, filepath, min_count, chunk_lines=100000): + def load_stream_vocab(self, filepath, min_count, + chunk_lines=100000, num_threads=4): full_num_lines = self.obj.load_stream_file(filepath) pbar = tqdm.trange(full_num_lines) while True: - num_lines, finished = self.obj.read_stream_for_vocab(chunk_lines) - pbar.update(num_lines) - if finished: + read_lines, remain_lines = \ + self.obj.read_stream_for_vocab(chunk_lines, num_threads) + pbar.update(read_lines) + if not remain_lines: break pbar.close() self.obj.get_word_vocab(min_count) diff --git a/examples/example1.py b/examples/example1.py index 531076c..4e7db2d 100644 --- a/examples/example1.py +++ b/examples/example1.py @@ -14,7 +14,8 @@ LOGGER = aux.get_logger() DOWNLOAD_PATH = "./res" -DATASET = "wiki-english-20171001" +# DATASET = "wiki-english-20171001" +DATASET = "fake-news" DATA_PATH = f"./res/{DATASET}.stream.txt" MIN_COUNT = 5 @@ -33,7 +34,7 @@ def download(): def run(): download() iou = IoUtils() - iou.load_stream_vocab(DATA_PATH, 5, 10000) + iou.load_stream_vocab(DATA_PATH, 5, 100000, 8) if __name__ == "__main__": From 6eed2f8ded9fa235ed0c48360dd9b93725ee1574 Mon Sep 17 00:00:00 2001 From: js1010 Date: Sun, 7 Feb 2021 10:25:00 +0900 Subject: [PATCH 12/14] dump keys --- cpp/include/ioutils.hpp | 7 +++- cpp/src/ioutils.cc | 77 +++++++++++++++++++++++++++++++++++--- cusim/ioutils/bindings.cc | 21 +++++++++-- cusim/ioutils/pyioutils.py | 26 +++++++++---- cusim/proto/config.proto | 2 + examples/example1.py | 5 ++- requirements.txt | 1 + 7 files changed, 120 insertions(+), 19 deletions(-) diff --git a/cpp/include/ioutils.hpp b/cpp/include/ioutils.hpp index a8fc2dc..3e8304d 100644 --- a/cpp/include/ioutils.hpp +++ b/cpp/include/ioutils.hpp @@ -33,10 +33,15 @@ class IoUtils { bool Init(std::string opt_path); int LoadStreamFile(std::string filepath); std::pair ReadStreamForVocab(int num_lines, int num_threads); - void GetWordVocab(int min_count); + std::pair TokenizeStream(int num_lines, int num_threads); + void GetWordVocab(int min_count, std::string keys_path); + void GetToken(int* indices, int* indptr, int offset); private: void ParseLine(std::string line, std::vector& line_vec); + void ParseLineImpl(std::string line, std::vector& line_vec); + std::vector> indices_; + std::vector indptr_; std::mutex global_lock_; std::ifstream stream_fin_; json11::Json opt_; diff --git a/cpp/src/ioutils.cc b/cpp/src/ioutils.cc index 38fbb41..a28725a 100644 --- a/cpp/src/ioutils.cc +++ b/cpp/src/ioutils.cc @@ -28,14 +28,19 @@ bool IoUtils::Init(std::string opt_path) { } void IoUtils::ParseLine(std::string line, std::vector& ret) { + ParseLineImpl(line, ret); +} + + +void IoUtils::ParseLineImpl(std::string line, std::vector& ret) { ret.clear(); int n = line.size(); std::string element; for (int i = 0; i < n; ++i) { - if (line[i] == ' ') { + if (line[i] == ' ' or line[i] == ',') { ret.push_back(element); element.clear(); - } else { + } else if (line[i] != '"') { element += line[i]; } } @@ -61,6 +66,56 @@ int IoUtils::LoadStreamFile(std::string filepath) { return count; } +std::pair IoUtils::TokenizeStream(int num_lines, int num_threads) { + int read_lines = std::min(num_lines, remain_lines_); + if (not read_lines) return {0, 0}; + remain_lines_ -= read_lines; + indices_.clear(); + indices_.resize(read_lines); + indptr_.resize(read_lines); + std::fill(indptr_.begin(), indptr_.end(), 0); + #pragma omp parallel num_threads(num_threads) + { + std::string line; + std::vector line_vec; + #pragma omp for schedule(dynamic, 4) + for (int i = 0; i < read_lines; ++i) { + // get line thread-safely + { + std::unique_lock lock(global_lock_); + getline(stream_fin_, line); + } + + // seems to be bottle-neck + ParseLine(line, line_vec); + + // tokenize + for (auto& word: line_vec) { + if (word_count_.count(word)) continue; + indices_[i].push_back(word_count_[word]); + } + } + } + int cumsum = 0; + for (int i = 0; i < read_lines; ++i) { + cumsum += indices_[i].size(); + indptr_[i] = cumsum; + } + return {read_lines, indptr_[read_lines - 1]}; +} + +void IoUtils::GetToken(int* indices, int* indptr, int offset) { + int n = indices_.size(); + for (int i = 0; i < n; ++i) { + int beg = i == 0? 0: indptr_[i - 1]; + int end = indptr_[i]; + for (int j = beg; j < end; ++j) { + indices[j] = indices_[i][j - beg]; + } + indptr[i] = offset + indptr_[i]; + } +} + std::pair IoUtils::ReadStreamForVocab(int num_lines, int num_threads) { int read_lines = std::min(num_lines, remain_lines_); remain_lines_ -= read_lines; @@ -77,7 +132,7 @@ std::pair IoUtils::ReadStreamForVocab(int num_lines, int num_threads) getline(stream_fin_, line); } - // seems to bottle-neck + // seems to be bottle-neck ParseLine(line, line_vec); // update private word count @@ -94,10 +149,10 @@ std::pair IoUtils::ReadStreamForVocab(int num_lines, int num_threads) } } } - return {read_lines, remain_lines_}; + return {read_lines, word_count_.size()}; } -void IoUtils::GetWordVocab(int min_count) { +void IoUtils::GetWordVocab(int min_count, std::string keys_path) { INFO("number of raw words: {}", word_count_.size()); for (auto& it: word_count_) { if (it.second >= min_count) { @@ -106,6 +161,18 @@ void IoUtils::GetWordVocab(int min_count) { } } INFO("number of words after filtering: {}", word_list_.size()); + + // write keys to csv file + std::ofstream fout(keys_path.c_str()); + INFO("dump keys to {}", keys_path); + std::string header = "index,key\n"; + fout.write(header.c_str(), header.size()); + int n = word_list_.size(); + for (int i = 0; i < n; ++i) { + std::string line = std::to_string(i) + ",\"" + word_list_[i] + "\"\n"; + fout.write(line.c_str(), line.size()); + } + fout.close(); } } // namespace cusim diff --git a/cusim/ioutils/bindings.cc b/cusim/ioutils/bindings.cc index 3142628..5b2c1dd 100644 --- a/cusim/ioutils/bindings.cc +++ b/cusim/ioutils/bindings.cc @@ -31,8 +31,18 @@ class IoUtilsBind { return obj_.ReadStreamForVocab(num_lines, num_threads); } - void GetWordVocab(int min_count) { - return obj_.GetWordVocab(min_count); + std::pair TokenizeStream(int num_lines, int num_threads) { + return obj_.TokenizeStream(num_lines, num_threads); + } + + void GetWordVocab(int min_count, std::string keys_path) { + obj_.GetWordVocab(min_count, keys_path); + } + + void GetToken(py::object& indices, py::object& indptr, int offset) { + int_array _indices(indices); + int_array _indptr(indptr); + obj_.GetToken(_indices.mutable_data(0), _indptr.mutable_data(0), offset); } private: @@ -48,7 +58,12 @@ PYBIND11_PLUGIN(ioutils_bind) { .def("load_stream_file", &IoUtilsBind::LoadStreamFile, py::arg("filepath")) .def("read_stream_for_vocab", &IoUtilsBind::ReadStreamForVocab, py::arg("num_lines"), py::arg("num_threads")) - .def("get_word_vocab", &IoUtilsBind::GetWordVocab, py::arg("min_count")) + .def("tokenize_stream", &IoUtilsBind::TokenizeStream, + py::arg("num_lines"), py::arg("num_threads")) + .def("get_word_vocab", &IoUtilsBind::GetWordVocab, + py::arg("min_count"), py::arg("keys_path")) + .def("get_token", &IoUtilsBind::GetToken, + py::arg("indices"), py::arg("indptr"), py::arg("offset")) .def("__repr__", [](const IoUtilsBind &a) { return ""; diff --git a/cusim/ioutils/pyioutils.py b/cusim/ioutils/pyioutils.py index 9b4c3a1..cdec4be 100644 --- a/cusim/ioutils/pyioutils.py +++ b/cusim/ioutils/pyioutils.py @@ -6,10 +6,11 @@ # pylint: disable=no-name-in-module,too-few-public-methods,no-member import os +from os.path import join as pjoin + import json import tempfile import tqdm - from cusim import aux from cusim.ioutils.ioutils_bind import IoUtilsBind from cusim.config_pb2 import IoUtilsConfigProto @@ -29,15 +30,24 @@ def __init__(self, opt=None): assert self.obj.init(bytes(tmp.name, "utf8")), f"failed to load {tmp.name}" os.remove(tmp.name) - def load_stream_vocab(self, filepath, min_count, - chunk_lines=100000, num_threads=4): + def load_stream_vocab(self, filepath, min_count, keys_path): full_num_lines = self.obj.load_stream_file(filepath) - pbar = tqdm.trange(full_num_lines) + pbar = tqdm.trange(full_num_lines, unit="line", + postfix={"word_count": 0}) + processed = 0 while True: - read_lines, remain_lines = \ - self.obj.read_stream_for_vocab(chunk_lines, num_threads) + read_lines, word_count = \ + self.obj.read_stream_for_vocab( + self.opt.chunk_lines, self.opt.num_threads) + processed += read_lines + pbar.set_postfix({"word_count": word_count}, refresh=False) pbar.update(read_lines) - if not remain_lines: + if processed == full_num_lines: break pbar.close() - self.obj.get_word_vocab(min_count) + self.obj.get_word_vocab(min_count, keys_path) + + def convert_stream_to_h5(self, filepath, min_count, out_dir): + os.makedirs(out_dir, exist_ok=True) + keys_path = pjoin(out_dir, "keys.csv") + self.load_stream_vocab(filepath, min_count, keys_path) diff --git a/cusim/proto/config.proto b/cusim/proto/config.proto index b4ab0b8..071184b 100644 --- a/cusim/proto/config.proto +++ b/cusim/proto/config.proto @@ -9,4 +9,6 @@ syntax = "proto2"; message IoUtilsConfigProto { optional int32 py_log_level = 1 [default = 2]; optional int32 c_log_level = 2 [default = 2]; + optional int32 chunk_lines = 3 [default = 100000]; + optional int32 num_threads = 4 [default = 4]; } diff --git a/examples/example1.py b/examples/example1.py index 4e7db2d..6cbdaa9 100644 --- a/examples/example1.py +++ b/examples/example1.py @@ -17,6 +17,7 @@ # DATASET = "wiki-english-20171001" DATASET = "fake-news" DATA_PATH = f"./res/{DATASET}.stream.txt" +DATA_PATH2 = f"./res/{DATASET}-converted" MIN_COUNT = 5 def download(): @@ -33,8 +34,8 @@ def download(): def run(): download() - iou = IoUtils() - iou.load_stream_vocab(DATA_PATH, 5, 100000, 8) + iou = IoUtils(opt={"chunk_lines": 10000, "num_threads": 8}) + iou.convert_stream_to_h5(DATA_PATH, 5, DATA_PATH2) if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt index c268069..9d207a9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +tqdm jsmin numpy pybind11 From a08ed01af3d233e540a25b4db750417e80ac38be Mon Sep 17 00:00:00 2001 From: js1010 Date: Sun, 7 Feb 2021 10:30:12 +0900 Subject: [PATCH 13/14] add pandas dependency --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 9d207a9..afe4ecd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ tqdm jsmin numpy +pandas pybind11 protobuf==3.10.0 grpcio-tools==1.27.1 From 68915a840c23ab7ccc26f734b42ebc8b96cd3303 Mon Sep 17 00:00:00 2001 From: js1010 Date: Sun, 7 Feb 2021 13:07:29 +0900 Subject: [PATCH 14/14] implement tokenization and use keras progressbar --- cpp/src/ioutils.cc | 10 +- cusim/aux.py | 220 ++++++++++++++++++++++++++++++++++++- cusim/ioutils/pyioutils.py | 45 ++++++-- requirements.txt | 1 - 4 files changed, 262 insertions(+), 14 deletions(-) diff --git a/cpp/src/ioutils.cc b/cpp/src/ioutils.cc index a28725a..45d551b 100644 --- a/cpp/src/ioutils.cc +++ b/cpp/src/ioutils.cc @@ -41,7 +41,7 @@ void IoUtils::ParseLineImpl(std::string line, std::vector& ret) { ret.push_back(element); element.clear(); } else if (line[i] != '"') { - element += line[i]; + element += std::tolower(line[i]); } } if (element.size() > 0) { @@ -51,6 +51,7 @@ void IoUtils::ParseLineImpl(std::string line, std::vector& ret) { int IoUtils::LoadStreamFile(std::string filepath) { INFO("read gensim file to generate vocabulary: {}", filepath); + if (stream_fin_.is_open()) stream_fin_.close(); stream_fin_.open(filepath.c_str()); int count = 0; std::string line; @@ -58,11 +59,9 @@ int IoUtils::LoadStreamFile(std::string filepath) { count++; stream_fin_.close(); stream_fin_.open(filepath.c_str()); - word_idmap_.clear(); - word_list_.clear(); - word_count_.clear(); num_lines_ = count; remain_lines_ = num_lines_; + INFO("number of lines: {}", num_lines_); return count; } @@ -91,7 +90,7 @@ std::pair IoUtils::TokenizeStream(int num_lines, int num_threads) { // tokenize for (auto& word: line_vec) { - if (word_count_.count(word)) continue; + if (not word_count_.count(word)) continue; indices_[i].push_back(word_count_[word]); } } @@ -149,6 +148,7 @@ std::pair IoUtils::ReadStreamForVocab(int num_lines, int num_threads) } } } + if (not remain_lines_) stream_fin_.close(); return {read_lines, word_count_.size()}; } diff --git a/cusim/aux.py b/cusim/aux.py index 2d9c584..4a1c2c5 100644 --- a/cusim/aux.py +++ b/cusim/aux.py @@ -3,11 +3,14 @@ # # This source code is licensed under the Apache 2.0 license found in the # LICENSE file in the root directory of this source tree. +import os import re +import sys import json +import time import logging import logging.handlers - +import numpy as np import jsmin from google.protobuf.json_format import Parse, MessageToDict @@ -117,3 +120,218 @@ def __getstate__(self): def __setstate__(self, state): vars(self).update(state) + +# reference: https://github.com/tensorflow/tensorflow/blob/ +# 85c8b2a817f95a3e979ecd1ed95bff1dc1335cff/tensorflow/python/ +# keras/utils/generic_utils.py#L483 +class Progbar: + # pylint: disable=too-many-branches,too-many-statements,invalid-name + # pylint: disable=blacklisted-name,no-else-return + """Displays a progress bar. + Arguments: + target: Total number of steps expected, None if unknown. + width: Progress bar width on screen. + verbose: Verbosity mode, 0 (silent), 1 (verbose), 2 (semi-verbose) + stateful_metrics: Iterable of string names of metrics that should *not* be + averaged over time. Metrics in this list will be displayed as-is. All + others will be averaged by the progbar before display. + interval: Minimum visual progress update interval (in seconds). + unit_name: Display name for step counts (usually "step" or "sample"). + """ + + def __init__(self, + target, + width=30, + verbose=1, + interval=0.05, + stateful_metrics=None, + unit_name='step'): + self.target = target + self.width = width + self.verbose = verbose + self.interval = interval + self.unit_name = unit_name + if stateful_metrics: + self.stateful_metrics = set(stateful_metrics) + else: + self.stateful_metrics = set() + + self._dynamic_display = ((hasattr(sys.stdout, 'isatty') and + sys.stdout.isatty()) or + 'ipykernel' in sys.modules or + 'posix' in sys.modules or + 'PYCHARM_HOSTED' in os.environ) + self._total_width = 0 + self._seen_so_far = 0 + # We use a dict + list to avoid garbage collection + # issues found in OrderedDict + self._values = {} + self._values_order = [] + self._start = time.time() + self._last_update = 0 + + self._time_after_first_step = None + + def update(self, current, values=None, finalize=None): + """Updates the progress bar. + Arguments: + current: Index of current step. + values: List of tuples: `(name, value_for_last_step)`. If `name` is in + `stateful_metrics`, `value_for_last_step` will be displayed as-is. + Else, an average of the metric over time will be displayed. + finalize: Whether this is the last update for the progress bar. If + `None`, defaults to `current >= self.target`. + """ + if finalize is None: + if self.target is None: + finalize = False + else: + finalize = current >= self.target + + values = values or [] + for k, v in values: + if k not in self._values_order: + self._values_order.append(k) + if k not in self.stateful_metrics: + # In the case that progress bar doesn't have a target value in the first + # epoch, both on_batch_end and on_epoch_end will be called, which will + # cause 'current' and 'self._seen_so_far' to have the same value. Force + # the minimal value to 1 here, otherwise stateful_metric will be 0s. + value_base = max(current - self._seen_so_far, 1) + if k not in self._values: + self._values[k] = [v * value_base, value_base] + else: + self._values[k][0] += v * value_base + self._values[k][1] += value_base + else: + # Stateful metrics output a numeric value. This representation + # means "take an average from a single value" but keeps the + # numeric formatting. + self._values[k] = [v, 1] + self._seen_so_far = current + + now = time.time() + info = ' - %.0fs' % (now - self._start) + if self.verbose == 1: + if now - self._last_update < self.interval and not finalize: + return + + prev_total_width = self._total_width + if self._dynamic_display: + sys.stdout.write('\b' * prev_total_width) + sys.stdout.write('\r') + else: + sys.stdout.write('\n') + + if self.target is not None: + numdigits = int(np.log10(self.target)) + 1 + bar = ('%' + str(numdigits) + 'd/%d [') % (current, self.target) + prog = float(current) / self.target + prog_width = int(self.width * prog) + if prog_width > 0: + bar += ('=' * (prog_width - 1)) + if current < self.target: + bar += '>' + else: + bar += '=' + bar += ('.' * (self.width - prog_width)) + bar += ']' + else: + bar = '%7d/Unknown' % current + + self._total_width = len(bar) + sys.stdout.write(bar) + + time_per_unit = self._estimate_step_duration(current, now) + + if self.target is None or finalize: + if time_per_unit >= 1 or time_per_unit == 0: + info += ' %.0fs/%s' % (time_per_unit, self.unit_name) + elif time_per_unit >= 1e-3: + info += ' %.0fms/%s' % (time_per_unit * 1e3, self.unit_name) + else: + info += ' %.0fus/%s' % (time_per_unit * 1e6, self.unit_name) + else: + eta = time_per_unit * (self.target - current) + if eta > 3600: + eta_format = '%d:%02d:%02d' % (eta // 3600, + (eta % 3600) // 60, eta % 60) + elif eta > 60: + eta_format = '%d:%02d' % (eta // 60, eta % 60) + else: + eta_format = '%ds' % eta + + info = ' - ETA: %s' % eta_format + + for k in self._values_order: + info += ' - %s:' % k + if isinstance(self._values[k], list): + avg = np.mean(self._values[k][0] / max(1, self._values[k][1])) + if abs(avg) > 1e-3: + info += ' %.4f' % avg + else: + info += ' %.4e' % avg + else: + info += ' %s' % self._values[k] + + self._total_width += len(info) + if prev_total_width > self._total_width: + info += (' ' * (prev_total_width - self._total_width)) + + if finalize: + info += '\n' + + sys.stdout.write(info) + sys.stdout.flush() + + elif self.verbose == 2: + if finalize: + numdigits = int(np.log10(self.target)) + 1 + count = ('%' + str(numdigits) + 'd/%d') % (current, self.target) + info = count + info + for k in self._values_order: + info += ' - %s:' % k + avg = np.mean(self._values[k][0] / max(1, self._values[k][1])) + if avg > 1e-3: + info += ' %.4f' % avg + else: + info += ' %.4e' % avg + info += '\n' + + sys.stdout.write(info) + sys.stdout.flush() + + self._last_update = now + + def add(self, n, values=None): + self.update(self._seen_so_far + n, values) + + def _estimate_step_duration(self, current, now): + """Estimate the duration of a single step. + Given the step number `current` and the corresponding time `now` + this function returns an estimate for how long a single step + takes. If this is called before one step has been completed + (i.e. `current == 0`) then zero is given as an estimate. The duration + estimate ignores the duration of the (assumed to be non-representative) + first step for estimates when more steps are available (i.e. `current>1`). + Arguments: + current: Index of current step. + now: The current time. + Returns: Estimate of the duration of a single step. + """ + if current: + # there are a few special scenarios here: + # 1) somebody is calling the progress bar without ever supplying step 1 + # 2) somebody is calling the progress bar and supplies step one mulitple + # times, e.g. as part of a finalizing call + # in these cases, we just fall back to the simple calculation + if self._time_after_first_step is not None and current > 1: + time_per_unit = (now - self._time_after_first_step) / (current - 1) + else: + time_per_unit = (now - self._start) / current + + if current == 1: + self._time_after_first_step = now + return time_per_unit + else: + return 0 diff --git a/cusim/ioutils/pyioutils.py b/cusim/ioutils/pyioutils.py index cdec4be..1a65f74 100644 --- a/cusim/ioutils/pyioutils.py +++ b/cusim/ioutils/pyioutils.py @@ -10,7 +10,10 @@ import json import tempfile -import tqdm + +import h5py +import numpy as np + from cusim import aux from cusim.ioutils.ioutils_bind import IoUtilsBind from cusim.config_pb2 import IoUtilsConfigProto @@ -32,22 +35,50 @@ def __init__(self, opt=None): def load_stream_vocab(self, filepath, min_count, keys_path): full_num_lines = self.obj.load_stream_file(filepath) - pbar = tqdm.trange(full_num_lines, unit="line", - postfix={"word_count": 0}) + pbar = aux.Progbar(full_num_lines, unit_name="line", + stateful_metrics=["word_count"]) processed = 0 while True: read_lines, word_count = \ self.obj.read_stream_for_vocab( self.opt.chunk_lines, self.opt.num_threads) processed += read_lines - pbar.set_postfix({"word_count": word_count}, refresh=False) - pbar.update(read_lines) + pbar.update(processed, values=[("word_count", word_count)]) if processed == full_num_lines: break - pbar.close() self.obj.get_word_vocab(min_count, keys_path) - def convert_stream_to_h5(self, filepath, min_count, out_dir): + def convert_stream_to_h5(self, filepath, min_count, out_dir, + chunk_indices=10000): os.makedirs(out_dir, exist_ok=True) keys_path = pjoin(out_dir, "keys.csv") + token_path = pjoin(out_dir, "token.h5") + self.logger.info("save key and token to %s, %s", + keys_path, token_path) self.load_stream_vocab(filepath, min_count, keys_path) + full_num_lines = self.obj.load_stream_file(filepath) + pbar = aux.Progbar(full_num_lines, unit_name="line") + processed = 0 + h5f = h5py.File(token_path, "w") + indices = h5f.create_dataset("indices", shape=(chunk_indices,), + maxshape=(None,), dtype=np.int32, + chunks=(chunk_indices,)) + indptr = h5f.create_dataset("indptr", shape=(full_num_lines + 1,), + dtype=np.int32, chunks=True) + processed, offset = 1, 0 + indptr[0] = 0 + while True: + read_lines, data_size = self.obj.tokenize_stream( + self.opt.chunk_lines, self.opt.num_threads) + _indices = np.empty(shape=(data_size,), dtype=np.int32) + _indptr = np.empty(shape=(read_lines,), dtype=np.int32) + self.obj.get_token(_indices, _indptr, offset) + indices.resize((offset + data_size,)) + indices[offset:offset + data_size] = _indices + indptr[processed:processed + read_lines] = _indptr + offset += data_size + processed += read_lines + pbar.update(processed - 1) + if processed == full_num_lines + 1: + break + h5f.close() diff --git a/requirements.txt b/requirements.txt index afe4ecd..bfe001f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ -tqdm jsmin numpy pandas