### Многопоточность. Часть 3

Выдать первое домашнее задание

<br />

##### recursive_mutex

__Упражнение:__ Каким минимальным числом мьютексов и потоков можно добиться deadlock?


<details>
<summary>Ответ</summary>
<p>

```c++
int main()
{
    std::mutex m;
    m.lock();
    m.lock();
    return 0;
}
```

(почти правильный)

</p>
</details>

<details>
<summary>Правильный ответ</summary>
<p>

Дважды вызов `lock` у `std::mutex` на одном потоке формально - UB. Правильный ответ - классический - два потока и два мьютекса.

</p>
</details>

Зависание через дважды `lock()` на одном потоке - не самая приятная особенность.

Если, например, класс с многопоточным доступом простой, то все пути с блокировками `std::mutex` можно отследить глазами. Но есть случаи, когда:
* класс с многопоточным доступом спроектирован плохо, отследить все пути сложно
* класс в реализации методов управление отдаётся наружу, например:

```c++
class MTClass
{
private:
    std::mutex m;
    
public:
    void method()
    {
        std::lock_guard guard(m);
        
        global_function();  // <--- подозрительное место, не дойдёт ли в callstack
                            //      до вызова этого же метода снова? (нужно отслеживать)
    }
};
```

или так:

```c++
class TasksQueue
{
public:
    using Task = std::function<void(void)>;
    
    void push(Task task)
    {
        std::lock_guard guard(mtx);
        
        tasks_queue.emplace_back(std::move(task));
    }
    
    void pop_and_run()
    {
        std::lock_guard guard(mtx);
        
        if (!tasks_queue.empty())
        {        
            const auto task = std::move(tasks_queue.back());
            tasks_queue.pop_back();
            task();  // <--- запускается неизвестная задача, которая может захотеть
                     //      поставить другую задачу в очередь через |TasksQueue::push|
        }        
    }
    
private:
    std::mutex mtx;
    std::queue<Task> tasks_queue;
};
```

Чтобы справляться с этими ситуациями нужен `std::recursive_mutex`

https://en.cppreference.com/w/cpp/thread/recursive_mutex

* хранит внутри счётчик, сколько раз был сделан `lock`
* каждый `lock` увеличивает счётчик на 1
* каждый `unlock` уменьшает счётчик на 1
* кол-во `unlock`-ов должно совпадать с кол-вом `lock`-ов
* максимальное значение счётчика - unspecified, при его превышении - exception

```c++
// отработает корректно:
int main()
{
    std::recursive_mutex mtx;
    mtx.lock();
    mtx.lock(); // ok
    mtx.unlock();
    mtx.unlock();
    return 0;
}
```

пример с `TasksQueue`:

```c++
class TasksQueue
{
public:
    using Task = std::function<void(void)>;

    void push(Tash task)
    {
        std::lock_guard guard(mtx);

        tasks_queue.emplace_back(std::move(task));
    }

    void pop_and_run()
    {
        std::lock_guard guard(mtx);

        if (!tasks_queue.empty())
        {        
            const auto task = std::move(tasks_queue.back());
            tasks_queue.pop_back();
            task();  // <--- ok
        }        
    }

private:
    std::recursive_mutex mtx;
    std::queue<Task> tasks_queue;
};
```

__Замечание__: Другой способ решения проблемы (более правильный) - использовать обычный `std::mutex` и вызывать `task()` за пределами блокировки.

Обсудить, почему другой способ более правильный. Если есть время - нарисовать решение.

<br />

##### shared_mutex, unique_lock, shared_lock

https://en.cppreference.com/w/cpp/thread/shared_mutex

https://en.cppreference.com/w/cpp/thread/unique_lock

https://en.cppreference.com/w/cpp/thread/shared_lock

__Вопрос__: кто-нибудь работал с RWLock? Что это такое и зачем оно нужно?

`std::shared_mutex` - похожая концепция, где read - это shared, а write - это exclusive
* позволяет захватывать себя в shared и exclusive режимах (чтения и записи)
* exclusive (писатель) единомоментно может быть только один, при этом не может быть shared (читателей)
* shared (читатель) единомоментно может быть несколько

