Skip to content

Commit

Permalink
Allow merging compute-copy streams
Browse files Browse the repository at this point in the history
  • Loading branch information
buptzyb committed Aug 18, 2023
1 parent cc71ba6 commit 9e51f38
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 32 deletions.
79 changes: 49 additions & 30 deletions tensorflow/core/common_runtime/gpu/gpu_util.cc
Expand Up @@ -202,16 +202,21 @@ void GPUUtil::DeviceToDeviceCopy(
done(s);
return;
}
auto send_device_to_device_stream =
static_cast<const GPUDeviceContext*>(send_dev_context)
->device_to_device_stream(dev_to_dev_stream_index);
if (send_device_to_device_stream == nullptr) {
done(errors::Internal("No send gpu copy-out-stream is available."));
return;
se::Stream* send_device_to_device_stream = nullptr;
if (src->merge_DtoD_copy_stream()) {
send_device_to_device_stream = send_stream;
} else {
send_device_to_device_stream =
static_cast<const GPUDeviceContext*>(send_dev_context)
->device_to_device_stream(dev_to_dev_stream_index);
if (send_device_to_device_stream == nullptr) {
done(errors::Internal("No send gpu copy-out-stream is available."));
return;
}
// Wait for the main stream on the sender to make sure the result is
// available.
send_device_to_device_stream->ThenWaitFor(send_stream);
}
// Wait for the main stream on the sender to make sure the result is
// available.
send_device_to_device_stream->ThenWaitFor(send_stream);

const int64_t total_bytes = input->TotalBytes();
if (total_bytes > 0) {
Expand All @@ -230,7 +235,9 @@ void GPUUtil::DeviceToDeviceCopy(
// truly free.
// TODO(zhengxq): remove this dependency when we switch to a better way
// to make sure the memory is free.
send_device_to_device_stream->ThenWaitFor(recv_stream);
if (send_device_to_device_stream != recv_stream) {
send_device_to_device_stream->ThenWaitFor(recv_stream);
}

VLOG(2) << "src_ptr " << src_ptr << " dst_ptr " << dst_ptr;
send_device_to_device_stream->ThenMemcpy(&gpu_dst_ptr, gpu_src_ptr,
Expand Down Expand Up @@ -282,15 +289,20 @@ void GPUUtil::CopyGPUTensorToCPU(Device* gpu_device,
return;
}

auto send_device_to_host_stream =
static_cast<const GPUDeviceContext*>(device_context)
->device_to_host_stream();
if (send_device_to_host_stream == nullptr) {
done(errors::Internal("No send gpu copy-out-stream is available."));
return;
se::Stream* send_device_to_host_stream = nullptr;
if (gpu_device->merge_DtoH_copy_stream()) {
send_device_to_host_stream = send_stream;
} else {
send_device_to_host_stream =
static_cast<const GPUDeviceContext*>(device_context)
->device_to_host_stream();
if (send_device_to_host_stream == nullptr) {
done(errors::Internal("No send gpu copy-out-stream is available."));
return;
}
// Wait for the sender's main stream to make sure the data are available.
send_device_to_host_stream->ThenWaitFor(send_stream);
}
// Wait for the sender's main stream to make sure the data are available.
send_device_to_host_stream->ThenWaitFor(send_stream);

const int64_t total_bytes = gpu_tensor->TotalBytes();
if (total_bytes > 0) {
Expand Down Expand Up @@ -327,16 +339,21 @@ void GPUUtil::CopyCPUTensorToGPU(const Tensor* cpu_tensor,
return;
}

auto recv_host_to_device_stream =
static_cast<const GPUDeviceContext*>(device_context)
->host_to_device_stream();
if (recv_host_to_device_stream == nullptr) {
done(errors::Internal("No send gpu copy-out-stream is available."));
return;
}
// Wait for the recv-stream to make sure the buffer is truly available.
if (sync_dst_compute) {
recv_host_to_device_stream->ThenWaitFor(recv_stream);
se::Stream* recv_host_to_device_stream = nullptr;
if (gpu_device->merge_HtoD_copy_stream()) {
recv_host_to_device_stream = recv_stream;
} else {
recv_host_to_device_stream =
static_cast<const GPUDeviceContext*>(device_context)
->host_to_device_stream();
if (recv_host_to_device_stream == nullptr) {
done(errors::Internal("No send gpu copy-out-stream is available."));
return;
}
// Wait for the recv-stream to make sure the buffer is truly available.
if (sync_dst_compute) {
recv_host_to_device_stream->ThenWaitFor(recv_stream);
}
}

const int64_t total_bytes = cpu_tensor->TotalBytes();
Expand Down Expand Up @@ -378,10 +395,12 @@ void GPUUtil::CopyCPUTensorToGPU(const Tensor* cpu_tensor,
}
}

bool merge_HtoD = gpu_device->merge_HtoD_copy_stream();
if (merge_HtoD) done(OkStatus());
dev_info->event_mgr->ThenExecute(
recv_host_to_device_stream,
[recv_host_to_device_stream, done, input_ref, do_staging, staging_buffer,
host_memory_allocator]() {
host_memory_allocator, merge_HtoD]() {
if (do_staging) {
host_memory_allocator->DeallocateRaw(staging_buffer);
} else {
Expand All @@ -390,7 +409,7 @@ void GPUUtil::CopyCPUTensorToGPU(const Tensor* cpu_tensor,
if (!recv_host_to_device_stream->ok()) {
LOG(FATAL) << "CPU->GPU Memcpy failed";
}
done(OkStatus());
if (!merge_HtoD) done(OkStatus());
});
}

Expand Down
5 changes: 4 additions & 1 deletion tensorflow/core/common_runtime/local_device.cc
Expand Up @@ -110,7 +110,10 @@ struct LocalDevice::EigenThreadPoolInfo {

LocalDevice::LocalDevice(const SessionOptions& options,
const DeviceAttributes& attributes)
: Device(options.env, attributes), owned_tp_info_(nullptr) {
: Device(options.env, attributes),
owned_tp_info_(nullptr),
gpu_stream_merge_options_(
options.config.gpu_options().stream_merge_options()) {
// Log info messages if TensorFlow is not compiled with instructions that
// could speed up performance and are available on the current CPU.
port::InfoAboutUnusedCPUFeatures();
Expand Down
14 changes: 14 additions & 0 deletions tensorflow/core/common_runtime/local_device.h
Expand Up @@ -37,6 +37,18 @@ class LocalDevice : public Device {
const DeviceAttributes& attributes);
~LocalDevice() override;

// Returns true if device-to-device copies should reuse the compute stream
// rather than a dedicated DtoD copy stream, per the session's
// GPUOptions.stream_merge_options.
bool merge_DtoD_copy_stream() const override {
  const auto& merge_opts = gpu_stream_merge_options_;
  return merge_opts.merge_d_to_d_stream();
}

// Returns true if device-to-host copies should reuse the compute stream
// rather than a dedicated DtoH copy stream, per the session's
// GPUOptions.stream_merge_options.
bool merge_DtoH_copy_stream() const override {
  const auto& merge_opts = gpu_stream_merge_options_;
  return merge_opts.merge_d_to_h_stream();
}

// Returns true if host-to-device copies should reuse the compute stream
// rather than a dedicated HtoD copy stream, per the session's
// GPUOptions.stream_merge_options.
bool merge_HtoD_copy_stream() const override {
  const auto& merge_opts = gpu_stream_merge_options_;
  return merge_opts.merge_h_to_d_stream();
}

private:
static bool use_global_threadpool_;

Expand All @@ -47,6 +59,8 @@ class LocalDevice : public Device {
struct EigenThreadPoolInfo;
std::unique_ptr<EigenThreadPoolInfo> owned_tp_info_;

const GPUOptions::STREAM_MERGE_OPTIONS gpu_stream_merge_options_;

friend class test::Benchmark;

TF_DISALLOW_COPY_AND_ASSIGN(LocalDevice);
Expand Down
12 changes: 12 additions & 0 deletions tensorflow/core/framework/device.h
Expand Up @@ -193,6 +193,18 @@ class Device : public DeviceBase {
// Informs if this Device can be used as a caller in RemoteCall operation.
virtual bool IsRemoteCallAllowed() const;

// Whether to merge the DtoD copy streams with the compute stream. Only useful
// for GPU. Defaults to false; GPU-backed devices (e.g. LocalDevice) override
// this to honor GPUOptions.stream_merge_options.
virtual bool merge_DtoD_copy_stream() const { return false; }

// Whether to merge the DtoH copy stream with the compute stream. Only useful
// for GPU. Defaults to false; GPU-backed devices (e.g. LocalDevice) override
// this to honor GPUOptions.stream_merge_options.
virtual bool merge_DtoH_copy_stream() const { return false; }

// Whether to merge the HtoD copy stream with the compute stream. Only useful
// for GPU. Defaults to false; GPU-backed devices (e.g. LocalDevice) override
// this to honor GPUOptions.stream_merge_options.
virtual bool merge_HtoD_copy_stream() const { return false; }

protected:
void DeleteResourceMgr() {
delete rmgr_;
Expand Down
18 changes: 17 additions & 1 deletion tensorflow/core/protobuf/config.proto
Expand Up @@ -99,6 +99,22 @@ message GPUOptions {
// the overall host system performance.
bool force_gpu_compatible = 8;

// Whether to merge streams in one stream group. Four types of streams will be
// created within a stream group by default: one compute stream, one HtoD copy
// stream, one DtoH copy stream, and several DtoD copy streams. Use the
// STREAM_MERGE_OPTIONS to merge the copy streams into the compute stream. For
// example, set "merge_h_to_d_stream = true" to make the compute stream
// responsible for both computation and HtoD memory copy. Stream merging helps
// reduce the overhead caused by stream synchronization, especially when data
// transfers are frequent.
message STREAM_MERGE_OPTIONS {
  // If true, host-to-device memory copies run on the compute stream.
  bool merge_h_to_d_stream = 1;

  // If true, device-to-host memory copies run on the compute stream.
  bool merge_d_to_h_stream = 2;

  // If true, device-to-device memory copies run on the compute stream.
  bool merge_d_to_d_stream = 3;
}

STREAM_MERGE_OPTIONS stream_merge_options = 9;

message Experimental {
// Configuration for breaking down a visible GPU into multiple "virtual"
// devices.
Expand Down Expand Up @@ -257,7 +273,7 @@ message GPUOptions {
// Everything inside experimental is subject to change and is not subject
// to API stability guarantees in
// https://www.tensorflow.org/guide/version_compat.
Experimental experimental = 9;
Experimental experimental = 10;
}

// Options passed to the graph optimizer
Expand Down

0 comments on commit 9e51f38

Please sign in to comment.