This follows a similar approach to the earlier work by Derek Murray
(change ID I9058c55898079cb3ab6011513b4468ddb293f59f) and allows using
atomic operations on the pending counts. It extends that work to also
apply to graphs that include control/merge nodes (e.g. the 'wide_deep'
model in the tf models repo).

The approach here is to note that even in a graph that has control/merge
nodes, it's likely that many nodes do not have any merge/control
outputs. There's already a fast path for this case with simpler logic,
and that simple logic has the advantage of only accessing the pending
count in one spot where it decrements the incoming pending values. Only
the last such decrementer propagates the output and activates the node.

Given this, we can use atomic operations for all such "fast path" nodes
and avoid holding an exclusive lock. We fall back to the exclusive lock
for any nodes that have slow-path outputs. As a conservative measure,
this patch acquires the shared lock for the fast path, though it may
not be necessary.
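
As a minimal sketch of this fast-path idea (hypothetical names; the real
PendingCounts packs state bits next to the count rather than using a bare
integer), the decrement-and-activate pattern looks roughly like:

  // C++ sketch, not the actual PendingCounts API.
  #include <atomic>

  struct FastPathNode {            // hypothetical stand-in for a fast-path node
    std::atomic<int> pending;      // number of inputs not yet completed

    // Each completed input calls this; only the caller whose decrement takes
    // the count to zero activates the node, so no exclusive lock is needed.
    bool NotifyInputReady() {
      // fetch_sub returns the previous value, so 1 means "last decrementer".
      return pending.fetch_sub(1, std::memory_order_acq_rel) == 1;
    }
  };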

The other bit in this patch is the management of the 'outstanding_ops'
member. In order to allow concurrent completion of ops without the
exclusive lock, this also becomes atomic. Simply making it atomic is
sufficient but leaves a lot of performance on the table. This patch
batches the updates of this variable so that each op completion touches
it at most once, and skips the update entirely when the outstanding op
count doesn't need to change (e.g. when an op completion activates
exactly one downstream node).
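
As a sketch of the batching (using stand-in names, not the executor's
real identifiers), the delta is accumulated per completion and applied
with at most one atomic RMW, skipping the shared counter when the net
change is zero:

  // C++ sketch under assumed names.
  #include <atomic>
  #include <cstdint>

  std::atomic<int64_t> outstanding_ops{0};  // hypothetical executor-wide counter

  // Called once per op completion; 'newly_ready' is how many downstream nodes
  // this completion activated.
  void FinishOp(int newly_ready) {
    // The finishing op retires itself (-1) and enqueues 'newly_ready' ops
    // (+1 each), so the net change is applied in a single atomic add...
    const int64_t delta = static_cast<int64_t>(newly_ready) - 1;
    if (delta != 0) {
      outstanding_ops.fetch_add(delta, std::memory_order_relaxed);
    }
    // ...and skipped entirely when exactly one successor became ready.
  }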

I benchmarked this change using tensorflow-models/official/r1/wide_deep,
collecting the distribution of "steps/sec" reported when running commands
like:

  # switch to old code
  $ python -u  census_main.py -ebe=20  -te 20 -mt wide 2>&1 | tee /tmp/before/wide.txt
  # switch to new code
  $ python -u  census_main.py -ebe=20  -te 20 -mt wide 2>&1 | tee /tmp/after/wide.txt

... for the 'wide', 'wide_deep', and 'deep' models. I then exported the
'steps/sec' metrics to TSV with:

  $ grep 'tensorflow:global_step/sec' \
    /tmp/{before,after}/*.txt | perl -p  -e \
    's,/tmp/(.+)/(.+).txt:.*?([\d\.]+)$,$1 $2 $3,g' > /tmp/data.tsv

Mean steps/sec are as follows on my AMD Ryzen 9 3900X desktop (12
physical cores, 'performance' CPU governor enabled):

Before:
    model   steps/sec
----------------------
      deep  876
      wide  776
 wide_deep  625

After:
    model   steps/sec   improvement
-----------------------------------
      deep  897        (+2.5%)
      wide  928        (+19.5%)
 wide_deep  760        (+21.6%)

A few notes worth considering:

Using atomic operations has some fixed overhead compared to non-atomics,
particularly when the cache line being operated on is in the M
("modified") state in another core. The increased latency of operating on
a cache line in remote M state (aka a "HitM" access) also applies to
non-atomic operations, but non-atomics can potentially exploit
instruction-level parallelism and pipeline multiple such accesses
together, whereas atomics do not on current x86 architectures [1]. So,
there is possibly a cross-over point on some graph shapes where the
mutex-based synchronization with non-atomic operations actually beats
the atomic-based implementation here.

That said, the current code path for the non-atomic (mutex-protected)
case is complex/branchy enough that it's not clear that any significant
pipelining of reads can actually occur without some more explicit
unrolling, etc.

It's also worth noting that, in uncontended cases, the atomics will be
slower than the non-atomics. However, this is unlikely to be an issue in real
workloads, considering that these code paths are only uncontended when
the actual work of op execution dominates the runtime. In other words,
this is only a perf bottleneck when it's under contention, and a small
regression for uncontended cases is likely in the noise.

[1] https://spcl.inf.ethz.ch/Publications/.pdf/atomic-bench.pdf

PiperOrigin-RevId: 331889409
Change-Id: I8763d09f060f5d550e596d63f037f95b5ff48e64
Todd Lipcon authored and tensorflower-gardener committed Sep 16, 2020
Showing 4 changed files with 365 additions and 114 deletions.
191 changes: 128 additions & 63 deletions tensorflow/core/common_runtime/pending_counts.h
@@ -16,6 +16,8 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include <atomic>

#include "tensorflow/core/lib/gtl/flatmap.h"
#include "tensorflow/core/lib/hash/hash.h"
#include "tensorflow/core/platform/logging.h"
@@ -93,63 +95,75 @@ class PendingCounts {

void set_initial_count(Handle h, size_t pending_count) {
if (h.is_large_) {
LargeCounts* c = Large(h);
c->pending = pending_count;
c->dead_count = 0;
c->has_started = 0;
std::atomic<LargeCounts>* c_ptr = Large(h);
auto c = c_ptr->load(std::memory_order_relaxed);
c.pending = pending_count;
c.dead_count = 0;
c.has_started = 0;
c_ptr->store(c, std::memory_order_relaxed);
} else {
PackedCounts* c = Packed(h);
DCHECK_LE(pending_count, kMaxCountForPackedCounts);
c->pending = pending_count;
c->dead_count = 0;
c->has_started = 0;
std::atomic<PackedCounts>* c_ptr = Packed(h);
auto c = c_ptr->load(std::memory_order_relaxed);
c.pending = pending_count;
c.dead_count = 0;
c.has_started = 0;
c_ptr->store(c, std::memory_order_relaxed);
}
}

NodeState node_state(Handle h) {
if (h.is_large_) {
return NodeStateForStruct(Large(h));
return NodeStateForStruct(Large(h)->load(std::memory_order_relaxed));
} else {
return NodeStateForStruct(Packed(h));
return NodeStateForStruct(Packed(h)->load(std::memory_order_relaxed));
}
}
void mark_started(Handle h) {
DCHECK_EQ(pending(h), 0);
if (h.is_large_) {
LargeCounts* c = Large(h);
DCHECK_EQ(c->has_started, 0);
c->has_started = 1;
std::atomic<LargeCounts>* c_ptr = Large(h);
auto c = c_ptr->load(std::memory_order_relaxed);
DCHECK_EQ(c.has_started, 0);
c.has_started = 1;
c_ptr->store(c, std::memory_order_relaxed);
} else {
PackedCounts* c = Packed(h);
DCHECK_EQ(c->has_started, 0);
c->has_started = 1;
std::atomic<PackedCounts>* c_ptr = Packed(h);
auto c = c_ptr->load(std::memory_order_relaxed);
DCHECK_EQ(c.has_started, 0);
c.has_started = 1;
c_ptr->store(c, std::memory_order_relaxed);
}
}
void mark_completed(Handle h) {
if (h.is_large_) {
LargeCounts* c = Large(h);
DCHECK_EQ(c->has_started, 1);
c->pending = 1;
std::atomic<LargeCounts>* c_ptr = Large(h);
auto c = c_ptr->load(std::memory_order_relaxed);
DCHECK_EQ(c.has_started, 1);
c.pending = 1;
c_ptr->store(c, std::memory_order_relaxed);
} else {
PackedCounts* c = Packed(h);
DCHECK_EQ(c->has_started, 1);
c->pending = 1;
std::atomic<PackedCounts>* c_ptr = Packed(h);
auto c = c_ptr->load(std::memory_order_relaxed);
DCHECK_EQ(c.has_started, 1);
c.pending = 1;
c_ptr->store(c, std::memory_order_relaxed);
}
}
int pending(Handle h) {
if (h.is_large_) {
LargeCounts* c = Large(h);
LargeCounts c = Large(h)->load(std::memory_order_relaxed);
if (PENDING_NOTREADY == NodeStateForStruct(c)) {
return c->pending;
return c.pending;
} else {
// The pending count encodes the state once the node has
// started, so just return 0.
return 0;
}
} else {
PackedCounts* c = Packed(h);
PackedCounts c = Packed(h)->load(std::memory_order_relaxed);
if (PENDING_NOTREADY == NodeStateForStruct(c)) {
return c->pending;
return c.pending;
} else {
// The pending count encodes the state once the node has
// started, so just return 0.
@@ -160,50 +174,63 @@ class PendingCounts {
int decrement_pending(Handle h, int v) {
DCHECK_GE(pending(h), v);
if (h.is_large_) {
LargeCounts* c = Large(h);
c->pending -= v;
return c->pending;
std::atomic<LargeCounts>* c_ptr = Large(h);
auto c = c_ptr->load(std::memory_order_relaxed);
c.pending -= v;
c_ptr->store(c, std::memory_order_relaxed);
return c.pending;
} else {
PackedCounts* c = Packed(h);
c->pending -= v;
return c->pending;
std::atomic<PackedCounts>* c_ptr = Packed(h);
auto c = c_ptr->load(std::memory_order_relaxed);
c.pending -= v;
c_ptr->store(c, std::memory_order_relaxed);
return c.pending;
}
}
// Mark a merge node as live
// REQUIRES: Node corresponding to "h" is a merge node
void mark_live(Handle h) {
if (h.is_large_) {
LargeCounts* c = Large(h);
std::atomic<LargeCounts>* c_ptr = Large(h);
auto c = c_ptr->load(std::memory_order_relaxed);
// Only do anything if the node hasn't already started executing.
if (PENDING_NOTREADY == NodeStateForStruct(c)) {
c->pending &= ~static_cast<int>(0x1);
c.pending &= ~static_cast<int>(0x1);
c_ptr->store(c, std::memory_order_relaxed);
}
} else {
PackedCounts* c = Packed(h);
std::atomic<PackedCounts>* c_ptr = Packed(h);
auto c = c_ptr->load(std::memory_order_relaxed);
// Only do anything if the node hasn't already started executing.
if (PENDING_NOTREADY == NodeStateForStruct(c)) {
static_assert(7 == kMaxCountForPackedCounts,
"Live flag incorrect for max packed count");
c->pending &= 0x6;
c.pending &= 0x6;
c_ptr->store(c, std::memory_order_relaxed);
}
}
}

int dead_count(Handle h) {
int r = h.is_large_ ? Large(h)->dead_count : Packed(h)->dead_count;
int r = h.is_large_ ? Large(h)->load(std::memory_order_relaxed).dead_count
: Packed(h)->load(std::memory_order_relaxed).dead_count;
return r;
}
void increment_dead_count(Handle h) {
if (h.is_large_) {
LargeCounts* c = Large(h);
std::atomic<LargeCounts>* c_ptr = Large(h);
auto c = c_ptr->load(std::memory_order_relaxed);
if (PENDING_NOTREADY == NodeStateForStruct(c)) {
c->dead_count++;
c.dead_count++;
c_ptr->store(c, std::memory_order_relaxed);
}
} else {
PackedCounts* c = Packed(h);
std::atomic<PackedCounts>* c_ptr = Packed(h);
auto c = c_ptr->load(std::memory_order_relaxed);
if (PENDING_NOTREADY == NodeStateForStruct(c)) {
DCHECK_LT(c->dead_count, kMaxCountForPackedCounts);
c->dead_count++;
DCHECK_LT(c.dead_count, kMaxCountForPackedCounts);
c.dead_count++;
c_ptr->store(c, std::memory_order_relaxed);
}
}
}
@@ -230,6 +257,17 @@ class PendingCounts {
}
}

// The same as the above, but performs the operation atomically. This
// is thread-safe to run concurrently with other threads.
AdjustResult adjust_for_activation_atomic(Handle h, bool increment_dead) {
DCHECK_GE(pending(h), 1);
if (h.is_large_) {
return adjust_for_activation_shared_atomic(Large(h), increment_dead);
} else {
return adjust_for_activation_shared_atomic(Packed(h), increment_dead);
}
}

class Handle {
public:
Handle() : byte_offset_(0), is_large_(0) {}
@@ -242,12 +280,31 @@

private:
template <typename T>
inline AdjustResult adjust_for_activation_shared(T* c, bool increment_dead) {
if (increment_dead && PENDING_NOTREADY == NodeStateForStruct(c)) {
c->dead_count++;
inline AdjustResult adjust_for_activation_shared(std::atomic<T>* c,
bool increment_dead) {
T val = c->load(std::memory_order_relaxed);
if (increment_dead && PENDING_NOTREADY == NodeStateForStruct(val)) {
val.dead_count++;
}
val.pending--;
c->store(val, std::memory_order_relaxed);
return AdjustResult(val.dead_count, val.pending);
}

template <typename T>
inline AdjustResult adjust_for_activation_shared_atomic(std::atomic<T>* c,
bool increment_dead) {
T old_val = c->load(std::memory_order_relaxed);
while (true) {
T new_val = old_val;
if (increment_dead && PENDING_NOTREADY == NodeStateForStruct(new_val)) {
new_val.dead_count++;
}
new_val.pending--;
AdjustResult ret(new_val.dead_count, new_val.pending);
if (TF_PREDICT_TRUE(c->compare_exchange_weak(old_val, new_val)))
return ret;
}
c->pending -= 1;
return AdjustResult(c->dead_count, c->pending);
}

// We keep track of the pending count and dead input count for each
@@ -272,30 +329,35 @@
uint8 has_started : 1;
};

struct LargeCounts {
// NOTE: alignas(8) is critical to implement efficient atomic<LargeCounts>
// on MSVC.
struct alignas(8) LargeCounts {
uint32 pending;
uint32 dead_count : 31;
uint8 has_started : 1;
// NOTE(tlipcon): MSVC won't pack this struct into 8 bytes unless
// all of the member types are uint32.
uint32 has_started : 1;
};

template <typename T>
NodeState NodeStateForStruct(T* c) const {
if (c->has_started) {
return (c->pending == 0) ? STARTED : COMPLETED;
NodeState NodeStateForStruct(const T& c) const {
if (c.has_started) {
return (c.pending == 0) ? STARTED : COMPLETED;
} else {
return (c->pending == 0) ? PENDING_READY : PENDING_NOTREADY;
return (c.pending == 0) ? PENDING_READY : PENDING_NOTREADY;
}
}
inline LargeCounts* Large(Handle h) {
inline std::atomic<LargeCounts>* Large(Handle h) {
DCHECK(h.is_large_);
DCHECK_LE(h.byte_offset_ + sizeof(LargeCounts), num_bytes_);
DCHECK_EQ(h.byte_offset_ % alignof(LargeCounts), 0);
return reinterpret_cast<LargeCounts*>(bytes_ + h.byte_offset_);
DCHECK_LE(h.byte_offset_ + sizeof(std::atomic<LargeCounts>), num_bytes_);
DCHECK_EQ(h.byte_offset_ % alignof(std::atomic<LargeCounts>), 0);
return reinterpret_cast<std::atomic<LargeCounts>*>(bytes_ + h.byte_offset_);
}
inline PackedCounts* Packed(Handle h) {
inline std::atomic<PackedCounts>* Packed(Handle h) {
DCHECK(!h.is_large_);
DCHECK_LE(h.byte_offset_ + sizeof(PackedCounts), num_bytes_);
return reinterpret_cast<PackedCounts*>(bytes_ + h.byte_offset_);
return reinterpret_cast<std::atomic<PackedCounts>*>(bytes_ +
h.byte_offset_);
}

const int num_bytes_; // Just for bounds checking in debug mode
@@ -309,18 +371,21 @@ inline PendingCounts::Handle PendingCounts::Layout::CreateHandle(
Handle result;
if ((max_pending_count > kMaxCountForPackedCounts) ||
(max_dead_count > kMaxCountForPackedCounts)) {
int B = sizeof(LargeCounts);
constexpr int B = sizeof(std::atomic<LargeCounts>);
// Round byte offset to proper alignment
DCHECK_GE(sizeof(LargeCounts), alignof(LargeCounts));
static_assert(
sizeof(std::atomic<LargeCounts>) >= alignof(std::atomic<LargeCounts>),
"std::atomic<LargeCounts> must be packed");
int64 offset = ((static_cast<int64>(next_offset_) + B - 1) / B) * B;
result.byte_offset_ = offset;
result.is_large_ = true;
next_offset_ = result.byte_offset_ + B;
} else {
result.byte_offset_ = next_offset_;
result.is_large_ = false;
DCHECK_EQ(sizeof(PackedCounts), 1);
next_offset_ += sizeof(PackedCounts);
static_assert(sizeof(std::atomic<PackedCounts>) == 1,
"std::atomic<PackedCounts> should be a single byte");
next_offset_ += sizeof(std::atomic<PackedCounts>);
}
return result;
}
39 changes: 38 additions & 1 deletion tensorflow/core/common_runtime/pending_counts_test.cc
@@ -13,12 +13,17 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/pending_counts.h"

#include <memory>
#include <unordered_map>
#include <vector>

#include "tensorflow/core/common_runtime/pending_counts.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/test.h"

using std::unique_ptr;

namespace tensorflow {

TEST(PendingCounts, Simple) {
@@ -165,4 +170,36 @@ TEST(PendingCounts, AdjustForActivation) {
}
}

TEST(PendingCounts, AdjustForActivationAtomic) {
PendingCounts::Layout layout;
PendingCounts::Handle handles[2];
const int kInitialCounts[2] = {6, 16};
handles[0] = layout.CreateHandle(kInitialCounts[0], 0);
handles[1] = layout.CreateHandle(kInitialCounts[1], 0);
PendingCounts c(layout);
c.set_initial_count(handles[0], kInitialCounts[0]);
c.set_initial_count(handles[1], kInitialCounts[1]);

Env* env = Env::Default();
std::atomic<bool> start{false};
std::vector<unique_ptr<Thread>> threads;
for (int t = 0; t < 2; t++) {
threads.emplace_back(env->StartThread({}, "tester", [&]() {
while (!start) {
}
for (int i = 0; i < kInitialCounts[0] / 2; i++) {
c.adjust_for_activation_atomic(handles[0], false);
}
for (int i = 0; i < kInitialCounts[1] / 2; i++) {
c.adjust_for_activation_atomic(handles[1], false);
}
}));
}
start = true;
threads.clear(); // Joins the threads.

EXPECT_EQ(c.pending(handles[0]), 0);
EXPECT_EQ(c.pending(handles[1]), 0);
}

} // namespace tensorflow
