# Optimising compute with concurrent IO in HIP

With many iterative processes there is a need to get information **off** the device at regular intervals. Up to this point we have been transferring data off the compute device **after** kernel execution. Furthermore, the routines to read information from device buffers have thus far been used in a blocking manner, that is the program pauses while the read occurs. Most compute devices have the ability to transfer data **while** kernels are being executed. This means IO transfers can take place during compute and may in some instances **take place entirely** during kernel execution. For the cost of additional programming complexity, significant compute savings can be obtained, as the following diagram illustrates:

<figure style="margin-bottom 3em; margin-top: 2em; margin-left:auto; margin-right:auto; width:100%">
    <img style="vertical-align:middle" src="../images/optimising_io.svg"> <figcaption style= "text-align:lower; margin:1em; float:bottom; vertical-align:bottom;">Figure: The difference between sequential and concurrent IO.</figcaption>
</figure>

## Concurrent IO is enabled with multiple streams

A stream is a place where one can perform work, such as the execution of a kernel or an IO operation. Thus far we have been using just the null stream for compute and IO. GPU's have the ability to run streams that do IO **at the same time** as streams devoted to compute, and if a kernel does not occupy all hardware thread on the device, then streams allow for multiple kernels to be run at the same time. In either case **there is a performance avantage to be gained** in using multiple streams. 

Streams are initialised using the commands **hipStreamCreate** and **hipStreamCreateWithFlags**. The latter command allows one to create streams where the null stream can optionally wait for work in the stream to complete (implicitly synchronise) before running its own work. The command **h_create_streams** from <a href="../include/hip_helper.hpp">hip_helper.hpp</a> is used to create additional streams with the option of implicitly synchronising with the null stream. Below is the code for **h_create_streams**:

```C++
/// Create a number of streams
hipStream_t* h_create_streams(size_t nstreams, int synchronise) {
    // Blocking is a boolean, 0==no, 
    assert(nstreams>0);

    unsigned int flag = hipStreamDefault;

    // If blocking is 0 then set NonBlocking flag
    // meaning we don't synchronise with the null stream
    if (synchronise == 0) {
        flag = hipStreamNonBlocking;
    }

    // Make the streams
    hipStream_t* streams = (hipStream_t*)calloc(nstreams, sizeof(hipStream_t));

    for (int i=0; i<nstreams; i++) {
        H_ERRCHK(hipStreamCreateWithFlags(&streams[i], flag));
    }

    return streams;
}
```

If streams operate at the same time (concurrently), then there must be synchronisation controls. HIP events may be inserted into streams, and the event will register as complete after all previous work in the stream is done. Both event and stream synchronisation commands can help establish dependencies between streams. Non-blocking asynchronous IO calls are also vital for concurrency.

## Example with the 2D wave equation

