Skip to content

Commit

Permalink
Merge branch 'INSTX-2570_duplex_depth_control' into 'master'
Browse files Browse the repository at this point in the history
INSTX-2570 Expose the depth of the duplex cache at the pipeline construction

See merge request machine-learning/dorado!660
  • Loading branch information
MarkBicknellONT committed Oct 25, 2023
2 parents 5028d95 + e13fd42 commit f6bf232
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 19 deletions.
3 changes: 2 additions & 1 deletion dorado/cli/duplex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ int duplex(int argc, char* argv[]) {

PairingParameters pairing_parameters;
if (template_complement_map.empty()) {
pairing_parameters = ReadOrder::BY_CHANNEL;
pairing_parameters =
DuplexPairingParameters{ReadOrder::BY_CHANNEL, DEFAULT_DUPLEX_CACHE_DEPTH};
} else {
pairing_parameters = std::move(template_complement_map);
}
Expand Down
14 changes: 9 additions & 5 deletions dorado/read_pipeline/PairingNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,21 +454,25 @@ PairingNode::PairingNode(std::map<std::string, std::string> template_complement_
start_threads();
}

PairingNode::PairingNode(ReadOrder read_order, int num_worker_threads, size_t max_reads)
PairingNode::PairingNode(DuplexPairingParameters pairing_params,
int num_worker_threads,
size_t max_reads)
: MessageSink(max_reads),
m_num_worker_threads(num_worker_threads),
m_max_num_keys(std::numeric_limits<size_t>::max()),
m_max_num_reads(std::numeric_limits<size_t>::max()) {
switch (read_order) {
switch (pairing_params.read_order) {
case ReadOrder::BY_CHANNEL:
m_max_num_keys = 10;
m_max_num_keys = pairing_params.cache_depth;
spdlog::debug("Using dorado duplex channel count of {}", m_max_num_keys);
break;
case ReadOrder::BY_TIME:
m_max_num_reads = 10;
m_max_num_reads = pairing_params.cache_depth;
spdlog::debug("Using dorado duplex read-per-channel count of {}", m_max_num_reads);
break;
default:
throw std::runtime_error("Unsupported read order detected: " +
dorado::to_string(read_order));
dorado::to_string(pairing_params.read_order));
}
m_pairing_func = &PairingNode::pair_generating_worker_thread;
start_threads();
Expand Down
6 changes: 3 additions & 3 deletions dorado/read_pipeline/PairingNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ class PairingNode : public MessageSink {
public:
// Template-complement map: uses the pair_list pairing method
PairingNode(std::map<std::string, std::string> template_complement_map,
int num_worker_threads = 2,
size_t max_reads = 1000);
int num_worker_threads,
size_t max_reads);

// No template-complement map: uses the pair_generation pairing method
PairingNode(ReadOrder read_order, int num_worker_threads = 2, size_t max_reads = 1000);
PairingNode(DuplexPairingParameters pairing_params, int num_worker_threads, size_t max_reads);
~PairingNode() { terminate_impl(); }
std::string get_name() const override { return "PairingNode"; }
stats::NamedStats sample_stats() const override;
Expand Down
14 changes: 8 additions & 6 deletions dorado/read_pipeline/Pipelines.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,15 @@ void create_stereo_duplex_pipeline(PipelineDescriptor& pipeline_desc,
simplex_model_stride);

auto pairing_node =
std::holds_alternative<ReadOrder>(pairing_parameters)
? pipeline_desc.add_node<PairingNode>({stereo_node},
std::get<ReadOrder>(pairing_parameters),
std::thread::hardware_concurrency())
std::holds_alternative<DuplexPairingParameters>(pairing_parameters)
? pipeline_desc.add_node<PairingNode>(
{stereo_node}, std::get<DuplexPairingParameters>(pairing_parameters),
std::thread::hardware_concurrency(), 1000)
: pipeline_desc.add_node<PairingNode>(
{stereo_node}, std::move(std::get<std::map<std::string, std::string>>(
pairing_parameters)));
{stereo_node},
std::move(std::get<std::map<std::string, std::string>>(
pairing_parameters)),
2, 1000);

// Create a duplex split node with the given settings and number of devices.
// If splitter_settings.enabled is set to false, the splitter node will act
Expand Down
2 changes: 1 addition & 1 deletion dorado/read_pipeline/Pipelines.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class ModBaseRunner;
class ModelRunnerBase;

using Runner = std::shared_ptr<ModelRunnerBase>;
using PairingParameters = std::variant<ReadOrder, std::map<std::string, std::string>>;
using PairingParameters = std::variant<DuplexPairingParameters, std::map<std::string, std::string>>;

namespace pipelines {

Expand Down
7 changes: 7 additions & 0 deletions dorado/utils/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ using SamHdrPtr = std::unique_ptr<sam_hdr_t, SamHdrDestructor>;

enum class ReadOrder { UNRESTRICTED, BY_CHANNEL, BY_TIME };

struct DuplexPairingParameters {
ReadOrder read_order;
size_t cache_depth;
};
/// Default cache depth to be used for the duplex pairing cache.
constexpr static size_t DEFAULT_DUPLEX_CACHE_DEPTH = 10;

inline std::string to_string(ReadOrder read_order) {
switch (read_order) {
case ReadOrder::UNRESTRICTED:
Expand Down
7 changes: 5 additions & 2 deletions tests/DuplexSplitTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ TEST_CASE("4 subread split tagging", TEST_GROUP) {
auto tag_node = pipeline_desc.add_node<dorado::SubreadTaggerNode>({sink});
auto stereo_node = pipeline_desc.add_node<dorado::StereoDuplexEncoderNode>(
{tag_node}, read->read_common.model_stride);
auto pairing_node = pipeline_desc.add_node<dorado::PairingNode>({stereo_node},
dorado::ReadOrder::BY_CHANNEL);
auto pairing_node = pipeline_desc.add_node<dorado::PairingNode>(
{stereo_node},
dorado::DuplexPairingParameters{dorado::ReadOrder::BY_CHANNEL,
dorado::DEFAULT_DUPLEX_CACHE_DEPTH},
2, 1000);
auto splitter_node = pipeline_desc.add_node<dorado::DuplexSplitNode>(
{pairing_node}, dorado::DuplexSplitSettings{}, 1);
auto pipeline = dorado::Pipeline::create(std::move(pipeline_desc));
Expand Down
5 changes: 4 additions & 1 deletion tests/PairingNodeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ TEST_CASE("Split read pairing", TEST_GROUP) {
auto sink = pipeline_desc.add_node<MessageSinkToVector>({}, 5, messages);
// one thread, one read - force reads through in order
auto pairing_node = pipeline_desc.add_node<dorado::PairingNode>(
{sink}, dorado::ReadOrder::BY_CHANNEL, 1, 1);
{sink},
dorado::DuplexPairingParameters{dorado::ReadOrder::BY_CHANNEL,
dorado::DEFAULT_DUPLEX_CACHE_DEPTH},
1, 1);
auto pipeline = dorado::Pipeline::create(std::move(pipeline_desc));

for (auto& read : reads) {
Expand Down

0 comments on commit f6bf232

Please sign in to comment.