diff --git a/README.md b/README.md
index 0342692..34a25c7 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,16 @@
-# cusim
-cuda implementaion of w2v and lda
+### How to install
+
+
+```shell
+# clone repo and submodules
+git clone git@github.com:js1010/cusim.git && cd cusim && git submodule update --init
+
+# install requirements
+pip install -r requirements.txt
+
+# generate proto
+python -m grpc_tools.protoc --python_out cusim/ --proto_path cusim/proto/ config.proto
+
+# install
+python setup.py install
+```
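The README only covers installation, so as a usage note for review, here is a minimal sketch of the API this patch adds (the corpus path is a placeholder; `IoUtils` and `convert_stream_to_h5` are introduced in `cusim/ioutils/pyioutils.py` below):

```python
# Minimal usage sketch for the IoUtils API added in this patch.
# "corpus.txt" is a placeholder; one whitespace/comma-separated document
# per line is assumed, matching IoUtils::ParseLineImpl below.
from cusim import IoUtils

iou = IoUtils(opt={"chunk_lines": 10000, "num_threads": 4})
# writes keys.csv (vocabulary) and token.h5 (CSR token matrix) to out_dir
iou.convert_stream_to_h5("corpus.txt", min_count=5, out_dir="./processed")
```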
-#include "culda.cuh" +#include "culda.hpp" namespace cusim { diff --git a/cpp/src/ioutils.cc b/cpp/src/ioutils.cc index 9466d06..45d551b 100644 --- a/cpp/src/ioutils.cc +++ b/cpp/src/ioutils.cc @@ -13,47 +13,166 @@ IoUtils::IoUtils() { IoUtils::~IoUtils() {} -std::vector IoUtils::parse_line(std::string line) { +bool IoUtils::Init(std::string opt_path) { + std::ifstream in(opt_path.c_str()); + if (not in.is_open()) return false; + + std::string str((std::istreambuf_iterator(in)), + std::istreambuf_iterator()); + std::string err_cmt; + auto _opt = json11::Json::parse(str, err_cmt); + if (not err_cmt.empty()) return false; + opt_ = _opt; + CuSimLogger().set_log_level(opt_["c_log_level"].int_value()); + return true; +} + +void IoUtils::ParseLine(std::string line, std::vector& ret) { + ParseLineImpl(line, ret); +} + + +void IoUtils::ParseLineImpl(std::string line, std::vector& ret) { + ret.clear(); int n = line.size(); - std::vector ret; std::string element; for (int i = 0; i < n; ++i) { - if (line[i] == ' ') { + if (line[i] == ' ' or line[i] == ',') { ret.push_back(element); element.clear(); - } else { - element += line[i]; + } else if (line[i] != '"') { + element += std::tolower(line[i]); } } if (element.size() > 0) { ret.push_back(element); } - return ret; } -void IoUtils::LoadGensimVocab(std::string filepath, int min_count) { - INFO("read gensim file to generate vocabulary: {}, min_count: {}", filepath, min_count); - std::ifstream fin(filepath.c_str()); - std::unordered_map word_count; - while (not fin.eof()) { +int IoUtils::LoadStreamFile(std::string filepath) { + INFO("read gensim file to generate vocabulary: {}", filepath); + if (stream_fin_.is_open()) stream_fin_.close(); + stream_fin_.open(filepath.c_str()); + int count = 0; + std::string line; + while (getline(stream_fin_, line)) + count++; + stream_fin_.close(); + stream_fin_.open(filepath.c_str()); + num_lines_ = count; + remain_lines_ = num_lines_; + INFO("number of lines: {}", num_lines_); + return count; +} + +std::pair IoUtils::TokenizeStream(int num_lines, int num_threads) { + int read_lines = std::min(num_lines, remain_lines_); + if (not read_lines) return {0, 0}; + remain_lines_ -= read_lines; + indices_.clear(); + indices_.resize(read_lines); + indptr_.resize(read_lines); + std::fill(indptr_.begin(), indptr_.end(), 0); + #pragma omp parallel num_threads(num_threads) + { + std::string line; + std::vector line_vec; + #pragma omp for schedule(dynamic, 4) + for (int i = 0; i < read_lines; ++i) { + // get line thread-safely + { + std::unique_lock lock(global_lock_); + getline(stream_fin_, line); + } + + // seems to be bottle-neck + ParseLine(line, line_vec); + + // tokenize + for (auto& word: line_vec) { + if (not word_count_.count(word)) continue; + indices_[i].push_back(word_count_[word]); + } + } + } + int cumsum = 0; + for (int i = 0; i < read_lines; ++i) { + cumsum += indices_[i].size(); + indptr_[i] = cumsum; + } + return {read_lines, indptr_[read_lines - 1]}; +} + +void IoUtils::GetToken(int* indices, int* indptr, int offset) { + int n = indices_.size(); + for (int i = 0; i < n; ++i) { + int beg = i == 0? 
0: indptr_[i - 1]; + int end = indptr_[i]; + for (int j = beg; j < end; ++j) { + indices[j] = indices_[i][j - beg]; + } + indptr[i] = offset + indptr_[i]; + } +} + +std::pair IoUtils::ReadStreamForVocab(int num_lines, int num_threads) { + int read_lines = std::min(num_lines, remain_lines_); + remain_lines_ -= read_lines; + #pragma omp parallel num_threads(num_threads) + { std::string line; - getline(fin, line); - std::vector line_vec = parse_line(line); - for (auto& word: line_vec) { - if (not word_count.count(word)) word_count[word] = 0; - word_count[word]++; + std::vector line_vec; + std::unordered_map word_count; + #pragma omp for schedule(dynamic, 4) + for (int i = 0; i < read_lines; ++i) { + // get line thread-safely + { + std::unique_lock lock(global_lock_); + getline(stream_fin_, line); + } + + // seems to be bottle-neck + ParseLine(line, line_vec); + + // update private word count + for (auto& word: line_vec) { + word_count[word]++; + } + } + + // update word count to class variable + { + std::unique_lock lock(global_lock_); + for (auto& it: word_count) { + word_count_[it.first] += it.second; + } } } - INFO("number of raw words: {}", word_count.size()); - word_idmap_.clear(); - word_list_.clear(); - for (auto& it: word_count) { + if (not remain_lines_) stream_fin_.close(); + return {read_lines, word_count_.size()}; +} + +void IoUtils::GetWordVocab(int min_count, std::string keys_path) { + INFO("number of raw words: {}", word_count_.size()); + for (auto& it: word_count_) { if (it.second >= min_count) { - word_idmap_[it.first] = vocab_.size(); + word_idmap_[it.first] = word_idmap_.size(); word_list_.push_back(it.first); } } INFO("number of words after filtering: {}", word_list_.size()); + + // write keys to csv file + std::ofstream fout(keys_path.c_str()); + INFO("dump keys to {}", keys_path); + std::string header = "index,key\n"; + fout.write(header.c_str(), header.size()); + int n = word_list_.size(); + for (int i = 0; i < n; ++i) { + std::string line = std::to_string(i) + ",\"" + word_list_[i] + "\"\n"; + fout.write(line.c_str(), line.size()); + } + fout.close(); } } // namespace cusim diff --git a/cpp/src/log.cc b/cpp/src/log.cc index b02fa8d..ef5252b 100644 --- a/cpp/src/log.cc +++ b/cpp/src/log.cc @@ -11,12 +11,12 @@ namespace cusim { int CuSimLogger::global_logging_level_ = 2; -CuSimLogger::CuHNSWLogger() { +CuSimLogger::CuSimLogger() { spdlog::set_pattern("[%^%-8l%$] %Y-%m-%d %H:%M:%S %v"); logger_ = spdlog::default_logger(); } -std::shared_ptr& CuHNSWLogger::get_logger() { +std::shared_ptr& CuSimLogger::get_logger() { return logger_; } diff --git a/cuda_setup.py b/cuda_setup.py new file mode 100644 index 0000000..5ff76b6 --- /dev/null +++ b/cuda_setup.py @@ -0,0 +1,205 @@ +# Copyright (c) 2020 Jisang Yoon +# All rights reserved. +# +# This source code is licensed under the Apache 2.0 license found in the +# LICENSE file in the root directory of this source tree. 
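`TokenizeStream`/`GetToken` above hand each tokenized chunk to Python in CSR form: `indices` concatenates the word ids of all lines, and `indptr[i]` is the cumulative token count up to and including line `i`. A small sketch with hypothetical values:

```python
import numpy as np

# CSR layout produced by IoUtils::GetToken, on a hypothetical chunk of
# three lines holding 2, 0, and 3 tokens respectively.
indices = np.array([4, 7, 1, 9, 2], dtype=np.int32)
indptr = np.array([2, 2, 5], dtype=np.int32)

for i in range(len(indptr)):
    beg = 0 if i == 0 else indptr[i - 1]
    print(f"line {i}: word ids {indices[beg:indptr[i]].tolist()}")
```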
diff --git a/cuda_setup.py b/cuda_setup.py
new file mode 100644
index 0000000..5ff76b6
--- /dev/null
+++ b/cuda_setup.py
@@ -0,0 +1,205 @@
+# Copyright (c) 2020 Jisang Yoon
+# All rights reserved.
+#
+# This source code is licensed under the Apache 2.0 license found in the
+# LICENSE file in the root directory of this source tree.
+
+# Adapted from https://github.com/rmcgibbo/npcuda-example and
+# https://github.com/cupy/cupy/blob/master/cupy_setup_build.py
+# pylint: disable=fixme,access-member-before-definition
+# pylint: disable=attribute-defined-outside-init,arguments-differ
+import logging
+import os
+import sys
+
+from distutils import ccompiler, errors, msvccompiler, unixccompiler
+from setuptools.command.build_ext import build_ext as setuptools_build_ext
+
+HALF_PRECISION = False
+
+def find_in_path(name, path):
+  "Find a file in a search path"
+  # adapted from http://code.activestate.com/
+  # recipes/52224-find-a-file-given-a-search-path/
+  for _dir in path.split(os.pathsep):
+    binpath = os.path.join(_dir, name)
+    if os.path.exists(binpath):
+      return os.path.abspath(binpath)
+  return None
+
+
+def locate_cuda():
+  """Locate the CUDA environment on the system.
+
+  If a valid cuda installation is found, this returns a dict with keys
+  'home', 'nvcc', 'include', and 'lib64' and values giving the absolute
+  path to each directory.
+
+  Starts by looking for the CUDAHOME env variable. If not found,
+  everything is based on finding 'nvcc' in the PATH.
+  If nvcc can't be found, this returns None.
+  """
+  nvcc_bin = 'nvcc'
+  if sys.platform.startswith("win"):
+    nvcc_bin = 'nvcc.exe'
+
+  # check env variables CUDA_HOME, CUDAHOME, CUDA_PATH.
+  found = False
+  for env_name in ['CUDA_PATH', 'CUDAHOME', 'CUDA_HOME']:
+    if env_name not in os.environ:
+      continue
+    found = True
+    home = os.environ[env_name]
+    nvcc = os.path.join(home, 'bin', nvcc_bin)
+    break
+  if not found:
+    # otherwise, search the PATH for NVCC
+    nvcc = find_in_path(nvcc_bin, os.environ['PATH'])
+    if nvcc is None:
+      logging.warning('The nvcc binary could not be located in your '
+                      '$PATH. Either add it to your path, or set '
+                      '$CUDA_HOME to enable CUDA extensions')
+      return None
+    home = os.path.dirname(os.path.dirname(nvcc))
+
+  cudaconfig = {'home': home,
+                'nvcc': nvcc,
+                'include': os.path.join(home, 'include'),
+                'lib64': os.path.join(home, 'lib64')}
+  post_args = [
+    "-arch=sm_52",
+    "-gencode=arch=compute_52,code=sm_52",
+    "-gencode=arch=compute_60,code=sm_60",
+    "-gencode=arch=compute_61,code=sm_61",
+    "-gencode=arch=compute_70,code=sm_70",
+    "-gencode=arch=compute_75,code=sm_75",
+    "-gencode=arch=compute_80,code=sm_80",
+    "-gencode=arch=compute_86,code=sm_86",
+    "-gencode=arch=compute_86,code=compute_86",
+    '--ptxas-options=-v', '-O2']
+  if HALF_PRECISION:
+    post_args = [flag for flag in post_args if "52" not in flag]
+
+  if sys.platform == "win32":
+    cudaconfig['lib64'] = os.path.join(home, 'lib', 'x64')
+    post_args += ['-Xcompiler', '/MD', '-std=c++14', "-Xcompiler", "/openmp"]
+    if HALF_PRECISION:
+      post_args += ["-Xcompiler", "/D HALF_PRECISION"]
+  else:
+    post_args += ['-c', '--compiler-options', "'-fPIC'",
+                  "--compiler-options", "'-std=c++14'"]
+    if HALF_PRECISION:
+      post_args += ["--compiler-options", "'-D HALF_PRECISION'"]
+  for k, val in cudaconfig.items():
+    if not os.path.exists(val):
+      logging.warning('The CUDA %s path could not be located in %s', k, val)
+      return None
+
+  cudaconfig['post_args'] = post_args
+  return cudaconfig
+
+
+# This code to build .cu extensions with nvcc is taken from cupy:
+# https://github.com/cupy/cupy/blob/master/cupy_setup_build.py
+class _UnixCCompiler(unixccompiler.UnixCCompiler):
+  src_extensions = list(unixccompiler.UnixCCompiler.src_extensions)
+  src_extensions.append('.cu')
+
+  def _compile(self, obj, src, ext, cc_args, extra_postargs, pp_opts):
+    # For sources other than CUDA C ones, just call the super class method.
+    if os.path.splitext(src)[1] != '.cu':
+      return unixccompiler.UnixCCompiler._compile(
+        self, obj, src, ext, cc_args, extra_postargs, pp_opts)
+
+    # For CUDA C source files, compile them with NVCC.
+    _compiler_so = self.compiler_so
+    try:
+      nvcc_path = CUDA['nvcc']
+      post_args = CUDA['post_args']
+      # TODO? base_opts = build.get_compiler_base_options()
+      self.set_executable('compiler_so', nvcc_path)
+
+      return unixccompiler.UnixCCompiler._compile(
+        self, obj, src, ext, cc_args, post_args, pp_opts)
+    finally:
+      self.compiler_so = _compiler_so
+
+
+class _MSVCCompiler(msvccompiler.MSVCCompiler):
+  _cu_extensions = ['.cu']
+
+  src_extensions = list(unixccompiler.UnixCCompiler.src_extensions)
+  src_extensions.extend(_cu_extensions)
+
+  def _compile_cu(self, sources, output_dir=None, macros=None,
+                  include_dirs=None, debug=0, extra_preargs=None,
+                  extra_postargs=None, depends=None):
+    # Compile CUDA C files, mainly derived from UnixCCompiler._compile().
+    macros, objects, extra_postargs, pp_opts, _build = \
+      self._setup_compile(output_dir, macros, include_dirs, sources,
+                          depends, extra_postargs)
+
+    compiler_so = CUDA['nvcc']
+    cc_args = self._get_cc_args(pp_opts, debug, extra_preargs)
+    post_args = CUDA['post_args']
+
+    for obj in objects:
+      try:
+        src, _ = _build[obj]
+      except KeyError:
+        continue
+      try:
+        self.spawn([compiler_so] + cc_args + [src, '-o', obj] + post_args)
+      except errors.DistutilsExecError as e:
+        raise errors.CompileError(str(e))
+
+    return objects
+
+  def compile(self, sources, **kwargs):
+    # Split CUDA C sources and others.
+    cu_sources = []
+    other_sources = []
+    for source in sources:
+      if os.path.splitext(source)[1] == '.cu':
+        cu_sources.append(source)
+      else:
+        other_sources.append(source)
+
+    # Compile source files other than CUDA C ones.
+    other_objects = msvccompiler.MSVCCompiler.compile(
+      self, other_sources, **kwargs)
+
+    # Compile CUDA C sources.
+    cu_objects = self._compile_cu(cu_sources, **kwargs)
+
+    # Return compiled object filenames.
+    return other_objects + cu_objects
+
+
+class CudaBuildExt(setuptools_build_ext):
+  """Custom `build_ext` command to include CUDA C source files."""
+
+  def run(self):
+    if CUDA is not None:
+      def wrap_new_compiler(func):
+        def _wrap_new_compiler(*args, **kwargs):
+          try:
+            return func(*args, **kwargs)
+          except errors.DistutilsPlatformError:
+            if sys.platform != 'win32':
+              CCompiler = _UnixCCompiler
+            else:
+              CCompiler = _MSVCCompiler
+            return CCompiler(
+              None, kwargs['dry_run'], kwargs['force'])
+        return _wrap_new_compiler
+      ccompiler.new_compiler = wrap_new_compiler(ccompiler.new_compiler)
+      # Intentionally causes DistutilsPlatformError in
+      # ccompiler.new_compiler() function to hook.
+      self.compiler = 'nvidia'
+
+    setuptools_build_ext.run(self)
+
+
+CUDA = locate_cuda()
+assert CUDA is not None
+BUILDEXT = CudaBuildExt if CUDA else setuptools_build_ext
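`locate_cuda()` resolves the toolkit from `CUDA_PATH`, `CUDAHOME`, or `CUDA_HOME`, falling back to searching `PATH` for `nvcc`. A quick way to inspect what it found; note that importing `cuda_setup` itself asserts a toolkit was located, per the module-level `assert CUDA is not None`:

```python
# Inspect the detected CUDA configuration; the import fails outright
# if no toolkit is found.
from cuda_setup import CUDA

print("nvcc:", CUDA['nvcc'])
print("include dir:", CUDA['include'])
print("extra nvcc flags:", CUDA['post_args'])
```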
diff --git a/cusim/.gitignore b/cusim/.gitignore
new file mode 100644
index 0000000..19f06b3
--- /dev/null
+++ b/cusim/.gitignore
@@ -0,0 +1 @@
+config_pb2.py
diff --git a/cusim/__init__.py b/cusim/__init__.py
new file mode 100644
index 0000000..796d7b2
--- /dev/null
+++ b/cusim/__init__.py
@@ -0,0 +1,6 @@
+# Copyright (c) 2021 Jisang Yoon
+# All rights reserved.
+#
+# This source code is licensed under the Apache 2.0 license found in the
+# LICENSE file in the root directory of this source tree.
+from cusim.ioutils import IoUtils
diff --git a/cusim/aux.py b/cusim/aux.py
new file mode 100644
index 0000000..4a1c2c5
--- /dev/null
+++ b/cusim/aux.py
@@ -0,0 +1,337 @@
+# Copyright (c) 2021 Jisang Yoon
+# All rights reserved.
+#
+# This source code is licensed under the Apache 2.0 license found in the
+# LICENSE file in the root directory of this source tree.
+import os
+import re
+import sys
+import json
+import time
+import logging
+import logging.handlers
+import numpy as np
+import jsmin
+from google.protobuf.json_format import Parse, MessageToDict
+
+# get_logger and Option refer to
+# https://github.com/kakao/buffalo/blob/
+# 5f571c2c7d8227e6625c6e538da929e4db11b66d/buffalo/misc/aux.py
+def get_logger(name=__file__, level=2):
+  if level == 1:
+    level = logging.WARNING
+  elif level == 2:
+    level = logging.INFO
+  elif level == 3:
+    level = logging.DEBUG
+  logger = logging.getLogger(name)
+  if logger.handlers:
+    return logger
+  logger.setLevel(level)
+  sh0 = logging.StreamHandler()
+  sh0.setLevel(level)
+  formatter = logging.Formatter('[%(levelname)-8s] %(asctime)s '
+                                '[%(filename)s] [%(funcName)s:%(lineno)d] '
+                                '%(message)s', '%Y-%m-%d %H:%M:%S')
+  sh0.setFormatter(formatter)
+  logger.addHandler(sh0)
+  return logger
+
+# This function helps you to read non-standard json strings.
+# - Handles json strings with c++ style inline comments
+# - Handles json strings with trailing commas.
+def load_json_string(cont):
+  # (1) Removes comments.
+  # Refer to https://plus.google.com/+DouglasCrockfordEsq/posts/RK8qyGVaGSr
+  cont = jsmin.jsmin(cont)
+
+  # (2) Removes trailing commas.
+  cont = re.sub(",[ \t\r\n]*}", "}", cont)
+  cont = re.sub(",[ \t\r\n]*" + r"\]", "]", cont)
+
+  return json.loads(cont)
+
+
+# read a json file by filename
+def load_json_file(fname):
+  with open(fname, "r") as fin:
+    ret = load_json_string(fin.read())
+  return ret
+
+# use protobuf to restrict fields and types
+def get_opt_as_proto(raw, proto_type=None):
+  assert proto_type is not None
+  proto = proto_type()
+  # convert raw to proto
+  Parse(json.dumps(Option(raw)), proto)
+  err = []
+  assert proto.IsInitialized(err), \
+    f"some required fields are missing in proto {err}\n {proto}"
+  return proto
+
+def proto_to_dict(proto):
+  return MessageToDict(proto, \
+    including_default_value_fields=True, \
+    preserving_proto_field_name=True)
+
+def copy_proto(proto):
+  newproto = type(proto)()
+  Parse(json.dumps(proto_to_dict(proto)), newproto)
+  return newproto
+
+class Option(dict):
+  def __init__(self, *args, **kwargs):
+    args = [arg if isinstance(arg, dict)
+            else load_json_file(arg) for arg in args]
+    super().__init__(*args, **kwargs)
+    for arg in args:
+      if isinstance(arg, dict):
+        for k, val in arg.items():
+          if isinstance(val, dict):
+            self[k] = Option(val)
+          else:
+            self[k] = val
+    if kwargs:
+      for k, val in kwargs.items():
+        if isinstance(val, dict):
+          self[k] = Option(val)
+        else:
+          self[k] = val
+
+  def __getattr__(self, attr):
+    return self.get(attr)
+
+  def __setattr__(self, key, value):
+    self.__setitem__(key, value)
+
+  def __setitem__(self, key, value):
+    super().__setitem__(key, value)
+    self.__dict__.update({key: value})
+
+  def __delattr__(self, item):
+    self.__delitem__(item)
+
+  def __delitem__(self, key):
+    super().__delitem__(key)
+    del self.__dict__[key]
+
+  def __getstate__(self):
+    return vars(self)
+
+  def __setstate__(self, state):
+    vars(self).update(state)
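`get_opt_as_proto` is the glue between loose user dicts and the proto schema added in `cusim/proto/config.proto` below: unknown fields raise during `Parse`, and unset fields fall back to proto defaults. A minimal sketch, assuming the generated `cusim/config_pb2.py` exists (see the README's protoc step):

```python
# Dict -> Option -> validated proto, as IoUtils.__init__ does below.
from cusim import aux
from cusim.config_pb2 import IoUtilsConfigProto

opt = aux.get_opt_as_proto({"chunk_lines": 10000}, IoUtilsConfigProto)
print(opt.chunk_lines)  # 10000, the override
print(opt.num_threads)  # 4, the proto default
```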
+
+# reference: https://github.com/tensorflow/tensorflow/blob/
+# 85c8b2a817f95a3e979ecd1ed95bff1dc1335cff/tensorflow/python/
+# keras/utils/generic_utils.py#L483
+class Progbar:
+  # pylint: disable=too-many-branches,too-many-statements,invalid-name
+  # pylint: disable=blacklisted-name,no-else-return
+  """Displays a progress bar.
+
+  Arguments:
+    target: Total number of steps expected, None if unknown.
+    width: Progress bar width on screen.
+    verbose: Verbosity mode, 0 (silent), 1 (verbose), 2 (semi-verbose)
+    stateful_metrics: Iterable of string names of metrics that should *not* be
+      averaged over time. Metrics in this list will be displayed as-is. All
+      others will be averaged by the progbar before display.
+    interval: Minimum visual progress update interval (in seconds).
+    unit_name: Display name for step counts (usually "step" or "sample").
+  """
+
+  def __init__(self,
+               target,
+               width=30,
+               verbose=1,
+               interval=0.05,
+               stateful_metrics=None,
+               unit_name='step'):
+    self.target = target
+    self.width = width
+    self.verbose = verbose
+    self.interval = interval
+    self.unit_name = unit_name
+    if stateful_metrics:
+      self.stateful_metrics = set(stateful_metrics)
+    else:
+      self.stateful_metrics = set()
+
+    self._dynamic_display = ((hasattr(sys.stdout, 'isatty') and
+                              sys.stdout.isatty()) or
+                             'ipykernel' in sys.modules or
+                             'posix' in sys.modules or
+                             'PYCHARM_HOSTED' in os.environ)
+    self._total_width = 0
+    self._seen_so_far = 0
+    # We use a dict + list to avoid garbage collection
+    # issues found in OrderedDict
+    self._values = {}
+    self._values_order = []
+    self._start = time.time()
+    self._last_update = 0
+
+    self._time_after_first_step = None
+
+  def update(self, current, values=None, finalize=None):
+    """Updates the progress bar.
+
+    Arguments:
+      current: Index of current step.
+      values: List of tuples: `(name, value_for_last_step)`. If `name` is in
+        `stateful_metrics`, `value_for_last_step` will be displayed as-is.
+        Else, an average of the metric over time will be displayed.
+      finalize: Whether this is the last update for the progress bar. If
+        `None`, defaults to `current >= self.target`.
+    """
+    if finalize is None:
+      if self.target is None:
+        finalize = False
+      else:
+        finalize = current >= self.target
+
+    values = values or []
+    for k, v in values:
+      if k not in self._values_order:
+        self._values_order.append(k)
+      if k not in self.stateful_metrics:
+        # In the case that progress bar doesn't have a target value in the
+        # first epoch, both on_batch_end and on_epoch_end will be called,
+        # which will cause 'current' and 'self._seen_so_far' to have the
+        # same value. Force the minimal value to 1 here, otherwise
+        # stateful_metric will be 0s.
+        value_base = max(current - self._seen_so_far, 1)
+        if k not in self._values:
+          self._values[k] = [v * value_base, value_base]
+        else:
+          self._values[k][0] += v * value_base
+          self._values[k][1] += value_base
+      else:
+        # Stateful metrics output a numeric value. This representation
+        # means "take an average from a single value" but keeps the
+        # numeric formatting.
+        self._values[k] = [v, 1]
+    self._seen_so_far = current
+
+    now = time.time()
+    info = ' - %.0fs' % (now - self._start)
+    if self.verbose == 1:
+      if now - self._last_update < self.interval and not finalize:
+        return
+
+      prev_total_width = self._total_width
+      if self._dynamic_display:
+        sys.stdout.write('\b' * prev_total_width)
+        sys.stdout.write('\r')
+      else:
+        sys.stdout.write('\n')
+
+      if self.target is not None:
+        numdigits = int(np.log10(self.target)) + 1
+        bar = ('%' + str(numdigits) + 'd/%d [') % (current, self.target)
+        prog = float(current) / self.target
+        prog_width = int(self.width * prog)
+        if prog_width > 0:
+          bar += ('=' * (prog_width - 1))
+          if current < self.target:
+            bar += '>'
+          else:
+            bar += '='
+        bar += ('.' * (self.width - prog_width))
+        bar += ']'
+      else:
+        bar = '%7d/Unknown' % current
+
+      self._total_width = len(bar)
+      sys.stdout.write(bar)
+
+      time_per_unit = self._estimate_step_duration(current, now)
+
+      if self.target is None or finalize:
+        if time_per_unit >= 1 or time_per_unit == 0:
+          info += ' %.0fs/%s' % (time_per_unit, self.unit_name)
+        elif time_per_unit >= 1e-3:
+          info += ' %.0fms/%s' % (time_per_unit * 1e3, self.unit_name)
+        else:
+          info += ' %.0fus/%s' % (time_per_unit * 1e6, self.unit_name)
+      else:
+        eta = time_per_unit * (self.target - current)
+        if eta > 3600:
+          eta_format = '%d:%02d:%02d' % (eta // 3600,
+                                         (eta % 3600) // 60, eta % 60)
+        elif eta > 60:
+          eta_format = '%d:%02d' % (eta // 60, eta % 60)
+        else:
+          eta_format = '%ds' % eta
+
+        info = ' - ETA: %s' % eta_format
+
+      for k in self._values_order:
+        info += ' - %s:' % k
+        if isinstance(self._values[k], list):
+          avg = np.mean(self._values[k][0] / max(1, self._values[k][1]))
+          if abs(avg) > 1e-3:
+            info += ' %.4f' % avg
+          else:
+            info += ' %.4e' % avg
+        else:
+          info += ' %s' % self._values[k]
+
+      self._total_width += len(info)
+      if prev_total_width > self._total_width:
+        info += (' ' * (prev_total_width - self._total_width))
+
+      if finalize:
+        info += '\n'
+
+      sys.stdout.write(info)
+      sys.stdout.flush()
+
+    elif self.verbose == 2:
+      if finalize:
+        numdigits = int(np.log10(self.target)) + 1
+        count = ('%' + str(numdigits) + 'd/%d') % (current, self.target)
+        info = count + info
+        for k in self._values_order:
+          info += ' - %s:' % k
+          avg = np.mean(self._values[k][0] / max(1, self._values[k][1]))
+          if avg > 1e-3:
+            info += ' %.4f' % avg
+          else:
+            info += ' %.4e' % avg
+        info += '\n'
+
+        sys.stdout.write(info)
+        sys.stdout.flush()
+
+    self._last_update = now
+
+  def add(self, n, values=None):
+    self.update(self._seen_so_far + n, values)
+
+  def _estimate_step_duration(self, current, now):
+    """Estimate the duration of a single step.
+
+    Given the step number `current` and the corresponding time `now` this
+    function returns an estimate for how long a single step takes. If this
+    is called before one step has been completed (i.e. `current == 0`) then
+    zero is given as an estimate. The duration estimate ignores the duration
+    of the (assumed to be non-representative) first step for estimates when
+    more steps are available (i.e. `current > 1`).
+
+    Arguments:
+      current: Index of current step.
+      now: The current time.
+    Returns: Estimate of the duration of a single step.
+    """
+    if current:
+      # there are a few special scenarios here:
+      # 1) somebody is calling the progress bar without ever supplying step 1
+      # 2) somebody is calling the progress bar and supplies step one
+      #    multiple times, e.g. as part of a finalizing call
+      # in these cases, we just fall back to the simple calculation
+      if self._time_after_first_step is not None and current > 1:
+        time_per_unit = (now - self._time_after_first_step) / (current - 1)
+      else:
+        time_per_unit = (now - self._start) / current
+
+      if current == 1:
+        self._time_after_first_step = now
+      return time_per_unit
+    else:
+      return 0
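`Progbar` is vendored from Keras and consumed by `IoUtils` below; a standalone sketch of how it is driven (the metric name is arbitrary):

```python
import time
from cusim import aux

# "word_count" is stateful, so it is displayed as-is rather than averaged.
pbar = aux.Progbar(100, unit_name="line", stateful_metrics=["word_count"])
for i in range(1, 101):
    time.sleep(0.01)  # stand-in for real work
    pbar.update(i, values=[("word_count", i * 37)])
```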
diff --git a/cusim/ioutils/__init__.py b/cusim/ioutils/__init__.py
new file mode 100644
index 0000000..61fc1fe
--- /dev/null
+++ b/cusim/ioutils/__init__.py
@@ -0,0 +1,6 @@
+# Copyright (c) 2021 Jisang Yoon
+# All rights reserved.
+#
+# This source code is licensed under the Apache 2.0 license found in the
+# LICENSE file in the root directory of this source tree.
+from cusim.ioutils.pyioutils import IoUtils
diff --git a/cusim/ioutils/bindings.cc b/cusim/ioutils/bindings.cc
new file mode 100644
index 0000000..5b2c1dd
--- /dev/null
+++ b/cusim/ioutils/bindings.cc
@@ -0,0 +1,73 @@
+// Copyright (c) 2021 Jisang Yoon
+// All rights reserved.
+//
+// This source code is licensed under the Apache 2.0 license found in the
+// LICENSE file in the root directory of this source tree.
+#include <pybind11/pybind11.h>
+#include <pybind11/numpy.h>
+#include <pybind11/stl.h>
+
+#include <iostream>
+#include "ioutils.hpp"
+
+namespace py = pybind11;
+
+typedef py::array_t<float> float_array;
+typedef py::array_t<int> int_array;
+
+class IoUtilsBind {
+ public:
+  IoUtilsBind() {}
+
+  bool Init(std::string opt_path) {
+    return obj_.Init(opt_path);
+  }
+
+  int LoadStreamFile(std::string filepath) {
+    return obj_.LoadStreamFile(filepath);
+  }
+
+  std::pair<int, int> ReadStreamForVocab(int num_lines, int num_threads) {
+    return obj_.ReadStreamForVocab(num_lines, num_threads);
+  }
+
+  std::pair<int, int> TokenizeStream(int num_lines, int num_threads) {
+    return obj_.TokenizeStream(num_lines, num_threads);
+  }
+
+  void GetWordVocab(int min_count, std::string keys_path) {
+    obj_.GetWordVocab(min_count, keys_path);
+  }
+
+  void GetToken(py::object& indices, py::object& indptr, int offset) {
+    int_array _indices(indices);
+    int_array _indptr(indptr);
+    obj_.GetToken(_indices.mutable_data(0), _indptr.mutable_data(0), offset);
+  }
+
+ private:
+  cusim::IoUtils obj_;
+};
+
+PYBIND11_PLUGIN(ioutils_bind) {
+  py::module m("IoUtilsBind");
+
+  py::class_<IoUtilsBind>(m, "IoUtilsBind")
+  .def(py::init())
+  .def("init", &IoUtilsBind::Init, py::arg("opt_path"))
+  .def("load_stream_file", &IoUtilsBind::LoadStreamFile, py::arg("filepath"))
+  .def("read_stream_for_vocab", &IoUtilsBind::ReadStreamForVocab,
+       py::arg("num_lines"), py::arg("num_threads"))
+  .def("tokenize_stream", &IoUtilsBind::TokenizeStream,
+       py::arg("num_lines"), py::arg("num_threads"))
+  .def("get_word_vocab", &IoUtilsBind::GetWordVocab,
+       py::arg("min_count"), py::arg("keys_path"))
+  .def("get_token", &IoUtilsBind::GetToken,
+       py::arg("indices"), py::arg("indptr"), py::arg("offset"))
+  .def("__repr__",
+    [](const IoUtilsBind &a) {
+      return "<IoUtilsBind>";
+    }
+  );
+  return m.ptr();
+}
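The pybind11 layer is a thin pass-through; `pybind11/stl.h` is what turns the `std::pair<int, int>` returns into Python tuples. For review purposes, a sketch of driving the compiled extension directly, without the `IoUtils` wrapper (paths are placeholders, and `opt.json` must hold options JSON such as the one `IoUtils.__init__` writes below):

```python
# Direct use of the compiled binding; normally IoUtils wraps this.
from cusim.ioutils.ioutils_bind import IoUtilsBind

bind = IoUtilsBind()
assert bind.init(bytes("opt.json", "utf8"))      # placeholder options file
num_lines = bind.load_stream_file("corpus.txt")  # placeholder corpus
read_lines, vocab_size = bind.read_stream_for_vocab(10000, 4)
bind.get_word_vocab(5, "keys.csv")
```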
diff --git a/cusim/ioutils/pyioutils.py b/cusim/ioutils/pyioutils.py
new file mode 100644
index 0000000..1a65f74
--- /dev/null
+++ b/cusim/ioutils/pyioutils.py
@@ -0,0 +1,84 @@
+# Copyright (c) 2021 Jisang Yoon
+# All rights reserved.
+#
+# This source code is licensed under the Apache 2.0 license found in the
+# LICENSE file in the root directory of this source tree.
+
+# pylint: disable=no-name-in-module,too-few-public-methods,no-member
+import os
+from os.path import join as pjoin
+
+import json
+import tempfile
+
+import h5py
+import numpy as np
+
+from cusim import aux
+from cusim.ioutils.ioutils_bind import IoUtilsBind
+from cusim.config_pb2 import IoUtilsConfigProto
+
+class IoUtils:
+  def __init__(self, opt=None):
+    self.opt = aux.get_opt_as_proto(opt or {}, IoUtilsConfigProto)
+    self.logger = aux.get_logger("ioutils", level=self.opt.py_log_level)
+
+    tmp = tempfile.NamedTemporaryFile(mode='w', delete=False)
+    opt_content = json.dumps(aux.proto_to_dict(self.opt), indent=2)
+    tmp.write(opt_content)
+    tmp.close()
+
+    self.logger.info("opt: %s", opt_content)
+    self.obj = IoUtilsBind()
+    assert self.obj.init(bytes(tmp.name, "utf8")), f"failed to load {tmp.name}"
+    os.remove(tmp.name)
+
+  def load_stream_vocab(self, filepath, min_count, keys_path):
+    full_num_lines = self.obj.load_stream_file(filepath)
+    pbar = aux.Progbar(full_num_lines, unit_name="line",
+                       stateful_metrics=["word_count"])
+    processed = 0
+    while True:
+      read_lines, word_count = \
+        self.obj.read_stream_for_vocab(
+          self.opt.chunk_lines, self.opt.num_threads)
+      processed += read_lines
+      pbar.update(processed, values=[("word_count", word_count)])
+      if processed == full_num_lines:
+        break
+    self.obj.get_word_vocab(min_count, keys_path)
+
+  def convert_stream_to_h5(self, filepath, min_count, out_dir,
+                           chunk_indices=10000):
+    os.makedirs(out_dir, exist_ok=True)
+    keys_path = pjoin(out_dir, "keys.csv")
+    token_path = pjoin(out_dir, "token.h5")
+    self.logger.info("save key and token to %s, %s",
+                     keys_path, token_path)
+    self.load_stream_vocab(filepath, min_count, keys_path)
+    full_num_lines = self.obj.load_stream_file(filepath)
+    pbar = aux.Progbar(full_num_lines, unit_name="line")
+    processed = 0
+    h5f = h5py.File(token_path, "w")
+    indices = h5f.create_dataset("indices", shape=(chunk_indices,),
+                                 maxshape=(None,), dtype=np.int32,
+                                 chunks=(chunk_indices,))
+    indptr = h5f.create_dataset("indptr", shape=(full_num_lines + 1,),
+                                dtype=np.int32, chunks=True)
+    processed, offset = 1, 0
+    indptr[0] = 0
+    while True:
+      read_lines, data_size = self.obj.tokenize_stream(
+        self.opt.chunk_lines, self.opt.num_threads)
+      _indices = np.empty(shape=(data_size,), dtype=np.int32)
+      _indptr = np.empty(shape=(read_lines,), dtype=np.int32)
+      self.obj.get_token(_indices, _indptr, offset)
+      indices.resize((offset + data_size,))
+      indices[offset:offset + data_size] = _indices
+      indptr[processed:processed + read_lines] = _indptr
+      offset += data_size
+      processed += read_lines
+      pbar.update(processed - 1)
+      if processed == full_num_lines + 1:
+        break
+    h5f.close()
diff --git a/cusim/proto/config.proto b/cusim/proto/config.proto
new file mode 100644
index 0000000..071184b
--- /dev/null
+++ b/cusim/proto/config.proto
@@ -0,0 +1,14 @@
+// Copyright (c) 2020 Jisang Yoon
+// All rights reserved.
+//
+// This source code is licensed under the Apache 2.0 license found in the
+// LICENSE file in the root directory of this source tree.
+
+syntax = "proto2";
+
+message IoUtilsConfigProto {
+  optional int32 py_log_level = 1 [default = 2];
+  optional int32 c_log_level = 2 [default = 2];
+  optional int32 chunk_lines = 3 [default = 100000];
+  optional int32 num_threads = 4 [default = 4];
+}
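`convert_stream_to_h5` leaves two artifacts in `out_dir`: `keys.csv` (the vocabulary) and `token.h5` (the CSR token matrix, with `indptr` one entry longer than the document count). Reading them back is plain h5py; a sketch assuming the placeholder `out_dir` from earlier:

```python
import h5py

# Read back the CSR token matrix written by convert_stream_to_h5.
with h5py.File("./processed/token.h5", "r") as h5f:
    indices = h5f["indices"][:]
    indptr = h5f["indptr"][:]

i = 0  # word ids of document i
doc = indices[indptr[i]:indptr[i + 1]]
print(len(indptr) - 1, "documents; first doc ids:", doc[:10])
```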
+#
+# This source code is licensed under the Apache 2.0 license found in the
+# LICENSE file in the root directory of this source tree.
+
+# pylint: disable=no-name-in-module,logging-format-truncated
+import os
+import subprocess
+import fire
+
+from gensim import downloader as api
+from cusim import aux, IoUtils
+
+LOGGER = aux.get_logger()
+DOWNLOAD_PATH = "./res"
+# DATASET = "wiki-english-20171001"
+DATASET = "fake-news"
+DATA_PATH = f"./res/{DATASET}.stream.txt"
+DATA_PATH2 = f"./res/{DATASET}-converted"
+MIN_COUNT = 5
+
+def download():
+  if os.path.exists(DATA_PATH):
+    LOGGER.info("%s already exists", DATA_PATH)
+    return
+  api.BASE_DIR = DOWNLOAD_PATH
+  filepath = api.load(DATASET, return_path=True)
+  LOGGER.info("filepath: %s", filepath)
+  cmd = ["gunzip", "-c", filepath, ">", DATA_PATH]
+  cmd = " ".join(cmd)
+  LOGGER.info("cmd: %s", cmd)
+  subprocess.call(cmd, shell=True)
+
+def run():
+  download()
+  iou = IoUtils(opt={"chunk_lines": 10000, "num_threads": 8})
+  iou.convert_stream_to_h5(DATA_PATH, MIN_COUNT, DATA_PATH2)
+
+
+if __name__ == "__main__":
+  fire.Fire()
diff --git a/examples/requirements.txt b/examples/requirements.txt
new file mode 100644
index 0000000..728d5c2
--- /dev/null
+++ b/examples/requirements.txt
@@ -0,0 +1,2 @@
+fire
+gensim
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..bfe001f
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,6 @@
+jsmin
+numpy
+pandas
+pybind11
+protobuf==3.10.0
+grpcio-tools==1.27.1
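`pandas` in requirements.txt pairs naturally with the `keys.csv` dump that `GetWordVocab` writes (an `index,key` header with quoted words). A sketch using the paths from `example1.py`:

```python
import pandas as pd

# Map token indices in token.h5 back to words via keys.csv.
keys = pd.read_csv("./res/fake-news-converted/keys.csv")
id2word = dict(zip(keys["index"], keys["key"]))
print(id2word[0])
```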
+
+# pylint: disable=fixme,too-few-public-methods
+# reference: https://github.com/kakao/buffalo/blob/
+# 5f571c2c7d8227e6625c6e538da929e4db11b66d/setup.py
+"""cusim
+"""
+import os
+import sys
+import glob
+import pathlib
+import platform
+import sysconfig
+import subprocess
+from setuptools import setup, Extension
+
+import pybind11
+import numpy as np
+from cuda_setup import CUDA, BUILDEXT
+
+
+DOCLINES = __doc__.split("\n")
+
+# TODO: Python3 Support
+if sys.version_info[:3] < (3, 6):
+  raise RuntimeError("Python version 3.6 or later required.")
+
+assert platform.system() == 'Linux'  # TODO: MacOS
+
+
+MAJOR = 0
+MINOR = 0
+MICRO = 0
+RELEASE = True
+STAGE = {True: '', False: 'b'}.get(RELEASE)
+VERSION = f'{MAJOR}.{MINOR}.{MICRO}{STAGE}'
+STATUS = {False: 'Development Status :: 4 - Beta',
+          True: 'Development Status :: 5 - Production/Stable'}
+
+CLASSIFIERS = """{status}
+Programming Language :: C++
+Programming Language :: Python :: 3.6
+Operating System :: POSIX :: Linux
+Operating System :: Unix
+Operating System :: MacOS
+License :: OSI Approved :: Apache Software License""".format( \
+  status=STATUS.get(RELEASE))
+CLIB_DIR = os.path.join(sysconfig.get_path('purelib'), 'cusim')
+LIBRARY_DIRS = [CLIB_DIR]
+
+
+def get_extend_compile_flags():
+  flags = ['-march=native']
+  return flags
+
+
+class CMakeExtension(Extension):
+  extension_type = 'cmake'
+
+  def __init__(self, name):
+    super().__init__(name, sources=[])
+
+
+extend_compile_flags = get_extend_compile_flags()
+extra_compile_args = ['-fopenmp', '-std=c++14', '-ggdb', '-O3'] + \
+  extend_compile_flags
+csrcs = glob.glob("cpp/src/*.cu") + glob.glob("cpp/src/*.cc")
+extensions = [
+  Extension("cusim.ioutils.ioutils_bind",
+            sources=csrcs + [
+              "cusim/ioutils/bindings.cc",
+              "3rd/json11/json11.cpp"],
+            language="c++",
+            extra_compile_args=extra_compile_args,
+            extra_link_args=["-fopenmp"],
+            library_dirs=[CUDA['lib64']],
+            libraries=['cudart', 'cublas', 'curand'],
+            extra_objects=[],
+            include_dirs=[
+              "cpp/include/", np.get_include(), pybind11.get_include(),
+              pybind11.get_include(True), CUDA['include'],
+              "3rd/json11", "3rd/spdlog/include"])
+]
+
+
+# Return the git revision as a string
+def git_version():
+  def _minimal_ext_cmd(cmd):
+    # construct minimal environment
+    env = {}
+    for k in ['SYSTEMROOT', 'PATH']:
+      val = os.environ.get(k)
+      if val is not None:
+        env[k] = val
+    out = subprocess.Popen(cmd, stdout=subprocess.PIPE,
+                           env=env).communicate()[0]
+    return out
+
+  try:
+    out = _minimal_ext_cmd(['git', 'rev-parse', 'HEAD'])
+    git_revision = out.strip().decode('ascii')
+  except OSError:
+    git_revision = "Unknown"
+
+  return git_revision
+
+
+def write_version_py(filename='cusim/version.py'):
+  cnt = """
+short_version = '%(version)s'
+git_revision = '%(git_revision)s'
+"""
+  git_revision = git_version()
+  with open(filename, 'w') as fout:
+    fout.write(cnt % {'version': VERSION,
+                      'git_revision': git_revision})
+
+
+class BuildExtension(BUILDEXT):
+  def run(self):
+    for ext in self.extensions:
+      print(ext.name)
+      if hasattr(ext, 'extension_type') and ext.extension_type == 'cmake':
+        self.cmake()
+    super().run()
+
+  def cmake(self):
+    cwd = pathlib.Path().absolute()
+
+    build_temp = pathlib.Path(self.build_temp)
+    build_temp.mkdir(parents=True, exist_ok=True)
+
+    build_type = 'Debug' if self.debug else 'Release'
+
+    cmake_args = [
+      '-DCMAKE_BUILD_TYPE=' + build_type,
+      '-DCMAKE_LIBRARY_OUTPUT_DIRECTORY=' + CLIB_DIR,
+    ]
+
+    build_args = []
+
+    os.chdir(str(build_temp))
+    self.spawn(['cmake', str(cwd)] + cmake_args)
+    if not self.dry_run:
+      self.spawn(['cmake', '--build', '.'] + build_args)
+    os.chdir(str(cwd))
+
+
+def setup_package():
+  write_version_py()
+  cmdclass = {
+    'build_ext': BuildExtension
+  }
+
+  metadata = dict(
+    name='cusim',
+    maintainer="Jisang Yoon",
+    maintainer_email="vjs10101v@gmail.com",
+    description=DOCLINES[0],
+    long_description="\n".join(DOCLINES[2:]),
+    url="https://github.com/js1010/cusim",
+    download_url="https://github.com/js1010/cusim/releases",
+    include_package_data=False,
+    license='Apache2',
+    packages=['cusim', 'cusim.ioutils'],
+    cmdclass=cmdclass,
+    classifiers=[_f for _f in CLASSIFIERS.split('\n') if _f],
+    platforms=['Linux', 'Mac OSX', 'Unix'],
+    ext_modules=extensions,
+    entry_points={
+      'console_scripts': [
+      ]
+    },
+    python_requires='>=3.6',
+  )
+
+  metadata['version'] = VERSION
+  setup(**metadata)
+
+
+if __name__ == '__main__':
+  setup_package()
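Finally, a post-install smoke test; it only checks that the compiled extension and the wrapper import cleanly:

```python
# Quick smoke test after `python setup.py install`.
from cusim.ioutils.ioutils_bind import IoUtilsBind
from cusim import IoUtils

print(IoUtilsBind())  # repr of the raw binding
print(IoUtils)        # Python wrapper from cusim/ioutils/pyioutils.py
```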