diff --git a/egs/sre16/v2/thread-test.sh b/egs/sre16/v2/thread-test.sh
new file mode 100644
index 00000000000..85a0b629cc5
--- /dev/null
+++ b/egs/sre16/v2/thread-test.sh
@@ -0,0 +1,40 @@
+# Simple script comparing standard and parallel xvector extractors
+
+# conventional xvector computation with master kaldi
+
+time ~/src/kaldi/kaldi-master/src/nnet3bin/nnet3-xvector-compute --min-chunk-size=25 --chunk-size=10000 '~/src/kaldi/kaldi-master/src/nnet3bin/nnet3-copy --nnet-config=0003_sre16_v2_1a/exp/xvector_nnet_1a/extract.config 0003_sre16_v2_1a/exp/xvector_nnet_1a/final.raw - |' 'ark:apply-cmvn-sliding --norm-vars=false --center=true --cmn-window=300 scp:dev/split2/1/feats_50.scp ark:- | select-voiced-frames ark:- scp,s,cs:dev/split2/1/vad.scp ark:- |' ark,t:xvec_conv.txt
+
+# real	1m35.620s
+# user	1m35.406s
+# sys	0m0.194s
+
+# parallel single-thread
+
+time ~/src/kaldi/kaldi-my/src/nnet3bin/nnet3-xvector-compute-parallel --min-chunk-size=25 --chunk-size=10000 'nnet3-copy --nnet-config=0003_sre16_v2_1a/exp/xvector_nnet_1a/extract.config 0003_sre16_v2_1a/exp/xvector_nnet_1a/final.raw - |' 'ark:apply-cmvn-sliding --norm-vars=false --center=true --cmn-window=300 scp:dev/split2/1/feats_50.scp ark:- | select-voiced-frames ark:- scp,s,cs:dev/split2/1/vad.scp ark:- |' ark,t:xvec_para_1t.txt
+
+# real	1m34.160s
+# user	1m33.855s
+# sys	0m0.291s
+
+# parallel 4 threads
+# NOTE: output file renamed from xvec_para_10t.txt to xvec_para_4t.txt so it
+# matches the thread count and does not collide with the 10-thread run below.
+time ~/src/kaldi/kaldi-my/src/nnet3bin/nnet3-xvector-compute-parallel --num-threads=4 --min-chunk-size=25 --chunk-size=10000 'nnet3-copy --nnet-config=0003_sre16_v2_1a/exp/xvector_nnet_1a/extract.config 0003_sre16_v2_1a/exp/xvector_nnet_1a/final.raw - |' 'ark:apply-cmvn-sliding --norm-vars=false --center=true --cmn-window=300 scp:dev/split2/1/feats_50.scp ark:- | select-voiced-frames ark:- scp,s,cs:dev/split2/1/vad.scp ark:- |' ark,t:xvec_para_4t.txt
+
+# real	0m40.513s
+# user	2m13.843s
+# sys	0m0.620s
+
+
+# parallel 10 threads
+time ~/src/kaldi/kaldi-my/src/nnet3bin/nnet3-xvector-compute-parallel --num-threads=10 --min-chunk-size=25 --chunk-size=10000 'nnet3-copy --nnet-config=0003_sre16_v2_1a/exp/xvector_nnet_1a/extract.config 0003_sre16_v2_1a/exp/xvector_nnet_1a/final.raw - |' 'ark:apply-cmvn-sliding --norm-vars=false --center=true --cmn-window=300 scp:dev/split2/1/feats_50.scp ark:- | select-voiced-frames ark:- scp,s,cs:dev/split2/1/vad.scp ark:- |' ark,t:xvec_para_10t.txt
+
+# real	0m42.263s
+# user	2m17.649s
+# sys	0m1.136s
+
+# parallel 10 threads with limited cache capacity 3
+# NOTE: output file renamed from xvec_para_4t_c3.txt to xvec_para_10t_c3.txt
+# to match the actual thread count used in this run.
+time ~/src/kaldi/kaldi-my/src/nnet3bin/nnet3-xvector-compute-parallel --cache-capacity=3 --num-threads=10 --min-chunk-size=25 --chunk-size=10000 'nnet3-copy --nnet-config=0003_sre16_v2_1a/exp/xvector_nnet_1a/extract.config 0003_sre16_v2_1a/exp/xvector_nnet_1a/final.raw - |' 'ark:apply-cmvn-sliding --norm-vars=false --center=true --cmn-window=300 scp:dev/split2/1/feats_50.scp ark:- | select-voiced-frames ark:- scp,s,cs:dev/split2/1/vad.scp ark:- |' ark,t:xvec_para_10t_c3.txt
+
+# real	0m43.296s
+# user	2m16.898s
+# sys	0m1.451s
+
diff --git a/src/nnet3/Makefile b/src/nnet3/Makefile
index df0fb2d4502..5559ca22bff 100644
--- a/src/nnet3/Makefile
+++ b/src/nnet3/Makefile
@@ -31,7 +31,7 @@ OBJFILES = nnet-common.o nnet-compile.o nnet-component-itf.o \
            nnet-compile-looped.o decodable-simple-looped.o \
            decodable-online-looped.o convolution.o \
            nnet-convolutional-component.o attention.o \
