[src] Cuda Decoder / D2H pipeline fix #3443

Merged 1 commit on Jul 3, 2019
112 changes: 58 additions & 54 deletions src/cudadecoder/batched-threaded-nnet3-cuda-pipeline.cc
@@ -249,10 +249,10 @@ void BatchedThreadedNnet3CudaPipeline::OpenDecodeHandle(
   task->Init(key, wave_data);
 
   if (config_.gpu_feature_extract) {
-    //Feature extraction done on device
+    // Feature extraction done on device
     AddTaskToPendingTaskQueue(task);
   } else {
-    //Feature extraction done on host thread
+    // Feature extraction done on host thread
     work_pool_->enqueue(THREAD_POOL_LOW_PRIORITY,
                         &BatchedThreadedNnet3CudaPipeline::ComputeOneFeatureCPU,
                         this, task);
@@ -266,13 +266,12 @@ void BatchedThreadedNnet3CudaPipeline::OpenDecodeHandle(
   TaskState *task = AddTask(key, group);
   task->Init(key, wave_data, sample_rate);
   task->callback = std::move(callback);
-  task->Init(key, wave_data, sample_rate);
 
   if (config_.gpu_feature_extract) {
-    //Feature extraction done on device
+    // Feature extraction done on device
     AddTaskToPendingTaskQueue(task);
   } else {
-    //Feature extraction done on host thread
+    // Feature extraction done on host thread
     work_pool_->enqueue(THREAD_POOL_LOW_PRIORITY,
                         &BatchedThreadedNnet3CudaPipeline::ComputeOneFeatureCPU,
                         this, task);
@@ -437,7 +436,7 @@ void BatchedThreadedNnet3CudaPipeline::ComputeBatchNnet(
     TaskState &task = *tasks[i];
     std::shared_ptr<TaskData> &task_data = task.task_data;
     std::vector<nnet3::NnetInferenceTask> &ntasks = nnet_tasks[i];
-
+
     if (config_.gpu_feature_extract) {
       CuVector<BaseFloat> &ivector_features = task_data->ivector_features;
       CuMatrix<BaseFloat> &input_features = task_data->input_features;
@@ -447,19 +446,19 @@
         ifeat = &ivector_features;
       }
       // create task list
-      computer.SplitUtteranceIntoTasks(output_to_cpu, input_features, ifeat, 
+      computer.SplitUtteranceIntoTasks(output_to_cpu, input_features, ifeat,
                                        NULL, online_ivector_period, &ntasks);
     } else {
-      Vector<BaseFloat> &ivector_features = task_data->ivector_features_cpu; 
+      Vector<BaseFloat> &ivector_features = task_data->ivector_features_cpu;
       Matrix<BaseFloat> &input_features = task_data->input_features_cpu;
 
       Vector<BaseFloat> *ifeat = NULL;
       if (ivector_features.Dim() > 0) {
         ifeat = &ivector_features;
       }
       // create task list
-      computer.SplitUtteranceIntoTasks(output_to_cpu, input_features, ifeat,
-          NULL, online_ivector_period, &ntasks);
+      computer.SplitUtteranceIntoTasks(output_to_cpu, input_features, ifeat,
+                                       NULL, online_ivector_period, &ntasks);
     }
 
     // Add tasks to computer
@@ -504,10 +503,9 @@ void BatchedThreadedNnet3CudaPipeline::ComputeOneFeatureCPU(TaskState *task_) {
   OnlineNnet2FeaturePipeline feature(*feature_info_);
 
   // Accept waveforms
-  feature.AcceptWaveform(
-      task_data->sample_frequency,
-      SubVector<BaseFloat>(*task_data->wave_samples, 0,
-                           task_data->wave_samples->Dim()));
+  feature.AcceptWaveform(task_data->sample_frequency,
+                         SubVector<BaseFloat>(*task_data->wave_samples, 0,
+                                              task_data->wave_samples->Dim()));
   feature.InputFinished();
   // All frames should be ready here
   int32 numFrames = feature.NumFramesReady();
@@ -520,8 +518,7 @@ void BatchedThreadedNnet3CudaPipeline::ComputeOneFeatureCPU(TaskState *task_) {
 
   std::vector<int> frames(numFrames);
   // create list of frames
-  for (int j = 0; j < numFrames; j++)
-    frames[j] = j;
+  for (int j = 0; j < numFrames; j++) frames[j] = j;
 
   // Copy Features
   input_features.Resize(numFrames, input_dim);
@@ -543,51 +540,55 @@
 
 // Computes features across the tasks[first,tasks.size()
 void BatchedThreadedNnet3CudaPipeline::ComputeBatchFeatures(
-    int32 first, std::vector<TaskState *> &tasks, 
+    int32 first, std::vector<TaskState *> &tasks,
     OnlineCudaFeaturePipeline &feature_pipeline) {
-  KALDI_ASSERT(config_.gpu_feature_extract==true);
+  KALDI_ASSERT(config_.gpu_feature_extract == true);
   nvtxRangePushA("CopyBatchWaves");
-  // below we will pack waves into a single buffer for efficient transfer across device
+  // below we will pack waves into a single buffer for efficient transfer across
+  // device
 
   // first count the total number of elements and create a single large vector
-  int count=0;
+  int count = 0;
   for (int i = first; i < tasks.size(); i++) {
-    count+=tasks[i]->task_data->wave_samples->Dim();
+    count += tasks[i]->task_data->wave_samples->Dim();
   }
 
   // creating a thread local vector of pinned memory.
-  // wave data will be stagged through this memory to get 
+  // wave data will be stagged through this memory to get
   // more efficient non-blocking transfers to the device.
   thread_local Vector<BaseFloat> pinned_vector;
-  if (pinned_vector.Dim() < count ) {
-    if ( pinned_vector.Dim()!=0) {
-
+  if (pinned_vector.Dim() < count) {
+    if (pinned_vector.Dim() != 0) {
       cudaHostUnregister(pinned_vector.Data());
     }
     // allocated array 2x size
-    pinned_vector.Resize(count*2,kUndefined);
-    cudaHostRegister(pinned_vector.Data(), pinned_vector.Dim()*sizeof(BaseFloat),0);
+    pinned_vector.Resize(count * 2, kUndefined);
+    cudaHostRegister(pinned_vector.Data(),
+                     pinned_vector.Dim() * sizeof(BaseFloat), 0);
   }
 
-  // We will launch a thread for each task in order to get better host memory bandwidth
-  std::vector<std::future<void> > futures; //for syncing
+  // We will launch a thread for each task in order to get better host memory
+  // bandwidth
+  std::vector<std::future<void>> futures;  // for syncing
 
-  //vector copy function for threading below.
-  auto copy_vec = [](SubVector<BaseFloat> &dst, const SubVector<BaseFloat> &src) {
+  // vector copy function for threading below.
+  auto copy_vec = [](SubVector<BaseFloat> &dst,
+                     const SubVector<BaseFloat> &src) {
     nvtxRangePushA("CopyVec");
-    dst.CopyFromVec(src); 
+    dst.CopyFromVec(src);
    nvtxRangePop();
   };
 
   // next launch threads to copy all waves for each task in parallel
-  count=0;
+  count = 0;
   for (int i = first; i < tasks.size(); i++) {
     std::shared_ptr<TaskData> &task_data = tasks[i]->task_data;
-    SubVector<BaseFloat> wave(pinned_vector,count,task_data->wave_samples->Dim());
-    count+=task_data->wave_samples->Dim();
+    SubVector<BaseFloat> wave(pinned_vector, count,
+                              task_data->wave_samples->Dim());
+    count += task_data->wave_samples->Dim();
     futures.push_back(
-        work_pool_->enqueue(copy_vec, wave, *(task_data->wave_samples))
-        );
+        work_pool_->enqueue(copy_vec, wave, *(task_data->wave_samples)));
   }
 
   // wait for waves to be copied into place
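
Note on the staging pattern above: the waves are staged through page-locked (pinned) host memory, grown geometrically and registered with the CUDA runtime so that the later `cudaMemcpyAsync` can be truly non-blocking. A minimal standalone sketch of that grow-and-register idiom, assuming only the CUDA runtime API (the helper name `GetPinnedStagingBuffer` and the use of `std::vector<float>` in place of Kaldi's `Vector<BaseFloat>` are illustrative):

```cpp
#include <cuda_runtime.h>

#include <vector>

// Reusable thread-local pinned staging buffer (sketch of the pattern above).
float *GetPinnedStagingBuffer(size_t count) {
  thread_local std::vector<float> buffer;
  if (buffer.size() < count) {
    if (!buffer.empty()) {
      // About to reallocate: unregister the old pages first.
      cudaHostUnregister(buffer.data());
    }
    // Allocate 2x what is needed to amortize future re-registrations.
    buffer.resize(count * 2);
    // Pin the pages so the DMA engine can read them directly; this is what
    // makes a later cudaMemcpyAsync from this buffer genuinely asynchronous.
    cudaHostRegister(buffer.data(), buffer.size() * sizeof(float),
                     cudaHostRegisterDefault);
  }
  return buffer.data();
}
```

Registering pages is expensive, so the 2x growth keeps re-registration rare, at the cost of holding up to twice the peak batch size in pinned memory.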
@@ -596,37 +597,40 @@ void BatchedThreadedNnet3CudaPipeline::ComputeBatchFeatures(
   }
 
   CuVector<BaseFloat> cu_waves(count, kUndefined);
-  // copy memory down asynchronously. Vector copy functions are synchronous so we do it manually.
-  // It is important for this to happen asynchrously to help hide launch latency of smaller kernels
+  // copy memory down asynchronously. Vector copy functions are synchronous so
+  // we do it manually.
+  // It is important for this to happen asynchrously to help hide launch latency
+  // of smaller kernels
   // that come in the future.
-  cudaMemcpyAsync(cu_waves.Data(), pinned_vector.Data(), cu_waves.Dim()*sizeof(BaseFloat),
-      cudaMemcpyHostToDevice, cudaStreamPerThread);
+  cudaMemcpyAsync(cu_waves.Data(), pinned_vector.Data(),
+                  cu_waves.Dim() * sizeof(BaseFloat), cudaMemcpyHostToDevice,
+                  cudaStreamPerThread);
   nvtxRangePop();
 
   nvtxRangePushA("ComputeBatchFeatures");
   // extract features for each wave
-  count=0;
+  count = 0;
   for (int i = first; i < tasks.size(); i++) {
     TaskState &task = *tasks[i];
     std::shared_ptr<TaskData> &task_data = task.task_data;
 
-    CuSubVector<BaseFloat> cu_wave(cu_waves,count,task_data->wave_samples->Dim());
-    count+=task_data->wave_samples->Dim();
-    feature_pipeline.ComputeFeatures(cu_wave, task_data->sample_frequency,
-        &task_data->input_features, &task_data->ivector_features);
-
+    CuSubVector<BaseFloat> cu_wave(cu_waves, count,
+                                   task_data->wave_samples->Dim());
+    count += task_data->wave_samples->Dim();
+    feature_pipeline.ComputeFeatures(cu_wave, task_data->sample_frequency,
+                                     &task_data->input_features,
+                                     &task_data->ivector_features);
 
     int32 numFrames = task_data->input_features.NumRows();
 
     if (numFrames == 0) {
-      //Make this a warning for now. Need to check how this is handled
+      // Make this a warning for now. Need to check how this is handled
       KALDI_WARN << "Warning empty audio file";
     }
   }
   nvtxRangePop();
 }
 
-
-
 // Allocates decodables for tasks in the range of tasks[first,tasks.size())
 void BatchedThreadedNnet3CudaPipeline::AllocateDecodables(
     int32 first, std::vector<TaskState *> &tasks,
@@ -837,7 +841,7 @@ void BatchedThreadedNnet3CudaPipeline::ExecuteWorker(int threadId) {
 
   // New tasks are now in the in tasks[start,tasks.size())
   if (start != tasks.size()) {  // if there are new tasks
-    if (config_.gpu_feature_extract) 
+    if (config_.gpu_feature_extract)
       ComputeBatchFeatures(start, tasks, feature_pipeline);
     ComputeBatchNnet(computer, start, tasks);
     AllocateDecodables(start, tasks, decodables);
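
The copy path in ComputeBatchFeatures therefore has two halves: many small host-side copies into the pinned buffer, run in parallel on pool threads because several threads saturate host memory bandwidth better than one, followed by a single large asynchronous host-to-device transfer on `cudaStreamPerThread`. A hedged sketch of both halves under the same assumptions as the previous sketch (`PackAndCopyWaves` is illustrative, and `std::async` stands in for Kaldi's work pool):

```cpp
#include <cuda_runtime.h>

#include <cstring>
#include <future>
#include <vector>

// Pack per-task waveforms into one pinned buffer in parallel, then issue a
// single async H2D copy (sketch; `pinned` must be page-locked, see above).
void PackAndCopyWaves(const std::vector<std::vector<float>> &waves,
                      float *pinned, float *device) {
  std::vector<std::future<void>> futures;  // for syncing
  size_t offset = 0;
  for (const auto &w : waves) {
    float *dst = pinned + offset;
    offset += w.size();
    // One copy task per wave, mirroring the copy_vec lambda in the diff.
    futures.push_back(std::async(std::launch::async, [dst, &w] {
      std::memcpy(dst, w.data(), w.size() * sizeof(float));
    }));
  }
  // wait for waves to be copied into place
  for (auto &f : futures) f.get();

  // One large transfer instead of one synchronous copy per utterance.
  // cudaStreamPerThread keeps concurrent workers off the legacy default
  // stream, and the lack of a sync here lets later kernel launches on the
  // same stream queue up behind the transfer, hiding their launch latency.
  cudaMemcpyAsync(device, pinned, offset * sizeof(float),
                  cudaMemcpyHostToDevice, cudaStreamPerThread);
}
```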
15 changes: 8 additions & 7 deletions src/cudadecoder/cuda-decoder.cc
@@ -655,11 +655,6 @@ void CudaDecoder::PostProcessingMainQueue() {
 }
 
 void CudaDecoder::CopyMainQueueDataToHost() {
-  // Computing lanes offsets for concatenated copies
-  cudaStreamWaitEvent(
-      compute_st_, d2h_copy_extra_prev_tokens_evt_,
-      0);  // waiting on preivous d2h before writing on same device memory
-  ConcatenateData();
   cudaEventRecord(concatenated_data_ready_evt_, compute_st_);
   cudaStreamWaitEvent(copy_st_, concatenated_data_ready_evt_,
                       0);  // the copies on copy_st will wait on compute_st_
@@ -836,6 +831,14 @@ void CudaDecoder::AdvanceDecoding(
     // - compute the extra costs
     PostProcessingMainQueue();
 
+    // Waiting on previous d2h before writing on same device memory
+    cudaStreamWaitEvent(compute_st_, d2h_copy_extra_prev_tokens_evt_, 0);
+    // Concatenating the data that will be moved to host into large arrays
+    ConcatenateData();
+    // Copying the final lane counters for that frame
+    CopyLaneCountersToHostSync();
+    CheckOverflow();
+
     // Moving the data necessary for GetRawLattice/GetBestPath back to host for
     // storage
     CopyMainQueueDataToHost();
@@ -850,8 +853,6 @@ void CudaDecoder::AdvanceDecoding(
       frame_offsets_[ichannel].push_back(frame_offsets_[ichannel].back() +
                                          main_q_end);
     }
-    CopyLaneCountersToHostSync();
-    CheckOverflow();
   }
 
   SaveChannelsStateFromLanes();
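
The reordering above is the actual D2H pipeline fix: the wait on d2h_copy_extra_prev_tokens_evt_ and the ConcatenateData() call move out of CopyMainQueueDataToHost() and up into AdvanceDecoding(), and the synchronous lane-counter copy plus overflow check now run before the bulk device-to-host copies are queued rather than after. A hedged sketch of the record/wait pattern this enforces (stream and event names mirror the diff; `PipelineOneFrame` and the placeholder comments are illustrative):

```cpp
#include <cuda_runtime.h>

// Per-frame ordering between the compute stream and the D2H copy stream.
void PipelineOneFrame(cudaStream_t compute_st, cudaStream_t copy_st,
                      cudaEvent_t d2h_prev_done, cudaEvent_t concat_ready) {
  // 1. Before overwriting the shared concatenation buffers, make the
  //    compute stream wait until the previous frame's D2H copy out of
  //    those buffers has finished.
  cudaStreamWaitEvent(compute_st, d2h_prev_done, 0);
  // 2. ConcatenateData() would run here on compute_st, packing this
  //    frame's tokens into the shared buffers; then mark them ready.
  cudaEventRecord(concat_ready, compute_st);
  // 3. The copy stream waits for the packed data, then the D2H copies
  //    follow on copy_st:
  cudaStreamWaitEvent(copy_st, concat_ready, 0);
  //    cudaMemcpyAsync(..., cudaMemcpyDeviceToHost, copy_st) would run
  //    here, followed by cudaEventRecord(d2h_prev_done, copy_st).
}
```

Without step 1, a kernel on the compute stream could overwrite the concatenation buffers while the previous frame's copy out of them was still in flight on copy_st_, which appears to be the hazard this reordering guards against.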
16 changes: 8 additions & 8 deletions src/cudadecoderbin/batched-wav-nnet3-cuda.cc
@@ -102,7 +102,7 @@ void FinishOneDecode(
   }
 
   nvtxRangePop();
-}
+}
 
 int main(int argc, char *argv[]) {
   try {
@@ -129,9 +129,9 @@ int main(int argc, char *argv[]) {
     int iterations = 1;
     ParseOptions po(usage);
     std::mutex stdout_mutex, clat_writer_mutex;
-    int pipeline_length = 4000; // length of pipeline of outstanding requests,
-                                // this is independent of queue lengths in
-                                // decoder
+    int pipeline_length = 4000;  // length of pipeline of outstanding requests,
+                                 // this is independent of queue lengths in
+                                 // decoder
 
     po.Register("write-lattice", &write_lattice,
                 "Output lattice to a file. Setting to false is useful when "
@@ -286,8 +286,8 @@ int main(int argc, char *argv[]) {
 
       nvtxRangePop();
       if (num_todo != -1 && num_task_submitted >= num_todo) break;
-    }  // end utterance loop
-  }    // end iterations loop
+    }  // end utterance loop
+  }  // end iterations loop
 
   // We've submitted all tasks. Now waiting for them to complete
   // We could also have called WaitForAllTasks and CloseAllDecodeHandles
@@ -317,7 +317,7 @@ int main(int argc, char *argv[]) {
               << " Total Audio: " << total_audio * iterations
               << " RealTimeX: " << total_audio * iterations / total_time;
 
-    delete word_syms; // will delete if non-NULL.
+    delete word_syms;  // will delete if non-NULL.
 
     clat_writer.Close();
@@ -329,6 +329,6 @@ int main(int argc, char *argv[]) {
     std::cerr << e.what();
     return -1;
   }
-} // main()
+}  // main()

#endif // if HAVE_CUDA == 1