Skip to content

Commit

Permalink
Merge branch 'DOR-587_demux_folder_input' into 'master'
Browse files Browse the repository at this point in the history
DOR-587 added support for dorado demux reading from a folder and a --recursive option.

Closes DOR-587

See merge request machine-learning/dorado!864
  • Loading branch information
MarkBicknellONT committed Mar 1, 2024
2 parents 043be27 + dc21b5b commit c459890
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 46 deletions.
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,14 @@ To align existing basecalls, run:
```
$ dorado aligner <index> <reads> > aligned.bam
```
where `index` is a reference to align to in (FASTQ/FASTA/.mmi) format and `reads` is a file in any HTS format.
where `index` is a reference to align to in (FASTQ/FASTA/.mmi) format and `reads` is a folder or file in any HTS format.

When reading from an input folder, `dorado align` also supports emitting aligned files to an output folder, which will preserve the file structure of the inputs:

```
$ dorado aligner <index> <input_read_folder> --output-dir <output_read_folder>
```


To basecall with alignment with duplex or simplex, run with the `--reference` option:

Expand Down Expand Up @@ -230,7 +237,7 @@ Existing basecalled datasets can be classified as well as demultiplexed into per
$ dorado demux --kit-name <kit-name> --output-dir <output-folder-for-demuxed-bams> <reads>
```

`<reads>` can either be an HTS format file (e.g. FASTQ, BAM, etc.) or a stream of an HTS format (e.g. the output of dorado basecalling).
`<reads>` can either be a folder or a single file in an HTS format file (e.g. FASTQ, BAM, etc.) or a stream of an HTS format (e.g. the output of dorado basecalling).

This results in multiple BAM files being generated in the output folder, one per barcode (formatted as `KITNAME_BARCODEXX.bam`) and one for all unclassified reads. As with the in-line mode, `--no-trim` and `--barcode-both-ends` are also available as additional options.

Expand Down
8 changes: 5 additions & 3 deletions dorado/alignment/alignment_processing_items.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ namespace dorado::alignment {

AlignmentProcessingItems::AlignmentProcessingItems(std::string input_path,
bool recursive_input,
std::string output_folder)
std::string output_folder,
bool allow_output_to_folder_from_stdin)
: m_input_path(std::move(input_path)),
m_output_folder(std::move(output_folder)),
m_recursive_input(recursive_input) {}
m_recursive_input(recursive_input),
m_allow_output_to_folder_from_stdin(allow_output_to_folder_from_stdin) {}

bool AlignmentProcessingItems::check_recursive_arg_false() {
if (!m_recursive_input) {
Expand Down Expand Up @@ -198,7 +200,7 @@ bool AlignmentProcessingItems::initialise_for_folder() {
}

bool AlignmentProcessingItems::initialise_for_stdin() {
if (!m_output_folder.empty()) {
if (!m_output_folder.empty() && !m_allow_output_to_folder_from_stdin) {
spdlog::error("--output-dir is not valid if input is stdin.");
return false;
}
Expand Down
4 changes: 3 additions & 1 deletion dorado/alignment/alignment_processing_items.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class AlignmentProcessingItems {
const std::string m_input_path;
const std::string m_output_folder;
bool m_recursive_input;
bool m_allow_output_to_folder_from_stdin;

std::unordered_map<std::string, std::vector<std::filesystem::path>> m_working_paths{};

Expand Down Expand Up @@ -55,7 +56,8 @@ class AlignmentProcessingItems {
public:
AlignmentProcessingItems(std::string input_path,
bool recursive_input,
std::string output_folder);
std::string output_folder,
bool allow_output_to_folder_from_stdin);

bool initialise();

Expand Down
9 changes: 5 additions & 4 deletions dorado/cli/aligner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ int aligner(int argc, char* argv[]) {
parser.visible.add_argument("index").help("reference in (fastq/fasta/mmi).");
parser.visible.add_argument("reads")
.help("An input file or the folder containing input file(s) (any HTS format).")
.nargs(argparse::nargs_pattern::any)
.nargs(argparse::nargs_pattern::optional)
.default_value(std::string{});
parser.visible.add_argument("-r", "--recursive")
.help("If the 'reads' positional argument is a folder any subfolders will also be "
Expand Down Expand Up @@ -160,13 +160,14 @@ int aligner(int argc, char* argv[]) {
auto max_reads(parser.visible.get<int>("max-reads"));
auto options = cli::process_minimap2_arguments(parser, alignment::dflt_options);

alignment::AlignmentProcessingItems processing_items{reads, recursive_input, output_folder};
alignment::AlignmentProcessingItems processing_items{reads, recursive_input, output_folder,
false};
if (!processing_items.initialise()) {
return EXIT_FAILURE;
}

const auto& all_files = processing_items.get();
spdlog::info("num files: {}", all_files.size());
spdlog::info("num input files: {}", all_files.size());

threads = threads == 0 ? std::thread::hardware_concurrency() : threads;
// The input thread is the total number of threads to use for dorado
Expand All @@ -177,14 +178,14 @@ int aligner(int argc, char* argv[]) {
cli::worker_vs_writer_thread_allocation(threads, 0.1f);
spdlog::debug("> aligner threads {}, writer threads {}", aligner_threads, writer_threads);

// Only allow `reads` to be empty if we're accepting input from a pipe
if (reads.empty()) {
#ifndef _WIN32
if (isatty(fileno(stdin))) {
std::cout << parser.visible << std::endl;
return 1;
}
#endif
reads = "-";
}

auto index_file_access = load_index(index, options, aligner_threads);
Expand Down
51 changes: 42 additions & 9 deletions dorado/cli/demux.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include "Version.h"
#include "alignment/alignment_processing_items.h"
#include "cli/cli_utils.h"
#include "read_pipeline/BarcodeClassifierNode.h"
#include "read_pipeline/BarcodeDemuxerNode.h"
#include "read_pipeline/HtsReader.h"
#include "read_pipeline/ProgressTracker.h"
#include "summary/summary.h"
#include "utils/SampleSheet.h"
#include "utils/bam_utils.h"
#include "utils/barcode_kits.h"
#include "utils/basecaller_utils.h"
#include "utils/log_utils.h"
Expand Down Expand Up @@ -41,9 +43,18 @@ int demuxer(int argc, char* argv[]) {
argparse::ArgumentParser parser("dorado", DORADO_VERSION, argparse::default_arguments::help);
parser.add_description("Barcode demultiplexing tool. Users need to specify the kit name(s).");
parser.add_argument("reads")
.help("Path to a file with reads to demultiplex. Can be in any HTS format.")
.nargs(argparse::nargs_pattern::any);
parser.add_argument("--output-dir").help("Output folder for demultiplexed reads.").required();
.help("An input file or the folder containing input file(s) (any HTS format).")
.nargs(argparse::nargs_pattern::optional)
.default_value(std::string{});
parser.add_argument("-r", "--recursive")
.help("If the 'reads' positional argument is a folder any subfolders will also be "
"searched for input files.")
.default_value(false)
.implicit_value(true)
.nargs(0);
parser.add_argument("-o", "--output-dir")
.help("Output folder for demultiplexed reads.")
.required();
parser.add_argument("--kit-name")
.help("Barcoding kit name. Cannot be used with --no-classify. Choose "
"from: " +
Expand Down Expand Up @@ -121,12 +132,20 @@ int demuxer(int argc, char* argv[]) {
utils::SetVerboseLogging(static_cast<dorado::utils::VerboseLogLevel>(verbosity));
}

auto reads(parser.get<std::vector<std::string>>("reads"));
auto reads(parser.get<std::string>("reads"));
auto recursive_input(parser.get<bool>("recursive"));
auto output_dir(parser.get<std::string>("output-dir"));
auto emit_summary = parser.get<bool>("emit-summary");
auto threads(parser.get<int>("threads"));
auto max_reads(parser.get<int>("max-reads"));

alignment::AlignmentProcessingItems processing_items{reads, recursive_input, output_dir, true};
if (!processing_items.initialise()) {
return EXIT_FAILURE;
}
const auto& all_files = processing_items.get();
spdlog::info("num input files: {}", all_files.size());

threads = threads == 0 ? std::thread::hardware_concurrency() : threads;
// The input thread is the total number of threads to use for dorado
// barcoding. Heuristically use 10% of threads for BAM generation and
Expand All @@ -147,21 +166,29 @@ int demuxer(int argc, char* argv[]) {

auto read_list = utils::load_read_list(parser.get<std::string>("--read-ids"));

// Only allow `reads` to be empty if we're accepting input from a pipe
if (reads.empty()) {
#ifndef _WIN32
if (isatty(fileno(stdin))) {
std::cout << parser << std::endl;
return 1;
}
#endif
reads.push_back("-");
} else if (reads.size() > 1) {
spdlog::error("> multi file input not yet handled");
return 1;
}

HtsReader reader(reads[0], read_list);
HtsReader reader(all_files[0].input, read_list);
auto header = SamHdrPtr(sam_hdr_dup(reader.header));

// Fold in the headers from all the other files in the input list.
for (size_t input_idx = 1; input_idx < all_files.size(); input_idx++) {
HtsReader header_reader(all_files[input_idx].input, read_list);
std::string error_msg;
if (!utils::sam_hdr_merge(header.get(), header_reader.header, error_msg)) {
spdlog::error("Unable to combine headers from all input files: " + error_msg);
std::exit(EXIT_FAILURE);
}
}

add_pg_hdr(header.get());

auto barcode_sample_sheet = parser.get<std::string>("--sample-sheet");
Expand Down Expand Up @@ -215,6 +242,12 @@ int demuxer(int argc, char* argv[]) {
spdlog::info("> starting barcode demuxing");
reader.read(*pipeline, max_reads);

// Barcode all the other files passed in
for (size_t input_idx = 1; input_idx < all_files.size(); input_idx++) {
HtsReader input_reader(all_files[input_idx].input, read_list);
input_reader.read(*pipeline, max_reads);
}

// Wait for the pipeline to complete. When it does, we collect
// final stats to allow accurate summarisation.
auto final_stats = pipeline->terminate(DefaultFlushOptions());
Expand Down
80 changes: 80 additions & 0 deletions dorado/utils/bam_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,86 @@ void add_sq_hdr(sam_hdr_t* hdr, const sq_t& seqs) {
}
}

bool sam_hdr_merge(sam_hdr_t* dest_header, sam_hdr_t* source_header, std::string& error_msg) {
auto get_pg_id = [](std::string& str) {
size_t start_pos = str.find("ID:");
size_t end_pos = str.find("\t", start_pos);
return end_pos == std::string::npos ? str.substr(start_pos)
: str.substr(start_pos, end_pos - start_pos);
};

// Gather information about the target header.
std::set<std::string> dest_lines;
std::vector<std::string> dest_references;
std::map<std::string, std::string> dest_programs;
auto dest_stream = std::stringstream{sam_hdr_str(dest_header)};
for (std::string header_line; std::getline(dest_stream, header_line);) {
dest_lines.insert(header_line);
std::string_view header_type = std::string_view(header_line).substr(0, 3);
if (header_type == "@SQ") {
dest_references.push_back(header_line);
continue;
}
if (header_type == "@PG") {
std::string ID = get_pg_id(header_line);
dest_programs[ID] = header_line;
continue;
}
}

// Parse the source header to check if it's compatible with the destination header.
std::vector<std::string> source_references;
std::map<std::string, std::string> source_programs;
const char* source_header_cstr = sam_hdr_str(source_header);
// If the source file has no header, simply return true.
if (!source_header_cstr) {
return true;
}
auto source_stream = std::stringstream{sam_hdr_str(source_header)};
for (std::string header_line; std::getline(source_stream, header_line);) {
std::string_view header_type = std::string_view(header_line).substr(0, 3);
if (header_type == "@SQ") {
source_references.push_back(header_line);
continue;
}
if (header_type == "@PG") {
std::string ID = get_pg_id(header_line);
source_programs[ID] = header_line;
continue;
}
}

if (source_references != dest_references) {
error_msg = "Could not merge BAM headers as @SQ lines are not equal.";
return false;
}

for (auto& source_program : source_programs) {
if (dest_programs.find(source_program.first) != dest_programs.end() &&
dest_programs[source_program.first] != source_program.second) {
error_msg = "Could not merge BAM headers as @PG lines for " + source_program.first +
" are not equal.";
return false;
}
}

// Now we've validated that the headers are compatible, we can proceed with the copy across.
sam_hdr_update_line(dest_header, "HD", NULL, NULL, "SO", "unknown", NULL);
source_stream = std::stringstream{sam_hdr_str(source_header)};
for (std::string header_line; std::getline(source_stream, header_line);) {
std::string_view header_type = std::string_view(header_line).substr(0, 3);
if (header_type == "@HD" || header_type == "@SQ") {
// Don't copy these across - they are already there
continue;
}
if (dest_lines.find(header_line) == dest_lines.end()) {
sam_hdr_add_lines(dest_header, header_line.c_str(), 0);
}
}

return true;
}

std::map<std::string, std::string> get_read_group_info(sam_hdr_t* header, const char* key) {
if (header == nullptr) {
throw std::invalid_argument("header cannot be nullptr");
Expand Down
15 changes: 15 additions & 0 deletions dorado/utils/bam_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,21 @@ void add_rg_hdr(sam_hdr_t* hdr,

void add_sq_hdr(sam_hdr_t* hdr, const sq_t& seqs);

/**
* @brief Merges lines from source_header into dest_header if they aren't already present.
*
* This function will not copy any HD header lines from the source_header to the destination, even if they mismatch
* existing HD lines in the destination. This function will not merge headers if there are mismatching PG records,
* or if there are mismatching SQ records in the header.
*
* @param dest_header A pointer to a valid sam_hdr_t object which will be modified.
* @param source_header A pointer to a valid sam_hdr_t object which will have all it's lines copied into dest_header
* if they are not already present.
* @param error_msg A reference to a string to fill in with an error message if merge could not be performed.
* @return true if the merge happened, false if it did not. in the case false it returned, error_msg will contain more info.
*/
bool sam_hdr_merge(sam_hdr_t* dest_header, sam_hdr_t* source_header, std::string& error_msg);

/**
* @brief Retrieves read group information from a SAM/BAM/CRAM file header based on a specified key.
*
Expand Down

0 comments on commit c459890

Please sign in to comment.