Phibf #405 (Draft)

Wants to merge 5 commits into base: main.
Changes from all commits
1 change: 1 addition & 0 deletions include/raptor/index.hpp
@@ -10,6 +10,7 @@
#pragma once

#include <cereal/types/string.hpp>
#include <cereal/types/vector.hpp>

#include <sharg/exceptions.hpp>

18 changes: 18 additions & 0 deletions include/raptor/search/do_parallel.hpp
@@ -11,6 +11,7 @@

#include <algorithm>
#include <functional>
#include <future>
#include <omp.h>
#include <vector>

@@ -34,4 +35,21 @@ void do_parallel(algorithm_t && worker, size_t const num_records, size_t const t
}
}

template <typename algorithm_t>
void do_parallel(algorithm_t && worker, size_t const num_records, size_t const threads, bool const output_results)
{
std::vector<decltype(std::async(std::launch::async, worker, size_t{}, size_t{}, bool{}))> tasks;
size_t const records_per_thread = num_records / threads;

for (size_t i = 0; i < threads; ++i)
{
size_t const start = records_per_thread * i;
// The last thread additionally takes the remainder of the integer division.
size_t const extent = i == (threads - 1) ? num_records - i * records_per_thread : records_per_thread;
tasks.emplace_back(std::async(std::launch::async, worker, start, extent, output_results));
}

for (auto && task : tasks)
task.get();
}

} // namespace raptor
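For context, a minimal sketch of how this new overload could be invoked; the worker lambda and the record count are hypothetical:

#include <cstddef>
#include <iostream>

#include <raptor/search/do_parallel.hpp>

int main()
{
    // Hypothetical worker: processes the half-open record range [start, start + extent).
    auto worker = [](std::size_t const start, std::size_t const extent, bool const output_results)
    {
        if (output_results)
            std::cout << "Processed records [" << start << ", " << start + extent << ")\n";
    };

    // 1000 records split across 4 threads: 250 records each, with results written.
    raptor::do_parallel(worker, 1000u, 4u, true);
}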
140 changes: 140 additions & 0 deletions include/raptor/search/search_partitioned_hibf.hpp
@@ -0,0 +1,140 @@
// --------------------------------------------------------------------------------------------------
// Copyright (c) 2006-2023, Knut Reinert & Freie Universität Berlin
// Copyright (c) 2016-2023, Knut Reinert & MPI für molekulare Genetik
// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
// shipped with this file and also available at: https://github.com/seqan/raptor/blob/main/LICENSE.md
// --------------------------------------------------------------------------------------------------

/*!\file
* \brief Provides raptor::search_partitioned_hibf.
* \author Svenja Mehringer <svenja.mehringer AT fu-berlin.de>
*/

#pragma once

#include <seqan3/search/views/minimiser_hash.hpp>

#include <raptor/adjust_seed.hpp>
#include <raptor/contrib/std/chunk_view.hpp>
#include <raptor/dna4_traits.hpp>
#include <raptor/search/do_parallel.hpp>
#include <raptor/search/load_index.hpp>
#include <raptor/search/sync_out.hpp>
#include <raptor/threshold/threshold.hpp>

namespace raptor
{

template <typename index_t>
void search_partitioned_hibf(search_arguments const & arguments, index_t && index)
{
seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::id, seqan3::field::seq>> fin{
arguments.query_file};
using record_type = typename decltype(fin)::record_type;

std::vector<record_type> records{};

sync_out synced_out{arguments};

std::vector<std::string> results; // cache results, since we search multiple index partitions

raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()};

// Worker: searches a slice of the records and caches all results in the results vector.
auto worker = [&](size_t const start, size_t const extent, bool const output_results)
{
seqan::hibf::serial_timer local_compute_minimiser_timer{};
seqan::hibf::serial_timer local_query_ibf_timer{};
seqan::hibf::serial_timer local_generate_results_timer{};

auto agent = index.ibf().membership_agent();

std::vector<uint64_t> minimiser; // buffer reused across queries

auto hash_adaptor = seqan3::views::minimiser_hash(arguments.shape,
seqan3::window_size{arguments.window_size},
seqan3::seed{adjust_seed(arguments.shape_weight)});

for (size_t pos = start; pos < start + extent; ++pos)
{
auto const & seq = records[pos].sequence();
std::string & result_string = results[pos];

auto minimiser_view = seq | hash_adaptor | std::views::common;
local_compute_minimiser_timer.start();
minimiser.assign(minimiser_view.begin(), minimiser_view.end());
local_compute_minimiser_timer.stop();

size_t const minimiser_count{minimiser.size()};
size_t const threshold = thresholder.get(minimiser_count);

local_query_ibf_timer.start();
auto & result = agent.membership_for(minimiser, threshold); // result contains the IDs of all user bins reaching the threshold
local_query_ibf_timer.stop();
local_generate_results_timer.start();
for (auto && user_bin_id : result)
{
result_string += std::to_string(user_bin_id);
result_string += ',';
}

if (output_results)
{
result_string.insert(result_string.begin(), '\t');
auto const & id = records[pos].id();
result_string.insert(result_string.begin(), id.begin(), id.end());

if (auto & last_char = result_string.back(); last_char == ',')
last_char = '\n';
else
result_string += '\n';

synced_out.write(result_string);
result_string.clear(); // reset the cached result of this query
}
local_generate_results_timer.stop();
}

arguments.compute_minimiser_timer += local_compute_minimiser_timer;
arguments.query_ibf_timer += local_query_ibf_timer;
arguments.generate_results_timer += local_generate_results_timer;
};

for (auto && chunked_records : fin | seqan::stl::views::chunk((1ULL << 20) * 10))
{
// Prefetch the first index partition while the query IO is running.
auto cereal_future = std::async(std::launch::async,
[&]()
{
load_index(index, arguments, 0);
});

records.clear();
arguments.query_file_io_timer.start();
std::ranges::move(chunked_records, std::back_inserter(records));
arguments.query_file_io_timer.stop();

results.resize(records.size());

cereal_future.get();
synced_out.write_header(arguments, index.ibf().ibf_vector[0].hash_function_count());

assert(arguments.parts > 0);
for (size_t part = 0; part < arguments.parts - 1; ++part)
{
do_parallel(worker, records.size(), arguments.threads, false /*do not write results*/);
arguments.write_timings_to_file();
arguments.compute_minimiser_timer = {};
arguments.query_ibf_timer = {};
arguments.generate_results_timer = {};
arguments.load_index_timer = {};
load_index(index, arguments, part + 1);
}

do_parallel(worker, records.size(), arguments.threads, true /*write results*/);
arguments.write_timings_to_file();
}
}

} // namespace raptor
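To summarize the control flow above: queries are read in chunks of (1ULL << 20) * 10 (about 10.5 million records); while a chunk is read, part 0 of the index is prefetched asynchronously; parts 0 to n-2 are then searched with output_results == false, so hits only accumulate in the results cache; the final part appends its hits and writes one line per query. A minimal trace of the accumulation for one query, assuming a 2-part index and hypothetical user bin IDs:

// After part 0: results[pos] == "3,17,"     (hits appended, output_results == false)
// After part 1: results[pos] == "3,17,42,"  (hits of the last part appended)
// Written out:  "query1\t3,17,42\n"         (ID and tab prefixed, trailing ',' turned into '\n')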
17 changes: 11 additions & 6 deletions src/argument_parsing/build_parsing.cpp
@@ -67,6 +67,8 @@ inline void parse_chopper_config(sharg::parser & parser, build_arguments & argum
}

validate_shape(parser, arguments);

arguments.parts = config.number_of_partitions; // update number of partitions for the HIBF
}

inline void parse_shape_from_minimiser(sharg::parser & parser, build_arguments & arguments)
@@ -174,11 +176,14 @@ void init_build_parser(sharg::parser & parser, build_arguments & arguments)
.description = "The number of hash functions to use.",
.default_message = std::to_string(arguments.hash) + ", or read from layout file",
.validator = sharg::arithmetic_range_validator{1, 5}});
parser.add_option(
arguments.parts,
sharg::config{
.short_id = '\0',
.long_id = "parts",
.description =
"Splits the index in this many parts. Not available for the HIBF. Needs to be set at the layout step.",
.validator = power_of_two_validator{}});

// GCOVR_EXCL_START
// Adding additional cwl information that currently isn't supported by sharg and tdl.
@@ -253,7 +258,7 @@ void build_parsing(sharg::parser & parser)
arguments.is_hibf = input_is_pack_file(arguments.bin_file);

if (arguments.is_hibf && arguments.parts != 1u)
throw sharg::parser_error{"The HIBF cannot yet be partitioned."};
throw sharg::parser_error{"The HIBF cannot yet be partitioned when building. Please set parts when layouting."};

parse_bin_path(arguments);

2 changes: 1 addition & 1 deletion src/argument_parsing/search_arguments.cpp
@@ -69,7 +69,7 @@

void search_arguments::write_timings_to_file() const
{
std::ofstream output_stream{timing_out, std::ios_base::app}; // append: called once per index part
output_stream << std::fixed << std::setprecision(2);
output_stream << "peak_memory_usage_in_kibibytes\t"
<< "index_size_in_kibibytes\t"
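Appending instead of truncating matters for the partitioned search: write_timings_to_file() is now called once per index part, and the previous default open mode would overwrite the file on each call, keeping only the last part's timings. A minimal sketch of the difference, with a hypothetical file name:

#include <fstream>

int main()
{
    for (int part = 0; part < 3; ++part)
    {
        // std::ofstream out{"timings.txt"};  // default mode truncates: only part 2 would survive
        std::ofstream out{"timings.txt", std::ios_base::app}; // append: one row per part accumulates
        out << "timings for part " << part << '\n';
    }
}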
66 changes: 40 additions & 26 deletions src/build/build_hibf.cpp
@@ -16,6 +16,32 @@
namespace raptor
{

void construct_hibf(build_arguments const & arguments,
seqan::hibf::config & config,
seqan::hibf::layout::layout const & layout)
{
// Construct the HIBF from the given config and layout.
seqan::hibf::hierarchical_interleaved_bloom_filter hibf{config, layout};

arguments.index_allocation_timer = std::move(hibf.index_allocation_timer);
arguments.user_bin_io_timer = std::move(hibf.user_bin_io_timer);
arguments.merge_kmers_timer = std::move(hibf.merge_kmers_timer);
arguments.fill_ibf_timer = std::move(hibf.fill_ibf_timer);

arguments.index_allocation_timer.start();
raptor_index<index_structure::hibf> index{window{arguments.window_size},
arguments.shape,
arguments.parts,
arguments.bin_path,
arguments.fpr,
std::move(hibf)};
arguments.index_allocation_timer.stop();

arguments.store_index_timer.start();
store_index(arguments.out_path, std::move(index));
arguments.store_index_timer.stop();
}

void build_hibf(build_arguments const & arguments)
{
std::variant<file_reader<file_types::sequence>, file_reader<file_types::minimiser>> reader;
@@ -34,40 +60,28 @@ void build_hibf(build_arguments const & arguments)
reader);
};

seqan::hibf::config config{};
seqan::hibf::layout::layout layout{};

std::ifstream layout_stream{arguments.bin_file};
config.read_from(layout_stream);

// Update the config with the run-time arguments.
config.threads = arguments.threads;
config.input_fn = input_lambda;

for (size_t part = 0; part < arguments.parts; ++part)
{
layout.clear();
layout.read_from(layout_stream); // read the layout of the current part

// In case of a partitioned HIBF, derive the out_path of each index partition by appending `_[part]`.
build_arguments local_arguments = arguments;
local_arguments.out_path = (arguments.parts > 1) ? arguments.out_path.string() + "_" + std::to_string(part)
: arguments.out_path.string();

construct_hibf(local_arguments, config, layout);
}
}

} // namespace raptor
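The loop above expects the layout file to start with the chopper config, followed by one layout block per partition, all consumed sequentially from the same input stream. The resulting on-disk naming, assuming three parts and an out_path of raptor.index (file names hypothetical):

// raptor.index_0  <- partition 0
// raptor.index_1  <- partition 1
// raptor.index_2  <- partition 2
// With parts == 1, the index is stored at raptor.index, without a suffix.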
7 changes: 6 additions & 1 deletion src/search/search_hibf.cpp
@@ -8,6 +8,7 @@
*/

#include <raptor/search/search_hibf.hpp>
#include <raptor/search/search_partitioned_hibf.hpp>
#include <raptor/search/search_singular_ibf.hpp>

namespace raptor
@@ -16,7 +17,11 @@ namespace raptor
void search_hibf(search_arguments const & arguments)
{
auto index = raptor_index<index_structure::hibf>{};

if (arguments.parts == 1)
search_singular_ibf(arguments, std::move(index));
else
search_partitioned_hibf(arguments, std::move(index));
}

} // namespace raptor
3 changes: 2 additions & 1 deletion test/unit/cli/build/build_hibf_chopper_test.cpp
@@ -13,7 +13,7 @@ TEST_F(build_hibf_layout, pipeline)
std::filesystem::path const layout_filename = "raptor_cli_test.layout";
std::filesystem::path const index_filename = "raptor.index";
std::filesystem::path const search_filename = "search.out";
size_t const number_of_repeated_bins{64};
size_t const number_of_errors{0}; // search

{ // generate sequence (data) input file
@@ -30,6 +30,7 @@
"layout",
"--kmer 19",
"--threads 1",
"--partitioning-approach 1",
"--input",
data_filename,
"--tmax 64",
1 change: 1 addition & 0 deletions test/unit/cli/search/CMakeLists.txt
@@ -5,6 +5,7 @@
cmake_minimum_required (VERSION 3.25)

raptor_add_unit_test (search_hibf_test.cpp)
raptor_add_unit_test (search_partitioned_hibf_test.cpp)
raptor_add_unit_test (search_hibf_preprocessing_test.cpp)
raptor_add_unit_test (search_ibf_preprocessing_test.cpp)
raptor_add_unit_test (search_ibf_test.cpp)