diff --git a/ngraph_bridge/ngraph_encapsulate_impl.cc b/ngraph_bridge/ngraph_encapsulate_impl.cc
index 4eb56dbaf..0931be42f 100644
--- a/ngraph_bridge/ngraph_encapsulate_impl.cc
+++ b/ngraph_bridge/ngraph_encapsulate_impl.cc
@@ -395,7 +395,6 @@ Status NGraphEncapsulateImpl::AllocateNGOutputTensors(
         current_dst_ptr, last_dst_ptr, last_ng_tensor, true, ng_exec,
         op_backend, ng_element_type, ng_shape,
         m_executable_can_create_tensor ? out_group_from_pipeline[i] : nullptr);
-    current_ng_tensor->set_stale(true);
     output_caches[i] = std::make_pair(current_dst_ptr, current_ng_tensor);
     ng_outputs.push_back(current_ng_tensor);
 
@@ -416,18 +415,21 @@ std::shared_ptr<ng::runtime::Tensor> NGraphEncapsulateImpl::GetCurrentNgTensor(
   // NOTE: we assume that TF's pointers WILL change if it actually changes
   // values. ie, it will not reuse the same space if its rewritten it
   bool tf_tensor_has_changed = current_tf_ptr != last_tf_ptr;
+  NGRAPH_VLOG(5) << "tf_tensor_has_changed: " << tf_tensor_has_changed;
   bool no_ng_tensor_found = last_ng_tensor == nullptr;
-  bool is_cpu = m_op_backend_name == "CPU";
+  // m_op_backend_name might be BE:0, so check whether it starts with BE
+  bool is_cpu_or_nnpi = (m_op_backend_name.find("CPU") == 0) ||
+                        (m_op_backend_name.find("NNPI") == 0);
   // We need to check last_ng_tensor != nullptr, since there are cases where
   // at the first call to the ng_exec, both current_dst_ptr (when the
   // output is a 0-sized tensor) and last_dst_ptr (uninitialized at the
   // first call) are nullptr
   // A new tensor needs to be created for sure if no_ng_tensor_found
-  // Additionally for CPU, it needs to be created if tf_tensor_has_changed,
+  // Additionally for CPU/NNPI, it needs to be created if tf_tensor_has_changed,
   // for others, we do not create
   bool need_new_tensor_creation;
-  if (is_cpu) {
+  if (is_cpu_or_nnpi) {
     need_new_tensor_creation = no_ng_tensor_found || tf_tensor_has_changed;
   } else {
     need_new_tensor_creation = no_ng_tensor_found;
   }
@@ -449,7 +451,9 @@ std::shared_ptr<ng::runtime::Tensor> NGraphEncapsulateImpl::GetCurrentNgTensor(
     current_ng_tensor = tensor_from_pipeline;
   } else {
     if (need_new_tensor_creation) {
-      if (is_cpu) {
+      if (is_cpu_or_nnpi) {
+        NGRAPH_VLOG(5) << "Backend creating tensor with pointer: "
+                       << current_tf_ptr;
         current_ng_tensor =
             op_backend->create_tensor(ng_element_type, ng_shape, current_tf_ptr);
       } else {
@@ -576,6 +580,36 @@ void NGraphEncapsulateImpl::DumpNgFunction(
   StringToFile(file_name, m_serialized_ng_function_map[ng_exec]);
 }
 
+Status NGraphEncapsulateImpl::GetPersistentTFOutputTensor(
+    std::shared_ptr<ngraph::runtime::Executable> exec,
+    std::vector<PersistentTensor>& tf_output_tensors) {
+  auto itr = m_out_persistents.find(exec);
+  if (itr == m_out_persistents.end()) {
+    return errors::Internal(
+        "Expected persistent tensor to be present in cache");
+  } else {
+    tf_output_tensors = itr->second;
+  }
+  return Status::OK();
+}
+
+bool NGraphEncapsulateImpl::PersistentOutputsExist(
+    std::shared_ptr<ngraph::runtime::Executable> exec) {
+  return m_out_persistents.find(exec) != m_out_persistents.end();
+}
+
+Status NGraphEncapsulateImpl::RegisterPersistentOutputTensors(
+    std::shared_ptr<ngraph::runtime::Executable> exec,
+    std::vector<PersistentTensor> persistent_tensors) {
+  auto itr = m_out_persistents.find(exec);
+  if (itr != m_out_persistents.end()) {
+    return errors::Internal(
+        "An entry already exists in the cache for persistent tensors");
+  }
+  m_out_persistents.emplace(exec, persistent_tensors);
+  return Status::OK();
+}
+
 }  // namespace ngraph_bridge
 }  // namespace tensorflow
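For reference, here is a minimal standalone sketch of the caching pattern that the new GetPersistentTFOutputTensor / PersistentOutputsExist / RegisterPersistentOutputTensors helpers implement: one vector of output tensors cached per compiled executable. Exec, CachedTensor and OutputCache are illustrative placeholders, not the bridge's ngraph::runtime::Executable or TensorFlow's PersistentTensor types.

// Standalone sketch (placeholder types, not bridge code): cache one vector of
// output tensors per executable, keyed by the executable's shared_ptr.
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

struct Exec {};        // stands in for the compiled executable
struct CachedTensor {  // stands in for a persistent output tensor
  std::string name;
};

class OutputCache {
 public:
  // Mirrors PersistentOutputsExist: has this executable been registered?
  bool Exists(const std::shared_ptr<Exec>& exec) const {
    return cache_.count(exec) != 0;
  }

  // Mirrors RegisterPersistentOutputTensors: refuses to overwrite an entry.
  bool Register(const std::shared_ptr<Exec>& exec,
                std::vector<CachedTensor> tensors) {
    return cache_.emplace(exec, std::move(tensors)).second;
  }

  // Mirrors GetPersistentTFOutputTensor: fails if the entry is missing.
  bool Get(const std::shared_ptr<Exec>& exec,
           std::vector<CachedTensor>& out) const {
    auto itr = cache_.find(exec);
    if (itr == cache_.end()) return false;
    out = itr->second;
    return true;
  }

 private:
  std::unordered_map<std::shared_ptr<Exec>, std::vector<CachedTensor>> cache_;
};

int main() {
  OutputCache cache;
  auto exec = std::make_shared<Exec>();

  // First call for a freshly compiled executable: allocate once and register.
  if (!cache.Exists(exec)) {
    cache.Register(exec, {{"output_0"}, {"output_1"}});
  }

  // Subsequent calls: look up and reuse the cached outputs instead of
  // allocating new ones.
  std::vector<CachedTensor> outputs;
  if (cache.Get(exec, outputs)) {
    std::cout << "reusing " << outputs.size() << " cached outputs\n";
  }
  return 0;
}

std::hash and operator== for std::shared_ptr compare the stored pointer, so distinct executables never collide and a recompile for a new shape naturally gets its own cache entry.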
diff --git a/ngraph_bridge/ngraph_encapsulate_impl.h b/ngraph_bridge/ngraph_encapsulate_impl.h
index b14bb7d77..d5fbe0e09 100644
--- a/ngraph_bridge/ngraph_encapsulate_impl.h
+++ b/ngraph_bridge/ngraph_encapsulate_impl.h
@@ -174,6 +174,18 @@ class NGraphEncapsulateImpl {
     m_serialized_ng_function_map.clear();
   }
 
+  void ClearNgExecPersistentOutputCache() { m_out_persistents.clear(); }
+
+  Status GetPersistentTFOutputTensor(
+      std::shared_ptr<ngraph::runtime::Executable>,
+      std::vector<PersistentTensor>&);
+
+  bool PersistentOutputsExist(std::shared_ptr<ngraph::runtime::Executable>);
+
+  Status RegisterPersistentOutputTensors(
+      std::shared_ptr<ngraph::runtime::Executable>,
+      std::vector<PersistentTensor>);
+
   NGraphFreshnessTracker* GetNgraphFreshnessTracker() {
     return m_freshness_tracker;
   }
@@ -249,6 +261,13 @@ class NGraphEncapsulateImpl {
       m_executable_pipelined_tensors_map;
 
   int m_depth{2};  // TODO make this settable
+
+  // Each executable (compiled for a new shape) corresponds to a vector of
+  // output tensors
+  // TODO: Should the vector store PersistentTensor or PersistentTensor* ?
+  std::unordered_map<std::shared_ptr<ngraph::runtime::Executable>,
+                     std::vector<PersistentTensor>>
+      m_out_persistents;
 };
 
 }  // namespace ngraph_bridge
diff --git a/ngraph_bridge/ngraph_encapsulate_op.cc b/ngraph_bridge/ngraph_encapsulate_op.cc
index e6acce075..2759319a7 100644
--- a/ngraph_bridge/ngraph_encapsulate_op.cc
+++ b/ngraph_bridge/ngraph_encapsulate_op.cc
@@ -196,13 +196,22 @@ NGraphEncapsulateOp::NGraphEncapsulateOp(OpKernelConstruction* ctx)
   BackendManager::SetConfig(ng_encap_impl.GetOpBackend(),
                             additional_attribute_map);
 
-  ng_encap_impl.SetExecCanCreateTensor(
+  // For NNPI, use the backend to create tensors even though the executable
+  // can create them.
+  // Keep the executable_can_create_tensors() check before the
+  // backend_name != "NNPI" check.
+  bool executable_create_tensor =
       BackendManager::GetBackend(ng_encap_impl.GetOpBackend())
-          ->executable_can_create_tensors());
+          ->executable_can_create_tensors() &&
+      (backend_name != "NNPI");
+  ng_encap_impl.SetExecCanCreateTensor(executable_create_tensor);
   NGRAPH_VLOG(5) << "Executable can "
                  << (ng_encap_impl.GetExecCanCreateTensor() ? "" : "not")
                  << " create tensors";
 
+  const char* not_persistent_flag = std::getenv("NGRAPH_TF_DISABLE_PERSISTENT");
+  m_use_persistent = (not_persistent_flag == nullptr);
+
   event.Stop();
   ngraph::Event::write_trace(event);
 }
@@ -262,6 +271,7 @@ NGraphEncapsulateOp::~NGraphEncapsulateOp() {
   ng_encap_impl.ClearNgExecMap();
   ng_encap_impl.ClearNgExecPipelinedTensorMap();
   ng_encap_impl.ClearNgExecSerializedFunctionCache();
+  ng_encap_impl.ClearNgExecPersistentOutputCache();
 
   // Release the backend
   NGRAPH_VLOG(2) << "~NGraphEncapsulateOp():: ReleaseBackend";
@@ -345,9 +355,20 @@ void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) {
   // Allocate tensors for the output results.
   vector<shared_ptr<ng::runtime::Tensor>> ng_outputs;
   std::vector<Tensor*> tf_output_tensors;
+  std::vector<PersistentTensor> cached_persistent_output_tensors(
+      ng_exec->get_results().size());
+  bool present_in_cache = false;
   {
     NG_TRACE("NGTF_Output_Alloc", "");
 
+    if (m_use_persistent) {
+      present_in_cache = ng_encap_impl.PersistentOutputsExist(ng_exec);
+      if (present_in_cache) {
+        OP_REQUIRES_OK(ctx, ng_encap_impl.GetPersistentTFOutputTensor(
+                                ng_exec, cached_persistent_output_tensors));
+      }
+    }
+
     for (auto i = 0; i < ng_exec->get_results().size(); i++) {
       auto ng_element = ng_exec->get_results()[i];
       auto ng_shape = ng_element->get_shape();
@@ -360,12 +381,13 @@ void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) {
       }
       TensorShape tf_shape(dims);
       Tensor* output_tensor = nullptr;
-      OP_REQUIRES_OK(ctx, ctx->allocate_output(i, tf_shape, &output_tensor));
-      tf_output_tensors.push_back(output_tensor);
 
       // Make sure the nGraph-inferred element type agrees with what TensorFlow
       // expected.
       ng::element::Type expected_elem_type;
+      // TODO: these checks only need to be done once, when the exec is
+      // created/compiled, not on every call
+
       OP_REQUIRES_OK(
           ctx, TFDataTypeToNGraphElementType(ctx->expected_output_dtype(i),
                                              &expected_elem_type));
@@ -373,8 +395,26 @@
           ctx, ng_element_type == expected_elem_type,
           errors::Internal("Element type inferred by nGraph does not match "
                            "the element type expected by TensorFlow"));
-    }
 
+      if (m_use_persistent) {
+        if (present_in_cache) {
+          output_tensor = cached_persistent_output_tensors[i].AccessTensor(ctx);
+        } else {
+          // Create a persistent tensor
+          OP_REQUIRES_OK(
+              ctx, ctx->allocate_persistent(
+                       ctx->expected_output_dtype(i), tf_shape,
+                       &cached_persistent_output_tensors[i], &output_tensor));
+        }
+      } else {
+        OP_REQUIRES_OK(ctx, ctx->allocate_output(i, tf_shape, &output_tensor));
+      }
+      tf_output_tensors.push_back(output_tensor);
+    }
+
+    if (m_use_persistent && !present_in_cache) {
+      OP_REQUIRES_OK(ctx, ng_encap_impl.RegisterPersistentOutputTensors(
+                              ng_exec, cached_persistent_output_tensors));
+    }
     OP_REQUIRES_OK(ctx, ng_encap_impl.AllocateNGOutputTensors(
                             tf_output_tensors, ng_exec, out_group_from_pipeline,
                             op_backend, ng_outputs));
@@ -611,6 +651,16 @@ void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) {
                                        exp.what(), "\n"));
     }
   }
+
+  if (m_use_persistent) {
+    for (int out_idx = 0; out_idx < ng_exec->get_results().size(); out_idx++) {
+      OP_REQUIRES_OK(ctx, ng_encap_impl.GetPersistentTFOutputTensor(
+                              ng_exec, cached_persistent_output_tensors));
+      auto out_tensor =
+          cached_persistent_output_tensors[out_idx].AccessTensor(ctx);
+      ctx->set_output(out_idx, *out_tensor);
+    }
+  }
 }  // end compute
 
 int NGraphEncapsulateImpl::s_instance_count = 0;
diff --git a/ngraph_bridge/ngraph_encapsulate_op.h b/ngraph_bridge/ngraph_encapsulate_op.h
index 86379e7e8..42b75f03f 100644
--- a/ngraph_bridge/ngraph_encapsulate_op.h
+++ b/ngraph_bridge/ngraph_encapsulate_op.h
@@ -43,6 +43,7 @@ class NGraphEncapsulateOp : public OpKernel {
  private:
   NGraphEncapsulateImpl ng_encap_impl;
   std::mutex m_compute_lock;
+  bool m_use_persistent;
 };
 
 }  // namespace ngraph_bridge
diff --git a/test/tf_exec.cpp b/test/tf_exec.cpp
index 9cea111de..65b1d7ccd 100644
--- a/test/tf_exec.cpp
+++ b/test/tf_exec.cpp
@@ -95,7 +95,16 @@ Status CreateSession(const string& graph_filename, const string& backend_name,
   return load_graph_status;
 }
 
+// This test might fail when running with persistent output tensors.
+// Possibly because once outputs have been written to persistent tensors,
+// the next thread comes in before Compare runs and changes the values.
+// For example, if we add a 1-second sleep right after entering Compute(), this
+// test would pass (since Compare then has enough time to run before the next
+// thread comes in and modifies the persistent tensor values).
+// TODO: see how persistent tensors might fit in with this kind of
+// multithreading (symmetric parallelism)
 TEST(tf_exec, SingleGraphOn2Threads) {
+  SetEnvVariable("NGRAPH_TF_DISABLE_PERSISTENT", "1");
   string graph_name = "test_axpy.pbtxt";
   vector<string> backends{"CPU", "INTERPRETER"};
   for (auto be : backends) {
@@ -136,6 +145,7 @@
     thread0.join();
     thread1.join();
   }
+  UnsetEnvVariable("NGRAPH_TF_DISABLE_PERSISTENT");
 }
 
 TEST(tf_exec, hello_world) {
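For reference, here is a minimal standalone sketch of the output path that Compute() now follows: reuse the registered persistent tensors when persistence is enabled, otherwise allocate fresh outputs on every call, with the NGRAPH_TF_DISABLE_PERSISTENT environment variable (read once in the constructor and set by the test above) turning the persistent path off. GetOutputs and AllocatePerCallOutputs are illustrative placeholders rather than TensorFlow or bridge APIs, and the POSIX setenv/unsetenv calls stand in for the test's SetEnvVariable/UnsetEnvVariable helpers.

// Standalone sketch (placeholder logic, not bridge code): persistent reuse vs.
// per-call allocation, toggled by NGRAPH_TF_DISABLE_PERSISTENT.
#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>

// Stands in for "allocate a fresh TF output tensor for this call".
std::vector<std::string> AllocatePerCallOutputs(int n) {
  return std::vector<std::string>(n, "fresh");
}

// Mirrors the shape of the Compute() output path: reuse the registered
// persistent outputs when enabled, otherwise allocate new outputs every call.
std::vector<std::string> GetOutputs(bool use_persistent,
                                    std::vector<std::string>& cached,
                                    int num_outputs) {
  if (use_persistent) {
    if (cached.empty()) {
      // First call for this executable: allocate once and register.
      cached = std::vector<std::string>(num_outputs, "persistent");
    }
    return cached;  // later calls reuse the registered tensors
  }
  return AllocatePerCallOutputs(num_outputs);
}

int main() {
  // Mirrors the constructor: persistence is on unless the env var is set.
  bool use_persistent =
      (std::getenv("NGRAPH_TF_DISABLE_PERSISTENT") == nullptr);
  std::vector<std::string> cached;
  for (int call = 0; call < 2; call++) {
    std::cout << "call " << call << ": "
              << GetOutputs(use_persistent, cached, 2)[0] << "\n";
  }

  // With the flag set (as the test does), every call allocates fresh outputs.
  setenv("NGRAPH_TF_DISABLE_PERSISTENT", "1", 1);
  use_persistent = (std::getenv("NGRAPH_TF_DISABLE_PERSISTENT") == nullptr);
  std::cout << "with the flag set: "
            << GetOutputs(use_persistent, cached, 2)[0] << "\n";
  unsetenv("NGRAPH_TF_DISABLE_PERSISTENT");
  return 0;
}

Note that m_use_persistent is read once in the op constructor, which is why the test sets the variable before building the session and unsets it only after both threads have joined.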