Skip to content

Commit

Permalink
Merge branch 'DOR-488_aligner_check_client_connected' into 'master'
Browse files Browse the repository at this point in the history
DOR-488 Allow alignment to be skipped for disconnected clients

Closes DOR-488

See merge request machine-learning/dorado!771
  • Loading branch information
MarkBicknellONT committed Dec 18, 2023
2 parents 786fe64 + 5286985 commit b1302ae
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
8 changes: 5 additions & 3 deletions dorado/alignment/IndexFileAccess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ IndexLoadResult IndexFileAccess::load_index(const std::string& file,
// Looks up the index for `file` that is compatible with `options`, holding m_mutex for the
// duration of the lookup.
//
// Returns whatever get_or_load_compatible_index() yields, which may be a null pointer;
// callers are responsible for checking the result before using it.
std::shared_ptr<const Minimap2Index> IndexFileAccess::get_index(const std::string& file,
                                                                const Minimap2Options& options) {
    std::lock_guard<std::mutex> lock(m_mutex);
    // N.B. Although the index file for a client must be loaded before reads are added to the
    // pipeline, a read that is still in the pipeline may find that its index is no longer
    // loaded: the client may have disconnected and caused the index to be unloaded in the
    // meantime. For this reason we deliberately do not assert a non-null index here.
    return get_or_load_compatible_index(file, options);
}

bool IndexFileAccess::is_index_loaded(const std::string& file,
Expand Down
29 changes: 25 additions & 4 deletions dorado/read_pipeline/AlignerNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ std::shared_ptr<const alignment::Minimap2Index> AlignerNode::get_index(
}
auto index =
m_index_file_access->get_index(align_info.reference_file, align_info.minimap_options);
if (!index) {
if (read_common.client_info->is_disconnected()) {
// Unlikely but ... the client may have disconnected since it was last checked,
// causing the index file to be unloaded.
return {};
}
throw std::runtime_error(
"Cannot align read. Expected alignment reference file is not loaded: " +
align_info.reference_file);
}

return index;
}

Expand Down Expand Up @@ -98,14 +109,24 @@ alignment::HeaderSequenceRecords AlignerNode::get_sequence_records_for_header()
return alignment::Minimap2Aligner(m_index_for_bam_messages).get_sequence_records_for_header();
}

// Aligns a single read in place using the supplied minimap2 thread buffer.
//
// The read is left untouched when its client reports itself as disconnected, or when
// get_index() returns a null index for the read.
void AlignerNode::align_read_common(ReadCommon& read_common, mm_tbuf_t* tbuf) {
    const bool client_disconnected = read_common.client_info->is_disconnected();
    if (!client_disconnected) {
        // The index is only looked up for clients that are still connected.
        if (auto loaded_index = get_index(read_common)) {
            alignment::Minimap2Aligner(loaded_index).align(read_common, tbuf);
        }
    }
}

void AlignerNode::worker_thread() {
Message message;
mm_tbuf_t* tbuf = mm_tbuf_init();
auto align_read = [this, tbuf](auto&& read) {
auto index = get_index(read->read_common);
if (index) {
alignment::Minimap2Aligner(index).align(read->read_common, tbuf);
}
align_read_common(read->read_common, tbuf);
send_message_to_sink(std::move(read));
};
while (get_input_message(message)) {
Expand Down
2 changes: 2 additions & 0 deletions dorado/read_pipeline/AlignerNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <vector>

// Forward declaration; the full definition of bam1_t is not needed in this header.
struct bam1_t;
// Forward declaration so that `mm_tbuf_t*` can appear in member-function signatures
// without including the header that defines the struct.
typedef struct mm_tbuf_s mm_tbuf_t;

namespace dorado {

Expand Down Expand Up @@ -39,6 +40,7 @@ class AlignerNode : public MessageSink {
void terminate_impl();
void worker_thread();
std::shared_ptr<const alignment::Minimap2Index> get_index(const ReadCommon& read_common);
void align_read_common(ReadCommon& read_common, mm_tbuf_t* tbuf);

size_t m_threads;
std::vector<std::thread> m_workers;
Expand Down

0 comments on commit b1302ae

Please sign in to comment.