Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[c++] Handling edge cases for C++ re-indexer #2098

Merged
merged 1 commit into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions libtiledbsoma/src/reindexer/reindexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,50 +46,51 @@ KHASH_MAP_INIT_INT64(m64, int64_t)

namespace tiledbsoma {

void IntIndexer::map_locations(const int64_t* keys, int size, int threads) {
void IntIndexer::map_locations(
const int64_t* keys, size_t size, size_t threads) {
map_size_ = size;

// Handling edge cases
if (size == 0) {
return;
}
if (threads == 0) {
throw std::runtime_error("Re-indexer thread count cannot be zero.");
}

LOG_DEBUG(fmt::format(
"End of Map locations of size {} and {} threads", size, threads));

LOG_DEBUG(fmt::format(
"[Re-indexer] Start of Map locations with {} keys and {} threads",
size,
threads));
hash_ = kh_init(m64);
kh_resize(m64, hash_, size * 1.25);
LOG_DEBUG(
fmt::format("[Re-indexer] Thread pool started and hash table created"));
int ret;
khint64_t k;
int64_t counter = 0;
// Hash map construction
for (int i = 0; i < size; i++) {
LOG_DEBUG(fmt::format(
"[Re-indexer] Start of Map locations with {} keys and {} threads",
size,
threads));
for (size_t i = 0; i < size; i++) {
k = kh_put(m64, hash_, keys[i], &ret);
assert(k != kh_end(hash_));
kh_val(hash_, k) = counter;
counter++;
}
if (int(kh_size(hash_)) != size) {
if (kh_size(hash_) != size) {
throw std::runtime_error("There are duplicate keys.");
}
auto hsize = kh_size(hash_);
LOG_DEBUG(fmt::format("[Re-indexer] khash size = {}", hsize));
tiledb_thread_pool_ = std::make_unique<tiledbsoma::ThreadPool>(threads);
if (threads > 1) {
tiledb_thread_pool_ = std::make_unique<tiledbsoma::ThreadPool>(threads);
}
LOG_DEBUG(
fmt::format("[Re-indexer] Thread pool started and hash table created"));
}

void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
if (size == 0) {
return;
}
LOG_DEBUG(fmt::format(
"Lookup with thread concurrency {} on data size {}",
tiledb_thread_pool_->concurrency_level(),
size));
if (tiledb_thread_pool_->concurrency_level() == 1) {
if (tiledb_thread_pool_ == nullptr) { // When concurrency is 1
for (int i = 0; i < size; i++) {
auto k = kh_get(m64, hash_, keys[i]);
if (k == kh_end(hash_)) {
Expand All @@ -101,8 +102,11 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
}
return;
}
LOG_DEBUG(fmt::format(
"Lookup with thread concurrency {} on data size {}",
tiledb_thread_pool_->concurrency_level(),
size));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: these two log lines are largely redundant as well

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


LOG_DEBUG(fmt::format("Creating tileDB tasks for the size of {}", size));
std::vector<tiledbsoma::ThreadPool::Task> tasks;

size_t thread_chunk_size = size / tiledb_thread_pool_->concurrency_level();
Expand Down
4 changes: 2 additions & 2 deletions libtiledbsoma/src/reindexer/reindexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ class IntIndexer {
* @param size yhr number of keys in the put
* @param threads number of threads in the thread pool
*/
void map_locations(const int64_t* keys, int size, int threads = 4);
void map_locations(const std::vector<int64_t>& keys, int threads = 4) {
void map_locations(const int64_t* keys, size_t size, size_t threads = 4);
void map_locations(const std::vector<int64_t>& keys, size_t threads = 4) {
map_locations(keys.data(), keys.size(), threads);
}
/**
Expand Down
Loading