# Numba

Numba-это компилятор для массивов Python и числовых функций, который дает вам возможность ускорить ваши приложения с помощью высокопроизводительных функций, написанных непосредственно на Python.

Numba генерирует оптимизированный машинный код из чистого кода Python с использованием инфраструктуры компилятора LLVM. С помощью numba тяжелый код Python может быть оптимизирован до производительности, аналогичной C, C++ и Fortran, без необходимости переключения языков или интерпретаторов Python.

Оптимизация происходит при помощи использования различных декораторов.

### @jit
Самый базовый декоратор, который использует Numba JIT компилятор для оптимизации. Numba сама анализирует код и решает, какие его части можно оптимизировать. Внутри генерируется код, обрабатывающий все значения как объекты Python и использующий Python C API для выполнения всех операций с этими объектами. Чаще всего он будет не быстрее, чем оригинальный код на Python за исключением оптимизации циклов в nopython режиме.

### @njit

Режим компиляции Numba, который генерирует код, не имеющий доступа к Python C API. Этот режим компиляции создает код с наивысшей производительностью, но может взаимодействовать только с определенными типами объектов.

http://numba.pydata.org/numba-doc/latest/reference/pysupported.html#pysupported-builtin-types

Параметры:

nogil - отключает GIL для nopython mode и позволяет использовать многопоточность, но в таком режиме не стоит забывать про подводные камни многопоточности: согласованность, синхронизация, условия конкурренции.

parallel - позволяет автоматически распараллеливать код там, где это возможно (https://numba.readthedocs.io/en/stable/user/parallel.html#numba-parallel)

cache - кеширует результат работы функции

### @vectorize

Данный декоратор позволяет создавать универсальный функции ufuncs оперируя синтаксисом numpy, достигая производительности, как на ufuncs написанных на языке C. Функция работает с данными не как с массивами, а отдельно с каждым скаляром, создавая оптимальные циклы для итерирования. Данный вариант не всегда будет быстрее, чем njit, но он позволяет работать с GPU.
ПО умолчанию параметр target='cpu', но можно выставить 'parallel' и 'cuda'.

### @guvectorize

В то время как vectorize() позволяет писать ufuncs, которые работают с одним элементом за один раз, декоратор guvectorize() делает еще один шаг вперед и позволяет писать ufuncs, которые будут работать с произвольным числом элементов входных массивов, а также принимать и возвращать массивы различных размеров. Например, он полезен при расчете скользящего среднего.
http://numba.pydata.org/numba-doc/latest/cuda/cudapysupported.html

In [None]:
from numba import jit, njit, cuda, vectorize, float64, float32, cuda, guvectorize, int32
import numpy as np
from numpy import testing
import math

Операции с 1 числом

In [None]:
def compute_logit(y):
    logit = 1 / (1 + math.exp(-y))
    return logit

@jit
def compute_logit_numba(y):
    logit = 1 / (1 + math.exp(-y))
    return logit

@njit
def compute_logit_numba_2(y):
    logit = 1 / (1 + math.exp(-y))
    return logit

Давайте посмотрим, есть ли какая-то оптимизация, когда у на соперация с одним числом и нет циклов

In [None]:
%timeit compute_logit(3.5)

In [None]:
%timeit compute_logit_numba(3.5)

In [None]:
%timeit compute_logit_numba_2(3.5)

Вектора

Создадим вектор размерности 10 000 000 чисел float64

In [None]:
x = np.random.random_sample(10000000)

In [None]:
def logistic(y):
    result = [1 / (1 + math.exp(-val)) for val in y]
    return result

@njit
def logistic_simple_numba(y):
    result = [1 / (1 + math.exp(-val)) for val in y]
    return result

@njit(nogil=True)
def logistic_simple_numba_ng(y):
    result = [1 / (1 + math.exp(-val)) for val in y]
    return result

def logistic_numpy(y):
    return 1 / (1 + np.exp(-y))

In [None]:
%timeit logistic(x)

In [None]:
%timeit logistic_simple_numba(x)

In [None]:
%timeit logistic_simple_numba_ng(x)

In [None]:
%timeit logistic_numpy(x)

А что если numpy код обернуть декоратором?

In [None]:
@njit
def logistic_numba(y):
    return 1 / (1 + np.exp(-y))

@njit(parallel=True)
def logistic_numba_parallel(y):
    return 1 / (1 + np.exp(-y))

In [None]:
%timeit logistic_numba(x)

In [None]:
%timeit logistic_numba_parallel(x)

Есть декоратор для работы с веткорами, давайте попробуем и его!

In [None]:
@vectorize([float64(float64)], nopython=True)
def logistic_numba_v(y):
    return 1 / (1 + np.exp(-y))

@vectorize([float64(float64)], target='cuda')
def logistic_numba_v_cuda(y):
    return 1 / (1 + math.exp(-y))

In [None]:
%timeit logistic_numba_v(x)

In [None]:
%timeit logistic_numba_v_cuda(x)

Что-то GPU не впечатлило...а что можно сделать? Первое - это перейти на float32, сильно ничего не потеряем, но взамен преобретем скорость=)