-           nnet-attention-component.o
+           nnet-attention-component.o nnet-xvector-threaded.o
 
 LIBNAME = kaldi-nnet3
 
diff --git a/src/nnet3/nnet-optimize.h b/src/nnet3/nnet-optimize.h
index 78763732469..3186895838b 100644
--- a/src/nnet3/nnet-optimize.h
+++ b/src/nnet3/nnet-optimize.h
@@ -232,11 +232,10 @@ class CachingOptimizingCompiler {
 
   /// Does the compilation and returns a const pointer to the result, which is
   /// owned by this class, not the caller.  It calls ComputeCudaIndexes() for
-  /// you, because you wouldn't be able to do this on a const object.
- /// - /// Note: this used to return 'const NnetComputation*'. If you get a - /// compilation failure, just replace 'const NnetComputation*' with - /// 'std::shared_ptr' in the calling code. + /// you, because you wouldn't be able to do this on a const object. If you + /// want to preserve thread safety you should hold the result in the same type + /// (std::shared_ptr) while you still need it, but + /// otherwise you can just cast to const NnetComputation*. std::shared_ptr Compile( const ComputationRequest &request); void ReadCache(std::istream &is, bool binary); diff --git a/src/nnet3/nnet-xvector-threaded.cc b/src/nnet3/nnet-xvector-threaded.cc new file mode 100644 index 00000000000..1ca6244ffa2 --- /dev/null +++ b/src/nnet3/nnet-xvector-threaded.cc @@ -0,0 +1,130 @@ +// nnet3/nnet3-xvector-threaded.cc + +// Copyright 2017 Johns Hopkins University (author: Daniel Povey) +// 2017 Johns Hopkins University (author: Daniel Garcia-Romero) +// 2017 David Snyder +// 2018 Behavox Limited (author: Arseniy Gorin) + +// See ../../COPYING for clarification regarding multiple authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED +// WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, +// MERCHANTABLITY OR NON-INFRINGEMENT. +// See the Apache 2 License for the specific language governing permissions and +// limitations under the License. 
+
+
+#include "nnet3/nnet-xvector-threaded.h"
+
+namespace kaldi {
+namespace nnet3 {
+
+// Constructor: captures a copy of the features plus pointers/references to
+// the shared model, compiler and writer.  The heavy work happens later in
+// operator () (run by TaskSequencer on a worker thread) and the result is
+// written in the destructor (which TaskSequencer calls in sequence).
+XVectorExtractorParallelClass::XVectorExtractorParallelClass(
+    const NnetSimpleComputationOptions &opts,
+    const Nnet &nnet,
+    CachingOptimizingCompiler *compiler,
+    std::string utt,
+    const int chunk_size,
+    const int min_chunk_size,
+    const Matrix<BaseFloat> &feats,
+    BaseFloatVectorWriter *xvector_writer):
+    opts_(opts),
+    nnet_(&nnet),
+    compiler_(*compiler),
+    utt_(utt),
+    chunk_size_(chunk_size),
+    min_chunk_size_(min_chunk_size),
+    feats_(feats),
+    xvector_writer_(xvector_writer) {
+  tot_weight_ = 0.0;
+  xvector_avg_.Resize(nnet_->OutputDim("output"), kSetZero);
+}
+
+// Splits the utterance into chunks of this_chunk_size frames, extracts an
+// xvector from each chunk and accumulates a frame-weighted sum in
+// xvector_avg_ (normalized in the destructor).
+void XVectorExtractorParallelClass::operator () () {
+  int32 num_rows = feats_.NumRows(),
+      feat_dim = feats_.NumCols(),
+      this_chunk_size = chunk_size_;
+
+  if (num_rows < min_chunk_size_) {
+    KALDI_WARN << "Minimum chunk size of " << min_chunk_size_
+               << " is greater than the number of rows "
+               << "in utterance: " << utt_;
+    // Nothing can be processed; return with tot_weight_ == 0 so that the
+    // destructor knows not to write an xvector for this utterance.
+    // (Previously this fell through into the chunk loop, and with
+    // chunk_size_ == -1 would also compute a negative chunk count.)
+    return;
+  } else if (num_rows < chunk_size_) {
+    // Utterance is shorter than the requested chunk: use one full-length chunk.
+    this_chunk_size = num_rows;
+  } else if (chunk_size_ == -1) {
+    // chunk_size == -1 means "use the whole utterance as one chunk".
+    this_chunk_size = num_rows;
+  }
+
+  int32 num_chunks = ceil(
+      num_rows / static_cast<BaseFloat>(this_chunk_size));
+
+  // Iterate over the feature chunks.
+  for (int32 chunk_indx = 0; chunk_indx < num_chunks; chunk_indx++) {
+    // The final chunk may be shorter than this_chunk_size.
+    int32 offset = std::min(
+        this_chunk_size, num_rows - chunk_indx * this_chunk_size);
+    // Skip a trailing remainder that is too short to give a reliable xvector.
+    if (offset < min_chunk_size_)
+      continue;
+    SubMatrix<BaseFloat> sub_features(
+        feats_, chunk_indx * this_chunk_size, offset, 0, feat_dim);
+    Vector<BaseFloat> xvector;
+    tot_weight_ += offset;
+
+    RunNnetComputation(sub_features, *nnet_, &compiler_, &xvector);
+
+    // Weight each chunk's xvector by its number of frames.
+    xvector_avg_.AddVec(offset, xvector);
+  }
+}
+
+
+// The destructor writes the averaged xvector; TaskSequencer guarantees that
+// destructors run sequentially in input order, so the writer needs no lock.
+XVectorExtractorParallelClass::~XVectorExtractorParallelClass () {
+  // Guard against utterances that produced no usable chunk: avoid a
+  // division by zero and do not write an undefined vector.
+  if (tot_weight_ > 0.0) {
+    xvector_avg_.Scale(1.0 / tot_weight_);
+    xvector_writer_->Write(utt_, xvector_avg_);
+  } else {
+    KALDI_WARN << "Not producing xvector for utterance " << utt_
+               << " (no chunk of at least " << min_chunk_size_ << " frames)";
+  }
+}
+
+
+// Runs the nnet forward on one chunk of features and returns the first row
+// of the "output" node as the chunk's xvector.
+void XVectorExtractorParallelClass::RunNnetComputation(
+    const MatrixBase<BaseFloat> &features,
+    const Nnet &nnet, CachingOptimizingCompiler *compiler,
+    Vector<BaseFloat> *xvector) {
+  ComputationRequest request;
+  request.need_model_derivative = false;
+  request.store_component_stats = false;
+  request.inputs.push_back(
+      IoSpecification("input", 0, features.NumRows()));
+  IoSpecification output_spec;
+  output_spec.name = "output";
+  output_spec.has_deriv = false;
+  output_spec.indexes.resize(1);
+  request.outputs.resize(1);
+  request.outputs[0].Swap(&output_spec);
+  // Hold the compilation in a shared_ptr while in use; see the comment on
+  // CachingOptimizingCompiler::Compile() regarding thread safety.
+  std::shared_ptr<const NnetComputation> computation =
+      compiler->Compile(request);
+  Nnet *nnet_to_update = NULL;  // we're not doing any update.
+  NnetComputer computer(NnetComputeOptions(), *computation,
+                        nnet, nnet_to_update);
+  CuMatrix<BaseFloat> input_feats_cu(features);
+  computer.AcceptInput("input", &input_feats_cu);
+  computer.Run();
+  CuMatrix<BaseFloat> cu_output;
+  computer.GetOutputDestructive("output", &cu_output);
+  xvector->Resize(cu_output.NumCols());
+  xvector->CopyFromVec(cu_output.Row(0));
+}
+
+}  // namespace nnet3
+}  // namespace kaldi
diff --git a/src/nnet3/nnet-xvector-threaded.h b/src/nnet3/nnet-xvector-threaded.h
new file mode 100644
index 00000000000..6397fac4ff9
--- /dev/null
+++ b/src/nnet3/nnet-xvector-threaded.h
@@ -0,0 +1,83 @@
+// nnet3/nnet-xvector-threaded.h
+
+// Copyright 2017  Johns Hopkins University (author: Daniel Povey)
+//           2017  Johns Hopkins University (author: Daniel Garcia-Romero)
+//           2017  David Snyder
+//           2018  Behavox Limited (author: Arseniy Gorin)
+
+// See ../../COPYING for clarification regarding multiple authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
+// WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
+// MERCHANTABLITY OR NON-INFRINGEMENT.
+// See the Apache 2 License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef KALDI_NNET3_NNET_XVECTOR_THREADED_H_
+#define KALDI_NNET3_NNET_XVECTOR_THREADED_H_
+
+#include "base/kaldi-common.h"
+#include "util/common-utils.h"
+#include "nnet3/nnet-am-decodable-simple.h"
+#include "base/timer.h"
+#include "nnet3/nnet-utils.h"
+
+namespace kaldi {
+namespace nnet3 {
+
+/*
+  Task class for multi-threaded xvector extraction: one instance per
+  utterance, run by TaskSequencer.  The compiler is shared between all
+  instances and is passed in as a pointer.
+
+  IMPORTANT NOTE:
+
+  CachingOptimizingCompiler is not thread safe in terms of graph compilation.
+  To use this class without run-time errors with multiple threads, one must
+  make sure to pre-compile the graph cache in advance (or ensure all
+  utterances request the same computations).
+*/
+class XVectorExtractorParallelClass {
+ public:
+  XVectorExtractorParallelClass(
+      const NnetSimpleComputationOptions &opts,
+      const Nnet &nnet,
+      CachingOptimizingCompiler *compiler,
+      std::string utt,
+      const int chunk_size,
+      const int min_chunk_size,
+      const Matrix<BaseFloat> &feats,
+      BaseFloatVectorWriter *xvector_writer);
+
+  // Does the actual extraction; called by TaskSequencer on a worker thread.
+  void operator () ();
+
+  // Writes the averaged xvector; called sequentially by TaskSequencer.
+  ~XVectorExtractorParallelClass ();
+
+ private:
+  KALDI_DISALLOW_COPY_AND_ASSIGN(XVectorExtractorParallelClass);
+
+  static void RunNnetComputation(const MatrixBase<BaseFloat> &features,
+      const Nnet &nnet, CachingOptimizingCompiler *compiler,
+      Vector<BaseFloat> *xvector);
+
+  const NnetSimpleComputationOptions opts_;
+  const Nnet *nnet_;                     // not owned.
+  CachingOptimizingCompiler &compiler_;  // shared between threads; not owned.
+  std::string utt_;
+  int chunk_size_;
+  int min_chunk_size_;
+  Matrix<BaseFloat> feats_;              // own copy of the features.
+  BaseFloatVectorWriter *xvector_writer_;  // not owned.
+
+  BaseFloat tot_weight_;           // total number of frames processed.
+  Vector<BaseFloat> xvector_avg_;  // frame-weighted sum, then average.
+};
+
+}  // namespace nnet3
+}  // namespace kaldi
+
+#endif  // KALDI_NNET3_NNET_XVECTOR_THREADED_H_
diff --git a/src/nnet3bin/Makefile b/src/nnet3bin/Makefile
index 3f39c361c65..117b369d5fc 100644
--- a/src/nnet3bin/Makefile
+++ b/src/nnet3bin/Makefile
@@ -18,7 +18,8 @@ BINFILES = nnet3-init nnet3-info nnet3-get-egs nnet3-copy-egs nnet3-subset-egs \
            nnet3-discriminative-compute-objf nnet3-discriminative-train \
            nnet3-discriminative-subset-egs nnet3-get-egs-simple \
            nnet3-discriminative-compute-from-egs nnet3-latgen-faster-looped \
-           nnet3-egs-augment-image nnet3-xvector-get-egs nnet3-xvector-compute
+           nnet3-egs-augment-image nnet3-xvector-get-egs nnet3-xvector-compute \
+           nnet3-xvector-compute-parallel
 
 OBJFILES =
 
diff --git a/src/nnet3bin/nnet3-xvector-compute-parallel.cc b/src/nnet3bin/nnet3-xvector-compute-parallel.cc
new file mode 100644
index 00000000000..95c616390d3
--- /dev/null
+++ b/src/nnet3bin/nnet3-xvector-compute-parallel.cc
@@ -0,0 +1,189 @@
+// nnet3bin/nnet3-xvector-compute-parallel.cc
+
+// Copyright 2017  Johns Hopkins University (author: Daniel Povey)
+//           2017  Johns Hopkins University (author: Daniel Garcia-Romero)
+//           2017  David Snyder
+//           2018  Behavox Limited (author: Arseniy Gorin)
+
+// See ../../COPYING for clarification regarding multiple authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
+// WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
+// MERCHANTABLITY OR NON-INFRINGEMENT.
+// See the Apache 2 License for the specific language governing permissions and
+// limitations under the License.
+
+
+#include "base/kaldi-common.h"
+#include "util/common-utils.h"
+#include "nnet3/nnet-am-decodable-simple.h"
+#include "base/timer.h"
+#include "nnet3/nnet-utils.h"
+#include "nnet3/nnet-xvector-threaded.h"
+#include "util/kaldi-thread.h"
+
+
+int main(int argc, char *argv[]) {
+  try {
+    using namespace kaldi;
+    using namespace kaldi::nnet3;
+    typedef kaldi::int32 int32;
+    typedef kaldi::int64 int64;
+
+    const char *usage =
+        "Propagate features through an xvector neural network model and write\n"
+        "the output vectors.  \"Xvector\" is our term for a vector or\n"
+        "embedding which is the output of a particular type of neural network\n"
+        "architecture found in speaker recognition.  This architecture\n"
+        "consists of several layers that operate on frames, a statistics\n"
+        "pooling layer that aggregates over the frame-level representations\n"
+        "and possibly additional layers that operate on segment-level\n"
+        "representations.  The xvectors are generally extracted from an\n"
+        "output layer after the statistics pooling layer.  By default, one\n"
+        "xvector is extracted directly from the set of features for each\n"
+        "utterance.  Optionally, xvectors are extracted from chunks of input\n"
+        "features and averaged, to produce a single vector.\n"
+        "\n"
+        "This version supports multi-threading.  WARNING: it only works\n"
+        "properly if all utterances are processed with the same chunk\n"
+        "parameters (re-compiling the computation cache from multiple\n"
+        "threads is not thread-safe).\n"
+        "\n"
+        "Usage: nnet3-xvector-compute-parallel [options] [<cache-rxfilename>] "
+        "<raw-nnet-in> <features-rspecifier> <vector-wspecifier>\n"
+        "e.g.: nnet3-xvector-compute-parallel final.raw scp:feats.scp "
+        "ark:nnet_prediction.ark\n"
+        "See also: nnet3-xvector-compute, nnet3-compute\n";
+
+    ParseOptions po(usage);
+    Timer timer;
+
+    TaskSequencerConfig sequencer_config;  // has --num-threads option
+    NnetSimpleComputationOptions opts;
+
+    opts.acoustic_scale = 1.0;  // by default do no scaling in this recipe.
+
+    int32 chunk_size = -1,
+        min_chunk_size = 100,
+        chunk_sampling_rate = 1;
+
+    opts.Register(&po);
+    sequencer_config.Register(&po);
+
+    CachingOptimizingCompilerOptions compiler_config;
+    compiler_config.Register(&po);
+    po.Register("chunk-size", &chunk_size,
+        "If set, extracts xvectors from specified chunk-size, and averages.  "
+        "If not set, extracts an xvector from all available features.");
+    po.Register("min-chunk-size", &min_chunk_size,
+        "Minimum chunk-size allowed when extracting xvectors.");
+    po.Register("chunk-sampling-rate", &chunk_sampling_rate,
+        "Chunk size will be rounded to this number of frames "
+        "(to take advantage of the compiler cache).");
+
+    po.Read(argc, argv);
+
+    if (po.NumArgs() < 3 || po.NumArgs() > 4) {
+      po.PrintUsage();
+      exit(1);
+    }
+
+    std::string cache_rxfilename = "", nnet_rxfilename,
+        feature_rspecifier, vector_wspecifier;
+
+    if (po.NumArgs() == 3) {
+      nnet_rxfilename = po.GetArg(1);
+      feature_rspecifier = po.GetArg(2);
+      vector_wspecifier = po.GetArg(3);
+    } else {
+      // 4-argument form: a pre-compiled computation cache comes first.
+      cache_rxfilename = po.GetArg(1);
+      nnet_rxfilename = po.GetArg(2);
+      feature_rspecifier = po.GetArg(3);
+      vector_wspecifier = po.GetArg(4);
+    }
+
+    Nnet nnet;
+    ReadKaldiObject(nnet_rxfilename, &nnet);
+    SetBatchnormTestMode(true, &nnet);
+    SetDropoutTestMode(true, &nnet);
+    CollapseModel(CollapseModelConfig(), &nnet);
+
+    BaseFloatVectorWriter vector_writer(vector_wspecifier);
+
+    int32 num_success = 0, num_fail = 0;
+    int64 frame_count = 0;
+
+    TaskSequencer<XVectorExtractorParallelClass> sequencer(sequencer_config);
+    SequentialBaseFloatMatrixReader feature_reader(feature_rspecifier);
+
+    // With chunk sizes rounded to chunk_sampling_rate, the number of distinct
+    // computation graphs is bounded; size the cache to hold all of them.
+    if (chunk_size > 0 && chunk_sampling_rate > 0) {
+      compiler_config.cache_capacity =
+          (chunk_size - min_chunk_size) / chunk_sampling_rate + 1;
+    }
+    CachingOptimizingCompiler compiler(nnet, opts.optimize_config,
+                                       compiler_config);
+
+    if (cache_rxfilename != "") {
+      KALDI_LOG << "Reading cache from " << cache_rxfilename;
+      bool cache_binary_in;
+      Input ki(cache_rxfilename, &cache_binary_in);
+      compiler.ReadCache(ki.Stream(), cache_binary_in);
+    }
+
+    for (; !feature_reader.Done(); feature_reader.Next()) {
+      std::string utt = feature_reader.Key();
+      Matrix<BaseFloat> &features(feature_reader.Value());
+
+      int32 num_rows = features.NumRows(),
+          feat_dim = features.NumCols();
+
+      // Check zero-length first: an empty matrix must never be padded or
+      // handed to a worker thread.
+      if (num_rows == 0) {
+        KALDI_WARN << "Zero-length utterance: " << utt;
+        num_fail++;
+        continue;
+      }
+
+      if (num_rows < min_chunk_size) {
+        KALDI_WARN << "Minimum chunk size of " << min_chunk_size
+                   << " is greater than the number of rows "
+                   << "in utterance: " << utt;
+        num_fail++;
+        continue;
+      }
+
+      // Pad features so the row count is a multiple of chunk_sampling_rate;
+      // padding rows are wrapped around from the start of the utterance.
+      if (chunk_sampling_rate > 1) {
+        int32 rem = num_rows % chunk_sampling_rate;
+        // Only pad when not already aligned; the previous code computed
+        // feat_pad = rate - 0 == rate and padded a whole extra block.
+        if (rem != 0) {
+          int32 feat_pad = chunk_sampling_rate - rem;
+          features.Resize(num_rows + feat_pad, feat_dim, kCopyData);
+          for (int32 i = 0; i < feat_pad; i++)
+            for (int32 j = 0; j < feat_dim; j++)
+              features(num_rows + i, j) = features(i, j);
+        }
+      }
+
+      // TaskSequencer takes ownership of the task and deletes it after the
+      // destructor (which writes the xvector) has run.
+      sequencer.Run(new XVectorExtractorParallelClass(
+          opts, nnet, &compiler, utt, chunk_size, min_chunk_size,
+          features, &vector_writer));
+      // Count real (unpadded) frames so the reported RTF is not skewed.
+      frame_count += num_rows;
+      num_success++;
+    }
+    sequencer.Wait();
+    double elapsed = timer.Elapsed();
+    KALDI_LOG << "Time taken " << elapsed
+              << "s: real-time factor assuming 100 frames/sec is "
+              << (elapsed * 100.0 / frame_count);
+    KALDI_LOG << "Done " << num_success << " utterances, failed for "
+              << num_fail;
+
+    return (num_success != 0 ? 0 : 1);
+  } catch(const std::exception &e) {
+    std::cerr << e.what();
+    return -1;
+  }
+}