Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into DOR-553_RG_HDR_FOR_…
Browse files Browse the repository at this point in the history
…CUSTOM_BARCODES
  • Loading branch information
hpendry-ont committed Mar 20, 2024
2 parents f37cd6a + c06dfa1 commit 0a9e40a
Show file tree
Hide file tree
Showing 25 changed files with 550 additions and 285 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ add_library(dorado_lib
dorado/read_pipeline/PairingNode.h
dorado/read_pipeline/PolyACalculatorNode.cpp
dorado/read_pipeline/PolyACalculatorNode.h
dorado/read_pipeline/ProgressTracker.cpp
dorado/read_pipeline/ProgressTracker.h
dorado/read_pipeline/ReadFilterNode.cpp
dorado/read_pipeline/ReadFilterNode.h
Expand Down
6 changes: 4 additions & 2 deletions dorado/api/caller_creation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ std::shared_ptr<basecall::CudaCaller> create_cuda_caller(
int batch_size,
const std::string& device,
float memory_limit_fraction,
PipelineType pipeline_type) {
PipelineType pipeline_type,
float batch_size_time_penalty) {
return std::make_shared<basecall::CudaCaller>(model_config, chunk_size, batch_size, device,
memory_limit_fraction, pipeline_type);
memory_limit_fraction, pipeline_type,
batch_size_time_penalty);
}
#elif DORADO_METAL_BUILD
std::shared_ptr<basecall::MetalCaller> create_metal_caller(
Expand Down
3 changes: 2 additions & 1 deletion dorado/api/caller_creation.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ std::shared_ptr<basecall::CudaCaller> create_cuda_caller(
int batch_size,
const std::string& device,
float memory_limit_fraction,
PipelineType pipeline_type);
PipelineType pipeline_type,
float batch_size_time_penalty);
#elif DORADO_METAL_BUILD
std::shared_ptr<basecall::MetalCaller> create_metal_caller(
const basecall::CRFModelConfig& model_config,
Expand Down
6 changes: 4 additions & 2 deletions dorado/api/runner_creation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ std::pair<std::vector<basecall::RunnerPtr>, size_t> create_basecall_runners(
size_t batch_size,
size_t chunk_size,
float memory_fraction,
PipelineType pipeline_type) {
PipelineType pipeline_type,
float batch_size_time_penalty) {
#ifdef __APPLE__
(void)pipeline_type;
#endif
Expand Down Expand Up @@ -88,7 +89,7 @@ std::pair<std::vector<basecall::RunnerPtr>, size_t> create_basecall_runners(
for (auto device_string : devices) {
futures.push_back(pool.push(create_cuda_caller, std::cref(model_config),
int(chunk_size), int(batch_size), device_string,
memory_fraction, pipeline_type));
memory_fraction, pipeline_type, batch_size_time_penalty));
}

for (auto& caller : futures) {
Expand All @@ -110,6 +111,7 @@ std::pair<std::vector<basecall::RunnerPtr>, size_t> create_basecall_runners(
throw std::runtime_error("Unsupported device: " + device);
}
(void)num_gpu_runners;
(void)batch_size_time_penalty;
#endif

auto adjusted_chunk_size = runners.front()->chunk_size();
Expand Down
3 changes: 2 additions & 1 deletion dorado/api/runner_creation.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ std::pair<std::vector<basecall::RunnerPtr>, size_t> create_basecall_runners(
size_t batch_size,
size_t chunk_size,
float memory_fraction,
PipelineType pipeline_type);
PipelineType pipeline_type,
float batch_size_time_penalty);

std::vector<modbase::RunnerPtr> create_modbase_runners(
const std::vector<std::filesystem::path>& remora_models,
Expand Down
67 changes: 49 additions & 18 deletions dorado/basecall/CudaCaller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <cassert>
#include <chrono>
#include <limits>
#include <map>

using namespace std::chrono_literals;

Expand All @@ -37,7 +38,8 @@ CudaCaller::CudaCaller(const CRFModelConfig &model_config,
int batch_size,
const std::string &device,
float memory_limit_fraction,
PipelineType pipeline_type)
PipelineType pipeline_type,
float batch_size_time_penalty)
: m_config(model_config),
m_device(device),
m_decoder(decode::create_decoder(device, m_config)),
Expand All @@ -54,7 +56,7 @@ CudaCaller::CudaCaller(const CRFModelConfig &model_config,
at::InferenceMode guard;
m_module = load_crf_model(model_config, m_options);

determine_batch_dims(memory_limit_fraction, batch_size, chunk_size);
determine_batch_dims(memory_limit_fraction, batch_size, chunk_size, batch_size_time_penalty);

c10::cuda::CUDAGuard device_guard(m_options.device());
c10::cuda::CUDACachingAllocator::emptyCache();
Expand Down Expand Up @@ -167,22 +169,29 @@ std::pair<int64_t, int64_t> CudaCaller::calculate_memory_requirements() const {
int64_t crfmodel_bytes_per_chunk_timestep;
if (m_config.out_features.has_value()) {
auto out_features = m_config.out_features.value();
std::unordered_map<int, int64_t> out_features_map{{128, 2312}, {256, 8712}};
crfmodel_bytes_per_chunk_timestep = out_features_map[out_features];
if (crfmodel_bytes_per_chunk_timestep == 0) {
spdlog::warn("Failed to set GPU memory requirements. Unexpected model out_features {}.",
out_features);
const std::map<int, int64_t> out_features_map{{128, 2312}, {256, 8712}};
auto it = out_features_map.upper_bound(out_features - 1);
if (it == out_features_map.end()) {
spdlog::error(
"Failed to set GPU memory requirements. Unexpected model out_features {}.",
out_features);
return {0, 0};
} else if (it->first != out_features) {
            spdlog::warn("Unexpected model out_features {}. Estimating GPU memory requirements.",
                         out_features);
}
crfmodel_bytes_per_chunk_timestep = it->second;
} else {
std::unordered_map<int, int64_t> insize_map{
const std::map<int, int64_t> insize_map{
{96, 960}, {128, 1280}, {384, 2816}, {768, 9728}, {1024, 10240}};
crfmodel_bytes_per_chunk_timestep = insize_map[m_config.lstm_size];
if (crfmodel_bytes_per_chunk_timestep == 0) {
spdlog::warn("Failed to set GPU memory requirements. Unexpected model insize {}.",
m_config.lstm_size);
auto it = insize_map.upper_bound(m_config.lstm_size - 1);
if (it == insize_map.end()) {
spdlog::error("Failed to set GPU memory requirements. Unexpected model insize {}.",
m_config.lstm_size);
return {0, 0};
} else if (it->first != m_config.lstm_size) {
            spdlog::warn("Unexpected model insize {}. Estimating GPU memory requirements.",
                         m_config.lstm_size);
}
crfmodel_bytes_per_chunk_timestep = it->second;
}

// Determine size of working memory for decoder divided by (batch_size * chunk_size)
Expand All @@ -197,7 +206,8 @@ std::pair<int64_t, int64_t> CudaCaller::calculate_memory_requirements() const {

void CudaCaller::determine_batch_dims(float memory_limit_fraction,
int requested_batch_size,
int requested_chunk_size) {
int requested_chunk_size,
float batch_size_time_penalty) {
c10::cuda::CUDAGuard device_guard(m_options.device());
int64_t available = utils::available_memory(m_options.device());
spdlog::debug("{} memory available: {:.2f}GB", m_device, available / GB);
Expand Down Expand Up @@ -307,6 +317,11 @@ void CudaCaller::determine_batch_dims(float memory_limit_fraction,
max_batch_size = std::min(max_batch_size, max_batch_size_limit);
spdlog::debug("Auto batchsize {}: testing up to {} in steps of {}", m_device, max_batch_size,
granularity);

// Times and corresponding batch sizes.
std::vector<std::pair<float, int>> times_and_batch_sizes;
times_and_batch_sizes.reserve(max_batch_size / granularity);

for (int batch_size = granularity; batch_size <= max_batch_size; batch_size += granularity) {
auto input = torch::empty({batch_size, m_config.num_features, chunk_size}, m_options);

Expand All @@ -332,17 +347,33 @@ void CudaCaller::determine_batch_dims(float memory_limit_fraction,
spdlog::debug("Auto batchsize {}: {}, time per chunk {:8f} ms", m_device, batch_size, time);
if (time < best_time) {
best_time = time;
for (size_t i = 0; i < m_batch_dims.size(); ++i) {
if (batch_size <= max_batch_sizes[i]) {
m_batch_dims[i].N = batch_size;
}
}
times_and_batch_sizes.emplace_back(time, batch_size);
}

// Clear the cache each time. Without this, intermittent cuda memory allocation errors
// are seen on windows laptop NVIDIA RTX A5500 Laptop GPU. See JIRA issue DOR-466
c10::cuda::CUDACachingAllocator::emptyCache();
}

// Find the first batch size that was under the threshold.
const float threshold_time = best_time * (1 + batch_size_time_penalty);
auto under_threshold = [threshold_time](auto pair) { return pair.first <= threshold_time; };
auto chosen_batch = std::find_if(times_and_batch_sizes.begin(), times_and_batch_sizes.end(),
under_threshold);
if (chosen_batch == times_and_batch_sizes.end()) {
// This should be impossible.
// Sanity check only, to avoid segfault or misleading behavior if there is a bug.
throw std::out_of_range("Error in batch size selection algorithm.");
}

const int best_batch_size = chosen_batch->second;
spdlog::debug("Chosen batch size {}: {}, time per chunk {:8f} ms", m_device, best_batch_size,
chosen_batch->first);
for (size_t i = 0; i < m_batch_dims.size(); ++i) {
if (best_batch_size <= max_batch_sizes[i]) {
m_batch_dims[i].N = best_batch_size;
}
}
}

void CudaCaller::start_threads() {
Expand Down
8 changes: 6 additions & 2 deletions dorado/basecall/CudaCaller.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class CudaCaller {
int batch_size,
const std::string &device,
float memory_limit_fraction,
PipelineType pipeline_type);
PipelineType pipeline_type,
float batch_size_time_penalty);

~CudaCaller();
std::vector<decode::DecodedChunk> call_chunks(at::Tensor &input,
Expand Down Expand Up @@ -56,7 +57,10 @@ class CudaCaller {
}

std::pair<int64_t, int64_t> calculate_memory_requirements() const;
void determine_batch_dims(float memory_limit_fraction, int batch_size, int chunk_size);
void determine_batch_dims(float memory_limit_fraction,
int batch_size,
int chunk_size,
float batch_size_time_penalty);

void start_threads();
void cuda_thread_fn();
Expand Down
28 changes: 22 additions & 6 deletions dorado/cli/aligner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ int aligner(int argc, char* argv[]) {

if (parser.visible.get<bool>("--verbose")) {
mm_verbose = 3;
}

auto progress_stats_frequency(parser.hidden.get<int>("progress_stats_frequency"));
if (progress_stats_frequency > 0) {
utils::EnsureInfoLoggingEnabled(static_cast<dorado::utils::VerboseLogLevel>(verbosity));
} else {
utils::SetVerboseLogging(static_cast<dorado::utils::VerboseLogLevel>(verbosity));
}

Expand Down Expand Up @@ -194,7 +200,7 @@ int aligner(int argc, char* argv[]) {
}

auto index_file_access = load_index(index, options, aligner_threads);
auto progress_stats_frequency(parser.hidden.get<int>("progress_stats_frequency"));

ReadOutputProgressStats progress_stats(
std::chrono::seconds{progress_stats_frequency}, all_files.size(),
ReadOutputProgressStats::StatsCollectionMode::collector_per_input_file);
Expand All @@ -211,9 +217,10 @@ int aligner(int argc, char* argv[]) {
dorado::utils::strip_sq_hdr(header);
add_pg_hdr(header);

utils::HtsFile hts_file(file_info.output, file_info.output_mode, writer_threads);

PipelineDescriptor pipeline_desc;
auto hts_writer = pipeline_desc.add_node<HtsWriter>({}, file_info.output,
file_info.output_mode, writer_threads);
auto hts_writer = pipeline_desc.add_node<HtsWriter>({}, hts_file);
auto aligner = pipeline_desc.add_node<AlignerNode>({hts_writer}, index_file_access, index,
bed_file, options, aligner_threads);

Expand All @@ -230,10 +237,14 @@ int aligner(int argc, char* argv[]) {
const auto& aligner_ref = dynamic_cast<AlignerNode&>(pipeline->get_node_ref(aligner));
utils::add_sq_hdr(header, aligner_ref.get_sequence_records_for_header());
auto& hts_writer_ref = dynamic_cast<HtsWriter&>(pipeline->get_node_ref(hts_writer));
hts_writer_ref.set_and_write_header(header);
hts_file.set_and_write_header(header);

// All progress reporting is in the post-processing part.
ProgressTracker tracker(0, false, 1.f);
tracker.set_description("Aligning");

// Set up stats counting
std::vector<dorado::stats::StatsCallable> stats_callables;
ProgressTracker tracker(0, false);
stats_callables.push_back(
[&tracker](const stats::NamedStats& stats) { tracker.update_progress_bar(stats); });
stats_callables.push_back([&progress_stats](const stats::NamedStats& stats) {
Expand All @@ -252,11 +263,16 @@ int aligner(int argc, char* argv[]) {

// Stop the stats sampler thread before tearing down any pipeline objects.
stats_sampler->terminate();

tracker.update_progress_bar(final_stats);
progress_stats.update_reads_per_file_estimate(num_reads_in_file);
progress_stats.notify_stats_collector_completed(final_stats);

// Report progress during output file finalisation.
tracker.set_description("Sorting output files");
hts_file.finalise([&](size_t progress) {
tracker.update_post_processing_progress(static_cast<float>(progress));
});

tracker.summarize();

spdlog::info("> finished alignment");
Expand Down
25 changes: 18 additions & 7 deletions dorado/cli/basecaller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "utils/string_utils.h"
#include "utils/sys_stats.h"
#include "utils/torch_utils.h"
#include "utils/tty_utils.h"

#include <htslib/sam.h>
#include <spdlog/spdlog.h>
Expand Down Expand Up @@ -128,7 +129,7 @@ void setup(std::vector<std::string> args,

auto [runners, num_devices] =
api::create_basecall_runners(model_config, device, num_runners, 0, batch_size,
chunk_size, 1.f, api::PipelineType::simplex);
chunk_size, 1.f, api::PipelineType::simplex, 0.f);

auto read_groups = DataLoader::load_read_groups(data_path, model_name, modbase_model_names,
recursive_file_loading);
Expand All @@ -150,9 +151,10 @@ void setup(std::vector<std::string> args,
cli::add_pg_hdr(hdr.get(), args);
utils::add_rg_hdr(hdr.get(), read_groups, barcode_kits, sample_sheet.get());

utils::HtsFile hts_file("-", output_mode, thread_allocations.writer_threads);

PipelineDescriptor pipeline_desc;
auto hts_writer = pipeline_desc.add_node<HtsWriter>({}, "-", output_mode,
thread_allocations.writer_threads);
auto hts_writer = pipeline_desc.add_node<HtsWriter>({}, hts_file);
auto aligner = PipelineDescriptor::InvalidNodeHandle;
auto current_sink_node = hts_writer;
if (enable_aligner) {
Expand Down Expand Up @@ -209,7 +211,7 @@ void setup(std::vector<std::string> args,
const auto& aligner_ref = dynamic_cast<AlignerNode&>(pipeline->get_node_ref(aligner));
utils::add_sq_hdr(hdr.get(), aligner_ref.get_sequence_records_for_header());
}
hts_writer_ref.set_and_write_header(hdr.get());
hts_file.set_and_write_header(hdr.get());

std::unordered_set<std::string> reads_already_processed;
if (!resume_from_file.empty()) {
Expand Down Expand Up @@ -250,8 +252,10 @@ void setup(std::vector<std::string> args,
reads_already_processed = resume_loader.get_processed_read_ids();
}

ProgressTracker tracker(int(num_reads), false, hts_file.finalise_is_noop() ? 0.f : 0.5f);
tracker.set_description("Basecalling");

std::vector<dorado::stats::StatsCallable> stats_callables;
ProgressTracker tracker(int(num_reads), false);
stats_callables.push_back(
[&tracker](const stats::NamedStats& stats) { tracker.update_progress_bar(stats); });
constexpr auto kStatsPeriod = 100ms;
Expand All @@ -270,11 +274,18 @@ void setup(std::vector<std::string> args,
auto final_stats = pipeline->terminate(DefaultFlushOptions());

// Stop the stats sampler thread before tearing down any pipeline objects.
stats_sampler->terminate();

// Then update progress tracking one more time from this thread, to
// allow accurate summarisation.
stats_sampler->terminate();
tracker.update_progress_bar(final_stats);

// Report progress during output file finalisation.
tracker.set_description("Sorting output files");
hts_file.finalise([&](size_t progress) {
tracker.update_post_processing_progress(static_cast<float>(progress));
});

// Give the user a nice summary.
tracker.summarize();
if (!dump_stats_file.empty()) {
std::ofstream stats_file(dump_stats_file);
Expand Down
Loading

0 comments on commit 0a9e40a

Please sign in to comment.