44 changes: 39 additions & 5 deletions ngraph_bridge/ngraph_encapsulate_impl.cc
@@ -395,7 +395,6 @@ Status NGraphEncapsulateImpl::AllocateNGOutputTensors(
current_dst_ptr, last_dst_ptr, last_ng_tensor, true, ng_exec,
op_backend, ng_element_type, ng_shape,
m_executable_can_create_tensor ? out_group_from_pipeline[i] : nullptr);

current_ng_tensor->set_stale(true);
output_caches[i] = std::make_pair(current_dst_ptr, current_ng_tensor);
ng_outputs.push_back(current_ng_tensor);
@@ -416,18 +415,21 @@ std::shared_ptr<ng::runtime::Tensor> NGraphEncapsulateImpl::GetCurrentNgTensor(
// NOTE: we assume that TF's pointers WILL change if it actually changes
// values, i.e., it will not reuse the same space if it has rewritten it
bool tf_tensor_has_changed = current_tf_ptr != last_tf_ptr;
NGRAPH_VLOG(5) << "tf_tensor_has_changed: " << tf_tensor_has_changed;
bool no_ng_tensor_found = last_ng_tensor == nullptr;
-  bool is_cpu = m_op_backend_name == "CPU";
// m_op_backend_name might be BE:0, check if it starts with BE
bool is_cpu_or_nnpi = (m_op_backend_name.find("CPU") == 0) ||
(m_op_backend_name.find("NNPI") == 0);

// We need to check last_ng_tensor != nullptr, since there are cases where
// at the first call to the ng_exec, both current_dst_ptr (when the
// output is a 0-sized tensor) and last_dst_ptr (uninitialized at the
// first call) are nullptr
// A new tensor needs to be created for sure if no_ng_tensor_found
-  // Additionally for CPU, it needs to be created if tf_tensor_has_changed,
// Additionally for CPU/NNPI, it needs to be created if tf_tensor_has_changed,
// for others, we do not create
bool need_new_tensor_creation;
-  if (is_cpu) {
if (is_cpu_or_nnpi) {
need_new_tensor_creation = no_ng_tensor_found || tf_tensor_has_changed;
} else {
need_new_tensor_creation = no_ng_tensor_found;
@@ -449,7 +451,9 @@ std::shared_ptr<ng::runtime::Tensor> NGraphEncapsulateImpl::GetCurrentNgTensor(
current_ng_tensor = tensor_from_pipeline;
} else {
if (need_new_tensor_creation) {
-    if (is_cpu) {
if (is_cpu_or_nnpi) {
NGRAPH_VLOG(5) << "Backend creating tensor with pointer: "
<< current_tf_ptr;
current_ng_tensor = op_backend->create_tensor(ng_element_type, ng_shape,
current_tf_ptr);
} else {
@@ -576,6 +580,36 @@ void NGraphEncapsulateImpl::DumpNgFunction(
StringToFile(file_name, m_serialized_ng_function_map[ng_exec]);
}

Status NGraphEncapsulateImpl::GetPersistentTFOutputTensor(
std::shared_ptr<ngraph::runtime::Executable> exec,
std::vector<tensorflow::PersistentTensor>& tf_output_tensors) {
auto itr = m_out_persistents.find(exec);
if (itr == m_out_persistents.end()) {
Review comment (Contributor): Could use PersistentOutputsExist.

Reply (Contributor, Author): Then I'd be doing two searches: one in PersistentOutputsExist and one again to actually return the item. Right now I do only one search to find the appropriate iterator; a sketch of this pattern follows.
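
A minimal, self-contained sketch of the two patterns (simplified stand-in key/value types for illustration; this is not the bridge's actual map or methods):

#include <string>
#include <unordered_map>
#include <vector>

std::unordered_map<std::string, std::vector<int>> cache;

// Two searches: an existence check followed by a second lookup to fetch.
bool GetWithTwoSearches(const std::string& key, std::vector<int>& out) {
  if (cache.find(key) == cache.end()) return false;  // search #1
  out = cache.at(key);                               // search #2
  return true;
}

// One search: call find() once and reuse the returned iterator.
bool GetWithOneSearch(const std::string& key, std::vector<int>& out) {
  auto itr = cache.find(key);
  if (itr == cache.end()) return false;
  out = itr->second;
  return true;
}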

return errors::Internal(
"Expected persistent tensor to be present in cache");
} else {
tf_output_tensors = itr->second;
}
return Status::OK();
}

bool NGraphEncapsulateImpl::PersistentOutputsExist(
std::shared_ptr<ngraph::runtime::Executable> exec) {
return m_out_persistents.find(exec) != m_out_persistents.end();
}

Status NGraphEncapsulateImpl::RegisterPersistentOutputTensors(
std::shared_ptr<ngraph::runtime::Executable> exec,
std::vector<tensorflow::PersistentTensor> persistent_tensors) {
auto itr = m_out_persistents.find(exec);
if (itr != m_out_persistents.end()) {
Review comment (Contributor): Could use PersistentOutputsExist.

return errors::Internal(
    "An entry already exists in the cache for persistent tensors");
}
m_out_persistents.emplace(exec, persistent_tensors);
return Status::OK();
}

} // namespace ngraph_bridge

} // namespace tensorflow
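
Taken together, a rough usage sketch of the three methods above (a hypothetical caller with illustrative names; allocation and the OpKernelContext are elided; see Compute() in ngraph_encapsulate_op.cc below for the real call sites):

Status UsePersistentOutputs(
    NGraphEncapsulateImpl& impl,
    std::shared_ptr<ngraph::runtime::Executable> ng_exec, int num_outputs) {
  std::vector<tensorflow::PersistentTensor> outputs(num_outputs);
  if (impl.PersistentOutputsExist(ng_exec)) {
    // Cache hit: fetch the tensors registered earlier for this executable.
    TF_RETURN_IF_ERROR(impl.GetPersistentTFOutputTensor(ng_exec, outputs));
  } else {
    // First call for this executable: allocate each output persistently
    // (via ctx->allocate_persistent) and then register the vector once.
    TF_RETURN_IF_ERROR(impl.RegisterPersistentOutputTensors(ng_exec, outputs));
  }
  return Status::OK();
}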
19 changes: 19 additions & 0 deletions ngraph_bridge/ngraph_encapsulate_impl.h
@@ -174,6 +174,18 @@ class NGraphEncapsulateImpl {
m_serialized_ng_function_map.clear();
}

void ClearNgExecPersistentOutputCache() { m_out_persistents.clear(); }

Status GetPersistentTFOutputTensor(
std::shared_ptr<ngraph::runtime::Executable>,
std::vector<tensorflow::PersistentTensor>&);

bool PersistentOutputsExist(std::shared_ptr<ngraph::runtime::Executable>);

Status RegisterPersistentOutputTensors(
std::shared_ptr<ngraph::runtime::Executable>,
std::vector<tensorflow::PersistentTensor>);

NGraphFreshnessTracker* GetNgraphFreshnessTracker() {
return m_freshness_tracker;
}
Expand Down Expand Up @@ -249,6 +261,13 @@ class NGraphEncapsulateImpl {
m_executable_pipelined_tensors_map;

int m_depth{2};  // TODO: make this settable

// Each executable (which comes from a new shape) corresponds to a vector of
// output tensors
// TODO: should the vector store PersistentTensor or PersistentTensor*?
std::unordered_map<std::shared_ptr<ngraph::runtime::Executable>,
std::vector<tensorflow::PersistentTensor>>
m_out_persistents;
};

} // namespace ngraph_bridge
60 changes: 55 additions & 5 deletions ngraph_bridge/ngraph_encapsulate_op.cc
@@ -196,13 +196,22 @@ NGraphEncapsulateOp::NGraphEncapsulateOp(OpKernelConstruction* ctx)
BackendManager::SetConfig(ng_encap_impl.GetOpBackend(),
additional_attribute_map);

-  ng_encap_impl.SetExecCanCreateTensor(
// For NNPI, use the backend to create tensors (even though the executable
// can create tensors).
// Keep the executable_can_create_tensors() check before the
// backend_name != "NNPI" check.
bool executable_create_tensor =
BackendManager::GetBackend(ng_encap_impl.GetOpBackend())
-      ->executable_can_create_tensors());
->executable_can_create_tensors() &&
(backend_name != "NNPI");
ng_encap_impl.SetExecCanCreateTensor(executable_create_tensor);
NGRAPH_VLOG(5) << "Executable can "
<< (ng_encap_impl.GetExecCanCreateTensor() ? "" : "not")
<< " create tensors";

const char* not_persistent_flag = std::getenv("NGRAPH_TF_DISABLE_PERSISTENT");
m_use_persistent = (not_persistent_flag == nullptr);

event.Stop();
ngraph::Event::write_trace(event);
}
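
The persistent-output path is opt-out via the environment. A minimal sketch of the gating, assuming POSIX setenv/unsetenv (the test change at the bottom of this PR uses the bridge's SetEnvVariable/UnsetEnvVariable helpers for the same purpose):

#include <cstdlib>

void PersistentFlagExample() {
  // Variable unset (the default): persistent output tensors are used.
  bool use_persistent =
      (std::getenv("NGRAPH_TF_DISABLE_PERSISTENT") == nullptr);

  // Variable set to any value: persistent output tensors are disabled.
  setenv("NGRAPH_TF_DISABLE_PERSISTENT", "1", /*overwrite=*/1);
  use_persistent = (std::getenv("NGRAPH_TF_DISABLE_PERSISTENT") == nullptr);
  // use_persistent is now false.
  unsetenv("NGRAPH_TF_DISABLE_PERSISTENT");
}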
@@ -262,6 +271,7 @@ NGraphEncapsulateOp::~NGraphEncapsulateOp() {
ng_encap_impl.ClearNgExecMap();
ng_encap_impl.ClearNgExecPipelinedTensorMap();
ng_encap_impl.ClearNgExecSerializedFunctionCache();
ng_encap_impl.ClearNgExecPersistentOutputCache();

// Release the backend
NGRAPH_VLOG(2) << "~NGraphEncapsulateOp():: ReleaseBackend";
@@ -345,9 +355,20 @@ void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) {
// Allocate tensors for the output results.
vector<shared_ptr<ng::runtime::Tensor>> ng_outputs;
std::vector<Tensor*> tf_output_tensors;
std::vector<tensorflow::PersistentTensor> cached_persistent_output_tensors(
ng_exec->get_results().size());
bool present_in_cache = false;

{
NG_TRACE("NGTF_Output_Alloc", "");
if (m_use_persistent) {
present_in_cache = ng_encap_impl.PersistentOutputsExist(ng_exec);
if (present_in_cache) {
OP_REQUIRES_OK(ctx, ng_encap_impl.GetPersistentTFOutputTensor(
ng_exec, cached_persistent_output_tensors));
}
}

for (auto i = 0; i < ng_exec->get_results().size(); i++) {
auto ng_element = ng_exec->get_results()[i];
auto ng_shape = ng_element->get_shape();
@@ -360,21 +381,40 @@ void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) {
}
TensorShape tf_shape(dims);
Tensor* output_tensor = nullptr;
-      OP_REQUIRES_OK(ctx, ctx->allocate_output(i, tf_shape, &output_tensor));
-      tf_output_tensors.push_back(output_tensor);

// Make sure the nGraph-inferred element type agrees with what TensorFlow
// expected.
ng::element::Type expected_elem_type;
// TODO: we only need to do these checks once, when the exec is
// created/compiled, not again and again

OP_REQUIRES_OK(
ctx, TFDataTypeToNGraphElementType(ctx->expected_output_dtype(i),
&expected_elem_type));
OP_REQUIRES(
ctx, ng_element_type == expected_elem_type,
errors::Internal("Element type inferred by nGraph does not match "
"the element type expected by TensorFlow"));
-    }

if (m_use_persistent) {
if (present_in_cache) {
output_tensor = cached_persistent_output_tensors[i].AccessTensor(ctx);
} else {
// create a persistent tensor
OP_REQUIRES_OK(
ctx, ctx->allocate_persistent(
ctx->expected_output_dtype(i), tf_shape,
&cached_persistent_output_tensors[i], &output_tensor));
}
} else {
OP_REQUIRES_OK(ctx, ctx->allocate_output(i, tf_shape, &output_tensor));
}
tf_output_tensors.push_back(output_tensor);
}
if (m_use_persistent && !present_in_cache) {
OP_REQUIRES_OK(ctx, ng_encap_impl.RegisterPersistentOutputTensors(
ng_exec, cached_persistent_output_tensors));
}
OP_REQUIRES_OK(ctx, ng_encap_impl.AllocateNGOutputTensors(
tf_output_tensors, ng_exec, out_group_from_pipeline,
op_backend, ng_outputs));
@@ -611,6 +651,16 @@ void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) {
exp.what(), "\n"));
}
}

if (m_use_persistent) {
for (int out_idx = 0; out_idx < ng_exec->get_results().size(); out_idx++) {
OP_REQUIRES_OK(ctx, ng_encap_impl.GetPersistentTFOutputTensor(
ng_exec, cached_persistent_output_tensors));
auto out_tensor =
cached_persistent_output_tensors[out_idx].AccessTensor(ctx);
ctx->set_output(out_idx, *out_tensor);
}
}
} // end compute

int NGraphEncapsulateImpl::s_instance_count = 0;
1 change: 1 addition & 0 deletions ngraph_bridge/ngraph_encapsulate_op.h
@@ -43,6 +43,7 @@ class NGraphEncapsulateOp : public OpKernel {
private:
NGraphEncapsulateImpl ng_encap_impl;
std::mutex m_compute_lock;
bool m_use_persistent;
};

} // namespace ngraph_bridge
10 changes: 10 additions & 0 deletions test/tf_exec.cpp
@@ -95,7 +95,16 @@ Status CreateSession(const string& graph_filename, const string& backend_name,
return load_graph_status;
}

// This test might fail when running with persistent output tensors,
// maybe because once we have computed outputs into the persistent tensors,
// the next thread comes in before Compare runs and changes the values.
// For example, if we add a 1-second sleep right after entering Compute(), this
// test would pass (since we now allow enough time for Compare to run before
// the next thread comes in and modifies the persistent tensor values).
// TODO: see how persistent tensors might fit in with this kind of
// multithreading (symmetric parallel); a sketch of the suspected race follows
// this test.
TEST(tf_exec, SingleGraphOn2Threads) {
SetEnvVariable("NGRAPH_TF_DISABLE_PERSISTENT", "1");
string graph_name = "test_axpy.pbtxt";
vector<string> backends{"CPU", "INTERPRETER"};
for (auto be : backends) {
@@ -136,6 +145,7 @@ TEST(tf_exec, SingleGraphOn2Threads) {
thread0.join();
thread1.join();
}
UnsetEnvVariable("NGRAPH_TF_DISABLE_PERSISTENT");
}
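
A minimal, self-contained sketch of the suspected race described in the comment above (plain std::thread with one shared buffer standing in for the kernel's shared persistent output tensor; this is an illustration, not the bridge's code):

#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

// One shared buffer plays the role of the persistent output tensor that both
// threads' Compute() calls would write into.
std::vector<float> shared_output(4);

void ComputeAndCompare(float value) {
  // "Compute": write this thread's results into the shared buffer.
  for (auto& v : shared_output) v = value;
  // Window between compute and compare: the other thread may overwrite here.
  std::this_thread::sleep_for(std::chrono::milliseconds(10));
  // "Compare": may observe the other thread's values instead of `value`.
  for (auto v : shared_output) {
    if (v != value)
      std::cout << "mismatch: got " << v << ", expected " << value << "\n";
  }
}

int main() {
  std::thread t0(ComputeAndCompare, 1.0f);
  std::thread t1(ComputeAndCompare, 2.0f);
  t0.join();
  t1.join();
  return 0;
}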

TEST(tf_exec, hello_world) {