Skip to content

Commit

Permalink
Refactored Fast5Reader, ReadAln, and Mapper stuff into single ReadBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
skovaka committed Apr 25, 2019
1 parent df6f60f commit 405e0c5
Show file tree
Hide file tree
Showing 14 changed files with 747 additions and 719 deletions.
3 changes: 0 additions & 3 deletions .gitmodules
Expand Up @@ -10,6 +10,3 @@
[submodule "pdqsort"]
path = pdqsort
url = https://github.com/orlp/pdqsort
[submodule "argparse"]
path = argparse
url = https://github.com/hbristow/argparse.git
1 change: 0 additions & 1 deletion argparse
Submodule argparse deleted from 43edb8
1 change: 1 addition & 0 deletions src/chunk.cpp
Expand Up @@ -54,6 +54,7 @@ Chunk::Chunk(const std::string &id, u16 channel, u32 number, u64 start_time,
if (raw_st + raw_len > raw_data.size()) raw_len = raw_data.size() - raw_st;
raw_data_.resize(raw_len);
for (u32 i = 0; i < raw_len; i++) raw_data_[i] = raw_data[raw_st+i];

}

Chunk::Chunk(const Chunk &c)
Expand Down
47 changes: 22 additions & 25 deletions src/chunk_pool.cpp
Expand Up @@ -34,7 +34,7 @@ ChunkPool::ChunkPool(const UncalledOpts &opts) {

//mappers_.reserve(nchannels);
channel_active_.reserve(opts.num_channels_);
read_buffer_.resize(opts.num_channels_);
chunk_buffer_.resize(opts.num_channels_);
buffer_queue_.reserve(opts.num_channels_);
for (u16 i = 0; i < opts.num_channels_; i++) {
mappers_.push_back(Mapper(opts));
Expand All @@ -52,13 +52,13 @@ ChunkPool::ChunkPool(const UncalledOpts &opts) {

void ChunkPool::buffer_chunk(Chunk &c) {
u16 ch = c.get_channel();
if (read_buffer_[ch].empty()) {
if (chunk_buffer_[ch].empty()) {
buffer_queue_.push_back(ch);
} else {
//TODO: handle backlog
read_buffer_[ch].clear();
chunk_buffer_[ch].clear();
}
read_buffer_[ch].swap(c);
chunk_buffer_[ch].swap(c);
}

//Add chunk to master buffer
Expand All @@ -77,7 +77,7 @@ bool ChunkPool::add_chunk(Chunk &c) {
//Previous alignment finished but mapper hasn't reset
//Happens if update hasn't been called yet
if (mappers_[ch].finished()) {
if (mappers_[ch].get_loc().get_number() != c.get_number()){
if (mappers_[ch].get_read().number_ != c.get_number()){
buffer_chunk(c);
}
return true;
Expand All @@ -87,9 +87,8 @@ bool ChunkPool::add_chunk(Chunk &c) {
if (mappers_[ch].get_state() == Mapper::State::INACTIVE) {
mappers_[ch].new_read(c);
active_queue_.push_back(ch);
}

if (mappers_[ch].swap_chunk(c)) {
return true;
} else if (mappers_[ch].swap_chunk(c)) {
return true;
}

Expand All @@ -102,11 +101,12 @@ void ChunkPool::end_read(u16 ch, u32 number) {
mappers_[ch].end_read(number);
}

std::vector<ReadLoc> ChunkPool::update() {
std::vector<MapResult> ChunkPool::update() {

std::vector< u16 > read_counts(threads_.size());
u16 active_count = 0;

std::vector<MapResult> ret;
//Get alignment outputs
//TODO: redo this
for (u16 t = 0; t < threads_.size(); t++) {
Expand All @@ -119,8 +119,9 @@ std::vector<ReadLoc> ChunkPool::update() {

//Loop over alignments
for (auto ch : out_chs_) {
//std::cout << "# popped " << ch << " " << t << "\n";
locs_.push_back(mappers_[ch].pop_loc());
ReadBuffer &r = mappers_[ch].get_read();
ret.emplace_back(r.channel_, r.number_, r.loc_);
mappers_[ch].deactivate();
}
out_chs_.clear();
}
Expand All @@ -134,26 +135,25 @@ std::vector<ReadLoc> ChunkPool::update() {
//std::cout.flush();

for (u16 i = buffer_queue_.size()-1; i < buffer_queue_.size(); i--) {
u16 ch = buffer_queue_[i];
Chunk &c = read_buffer_[ch];
u16 ch = buffer_queue_[i];//TODO: store chunks in queue
Chunk &c = chunk_buffer_[ch];

bool added;

if (mappers_[ch].get_state() == Mapper::State::INACTIVE) {
mappers_[ch].new_read(c);
active_queue_.push_back(ch);
}
added = true;
} else {
added = mappers_[ch].swap_chunk(c);
}

if (mappers_[ch].swap_chunk(read_buffer_[ch])) {
//std::cout << "# empty buffer\n";
if (added) {
if (i != buffer_queue_.size()-1) {
buffer_queue_[i] = buffer_queue_.back();
}
buffer_queue_.pop_back();
} //else {
//std::cout << "# failed to empty buffer "
//<< mappers_[ch].is_resetting() << " "
//<< read_buffer_[ch].raw_data.size() << " "
//<< read_buffer_[ch].id << "\n";
//}
}
}

//Estimate how much to fill each thread
Expand All @@ -179,9 +179,6 @@ std::vector<ReadLoc> ChunkPool::update() {
}
}

std::vector<ReadLoc> ret;
ret.swap(locs_);

return ret;
}

Expand Down
9 changes: 6 additions & 3 deletions src/chunk_pool.hpp
Expand Up @@ -29,14 +29,17 @@
#include <deque>
#include "mapper.hpp"

using MapResult = std::tuple<u16, u32, Paf>;

class ChunkPool {
public:
ChunkPool(const UncalledOpts &opts);

void start_timer();
bool add_chunk(Chunk &chunk);
void end_read(u16 ch, u32 number);

std::vector<ReadLoc> update();
std::vector<MapResult> update();
bool all_finished();
void stop_all();

Expand Down Expand Up @@ -73,12 +76,12 @@ class ChunkPool {
//List of mappers - one for each channel
std::vector<Mapper> mappers_;
std::vector<MapperThread> threads_;
std::vector<Chunk> read_buffer_;
std::vector<ReadLoc> locs_;
std::vector<Chunk> chunk_buffer_;

std::vector<u16> buffer_queue_, active_queue_, out_chs_;
std::vector<bool> channel_active_;

Timer time_;
//Store threads in order of # active mappers
};

Expand Down

0 comments on commit 405e0c5

Please sign in to comment.