[XLA:CPU] Use thread pool to run host kernel functions in parallel.
FUTURE_COPYBARA_INTEGRATE_REVIEW=#66023 from Intel-tensorflow:gaurides/add_back_bf16_sum_mean b9077f9
PiperOrigin-RevId: 632144996
tvladyslav authored and tensorflower-gardener committed May 22, 2024
1 parent 9611b0c commit ca50055
Showing 23 changed files with 293 additions and 117 deletions.
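The change named in the commit title — dispatching XLA:CPU host kernel functions onto a thread pool — does not appear in the hunks visible below. As a minimal sketch of that pattern only (not the commit's actual code; RunInParallel and HostKernelFn are hypothetical names, and Eigen::ThreadPool with absl::BlockingCounter stand in for whatever primitives the real implementation uses):

    #include <functional>

    #include "absl/synchronization/blocking_counter.h"
    #include "unsupported/Eigen/CXX11/ThreadPool"

    // Hypothetical task type: one call per partition of a host kernel.
    using HostKernelFn = std::function<void(int task_index)>;

    // Schedules every partition on the pool and blocks until all finish.
    void RunInParallel(Eigen::ThreadPool& pool, int num_tasks,
                       const HostKernelFn& kernel) {
      absl::BlockingCounter pending(num_tasks);
      for (int i = 0; i < num_tasks; ++i) {
        pool.Schedule([&kernel, &pending, i] {
          kernel(i);  // Runs one partition on a pool thread.
          pending.DecrementCount();
        });
      }
      pending.Wait();  // Returns once every partition has completed.
    }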
@@ -442,6 +442,7 @@ class AutoMixedPrecisionListsMkl : public AutoMixedPrecisionLists {
         "Log",
         "Log1p",
         "LogSoftmax",
+        "Mean",
         "Prod",
         "RealDiv",
         "Reciprocal",
@@ -458,6 +459,7 @@ class AutoMixedPrecisionListsMkl : public AutoMixedPrecisionLists {
         "Sqrt",
         "Square",
         "SquaredDifference",
+        "Sum",
         "Tanh",
         "TanhGrad"};
     UpdateList("INFERLIST", &list);
1 change: 1 addition & 0 deletions tensorflow/core/kernels/BUILD
@@ -65,6 +65,7 @@ package_group(
     packages = [
         "//tensorflow/...",
         "//tensorflow_text/...",
+        "//waymo/ml/compiler/frontend/kernels/...",
         "//waymo/onboard/ml/...",
     ],
 )
3 changes: 2 additions & 1 deletion tensorflow/core/kernels/gather_nd_op.cc
@@ -45,7 +45,8 @@ class GatherNdOp : public OpKernel {
 
     Tensor out;
     OP_REQUIRES_OK(
-        c, functor::DoGatherNd<Device, T, Index>(c, params, indices, &out));
+        c, functor::DoGatherNd<Device, T, Index, /*kDropBadIndices=*/false>(
+               c, params, indices, &out));
     c->set_output(0, out);
   }
 };
7 changes: 6 additions & 1 deletion tensorflow/core/kernels/gather_nd_op.h
@@ -43,7 +43,8 @@ struct GatherNdSlice {
                   typename TTypes<T>::Matrix Tout);
 };
 
-template <typename Device, typename T, typename Index>
+template <typename Device, typename T, typename Index,
+          bool kDropBadIndices = false>
 Status DoGatherNd(OpKernelContext* c, const Tensor& params,
                   const Tensor& indices, Tensor* out) {
   if (!TensorShapeUtils::IsVectorOrHigher(params.shape())) {
@@ -151,6 +152,10 @@ Status DoGatherNd(OpKernelContext* c, const Tensor& params,
                     indices_nd);
   }
 
+  if constexpr (kDropBadIndices) {
+    return absl::OkStatus();
+  }
+
   // bad_i will only return >= 0 on CPUs right now.
   if (bad_i >= 0) {
     auto shape = indices.shape();
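With the new kDropBadIndices parameter, a caller can opt into silently skipping out-of-range indices instead of failing the op; the early return above bypasses the bad_i error check entirely. A hypothetical call site (not part of this commit) might look like:

    // Drop out-of-range indices rather than returning InvalidArgument.
    Tensor out;
    OP_REQUIRES_OK(c, (functor::DoGatherNd<CPUDevice, float, int32,
                                           /*kDropBadIndices=*/true>(
                          c, params, indices, &out)));
    c->set_output(0, out);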
63 changes: 42 additions & 21 deletions tensorflow/core/kernels/scatter_nd_op.cc
@@ -878,7 +878,7 @@ class IndexFlattener {
 namespace {
 
 template <typename Device, typename T, typename Index,
-          scatter_nd_op::UpdateOp Op>
+          scatter_nd_op::UpdateOp Op, bool kDropBadIndices>
 Status DoScatterNdImpl(OpKernelContext* c, const Tensor& indices,
                        const Tensor& updates, const TensorShape& shape,
                        Tensor* out, bool allocate) {
@@ -925,7 +925,11 @@ Status DoScatterNdImpl(OpKernelContext* c, const Tensor& indices,
     for (int i = 0; i < IXDIM; ++i) {                                       \
       output_shape_prefix[i] = shape.dim_size(i);                           \
     }                                                                       \
-    functor::ScatterNdFunctor<Device, T, Index, Op, IXDIM> functor;         \
+    constexpr bool kShallDropBadIndices =                                   \
+        kDropBadIndices || std::is_same<Device, GPUDevice>::value;          \
+    functor::ScatterNdFunctor<Device, T, Index, Op, IXDIM,                  \
+                              kShallDropBadIndices>                         \
+        functor;                                                            \
     bad_i =                                                                 \
         functor(c->eigen_device<Device>(), slice_size, output_shape_prefix, \
                 output_matrix, indices_flat, updates_flat, output_matrix);  \
@@ -947,6 +951,9 @@ Status DoScatterNdImpl(OpKernelContext* c, const Tensor& indices,
                                    slice_dim);
     }
   }
+  if constexpr (kDropBadIndices) {
+    return absl::OkStatus();
+  }
   if (bad_i >= 0) {
     auto slice_shape = indices.shape();
     slice_shape.RemoveLastDims(1);
@@ -970,7 +977,8 @@ Status DoScatterNdOnCpu(OpKernelContext* c, const Tensor& indices,
 // back to GPU. This is useful because the CPU implementation is deterministic
 // and the GPU implementation is not. Tensor inputs to this function must be on
 // the GPU.
-template <typename T, typename Index, scatter_nd_op::UpdateOp Op>
+template <typename T, typename Index, scatter_nd_op::UpdateOp Op,
+          bool kDropBadIndices>
 Status DoScatterNdOnCpu(OpKernelContext* c, const Tensor& indices,
                         const Tensor& updates, const TensorShape& shape,
                         Tensor* out, bool allocate) {
@@ -1015,7 +1023,7 @@ Status DoScatterNdOnCpu(OpKernelContext* c, const Tensor& indices,
   }
 
   TF_RETURN_IF_ERROR(stream->BlockHostUntilDone());
-  TF_RETURN_IF_ERROR(DoScatterNd<CPUDevice, T, Index, Op>(
+  TF_RETURN_IF_ERROR(DoScatterNd<CPUDevice, T, Index, Op, kDropBadIndices>(
       c, host_indices, host_updates, shape, &host_out, /*allocate=*/false));
 
   // Copy 'host_out' to device.
@@ -1033,44 +1041,57 @@ Status DoScatterNdOnCpu(OpKernelContext* c, const Tensor& indices,
 }  // namespace
 
 template <typename Device, typename T, typename Index,
-          scatter_nd_op::UpdateOp Op>
+          scatter_nd_op::UpdateOp Op, bool kDropBadIndices>
 Status DoScatterNd(OpKernelContext* c, const Tensor& indices,
                    const Tensor& updates, const TensorShape& shape, Tensor* out,
                    bool allocate) {
 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
   if (std::is_same<Device, GPUDevice>::value &&
       tensorflow::OpDeterminismRequired() && !DisableScatterOpDeterminism()) {
-    return DoScatterNdOnCpu<T, Index, Op>(c, indices, updates, shape, out,
-                                          allocate);
+    return DoScatterNdOnCpu<T, Index, Op, kDropBadIndices>(
+        c, indices, updates, shape, out, allocate);
   }
 #endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
 
   // Run on the CPU for integer types, since the GPU implementation uses
   // atomics, which are not supported for all integer types.
   if constexpr (std::is_same<Device, GPUDevice>::value &&
                 std::is_integral<T>::value) {
-    return DoScatterNdOnCpu<T, Index, Op>(c, indices, updates, shape, out,
-                                          allocate);
+    return DoScatterNdOnCpu<T, Index, Op, kDropBadIndices>(
+        c, indices, updates, shape, out, allocate);
   } else {
-    return DoScatterNdImpl<Device, T, Index, Op>(c, indices, updates, shape,
-                                                 out, allocate);
+    return DoScatterNdImpl<Device, T, Index, Op, kDropBadIndices>(
+        c, indices, updates, shape, out, allocate);
   }
 }
 }  // namespace functor
 
 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
 // Forward declarations of the functor specializations for GPU.
 namespace functor {
-#define DECLARE_GPU_SPECS_INDEX_OP_IXDIM(T, Index, op, IXDIM)           \
-  template <>                                                           \
-  Index ScatterNdFunctor<GPUDevice, T, Index, op, IXDIM>::operator()(   \
-      const GPUDevice& d, const Index slice_size,                       \
-      const Eigen::array<Eigen::DenseIndex, IXDIM> output_shape_prefix, \
-      typename TTypes<T, 2>::Tensor Tparams,                            \
-      typename TTypes<Index, 2>::ConstTensor Tindices,                  \
-      typename TTypes<T, 2>::ConstTensor Tupdates,                      \
-      typename TTypes<T, 2>::Tensor Toutput);                           \
-  extern template struct ScatterNdFunctor<GPUDevice, T, Index, op, IXDIM>;
+#define DECLARE_GPU_SPECS_INDEX_OP_IXDIM(T, Index, op, IXDIM)                 \
+  template <>                                                                 \
+  Index                                                                       \
+  ScatterNdFunctor<GPUDevice, T, Index, op, IXDIM, /*kDropBadIndices=*/true>::\
+  operator()(const GPUDevice& d, const Index slice_size,                      \
+             const Eigen::array<Eigen::DenseIndex, IXDIM> output_shape_prefix,\
+             typename TTypes<T, 2>::Tensor Tparams,                           \
+             typename TTypes<Index, 2>::ConstTensor Tindices,                 \
+             typename TTypes<T, 2>::ConstTensor Tupdates,                     \
+             typename TTypes<T, 2>::Tensor Toutput);                          \
+  extern template struct ScatterNdFunctor<GPUDevice, T, Index, op, IXDIM,     \
+                                          /*kDropBadIndices=*/true>;          \
+  template <>                                                                 \
+  Index ScatterNdFunctor<GPUDevice, T, Index, op, IXDIM,                      \
+                         /*kDropBadIndices=*/false>::                         \
+  operator()(const GPUDevice& d, const Index slice_size,                      \
+             const Eigen::array<Eigen::DenseIndex, IXDIM> output_shape_prefix,\
+             typename TTypes<T, 2>::Tensor Tparams,                           \
+             typename TTypes<Index, 2>::ConstTensor Tindices,                 \
+             typename TTypes<T, 2>::ConstTensor Tupdates,                     \
+             typename TTypes<T, 2>::Tensor Toutput);                          \
+  extern template struct ScatterNdFunctor<GPUDevice, T, Index, op, IXDIM,     \
+                                          /*kDropBadIndices=*/false>;
 
 #define DECLARE_GPU_SPECS_INDEX_OP(T, Index, op) \
   DECLARE_GPU_SPECS_INDEX_OP_IXDIM(T, Index, op, 1); \
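Two things worth noting in the dispatch above: DoScatterNd falls back to the CPU path both when op determinism is required on GPU and for integral types (GPU atomics are not supported for all integer types), and kDropBadIndices now threads through every branch, while the GPU functor itself is always instantiated with kDropBadIndices=true (see scatter_nd_op_gpu.cu.cc below). A hypothetical caller (not part of this commit):

    // An ADD scatter that tolerates bad indices; the flag reaches
    // DoScatterNdImpl or DoScatterNdOnCpu unchanged.
    TF_RETURN_IF_ERROR((functor::DoScatterNd<CPUDevice, float, int32,
                                             scatter_nd_op::UpdateOp::ADD,
                                             /*kDropBadIndices=*/true>(
        c, indices, updates, shape, &out, /*allocate=*/true)));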
4 changes: 2 additions & 2 deletions tensorflow/core/kernels/scatter_nd_op.h
@@ -44,7 +44,7 @@ namespace functor {
 
 // Functor used by ScatterOp to do the computations.
 template <typename Device, typename T, typename Index,
-          scatter_nd_op::UpdateOp op, int IXDIM>
+          scatter_nd_op::UpdateOp op, int IXDIM, bool kDropBadIndices>
 struct ScatterNdFunctor {
   // Returns -1 on success or a nonnegative i s.t. indices[i] is a bad index.
   Index operator()(
@@ -63,7 +63,7 @@ struct ScatterNdFunctor {
 // right type (T) and shape. This tensor will not be zeroed out
 // before the scatter is executed.
 template <typename Device, typename T, typename Index,
-          scatter_nd_op::UpdateOp Op>
+          scatter_nd_op::UpdateOp Op, bool kDropBadIndices = false>
 Status DoScatterNd(OpKernelContext* c, const Tensor& indices,
                    const Tensor& updates, const TensorShape& shape, Tensor* out,
                    bool allocate);
52 changes: 32 additions & 20 deletions tensorflow/core/kernels/scatter_nd_op_cpu_impl.h
@@ -103,8 +103,9 @@ class UpdateExecutor<T, Input, Update, Output, scatter_nd_op::UpdateOp::MAX> {
 namespace functor {
 
 // Implementation of update functor for CPU.
-template <typename T, typename Index, scatter_nd_op::UpdateOp OP, int IXDIM>
-struct ScatterNdFunctor<CPUDevice, T, Index, OP, IXDIM> {
+template <typename T, typename Index, scatter_nd_op::UpdateOp OP, int IXDIM,
+          bool kDropBadIndices>
+struct ScatterNdFunctor<CPUDevice, T, Index, OP, IXDIM, kDropBadIndices> {
   Index operator()(
       const CPUDevice& d, const Index slice_size,
       const Eigen::array<Eigen::DenseIndex, IXDIM> output_shape_prefix,
@@ -136,33 +137,44 @@ struct ScatterNdFunctor<CPUDevice, T, Index, OP, IXDIM> {
         i += ix_d * batch_strides[dim];
       }
       if (TF_PREDICT_FALSE(out_of_bounds)) {
+        if constexpr (kDropBadIndices) {
+          continue;
+        }
         error_loc = loc;
         break;
-      } else {
-        auto input_chip = Toutput.template chip<0>(i);
-        auto output_chip = input_chip;
-        auto update_chip = Tupdates.template chip<0>(loc);
-        update_executor::UpdateExecutor<
-            CPUDevice, decltype(input_chip), decltype(update_chip),
-            decltype(output_chip), OP>::Execute(d, input_chip, update_chip,
-                                                output_chip);
       }
+      auto input_chip = Toutput.template chip<0>(i);
+      auto output_chip = input_chip;
+      auto update_chip = Tupdates.template chip<0>(loc);
+      update_executor::UpdateExecutor<
+          CPUDevice, decltype(input_chip), decltype(update_chip),
+          decltype(output_chip), OP>::Execute(d, input_chip, update_chip,
+                                              output_chip);
     }
 
     return error_loc;
   }
 };
 
-#define REGISTER_SCATTER_ND_FULL(T, Index, op)                               \
-  template Index                                                             \
-  ScatterNdFunctor<CPUDevice, T, Index, op, CPU_PROVIDED_IXDIM>::operator()( \
-      const CPUDevice& d, const Index slice_size,                            \
-      const Eigen::array<Eigen::DenseIndex, CPU_PROVIDED_IXDIM>              \
-          output_shape_prefix,                                               \
-      typename TTypes<T, 2>::Tensor Tparams,                                 \
-      typename TTypes<Index, 2>::ConstTensor Tindices,                       \
-      typename TTypes<T, 2>::ConstTensor Tupdates,                           \
-      typename TTypes<T, 2>::Tensor Toutput)
+#define REGISTER_SCATTER_ND_FULL(T, Index, op)                                \
+  template Index ScatterNdFunctor<CPUDevice, T, Index, op, CPU_PROVIDED_IXDIM,\
+                                  /*kDropBadIndices=*/false>::                \
+  operator()(const CPUDevice& d, const Index slice_size,                      \
+             const Eigen::array<Eigen::DenseIndex, CPU_PROVIDED_IXDIM>        \
+                 output_shape_prefix,                                         \
+             typename TTypes<T, 2>::Tensor Tparams,                           \
+             typename TTypes<Index, 2>::ConstTensor Tindices,                 \
+             typename TTypes<T, 2>::ConstTensor Tupdates,                     \
+             typename TTypes<T, 2>::Tensor Toutput);                          \
+  template Index ScatterNdFunctor<CPUDevice, T, Index, op, CPU_PROVIDED_IXDIM,\
+                                  /*kDropBadIndices=*/true>::                 \
+  operator()(const CPUDevice& d, const Index slice_size,                      \
+             const Eigen::array<Eigen::DenseIndex, CPU_PROVIDED_IXDIM>        \
+                 output_shape_prefix,                                         \
+             typename TTypes<T, 2>::Tensor Tparams,                           \
+             typename TTypes<Index, 2>::ConstTensor Tindices,                 \
+             typename TTypes<T, 2>::ConstTensor Tupdates,                     \
+             typename TTypes<T, 2>::Tensor Toutput)
 
 #define REGISTER_SCATTER_ND_INDEX(type, op) \
   REGISTER_SCATTER_ND_FULL(type, int32, op); \
10 changes: 6 additions & 4 deletions tensorflow/core/kernels/scatter_nd_op_gpu.cu.cc
@@ -124,8 +124,9 @@ __global__ void ScatterNdOpKernel(
 namespace functor {
 
 // Functor used by ScatterOp to do the computations.
-template <typename T, typename Index, scatter_nd_op::UpdateOp op, int IXDIM>
-struct ScatterNdFunctor<GPUDevice, T, Index, op, IXDIM> {
+template <typename T, typename Index, scatter_nd_op::UpdateOp op, int IXDIM,
+          bool kDropBadIndices>
+struct ScatterNdFunctor<GPUDevice, T, Index, op, IXDIM, kDropBadIndices> {
   Index operator()(
       const GPUDevice& d, const Index slice_size,
       const Eigen::array<Eigen::DenseIndex, IXDIM> output_shape_prefix,
@@ -164,8 +165,9 @@ struct ScatterNdFunctor<GPUDevice, T, Index, op, IXDIM> {
 
 }  // namespace functor
 
-#define DECLARE_GPU_SPECS_INDEX_OP_IXDIM(T, Index, op, IXDIM) \
-  template struct functor::ScatterNdFunctor<GPUDevice, T, Index, op, IXDIM>;
+#define DECLARE_GPU_SPECS_INDEX_OP_IXDIM(T, Index, op, IXDIM)               \
+  template struct functor::ScatterNdFunctor<GPUDevice, T, Index, op, IXDIM, \
+                                            /*kDropBadIndices=*/true>;
 
 #define DECLARE_GPU_SPECS_INDEX_OP(T, Index, op) \
   DECLARE_GPU_SPECS_INDEX_OP_IXDIM(T, Index, op, 1); \
1 change: 0 additions & 1 deletion tensorflow/lite/schema/BUILD
@@ -15,7 +15,6 @@ filegroup(
     name = "tflite_internal_cc_3p_api_deps_src",
     srcs = [
         ":schema_fbs_srcs",
-        ":schema_utils.cc",
         ":schema_utils.h",
     ],
     visibility = [
12 changes: 0 additions & 12 deletions third_party/xla/xla/stream_executor/cuda/cuda_executor.cc
@@ -772,18 +772,6 @@ absl::Status GpuExecutor::WaitForEvent(Stream* stream, Event* event) {
   }
 }
 
-absl::Status GpuExecutor::WaitForEventOnExternalStream(std::intptr_t stream,
-                                                       Event* event) {
-  if (GpuDriver::WaitStreamOnEvent(context_,
-                                   absl::bit_cast<GpuStreamHandle>(stream),
-                                   AsGpuEvent(event)->gpu_event())) {
-    return absl::OkStatus();
-  } else {
-    return absl::InternalError(
-        "error waiting for CUDA event on external stream");
-  }
-}
-
 Event::Status GpuExecutor::PollForEventStatus(Event* event) {
   return AsGpuEvent(event)->PollForStatus();
 }
4 changes: 0 additions & 4 deletions third_party/xla/xla/stream_executor/event.cc
@@ -32,8 +32,4 @@ Event::Status Event::PollForStatus() {
   return stream_exec_->PollForEventStatus(this);
 }
 
-absl::Status Event::WaitForEventOnExternalStream(std::intptr_t stream) {
-  return stream_exec_->WaitForEventOnExternalStream(stream, this);
-}
-
 }  // namespace stream_executor
6 changes: 4 additions & 2 deletions third_party/xla/xla/stream_executor/event.h
@@ -41,7 +41,7 @@ class Event {
     kComplete,
   };
 
-  Event(StreamExecutorInterface* stream_exec);
+  explicit Event(StreamExecutorInterface* stream_exec);
 
   // Releases any resources held by the Event object.
   virtual ~Event() = default;
@@ -51,7 +51,9 @@
 
   // Blocks `stream` on this event. `stream` is a raw platform-specific
   // stream (e.g. GpuStreamHandle).
-  absl::Status WaitForEventOnExternalStream(std::intptr_t stream);
+  virtual absl::Status WaitForEventOnExternalStream(std::intptr_t stream) {
+    return absl::UnimplementedError("Not supported for this Event.");
+  }
 
   Event(Event&&) = default;
   Event& operator=(Event&&) = default;
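WaitForEventOnExternalStream is now a virtual on Event with an Unimplemented default, and the GPU override lives on GpuEvent (see gpu_event.cc below) instead of on the executor. A hypothetical caller holding a raw platform stream handle (e.g. a cudaStream_t) would pass it bit-cast to std::intptr_t:

    // Block an externally owned GPU stream on `event`; `raw_stream` is
    // assumed to be a platform stream handle such as cudaStream_t.
    absl::Status status = event->WaitForEventOnExternalStream(
        absl::bit_cast<std::intptr_t>(raw_stream));
    if (!status.ok()) {
      LOG(ERROR) << "External-stream wait failed: " << status;
    }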
1 change: 1 addition & 0 deletions third_party/xla/xla/stream_executor/gpu/BUILD
@@ -197,6 +197,7 @@ gpu_only_cc_library(
         ":gpu_stream",
         ":gpu_types_header",
         "//xla/stream_executor:stream_executor_headers",
+        "@com_google_absl//absl/base",
         "@com_google_absl//absl/status",
     ],
 )
14 changes: 14 additions & 0 deletions third_party/xla/xla/stream_executor/gpu/gpu_event.cc
@@ -15,7 +15,11 @@ limitations under the License.
 
 #include "xla/stream_executor/gpu/gpu_event.h"
 
+#include <cstdint>
+
+#include "absl/base/casts.h"
 #include "absl/status/status.h"
+#include "xla/stream_executor/event.h"
 #include "xla/stream_executor/gpu/gpu_driver.h"
 #include "xla/stream_executor/gpu/gpu_executor.h"
 #include "xla/stream_executor/gpu/gpu_stream.h"
@@ -45,5 +49,15 @@ absl::Status GpuEvent::Record(GpuStream* stream) {
 
 GpuEventHandle GpuEvent::gpu_event() { return gpu_event_; }
 
+absl::Status GpuEvent::WaitForEventOnExternalStream(std::intptr_t stream) {
+  if (GpuDriver::WaitStreamOnEvent(parent_->gpu_context(),
+                                   absl::bit_cast<GpuStreamHandle>(stream),
+                                   gpu_event_)) {
+    return absl::OkStatus();
+  } else {
+    return absl::InternalError("Error waiting for event on external stream");
+  }
+}
+
 }  // namespace gpu
 }  // namespace stream_executor