# Параллельные и распределенные вычисления с Julia

[Marc Moreno Maza](https://docviewer.yandex.ru/view/326904132/?*=XkmsCLxss6FV3ksEDGu4JvxK2fN7InVybCI6Imh0dHBzOi8vd3d3LmNzZC51d28uY2EvQ291cnNlcy9DUzIxMDFhL1BhcmFsbGVsX2NvbXB1dGluZ193aXRoX0p1bGlhLnBkZiIsInRpdGxlIjoiUGFyYWxsZWxfY29tcHV0aW5nX3dpdGhfSnVsaWEucGRmIiwibm9pZnJhbWUiOnRydWUsInVpZCI6IjMyNjkwNDEzMiIsInRzIjoxNTY1NTg5NjQ4OTA2LCJ5dSI6IjI0ODIxNzM5MDE1NTIwNDg4ODciLCJzZXJwUGFyYW1zIjoibGFuZz1lbiZ0bT0xNTY1NTg5NjM4JnRsZD1ydSZuYW1lPVBhcmFsbGVsX2NvbXB1dGluZ193aXRoX0p1bGlhLnBkZiZ0ZXh0PVBhcmFsbGVsK2FuZCtEaXN0cmlidXRlZCtDb21wdXRpbmcrd2l0aCtKdWxpYSZ1cmw9aHR0cHMlM0EvL3d3dy5jc2QudXdvLmNhL0NvdXJzZXMvQ1MyMTAxYS9QYXJhbGxlbF9jb21wdXRpbmdfd2l0aF9KdWxpYS5wZGYmbHI9MzgmbWltZT1wZGYmbDEwbj1ydSZzaWduPTc0NTBkMDY2NzYwNzlmOThlZTVjMzc5NmYzMTNiMjkxJmtleW5vPTAifQ%3D%3D&lang=en)

### Задачи: одновременные вызовы функций

##### Tasks (aka Coroutines)
Задачи (сопрограммы)

*Задачи* - это функция управления потоком, которая позволяет гибко приостанавливать и возобновлять вычисления.
Эту функцию иногда называют другими именами, такими как симметричные сопрограммы, легкие потоки, совместная многозадачность или однократные продолжения.

Когда часть вычислительной работы (на практике выполняющая определенную функцию) обозначается как Задача, становится возможным прервать ее, переключившись на другую Задачу.

Исходное задание может быть позже возобновлено, и в этот момент оно будет продолжено именно там, где остановилось

**Схема производитель-потребитель**

Одна сложная процедура генерирует значения, а другая сложная процедура принимает их. Потребитель не может просто вызвать функцию производителя, чтобы получить значение, потому что у производителя может быть больше значений для генерации, и он может быть еще не готов к возврату. С задачами производитель и потребитель могут работать столько, сколько им нужно, передавая значения туда и обратно по мере необходимости.

*Пример:*

In [None]:
function producer(c::Channel)
    put!(c, "start")
    for n=1:2
        put!(c, 2n)
    end
    put!(c, "stop")
end;

Чтобы использовать значения, сначала производитель оборачивается в Задачу, а затем повторно вызывается для этого объекта:

In [None]:
p = Channel(producer)

In [None]:
take!(p)

In [None]:
take!(p)

In [None]:
take!(p)

In [None]:
take!(p)

In [None]:
take!(p)

Задача может использоваться как повторяемый объект в цикле for, и в этом случае переменная цикла принимает все полученные значения:

In [None]:
for x in Channel(producer)
    println(x)
end

**Передача сообщений**
Julia предоставляет многопроцессорную среду, основанную на передаче сообщений, позволяющую программам работать на нескольких процессорах в общей или распределенной памяти.
Реализация передачи сообщений Julia является односторонней:
* программист должен явно управлять только одним процессором в двухпроцессорном режиме
* эти операции обычно не похожи на отправку и получение сообщения, а скорее на операции более высокого уровня, такие как обращения к пользовательской функции.

Два ключевых понятия: *удаленные ссылки* и *удаленные вызовы*
Удаленная ссылка - это объект, который может использоваться любым процессором для ссылки на объект, хранящийся в конкретном процессоре.
Удаленный вызов - это запрос одного процессора на вызов определенной функции по определенным аргументам другого (возможно, того же) процессора. Удаленный вызов возвращает удаленную ссылку.

**Как обрабатываются удаленные вызовы в потоке программы**

Удаленные вызовы возвращаются немедленно: процессор, который сделал вызов, может затем перейти к следующей операции, в то время как удаленный вызов происходит где-то еще.
Вы можете дождаться завершения удаленного вызова, вызвав `wait` для его удаленной ссылки, и вы можете получить полное значение результата, используя `fetch`.

In [None]:
# Запустите диспетчер задач и посмотрите, как пыхтят ядра
using Distributed
addprocs(2)

In [None]:
r = remotecall(rand, 2, 2, 2)

In [None]:
s = @spawnat 2 1 .+ fetch(r)

In [None]:
fetch(s)

Вторым аргументом для удаленного вызова является индекс процессора, который будет выполнять эту работу.
В первой строке мы попросили процессор 2 построить случайную матрицу 2 на 2, и в третьей строке мы попросили добавить 1 к ней.
Макрос @spawnat оценивает выражение во втором аргументе процессора, указанного в первом аргументе.

In [None]:
remotecall_fetch(getindex, 2, r, 1, 1)

`remotecall_fetch`

Иногда вам может понадобиться удаленно вычисленное значение сразу. Для этого существует функция удаленного вызова.
Это эквивалентно fetch (remotecall (...)), но более эффективно. Обратите внимание, что `getindex(r, 1,1)` эквивалентен `r[1,1]`, поэтому этот вызов извлекает первый элемент удаленной ссылки `r`.

Макрос `@spawn`
Синтаксис удаленного вызова не особенно удобен. Макрос @spawn упрощает работу:
* Он работает с выражением, а не с функцией, и
* выбирает процессор, где выполнять операцию за вас

In [None]:
r = @spawn rand(2,2)

In [None]:
s = @spawn 1 .+ fetch(r)

In [None]:
fetch(s)

Обратите внимание, что мы использовали `1 .+ fetch(r)` вместо `1 + r`. Это потому, что мы не знаем, где будет выполняться код, поэтому в общем случае может потребоваться выборка для перемещения `r` к процессору, выполняющему сложение.
В этом случае `@spawn` достаточно умен, чтобы выполнять вычисления на процессоре, которому принадлежит `r`, поэтому выборка будет неактивной.

### Советы по перемещению кода и данных

##### Доступность функции для процессоров

Важным моментом является то, что ваш код должен быть доступен на любом процессоре, на котором он выполняется. Например, введите в командной строке julia следующее

In [None]:
function rand2(dims...)
    return 2*rand(dims...)
end

rand2(2,2)

In [None]:
fetch(@spawn rand2(2,2))

Процесс 1 знал о функции rand2, а процесс 2 — нет.Процесс 1 знал о функции rand2, а процесс 2 — нет. Сделаем нашу функцию видимой для всех процессов с помощью специального макроса:

In [None]:
@everywhere function rand2(dims...)
    return 2*rand(dims...)
end

rand2(2,2)

In [None]:
fetch(@spawn rand2(2,2))

In [None]:
@everywhere id = myid() # 
remotecall_fetch(()->id, 2)

In [None]:
?myid

In [None]:
workers()

Каждый процесс имеет свой идентификатор. Процесс, обеспечивающий интерактивное приглашение julia, всегда имеет идентификатор, равный 1, как и процесс julia, выполняющий скрипт драйвера в примере выше.
Процессы, используемые по умолчанию для параллельных операций, называются рабочими. Когда есть только один процесс, процесс 1 считается работником. В противном случае рабочими считаются все процессы, отличные от процесса 1.

Базовая установка Julia имеет встроенную поддержку для двух типов кластеров:
* Локальный кластер, вызываемый с параметром -p.
* Кластерный охват машин с использованием опции machinefile. При этом используется логин ssh без пароля для запуска рабочих процессов julia (по тому же пути, что и текущий хост) на указанных машинах.

Функции addprocs, rmprocs, worker и другие доступны в качестве программного средства добавления, удаления и запроса процессов в кластере.

Отправка сообщений и перемещение данных составляют большую часть накладных расходов в параллельной программе. Сокращение количества сообщений и объема отправляемых данных имеет решающее значение для достижения производительности и масштабируемости. Для этого важно понимать движение данных, выполняемое в Julia различными конструкциями параллельного программирования.

`fetch` можно рассматривать как явную операцию перемещения данных, поскольку она напрямую запрашивает перемещение объекта на локальную машину.

`@spawn` (и несколько связанных конструкций) также перемещает данные, но это не так очевидно, поэтому это можно назвать неявной операцией перемещения данных.

Рассмотрим эти два подхода к построению и возведению в квадрат случайной матрицы:

In [None]:
# method 1
A = rand(1000,1000)
@time Bref = @spawn A^2

fetch(Bref)

In [None]:
# method 2
@time Bref = @spawn rand(1000,1000)^2

fetch(Bref)

Разница кажется тривиальной, но на самом деле она довольно значительна из-за поведения `@spawn`.
В первом методе случайная матрица строится локально, а затем отправляется другому процессору, где она возводится в квадрат.
Во втором методе случайная матрица строится и возводится в квадрат на другом процессоре.
Поэтому второй метод отправляет намного меньше данных, чем первый.

В предыдущем примере эти два способа легко отличить, однако в реальной программе проектирование перемещения данных может потребовать больше обдумывания и, скорее всего, некоторых измерений.

Например, если первый процессор нуждается в матрице A, тогда первый метод может быть лучше. Или, если обработка A стоит дорого, но она есть только у текущего процессора, перенос ее на другой процессор может быть неизбежен. Или, если текущий процессор имеет очень мало общего между `@spawn` и `fetch (Bref)`, то может быть лучше вообще исключить параллелизм. Или представьте, что `rand(1000, 1000)` заменен более дорогой операцией. Тогда может иметь смысл добавить еще один оператор `@spawn` только для этого шага.

In [None]:
@everywhere function fib(n)
    if n < 2
        return n
    else 
        return fib(n-1) + fib(n-2)
    end
end

z = @spawn fib(10)

In [None]:
fetch(z)

In [None]:
@time [fib(i) for i=1:45];

In [None]:
@everywhere function fib_parallel(n)
    if n < 40
        return fib(n)
    else
        x = @spawn fib_parallel(n-1)
        y = fib_parallel(n-2)
        return fetch(x) + y
    end
end

@time [fib_parallel(i) for i=1:45];

Следующий простой пример демонстрирует мощный и часто используемый шаблон параллельного программирования: Reductuon.
Многие итерации выполняются независимо на нескольких процессорах, а затем их результаты объединяются с использованием некоторой функции

In [None]:
@everywhere function count_heads(n)
    c::Int = 0
    for i=1:n
        c += rand(Bool)
    end
    c
end

a = @spawn count_heads(100000000)
b = @spawn count_heads(100000000)
fetch(a)+fetch(b)

Разобрать пример с Монте-Карло [на Хабре](https://habr.com/ru/post/455846/) + углубленный разбор темы.

*ToDo:*

* Distributed arrays and parallel reduction
* Shared arrays
* Parallel blockwise matrix multiplication
* Finite difference schemes
* Homeworks