[src] CUDA batched decoder pipeline fixes (#3659)
bug fix: don't assume the stride and the number of columns are the same when packing a matrix into a vector.
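
For context, the corrected packing pattern looks roughly like this (a minimal
sketch against Kaldi's Matrix/Vector API; PackMatrixIntoVector is a hypothetical
helper, not code from this commit, and the real fix lives in TaskData below):

    #include <cstring>
    #include "matrix/kaldi-matrix.h"  // MatrixBase, Vector, kUndefined

    using namespace kaldi;

    // Pack a possibly-padded matrix into a flat vector. A matrix row can be
    // longer in memory (Stride) than it is logically (NumCols), so a single
    // memcpy is only valid when the two agree.
    void PackMatrixIntoVector(const MatrixBase<BaseFloat> &mat,
                              Vector<BaseFloat> *out) {
      int rows = mat.NumRows(), cols = mat.NumCols(), stride = mat.Stride();
      out->Resize(rows * cols, kUndefined);
      if (stride == cols) {
        // rows are contiguous in memory, so one large copy is safe
        memcpy(out->Data(), mat.Data(), rows * cols * sizeof(BaseFloat));
      } else {
        // rows are padded (stride > cols): copy row by row, skipping the pad
        for (int i = 0; i < rows; i++)
          memcpy(out->Data() + i * cols, mat.RowData(i),
                 cols * sizeof(BaseFloat));
      }
    }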

  src/cudadecoder/batched-threaded-nnet3-cuda-pipeline.cc:
    added additional sanity checking to better report errors in the field; the padded task queue is sketched below.
    no longer pinning memory when copying waves down, as it was causing consistency issues. It is unclear why the pinned path misbehaves; we will continue to evaluate.
    The optimization doesn't add much performance anyway, so we are disabling it for now.
    general cleanup.
    fixed a bug where the tasks array could be resized before being read.
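
    The padded queue, as a standalone sketch (PendingTaskQueue is a
    hypothetical type; in the commit the members live directly on
    BatchedThreadedNnet3CudaPipeline):

        #include "base/kaldi-common.h"  // KALDI_ASSERT

        struct TaskState;  // opaque here; defined in the pipeline header

        // Ring buffer padded by 10 slots so that an off-by-one overflow
        // trips the assert instead of silently overwriting entries.
        struct PendingTaskQueue {
          explicit PendingTaskQueue(int max_pending)
              : max_pending_(max_pending),
                capacity_(max_pending + 10),  // padding, not usable space
                queue_(new TaskState *[capacity_]),
                front_(0), back_(0) {}
          ~PendingTaskQueue() { delete[] queue_; }

          int NumPending() const {
            return (back_ + capacity_ - front_) % capacity_;
          }

          void Push(TaskState *task) {
            queue_[back_] = task;
            back_ = (back_ + 1) % capacity_;
            // more than max_pending_ entries means the producer overran the queue
            KALDI_ASSERT(NumPending() <= max_pending_);
          }

          TaskState *Pop() {
            KALDI_ASSERT(NumPending() > 0);
            TaskState *task = queue_[front_];
            front_ = (front_ + 1) % capacity_;
            return task;
          }

          int max_pending_, capacity_;
          TaskState **queue_;
          int front_, back_;
        };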

  src/cudadecoderbin/batched-wav-nnet3-cuda.cc:
    Now outputting every iteration as a different lattice (see the sketch below). This way we can score every lattice and better ensure the correctness of the binary.
    clang-format (removing tabs)
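
    The per-iteration output, reduced to a sketch (WritePerIteration is a
    hypothetical outline of the flow in the binary, not a verbatim excerpt;
    clat_wspecifier and iterations are the binary's existing options):

        #include <sstream>
        #include <string>
        #include <vector>
        #include "lat/kaldi-lattice.h"  // CompactLatticeWriter

        using namespace kaldi;

        void WritePerIteration(const std::string &clat_wspecifier, int iterations) {
          // one writer (and, in the binary, one mutex) per pass over the corpus
          std::vector<CompactLatticeWriter> clat_writers(iterations);
          for (int iter = 0; iter < iterations; iter++) {
            std::stringstream wspec;
            wspec << clat_wspecifier << "-" << iter;  // e.g. "ark:lat.ark-0"
            clat_writers[iter].Open(wspec.str());
            // ... decode the corpus; utterance keys are made unique per
            // iteration before writing, so archives can be scored separately:
            //   std::string key = utt + "-" + std::to_string(iter);
            //   clat_writers[iter].Write(key, clat);
            clat_writers[iter].Close();
          }
        }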
luitjens authored and danpovey committed Oct 17, 2019
1 parent b39ed47 commit 01cfda3
Showing 3 changed files with 105 additions and 74 deletions.
68 changes: 42 additions & 26 deletions src/cudadecoder/batched-threaded-nnet3-cuda-pipeline.cc
@@ -45,8 +45,9 @@ void BatchedThreadedNnet3CudaPipeline::Initialize(
// initialize threads and save their contexts so we can join them later
thread_contexts_.resize(config_.num_control_threads);

// create work queue
pending_task_queue_ = new TaskState *[config_.max_pending_tasks + 1];
// create work queue, padding by 10 so that we can better detect if this
// overflows. this should not happen and is just there as a sanity check
pending_task_queue_ = new TaskState *[config_.max_pending_tasks + 10];
tasks_front_ = 0;
tasks_back_ = 0;

@@ -356,7 +357,8 @@ void BatchedThreadedNnet3CudaPipeline::AddTaskToPendingTaskQueue(
// insert into pending task queue
pending_task_queue_[tasks_back_] = task;
// (int)tasks_back_);
tasks_back_ = (tasks_back_ + 1) % (config_.max_pending_tasks + 1);
tasks_back_ = (tasks_back_ + 1) % (config_.max_pending_tasks + 10);
KALDI_ASSERT(NumPendingTasks() <= config_.max_pending_tasks);
}
}

@@ -371,6 +373,7 @@ void BatchedThreadedNnet3CudaPipeline::AquireAdditionalTasks(
int tasksRequested =
std::min(free_channels.size(), config_.max_batch_size - channels.size());
int tasksAssigned = 0;
int firstTask = channels.size();

{
// lock required because front might change from other
@@ -384,8 +387,9 @@ void BatchedThreadedNnet3CudaPipeline::AquireAdditionalTasks(
// grab tasks
for (int i = 0; i < tasksAssigned; i++) {
// pending_task_queue_[tasks_front_]);
KALDI_ASSERT(NumPendingTasks() > 0);
tasks.push_back(pending_task_queue_[tasks_front_]);
tasks_front_ = (tasks_front_ + 1) % (config_.max_pending_tasks + 1);
tasks_front_ = (tasks_front_ + 1) % (config_.max_pending_tasks + 10);
}
}
}
@@ -406,6 +410,8 @@ void BatchedThreadedNnet3CudaPipeline::AquireAdditionalTasks(
channel = free_channels.back();
free_channels.pop_back();
}
// assign channel to task
tasks[i + firstTask]->ichannel = channel;
// add channel to processing list
channels.push_back(channel);
// add new channel to initialization list
@@ -559,22 +565,25 @@ void BatchedThreadedNnet3CudaPipeline::ComputeBatchFeatures(
thread_local Vector<BaseFloat> pinned_vector;

if (pinned_vector.Dim() < count) {
if (pinned_vector.Dim() != 0) {
cudaHostUnregister(pinned_vector.Data());
}
// WAR: Not pinning memory because it seems to impact correctness
// we are continuing to look into a fix but want to commit this workaround
// as a temporary measure.
// if (pinned_vector.Dim() != 0) {
// cudaHostUnregister(pinned_vector.Data());
//}

// allocated array 2x size
pinned_vector.Resize(count * 2, kUndefined);
cudaHostRegister(pinned_vector.Data(),
pinned_vector.Dim() * sizeof(BaseFloat), 0);
// cudaHostRegister(pinned_vector.Data(),
// pinned_vector.Dim() * sizeof(BaseFloat), 0);
}

// We will launch a thread for each task in order to get better host memory
// bandwidth
std::vector<std::future<void>> futures; // for syncing

// vector copy function for threading below.
auto copy_vec = [](SubVector<BaseFloat> &dst,
const SubVector<BaseFloat> &src) {
auto copy_vec = [](SubVector<BaseFloat> dst, const SubVector<BaseFloat> src) {
nvtxRangePushA("CopyVec");
dst.CopyFromVec(src);
nvtxRangePop();
@@ -670,6 +679,9 @@ void BatchedThreadedNnet3CudaPipeline::RemoveCompletedChannels(
// add channel to free and completed queues
completed_channels.push_back(channel);

// this was assigned earlier; just making sure it is still consistent
KALDI_ASSERT(tasks[cur]->ichannel == channel);

// Rearrange queues,
// move this element to end and end to this spot
std::swap(tasks[cur], tasks[back]);
@@ -700,15 +712,8 @@ void BatchedThreadedNnet3CudaPipeline::PostDecodeProcessing(
std::vector<ChannelId> &channels = channel_state.channels;
std::vector<ChannelId> &completed_channels = channel_state.completed_channels;

/*
// Generate lattices for GetRawLattice
std::vector<Lattice *> lattices(completed_channels.size());
for (int i = 0; i < completed_channels.size(); i++) {
// reverse order of lattices to match channel order
// tasks order was reversed when reordering to the back
lattices[i] = &(tasks[tasks.size() - i - 1]->lat);
}
*/
// consistency check
KALDI_ASSERT(tasks.size() == channels.size() + completed_channels.size());

// Prepare data for GetRawLattice
cuda_decoder.PrepareForGetRawLattice(completed_channels, true);
@@ -717,12 +722,23 @@
delete decodables[i];
}

std::vector<std::future<void>> futures;

// Calling GetRawLattice + Determinize (optional) on a CPU worker thread
for (int i = channels.size(); i < tasks.size(); i++) {
tasks[i]->ichannel = channels[i];
work_pool_->enqueue(THREAD_POOL_NORMAL_PRIORITY,
&BatchedThreadedNnet3CudaPipeline::CompleteTask, this,
&cuda_decoder, &channel_state, tasks[i]);
// checking that this channel is actually in the completed channels list
// order is reversed because we used push_back into the completed_channels list
KALDI_ASSERT(tasks[i]->ichannel ==
completed_channels[channels.size() +
completed_channels.size() - i - 1]);
futures.push_back(
work_pool_->enqueue(THREAD_POOL_NORMAL_PRIORITY,
&BatchedThreadedNnet3CudaPipeline::CompleteTask,
this, &cuda_decoder, &channel_state, tasks[i]));
}

for (int i = 0; i < futures.size(); i++) {
futures[i].get();
}

tasks.resize(channels.size());
@@ -753,10 +769,11 @@ void BatchedThreadedNnet3CudaPipeline::CompleteTask(CudaDecoder *cuda_decoder,
if (task->callback) // if callable
task->callback(task->dlat);

task->finished = true;
// Clear working data (raw input, posteriors, etc.)
task->task_data.reset();

task->finished = true;

{
std::lock_guard<std::mutex> lk(group_tasks_mutex_);
--all_group_tasks_not_done_;
@@ -838,7 +855,6 @@ void BatchedThreadedNnet3CudaPipeline::ExecuteWorker(int threadId) {
int start = tasks.size(); // Save the current assigned tasks size

AquireAdditionalTasks(cuda_decoder, channel_state, tasks);

// New tasks are now in the in tasks[start,tasks.size())
if (start != tasks.size()) { // if there are new tasks
if (config_.gpu_feature_extract)
22 changes: 17 additions & 5 deletions src/cudadecoder/batched-threaded-nnet3-cuda-pipeline.h
@@ -219,11 +219,23 @@ class BatchedThreadedNnet3CudaPipeline {

TaskData(const WaveData &wave_data_in)
: wave_samples(NULL), sample_frequency(0) {
raw_data.Resize(
wave_data_in.Data().NumRows() * wave_data_in.Data().NumCols(),
kUndefined);
memcpy(raw_data.Data(), wave_data_in.Data().Data(),
raw_data.Dim() * sizeof(BaseFloat));
int rows = wave_data_in.Data().NumRows();
int cols = wave_data_in.Data().NumCols();
int stride = wave_data_in.Data().Stride();

raw_data.Resize(rows * cols, kUndefined);

if (stride == cols) {
// contiguous, so use one large memory copy
memcpy(raw_data.Data(), wave_data_in.Data().Data(),
rows * cols * sizeof(BaseFloat));
} else {
// data is not contiguous, so we need to copy one row at a time
for (int i = 0; i < rows; i++) {
memcpy(raw_data.Data() + i * cols, wave_data_in.Data().RowData(i),
cols * sizeof(BaseFloat));
}
}
wave_samples =
std::make_shared<SubVector<BaseFloat>>(raw_data, 0, raw_data.Dim());
sample_frequency = wave_data_in.SampFreq();
89 changes: 46 additions & 43 deletions src/cudadecoderbin/batched-wav-nnet3-cuda.cc
@@ -28,7 +28,6 @@
#include "nnet3/am-nnet-simple.h"
#include "nnet3/nnet-utils.h"
#include "util/kaldi-thread.h"

using namespace kaldi;
using namespace cuda_decoder;

@@ -60,9 +59,9 @@ void GetDiagnosticsAndPrintOutput(const std::string &utt,
GetLinearSymbolSequence(best_path_lat, &alignment, &words, &weight);
num_frames = alignment.size();
likelihood = -(weight.Value1() + weight.Value2());
*tot_num_frames += num_frames;
*tot_like += likelihood;
{
*tot_num_frames += num_frames;
*tot_like += likelihood;
std::lock_guard<std::mutex> lk(*stdout_mutex);
KALDI_VLOG(2) << "Likelihood per frame for utterance " << utt << " is "
<< (likelihood / num_frames) << " over " << num_frames
@@ -85,18 +84,17 @@
// Called when a task is complete. Will be called by different threads
// concurrently,
// so it must be threadsafe
void FinishOneDecode(
const std::string &utt, const std::string &key,
const BatchedThreadedNnet3CudaPipelineConfig &batched_decoder_config,
const fst::SymbolTable *word_syms, const bool write_lattice,
BatchedThreadedNnet3CudaPipeline *cuda_pipeline, int64 *num_frames,
double *tot_like, CompactLatticeWriter *clat_writer,
std::mutex *clat_writer_mutex, std::mutex *stdout_mutex,
CompactLattice &clat) {
void FinishOneDecode(const std::string &utt, const std::string &key,
const fst::SymbolTable *word_syms,
BatchedThreadedNnet3CudaPipeline *cuda_pipeline,
int64 *num_frames, double *tot_like,
CompactLatticeWriter *clat_writer,
std::mutex *clat_writer_mutex, std::mutex *stdout_mutex,
CompactLattice &clat) {
nvtxRangePushA("FinishOneDecode");
GetDiagnosticsAndPrintOutput(utt, word_syms, clat, stdout_mutex, num_frames,
tot_like);
if (write_lattice) {
{
std::lock_guard<std::mutex> lk(*clat_writer_mutex);
clat_writer->Write(utt, clat);
}
@@ -128,10 +126,10 @@ int main(int argc, char *argv[]) {
int num_todo = -1;
int iterations = 1;
ParseOptions po(usage);
std::mutex stdout_mutex, clat_writer_mutex;
std::mutex stdout_mutex;
int pipeline_length = 4000; // length of pipeline of outstanding requests,
// this is independent of queue lengths in
// decoder
// this is independent of queue lengths in
// decoder

po.Register("write-lattice", &write_lattice,
"Output lattice to a file. Setting to false is useful when "
@@ -143,8 +141,7 @@
"After N files are processed the remaining files are ignored. "
"Useful for profiling");
po.Register("iterations", &iterations,
"Number of times to decode the corpus. Output will be written "
"only once.");
"Number of times to decode the corpus.");

// Multi-threaded CPU and batched GPU decoder
BatchedThreadedNnet3CudaPipelineConfig batched_decoder_config;
@@ -181,7 +178,8 @@
SetDropoutTestMode(true, &(am_nnet.GetNnet()));
nnet3::CollapseModel(nnet3::CollapseModelConfig(), &(am_nnet.GetNnet()));

CompactLatticeWriter clat_writer(clat_wspecifier);
std::vector<CompactLatticeWriter> clat_writers(iterations);
std::vector<std::mutex> clat_write_mutexs(iterations);

fst::Fst<fst::StdArc> *decode_fst =
fst::ReadFstKaldiGeneric(fst_rxfilename);
@@ -203,7 +201,7 @@

nvtxRangePush("Global Timer");

int num_groups_done=0;
int num_groups_done = 0;

// starting timer here so we
// can measure throughput
@@ -216,9 +214,14 @@
std::string task_group = std::to_string(iter);
num_task_submitted = 0;
SequentialTableReader<WaveHolder> wav_reader(wav_rspecifier);
if (iter > 0)
write_lattice =
false; // write the lattices only on the first iteration

std::mutex *clat_writer_mutex = &clat_write_mutexs[iter];
CompactLatticeWriter *clat_writer = &clat_writers[iter];

stringstream filename;
filename << clat_wspecifier << "-" << iter;
clat_writer->Open(filename.str());

for (; !wav_reader.Done(); wav_reader.Next()) {
nvtxRangePushA("Utterance Iteration");

@@ -228,10 +231,10 @@

std::string utt = wav_reader.Key();
std::string key = utt;
if (iter > 0) {
// make key unique for subsequent iterations
key = key + "-" + std::to_string(iter);
}

// make key unique for each iteration
key = key + "-" + std::to_string(iter);

const WaveData &wave_data = wav_reader.Value();

if (iter == 0) {
@@ -241,13 +244,13 @@
}

// Creating a function alias for the callback function of that utterance
auto finish_one_decode_lamba = [
// Capturing the arguments that will change by copy
utt, key, write_lattice,
// Capturing the const/global args by reference
&word_syms, &batched_decoder_config, &cuda_pipeline,
&clat_writer_mutex, &stdout_mutex, &clat_writer, &num_frames,
&tot_like]
auto finish_one_decode_lamba =
[
// Capturing the arguments that will change by copy
utt, key, clat_writer_mutex, clat_writer,
// Capturing the const/global args by reference
&word_syms, &cuda_pipeline, &stdout_mutex, &num_frames,
&tot_like]
// The callback function receive the compact lattice as argument
// if determinize_lattice is true, it is a determinized lattice
// otherwise, it is a raw lattice converted to compact format
@@ -258,9 +261,8 @@
FinishOneDecode(
// Captured arguments used to specialize FinishOneDecode for
// this task
utt, key, batched_decoder_config, word_syms, write_lattice,
&cuda_pipeline, &num_frames, &tot_like, &clat_writer,
&clat_writer_mutex, &stdout_mutex,
utt, key, word_syms, &cuda_pipeline, &num_frames, &tot_like,
clat_writer, clat_writer_mutex, &stdout_mutex,
// Generated lattice that will be passed once the task is
// complete
clat_in);
@@ -277,7 +279,7 @@
nvtxRangePop();
if (num_todo != -1 && num_task_submitted >= num_todo) break;
} // end utterance loop

std::string group_done;
// Non-blocking way to check if a group is done
// returns false if zero groups are ready
@@ -290,12 +292,13 @@
<< " Audio: " << total_audio * (iter + 1)
<< " RealTimeX: " << total_audio * (iter + 1) / total_time;
num_groups_done++;
clat_writers[iter].Close();
}
} // end iterations loop
} // end iterations loop

// We've submitted all tasks. Now waiting for them to complete
// We could also have called WaitForAllTasks and CloseAllDecodeHandles
while (num_groups_done<iterations) {
while (num_groups_done < iterations) {
// WaitForAnyGroup is blocking. It will hold until one group is ready
std::string group_done = cuda_pipeline.WaitForAnyGroup();
cuda_pipeline.CloseAllDecodeHandlesForGroup(group_done);
@@ -306,6 +309,7 @@
<< " Audio: " << total_audio * (iter + 1)
<< " RealTimeX: " << total_audio * (iter + 1) / total_time;
num_groups_done++;
clat_writers[iter].Close();
}

// number of seconds elapsed since the creation of timer
@@ -322,14 +326,13 @@
<< " Total Audio: " << total_audio * iterations
<< " RealTimeX: " << total_audio * iterations / total_time;

delete word_syms; // will delete if non-NULL.

clat_writer.Close();

cuda_pipeline.Finalize();
cudaDeviceSynchronize();

delete word_syms; // will delete if non-NULL.

return 0;

} catch (const std::exception &e) {
std::cerr << e.what();
return -1;
