Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 73 additions & 31 deletions src/rna/dupradar/counting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -957,32 +957,68 @@ fn process_counting_record(
}
}

/// Knobs controlling how a chromosome batch is read and counted.
///
/// Holds scalars and the alignment file path that don't change between batches
/// of the same run. Each worker receives a shared reference and copies the
/// fields it needs (all are `Copy`), typically by destructuring `*config`
/// into plain locals at the top of the worker function.
///
/// The lifetime `'a` bounds the borrowed string data; the struct itself owns
/// nothing and is cheap to copy.
#[derive(Clone, Copy)]
struct BatchConfig<'a> {
/// Path to the alignment file that each batch worker reads.
bam_path: &'a str,
/// Optional reference passed through to the reader.
/// NOTE(review): presumably a reference FASTA path needed for CRAM input — confirm.
reference: Option<&'a str>,
/// Library strandedness; also handed to the Qualimap accumulator when one
/// is created for a batch.
stranded: Strandedness,
/// NOTE(review): presumably `true` for paired-end data — confirm against caller.
paired: bool,
/// NOTE(review): presumably the number of htslib threads given to each
/// batch's reader — confirm.
htslib_threads: usize,
}

/// Read-only resources shared across every chromosome batch worker:
/// pre-built indices, chromosome name mappings, and optional accumulator
/// configuration for RSeQC and Qualimap.
///
/// Every field is either a shared borrow or a small scalar, so the struct is
/// `Copy`; workers destructure a copy of it rather than taking ownership of
/// anything. The lifetime `'a` ties all borrows to the caller that built the
/// underlying data.
#[derive(Clone, Copy)]
struct BatchResources<'a> {
/// Chromosome names.
/// NOTE(review): presumably indexed by alignment-header target id (tid) — confirm.
tid_to_name: &'a [String],
/// Pre-built per-chromosome annotation index.
/// NOTE(review): presumably keyed by chromosome name — confirm.
index: &'a HashMap<String, ChromIndex>,
/// Number of genes; used to size each batch's `ChromResult`. The caller
/// fills this from the gene interner's length.
num_genes: usize,
/// Chromosome name mapping.
/// NOTE(review): presumably alignment name -> annotation name — confirm direction.
chrom_mapping: &'a HashMap<String, String>,
/// Optional chromosome-name prefix.
/// NOTE(review): presumably added/stripped to reconcile naming schemes — confirm.
chrom_prefix: Option<&'a str>,
/// Biotype id for each gene.
/// NOTE(review): presumably indexed by gene id, values below `num_biotypes` — confirm.
gene_to_biotype: &'a [u16],
/// Number of distinct biotypes; used together with `num_genes` to size
/// each batch's `ChromResult`.
num_biotypes: usize,
/// Optional Qualimap index. When present, each batch result gets its own
/// Qualimap accumulator.
qualimap_index: Option<&'a crate::rna::qualimap::QualimapIndex>,
/// Optional RSeQC configuration; `None` when RSeQC accumulation is not requested.
rseqc_config: Option<&'a RseqcConfig>,
/// Optional annotations consumed by the RSeQC accumulators.
rseqc_annotations: Option<&'a RseqcAnnotations<'a>>,
}

/// Process a batch of chromosomes from an alignment file, counting reads against gene annotations.
///
/// Opens its own indexed reader and seeks to each chromosome in the batch.
/// Returns accumulated results for all chromosomes in the batch, plus optional
/// RSeQC accumulator results collected during the same pass.
#[allow(clippy::too_many_arguments)]
fn process_chromosome_batch(
bam_path: &str,
tids: &[u32],
tid_to_name: &[String],
index: &HashMap<String, ChromIndex>,
num_genes: usize,
stranded: Strandedness,
paired: bool,
chrom_mapping: &HashMap<String, String>,
chrom_prefix: Option<&str>,
config: &BatchConfig,
resources: &BatchResources,
global_read_counter: &AtomicU64,
reference: Option<&str>,
rseqc_config: Option<&RseqcConfig>,
rseqc_annotations: Option<&RseqcAnnotations>,
htslib_threads: usize,
qualimap_index: Option<&crate::rna::qualimap::QualimapIndex>,
gene_to_biotype: &[u16],
num_biotypes: usize,
progress: Option<&ProgressBar>,
) -> Result<(ChromResult, Option<RseqcAccumulators>)> {
let BatchConfig {
bam_path,
reference,
stranded,
paired,
htslib_threads,
} = *config;
let BatchResources {
tid_to_name,
index,
num_genes,
chrom_mapping,
chrom_prefix,
gene_to_biotype,
num_biotypes,
qualimap_index,
rseqc_config,
rseqc_annotations,
} = *resources;
let mut result = ChromResult::new(num_genes, num_biotypes);
if qualimap_index.is_some() {
result.qualimap = Some(crate::rna::qualimap::QualimapAccum::new(stranded));
Expand Down Expand Up @@ -1280,28 +1316,34 @@ pub fn count_reads(
.unwrap_or(0);

// Process chromosome batches in parallel
let batch_config = BatchConfig {
bam_path,
reference,
stranded,
paired,
htslib_threads,
};
let batch_resources = BatchResources {
tid_to_name: &tid_to_name,
index: &index,
num_genes: interner.len(),
chrom_mapping,
chrom_prefix,
gene_to_biotype: &gene_to_biotype,
num_biotypes,
qualimap_index,
rseqc_config,
rseqc_annotations,
};
let results: Vec<Result<(ChromResult, Option<RseqcAccumulators>)>> = pool.install(|| {
batches
.par_iter()
.map(|batch| {
process_chromosome_batch(
bam_path,
batch,
&tid_to_name,
&index,
interner.len(),
stranded,
paired,
chrom_mapping,
chrom_prefix,
&batch_config,
&batch_resources,
&global_read_counter,
reference,
rseqc_config,
rseqc_annotations,
htslib_threads,
qualimap_index,
&gene_to_biotype,
num_biotypes,
progress,
)
})
Expand Down
Loading