diff --git a/bazel/BUILD b/bazel/BUILD index 7028b6a95..034ff0dec 100644 --- a/bazel/BUILD +++ b/bazel/BUILD @@ -48,6 +48,7 @@ cc_library( "ngraph_bridge/ngraph_tensor_manager.h", "ngraph_bridge/ngraph_timer.h", "ngraph_bridge/ngraph_utils.h", + "ngraph_bridge/ngraph_var.h", "ngraph_bridge/ngraph_version_utils.h", "ngraph_bridge/tf_deadness_analysis.h", "ngraph_bridge/tf_graphcycles.h", @@ -92,6 +93,7 @@ cc_library( "ngraph_bridge/ngraph_tensor_manager.cc", "ngraph_bridge/ngraph_tracked_variable.cc", "ngraph_bridge/ngraph_utils.cc", + "ngraph_bridge/ngraph_var.cc", "ngraph_bridge/tf_deadness_analysis.cc", "ngraph_bridge/tf_graphcycles.cc", "ngraph_bridge/ops/ngraph_ops.cc", diff --git a/ngraph_bridge/CMakeLists.txt b/ngraph_bridge/CMakeLists.txt index 18d218dad..eb104ae3b 100644 --- a/ngraph_bridge/CMakeLists.txt +++ b/ngraph_bridge/CMakeLists.txt @@ -57,6 +57,7 @@ set(SRC ngraph_rewrite_pass.cc ngraph_tensor_manager.cc ngraph_tracked_variable.cc + ngraph_var.cc ngraph_utils.cc tf_graphcycles.cc tf_deadness_analysis.cc @@ -86,7 +87,6 @@ if(NGRAPH_TF_ENABLE_VARIABLES_AND_OPTIMIZERS) list(APPEND SRC enable_variable_ops/ngraph_tracked_variable.cc) # new files - list(APPEND SRC enable_variable_ops/ngraph_var.cc) list(APPEND SRC enable_variable_ops/ngraph_assign_op.cc) list(APPEND SRC enable_variable_ops/ngraph_enter_in_catalog.cc) list(APPEND SRC enable_variable_ops/ngraph_remove_ngraphassigns.cc) diff --git a/ngraph_bridge/enable_variable_ops/ngraph_assign_op.cc b/ngraph_bridge/enable_variable_ops/ngraph_assign_op.cc index b9f041e8b..35099bbc7 100644 --- a/ngraph_bridge/enable_variable_ops/ngraph_assign_op.cc +++ b/ngraph_bridge/enable_variable_ops/ngraph_assign_op.cc @@ -25,11 +25,11 @@ #include "ngraph/event_tracing.hpp" #include "ngraph/runtime/backend.hpp" -#include "ngraph_bridge/enable_variable_ops/ngraph_var.h" #include "ngraph_bridge/ngraph_catalog.h" #include "ngraph_bridge/ngraph_freshness_tracker.h" #include "ngraph_bridge/ngraph_timer.h" #include "ngraph_bridge/ngraph_utils.h" +#include "ngraph_bridge/ngraph_var.h" using namespace std; namespace ng = ngraph; @@ -83,7 +83,7 @@ class NGraphAssignOp : public OpKernel { void Compute(OpKernelContext* context) override { std::ostringstream oss; - oss << "Execute: Assign_" << my_instance_id << ": " << name(); + oss << "NGAssign::Compute::" << name(); ngraph::Event event_compute(oss.str(), name(), ""); NGRAPH_VLOG(4) << "NGraphAssign:: Compute called for: " << def().name() diff --git a/ngraph_bridge/enable_variable_ops/ngraph_enter_in_catalog.cc b/ngraph_bridge/enable_variable_ops/ngraph_enter_in_catalog.cc index a456ef6e8..c96a4932e 100644 --- a/ngraph_bridge/enable_variable_ops/ngraph_enter_in_catalog.cc +++ b/ngraph_bridge/enable_variable_ops/ngraph_enter_in_catalog.cc @@ -160,15 +160,12 @@ Status EnterInCatalog(Graph* graph, int graph_id) { } } - // are there indexes that need copy - if (op_index_to_copy.size() > 0) { - try { - NGraphCatalog::AddToEncapOutputCopyIndexesMap(graph_id, node->name(), - op_index_to_copy); - } catch (const std::exception& exp) { - return errors::Internal( - "Caught exception while entering in catalog: ", exp.what(), "\n"); - } + try { + NGraphCatalog::AddToEncapOutputCopyIndexesMap(graph_id, node->name(), + op_index_to_copy); + } catch (const std::exception& exp) { + return errors::Internal("Caught exception while entering in catalog: ", + exp.what(), "\n"); } } // end of node is type NGraphEncapsulate diff --git a/ngraph_bridge/enable_variable_ops/ngraph_rewrite_pass.cc 
b/ngraph_bridge/enable_variable_ops/ngraph_rewrite_pass.cc index b764713ab..ea97ff417 100644 --- a/ngraph_bridge/enable_variable_ops/ngraph_rewrite_pass.cc +++ b/ngraph_bridge/enable_variable_ops/ngraph_rewrite_pass.cc @@ -30,6 +30,7 @@ #include "ngraph_bridge/ngraph_cluster_manager.h" #include "ngraph_bridge/ngraph_deassign_clusters.h" #include "ngraph_bridge/ngraph_encapsulate_clusters.h" +#include "ngraph_bridge/ngraph_enter_prefetch_in_catalog.h" #include "ngraph_bridge/ngraph_mark_for_clustering.h" #include "ngraph_bridge/ngraph_rewrite_for_tracking.h" #include "ngraph_bridge/ngraph_utils.h" @@ -255,6 +256,13 @@ class NGraphEncapsulationPass : public NGraphRewritePass { "Graph with NGraphAssigns Optimized/Removed"); } + // 8. Enter Prefetch in catalog then. + TF_RETURN_IF_ERROR(EnterPrefetchInCatalog(options.graph->get(), idx)); + if (DumpCatalogedGraphs()) { + DumpGraphs(options, idx, "prefetch-cataloged", + "Graph with Prefetched Inputs Entered in Catalog"); + } + return Status::OK(); } diff --git a/ngraph_bridge/enable_variable_ops/ngraph_tracked_variable.cc b/ngraph_bridge/enable_variable_ops/ngraph_tracked_variable.cc index c034d13c7..8b5b81f68 100644 --- a/ngraph_bridge/enable_variable_ops/ngraph_tracked_variable.cc +++ b/ngraph_bridge/enable_variable_ops/ngraph_tracked_variable.cc @@ -23,11 +23,11 @@ #include "ngraph/event_tracing.hpp" #include "ngraph/runtime/backend.hpp" -#include "ngraph_bridge/enable_variable_ops/ngraph_var.h" #include "ngraph_bridge/ngraph_backend_manager.h" #include "ngraph_bridge/ngraph_catalog.h" #include "ngraph_bridge/ngraph_freshness_tracker.h" #include "ngraph_bridge/ngraph_utils.h" +#include "ngraph_bridge/ngraph_var.h" using namespace std; namespace ng = ngraph; @@ -119,7 +119,7 @@ void NGraphVariableOp::Compute(OpKernelContext* ctx) { << " ,backend_name " << ng_backend_name_; std::ostringstream oss; - oss << "NGraphVariable: " << my_instance_id << ": " << name(); + oss << "NGVariable::Compute::" << name(); ngraph::Event event_compute(oss.str(), name(), ""); bool log_copies = false; @@ -250,6 +250,7 @@ void NGraphVariableOp::Compute(OpKernelContext* ctx) { ctx->record_persistent_memory_allocation(var->tensor()->AllocatedBytes()); } var->Unref(); + event_compute.Stop(); ngraph::Event::write_trace(event_compute); } diff --git a/ngraph_bridge/enable_variable_ops/ngraph_variable_modifiers.cc b/ngraph_bridge/enable_variable_ops/ngraph_variable_modifiers.cc index 5fc190bea..376a596a9 100644 --- a/ngraph_bridge/enable_variable_ops/ngraph_variable_modifiers.cc +++ b/ngraph_bridge/enable_variable_ops/ngraph_variable_modifiers.cc @@ -26,12 +26,12 @@ #include "ngraph/runtime/backend.hpp" -#include "ngraph_bridge/enable_variable_ops/ngraph_var.h" #include "ngraph_bridge/ngraph_backend_manager.h" #include "ngraph_bridge/ngraph_catalog.h" #include "ngraph_bridge/ngraph_freshness_tracker.h" #include "ngraph_bridge/ngraph_timer.h" #include "ngraph_bridge/ngraph_utils.h" +#include "ngraph_bridge/ngraph_var.h" using namespace std; namespace ng = ngraph; diff --git a/ngraph_bridge/enable_variable_ops/ngraph_variable_update_ng_tensor_op.cc b/ngraph_bridge/enable_variable_ops/ngraph_variable_update_ng_tensor_op.cc index fdb432f79..8755f6f76 100644 --- a/ngraph_bridge/enable_variable_ops/ngraph_variable_update_ng_tensor_op.cc +++ b/ngraph_bridge/enable_variable_ops/ngraph_variable_update_ng_tensor_op.cc @@ -24,10 +24,10 @@ #include "ngraph/event_tracing.hpp" -#include "ngraph_bridge/enable_variable_ops/ngraph_var.h" #include 
"ngraph_bridge/enable_variable_ops/ngraph_variable_update_ng_tensor_op.h" #include "ngraph_bridge/ngraph_timer.h" #include "ngraph_bridge/ngraph_utils.h" +#include "ngraph_bridge/ngraph_var.h" using namespace std; namespace ng = ngraph; @@ -67,6 +67,7 @@ NGraphVariableUpdateNGTensorOp::~NGraphVariableUpdateNGTensorOp() { void NGraphVariableUpdateNGTensorOp::Compute(OpKernelContext* context) { std::ostringstream oss; // Start event tracing + oss << "NGVariableUpdateNGTensor::Compute::" << name(); ngraph::Event event_compute(oss.str(), name(), ""); bool log_copies = false; OP_REQUIRES_OK(context, diff --git a/ngraph_bridge/ngraph_encapsulate_impl.cc b/ngraph_bridge/ngraph_encapsulate_impl.cc index 7823f0a7d..f2ddf1ecd 100644 --- a/ngraph_bridge/ngraph_encapsulate_impl.cc +++ b/ngraph_bridge/ngraph_encapsulate_impl.cc @@ -45,8 +45,8 @@ #include "ngraph_bridge/ngraph_timer.h" #include "ngraph_bridge/ngraph_utils.h" +#include "ngraph_bridge/ngraph_var.h" #if defined(NGRAPH_TF_ENABLE_VARIABLES_AND_OPTIMIZERS) -#include "ngraph_bridge/enable_variable_ops/ngraph_var.h" #include "ngraph_bridge/ngraph_catalog.h" #endif diff --git a/ngraph_bridge/ngraph_encapsulate_op.cc b/ngraph_bridge/ngraph_encapsulate_op.cc index 9a48d8c92..4605757ae 100644 --- a/ngraph_bridge/ngraph_encapsulate_op.cc +++ b/ngraph_bridge/ngraph_encapsulate_op.cc @@ -49,9 +49,9 @@ #include "ngraph_bridge/ngraph_prefetch_shared_data.h" #include "ngraph_bridge/ngraph_timer.h" #include "ngraph_bridge/ngraph_utils.h" +#include "ngraph_bridge/ngraph_var.h" #if defined(NGRAPH_TF_ENABLE_VARIABLES_AND_OPTIMIZERS) -#include "ngraph_bridge/enable_variable_ops/ngraph_var.h" #include "ngraph_bridge/ngraph_catalog.h" #endif @@ -88,13 +88,8 @@ NGraphEncapsulateOp::NGraphEncapsulateOp(OpKernelConstruction* ctx) ctx, backend != nullptr, errors::Internal("Cannot get the backend object for BE: ", be_name)); -// If we have the VARIABLE capture on then we can't use the -// parallel executor until that support is added. -#if !defined(NGRAPH_TF_ENABLE_VARIABLES_AND_OPTIMIZERS) + // If backend executable can create tensors we use parallel executor m_use_parallel_executor = backend->executable_can_create_tensors(); -#else - m_use_parallel_executor = false; -#endif // Override the switch for debugging/testing if (std::getenv("NGRAPH_TF_USE_LEGACY_EXECUTOR") != nullptr) { @@ -402,7 +397,7 @@ NGraphEncapsulateOp::~NGraphEncapsulateOp() { // OpKernel::Compute //--------------------------------------------------------------------------- void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) { - ngraph::Event event_compute("Compute", "", ""); + ngraph::Event event_compute("NGEncap::Compute::" + name(), name(), ""); if (m_use_parallel_executor) { NGRAPH_VLOG(1) << "NGraphEncapsulateOp::Compute: Using Parallel Executor"; @@ -459,6 +454,7 @@ void NGraphEncapsulateOp::ComputeUsingParallelExecutor(OpKernelContext* ctx) { m_parallel_executor->GetTensorPipelineDepth())); // Get Tensor Manager and some error checking + ngraph::Event event_prepare_ng_tensors("Prepare NG In/Out Tensors", "", ""); auto tensor_manager = m_parallel_executor->GetTensorManager(); int num_of_inputs = tensor_manager->GetNumberOfInputs(); int num_of_outputs = tensor_manager->GetNumberOfOutputs(); @@ -499,14 +495,18 @@ void NGraphEncapsulateOp::ComputeUsingParallelExecutor(OpKernelContext* ctx) { vector> ng_inputs(num_of_inputs); vector> ng_outputs(num_of_outputs); - // All inputs and outputs are pipelined. 
- // Of all these pipelined inputs some are prefetched - // TODO: Fit in variables - ng_inputs = get<1>(pipelined_io_tensors); - ng_outputs = get<2>(pipelined_io_tensors); + // Prepare NG Input Output Tensors + // Assemble Variable tensors and pipelined tensors to ng_input and ng_outputs + OP_REQUIRES_OK(ctx, GetIOTensorsReadyForExecution( + ctx, tensor_manager, get<1>(pipelined_io_tensors), + get<2>(pipelined_io_tensors), ng_inputs, ng_outputs)); + event_prepare_ng_tensors.Stop(); + ngraph::Event::write_trace(event_prepare_ng_tensors); // And execute - ngraph::Event event_execute_graph("Execute Graph", "", ""); + ngraph::Event event_execute_graph( + "Execute Graph Pipeline Indx" + to_string(current_iter_pipeline_depth), + "", ""); BackendManager::LockBackend(m_parallel_executor->GetOpBackendName()); NGRAPH_VLOG(4) << "NGraphEncapsulateOp::Compute call starting for cluster " @@ -540,12 +540,14 @@ void NGraphEncapsulateOp::ComputeUsingParallelExecutor(OpKernelContext* ctx) { ngraph::Event::write_trace(event_execute_graph); // Now prepare the output - ngraph::Event event_copy_output_tensor("Copy Output Tensor", "", ""); + // Allocate TF Tensors + NGRAPH_VLOG(4) << "NGraphEncapsulateOp::Compute Allocating TF Output Tensors " + << m_parallel_executor->GetNgraphClusterId(); - std::vector> output_copy_events; + ngraph::Event event_prepare_tf_output_tensors("Prepare TF Output Tensor", "", + ""); + vector tf_output_tensors; for (auto i = 0; i < ng_exec->get_results().size(); i++) { - std::unique_ptr event_copy_prep( - new ngraph::Event("Copy Prep", "", "")); auto ng_element = ng_exec->get_results()[i]; auto ng_shape = ng_element->get_shape(); auto ng_element_type = ng_element->get_element_type(); @@ -558,7 +560,7 @@ void NGraphEncapsulateOp::ComputeUsingParallelExecutor(OpKernelContext* ctx) { TensorShape tf_shape(dims); Tensor* tf_output_tensor = nullptr; OP_REQUIRES_OK(ctx, ctx->allocate_output(i, tf_shape, &tf_output_tensor)); - + tf_output_tensors.push_back(tf_output_tensor); // Make sure the nGraph-inferred element type agrees with what TensorFlow // expected. 
ng::element::Type expected_elem_type; @@ -569,28 +571,45 @@ void NGraphEncapsulateOp::ComputeUsingParallelExecutor(OpKernelContext* ctx) { ctx, ng_element_type == expected_elem_type, errors::Internal("Element type inferred by nGraph does not match " "the element type expected by TensorFlow")); - event_copy_prep->Stop(); - output_copy_events.push_back(std::move(event_copy_prep)); + } - // Now copy the nGraph Tensor to Host Tensor - std::unique_ptr event_copy_d2h( - new ngraph::Event("Device to Host Copy", "", "")); - void* dst_ptr = DMAHelper::base(tf_output_tensor); + // Copy Tensors that are required + NGRAPH_VLOG(4) << "NGraphEncapsulateOp::Compute Read NG Output Tensors " + << m_parallel_executor->GetNgraphClusterId(); - ng_outputs[i]->read( - dst_ptr, ng_outputs[i]->get_element_count() * ng_element_type.size()); + std::vector> output_copy_events; + + auto output_indexes_to_be_copied = + tensor_manager->GetOutputIndexesThatNeedCopy(); + for (auto output_index : output_indexes_to_be_copied) { + // Copy the nGraph Tensor to Host Tensor + std::unique_ptr event_copy_d2h(new ngraph::Event( + "D2H_Output_" + std::to_string(output_index), "", "")); + void* dst_ptr = (void*)DMAHelper::base(tf_output_tensors[output_index]); + ng_outputs[output_index]->read( + dst_ptr, ng_outputs[output_index]->get_element_count() * + ng_outputs[output_index]->get_element_type().size()); event_copy_d2h->Stop(); output_copy_events.push_back(std::move(event_copy_d2h)); } - for (auto& next : output_copy_events) { ngraph::Event::write_trace(*next.get()); } + event_prepare_tf_output_tensors.Stop(); + ngraph::Event::write_trace(event_prepare_tf_output_tensors); - event_copy_output_tensor.Stop(); - ngraph::Event::write_trace(event_copy_output_tensor); + // Synch Var Output Tensors as required + NGRAPH_VLOG(4) + << "NGraphEncapsulateOp::Compute Sync NG Output Variable Tensors " + << m_parallel_executor->GetNgraphClusterId(); + ngraph::Event event_update_ngvar_tensors("Update NGVar Tensors", "", ""); + OP_REQUIRES_OK(ctx, SyncOutputVarTensors(ctx, tensor_manager)); + event_update_ngvar_tensors.Stop(); + ngraph::Event::write_trace(event_update_ngvar_tensors); // Now return them to the cache + NGRAPH_VLOG(4) << "NGraphEncapsulateOp::Returning Tensors " + << m_parallel_executor->GetNgraphClusterId(); ngraph::Event event_return_tensor("Return Tensor", "", ""); pipelined_tensor_store->return_tensors(current_iter_pipeline_depth); diff --git a/ngraph_bridge/ngraph_encapsulate_op_utils.cc b/ngraph_bridge/ngraph_encapsulate_op_utils.cc index 51eca36de..d12494e45 100644 --- a/ngraph_bridge/ngraph_encapsulate_op_utils.cc +++ b/ngraph_bridge/ngraph_encapsulate_op_utils.cc @@ -18,17 +18,22 @@ #include "ngraph_bridge/ngraph_prefetch_shared_data.h" #include "ngraph_bridge/ngraph_utils.h" +#include "ngraph_bridge/ngraph_var.h" + using namespace std; namespace tensorflow { namespace ngraph_bridge { +//--------------------------------------------------------------------------- +// GetPipelinedIOTensorsReadyForExecution +//--------------------------------------------------------------------------- Status GetPipelinedIOTensorsReadyForExecution( - OpKernelContext* ctx, std::vector& tf_input_tensors, - shared_ptr& pipelined_tensor_store, - shared_ptr& tensor_manager, - std::tuple& + OpKernelContext* ctx, const vector& tf_input_tensors, + const shared_ptr& pipelined_tensor_store, + const shared_ptr& tensor_manager, + tuple& pipelined_io_tensors) { auto io_tensors = pipelined_tensor_store->get_tensors(); @@ -84,7 +89,7 @@ Status 
GetPipelinedIOTensorsReadyForExecution( tensor_manager->GetInputIndexesForPrefetchSharedObject()); // Get the set of IO tensors for the next iteration - std::tuple + tuple io_tensors_next_iter; io_tensors_next_iter = pipelined_tensor_store->get_tensors(); @@ -154,18 +159,21 @@ Status GetPipelinedIOTensorsReadyForExecution( // Allocate the input/ ngraph::Event event_copy_input_tensor("Copy Pipelined Input Tensors", "", ""); - + std::vector> input_write_events; if (!skip_tf2ng_copy) { // All pipelined inputs are copied for (auto i = 0; i < pipelined_input_indexes.size(); i++) { int tf_index = pipelined_input_indexes[i]; - ng::element::Type ng_element_type; TF_RETURN_IF_ERROR(TFDataTypeToNGraphElementType( tf_input_tensors[tf_index].dtype(), &ng_element_type)); void* current_src_ptr = (void*)DMAHelper::base(&tf_input_tensors[tf_index]); + + std::unique_ptr event_copy_h2d( + new ngraph::Event("H2D_Input_" + std::to_string(tf_index), "", "")); + try { ng_pipelined_inputs[i]->write( current_src_ptr, ng_pipelined_inputs[i]->get_element_count() * @@ -176,6 +184,8 @@ Status GetPipelinedIOTensorsReadyForExecution( } catch (...) { return errors::Internal("Error copying TF tensor to device tensor"); } + event_copy_h2d->Stop(); + input_write_events.push_back(std::move(event_copy_h2d)); } } else { // All pipelined inputs that are not prefetched are copied @@ -199,19 +209,27 @@ Status GetPipelinedIOTensorsReadyForExecution( tf_input_tensors[tf_index].dtype(), &ng_element_type)); void* current_src_ptr = (void*)DMAHelper::base(&tf_input_tensors[tf_index]); + unique_ptr event_copy_h2d( + new ngraph::Event("H2D_Input_" + to_string(tf_index), "", "")); try { ng_pipelined_inputs[ng_index]->write( current_src_ptr, ng_pipelined_inputs[ng_index]->get_element_count() * ng_element_type.size()); - } catch (const std::exception& exp) { + } catch (const exception& exp) { return errors::Internal("Error copying TF tensor to device tensor: ", exp.what()); } catch (...) 
{ return errors::Internal("Error copying TF tensor to device tensor"); } + event_copy_h2d->Stop(); + input_write_events.push_back(move(event_copy_h2d)); } } + + for (auto& next : input_write_events) { + ngraph::Event::write_trace(*next.get()); + } event_copy_input_tensor.Stop(); ngraph::Event::write_trace(event_copy_input_tensor); @@ -221,5 +239,101 @@ Status GetPipelinedIOTensorsReadyForExecution( return Status::OK(); } +//--------------------------------------------------------------------------- +// GetTensorFromContext +//--------------------------------------------------------------------------- +Status GetTensorFromContext(const OpKernelContext* ctx, + const string& shared_name, + shared_ptr& ng_tensor) { + // Get shared name from tensor manager + NGraphVar* var; + TF_RETURN_IF_ERROR(ctx->resource_manager()->Lookup( + ctx->resource_manager()->default_container(), shared_name, &var)); + ng_tensor = var->ng_tensor(); + var->Unref(); + return Status::OK(); +} + +//--------------------------------------------------------------------------- +// GetIOTensorsReadyForExecution +//--------------------------------------------------------------------------- +Status GetIOTensorsReadyForExecution( + OpKernelContext* ctx, const shared_ptr& tensor_manager, + const PipelinedTensorVector& pipelined_in_tensors, + const PipelinedTensorVector& pipelined_out_tensors, + vector>& ng_inputs, + vector>& ng_outputs) { + // Get Variables that are inputs + auto var_input_indexes = tensor_manager->GetInputIndexesFedByVariables(); + for (int input_index : var_input_indexes) { + string shared_name; + TF_RETURN_IF_ERROR( + tensor_manager->GetInputVariableSharedName(input_index, &shared_name)); + TF_RETURN_IF_ERROR( + GetTensorFromContext(ctx, shared_name, ng_inputs[input_index])); + } + + // Get Variables that are outputs + auto var_output_indexes = + tensor_manager->GetOutputIndexesAssigningVariables(); + for (int output_index : var_output_indexes) { + string shared_name; + TF_RETURN_IF_ERROR(tensor_manager->GetOutputVariableSharedName( + output_index, &shared_name)); + TF_RETURN_IF_ERROR( + GetTensorFromContext(ctx, shared_name, ng_outputs[output_index])); + } + + // Fit Pipelined Input Tensors + auto pipelined_input_indexes = tensor_manager->GetPipelinedInputIndexes(); + for (int i = 0; i < pipelined_input_indexes.size(); i++) { + int input_index = pipelined_input_indexes[i]; + ng_inputs[input_index] = pipelined_in_tensors[i]; + } + + // Fit Pipelined Output Tensors + auto pipelined_output_indexes = tensor_manager->GetPipelinedOutputIndexes(); + for (int i = 0; i < pipelined_output_indexes.size(); i++) { + int output_index = pipelined_output_indexes[i]; + ng_outputs[output_index] = pipelined_out_tensors[i]; + } + + return Status::OK(); +} + +//--------------------------------------------------------------------------- +// SyncOutputVarTensors +//--------------------------------------------------------------------------- +Status SyncOutputVarTensors( + const OpKernelContext* ctx, + const shared_ptr& tensor_manager) { + // Get Variables that are outputs + auto var_output_indexes = + tensor_manager->GetOutputIndexesAssigningVariables(); + NGRAPH_VLOG(4) << "output indexes size " << var_output_indexes.size(); + + for (int output_index : var_output_indexes) { + bool copy_to_tf; + TF_RETURN_IF_ERROR( + tensor_manager->GetOutputVariableCopyToTF(output_index, ©_to_tf)); + + if (copy_to_tf) { + NGRAPH_VLOG(4) << "Sync NG Output Variable Tensors " << output_index; + // Get shared name from tensor manager + string 
shared_name; + TF_RETURN_IF_ERROR(tensor_manager->GetOutputVariableSharedName( + output_index, &shared_name)); + NGraphVar* var; + TF_RETURN_IF_ERROR(ctx->resource_manager()->Lookup( + ctx->resource_manager()->default_container(), shared_name, &var)); + // update tensor + var->copy_ng_to_tf(); + var->Unref(); + NGRAPH_VLOG(4) << "Sync Completed " << output_index; + } + } + return Status::OK(); +} + } // namespace ngraph_bridge } // namespace tensorflow diff --git a/ngraph_bridge/ngraph_encapsulate_op_utils.h b/ngraph_bridge/ngraph_encapsulate_op_utils.h index 7f48eb09c..1a6df4ede 100644 --- a/ngraph_bridge/ngraph_encapsulate_op_utils.h +++ b/ngraph_bridge/ngraph_encapsulate_op_utils.h @@ -46,12 +46,44 @@ namespace ngraph_bridge { // Status GetPipelinedIOTensorsReadyForExecution( - OpKernelContext* ctx, vector& tf_input_tensors, - shared_ptr& pipelined_tensor_store, - shared_ptr& tensor_manager, + OpKernelContext* ctx, const vector& tf_input_tensors, + const shared_ptr& pipelined_tensor_store, + const shared_ptr& tensor_manager, tuple& pipelined_io_tensors); +// Assembles the different types of input and output tensors +// Variable tensors and pipelined tensors are put together in the right order +// into ng_inputs and ng_outputs +// 1. For input indexes that are fed by variables, get the variable tensors from +// context +// 2. For output indexes that are updating variables, get the variable tensors +// from context +// This enable update-in-place +// 3. For input and output indexes that are pipelined, get the respective tensor +// +Status GetIOTensorsReadyForExecution( + OpKernelContext* ctx, const shared_ptr& tensor_manager, + const PipelinedTensorVector& pipelined_in_tensors, + const PipelinedTensorVector& pipelined_out_tensors, + vector>& ng_inputs, + vector>& ng_outputs); + +// Gets the Tensor from OpKernelContext's Container for the given shared_name +Status GetTensorFromContext(const OpKernelContext* ctx, + const string& shared_name, + shared_ptr& ng_tensor); + +// Encapsulate Op updates the NGVariable's device tensor in-place +// ie. the NGVariable's backend tensor is updated +// Some of these Variables may be required by the TF ops and they will use the +// host tensor +// These were marked as "copy-to-tf" True in the Rewrite Phase +// We will update these tensors here +Status SyncOutputVarTensors( + const OpKernelContext* ctx, + const shared_ptr& tensor_manager); + } // namespace ngraph_bridge } // namespace tensorflow diff --git a/ngraph_bridge/ngraph_enter_prefetch_in_catalog.h b/ngraph_bridge/ngraph_enter_prefetch_in_catalog.h index d7ab8cc9c..534166aa1 100644 --- a/ngraph_bridge/ngraph_enter_prefetch_in_catalog.h +++ b/ngraph_bridge/ngraph_enter_prefetch_in_catalog.h @@ -13,8 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*******************************************************************************/ -#ifndef NGRAPH_TF_ENTER_IN_CATALOG_H_ -#define NGRAPH_TF_ENTER_IN_CATALOG_H_ +#ifndef NGRAPH_TF_ENTER_PREFETCH_IN_CATALOG_H_ +#define NGRAPH_TF_ENTER_PREFETCH_IN_CATALOG_H_ #pragma once #include "tensorflow/core/graph/graph.h" diff --git a/ngraph_bridge/ngraph_executor.cc b/ngraph_bridge/ngraph_executor.cc index 37e1b8b40..7d4fe2c2a 100644 --- a/ngraph_bridge/ngraph_executor.cc +++ b/ngraph_bridge/ngraph_executor.cc @@ -43,9 +43,9 @@ #include "ngraph_bridge/ngraph_mark_for_clustering.h" #include "ngraph_bridge/ngraph_timer.h" #include "ngraph_bridge/ngraph_utils.h" +#include "ngraph_bridge/ngraph_var.h" #if defined(NGRAPH_TF_ENABLE_VARIABLES_AND_OPTIMIZERS) -#include "ngraph_bridge/enable_variable_ops/ngraph_var.h" #include "ngraph_bridge/ngraph_catalog.h" #endif diff --git a/ngraph_bridge/ngraph_prefetch_dataset_op.cc b/ngraph_bridge/ngraph_prefetch_dataset_op.cc index 18b946191..7c131bcce 100644 --- a/ngraph_bridge/ngraph_prefetch_dataset_op.cc +++ b/ngraph_bridge/ngraph_prefetch_dataset_op.cc @@ -415,14 +415,15 @@ class NGraphPrefetchDatasetOp::Dataset : public DatasetBase { ngraph_bridge::NGraphPrefetchSharedResouce::RESOURCE_NAME, &shared_data); if (s.ok()) { - ngraph::Event evt_dev_cp("Prf Dev Copy", "Copy", ""); shared_data->SetBufferDepth(m_buffer_size); auto ng_input_tensor_bundle = shared_data->GetNextIOTensorBundleForDeviceTransfer(); auto ng_prefetch_input_indexes_map = shared_data->GetPrefetchInputIndexesMap(); - + ngraph::Event evt_dev_cp( + "Prf Dev Copy: Pipe_Ind_" + to_string(ng_input_tensor_bundle.Id), + "Copy", ""); int number_of_buffer_elements = buffer_element.value.size(); if (number_of_buffer_elements != ng_prefetch_input_indexes_map.size()) { @@ -433,7 +434,8 @@ class NGraphPrefetchDatasetOp::Dataset : public DatasetBase { "encap " + to_string(ng_prefetch_input_indexes_map.size())); } - + std::vector> + prefetch_input_write_events; // Write to these tensors for (auto itr : ng_prefetch_input_indexes_map) { int ng_index = itr.first; @@ -445,6 +447,8 @@ class NGraphPrefetchDatasetOp::Dataset : public DatasetBase { void* current_src_ptr = (void*)DMAHelper::base(&buffer_element.value[tf_index]); + std::unique_ptr event_copy_h2d(new ngraph::Event( + "H2D_PrefetchInput_" + std::to_string(tf_index), "Copy", "")); try { NGRAPH_VLOG(2) << "[PREFETCH] INPUT tensor being written by Prefetch: " @@ -459,6 +463,12 @@ class NGraphPrefetchDatasetOp::Dataset : public DatasetBase { throw std::runtime_error( "Error copying TF tensor to device tensor"); } + event_copy_h2d->Stop(); + prefetch_input_write_events.push_back(std::move(event_copy_h2d)); + } + + for (auto& next : prefetch_input_write_events) { + ngraph::Event::write_trace(*next.get()); } // Now add them back to the other queue diff --git a/ngraph_bridge/ngraph_tensor_manager.cc b/ngraph_bridge/ngraph_tensor_manager.cc index 518ae96fe..116c213ec 100644 --- a/ngraph_bridge/ngraph_tensor_manager.cc +++ b/ngraph_bridge/ngraph_tensor_manager.cc @@ -42,7 +42,6 @@ NGraphTensorManager::NGraphTensorManager(const string ng_encap_node_name, void NGraphTensorManager::Initialize() { #if defined(NGRAPH_TF_ENABLE_VARIABLES_AND_OPTIMIZERS) - // input variables book-keeping for (int index = 0; index < m_number_of_inputs; index++) { if (NGraphCatalog::ExistsInInputVariableSharedNameMap( @@ -86,6 +85,17 @@ void NGraphTensorManager::Initialize() { m_output_indexes_that_need_copy.push_back(index); } } + + // For graphs that were run through AOT + // Graph rewrite is 
not done, and there is no entry in catalog + // If there is no entry in catalog all outputs need to be copied + if (!NGraphCatalog::EncapOutputNeedsCopy(m_ng_encap_graph_id, + m_ng_encap_node_name)) { + m_output_indexes_that_need_copy.resize(m_number_of_outputs); + iota(begin(m_output_indexes_that_need_copy), + end(m_output_indexes_that_need_copy), 0); + } + #else m_output_indexes_that_need_copy.resize(m_number_of_outputs); iota(begin(m_output_indexes_that_need_copy), @@ -140,6 +150,40 @@ void NGraphTensorManager::Initialize() { FindComplement(m_pipelined_input_indexes, m_prefetched_input_indexes); } +//--------------------------------------------------------------------------- +// NGraphTensorManager::Print +//--------------------------------------------------------------------------- +void NGraphTensorManager::Print() { + auto PrintVector = [](const vector& input_vector, const string title) { + cout << title << endl; + cout << ng::join(input_vector) << endl; + }; + + cout << "** NGEncapsulate TensorManager:" << m_ng_encap_node_name << " **" + << endl; + + cout << "** Variables Related **" << endl; + PrintVector(m_input_indexes_from_variables, "Input Indexes from Variables"); + PrintVector(m_output_indexes_assigning_variable, + "Output Indexes Referring to Variables"); + PrintVector(m_output_indexes_that_need_copy, "Output Indexes to be Read"); + + cout << "** Pipelined **" << endl; + PrintVector(m_pipelined_input_indexes, "Pipelined Input Indexes"); + PrintVector(m_pipelined_output_indexes, "Pipelined Output Indexes"); + + cout << "** Prefetched **" << endl; + PrintVector(m_prefetched_input_indexes, "Prefetched Input Indexes"); + PrintVector(m_pipelined_not_prefetched_input_indexes, + "Pipelined But Not Prefetched Input Indexes"); + + cout << "** Prefetched wrt pipelined indexes **" << endl; + PrintVector(m_pipelined_input_indexes_that_are_prefetched, + "Prefetched Input Indexes wrt Pipelined Inputs"); + PrintVector(m_pipelined_input_indexes_that_are_not_prefetched, + "Not Prefetched Input Indexes wrt Pipelined Inputs"); +} + //--------------------------------------------------------------------------- // NGraphTensorManager::~NGraphTensorManager //--------------------------------------------------------------------------- diff --git a/ngraph_bridge/ngraph_tensor_manager.h b/ngraph_bridge/ngraph_tensor_manager.h index 9143241fb..73f2ca9d4 100644 --- a/ngraph_bridge/ngraph_tensor_manager.h +++ b/ngraph_bridge/ngraph_tensor_manager.h @@ -109,6 +109,8 @@ class NGraphTensorManager { Status GetOutputVariableCopyToTF(const int& output_index, bool* output_var_copy_to_tf); + void Print(); + private: void Initialize(); string m_ng_encap_node_name; diff --git a/ngraph_bridge/ngraph_tracked_variable.cc b/ngraph_bridge/ngraph_tracked_variable.cc index bf277b6c1..22b1e584e 100644 --- a/ngraph_bridge/ngraph_tracked_variable.cc +++ b/ngraph_bridge/ngraph_tracked_variable.cc @@ -60,7 +60,6 @@ class NGraphVar : public ResourceBase { private: mutex mu_; Tensor tensor_; - ~NGraphVar() override {} }; @@ -108,7 +107,7 @@ NGraphVariableOp::~NGraphVariableOp() { tracker_->Unref(); } void NGraphVariableOp::Compute(OpKernelContext* ctx) { mutex_lock l(init_mu_); std::ostringstream oss; - oss << "NGraphVariable: " << my_instance_id << ": " << name(); + oss << "NGVariable::Compute::" << name(); ngraph::Event event_compute(oss.str(), name(), ""); if (!initialized_) { @@ -182,6 +181,7 @@ void NGraphVariableOp::Compute(OpKernelContext* ctx) { ctx->record_persistent_memory_allocation(var->tensor()->AllocatedBytes()); 
} var->Unref(); + event_compute.Stop(); ngraph::Event::write_trace(event_compute); } diff --git a/ngraph_bridge/enable_variable_ops/ngraph_var.cc b/ngraph_bridge/ngraph_var.cc similarity index 98% rename from ngraph_bridge/enable_variable_ops/ngraph_var.cc rename to ngraph_bridge/ngraph_var.cc index efab9e7c0..1fa6001bf 100644 --- a/ngraph_bridge/enable_variable_ops/ngraph_var.cc +++ b/ngraph_bridge/ngraph_var.cc @@ -24,10 +24,10 @@ #include "ngraph/event_tracing.hpp" #include "ngraph/runtime/backend.hpp" -#include "ngraph_bridge/enable_variable_ops/ngraph_var.h" #include "ngraph_bridge/ngraph_backend_manager.h" #include "ngraph_bridge/ngraph_freshness_tracker.h" #include "ngraph_bridge/ngraph_utils.h" +#include "ngraph_bridge/ngraph_var.h" using namespace std; namespace ng = ngraph; diff --git a/ngraph_bridge/enable_variable_ops/ngraph_var.h b/ngraph_bridge/ngraph_var.h similarity index 100% rename from ngraph_bridge/enable_variable_ops/ngraph_var.h rename to ngraph_bridge/ngraph_var.h diff --git a/test/graph_rewrites/test_ng_var_update_ng_tensor.cc b/test/graph_rewrites/test_ng_var_update_ng_tensor.cc index 0af2c7a57..924c54266 100644 --- a/test/graph_rewrites/test_ng_var_update_ng_tensor.cc +++ b/test/graph_rewrites/test_ng_var_update_ng_tensor.cc @@ -23,10 +23,10 @@ #include "tensorflow/core/platform/test.h" #include "logging/tf_graph_writer.h" -#include "ngraph_bridge/enable_variable_ops/ngraph_var.h" #include "ngraph_bridge/enable_variable_ops/ngraph_variable_update_ng_tensor_op.h" #include "ngraph_bridge/ngraph_rewrite_for_tracking.h" #include "ngraph_bridge/ngraph_utils.h" +#include "ngraph_bridge/ngraph_var.h" #include "test/test_utilities.h" namespace tensorflow { diff --git a/test/python/test_flib.py b/test/python/test_flib.py index 079e34449..f0c9b5b59 100644 --- a/test/python/test_flib.py +++ b/test/python/test_flib.py @@ -46,6 +46,7 @@ def test_flib_1(self): res1 = self.with_ngraph(sess_fn) res2 = self.without_ngraph(sess_fn) + exp = [np.full((2, 3), 3.0), np.full((2, 3), 0.95257413)] # Note both run on Host (because NgraphEncapsulate can only run on host) assert np.isclose(res1, res2).all() diff --git a/test/test_ng_var_update_ng_tensor_kernel.cc b/test/test_ng_var_update_ng_tensor_kernel.cc index 51742fcc9..4612d156b 100644 --- a/test/test_ng_var_update_ng_tensor_kernel.cc +++ b/test/test_ng_var_update_ng_tensor_kernel.cc @@ -30,9 +30,9 @@ #include "tensorflow/core/lib/core/status_test_util.h" #include "tensorflow/core/platform/test.h" -#include "ngraph_bridge/enable_variable_ops/ngraph_var.h" #include "ngraph_bridge/enable_variable_ops/ngraph_variable_update_ng_tensor_op.h" #include "ngraph_bridge/ngraph_utils.h" +#include "ngraph_bridge/ngraph_var.h" #include "test/test_utilities.h" #include "test/tf_fake_input.h" diff --git a/tools/test_utils.py b/tools/test_utils.py index 12f2ead29..e8f5752d3 100755 --- a/tools/test_utils.py +++ b/tools/test_utils.py @@ -108,7 +108,7 @@ def run_ngtf_pytests(venv_dir, build_dir): build_dir = os.path.abspath(build_dir) venv_dir = os.path.abspath(venv_dir) mnist_dir = os.path.abspath(build_dir + '/examples/mnist/') - + axpy_dir = os.path.abspath(build_dir + '/examples/') test_dir = os.path.join(build_dir, "test") test_dir = os.path.join(test_dir, "python") @@ -130,7 +130,8 @@ def run_ngtf_pytests(venv_dir, build_dir): build_dir) + " --ignore=" + build_dir + "/test/python/bfloat16" env = os.environ.copy() new_paths = venv_dir + '/bin/python3:' + os.path.abspath( - build_dir) + ":" + os.path.abspath(mnist_dir) + build_dir) + ":" + 
os.path.abspath(axpy_dir) + ":" + os.path.abspath( + mnist_dir) if 'PYTHONPATH' in env: env["PYTHONPATH"] = new_paths + ":" + env["PYTHONPATH"] else:
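
Reviewer note: the heart of this change is the new assembly step in `GetIOTensorsReadyForExecution`, where variable-backed device tensors (looked up by shared name from the resource manager) and pipelined tensors are merged, by index, into a single ordered `ng_inputs`/`ng_outputs` vector before execution, with `SyncOutputVarTensors` later copying back only the outputs marked "copy-to-tf". The sketch below is a minimal, standalone illustration of that index-assembly logic only; `FakeTensor`, `AssembleInputs`, and the map-based container are hypothetical stand-ins, not bridge or nGraph APIs.

```cpp
// Minimal sketch of the tensor-assembly idea behind GetIOTensorsReadyForExecution:
// variable-fed indexes are filled from a shared-name lookup, the remaining
// indexes are filled from the pipelined tensor vector, in order.
#include <cassert>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins for ng::runtime::Tensor and the TF resource container.
struct FakeTensor {
  std::string label;
};
using TensorPtr = std::shared_ptr<FakeTensor>;

std::vector<TensorPtr> AssembleInputs(
    const std::map<int, std::string>& var_input_shared_names,   // index -> shared name
    const std::vector<int>& pipelined_input_indexes,
    const std::vector<TensorPtr>& pipelined_in_tensors,
    const std::map<std::string, TensorPtr>& resource_container,
    int num_inputs) {
  std::vector<TensorPtr> ng_inputs(num_inputs);

  // 1. Variable-fed inputs: fetch the device tensor registered under the
  //    variable's shared name (mirrors GetTensorFromContext in spirit).
  for (const auto& kv : var_input_shared_names) {
    auto it = resource_container.find(kv.second);
    assert(it != resource_container.end() && "variable not found in container");
    ng_inputs[kv.first] = it->second;
  }

  // 2. Pipelined inputs: the i-th pipelined tensor goes to the i-th
  //    pipelined input index.
  assert(pipelined_input_indexes.size() == pipelined_in_tensors.size());
  for (size_t i = 0; i < pipelined_input_indexes.size(); ++i) {
    ng_inputs[pipelined_input_indexes[i]] = pipelined_in_tensors[i];
  }
  return ng_inputs;
}

int main() {
  // Example: an encapsulate op with 4 inputs; inputs 1 and 3 are fed by
  // variables, inputs 0 and 2 are pipelined.
  std::map<std::string, TensorPtr> container = {
      {"var_a", std::make_shared<FakeTensor>(FakeTensor{"ng tensor of var_a"})},
      {"var_b", std::make_shared<FakeTensor>(FakeTensor{"ng tensor of var_b"})}};
  std::map<int, std::string> var_inputs = {{1, "var_a"}, {3, "var_b"}};
  std::vector<int> pipelined_indexes = {0, 2};
  std::vector<TensorPtr> pipelined = {
      std::make_shared<FakeTensor>(FakeTensor{"pipelined tensor 0"}),
      std::make_shared<FakeTensor>(FakeTensor{"pipelined tensor 1"})};

  auto ng_inputs =
      AssembleInputs(var_inputs, pipelined_indexes, pipelined, container, 4);
  for (size_t i = 0; i < ng_inputs.size(); ++i) {
    std::cout << "input " << i << " -> " << ng_inputs[i]->label << "\n";
  }
  return 0;
}
```

The same placement pattern applies on the output side: output indexes that assign variables point at the variable's device tensor (so the update happens in place), pipelined output indexes point at the per-pipeline-depth tensors, and only the indexes reported by `GetOutputIndexesThatNeedCopy` are read back to host TF tensors.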