Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
smehringer committed Nov 15, 2023
1 parent f45f6ba commit 80f6502
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 97 deletions.
136 changes: 75 additions & 61 deletions include/raptor/search/search_partitioned_hibf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
// --------------------------------------------------------------------------------------------------

/*!\file
* \brief Provides raptor::search_singular_ibf.
* \author Enrico Seiler <enrico.seiler AT fu-berlin.de>
* \brief Provides raptor::search_partitioned_hibf.
* \author Svenja Mehringer <svenja.mehringer AT fu-berlin.de>
*/

#pragma once
Expand All @@ -26,42 +26,26 @@ namespace raptor
{

template <typename index_t>
void search_singular_ibf(search_arguments const & arguments, index_t && index)
void search_partitioned_hibf(search_arguments const & arguments, index_t && index)
{
constexpr bool is_ibf = std::same_as<index_t, raptor_index<index_structure::ibf>>;

auto cereal_future = std::async(std::launch::async,
[&]()
{
load_index(index, arguments);
});

seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::id, seqan3::field::seq>> fin{
arguments.query_file};
using record_type = typename decltype(fin)::record_type;
std::vector<record_type> records{};

sync_out synced_out{arguments};
robin_hood::unordered_flat_map<std::string, std::string> results; // cache results when searching multiple hibfs

raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()};

// searching with storing all results in results map
auto worker = [&](size_t const start, size_t const extent)
{
seqan::hibf::serial_timer local_compute_minimiser_timer{};
seqan::hibf::serial_timer local_query_ibf_timer{};
seqan::hibf::serial_timer local_generate_results_timer{};

#if defined(__clang__)
auto counter = [&index]()
#else
auto counter = [&index, is_ibf]()
#endif
{
if constexpr (is_ibf)
return index.ibf().template counting_agent<uint16_t>();
else
return index.ibf().membership_agent();
}();
auto counter = index.ibf().membership_agent();

std::string result_string{};
std::vector<uint64_t> minimiser;

Expand All @@ -71,9 +55,7 @@ void search_singular_ibf(search_arguments const & arguments, index_t && index)

for (auto && [id, seq] : std::span{records.data() + start, extent})
{
result_string.clear();
result_string += id;
result_string += '\t';
std::string & result_string = results[id]; // TODO concurrent access??

auto minimiser_view = seq | hash_adaptor | std::views::common;
local_compute_minimiser_timer.start();
Expand All @@ -83,41 +65,67 @@ void search_singular_ibf(search_arguments const & arguments, index_t && index)
size_t const minimiser_count{minimiser.size()};
size_t const threshold = thresholder.get(minimiser_count);

if constexpr (is_ibf)
local_query_ibf_timer.start();
auto & result = counter.membership_for(minimiser, threshold); // Results contains user bin IDs
local_query_ibf_timer.stop();
local_generate_results_timer.start();
for (auto && count : result)
{
local_query_ibf_timer.start();
auto & result = counter.bulk_count(minimiser);
local_query_ibf_timer.stop();
size_t current_bin{0};
local_generate_results_timer.start();
for (auto && count : result)
{
if (count >= threshold)
{
result_string += std::to_string(current_bin);
result_string += ',';
}
++current_bin;
}
result_string += std::to_string(count);
result_string += ',';
}
else

local_generate_results_timer.stop();
}

arguments.compute_minimiser_timer += local_compute_minimiser_timer;
arguments.query_ibf_timer += local_query_ibf_timer;
arguments.generate_results_timer += local_generate_results_timer;
};

// searching and writing results to file
auto output_worker = [&](size_t const start, size_t const extent)
{
seqan::hibf::serial_timer local_compute_minimiser_timer{};
seqan::hibf::serial_timer local_query_ibf_timer{};
seqan::hibf::serial_timer local_generate_results_timer{};

auto counter = return index.ibf().membership_agent();
std::vector<uint64_t> minimiser;

auto hash_adaptor = seqan3::views::minimiser_hash(arguments.shape,
seqan3::window_size{arguments.window_size},
seqan3::seed{adjust_seed(arguments.shape_weight)});

for (auto && [id, seq] : std::span{records.data() + start, extent})
{
std::string & result_string = results[id];

auto minimiser_view = seq | hash_adaptor | std::views::common;
local_compute_minimiser_timer.start();
minimiser.assign(minimiser_view.begin(), minimiser_view.end());
local_compute_minimiser_timer.stop();

size_t const minimiser_count{minimiser.size()};
size_t const threshold = thresholder.get(minimiser_count);

local_query_ibf_timer.start();
auto & result = counter.membership_for(minimiser, threshold); // Results contains user bin IDs
local_query_ibf_timer.stop();
local_generate_results_timer.start();
for (auto && count : result)
{
local_query_ibf_timer.start();
auto & result = counter.membership_for(minimiser, threshold); // Results contains user bin IDs
local_query_ibf_timer.stop();
local_generate_results_timer.start();
for (auto && count : result)
{
result_string += std::to_string(count);
result_string += ',';
}
result_string += std::to_string(count);
result_string += ',';
}

if (auto & last_char = result_string.back(); last_char == ',')
last_char = '\n';
else
result_string += '\n';

result_string.insert(result_string.begin(), '\t');
result_string.insert(result_string.begin(), id.begin(), id.end());
synced_out.write(result_string);
local_generate_results_timer.stop();
}
Expand All @@ -127,25 +135,31 @@ void search_singular_ibf(search_arguments const & arguments, index_t && index)
arguments.generate_results_timer += local_generate_results_timer;
};

auto write_header = [&]()
{
if constexpr (is_ibf)
return synced_out.write_header(arguments, index.ibf().hash_function_count());
else
return synced_out.write_header(arguments, index.ibf().ibf_vector[0].hash_function_count());
};

for (auto && chunked_records : fin | seqan::stl::views::chunk((1ULL << 20) * 10))
{
assert(arguments.parts > 1); // a partitioned HIBF should have at lease 2 partitions
// prefetch the first partition while query IO is done
auto cereal_future = std::async(std::launch::async,
[&]()
{
load_index(index, arguments, 0);
});

records.clear();
arguments.query_file_io_timer.start();
std::ranges::move(chunked_records, std::back_inserter(records));
arguments.query_file_io_timer.stop();

cereal_future.get();
[[maybe_unused]] static bool header_written = write_header(); // called exactly once
synced_out.write_header(arguments, index.ibf().ibf_vector[0].hash_function_count());

for (size_t part{}; part < arguments.parts; ++part)
{
do_parallel(worker, records.size(), arguments.threads);
load_index(index, arguments, part);
}

do_parallel(worker, records.size(), arguments.threads);
do_parallel(output_worker, records.size(), arguments.threads); // when last part also write result
}
}