Основные методы:

для exclusive (write) режима
* `lock`
* `try_lock`
* `unlock`

для shared (read) режима
* `lock_shared`
* `try_lock_shared`
* `unlock_shared`

RAII-захват:
* `std::unique_lock` захватывает в exclusive (write) режиме (можно `std::lock_guard` тоже)
* `std::shared_lock` захватывает в shared (read) режиме

```c++
class MTJiuceBottle
{
private:
    float cur_volume_;
    float max_volume_;
    mutable std::shared_mutex mtx_;

public:
    float get_cur_volume() const {
        std::shared_lock lock(mtx_, std::adopt_lock);
        return cur_volume_;
    }

    void set_cur_volume(const float value) {
        std::lock_guard guard(mtx_);
        cur_volume_ = value;
    }

    float get_max_volume() const {
        std::shared_lock lock(mtx_, std::adopt_lock);
        return max_volume_;
    }

    void set_max_volume(const float value) {
        std::lock_guard guard(mtx_);
        max_volume_ = value;
    }
};
```

<br />

##### condition_variable

https://en.cppreference.com/w/cpp/thread/condition_variable

https://en.cppreference.com/w/cpp/thread/condition_variable_any

https://en.wikipedia.org/wiki/Spurious_wakeup

`std::condition_variable` - примитив синхронизации, позволяющий одному потоку нотифицировать другой/другие, что какое-то событие произошло.

`std::condition_variable` работает только с `std::unique_lock<std::mutex>` (для эффективности). Если нужно что-то другое - можно использовать `std::condition_variable_any`

Принцип работы:
    
* есть условие `condition`, о смене которого нужно сообщить между потоками
* есть `std::mutex` для синхронизации, `condition` и его внутренности вычисляются только если мьютекс захвачен
* есть `std::condition_variable` для отсылки нотификации

методы:

* `notify_one` - отослать нотификацию одному потоку, что событие произошло / условие поменялось
* `notify_all` - отослать нотификацию всем потокам, что событие произошло / условие поменялось
* `wait(std::unique_lock<std::mutex>& lock)` - подождать, пока либо пока кто-нибудь нас не нотифицирует о событии либо ОКОНЧИТЬ ОЖИДАНИЕ ПРОСТО ТАК ПОТОМУ ЧТО ЗАХОТЕЛОСЬ, ДАЖЕ ЕСЛИ НИКТО НЕ ОТСЫЛАЛ НОТИФИКАЦИИ.
    * проблема в сложностях реализации честного `wait`.
* `wait( std::unique_lock<std::mutex>& lock, Predicate pred )` - эквивалент:

```c++
    while (!pred)
        wait(lock);
```

__Пример__:

```c++
std::mutex m;
std::condition_variable cv_ready;
std::condition_variable cv_compl;
std::string data;
bool is_inp_ready = false;
bool is_completed = false;
 
void worker_function()
{
    std::unique_lock lk(m);  // mutex locked
    cv_ready.wait(lk, []{ return is_inp_ready; });  // mutex unlocked, locked inside lambda
    // mutex locked

    process(data);
    is_completed = true;
    
    cv_compl.notify_one();
}
 
int main()
{
    std::thread thr(worker_function);
    
    data = prepare_input();
 
    {
        std::lock_guard guard(m);
        is_inp_ready = true;
    }
    cv_ready.notify_one();
    
    {
        std::unique_lock lk(m);
        cv_compl.wait(lk, []{ return is_completed; });
    }
    
    std::cout << data;

    thr.join();
}
```

Код работает, но в нём проблема, в чём она?

Более правильный вариант:
    
