# Сложение двух векторов на ГПУ с помощью RAY.
### Размер векторов, количество GPU и т. п. задаются в коде.

In [None]:
# Library and framework imports

import os
import time

# Disable Ray's deduplication of identical worker log lines. The variable is
# set BEFORE `import ray` so that it is already present in the environment
# when Ray is imported; setting it afterwards may have no effect.
os.environ["RAY_DEDUP_LOGS"] = "0"

import cupy as cp
import numpy as np
import ray
from scipy.stats import trim_mean

print("Import done, Hello")

2023-12-13 17:28:38,113	INFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


Import done, Hello


In [None]:
!nvidia-smi

Wed Dec 13 17:28:39 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 510.108.03   Driver Version: 510.108.03   CUDA Version: 11.6     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  NVIDIA A100-SXM...  Off  | 00000000:51:00.0 Off |                    0 |
| N/A   30C    P0    69W / 400W |   3499MiB / 81920MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
|   1  NVIDIA A100-SXM...  Off  | 00000000:57:00.0 Off |                    0 |
| N/A   32C    P0    59W / 400W |      2MiB / 81920MiB |      0%      Default |
|       

In [None]:
"""
Задача. Есть 2 случайных вектора размера N. Мы их разбиваем на подвектора 'subvectors'

Пример:
num_subvectors = 2
v1 = [0,1,2,3], v2 = [5, 6, 7, 8] -->
subv1 = [[0,1], [2, 3]], subv2 = [[5, 6], [7, 8]]

Функция RAY принимает на вход 2 вектора и складывает их.

Вызываем функцию RAY для сложения одновременно всех подвекторов.
"""
num_subvectors = 4 # nubmer of subvectors
num_gpus = 1 # number of gpus in system physically
num_cpus = 36 # number of cpus in system physically
num_gpus_per_worker = num_gpus/num_subvectors # How much resources will one worker use from GPU in fraction
                                              # without blocking the other part

N = 1 * 10**8 #size of arrays
large_vector1 = np.random.rand(N)
large_vector2 = np.random.rand(N)

sub_vectors = np.array_split(large_vector1, num_subvectors) # This is list of lists sizez of
sub_vectors2 = np.array_split(large_vector2, num_subvectors) # (num_subvectors, N/subvectors)

repeat = 100 # Code repeats 100 times to gain statistics
cut_off = 0.1 # To calculate the mean value of time estimation 10% of array will be cut from both sides (trim_mean)

@ray.remote(num_gpus=num_gpus_per_worker)  # one task reserves only `num_gpus_per_worker` of a GPU
def add_vectors_on_gpu(vector_a, vector_b):
    """Add two host vectors on the GPU `repeat` times and time each stage.

    Every repetition copies both inputs to the device, adds them there and
    copies the sum back to the host. CUDA events delimit the stages; the
    host wall-clock time of the whole repetition is recorded as well.

    Args:
        vector_a: first operand, a host (NumPy) array.
        vector_b: second operand, a host (NumPy) array.

    Returns:
        A tuple ``(res, avg_copy, avg_calc, avg_back, append_time)``:
        ``res`` is the host array with the element-wise sum;
        ``avg_copy``, ``avg_calc`` and ``avg_back`` are trimmed means, in
        milliseconds, of the CUDA-event time elapsed since the start of a
        repetition up to the end of the host-to-device copy, the addition
        and the device-to-host copy respectively (i.e. cumulative values);
        ``append_time`` is the trimmed mean, in seconds, of the host
        wall-clock duration of one repetition.

    Raises:
        ValueError: if the module-level ``repeat`` is smaller than 1, since
            there would be neither a result nor any timing to return.
    """
    if repeat < 1:
        raise ValueError(f"repeat must be at least 1, got {repeat}")

    # Per-repetition samples; the first three are cumulative from start_event
    copy_time = []  # host-to-device copy (H2D)
    calc_time = []  # H2D + addition (CALC)
    back_time = []  # H2D + CALC + device-to-host copy (D2H)
    wall_time = []  # host wall-clock time of the whole repetition, for comparison

    for _ in range(repeat):
        # perf_counter is monotonic, unlike time.time, so intervals cannot go negative
        start_gpu_time = time.perf_counter()
        start_event = cp.cuda.Event()
        end_event_1 = cp.cuda.Event()
        end_event_2 = cp.cuda.Event()
        end_event_3 = cp.cuda.Event()

        # H2D transfer
        start_event.record()
        gpu_vector_a = cp.asarray(vector_a)
        gpu_vector_b = cp.asarray(vector_b)
        end_event_1.record()
        end_event_1.synchronize()

        # Addition on the device
        result = cp.add(gpu_vector_a, gpu_vector_b)
        end_event_2.record()
        end_event_2.synchronize()

        # D2H transfer
        res = cp.asnumpy(result)
        end_event_3.record()
        end_event_3.synchronize()

        # Elapsed times between events are reported in milliseconds
        copy_time.append(cp.cuda.get_elapsed_time(start_event, end_event_1))
        calc_time.append(cp.cuda.get_elapsed_time(start_event, end_event_2))
        back_time.append(cp.cuda.get_elapsed_time(start_event, end_event_3))

        # Sample the wall clock for every repetition, not only the last one
        wall_time.append(time.perf_counter() - start_gpu_time)

    # After all repetitions, reduce every series with the same trimmed mean
    avg_copy = trim_mean(copy_time, cut_off)
    avg_calc = trim_mean(calc_time, cut_off)
    avg_back = trim_mean(back_time, cut_off)
    append_time = trim_mean(wall_time, cut_off)

    return res, avg_copy, avg_calc, avg_back, append_time