In [None]:
x_32 = np.float32(np.random.random_sample(10000000))


@vectorize([float32(float32)], target='cuda')
def logistic_numba_v_cuda(y):
    return 1 / (1 + math.exp(y))

In [None]:
%timeit logistic_numba_v_cuda(x_32)

Так, все равно что-то не так...Давайте вспомним о том, что есть расходы на передачу данных от оперативной памяти на GPU и попробуем сделать это заранее. А также заранее заразервируем на GPU место для результата

In [None]:
x_device = cuda.to_device(x_32)
out_device = cuda.device_array(shape=(len(x_32),), dtype=np.float32) 

In [None]:
%timeit logistic_numba_v_cuda(x_device, out=out_device)

А теперь давайте напишем функцию через cuda.jit

In [None]:
@cuda.jit
def logistic_cuda_jit(x, y):
    pos = cuda.grid(1)
    if pos < x.size:
        y[pos] = 1 / (1 + 1 + math.exp(-x[pos]))

In [None]:
threadsperblock = 64
blockspergrid = int(math.ceil(x.shape[0] / threadsperblock))

d_x = cuda.to_device(x_32)
y = np.zeros(x.shape[0])
d_y = cuda.to_device(y)

In [None]:
for threadsperblock in [16, 32, 64, 128, 256, 512]:
    blockspergrid = int(math.ceil(x.shape[0] / threadsperblock))
    res = %timeit -o logistic_cuda_jit[blockspergrid, threadsperblock](d_x, d_y)
    print(f'Number of threads - {threadsperblock}, time - {res.average}')

Самим писать бывает полезно, можно найти вариант, когда реализация быстрее готовых решений, но нужно подбирать параметры

guvectorize

In [None]:
x = np.random.random_sample(100000)

In [None]:
def moving_average(vector, window):
    avg = [vector[i:i+window].mean() for i in range(len(vector))]
    return np.array(avg)

@guvectorize(['void(float64[:], int32, float64[:])'],
             '(n),()->(n)', nopython=True)
def moving_average_g(vector, window, out):
    for i in range(len(vector)):
        acc = 0.
        values = vector[i:i+window]
        for val in values:
            acc += val
        out[i] = acc / len(values)
        
@guvectorize(['void(float64[:], int64, float64[:])'],
             '(n),()->(n)', target='cuda')
def moving_average_cuda(vector, window, out):
    for i in range(len(vector)):
        acc = 0.
        values = vector[i:i+window]
        for val in values:
            acc += val
        out[i] = acc / len(values)

In [None]:
%timeit moving_average(x, 10)

In [None]:
%timeit moving_average_g(x, 10)

In [None]:
x_device = cuda.to_device(x)
window = cuda.to_device(10)

In [None]:
%time moving_average_cuda(x_device, window)

Матрицы

In [None]:
x = np.random.random_sample(90000).reshape(300, 300)
y = np.random.random_sample(90000).reshape(300, 300)

In [None]:
def mm(x, y):
    x_rows, x_cols = x.shape
    y_rows, y_cols = y.shape
    z = np.zeros(x_rows * y_cols).reshape(x_rows, y_cols)
    for i in range(x_rows):
        for j in range(x_cols):
            for k in range(y_cols):
                z[i][k] += x[i][j] * y[j][k]
    return z

def mm_numpy(x, y):
    return np.dot(x, y)

@njit
def nn_numba(x, y):
    x_rows, x_cols = x.shape
    y_rows, y_cols = y.shape
    z = np.zeros(x_rows * y_cols).reshape(x_rows, y_cols)
    for i in range(x_rows):
        for j in range(x_cols):
            for k in range(y_cols):
                z[i][k] += x[i][j] * y[j][k]
    return z

@njit(parallel=True)
def mm_numpy_numba(x, y):
    return np.dot(x, y)


@cuda.jit
def mm_cuda(x, y, z):
    i, j = cuda.grid(2)
    tmp = 0.
    for k in range(x.shape[1]):
        tmp += x[i, k] * y[k, j]
    z[i, j] = tmp

In [None]:
testing.assert_almost_equal(mm(x, y), mm_numpy(x, y), decimal=2)

In [None]:
%time mm(x, y)

In [None]:
%timeit mm_numpy(x, y)

In [None]:
%timeit nn_numba(x, y)

In [None]:
%timeit mm_numpy_numba(x, y)

In [None]:
threadsperblock = (16, 16)
blockspergrid_x = int(math.ceil(x.shape[0] / threadsperblock[0]))
blockspergrid_y = int(math.ceil(y.shape[1] / threadsperblock[1]))
blockspergrid = (blockspergrid_x, blockspergrid_y)