The [scalar wave equation](https://en.wikipedia.org/wiki/Wave_equation) adequately describes the propagation of waves. If **U** is a 2D grid storing the amplitude of the wave at every location (the wavefield), **V** is a 2D grid storing velocity, and **t** is time, then 2D waves propagate according to the formula,

$$\frac{\partial^2 \textbf{U}}{{\partial t}^2}=\textbf{V}^2 \left (\frac{\partial^2 \textbf{U}}{{\partial x_{0}}^2}+\frac{\partial^2 \textbf{U}}{{\partial x_{1}}^2} \right)+f(t)$$

where $x_0$ and $x_1$ are spatial directions and $f(t)$ is a forcing term. If $\Delta t$ is the time step a second-order finite-difference approximation to the time derivative is given in terms of the amplitude at timesteps $\textbf{U}_{0}, \textbf{U}_{1}$ and $\textbf{U}_{2}.$ 

$$\frac{\partial^2 \textbf{U}}{{\partial t}^2} \approx \frac{1}{\Delta t^2} \left ( \textbf{U}_{0} -2 \textbf{U}_{1}+\textbf{U}_{2} \right ) $$

Replace $\frac{\partial^2 \textbf{U}}{{\partial t}^2}$ with $\frac{1}{\Delta t^2} \left( \textbf{U}_{0} -2 \textbf{U}_{1}+\textbf{U}_{2} \right )$ and solve for $\textbf{U}_{2}$.

$$\textbf{U}_{2} \approx 2 \textbf{U}_{1} - \textbf{U}_{0} + \Delta t^2\textbf{V}^2 \left (\frac{\partial^2 \textbf{U}_{1}}{{\partial x_{0}}^2}+\frac{\partial^2 \textbf{U}_{1}}{{\partial x_{1}}^2} \right)+f_{1}$$

This is an iterative formula to generate the amplitude at the next timestep $\textbf{U}_2$ if we know the present ampltiude $\textbf{U}_{1}$ and past amplitude $\textbf{U}_{0}.$ We also use finite difference approximations for the spatial derivatives, and express the spatial derivatives as a matrix multiplied by $\textbf{U}_{1}$, but this complexity is unnecessary to show here. All we need to know is that the next timestep is a function ${\textbf{F}}$ of the present and past timesteps, the velocity, and the forcing term.

$$\textbf{U}_{2}=\textbf{F}(\textbf{U}_0, \textbf{U}_1, \textbf{V}, f_{1})$$

> In geophysics we usually use a [Ricker Wavelet](https://wiki.seg.org/wiki/Dictionary:Ricker_wavelet) for the forcing term $f$, and usually inject that wavelet into one cell within the grid as time progresses.

### Kernel implementation

In both [wave2d_sync.cpp](wave2d_sync.cpp) and [wave2d_async.cpp](wave2d_async.cpp) is a kernel called **wave2d_4o** that implements the function **F**. HIP device allocations store $\textbf{U}_{0}, \textbf{U}_{1}, \textbf{U}_{2}$, and $\textbf{V}$ on the compute device.

```C++
// Kernel to solve the wave equation with fourth-order accuracy in space
__global__ void wave2d_4o (
        // Arguments
        float_type* U0,
        float_type* U1,
        float_type* U2,
        float_type* V,
        size_t N0,
        size_t N1,
        float dt2,
        float inv_dx02,
        float inv_dx12,
        // Position, frequency, and time for the
        // wavelet injection
        size_t P0,
        size_t P1,
        float pi2fm2t2) {    

    // U2, U1, U0, V is of size (N0, N1)
    size_t i0 = blockIdx.y * blockDim.y + threadIdx.y;
    size_t i1 = blockIdx.x * blockDim.x + threadIdx.x;
    
    // Required padding and coefficients for spatial finite difference
    const int pad_l=2, pad_r=2, ncoeffs=5;
    float coeffs[ncoeffs] = {-0.083333336f, 1.3333334f, -2.5f, 1.3333334f, -0.083333336f};
    
    // Limit i0 and i1 to the region of U2 within the padding
    i0=min(i0, (size_t)(N0-1-pad_r));
    i1=min(i1, (size_t)(N1-1-pad_r));
    i0=max((size_t)pad_l, i0);
    i1=max((size_t)pad_l, i1);
    
    // Position within the grid as a 1D offset
    long offset=i0*N1+i1;
    
    // Temporary storage
    float temp0=0.0f, temp1=0.0f;
    float tempV=V[offset];
    
    // Calculate the Laplacian
    for (long n=0; n<ncoeffs; n++) {
        // Stride in dim0 is N1        
        temp0+=coeffs[n]*U1[offset+(n*(long)N1)-(pad_l*(long)N1)];
        // Stride in dim1 is 1
        temp1+=coeffs[n]*U1[offset+n-pad_l];
    }
    
    // Calculate the wavefield U2 at the next timestep
    U2[offset]=(2.0f*U1[offset])-U0[offset]+((dt2*tempV*tempV)*(temp0*inv_dx02+temp1*inv_dx12));
    
    // Inject the forcing term at coordinates (P0, P1)
    if ((i0==P0) && (i1==P1)) {
        U2[offset]+=(1.0f-2.0f*pi2fm2t2)*exp(-pi2fm2t2);
    }
    
}
```

### Problem setup

For this problem we create the 2D grid as a square box of size $(N0,N1)=(256,256)$. The velocity is uniform at 343m/s. This is approximately the speed of sound in air. Then we use a Ricker wavelet as a forcing term to 'let off a firework' in the middle of the box and run a number of timesteps to see how a sound wave propagates in the box. 

<figure style="margin-bottom 3em; margin-top: 2em; margin-left:auto; margin-right:auto; width:80%">
    <img style="vertical-align:middle" src="../images/wave2d_problem.svg"> <figcaption style= "text-align:lower; margin:1em; float:bottom; vertical-align:bottom;">Figure: Problem setup for the 2D wave equation.</figcaption>
</figure>

The programs [wave2d_sync.cpp](wave2d_sync.cpp) and [wave2d_async.cpp](wave2d_async.cpp) setup a velocity and wavefield of size (N0, N1). At each timestep the kernel **wave2d_4o** is used to update the solution and a Ricker wavelet is injected into the middle of the box. Enough timesteps are alloted so that the wave propagates through the medium and reflects off the walls. Wavefield arrays that are no longer needed are recycled for efficiency.

The Python code below readies the framework for plotting the answer

In [14]:
%matplotlib widget

import os
import sys
import numpy as np
import subprocess
from ipywidgets import widgets
from matplotlib import pyplot as plt
from matplotlib import animation, rc
from IPython.display import HTML

sys.path.insert(0, os.path.abspath("../include"))

import py_helper

float_type = np.float32

defines=py_helper.load_defines("mat_size.hpp")

### Sequential (synchronous) IO solution

In [wave2d_sync.cpp](wave2d_sync.cpp) we use an array of three HIP device allocations to represent the wavefield at timesteps (0,1,2). The null stream (stream 0) is used for both kernel execution and IO.

<figure style="margin-bottom 3em; margin-top: 2em; margin-left:auto; margin-right:auto; width:100%">
    <img style="vertical-align:middle" src="../images/sequential_io.svg"> <figcaption style= "text-align:lower; margin:1em; float:bottom; vertical-align:bottom;">Figure: Sequential IO solution.</figcaption>
</figure>

#### Make and run the application.

In [15]:
!make

make: Nothing to be done for 'all'.


In [16]:
subprocess.run([os.path.join(os.getcwd(),"wave2d_sync.exe")])

Device id: 0
	name:                                    
	global memory size:                      536 MB
	available registers per block:           65536 
	maximum shared memory size per block:    65 KB
	maximum pitch size for memory copies:    402 MB
	max block size:                          (1024,1024,1024)
	max threads in a block:                  1024
	max Grid size:                           (2147483647,2147483647,2147483647)
dt=0.001166, Vmax=343.000000
dt=0.00116618, fm=34.3, Vmax=343, dt2=1.35998e-06
The synchronous calculation took 48.430000 milliseconds.


CompletedProcess(args=['/home/toby/Pelagos/Projects/HIP_Course/course_material/L8_IO_Optimisation/wave2d_sync.exe'], returncode=0)

#### Plot the output wavefield

At the end of iteration a binary file containing the wavefield at every timestep is written to the file **array_out.dat**. We can read in this wavefield and plot it below. 

In [17]:
# Read the output file back in for display
output_sync=np.fromfile("array_out.dat", dtype=float_type)
nimages_sync=int(output_sync.size//(defines["N0_U"]*defines["N1_U"]))
images_sync=output_sync.reshape(nimages_sync, defines["N0_U"], defines["N1_U"])

py_helper.plot_slices(images_sync)

interactive(children=(IntSlider(value=0, description='n', max=639), Output()), _dom_classes=('widget-interact'…

#### Application trace

The script **make_traces.sh** produces traces in the **tau** folder.

In [18]:
!./make_traces.sh

RPL: on '230726_151432' from '/opt/rocm-5.6.0' in '/home/toby/Pelagos/Projects/HIP_Course/course_material/L8_IO_Optimisation'
RPL: profiling '"./wave2d_sync.exe"'
RPL: input file ''
RPL: output dir '/tmp/rpl_data_230726_151432_34580'
RPL: result dir '/tmp/rpl_data_230726_151432_34580/input_results_230726_151432'
ROCProfiler: input from "/tmp/rpl_data_230726_151432_34580/input.xml"
  0 metrics
ROCtracer (34603):
    HSA-trace(*)
    HSA-activity-trace()
    HIP-trace(*)
Device id: 0
	name:                                    
	global memory size:                      536 MB
	available registers per block:           65536 
	maximum shared memory size per block:    65 KB
	maximum pitch size for memory copies:    402 MB
	max block size:                          (1024,1024,1024)
	max threads in a block:                  1024
	max Grid size:                           (2147483647,2147483647,2147483647)
dt=0.001166, Vmax=343.000000
dt=0.00116618, fm=34.3, Vmax=343, dt2=1.35998e-06
The synchrono

Then you can go to the address [https://ui.perfetto.dev](https://ui.perfetto.dev) to open the tracing utility. If you load the file **rocprof_trace/trace_sync.json** you should see something like this. The IO occurs after each kernel execution using the same command queue.

<figure style="margin-bottom 3em; margin-top: 2em; margin-left:auto; margin-right:auto; width:100%">
    <img style="vertical-align:middle" src="../images/synchronous_io.png"> <figcaption style= "text-align:lower; margin:1em; float:bottom; vertical-align:bottom;">Figure: Sequential IO solution.</figcaption>
</figure>

### Concurrent (asynchronous) IO solutions

In [wave2d_async_streams.cpp](wave2d_async_streams.cpp) and [wave2d_async_events.cpp](wave2d_async_events.cpp) are solutions for concurrent IO. The goal is to use **multiple streams** so that while one stream is executing a kernel the others are working on IO.

<figure style="margin-bottom 3em; margin-top: 2em; margin-left:auto; margin-right:auto; width:100%">
    <img style="vertical-align:middle" src="../images/concurrent_io.svg"> <figcaption style= "text-align:lower; margin:1em; float:bottom; vertical-align:bottom;">Figure: Concurrent IO solution.</figcaption>
</figure>

Both solutions use streams, however the difference between the two solutions is that [wave2d_async_streams.cpp](wave2d_async_streams.cpp) uses explicit synchronisation on streams to establish the necessary dependencies between IO and compute, while [wave2d_async_events.cpp](wave2d_async_events.cpp) is focused on using HIP events to achieve the same synchronisation. Both accomplish the same task of moving data while compute is taking place.

#### Concurrent access to buffers from the host and device

It is a **race condition** to read from a device allocation (from another stream) at the same time as a kernel is writing to the allocation. Furthermore, while it is technically possible to asynchronously read from an array that is also being read by a kernel, it can lead to undefined behaviour as memory can be moved around. Therefore, it is **recommended** to perform IO only on buffers that we **know for sure** are not being used by a kernel. Our kernel needs access to wavefields at timesteps $\textbf{U}_{0}, \textbf{U}_{1}, \textbf{U}_{2}$, therefore they are **not** safe to copy from, but wavefields at earlier timesteps e.g $\textbf{U}_{-2}, \textbf{U}_{-1}$ **are** safe to copy.

<figure style="margin-bottom 3em; margin-top: 2em; margin-left:auto; margin-right:auto; width:30%">
    <img style="vertical-align:middle" src="../images/wavefields.svg"> <figcaption style= "text-align:lower; margin:1em; float:bottom; vertical-align:bottom;">Figure: Wavefields that are ok to copy.</figcaption>
</figure>

In this instance, a solution to enable concurrent IO is to have an array of at least four memory allocations on the device to represent the wavefield. We choose an array of **nscratch=5** allocations to allow extra leeway for copies to finish. Associated with the buffers is an array of five streams for IO and one stream for compute. We also create an array of five events to demonstrate the use of events in workflow dependencies.

#### Stream-based synchronisation

The command **hipStreamSynchronize** 

# Got to here


#### Event-based synchronisation

 During each iteration **n** of the time loop then:

1. We use **hipStreamWaitEvent** to make the compute stream wait for past events arising from IO streams at **(n+2)** and **(n+1)**. 
1. For teaching purposes use **hipEventSynchronize** to make the host wait on the IO event at **n**.
1. Then use **hipStreamSynchronize** on the compute stream to make sure there is no backlog of work.
1. Submit the kernel to compute stream to solve for U[(n+2)%nscratch], then record an event (at index **n**) into the compute stream.
1. The wavefield at **(n-1)** (which we call the *copy_index*) is safe to copy once the compute stream is done with it. Use **hipStreamWaitEvent** to make the IO stream at **(n-1)** wait on event at **(n-1)**. Then use the IO stream and **hipMemcpy3DAsync** to asynchronously copy the wavefield at **(n-1)** to the stack of output images. 
1. Use **hipStreamWaitEvent** so that the stream used for the copy waits for the kernel to finish. Record an event to the IO stream after the copy. These events will be then awaited on in the next iteration (step 1).

The following diagram shows how the dependencies play out with events and streams during a single iteration.

<figure style="margin-bottom 3em; margin-top: 2em; margin-left:auto; margin-right:auto; width:100%">
    <img style="vertical-align:middle" src="../images/wavefields_concurrent.svg"> <figcaption style= "text-align:lower; margin:1em; float:bottom; vertical-align:bottom;">Figure: Compute and IO queues during an iteration.</figcaption>
</figure>

The code for the iterations in [wave2d_async.cpp](wave2d_async.cpp) is produced here.

```C++
    for (int n=0; n<NT; n++) {
        
        // Wait for the event associated with a stream
        
        // Can make a stream wait on an event
        // Make the compute stream wait on previous copies
        H_ERRCHK(hipStreamWaitEvent(compute_stream, events[(n+2)%nscratch], 0));
        H_ERRCHK(hipStreamWaitEvent(compute_stream, events[(n+1)%nscratch], 0));
        
        // Or wait on an event directly
        H_ERRCHK(hipEventSynchronize(events[n%nscratch]));
        
        // Can wait on a stream directly also
        H_ERRCHK(hipStreamSynchronize(compute_stream));
        
        // Get the wavefields
        U0_d = U_ds[n%nscratch];
        U1_d = U_ds[(n+1)%nscratch];
        U2_d = U_ds[(n+2)%nscratch];
        
        // Shifted time
        t = n*dt-2.0*td;
        pi2fm2t2 = pi*pi*fm*fm*t*t;
        
        // Launch the kernel using hipLaunchKernelGGL method
        // Use 0 when choosing the default (null) stream
        hipLaunchKernelGGL(wave2d_4o, 
            grid_nblocks, block_size, sharedMemBytes, compute_stream,
            U0_d, U1_d, U2_d, V_d,
            N0, N1, dt2,
            inv_dx02, inv_dx12,
            P0, P1, pi2fm2t2
        );
                           
        // Check the status of the kernel launch
        H_ERRCHK(hipGetLastError());
          
        // Insert an event into stream at n%nscratch
        // It will complete afer the kernel does
        H_ERRCHK(hipEventRecord(events[n%nscratch], compute_stream));   
        
        // Read memory from the buffer to the host in an asynchronous manner
        if (n>2) {
            size_t copy_index=n-1;
            
            // Insert a wait for the copy stream on the compute event
            H_ERRCHK(
                hipStreamWaitEvent(
                    streams[copy_index%nscratch], 
                    events[copy_index%nscratch],
                    0
                )
            );
            
            // Then asynchronously copy a wavefield back
            // Using the copy stream
            
            // Only change what is necessary in copy_parms
            copy_parms.srcPtr.ptr = U_ds[copy_index%nscratch];
            
            // Z positions of 1 don't seem to work on AMD platforms?!?!
            copy_parms.dstPos.z = copy_index;
            
            // Copy memory asynchronously
            H_ERRCHK(
                hipMemcpy3DAsync(
                    &copy_parms,
                    streams[copy_index%nscratch]
                )
            );
            // Record the event to a stream
            H_ERRCHK(
                hipEventRecord(
                    events[copy_index%nscratch],
                    streams[copy_index%nscratch]
                )
            );
            
            // Copy memory synchronously
            //H_ERRCHK(
            //    hipMemcpy3D(
            //        &copy_parms
            //        //streams[copy_index%nscratch]
            //    )
            //); 
        }
    }
```

> With this example I used the function **hipMemcpy3DAsync** to copy wavefields as planes back to the host memory allocation. For some bizarre reason, during construction of this example I found that on AMD platforms a value of **copy_parms.dstPos.z=1** resulted in an error for calls to either **hipMemcpy3D** or **hipMemcpy3DAsync**. This strange behaviour was not present with the NVIDIA backend. I have worked around this issue by only copying planes when **n>2**.

#### Make and run the example

In [19]:
!make

hipcc -g -fopenmp -O2 -I../include wave2d_async.cpp -o wave2d_async.exe 


In [29]:
# Run the application
subprocess.run([os.path.join(os.getcwd(),"wave2d_async_streams.exe")])

Device id: 0
	name:                                    
	global memory size:                      536 MB
	available registers per block:           65536 
	maximum shared memory size per block:    65 KB
	maximum pitch size for memory copies:    402 MB
	max block size:                          (1024,1024,1024)
	max threads in a block:                  1024
	max Grid size:                           (2147483647,2147483647,2147483647)
dt=0.001166, Vmax=343.000000
dt=0.00116618, fm=34.3, Vmax=343, dt2=1.35998e-06
The asynchronous calculation took 23.229000 milliseconds.


CompletedProcess(args=['/home/toby/Pelagos/Projects/HIP_Course/course_material/L8_IO_Optimisation/wave2d_async_streams.exe'], returncode=0)

If we check the time elapsed we find that the concurrent IO solution took less time than the sequential IO solution. A trace of the HIP activity shows that IO is taking place during compute.

<figure style="margin-bottom 3em; margin-top: 2em; margin-left:auto; margin-right:auto; width:100%">
    <img style="vertical-align:middle" src="../images/asynchronous_io.png"> <figcaption style= "text-align:lower; margin:1em; float:bottom; vertical-align:bottom;">Figure: Concurrent IO solution.</figcaption>
</figure>


### Plot the wavefield and explore results

In [30]:
# Read the output file back in for display
output_async=np.fromfile("array_out.dat", dtype=float_type)
nimages_async=int(output_async.size//(defines["N0_U"]*defines["N1_U"]))
images_async=output_async.reshape(nimages_async, defines["N0_U"], defines["N1_U"])

py_helper.plot_slices(images_async)

print(f"Maximum residual between results is {np.max(images_async-images_sync)}")

interactive(children=(IntSlider(value=0, description='n', max=639), Output()), _dom_classes=('widget-interact'…

Maximum residual between results is 0.0


## Summary of learnings

In this module we explored how IO can take place at the same time as a kernel using multiple streams. The concurrent IO solution was faster than the sequential IO solution. With concurrent IO it is a safety measure to avoid accessing memory allocations that are being used by a kernel, unless the allocated is managed. Both stream and event-based synchronisation can establish and enforce dependencies between activity that occurs across multiple streams.

<address>
Written by Dr. Toby Potter of <a href="https://www.pelagos-consulting.com">Pelagos Consulting and Education</a> for the Pawsey Supercomputing Centre
</address>