Skip to content

Commit

Permalink
Add a function for stream to wait for a tag (#746)
Browse files Browse the repository at this point in the history
  • Loading branch information
deukhyun-cha committed Apr 17, 2024
1 parent 39908af commit 4ea065d
Show file tree
Hide file tree
Showing 34 changed files with 278 additions and 2 deletions.
3 changes: 3 additions & 0 deletions examples/cpp/19_stream_tags/.gitignore
@@ -0,0 +1,3 @@
main
main.o
main_c
4 changes: 4 additions & 0 deletions examples/cpp/19_stream_tags/CMakeLists.txt
@@ -0,0 +1,4 @@
compile_cpp_example_with_modes(stream_tags main.cpp)

add_custom_target(cpp_example_stream_tags_okl ALL COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/powerOfPi2.okl powerOfPi2.okl)
add_dependencies(examples_cpp_stream_tags cpp_example_stream_tags_okl)
27 changes: 27 additions & 0 deletions examples/cpp/19_stream_tags/Makefile
@@ -0,0 +1,27 @@

PROJ_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))

ifndef OCCA_DIR
include $(PROJ_DIR)/../../../scripts/build/Makefile
else
include ${OCCA_DIR}/scripts/build/Makefile
endif

#---[ COMPILATION ]-------------------------------
headers = $(wildcard $(incPath)/*.hpp) $(wildcard $(incPath)/*.tpp)
sources = $(wildcard $(srcPath)/*.cpp)

objects = $(subst $(srcPath)/,$(objPath)/,$(sources:.cpp=.o))

executables: ${PROJ_DIR}/main

${PROJ_DIR}/main: $(objects) $(headers) ${PROJ_DIR}/main.cpp
$(compiler) $(compilerFlags) -o ${PROJ_DIR}/main $(flags) $(objects) ${PROJ_DIR}/main.cpp $(paths) $(linkerFlags)

$(objPath)/%.o:$(srcPath)/%.cpp $(wildcard $(subst $(srcPath)/,$(incPath)/,$(<:.cpp=.hpp))) $(wildcard $(subst $(srcPath)/,$(incPath)/,$(<:.cpp=.tpp)))
$(compiler) $(compilerFlags) -o $@ $(flags) -c $(paths) $<

clean:
rm -f $(objPath)/*;
rm -f ${PROJ_DIR}/main;
#=================================================
28 changes: 28 additions & 0 deletions examples/cpp/19_stream_tags/README.md
@@ -0,0 +1,28 @@
# Example: Stream Tags

GPU devices introduce `streams`, which potentially allow parallel queueing of instructions

`Stream tags` are used to query and manage (synchronize) those streams

This example shows how to set up an `occa::streamTag` to manage jobs in different streams

# Compiling the Example

```bash
make
```

## Usage

```
> ./main --help
Usage: ./main [OPTIONS]
Example showing the use of multiple device streams
Options:
-d, --device Device properties (default: "{mode: 'CUDA', device_id: 0}")
-h, --help Print usage
-v, --verbose Compile kernels in verbose mode
```
92 changes: 92 additions & 0 deletions examples/cpp/19_stream_tags/main.cpp
@@ -0,0 +1,92 @@
#include <cmath>
#include <iostream>
#include <vector>

#include <occa.hpp>

//---[ Internal Tools ]-----------------
// Note: These headers are not officially supported
// Please don't rely on them outside of the occa examples
#include <occa/internal/utils/cli.hpp>
//======================================


occa::json parseArgs(int argc, const char **argv);

/**
 * Entry point of the stream-tag example.
 *
 * Launches the `powerOfPi2` kernel once on each of two streams and uses
 * stream tags to order the work: stream_b is told to wait on a tag recorded
 * in stream_a, and the host then waits on a tag recorded in stream_b before
 * copying the data back and validating it.
 *
 * Returns 0 when every entry is within tolerance of 3.14159, -1 otherwise.
 */
int main(int argc, const char **argv) {
  occa::json args = parseArgs(argc, argv);

  occa::setDevice(occa::json::parse(args["options/device"]));

  int entries = 1<<20;
  int block = 64;
  int group = 1;

  // Zero-initialized host buffer. Owned by a vector so that it is released
  // on every return path (including the early error return below).
  std::vector<float> a(entries, 0.f);

  occa::memory o_a = occa::malloc<float>(entries);
  o_a.copyFrom(a.data());

  // `block` and `group` are injected into the kernel as compile-time defines
  occa::json kernelProps({
    {"defines/block", block},
    {"defines/group", group},
    {"serial/include_std", true},
  });
  occa::kernel powerOfPi2 = occa::buildKernel("powerOfPi2.okl",
                                              "powerOfPi2",
                                              kernelProps);

  occa::json streamProps({
    {"nonblocking", true},
  });
  occa::stream stream_a = occa::createStream(streamProps);
  occa::stream stream_b = occa::createStream(streamProps);

  // First launch, queued on stream_a: x = pow(pi, 0) = 1
  occa::setStream(stream_a);
  powerOfPi2(o_a, entries);
  occa::streamTag tag_a = occa::tagStream();

  // Set stream_b to wait for the job(s) queued in stream_a before tag_a
  occa::streamWait(stream_b, tag_a);

  // Second launch, queued on stream_b: x = pow(pi, 1) = pi
  occa::setStream(stream_b);
  powerOfPi2(o_a, entries);
  occa::streamTag tag_b = occa::tagStream();

  // Wait for the work queued in stream_b before tag_b to finish
  occa::waitFor(tag_b);

  o_a.copyTo(a.data());

  // After both launches every entry should hold (approximately) pi
  const float tol = 1e-3;
  for (int i = 0; i < entries; i++) {
    if (std::fabs(a[i] - 3.14159) > tol) {
      std::cerr << "Invalid output value: " << a[i] << " in " << i << std::endl;
      return -1;
    }
  }
  return 0;
}


/**
 * Builds the command-line parser for this example, parses the given
 * arguments, and returns them as JSON.
 *
 * Side effect: copies the parsed `options/verbose` value into the global
 * `kernel/verbose` OCCA setting.
 */
occa::json parseArgs(int argc, const char **argv) {
  // -d / --device: accepts an argument and falls back to a CUDA device
  const auto deviceOption = (
    occa::cli::option('d', "device",
                      "Device properties (default: \"{mode: 'CUDA', device_id: 0}\")")
    .withArg()
    .withDefaultValue("{mode: 'CUDA', device_id: 0}")
  );

  // -v / --verbose: plain flag
  const auto verboseOption = (
    occa::cli::option('v', "verbose",
                      "Compile kernels in verbose mode")
  );

  occa::cli::parser cliParser;
  cliParser
    .withDescription("Example showing the use of multiple device streams")
    .addOption(deviceOption)
    .addOption(verboseOption);

  occa::json parsedArgs = cliParser.parseArgs(argc, argv);
  occa::settings()["kernel/verbose"] = parsedArgs["options/verbose"];

  return parsedArgs;
}
10 changes: 10 additions & 0 deletions examples/cpp/19_stream_tags/powerOfPi2.okl
@@ -0,0 +1,10 @@
// Replaces every element of `x` with 3.14159f raised to that element's
// current value.
//
// `group` and `block` are not parameters: the host example supplies them as
// compile-time defines ("defines/group", "defines/block") when building the
// kernel.
//
//   x       - array updated in place
//   entries - number of elements of `x` to process
@kernel void powerOfPi2(float* x,
int entries) {
for (int g = 0; g < group; g++; @outer) {
for (int i = 0; i < block; ++i; @inner) {
// Each (g, i) pair starts at its own offset and advances by the total
// number of (g, i) pairs, so together they cover [0, entries)
for (int j=i+g*block; j < entries; j+=block*group) {
x[j] = pow(3.14159f,x[j]);
}
}
}
}
1 change: 1 addition & 0 deletions examples/cpp/CMakeLists.txt
Expand Up @@ -14,6 +14,7 @@ add_subdirectory(14_cuda_interop)
add_subdirectory(17_memory_pool)

add_subdirectory(18_nonblocking_streams)
add_subdirectory(19_stream_tags)
add_subdirectory(20_native_dpcpp_kernel)
add_subdirectory(30_device_function)

Expand Down
2 changes: 2 additions & 0 deletions include/occa/c/base.h
Expand Up @@ -35,6 +35,8 @@ occaStreamTag occaTagStream();

void occaWaitForTag(occaStreamTag tag);

void occaStreamWaitForTag(occaStream stream, occaStreamTag tag);

double occaTimeBetweenTags(occaStreamTag startTag,
occaStreamTag endTag);

Expand Down
1 change: 1 addition & 0 deletions include/occa/core/base.hpp
Expand Up @@ -42,6 +42,7 @@ namespace occa {
stream createStream(const occa::json &props = occa::json());
stream getStream();
void setStream(stream s);
void streamWait(stream s, streamTag tag);

streamTag tagStream();

Expand Down
12 changes: 12 additions & 0 deletions include/occa/core/stream.hpp
Expand Up @@ -12,6 +12,7 @@
namespace occa {
class modeStream_t; class stream;
class modeDevice_t; class device;
class streamTag;

/**
* @startDoc{stream}
Expand Down Expand Up @@ -148,6 +149,17 @@ namespace occa {
*/
void finish();

/**
* @startDoc{waitFor}
*
* Description:
* Makes this stream wait until all operations submitted before the given
* stream tag was recorded have completed. The stream tag may have been
* created on a different stream.
*
* @endDoc
*/
void waitFor(occa::streamTag tag);

/**
* @startDoc{unwrap}
*
Expand Down
4 changes: 4 additions & 0 deletions src/c/base.cpp
Expand Up @@ -75,6 +75,10 @@ void occaWaitForTag(occaStreamTag tag) {
occa::waitFor(occa::c::streamTag(tag));
}

// C API entry point: converts the C stream and stream-tag handles with
// occa::c::stream / occa::c::streamTag and forwards them to occa::streamWait.
void occaStreamWaitForTag(occaStream stream, occaStreamTag tag) {
occa::streamWait(occa::c::stream(stream), occa::c::streamTag(tag));
}

double occaTimeBetweenTags(occaStreamTag startTag,
occaStreamTag endTag) {
return occa::timeBetween(occa::c::streamTag(startTag),
Expand Down
4 changes: 4 additions & 0 deletions src/core/base.cpp
Expand Up @@ -94,6 +94,10 @@ namespace occa {
return getDevice().tagStream();
}

// Free-function form of stream::waitFor: asks stream `s` to wait on `tag`.
void streamWait(stream s, streamTag tag) {
s.waitFor(tag);
}

memoryPool createMemoryPool(const occa::json &props) {
return getDevice().createMemoryPool(props);
}
Expand Down
5 changes: 5 additions & 0 deletions src/core/stream.cpp
Expand Up @@ -2,6 +2,7 @@
#include <occa/core/device.hpp>
#include <occa/internal/core/device.hpp>
#include <occa/internal/core/stream.hpp>
#include <occa/internal/core/streamTag.hpp>

namespace occa {
stream::stream() :
Expand Down Expand Up @@ -102,6 +103,10 @@ namespace occa {
if(modeStream) modeStream->finish();
}

// Forwards the wait request to the backend stream object.
// Does nothing when this stream has no modeStream.
void stream::waitFor(occa::streamTag tag) {
if(modeStream) modeStream->waitFor(tag);
}

void* stream::unwrap() {
OCCA_ERROR(
"stream::unwrap: stream is uninitialized or has been free'd",
Expand Down
1 change: 1 addition & 0 deletions src/occa/internal/api/metal/commandQueue.hpp
Expand Up @@ -34,6 +34,7 @@ namespace occa {
void freeLastCommandBuffer();

event_t createEvent() const;
void waitForEvent(const event_t &event);

void clearCommandBuffer(void *commandBufferObj);
void setLastCommandBuffer(void *commandBufferObj);
Expand Down
10 changes: 10 additions & 0 deletions src/occa/internal/api/metal/commandQueue.mm
Expand Up @@ -77,6 +77,16 @@
lastCommandBufferObj);
}

// Encodes a wait for `event` (at its signalValue) on the command buffer
// referenced by lastCommandBufferObj.
void commandQueue_t::waitForEvent(const event_t &event) {
  // Without a command buffer there is nothing to encode the wait on
  if (!lastCommandBufferObj) {
    return;
  }

  id<MTLCommandBuffer> commandBuffer = (
    (__bridge id<MTLCommandBuffer>) lastCommandBufferObj
  );
  id<MTLEvent> mtlEvent = (__bridge id<MTLEvent>) event.eventObj;

  [commandBuffer encodeWaitForEvent:mtlEvent
                              value:event.signalValue];
}

void commandQueue_t::clearCommandBuffer(void *commandBufferObj) {
if (commandBufferObj == lastCommandBufferObj) {
freeLastCommandBuffer();
Expand Down
1 change: 1 addition & 0 deletions src/occa/internal/api/metal/event.hpp
Expand Up @@ -13,6 +13,7 @@ namespace occa {
int eventId;
void *commandBufferObj;
double eventTime;
int signalValue;

event_t();

Expand Down
13 changes: 11 additions & 2 deletions src/occa/internal/api/metal/event.mm
Expand Up @@ -25,10 +25,17 @@
eventObj(eventObj_),
eventId(eventId_),
commandBufferObj(commandBufferObj_),
eventTime(0) {
eventTime(0),
signalValue(1) {
// If there are no active command buffers, use the current time
if (!commandBufferObj) {
eventTime = occa::sys::currentTime();
} else {
id<MTLEvent> metalEvent = (__bridge id<MTLEvent>) eventObj;
id<MTLCommandBuffer> metalCommandBuffer = (
(__bridge id<MTLCommandBuffer>) commandBufferObj
);
[metalCommandBuffer encodeSignalEvent:metalEvent value:signalValue];
}
}

Expand All @@ -37,14 +44,16 @@
eventId(other.eventId),
eventObj(other.eventObj),
commandBufferObj(other.commandBufferObj),
eventTime(other.eventTime) {}
eventTime(other.eventTime),
signalValue(other.signalValue) {}

// Copy assignment: copies each field of `other` into this event, including
// the signalValue used when signaling and waiting on the event.
// Returns a reference to this event.
event_t& event_t::operator = (const event_t &other) {
commandQueue = other.commandQueue;
eventId = other.eventId;
eventObj = other.eventObj;
commandBufferObj = other.commandBufferObj;
eventTime = other.eventTime;
signalValue = other.signalValue;
return *this;
}

Expand Down
1 change: 1 addition & 0 deletions src/occa/internal/api/metal/polyfill.cpp
Expand Up @@ -73,6 +73,7 @@ namespace occa {
event_t commandQueue_t::createEvent() const {
return event_t();
}
// Polyfill stand-in: intentionally a no-op, the event is ignored
void commandQueue_t::waitForEvent(const event_t &event) {}

void commandQueue_t::clearCommandBuffer(void *commandBufferObj) {}
void commandQueue_t::setLastCommandBuffer(void *commandBufferObj) {}
Expand Down
1 change: 1 addition & 0 deletions src/occa/internal/core/stream.hpp
Expand Up @@ -25,6 +25,7 @@ namespace occa {

//---[ Virtual Methods ]------------
virtual void finish() = 0;
virtual void waitFor(streamTag tag) = 0;

virtual void* unwrap() = 0;
//==================================
Expand Down
4 changes: 4 additions & 0 deletions src/occa/internal/modes/cuda/polyfill.hpp
Expand Up @@ -275,6 +275,10 @@ namespace occa {
inline CUresult cuStreamSynchronize(CUstream hStream) {
return OCCA_CUDA_IS_NOT_ENABLED;
}

// Polyfill stand-in for the CUDA driver call of the same name: ignores its
// arguments and always reports OCCA_CUDA_IS_NOT_ENABLED.
inline CUresult cuStreamWaitEvent(CUstream hStream, CUevent hEvent, unsigned int Flags) {
return OCCA_CUDA_IS_NOT_ENABLED;
}
}

#endif
Expand Down
9 changes: 9 additions & 0 deletions src/occa/internal/modes/cuda/stream.cpp
@@ -1,4 +1,5 @@
#include <occa/internal/modes/cuda/stream.hpp>
#include <occa/internal/modes/cuda/streamTag.hpp>
#include <occa/internal/modes/cuda/utils.hpp>

namespace occa {
Expand All @@ -24,6 +25,14 @@ namespace occa {
cuStreamSynchronize(cuStream));
}

// Makes this CUDA stream wait on the event held by `tag`.
//
// `tag` must wrap a CUDA stream tag: the backend tag is downcast to
// occa::cuda::streamTag, and an error is raised when the tag is
// uninitialized or belongs to a different backend instead of dereferencing
// a null pointer.
void stream::waitFor(occa::streamTag tag) {
  occa::cuda::streamTag *cuTag = (
    dynamic_cast<occa::cuda::streamTag*>(tag.getModeStreamTag())
  );
  // dynamic_cast yields a null pointer for a null or non-CUDA tag
  OCCA_ERROR("Stream: waitFor expects an initialized CUDA stream tag",
             cuTag != NULL);
  OCCA_CUDA_ERROR("Stream: waitFor",
                  cuStreamWaitEvent(cuStream, cuTag->cuEvent, 0));
}

void* stream::unwrap() {
return static_cast<void*>(&cuStream);
}
Expand Down
1 change: 1 addition & 0 deletions src/occa/internal/modes/cuda/stream.hpp
Expand Up @@ -21,6 +21,7 @@ namespace occa {

virtual ~stream();
void finish() override;
void waitFor(occa::streamTag tag) override;

void* unwrap() override;
};
Expand Down
1 change: 1 addition & 0 deletions src/occa/internal/modes/dpcpp/polyfill.hpp
Expand Up @@ -295,6 +295,7 @@ class queue {
}

sycl::event ext_oneapi_submit_barrier() { return sycl::event();}
// Polyfill overload taking a wait list: the list is ignored and a
// default-constructed event is returned.
sycl::event ext_oneapi_submit_barrier( const std::vector<sycl::event> &waitList ) { return sycl::event(); }
};

inline void* malloc_device(size_t num_bytes,
Expand Down

0 comments on commit 4ea065d

Please sign in to comment.