In [None]:
# Start a local Ray instance restricted to the configured CPU and GPU counts
ray.init(num_cpus=num_cpus, num_gpus=num_gpus, include_dashboard=False, ignore_reinit_error=True)
print("Initiation done")

try:
    # Driver-side wall clock: covers task submission and waiting for all results
    start = time.perf_counter()

    # Submit one remote task per pair of subvectors; every .remote() call
    # returns immediately with an object reference, so all tasks are in
    # flight before ray.get blocks until each of them has finished.
    results = [
        add_vectors_on_gpu.remote(sub_v1, sub_v2)
        for sub_v1, sub_v2 in zip(sub_vectors, sub_vectors2)
    ]
    final_results = ray.get(results)

    end = time.perf_counter()
finally:
    # Always release the Ray workers (and the GPU they hold), even if a task failed
    ray.shutdown()

print(f"total time = {(end - start)*1000} ms")
print(f"time elapsed from CPU for 1 loop: {(end - start)*1000/repeat} ms")

2023-12-13 17:35:10,585	INFO worker.py:1621 -- Started a local Ray instance.


Initiation done
total time = 18415.777921676636 ms
time elapsed from CPU for 1 loop: 184.15777921676636 ms


In [None]:
# Post-processing: collect the per-task outputs into flat arrays (one entry per task)
final_result = np.concatenate([task_out[0] for task_out in final_results])
# Each task reports cumulative timings (copy, copy + calc, copy + calc + copy back),
# so subtract the preceding stage to obtain the duration of every stage on its own.
back_time = np.array([task_out[3] - task_out[2] for task_out in final_results])
calc_time = np.array([task_out[2] - task_out[1] for task_out in final_results])
copy_time = np.array([task_out[1] for task_out in final_results])
append_time = np.array([task_out[4] for task_out in final_results])

In [None]:
# Build the report from one literal per output line; the timing arrays hold one
# value per task and np.mean averages them over the tasks.
label = (
    "AVG TIMES are given per one device! If you want for each device remove np.mean()\n"
    f"Copy straight avg time = {np.mean(copy_time)} ms\n"
    f"Calc avg time = {np.mean(calc_time)} ms\n"
    f"Copy avg back time = {np.mean(back_time)} ms\n"
    f"Total avg time = {np.mean(copy_time + calc_time + back_time)} ms\n"
    # append_time is multiplied by 1000 so it is printed in ms like the other values
    f"append time = {np.mean(append_time*1000)} ms\n"
)
print(label)

AVG TIMES are given per one device! If you want for each device remove np.mean()
Copy straight avg time = 76.94807415008545 ms
Calc avg time = 0.5758175849914551 ms
Copy avg back time = 54.50511145591736 ms
Total avg time = 132.02900319099427 ms
append time = 128.00651788711548 ms