```c++
std::mutex m;
std::condition_variable cv_ready;
std::condition_variable cv_compl;
std::string data;
bool is_inp_ready = false;
bool is_completed = false;

void worker_function()
{
    std::unique_lock lk(m);  // mutex locked
    cv_ready.wait(lk, []{ return is_inp_ready; });  // mutex unlocked, locked inside lambda
    // mutex locked

    process(data);
    is_completed = true;
    
    lk.unlock();  // !avoid redundant switches!
    cv_compl.notify_one();
}

int main()
{
    std::thread thr(worker_function);

    data = prepare_input();

    {
        std::lock_guard guard(m);
        is_inp_ready = true;
    }
    cv_ready.notify_one();

    {
        std::unique_lock lk(m);
        cv_compl.wait(lk, []{ return is_completed; });
    }

    thr.join();
}
```

<br />

##### thread_local

`thread_local` - модификатор перед переменной / константой, делающий её глобальной в рамках одного потока:

```c++
int func()
{
    thread_local unsigned i = 0;
    ++i;
    // в каждом потоке будет своя личная глобальная переменная i,
    // которая будет равна числу вызовов функции |func| на этом потоке
    
    ...;
}
```

Плюсы:
* возможность делать потокобезопасные глобальные кеши

Минусы:
* создание потока дороже, т.к. для каждого нового потока нужно вызывать конструкторы `thread_local` данных
    * хуже того, создание потока дороже неявно. Если вы слинковались с левой библиотекой, использующей `thread_local`-оптимизации для себя лично, то ВСЕ ваши потоки стали создаваться медленнее.
* старые операционки не поддерживают `thread_local` (не компиляторы, а операционки!). Например, Windows XP (поддерживает, начиная с Windows XP SP 3, емнип). В них `thread_local` молча становится `static` без синхронизаций, и программы неожиданно начинают ловить race condition.

<br />

**Пример +- разумного использования thread_local**:

Есть функция - конвертилка форматов. Она работает "почти" быстро, если бы ей не нужно было временное хранилище.

```c++
std::string person_to_string(const Person &p) {
  // Implementation requires an additional allocation,
  // but other its parts are fast enough.
  std::vector<char> buffer{4096};
  ...; // convert using buffer.
}
```

Возможным решением мог бы быть вынос `buffer` в глобальные переменные:

```c++
static std::vector<char> g_buffer{4096};

std::string person_to_string(const Person &p) {
  // For now, implementation is fast and works sometimes.
  // But is not correct.
  g_buffer.clear();
  ...; // convert using buffer.
}
```

Решение не работает в многопоточной среде с возможным одновременным вызовом `person_to_string` из нескольких потоков. Рабочий вариант - иметь свой личный глобальный объект `buffer` для каждого потока.

```c++
static thread_local std::vector<char> g_buffer{4096};

std::string person_to_string(const Person &p) {
  // Implementation is fast and seems correct.
  // But the new thread creation become slower :(
  g_buffer.clear();
  ...; // convert using buffer.
}
```

<br />

**Упражнение:**

* Реализовать однопоточное рекурсивное вычисление чисел Фибоначчи:

  ```
  f(n) = f(n-1) + f(n-2)
  ```

  Запустить вычисление на разных потоках. С помощью `recursive_mutex` вывести, сколько раз за всё выполнение программмы была вызвана функция `f`. Распечатать в конце работы программы. Можно решить задачу и через обычный `mutex`.

* С помошью `thread_local` посчитать, сколько раз вызывалась функция `f` для каждого из потоков. Распечатать в конце работы каждого потока.

* Напишите сортировку подсчётом для чисел от 0 до 1024. Оптимизируйте аллокацию массива счётчиков. Функция должна работать корректно и сравнительно быстро в многопоточной среде.

* Напишите многопоточную очередь с синхронизацией через мьютекс. Запустите её в режиме один писатель - два читателя. Нотификация о новом объекте в очереди читателю должна приходить через `condition_variable`.

<br />

##### Резюме

* `std::recursive_mutex` поможет, если нужно `std::mutex` захватить больше одного раза в одном потоке
* `std::shared_mutex / std::unique_lock / std::shared_lock` - для организации читателей и писателей
* `std::conditional_variable` - примитив синхронизации лоя нотификации других потоков о событиях (помните о spurios wakeups)
* `thread_local` - глобальные данные, видимые для одного потока (у каждого потока свои)