Skip to content

Commit

Permalink
Use BufferedReadWriter in BAMRealigner
Browse files Browse the repository at this point in the history
Helps ensure sorted realigned BAMs
  • Loading branch information
Daniel Cooke committed Jan 26, 2019
1 parent 97f2d41 commit 785bfa2
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 31 deletions.
2 changes: 2 additions & 0 deletions src/core/octopus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1553,6 +1553,7 @@ void run_bam_realign(GenomeCallingComponents& components)
components.read_manager().close();
BAMRealigner::Config config {};
config.copy_hom_ref_reads = components.write_full_bamouts();
config.max_buffer = components.read_buffer_footprint();
if (components.read_manager().paths().size() == 1) {
realign(components.read_manager().paths().front(), get_bam_realignment_vcf(components),
*components.bamout(), components.reference(), config);
Expand Down Expand Up @@ -1590,6 +1591,7 @@ void run_bam_realign(GenomeCallingComponents& components)
components.read_manager().close();
BAMRealigner::Config config {};
config.copy_hom_ref_reads = components.write_full_bamouts();
config.max_buffer = components.read_buffer_footprint();
if (components.read_manager().paths().size() == 1) {
auto out_paths = get_haplotype_bam_paths(*components.split_bamout(), get_max_ploidy(components));
realign(components.read_manager().paths().front(), get_bam_realignment_vcf(components),
Expand Down
114 changes: 86 additions & 28 deletions src/core/tools/bam_realigner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "basics/cigar_string.hpp"
#include "io/variant/vcf_record.hpp"
#include "io/variant/vcf_header.hpp"
#include "io/read/buffered_read_writer.hpp"
#include "utils/genotype_reader.hpp"
#include "utils/append.hpp"
#include "utils/read_stats.hpp"
Expand Down Expand Up @@ -114,14 +115,19 @@ auto move_merge(Container&& src, std::vector<AlignedRead>& dst)

} // namespace

BAMRealigner::Report BAMRealigner::realign(ReadReader& src, VcfReader& variants, ReadWriter& dst,
const ReferenceGenome& reference, SampleList samples) const
BAMRealigner::Report
BAMRealigner::realign(ReadReader& src, VcfReader& variants, ReadWriter& dst,
const ReferenceGenome& reference, SampleList samples) const
{
io::BufferedReadWriter::Config writer_config {};
writer_config.max_buffer_footprint = config_.max_buffer;
io::BufferedReadWriter writer {dst, writer_config};
Report report {};
BatchList batch {};
boost::optional<GenomicRegion> batch_region {};
std::vector<AlignedRead> buffer {};
for (auto p = variants.iterate(); p.first != p.second; ) {
auto batch = read_next_batch(p.first, p.second, src, reference, samples, batch_region);
std::tie(batch, batch_region) = read_next_batch(p.first, p.second, src, reference, samples, batch_region);
for (auto& sample : batch) {
std::vector<AlignedRead> genotype_reads {}, realigned_reads {};
auto sample_reads_itr = std::begin(sample.reads);
Expand All @@ -138,7 +144,7 @@ BAMRealigner::Report BAMRealigner::realign(ReadReader& src, VcfReader& variants,
move_merge(realignments, realigned_reads);
}
move_merge(realigned_reads, sample.reads);
dst << sample.reads;
writer << sample.reads;
}
batch_region = encompassing_region(batch.front().genotypes);
}
Expand Down Expand Up @@ -188,18 +194,38 @@ auto split_and_realign(const std::vector<AlignedRead>& reads, const Genotype<Hap
return result;
}

void move_merge(std::vector<std::vector<AlignedRead>>& src, std::vector<std::vector<AlignedRead>>& dst)
{
if (dst.empty()) {
dst = std::move(src);
} else {
if (src.size() > dst.size()) dst.resize(src.size());
for (std::size_t i {0}; i < src.size(); ++i) {
move_merge(std::move(src[i]), dst[i]);
}
}
}

} // namespace

BAMRealigner::Report BAMRealigner::realign(ReadReader& src, VcfReader& variants, std::vector<ReadWriter>& dsts,
const ReferenceGenome& reference, SampleList samples) const
BAMRealigner::Report
BAMRealigner::realign(ReadReader& src, VcfReader& variants, std::vector<ReadWriter>& dsts,
const ReferenceGenome& reference, SampleList samples) const
{
if (dsts.size() == 1) return realign(src, variants, dsts.front(), reference, samples);
io::BufferedReadWriter::Config writer_config {};
writer_config.max_buffer_footprint = config_.max_buffer.bytes() / dsts.size();
std::vector<io::BufferedReadWriter> writers {};
writers.reserve(dsts.size());
for (auto& dst : dsts) writers.emplace_back(dst, writer_config);
Report report {};
BatchList batch {};
boost::optional<GenomicRegion> batch_region {};
for (auto p = variants.iterate(); p.first != p.second; ) {
auto batch = read_next_batch(p.first, p.second, src, reference, samples, batch_region);
std::tie(batch, batch_region) = read_next_batch(p.first, p.second, src, reference, samples, batch_region);
for (auto& sample : batch) {
std::vector<AlignedRead> genotype_reads {}, unassigned_realigned_reads {};
std::vector<std::vector<AlignedRead>> assigned_realigned_reads {};
auto sample_reads_itr = std::begin(sample.reads);
for (const auto& genotype : sample.genotypes) {
const auto overlapped_reads = bases(overlap_range(sample_reads_itr, std::end(sample.reads), genotype));
Expand All @@ -210,16 +236,16 @@ BAMRealigner::Report BAMRealigner::realign(ReadReader& src, VcfReader& variants,
auto realignments = split_and_realign(genotype_reads, genotype, report);
report.n_reads_unassigned += bad_reads.size();
move_merge(bad_reads, realignments.back());
assert(realignments.size() <= dsts.size());
for (unsigned i {0}; i < realignments.size() - 1; ++i) {
dsts[i] << realignments[i];
}
move_merge(realignments.back(), unassigned_realigned_reads); // end is always unassigned, but ploidy can change
realignments.pop_back();
move_merge(realignments, assigned_realigned_reads);
}
move_merge(unassigned_realigned_reads, sample.reads);
dsts.back() << sample.reads;
for (unsigned i {0}; i < assigned_realigned_reads.size(); ++i) {
writers[i] << assigned_realigned_reads[i];
}
writers.back() << sample.reads;
}
batch_region = encompassing_region(batch.front().genotypes);
}
return report;
}
Expand Down Expand Up @@ -302,6 +328,22 @@ BAMRealigner::read_next_block(VcfIterator& first, const VcfIterator& last, const

namespace {

void sort(io::ReadReader::SampleReadMap& reads)
{
for (auto& p : reads) std::sort(std::begin(p.second), std::end(p.second));
}

void filter_primary(std::vector<AlignedRead>& reads)
{
auto itr = std::remove_if(std::begin(reads), std::end(reads), [&] (const auto& read) { return !is_primary_alignment(read); });
reads.erase(itr, std::end(reads));
}

void filter_primary(io::ReadReader::SampleReadMap& reads)
{
for (auto& p : reads) filter_primary(p.second);
}

void erase_overlapped(std::vector<AlignedRead>& reads, const GenomicRegion& region)
{
auto itr = std::remove_if(std::begin(reads), std::end(reads), [&] (const auto& read) { return overlaps(read, region); });
Expand All @@ -310,37 +352,53 @@ void erase_overlapped(std::vector<AlignedRead>& reads, const GenomicRegion& regi

} // namespace

BAMRealigner::BatchList
BAMRealigner::BatchListRegionPair
BAMRealigner::read_next_batch(VcfIterator& first, const VcfIterator& last, ReadReader& src,
const ReferenceGenome& reference, const SampleList& samples,
const boost::optional<GenomicRegion>& prev_batch_region) const
{
const auto records = read_next_block(first, last, samples);
BatchList result {};
BatchList batches {};
boost::optional<GenomicRegion> batch_region {};
if (!records.empty()) {
auto genotypes = extract_genotypes(records, samples, reference);
result.reserve(samples.size());
auto batch_region = encompassing_region(records);
batches.reserve(samples.size());
batch_region = encompassing_region(records);
if (config_.copy_hom_ref_reads) {
if (prev_batch_region && is_same_contig(batch_region, *prev_batch_region)) {
batch_region = expand_lhs(batch_region, intervening_region_size(*prev_batch_region, batch_region));
if (prev_batch_region && is_same_contig(*batch_region, *prev_batch_region)) {
batch_region = right_overhang_region(*batch_region, *prev_batch_region);
if (first != last && is_same_contig(*batch_region, *first)) {
batch_region = expand_rhs(*batch_region, intervening_region_size(*batch_region, *first) / 2);
} else {
batch_region = closed_region(*batch_region, reference.contig_region(prev_batch_region->contig_name()));
}
} else {
batch_region = expand_lhs(batch_region, batch_region.begin());
if (first != last && is_same_contig(*batch_region, *first)) {
batch_region = expand(*batch_region, batch_region->begin(), intervening_region_size(*batch_region, *first) / 2);
} else {
batch_region = expand_lhs(*batch_region, batch_region->begin());
}
}
}
auto reads = src.fetch_reads(samples, expand(batch_region, 1));
auto reads = src.fetch_reads(samples, expand(*batch_region, 1)); // Pad for soft clipped reads
sort(reads);
if (config_.primary_only) {
filter_primary(reads);
}
boost::optional<GenomicRegion> reads_region {};
if (has_coverage(reads)) reads_region = encompassing_region(reads);
for (const auto& sample : samples) {
auto& sample_genotypes = genotypes[sample];
if (prev_batch_region) {
erase_overlapped(reads[sample], *prev_batch_region);
erase_overlapped(reads[sample], expand_rhs(*prev_batch_region, 1));
}
result.push_back({std::move(sample_genotypes), std::move(reads[sample])});
batches.push_back({std::move(sample_genotypes), std::move(reads[sample])});
}
if (first != last && reads_region && overlaps(*first, *reads_region)) {
auto next_batch = read_next_batch(first, last, src, reference, samples, batch_region);
merge(next_batch, result);
auto p = read_next_batch(first, last, src, reference, samples, batch_region);
merge(p.first, batches);
assert(p.second);
batch_region = closed_region(*batch_region, *p.second);
}
} else if (prev_batch_region && config_.copy_hom_ref_reads) {
const auto contig_region = reference.contig_region(prev_batch_region->contig_name());
Expand All @@ -349,11 +407,11 @@ BAMRealigner::read_next_batch(VcfIterator& first, const VcfIterator& last, ReadR
auto reads = src.fetch_reads(samples, reads_region);
for (const auto& sample : samples) {
erase_overlapped(reads[sample], *prev_batch_region);
result.push_back({{}, std::move(reads[sample])});
batches.push_back({{}, std::move(reads[sample])});
}
}
}
return result;
return {std::move(batches), std::move(batch_region)};
}

void BAMRealigner::merge(BatchList& src, BatchList& dst) const
Expand All @@ -362,7 +420,7 @@ void BAMRealigner::merge(BatchList& src, BatchList& dst) const
for (std::size_t i {0}; i < src.size(); ++i) {
dst[i].genotypes.insert(std::make_move_iterator(std::begin(src[i].genotypes)),
std::make_move_iterator(std::end(src[i].genotypes)));
utils::append(std::move(src[i].reads), dst[i].reads);
move_merge(std::move(src[i].reads), dst[i].reads);
}
}

Expand Down
10 changes: 7 additions & 3 deletions src/core/tools/bam_realigner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "io/read/read_reader.hpp"
#include "io/read/read_writer.hpp"
#include "io/variant/vcf_reader.hpp"
#include "utils/memory_footprint.hpp"
#include "utils/thread_pool.hpp"

namespace octopus {
Expand All @@ -31,8 +32,10 @@ class BAMRealigner

struct Config
{
bool primary_only = true;
bool copy_hom_ref_reads = false;
bool simplify_cigars = false;
MemoryFootprint max_buffer = *parse_footprint("50M");
boost::optional<unsigned> max_threads = 1;
};

Expand Down Expand Up @@ -71,14 +74,15 @@ class BAMRealigner
std::vector<AlignedRead> reads;
};
using BatchList = std::vector<Batch>;
using BatchListRegionPair = std::pair<BatchList, boost::optional<GenomicRegion>>;

Config config_;
mutable ThreadPool workers_;

CallBlock read_next_block(VcfIterator& first, const VcfIterator& last, const SampleList& samples) const;
BatchList read_next_batch(VcfIterator& first, const VcfIterator& last, ReadReader& src,
const ReferenceGenome& reference, const SampleList& samples,
const boost::optional<GenomicRegion>& prev_batch_region) const;
BatchListRegionPair read_next_batch(VcfIterator& first, const VcfIterator& last, ReadReader& src,
const ReferenceGenome& reference, const SampleList& samples,
const boost::optional<GenomicRegion>& prev_batch_region) const;
void merge(BatchList& src, BatchList& dst) const;
};

Expand Down

0 comments on commit 785bfa2

Please sign in to comment.