Merge branch 'smalton/DOR-558-basecall-cuda-stream' into 'master'
DOR-558: Koi update

Closes DOR-558

See merge request machine-learning/dorado!834
malton-ont committed Feb 8, 2024
2 parents 8d5007d + 3b2acbf commit b5dc9f8
Showing 8 changed files with 40 additions and 42 deletions.
cmake/Koi.cmake — 2 changes: 1 addition & 1 deletion
@@ -20,7 +20,7 @@ endfunction()

 if(CMAKE_SYSTEM_NAME STREQUAL "Linux" OR WIN32)
 
-    set(KOI_VERSION 0.4.3)
+    set(KOI_VERSION 0.4.5)
     if(BUILD_KOI_FROM_SOURCE)
         message(STATUS "Building Koi from source")
         set(KOI_DIR "${DORADO_3RD_PARTY_DOWNLOAD}/koi")
dorado/basecall/CudaCaller.cpp — 18 changes: 8 additions & 10 deletions
@@ -42,7 +42,8 @@ CudaCaller::CudaCaller(const CRFModelConfig &model_config,
         m_device(device),
         m_decoder(decode::create_decoder(device, m_config)),
         m_options(at::TensorOptions().dtype(m_decoder->dtype()).device(device)),
-        m_exclusive_gpu_access(exclusive_gpu_access) {
+        m_exclusive_gpu_access(exclusive_gpu_access),
+        m_stream(c10::cuda::getStreamFromPool(false, m_options.device().index())) {
     assert(m_options.device().is_cuda());
 
     m_decoder_options.q_shift = model_config.qbias;
@@ -83,10 +84,11 @@ CudaCaller::CudaCaller(const CRFModelConfig &model_config,
     spdlog::info("{} using batch size {}", m_device, m_batch_size);
 
     // Warmup
+    c10::cuda::CUDAStreamGuard stream_guard(m_stream);
     auto input = torch::empty({m_batch_size, m_num_input_features, m_in_chunk_size}, m_options);
     auto scores = m_module->forward(input);
     m_decoder->beam_search_part_1({scores, m_batch_size, m_decoder_options});
-    torch::cuda::synchronize(m_options.device().index());
+    m_stream.synchronize();
 
     start_threads();
 }
@@ -101,11 +103,8 @@ CudaCaller::~CudaCaller() {

 std::vector<decode::DecodedChunk> CudaCaller::call_chunks(at::Tensor &input,
                                                           at::Tensor &output,
-                                                          int num_chunks,
-                                                          c10::cuda::CUDAStream stream) {
+                                                          int num_chunks) {
     NVTX3_FUNC_RANGE();
-    c10::cuda::CUDAStreamGuard stream_guard(stream);
-
     if (num_chunks == 0) {
         return std::vector<decode::DecodedChunk>();
     }
@@ -291,14 +290,13 @@ void CudaCaller::start_threads() {

 void CudaCaller::cuda_thread_fn() {
     at::InferenceMode guard;
-    c10::cuda::CUDAGuard device_guard(m_options.device());
-    auto stream = c10::cuda::getCurrentCUDAStream(m_options.device().index());
 
     const std::string loop_scope_str =
             "cuda_thread_fn_device_" + std::to_string(m_options.device().index());
     const std::string input_q_cv_scope_str =
             "input_queue_cv_device_" + std::to_string(m_options.device().index());
     const std::string gpu_lock_scope_str = "gpu_lock_" + std::to_string(m_options.device().index());
 
+    c10::cuda::CUDAStreamGuard stream_guard(m_stream);
     while (true) {
         nvtx3::scoped_range loop{loop_scope_str};
         std::unique_lock<std::mutex> input_lock(m_input_lock);
@@ -355,7 +353,7 @@ void CudaCaller::cuda_thread_fn() {
         const auto forward_ms = timer.GetElapsedMS();
         task->out =
                 m_decoder->beam_search_part_1({scores, task->num_chunks, m_decoder_options});
-        stream.synchronize();
+        m_stream.synchronize();
         const auto forward_plus_decode_ms = timer.GetElapsedMS();
         m_model_ms += forward_ms;
         m_decode_ms += forward_plus_decode_ms - forward_ms;
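The core of the change in this file: each CudaCaller now owns a stream borrowed from the per-device pool for its whole lifetime, instead of picking up whatever stream is current when work arrives. A minimal sketch of the pattern, with hypothetical names (Caller, worker_loop) standing in for CudaCaller and cuda_thread_fn:

    #include <c10/cuda/CUDAGuard.h>
    #include <c10/cuda/CUDAStream.h>

    class Caller {
    public:
        explicit Caller(c10::DeviceIndex device_index)
                // Borrow a stream from the per-device pool; pool streams live
                // for the life of the process, so holding one as a member is safe.
                : m_stream(c10::cuda::getStreamFromPool(/*isHighPriority=*/false, device_index)) {}

        void worker_loop() {
            // Makes m_stream (and its device) current for everything launched
            // on this thread; restores the previous stream/device on destruction.
            c10::cuda::CUDAStreamGuard stream_guard(m_stream);
            // ... enqueue forward and decode kernels; all land on m_stream ...
            m_stream.synchronize();  // waits only for this caller's stream
        }

    private:
        c10::cuda::CUDAStream m_stream;
    };

Swapping torch::cuda::synchronize(device) for m_stream.synchronize() narrows the wait from every stream on the device to just this caller's work, which should let several callers share a GPU without serializing on each other.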
dorado/basecall/CudaCaller.h — 4 changes: 2 additions & 2 deletions
@@ -31,8 +31,7 @@ class CudaCaller {
     ~CudaCaller();
     std::vector<decode::DecodedChunk> call_chunks(at::Tensor &input,
                                                   at::Tensor &output,
-                                                  int num_chunks,
-                                                  c10::cuda::CUDAStream stream);
+                                                  int num_chunks);
 
     void terminate();
     void restart();
@@ -73,6 +72,7 @@ class CudaCaller {
     std::unique_ptr<std::thread> m_cuda_thread;
     int m_num_input_features, m_batch_size, m_in_chunk_size, m_out_chunk_size;
     bool m_exclusive_gpu_access;
+    c10::cuda::CUDAStream m_stream;
 
     // Performance monitoring stats.
     std::atomic<int64_t> m_num_batches_called = 0;
dorado/basecall/CudaModelRunner.cpp — 5 changes: 3 additions & 2 deletions
@@ -5,7 +5,7 @@
#include "utils/cuda_utils.h"
#include "utils/math_utils.h"

#include <c10/cuda/CUDAStream.h>
#include <c10/cuda/CUDAGuard.h>

#include <sstream>

@@ -24,7 +24,8 @@ void CudaModelRunner::accept_chunk(int chunk_idx, const at::Tensor &chunk) {
 std::vector<decode::DecodedChunk> CudaModelRunner::call_chunks(int num_chunks) {
     ++m_num_batches_called;
     stats::Timer timer;
-    auto decoded_chunks = m_caller->call_chunks(m_input, m_output, num_chunks, m_stream);
+    c10::cuda::CUDAStreamGuard guard(m_stream);
+    auto decoded_chunks = m_caller->call_chunks(m_input, m_output, num_chunks);
     return decoded_chunks;
 }

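The stream also disappears from the call_chunks signature: it becomes ambient state installed by a guard at the call site rather than a parameter threaded through. A sketch of the call-site change, using the names from the diff with surrounding code elided:

    // Before: the stream was passed through explicitly.
    // auto decoded_chunks = m_caller->call_chunks(m_input, m_output, num_chunks, m_stream);

    // After: a scoped guard makes m_stream current for the duration of the call,
    // so GPU work issued on this thread inside call_chunks is ordered on it.
    c10::cuda::CUDAStreamGuard guard(m_stream);
    auto decoded_chunks = m_caller->call_chunks(m_input, m_output, num_chunks);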
dorado/basecall/decode/CUDADecoder.cpp — 17 changes: 9 additions & 8 deletions
@@ -29,8 +29,10 @@ DecodeData CUDADecoder::beam_search_part_1(DecodeData data) const {
             at::TensorOptions().dtype(at::kChar).device(scores.device()).requires_grad(false);
 
     auto chunks = at::empty({N, 4}, tensor_options_int32);
-    chunks.index({at::indexing::Slice(), 0}) = at::arange(0, int(T * N), int(T));
-    chunks.index({at::indexing::Slice(), 2}) = at::arange(0, int(T * N), int(T));
+    chunks.index({at::indexing::Slice(), 0}) =
+            at::arange(0, int(T * N), int(T), tensor_options_int32);
+    chunks.index({at::indexing::Slice(), 2}) =
+            at::arange(0, int(T * N), int(T), tensor_options_int32);
     chunks.index({at::indexing::Slice(), 1}) = int(T);
     chunks.index({at::indexing::Slice(), 3}) = 0;

@@ -42,40 +44,39 @@ DecodeData CUDADecoder::beam_search_part_1(DecodeData data) const {
     auto path = at::zeros(N * (T + 1), tensor_options_int32);
 
     auto moves_sequence_qstring = at::zeros({3, N * T}, tensor_options_int8);
-
     moves_sequence_qstring.index({at::indexing::Slice()}) = 0.0;
     auto moves = moves_sequence_qstring[0];
     auto sequence = moves_sequence_qstring[1];
     auto qstring = moves_sequence_qstring[2];
 
+    auto stream = at::cuda::getCurrentCUDAStream().stream();
     {
         utils::ScopedProfileRange spr{"back_guides", 2};
         dorado::utils::handle_cuda_result(host_back_guide_step(
-                chunks.data_ptr(), chunk_results.data_ptr(), N, scores.data_ptr(),
+                stream, chunks.data_ptr(), chunk_results.data_ptr(), N, scores.data_ptr(),
                 m_score_clamp_val, C, aux.data_ptr(), path.data_ptr(), moves.data_ptr(), NULL,
                 sequence.data_ptr(), qstring.data_ptr(), options.q_scale, options.q_shift,
                 int(options.beam_width), options.beam_cut, options.blank_score));
     }
     {
         utils::ScopedProfileRange spr{"beam_search", 2};
         dorado::utils::handle_cuda_result(host_beam_search_step(
-                chunks.data_ptr(), chunk_results.data_ptr(), N, scores.data_ptr(),
+                stream, chunks.data_ptr(), chunk_results.data_ptr(), N, scores.data_ptr(),
                 m_score_clamp_val, C, aux.data_ptr(), path.data_ptr(), moves.data_ptr(), NULL,
                 sequence.data_ptr(), qstring.data_ptr(), options.q_scale, options.q_shift,
                 int(options.beam_width), options.beam_cut, options.blank_score));
     }
     {
         utils::ScopedProfileRange spr{"compute_posts", 2};
         dorado::utils::handle_cuda_result(host_compute_posts_step(
-                chunks.data_ptr(), chunk_results.data_ptr(), N, scores.data_ptr(),
+                stream, chunks.data_ptr(), chunk_results.data_ptr(), N, scores.data_ptr(),
                 m_score_clamp_val, C, aux.data_ptr(), path.data_ptr(), moves.data_ptr(), NULL,
                 sequence.data_ptr(), qstring.data_ptr(), options.q_scale, options.q_shift,
                 int(options.beam_width), options.beam_cut, options.blank_score));
     }
     {
         utils::ScopedProfileRange spr{"decode", 2};
         dorado::utils::handle_cuda_result(host_run_decode(
-                chunks.data_ptr(), chunk_results.data_ptr(), N, scores.data_ptr(),
+                stream, chunks.data_ptr(), chunk_results.data_ptr(), N, scores.data_ptr(),
                 m_score_clamp_val, C, aux.data_ptr(), path.data_ptr(), moves.data_ptr(), NULL,
                 sequence.data_ptr(), qstring.data_ptr(), options.q_scale, options.q_shift,
                 int(options.beam_width), options.beam_cut, options.blank_score, options.move_pad));
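Koi 0.4.5's host_* entry points take the CUDA stream as an explicit first argument, as seen above, rather than launching on the default stream. The raw cudaStream_t is unwrapped from whatever stream the enclosing CUDAStreamGuard made current. A minimal sketch of the hand-off (the remaining Koi arguments are elided):

    #include <ATen/cuda/CUDAContext.h>

    // getCurrentCUDAStream() returns the stream installed by the enclosing
    // guard (the caller's pooled stream); .stream() unwraps the raw handle
    // that Koi's C API expects.
    cudaStream_t stream = at::cuda::getCurrentCUDAStream().stream();
    // host_back_guide_step(stream, /* ...remaining Koi arguments... */);

Launching the Koi kernels on the current ATen stream keeps them ordered after ATen-issued work on that stream (e.g. the cuBLAS matmul in the quantized-LSTM change to CRFModel.cpp below) without a device-wide synchronize. The first hunk above also passes tensor_options_int32 to at::arange so the chunk-index tensors are created with the intended dtype and device rather than the defaults.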
dorado/basecall/nn/CRFModel.cpp — 3 changes: 2 additions & 1 deletion
@@ -700,12 +700,13 @@ void LSTMStackImpl::forward_quantized(WorkingMemory &wm) {
         }
     }
 
+    auto stream = at::cuda::getCurrentCUDAStream().stream();
     auto mm_out = wm.temp({wm.N * wm.T, 4 * layer_size}, torch::kF16);
     for (size_t i = 0; i < rnns.size(); ++i) {
         int dir = (i & 1) ? 1 : -1;
         dorado::utils::matmul_f16(inout.view({-1, layer_size}), device_w_ih[i], mm_out);
         dorado::utils::handle_cuda_result(host_small_lstm(
-                wm.N, wm.T, layer_size, dir, mm_out.data_ptr(), device_w_hh[i].data_ptr(),
+                stream, wm.N, wm.T, layer_size, dir, mm_out.data_ptr(), device_w_hh[i].data_ptr(),
                 device_bias[i].data_ptr(), device_scale[i].data_ptr(), inout.data_ptr()));
     }
 }
dorado/modbase/ModBaseCaller.cpp — 26 changes: 9 additions & 17 deletions
@@ -29,9 +29,6 @@ struct ModBaseCaller::ModBaseTask {
     at::Tensor out;
     bool done{false};
     int num_chunks;
-#if DORADO_CUDA_BUILD
-    c10::optional<c10::Stream> stream;
-#endif
 };
 
 ModBaseCaller::ModBaseData::ModBaseData(const std::filesystem::path& model_path,
@@ -48,15 +45,18 @@ ModBaseCaller::ModBaseData::ModBaseData(const std::filesystem::path& model_path,

 #if DORADO_CUDA_BUILD
     if (opts.device().is_cuda()) {
+        stream = c10::cuda::getStreamFromPool(false, opts.device().index());
+
         auto sig_len = static_cast<int64_t>(params.context_before + params.context_after);
         auto kmer_len = params.bases_after + params.bases_before + 1;
 
         // Warmup
+        c10::cuda::OptionalCUDAStreamGuard guard(stream);
         auto input_sigs = torch::empty({batch_size, 1, sig_len}, opts);
         auto input_seqs =
                 torch::empty({batch_size, sig_len, utils::BaseInfo::NUM_BASES * kmer_len}, opts);
         module_holder->forward(input_sigs, input_seqs);
-        torch::cuda::synchronize(opts.device().index());
+        stream->synchronize();
     }
 #endif
 }
@@ -147,11 +147,6 @@ at::Tensor ModBaseCaller::call_chunks(size_t model_id,
     auto& caller_data = m_caller_data[model_id];
     auto task = std::make_shared<ModBaseTask>(input_sigs.to(m_options.device()),
                                               input_seqs.to(m_options.device()), num_chunks);
-#if DORADO_CUDA_BUILD
-    if (m_options.device().is_cuda()) {
-        task->stream = c10::cuda::getCurrentCUDAStream(m_options.device().index());
-    }
-#endif
     {
         std::lock_guard<std::mutex> lock(caller_data->input_lock);
         caller_data->input_queue.push_front(task);

void ModBaseCaller::modbase_task_thread_fn(size_t model_id) {
auto& caller_data = m_caller_data[model_id];
#if DORADO_CUDA_BUILD
c10::cuda::OptionalCUDAStreamGuard stream_guard(caller_data->stream);
#endif
while (true) {
nvtx3::scoped_range loop{"modbase_task_thread_fn"};
at::InferenceMode guard;
@@ -219,18 +217,12 @@ void ModBaseCaller::modbase_task_thread_fn(size_t model_id) {
         caller_data->input_queue.pop_back();
         input_lock.unlock();
 
-#if DORADO_CUDA_BUILD
-        // If task->stream is set, sets the current stream to task->stream, and the current device to
-        // the device associated with the stream. Resets both to their prior state on destruction
-        c10::cuda::OptionalCUDAStreamGuard stream_guard(task->stream);
-#endif
-
         std::unique_lock<std::mutex> task_lock(task->mut);
         stats::Timer timer;
         task->out = caller_data->module_holder->forward(task->input_sigs, task->input_seqs);
 #if DORADO_CUDA_BUILD
-        if (task->stream.has_value()) {
-            task->stream->synchronize();
+        if (caller_data->stream.has_value()) {
+            caller_data->stream->synchronize();
         }
 #endif
         // Only meaningful if we're syncing the stream.
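On the modbase side the stream moves from the per-task struct to the per-model ModBaseData, so each worker thread installs one guard for its lifetime instead of re-installing a guard for every task. A condensed sketch with simplified, hypothetical names (ModelData, task_thread) and the DORADO_CUDA_BUILD guards omitted:

    #include <c10/cuda/CUDAGuard.h>
    #include <c10/cuda/CUDAStream.h>
    #include <c10/util/Optional.h>

    struct ModelData {
        // Empty on CPU-only configurations; assigned from the stream pool
        // in the constructor when the model lives on a CUDA device.
        c10::optional<c10::Stream> stream;
    };

    void task_thread(ModelData& data) {
        // A no-op when the optional is empty; otherwise sets the current
        // stream and device for as long as the guard (i.e. the loop) lives.
        c10::cuda::OptionalCUDAStreamGuard stream_guard(data.stream);
        for (;;) {
            // ... pop a task and run the model's forward pass ...
            if (data.stream.has_value()) {
                data.stream->synchronize();  // time only completed GPU work
            }
        }
    }

Keeping the stream on ModBaseData also lets the constructor's warmup pass and the worker thread share one stream, which is why the warmup now synchronizes on the stream instead of the whole device.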
dorado/modbase/ModBaseCaller.h — 7 changes: 6 additions & 1 deletion
@@ -3,7 +3,9 @@
#include "ModBaseModelConfig.h"
#include "MotifMatcher.h"
#include "utils/stats.h"

#if DORADO_CUDA_BUILD
#include <c10/cuda/CUDAStream.h>
#endif
#include <torch/nn.h>

#include <atomic>
@@ -43,6 +45,9 @@ class ModBaseCaller {
         std::mutex input_lock;
         std::condition_variable input_cv;
         const int batch_size;
+#if DORADO_CUDA_BUILD
+        c10::optional<c10::Stream> stream;
+#endif
     };

ModBaseCaller(const std::vector<std::filesystem::path>& model_paths,
