diff --git a/DirectProgramming/C++SYCL_FPGA/Tutorials/DesignPatterns/buffered_host_streaming/src/HostStreamer.hpp b/DirectProgramming/C++SYCL_FPGA/Tutorials/DesignPatterns/buffered_host_streaming/src/HostStreamer.hpp index 6885f7d4b1..c89aa10f9e 100755 --- a/DirectProgramming/C++SYCL_FPGA/Tutorials/DesignPatterns/buffered_host_streaming/src/HostStreamer.hpp +++ b/DirectProgramming/C++SYCL_FPGA/Tutorials/DesignPatterns/buffered_host_streaming/src/HostStreamer.hpp @@ -222,6 +222,9 @@ class HostStreamer { // queue (sycl_q_) to perform the request. It also performs the callbacks // to the user code when the requests have been completed. static void KernelLaunchAndWaitThread() { + size_t producer_count = 0; + size_t consumer_count = 0; + // Do this loop until told (by main thread) to stop via the // 'kill_kernel_thread_flag_' atomic shared variable. while (!kill_kernel_thread_flag_) { @@ -238,6 +241,9 @@ class HostStreamer { // pop from the Producer queue produce_q_.Pop(); + + // accumulate producer count + producer_count += count; } // If there is a Consume request to launch, do it @@ -247,12 +253,18 @@ class HostStreamer { size_t count; std::tie(buf_idx, count) = consume_q_.Front(); - // launch the kernel and push the request to the launch queue - auto e = LaunchConsumerKernel(consumer_buffer_[buf_idx], count); - launch_q_.Push(std::make_tuple(buf_idx, count, e, false)); + // Only launch consumer when there is enough producer count + if (producer_count >= consumer_count + count) { + // launch the kernel and push the request to the launch queue + auto e = LaunchConsumerKernel(consumer_buffer_[buf_idx], count); + launch_q_.Push(std::make_tuple(buf_idx, count, e, false)); - // pop from the Consumer queue - consume_q_.Pop(); + // pop from the Consumer queue + consume_q_.Pop(); + + // accumulate consumer count + consumer_count += count; + } } // Wait on the oldest event to finish given 2 conditions: @@ -261,7 +273,7 @@ class HostStreamer { // 2) the user has requested us to flush the launch queue and the // launch queue is not empty (i.e. flush_ && launch_q_.size() != 0) if ((launch_q_.Size() >= wait_threshold_) || - (flush_ && !LaunchQueueEmpty() && ProducerQueueEmpty() && ConsumerQueueEmpty())) { + (flush_ && !LaunchQueueEmpty())) { // grab the oldest request from the launch queue size_t buf_idx; size_t count; @@ -276,9 +288,11 @@ class HostStreamer { if (request_was_producer) { //std::cout << "Calling Producer Callback" << std::endl; producer_callback(count); + producer_count -= count; } else { //std::cout << "Calling Consumer Callback" << std::endl; consumer_callback(consumer_buffer_[buf_idx], count); + consumer_count -= count; } // Pop from the launch queue. This has to be done AFTER waiting on