Rework stream inputs
- allow for a different stream as input
- default to the stream from the instance
- add helper function "get_input_stream" for selecting the correct stream
tcclevenger committed Jul 27, 2023
1 parent c4278e1 commit ba6b4d9
Showing 5 changed files with 50 additions and 45 deletions.
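
The pattern the commit introduces, reduced to its essentials: each CUDA API wrapper takes an optional stream argument defaulting to nullptr, and a small helper substitutes the instance's own stream whenever the caller passes none. A minimal stand-alone sketch of that idiom (the class and member names here are illustrative stand-ins, not the actual Kokkos::Impl::CudaInternal):

#include <cuda_runtime.h>

// Illustrative stand-in for an instance that owns a stream (hypothetical).
class StreamOwner {
  cudaStream_t m_stream;  // stream owned by this instance

 public:
  explicit StreamOwner(cudaStream_t s) : m_stream(s) {}

  // If the caller passes no stream, fall back to the instance's stream.
  cudaStream_t get_input_stream(cudaStream_t s) const {
    return s == nullptr ? m_stream : s;
  }

  // Wrapper with an optional stream argument, mirroring this commit.
  cudaError_t memset_async_wrapper(void* devPtr, int value, size_t count,
                                   cudaStream_t stream = nullptr) const {
    return cudaMemsetAsync(devPtr, value, count, get_input_stream(stream));
  }
};
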
22 changes: 12 additions & 10 deletions core/src/Cuda/Kokkos_CudaSpace.cpp
@@ -75,7 +75,7 @@ void DeepCopyAsyncCuda(const Cuda &instance, void *dst, const void *src,
size_t n) {
KOKKOS_IMPL_CUDA_SAFE_CALL(
(instance.impl_internal_space_instance()->cuda_memcpy_async_wrapper(
- dst, src, n, cudaMemcpyDefault, instance.cuda_stream())));
+ dst, src, n, cudaMemcpyDefault)));
}

void DeepCopyAsyncCuda(void *dst, const void *src, size_t n) {
@@ -174,21 +174,23 @@ void *impl_allocate_common(const Cuda &exec_space, const char *arg_label,
cudaError_t error_code;
if (arg_alloc_size >= memory_threshold_g) {
if (exec_space_provided) {
- cudaStream_t stream = exec_space.cuda_stream();
error_code =
exec_space.impl_internal_space_instance()->cuda_malloc_async_wrapper(
- &ptr, arg_alloc_size, stream);
+ &ptr, arg_alloc_size);
exec_space.fence("Kokkos::Cuda: backend fence after async malloc");
} else {
- error_code =
- exec_space.impl_internal_space_instance()->cuda_malloc_async_wrapper(
- &ptr, arg_alloc_size, 0);
+ error_code = Impl::CudaInternal::singleton().cuda_malloc_async_wrapper(
+ &ptr, arg_alloc_size);
Impl::cuda_device_synchronize(
"Kokkos::Cuda: backend fence after async malloc");
}
} else {
- error_code = exec_space.impl_internal_space_instance()->cuda_malloc_wrapper(
- &ptr, arg_alloc_size);
+ error_code =
+ (exec_space_provided
+ ? exec_space.impl_internal_space_instance()->cuda_malloc_wrapper(
+ &ptr, arg_alloc_size)
+ : Impl::CudaInternal::singleton().cuda_malloc_wrapper(
+ &ptr, arg_alloc_size));
}
#else
cudaError_t error_code;
@@ -350,7 +352,7 @@ void CudaSpace::impl_deallocate(
"Kokkos::Cuda: backend fence before async free");
KOKKOS_IMPL_CUDA_SAFE_CALL(
(Impl::CudaInternal::singleton().cuda_free_async_wrapper(
- arg_alloc_ptr, 0)));
+ arg_alloc_ptr)));
Impl::cuda_device_synchronize(
"Kokkos::Cuda: backend fence after async free");
} else {
@@ -606,7 +608,7 @@ void cuda_prefetch_pointer(const Cuda &space, const void *ptr, size_t bytes,
space.cuda_device_prop().concurrentManagedAccess) {
KOKKOS_IMPL_CUDA_SAFE_CALL(
(space.impl_internal_space_instance()->cuda_mem_prefetch_async_wrapper(
- ptr, bytes, space.cuda_device(), space.cuda_stream())));
+ ptr, bytes, space.cuda_device())));
}
}

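
Note the fence scope in the allocation hunk above: when an execution-space instance is provided, the async allocation lands on that instance's stream, so fencing just that instance suffices; with no instance, the singleton's stream is used and the code conservatively synchronizes the whole device. Condensed to the two call shapes (simplified from the hunk; error handling elided):

// Instance provided: allocate on the instance's stream, fence that instance.
exec_space.impl_internal_space_instance()->cuda_malloc_async_wrapper(
    &ptr, arg_alloc_size);
exec_space.fence("Kokkos::Cuda: backend fence after async malloc");

// No instance: the singleton wrapper supplies its own stream, so a
// device-wide synchronize is used instead of an instance fence.
Impl::CudaInternal::singleton().cuda_malloc_async_wrapper(&ptr, arg_alloc_size);
Impl::cuda_device_synchronize("Kokkos::Cuda: backend fence after async malloc");
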
3 changes: 1 addition & 2 deletions core/src/Cuda/Kokkos_Cuda_Graph_Impl.hpp
@@ -167,8 +167,7 @@ struct GraphImpl<Kokkos::Cuda> {
}
KOKKOS_IMPL_CUDA_SAFE_CALL(
(m_execution_space.impl_internal_space_instance()
- ->cuda_graph_launch_wrapper(m_graph_exec,
- m_execution_space.cuda_stream())));
+ ->cuda_graph_launch_wrapper(m_graph_exec)));
}

execution_space const& get_execution_space() const noexcept {
55 changes: 32 additions & 23 deletions core/src/Cuda/Kokkos_Cuda_Instance.hpp
@@ -212,14 +212,28 @@ class CudaInternal {
KOKKOS_IMPL_CUDA_SAFE_CALL(cudaSetDevice(m_cudaDev));
}

+ // Return the class stream, optionally setting the device id.
+ template <bool setCudaDevice = true>
+ cudaStream_t get_stream() const {
+ if (setCudaDevice) set_cuda_device();
+ return m_stream;
+ }

// The following are wrappers for cudaAPI functions (C and C++ routines) which
// set the correct device id directly before the cudaAPI call (unless
// explicitly disabled by providing setCudaDevice=false template).
- // setCudaDevice=true should be used for all calls which take a stream unless
- // it is guarenteed to be from a cuda instance with the correct device set
- // already (e.g., back-to-back cudaAPI calls in a single function). All
- // cudaAPI calls should be wrapped in these interface functions to ensure
- // safety when using threads.
+ // setCudaDevice=true should be used for all API calls which take a stream
+ // unless it is guaranteed to be from a cuda instance with the correct device
+ // set already (e.g., back-to-back cudaAPI calls in a single function). For
+ // cudaAPI functions that take a stream, an optional input stream is
+ // available. If no stream is given, the stream for the CudaInternal instance
+ // is used. All cudaAPI calls should be wrapped in these interface functions
+ // to ensure safety when using threads.

+ // Helper function for selecting the correct input stream
+ cudaStream_t get_input_stream(cudaStream_t s) const {
+ return s == nullptr ? get_stream<false>() : s;
+ }

// C API routines
template <bool setCudaDevice = true>
@@ -258,7 +272,7 @@ class CudaInternal {
cudaError_t cuda_event_record_wrapper(cudaEvent_t event,
cudaStream_t stream = nullptr) const {
if (setCudaDevice) set_cuda_device();
- return cudaEventRecord(event, stream);
+ return cudaEventRecord(event, get_input_stream(stream));
}

template <bool setCudaDevice = true>
@@ -358,9 +372,9 @@ class CudaInternal {

template <bool setCudaDevice = true>
cudaError_t cuda_graph_launch_wrapper(cudaGraphExec_t graphExec,
- cudaStream_t stream) const {
+ cudaStream_t stream = nullptr) const {
if (setCudaDevice) set_cuda_device();
- return cudaGraphLaunch(graphExec, stream);
+ return cudaGraphLaunch(graphExec, get_input_stream(stream));
}

template <bool setCudaDevice = true>
@@ -403,7 +417,8 @@ class CudaInternal {
const void* devPtr, size_t count, int dstDevice,
cudaStream_t stream = nullptr) const {
if (setCudaDevice) set_cuda_device();
- return cudaMemPrefetchAsync(devPtr, count, dstDevice, stream);
+ return cudaMemPrefetchAsync(devPtr, count, dstDevice,
+ get_input_stream(stream));
}

template <bool setCudaDevice = true>
@@ -418,15 +433,16 @@ class CudaInternal {
size_t count, cudaMemcpyKind kind,
cudaStream_t stream = nullptr) const {
if (setCudaDevice) set_cuda_device();
- return cudaMemcpyAsync(dst, src, count, kind, stream);
+ return cudaMemcpyAsync(dst, src, count, kind, get_input_stream(stream));
}

template <bool setCudaDevice = true>
cudaError_t cuda_memcpy_to_symbol_async_wrapper(
const void* symbol, const void* src, size_t count, size_t offset,
cudaMemcpyKind kind, cudaStream_t stream = nullptr) const {
if (setCudaDevice) set_cuda_device();
- return cudaMemcpyToSymbolAsync(symbol, src, count, offset, kind, stream);
+ return cudaMemcpyToSymbolAsync(symbol, src, count, offset, kind,
+ get_input_stream(stream));
}

template <bool setCudaDevice = true>
@@ -439,7 +455,7 @@ class CudaInternal {
cudaError_t cuda_memset_async_wrapper(void* devPtr, int value, size_t count,
cudaStream_t stream = nullptr) const {
if (setCudaDevice) set_cuda_device();
- return cudaMemsetAsync(devPtr, value, count, stream);
+ return cudaMemsetAsync(devPtr, value, count, get_input_stream(stream));
}

template <bool setCudaDevice = true>
@@ -471,16 +487,16 @@ class CudaInternal {
#if (defined(KOKKOS_ENABLE_IMPL_CUDA_MALLOC_ASYNC) && CUDART_VERSION >= 11020)
template <bool setCudaDevice = true>
cudaError_t cuda_malloc_async_wrapper(void** devPtr, size_t size,
- cudaStream_t hStream) const {
+ cudaStream_t hStream = nullptr) const {
if (setCudaDevice) set_cuda_device();
- return cudaMallocAsync(devPtr, size, hStream);
+ return cudaMallocAsync(devPtr, size, get_input_stream(hStream));
}

template <bool setCudaDevice = true>
cudaError_t cuda_free_async_wrapper(void* devPtr,
- cudaStream_t hStream) const {
+ cudaStream_t hStream = nullptr) const {
if (setCudaDevice) set_cuda_device();
- return cudaFreeAsync(devPtr, hStream);
+ return cudaFreeAsync(devPtr, get_input_stream(hStream));
}
#endif

@@ -510,13 +526,6 @@ class CudaInternal {
bufferSize);
}

- // Using the m_stream variable can also cause issues when device_id!=0.
- template <bool setCudaDevice = true>
- cudaStream_t get_stream() const {
- if (setCudaDevice) set_cuda_device();
- return m_stream;
- }

// Resizing of reduction related scratch spaces
size_type* scratch_space(const std::size_t size) const;
size_type* scratch_flags(const std::size_t size) const;
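
Two things follow from the comment block and helper added above. First, a wrapper called without a stream argument now runs on the instance's own stream, while an explicitly passed stream goes through unchanged. Second, for back-to-back CUDA calls in a single function, only the first wrapper needs to set the device. A hypothetical caller illustrating both (the names internal, ptr, dst, src, nbytes, and s are assumed for illustration, not taken from this diff):

// First call binds the calling thread to this instance's device and, with no
// explicit stream, runs on the instance's m_stream.
KOKKOS_IMPL_CUDA_SAFE_CALL(
    (internal.cuda_memset_async_wrapper(ptr, 0, nbytes)));
// Immediate follow-up on the same device: skip the redundant cudaSetDevice
// via setCudaDevice=false, and direct the copy to an explicit stream s.
KOKKOS_IMPL_CUDA_SAFE_CALL((internal.cuda_memcpy_async_wrapper<false>(
    dst, src, nbytes, cudaMemcpyDefault, s)));
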
12 changes: 4 additions & 8 deletions core/src/Cuda/Kokkos_Cuda_KernelLaunch.hpp
@@ -464,8 +464,7 @@ struct CudaParallelLaunchKernelInvoker<
cuda_instance->scratch_functor(sizeof(DriverType)));

KOKKOS_IMPL_CUDA_SAFE_CALL((cuda_instance->cuda_memcpy_async_wrapper(
- driver_ptr, &driver, sizeof(DriverType), cudaMemcpyDefault,
- cuda_instance->get_stream())));
+ driver_ptr, &driver, sizeof(DriverType), cudaMemcpyDefault)));
(base_t::get_kernel_func())<<<grid, block, shmem,
cuda_instance->get_stream()>>>(driver_ptr);
}
@@ -500,8 +499,7 @@ struct CudaParallelLaunchKernelInvoker<
// destroyed, where there should be a fence ensuring that the allocation
// associated with this kernel on the device side isn't deleted.
KOKKOS_IMPL_CUDA_SAFE_CALL((cuda_instance->cuda_memcpy_async_wrapper(
- driver_ptr, &driver, sizeof(DriverType), cudaMemcpyDefault,
- cuda_instance->get_stream())));
+ driver_ptr, &driver, sizeof(DriverType), cudaMemcpyDefault)));

void const* args[] = {&driver_ptr};

@@ -591,17 +589,15 @@ struct CudaParallelLaunchKernelInvoker<
KOKKOS_IMPL_CUDA_SAFE_CALL(
(cuda_instance->cuda_memcpy_to_symbol_async_wrapper(
kokkos_impl_cuda_constant_memory_buffer, staging,
- sizeof(DriverType), 0, cudaMemcpyHostToDevice,
- cudaStream_t(cuda_instance->get_stream()))));
+ sizeof(DriverType), 0, cudaMemcpyHostToDevice)));

// Invoke the driver function on the device
(base_t::get_kernel_func())<<<grid, block, shmem,
cuda_instance->get_stream()>>>();

// Record an event that says when the constant buffer can be reused
KOKKOS_IMPL_CUDA_SAFE_CALL((cuda_instance->cuda_event_record_wrapper(
- CudaInternal::constantMemReusable,
- cudaStream_t(cuda_instance->get_stream()))));
+ CudaInternal::constantMemReusable)));
}

inline static void create_parallel_launch_graph_node(
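
Kernel launches themselves still name the stream explicitly, since the <<<...>>> launch syntax does not go through a wrapper; the async copies above simply default to that same stream now. Running both on one stream is what orders the driver copy before the kernel that consumes it, with no extra synchronization:

// The copy defaults to cuda_instance's stream through get_input_stream ...
KOKKOS_IMPL_CUDA_SAFE_CALL((cuda_instance->cuda_memcpy_async_wrapper(
    driver_ptr, &driver, sizeof(DriverType), cudaMemcpyDefault)));
// ... the same stream the launch names explicitly, so the copy completes
// before the kernel runs on that stream.
(base_t::get_kernel_func())<<<grid, block, shmem,
                              cuda_instance->get_stream()>>>(driver_ptr);
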
3 changes: 1 addition & 2 deletions core/src/Cuda/Kokkos_Cuda_ZeroMemset.hpp
@@ -31,8 +31,7 @@ struct ZeroMemset<Kokkos::Cuda, View<T, P...>> {
(exec_space_instance.impl_internal_space_instance()
->cuda_memset_async_wrapper(
dst.data(), 0,
- dst.size() * sizeof(typename View<T, P...>::value_type),
- exec_space_instance.cuda_stream())));
+ dst.size() * sizeof(typename View<T, P...>::value_type))));
}

ZeroMemset(const View<T, P...>& dst,