Expand Down
94 changes: 59 additions & 35 deletions src/build/build_hibf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,32 @@
namespace raptor
{

void build_hibf(build_arguments const & arguments,
seqan::hibf::config const & config,
seqan::hibf::layout::layout const & layout)
{
// Call ctor
seqan::hibf::hierarchical_interleaved_bloom_filter hibf{config, layout};

arguments.index_allocation_timer = std::move(hibf.index_allocation_timer);
arguments.user_bin_io_timer = std::move(hibf.user_bin_io_timer);
arguments.merge_kmers_timer = std::move(hibf.merge_kmers_timer);
arguments.fill_ibf_timer = std::move(hibf.fill_ibf_timer);

arguments.index_allocation_timer.start();
raptor_index<index_structure::hibf> index{window{arguments.window_size},
arguments.shape,
arguments.parts,
arguments.bin_path,
arguments.fpr,
std::move(hibf)};
arguments.index_allocation_timer.stop();

arguments.store_index_timer.start();
store_index(arguments.out_path, std::move(index));
arguments.store_index_timer.stop();
}

void build_hibf(build_arguments const & arguments)
{
std::variant<file_reader<file_types::sequence>, file_reader<file_types::minimiser>> reader;
Expand Down Expand Up @@ -51,47 +77,45 @@ void build_hibf(build_arguments const & arguments)
config.input_fn = input_lambda;
config.threads = arguments.threads;

// Call ctor
seqan::hibf::hierarchical_interleaved_bloom_filter hibf{config, layout};

arguments.index_allocation_timer = std::move(hibf.index_allocation_timer);
arguments.user_bin_io_timer = std::move(hibf.user_bin_io_timer);
arguments.merge_kmers_timer = std::move(hibf.merge_kmers_timer);
arguments.fill_ibf_timer = std::move(hibf.fill_ibf_timer);

arguments.index_allocation_timer.start();
raptor_index<index_structure::hibf> index{window{arguments.window_size},
arguments.shape,
arguments.parts,
arguments.bin_path,
arguments.fpr,
std::move(hibf)};
arguments.index_allocation_timer.stop();

arguments.store_index_timer.start();
store_index(arguments.out_path, std::move(index));
arguments.store_index_timer.stop();
build_hibf(arguments, config, layout);
}

void build_partitioned_hibf(build_arguments const & arguments)
{
std::variant<file_reader<file_types::sequence>, file_reader<file_types::minimiser>> reader;
if (arguments.input_is_minimiser)
reader = file_reader<file_types::minimiser>{};
else
reader = file_reader<file_types::sequence>{arguments.shape, arguments.window_size};

partition_config const cfg{arguments.parts};
index_factory factory{arguments, cfg};
std::vector<size_t> const kmers_per_partition = max_count_per_partition(cfg, arguments);

for (size_t part = 0; part < arguments.parts; ++part)
{
arguments.bits = seqan::hibf::build::bin_size_in_bits(
{.fpr = arguments.fpr, .hash_count = arguments.hash, .elements = kmers_per_partition[part]});
auto index = factory(part);
std::filesystem::path out_path{arguments.out_path};
out_path += "_" + std::to_string(part);
arguments.store_index_timer.start();
store_index(out_path, std::move(index));
arguments.store_index_timer.stop();
}
auto input_lambda = [&arguments, &reader](size_t const user_bin_id, seqan::hibf::insert_iterator it)
{
std::visit(
[&](auto const & reader)
{
reader.hash_into(arguments.bin_path[user_bin_id], it);
},
reader);
};

seqan::hibf::config config{};
seqan::hibf::layout::layout layout{};

std::ifstream layout_stream{arguments.bin_file};
config.read_from(layout_stream);
config.threads = arguments.threads;
config.input_fn = input_lambda;

for (size_t part = 0; part < arguments.parts; ++part)
{
layout.read_from(layout_stream); // read current layout

// replace out_path by appending `_[part]` for each index partition.
build_arguments local_arguments = arguments;
local_arguments.out_path = arguments.out_path + "_" + std::to_string(part);

build_hibf(local_arguments, config, layout);
}
}

} // namespace raptor
7 changes: 6 additions & 1 deletion src/search/search_hibf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
*/

#include <raptor/search/search_hibf.hpp>
#include <raptor/search/search_partitioned_hibf.hpp>
#include <raptor/search/search_singular_ibf.hpp>

namespace raptor
Expand All @@ -19,7 +20,11 @@ namespace raptor
void search_hibf(search_arguments const & arguments)
{
auto index = raptor_index<index_structure::hibf>{};
search_singular_ibf(arguments, std::move(index));

if (arguments.parts == 1)
search_singular_ibf(arguments, std::move(index));
else
search_partitioned_hibf(arguments, std::move(index));
}

} // namespace raptor

0 comments on commit 80f6502

Please sign in to comment.