Skip to content
9 changes: 8 additions & 1 deletion src/analysis/bsrate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ main_bsrate(int argc, const char **argv) {
bool VERBOSE = false;
bool INCLUDE_CPGS = false;
bool reads_are_a_rich = false;
size_t n_threads = 1;

string chroms_file;
string outfile;
Expand All @@ -249,6 +250,7 @@ main_bsrate(int argc, const char **argv) {
seq_to_use);
opt_parse.add_opt("a-rich", 'A', "reads are A-rich", false,
reads_are_a_rich);
opt_parse.add_opt("threads", 't', "number of threads", false, n_threads);
opt_parse.add_opt("verbose", 'v', "print more run info", false, VERBOSE);
vector<string> leftover_args;
opt_parse.parse(argc, argv, leftover_args);
Expand Down Expand Up @@ -282,11 +284,16 @@ main_bsrate(int argc, const char **argv) {
if (VERBOSE)
cerr << "[n chroms in reference: " << chroms.size() << "]" << endl;

bam_tpool tp(n_threads);

bam_infile hts(bam_file);
if (!hts) throw dnmt_error("failed to open input file: " + bam_file);
bam_header hdr(hts);
if (!hdr) throw dnmt_error("failed to read header");

if (n_threads > 1)
tp.set_io(hts);

// map the bam header index for each "target" to a sequence in the
// reference genome
unordered_map<int32_t, size_t> chrom_lookup;
Expand All @@ -312,7 +319,7 @@ main_bsrate(int argc, const char **argv) {
bam_rec aln;
unordered_set<int32_t> chroms_seen;

while (hts.read(hdr, aln)) { // sam_reader >> aln) {
while (hts.read(hdr, aln)) {

if (reads_are_a_rich) flip_conversion(aln);

Expand Down
92 changes: 31 additions & 61 deletions src/analysis/methcounts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
// #include "MSite.hpp"
#include "bsutils.hpp"
#include "dnmt_error.hpp"
#include "bam_record_utils.hpp"

/* HTSlib */
#include <htslib/sam.h>
#include <htslib/bgzf.h>
#include <htslib/thread_pool.h>


using std::string;
using std::vector;
using std::cerr;
Expand Down Expand Up @@ -239,7 +241,7 @@ tag_with_mut(const uint32_t tag, const bool mut) {


static void
write_output(sam_hdr_t *hdr, BGZF *out,
write_output(const bam_header &hdr, bam_bgzf &out,
const int32_t tid, const string &chrom,
const vector<CountSet> &counts, bool CPG_ONLY) {

Expand Down Expand Up @@ -268,10 +270,8 @@ write_output(sam_hdr_t *hdr, BGZF *out,
<< (n_reads > 0 ? unconverted/n_reads : 0.0) << '\t'
<< n_reads << '\n';
const size_t expected_size = buf.tellp();
const ssize_t err_code = bgzf_write(out, buf.c_str(), expected_size);
if (err_code < 0 ||
static_cast<size_t>(err_code) != expected_size)
throw dnmt_error(err_code, "error writing output");
if (out.write(buf.c_str(), expected_size))
throw dnmt_error("error writing output");
}
}
}
Expand All @@ -284,38 +284,25 @@ write_output(sam_hdr_t *hdr, BGZF *out,
// #define get_qlen(b) ((b)->core.l_qseq)


static inline int32_t
get_tid(const bam1_t *b) {return b->core.tid;}


static inline hts_pos_t
get_rlen(const bam1_t *b) {
return bam_cigar2rlen(b->core.n_cigar, bam_get_cigar(b));
}


static inline hts_pos_t
get_pos(const bam1_t *b) {return b->core.pos;}


static inline int32_t
get_qlen(const bam1_t *b) {return b->core.l_qseq;}


static void
count_states_pos(const bam1_t *aln, vector<CountSet> &counts) {
count_states_pos(const bam_rec &aln, vector<CountSet> &counts) {
/* Move through cigar, reference and read positions without
inflating cigar or read sequence */
const auto seq = bam_get_seq(aln);
const auto beg_cig = bam_get_cigar(aln);
const auto end_cig = beg_cig + aln->core.n_cigar;
size_t rpos = get_pos(aln);
int32_t qpos = 0; // to match type with b->core.l_qseq
const auto end_cig = beg_cig + get_n_cigar(aln);
auto rpos = get_pos(aln);
auto qpos = 0; // to match type with b->core.l_qseq
for (auto c_itr = beg_cig; c_itr != end_cig; ++c_itr) {
const char op = bam_cigar_op(*c_itr);
const uint32_t n = bam_cigar_oplen(*c_itr);
if (eats_ref(op) && eats_query(op)) {
const int32_t end_qpos = qpos + n;
const decltype(qpos) end_qpos = qpos + n;
for (; qpos < end_qpos; ++qpos) {
// ADS: beware!!! bam_seqi is a macro, so no "qpos++" inside
// its arguments! Why macros?!?! Just make sure the compiler
Expand All @@ -332,26 +319,26 @@ count_states_pos(const bam1_t *aln, vector<CountSet> &counts) {
}
// ADS: somehow previous code included a correction for rpos going
// past the end of the chromosome; this should result at least in a
// soft-clip by any mapper. I'm not allowing it here now as I don't
// see how it can be legit.
assert(qpos == get_qlen(aln) && rpos <= counts.size());
// soft-clip by any mapper. I'm not checking it here as even if it
// happens I don't want to terminate.
assert(qpos == get_l_qseq(aln));
}


static void
count_states_neg(const bam1_t *aln, vector<CountSet> &counts) {
count_states_neg(const bam_rec &aln, vector<CountSet> &counts) {
/* Move through cigar, reference and (*backward*) through read
positions without inflating cigar or read sequence */
const auto seq = bam_get_seq(aln);
const auto beg_cig = bam_get_cigar(aln);
const auto end_cig = beg_cig + aln->core.n_cigar;
const auto end_cig = beg_cig + get_n_cigar(aln);
size_t rpos = get_pos(aln);
int32_t qpos = get_qlen(aln); // to match type with b->core.l_qseq
size_t qpos = get_l_qseq(aln); // to match type with b->core.l_qseq
for (auto c_itr = beg_cig; c_itr != end_cig; ++c_itr) {
const char op = bam_cigar_op(*c_itr);
const uint32_t n = bam_cigar_oplen(*c_itr);
if (eats_ref(op) && eats_query(op)) {
const int32_t end_qpos = qpos - n; // to match type with qpos
const size_t end_qpos = qpos - n; // to match type with qpos
for (; qpos > end_qpos; --qpos) // beware ++ in macro below!!!
counts[rpos++].add_count_neg(seq_nt16_str[bam_seqi(seq, qpos-1)]);
}
Expand All @@ -363,9 +350,8 @@ count_states_neg(const bam1_t *aln, vector<CountSet> &counts) {
}
}
/* qpos is unsigned; would wrap around if < 0 */
// ADS: Same as count_states_pos; see comment there; not allowing
// rpos to go past the end of the chromosome.
assert(qpos <= get_qlen(aln) && rpos <= counts.size());
// ADS: Same as count_states_pos; see comment there
assert(qpos == 0);
}


Expand All @@ -375,8 +361,6 @@ process_reads(const bool VERBOSE,
const string &infile, const string &outfile,
const string &chroms_file, const bool CPG_ONLY) {

int err_code = 0;

vector<string> chroms, names;
read_fasta_file_short_names(chroms_file, names, chroms);
for (auto &&i: chroms)
Expand All @@ -392,55 +376,50 @@ process_reads(const bool VERBOSE,
chrom_sizes[i] = chroms[i].size();
}

bam_tpool tp(n_threads); // Must be destroyed after hts


// open the hts SAM/BAM input file and get the header
samFile *hts = hts_open(infile.c_str(), "r");
bam_infile hts(infile);
if (!hts) throw dnmt_error("failed to open input file");
// load the input file's header
sam_hdr_t *hdr = sam_hdr_read(hts);
bam_header hdr(hts);
if (!hdr) throw dnmt_error("failed to read header");

unordered_map<int32_t, size_t> tid_to_idx;
for (int32_t i = 0; i < hdr->n_targets; ++i) {
for (int32_t i = 0; i < hdr.h->n_targets; ++i) {
// "curr_name" gives a "tid_to_name" mapping allowing to jump
// through "name_to_idx" and get "tid_to_idx"
const string curr_name(hdr->target_name[i]);
const string curr_name(hdr.h->target_name[i]);
const auto name_itr(name_to_idx.find(curr_name));
if (name_itr == end(name_to_idx))
throw dnmt_error("failed to find chrom: " + curr_name);
tid_to_idx[i] = name_itr->second;
}
//// ADS: really should cross-check the chromosome sizes
// copy(begin(chrom_sizes), end(chrom_sizes),
// std::ostream_iterator<size_t>(cout, "\n"));

// open the output file
const string output_mode = compress_output ? "w" : "wu";
BGZF *out = bgzf_open(outfile.c_str(), output_mode.c_str());
bam_bgzf out(outfile, output_mode);
if (!out) throw dnmt_error("error opening output file: " + outfile);

/* set the threads for the input file decompression */
htsThreadPool tp;
if (n_threads > 1) {
tp = {hts_tpool_init(n_threads - 1), 0};
err_code = hts_set_thread_pool(hts, &tp);
if (err_code < 0) throw dnmt_error(err_code, "error setting threads");

/* set threads for the output file compression */
err_code = bgzf_thread_pool(out, tp.pool, tp.qsize);
if (err_code) throw dnmt_error(err_code, "error setting bgzf threads");
tp.set_io(hts);
tp.set_bgzf(out);
}

/* now iterate over the reads, switching chromosomes and writing
output as needed */
bam1_t *aln = bam_init1();
bam_rec aln;
int32_t prev_tid = -1;

// this is where all the counts are accumulated
vector<CountSet> counts;

unordered_set<int32_t> chroms_seen;
vector<string>::const_iterator chrom_itr;
while ((err_code = sam_read1(hts, hdr, aln)) >= 0) {
while (hts.read(hdr, aln)) {

// if chrom changes, output results, get the next one
const int32_t tid = get_tid(aln);
Expand Down Expand Up @@ -481,15 +460,6 @@ process_reads(const bool VERBOSE,
if (!counts.empty())
write_output(hdr, out, prev_tid, *chrom_itr, counts, CPG_ONLY);

// turn off the lights
bam_destroy1(aln);
bam_hdr_destroy(hdr);
err_code = hts_close(hts);
if (err_code < 0) throw dnmt_error(err_code, "process_reads:hts_close");
err_code = bgzf_close(out);
if (err_code) throw dnmt_error(err_code, "process_reads:bgzf_close");
// do this after the files have been closed
if (n_threads > 1) hts_tpool_destroy(tp.pool);
}


Expand Down
24 changes: 22 additions & 2 deletions src/common/bam_record.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
#include <stdexcept>
#include <string>

#include <htslib/sam.h> // from HTSlib
#include <htslib/thread_pool.h> // from HTSlib
// from HTSlib
#include <htslib/sam.h>
#include <htslib/bgzf.h>
#include <htslib/thread_pool.h>

struct bam_rec {
bam_rec() : b{bam_init1()} {}
Expand All @@ -58,6 +60,7 @@ struct bam_infile {
samFile *f{};
};


struct bam_header {
bam_header() = default;
bam_header(const bam_header &rhs) : h{bam_hdr_dup(rhs.h)} {}
Expand Down Expand Up @@ -85,13 +88,30 @@ struct bam_outfile {
htsFile *f{};
};

struct bam_bgzf {
bam_bgzf(const std::string &fn, const std::string &mode) :
f{bgzf_open(fn.c_str(), mode.c_str())} {}
~bam_bgzf() { if (f != nullptr) bgzf_close(f); }
operator bool() const { return f != nullptr; }
bool write(const char* const str, const size_t expected_size) {
ssize_t res = bgzf_write(f, str, expected_size) >= 0;
return (res >= 0 &&
static_cast<size_t>(res) == expected_size);
}
BGZF *f{};
};

struct bam_tpool{
bam_tpool(const size_t n_threads) : tpool{hts_tpool_init(n_threads), 0} {}
~bam_tpool() { hts_tpool_destroy(tpool.pool); }
template<class T> void set_io(const T &bam_file) {
const int ret = hts_set_thread_pool(bam_file.f, &tpool);
if (ret < 0) throw std::runtime_error("failed to set thread pool");
}
void set_bgzf(const bam_bgzf &bgzf) {
const int ret = bgzf_thread_pool(bgzf.f, tpool.pool, tpool.qsize);
if (ret < 0) throw std::runtime_error("failed to set thread pool");
}
htsThreadPool tpool{};
};

Expand Down
8 changes: 8 additions & 0 deletions src/common/bam_record_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,12 @@ bam_aux_update_int(bam_rec &b, const char tag[2], T val) {
return bam_aux_update_int(b.b, tag, val);
}

inline std::string
sam_hdr_tid2name(const bam_header &hdr, const int32_t tid) {
return std::string(sam_hdr_tid2name(hdr.h, tid));
}




#endif
Loading