d_x = cuda.to_device(x)
d_y = cuda.to_device(y)
z = np.zeros(x.shape[0] * y.shape[1]).reshape(x.shape[0], y.shape[1])
d_z = cuda.to_device(z)

In [None]:
%timeit mm_cuda[blockspergrid, threadsperblock](d_x, d_y, d_z)

In [None]:
a = %timeit -n 100 -r 10 -o mm_cuda[blockspergrid, threadsperblock](d_x, d_y, d_z)

# Немного про распараллеливание кода

In [None]:
from joblib import Parallel, delayed

### backend: str, ParallelBackendBase instance or None, default: ‘loky’
Specify the parallelization backend implementation. Supported backends are:

“loky” used by default, can induce some communication and memory overhead when exchanging input and output data with the worker Python processes. On some rare systems (such as Pyiodide), the loky backend may not be available.

“multiprocessing” previous process-based backend based on multiprocessing.Pool. Less robust than loky.

“threading” is a very low-overhead backend but it suffers from the Python Global Interpreter Lock if the called function relies a lot on Python objects. “threading” is mostly useful when the execution bottleneck is a compiled extension that explicitly releases the GIL (for instance a Cython loop wrapped in a “with nogil” block or an expensive call to a library such as NumPy).

finally, you can register backends by calling register_parallel_backend(). This will allow you to implement a backend of your liking.

### batch_size: int or ‘auto’, default: ‘auto’

The number of atomic tasks to dispatch at once to each worker. When individual evaluations are very fast, dispatching calls to workers can be slower than sequential computation because of the overhead. Batching fast computations together can mitigate this. The 'auto' strategy keeps track of the time it takes for a batch to complete, and dynamically adjusts the batch size to keep the time on the order of half a second, using a heuristic. The initial batch size is 1. batch_size="auto" with backend="threading" will dispatch batches of a single task at a time as the threading backend has very little overhead and using larger batch size has not proved to bring any gain in that case.

Рассмотрим простой пример, но он позволит понять основной принцип

In [None]:
x = np.random.random_sample(20000000)

In [None]:
vector = np.random.random_sample(100)

def preprocess_one(val):
    val = np.sum(np.multiply(vector, val))
    return val ** 2

def preprocess_array(values):
    return [preprocess_one(val) for val in values]

In [None]:
%time preprocess_array(x)

In [None]:
with Parallel(n_jobs=2, verbose=10) as parallel:
     results = parallel(delayed(preprocess_one)(val) for val in x)

Что-то это было слишком долго...Почему так? Причин несколько: у нас есть затраты на распраллеливание, слишком быстро проходит этап расчета одного воркера и еще и на лету делаются батчи

In [None]:
batch_size = 30000
batches = [x[i:i+batch_size] for i in range(0, len(x), batch_size)]

In [None]:
with Parallel(n_jobs=2, verbose=10) as parallel:
     results = parallel(delayed(preprocess_array)(val) for val in batches)

Уже лучше, видим, что есть затраты на распараллеливание

In [None]:
with Parallel(n_jobs=8, verbose=10) as parallel:
     results = parallel(delayed(preprocess_array)(val) for val in batches)

Попробуем убрать лог

In [None]:
%%time

with Parallel(n_jobs=8) as parallel:
     results = parallel(delayed(preprocess_array)(val) for val in batches)

Давайте оценим все сразу, вместе с подготовкой батчей

In [None]:
%%time

batch_size = 30000
batches = [x[i:i+batch_size] for i in range(0, len(x), batch_size)]

res = []

with Parallel(n_jobs=8) as parallel:
     results = parallel(delayed(preprocess_array)(val) for val in batches)
        
for part in results:
    res.extend(part)

Плохой пример

In [None]:
def preprocess_one_bad(val):
    return val ** 2

def preprocess_array_bad(values):
    return [preprocess_one_bad(val) for val in values]

In [None]:
%timeit preprocess_array_bad(x)

In [None]:
%%time

batch_size = 30000
batches = [x[i:i+batch_size] for i in range(0, len(x), batch_size)]

res = []

with Parallel(n_jobs=8) as parallel:
     results = parallel(delayed(preprocess_array_bad)(val) for val in batches)
        
for part in results:
    res.extend(part)

Также можно использовать более низкоуровневые варианты через библиотеки multiprocessing, concurrent

# Домашнее задание

### Первая часть

Задание будет простым: нужно реализовать функции для поэлементного сложения двух матриц:

    1) На чистом python
    2) python + @njit
    3) numpy
    4) numpy + @njit
    5) @cuda.jit
    
Также необходимо замерить время выполнения

### Вторая часть

Попробуйте для параллельного расчета preprocess_array разные комбинации количетва ядер и размера батча. Постройте график зависмости времени расчета (в простом варианте можно несколько графиков при фиксации одного из параметров).