In [62]:
# look at tools/set_up_magics.ipynb
yandex_metrica_allowed = True ; get_ipython().run_cell('# one_liner_str\n\nget_ipython().run_cell_magic(\'javascript\', \'\', \n    \'// setup cpp code highlighting\\n\'\n    \'IPython.CodeCell.options_default.highlight_modes["text/x-c++src"] = {\\\'reg\\\':[/^%%cpp/]} ;\'\n    \'IPython.CodeCell.options_default.highlight_modes["text/x-cmake"] = {\\\'reg\\\':[/^%%cmake/]} ;\'\n    \'IPython.CodeCell.options_default.highlight_modes["text/x-sql"] = {\\\'reg\\\':[/^%%sql/]} ;\'\n)\n\n# creating magics\nfrom IPython.core.magic import register_cell_magic, register_line_magic\nfrom IPython.display import display, Markdown, HTML\nimport argparse\nfrom subprocess import Popen, PIPE, STDOUT, check_output\nimport html\nimport random\nimport sys\nimport os\nimport re\nimport signal\nimport shutil\nimport shlex\nimport glob\nimport time\n\n@register_cell_magic\ndef save_file(args_str, cell, line_comment_start="#"):\n    parser = argparse.ArgumentParser()\n    parser.add_argument("fname")\n    parser.add_argument("--ejudge-style", action="store_true")\n    parser.add_argument("--under-spoiler-threshold", type=int, default=None)\n    args = parser.parse_args(args_str.split())\n    \n    cell = cell if cell[-1] == \'\\n\' or args.no_eof_newline else cell + "\\n"\n    cmds = []\n    with open(args.fname, "w") as f:\n        f.write(line_comment_start + " %%cpp " + args_str + "\\n")\n        for line in cell.split("\\n"):\n            line_to_write = (line if not args.ejudge_style else line.rstrip()) + "\\n"\n            if not line.startswith("%"):\n                f.write(line_to_write)\n            else:\n                f.write(line_comment_start + " " + line_to_write)\n                run_prefix = "%run "\n                md_prefix = "%MD "\n                comment_prefix = "%" + line_comment_start\n                if line.startswith(run_prefix):\n                    cmds.append(line[len(run_prefix):].strip())\n                elif line.startswith(md_prefix):\n                    cmds.append(\'#<MD>\' + line[len(md_prefix):].strip())\n                elif line.startswith(comment_prefix):\n                    cmds.append(\'#\' + line[len(comment_prefix):].strip())\n                else:\n                    raise Exception("Unknown %%save_file subcommand: \'%s\'" % line)\n                \n        f.write("" if not args.ejudge_style else line_comment_start + r" line without \\n")\n    for cmd in cmds:\n        if cmd.startswith(\'#\'):\n            if cmd.startswith(\'#<MD>\'):\n                display(Markdown(cmd[5:]))\n            else:\n                display(Markdown("\\#\\#\\#\\# `%s`" % cmd[1:]))\n        else:\n            display(Markdown("Run: `%s`" % cmd))\n            if args.under_spoiler_threshold:\n                out = check_output(cmd, stderr=STDOUT, shell=True, universal_newlines=True)\n                out = out[:-1] if out.endswith(\'\\n\') else out\n                out = html.escape(out)\n                if len(out.split(\'\\n\')) > args.under_spoiler_threshold:\n                    out = "<details> <summary> output </summary> <pre><code>%s</code></pre></details>" % out\n                elif out:\n                    out = "<pre><code>%s</code></pre>" % out\n                if out:\n                    display(HTML(out))\n            else:\n                get_ipython().system(cmd)\n\n@register_cell_magic\ndef cpp(fname, cell):\n    save_file(fname, cell, "//")\n    \n@register_cell_magic\ndef cmake(fname, cell):\n    save_file(fname, cell, "#")\n\n@register_cell_magic\ndef asm(fname, cell):\n    save_file(fname, cell, "//")\n    \n@register_cell_magic\ndef makefile(fname, cell):\n    fname = fname or "makefile"\n    assert fname.endswith("makefile")\n    save_file(fname, cell.replace(" " * 4, "\\t"))\n        \n@register_line_magic\ndef p(line):\n    line = line.strip() \n    if line[0] == \'#\':\n        display(Markdown(line[1:].strip()))\n    else:\n        try:\n            expr, comment = line.split(" #")\n            display(Markdown("`{} = {}`  # {}".format(expr.strip(), eval(expr), comment.strip())))\n        except:\n            display(Markdown("{} = {}".format(line, eval(line))))\n    \n    \ndef show_log_file(file, return_html_string=False):\n    obj = file.replace(\'.\', \'_\').replace(\'/\', \'_\') + "_obj"\n    html_string = \'\'\'\n        <!--MD_BEGIN_FILTER-->\n        <script type=text/javascript>\n        var entrance___OBJ__ = 0;\n        var errors___OBJ__ = 0;\n        function halt__OBJ__(elem, color)\n        {\n            elem.setAttribute("style", "font-size: 14px; background: " + color + "; padding: 10px; border: 3px; border-radius: 5px; color: white; ");                    \n        }\n        function refresh__OBJ__()\n        {\n            entrance___OBJ__ -= 1;\n            if (entrance___OBJ__ < 0) {\n                entrance___OBJ__ = 0;\n            }\n            var elem = document.getElementById("__OBJ__");\n            if (elem) {\n                var xmlhttp=new XMLHttpRequest();\n                xmlhttp.onreadystatechange=function()\n                {\n                    var elem = document.getElementById("__OBJ__");\n                    console.log(!!elem, xmlhttp.readyState, xmlhttp.status, entrance___OBJ__);\n                    if (elem && xmlhttp.readyState==4) {\n                        if (xmlhttp.status==200)\n                        {\n                            errors___OBJ__ = 0;\n                            if (!entrance___OBJ__) {\n                                if (elem.innerHTML != xmlhttp.responseText) {\n                                    elem.innerHTML = xmlhttp.responseText;\n                                }\n                                if (elem.innerHTML.includes("Process finished.")) {\n                                    halt__OBJ__(elem, "#333333");\n                                } else {\n                                    entrance___OBJ__ += 1;\n                                    console.log("req");\n                                    window.setTimeout("refresh__OBJ__()", 300); \n                                }\n                            }\n                            return xmlhttp.responseText;\n                        } else {\n                            errors___OBJ__ += 1;\n                            if (!entrance___OBJ__) {\n                                if (errors___OBJ__ < 6) {\n                                    entrance___OBJ__ += 1;\n                                    console.log("req");\n                                    window.setTimeout("refresh__OBJ__()", 300); \n                                } else {\n                                    halt__OBJ__(elem, "#994444");\n                                }\n                            }\n                        }\n                    }\n                }\n                xmlhttp.open("GET", "__FILE__", true);\n                xmlhttp.setRequestHeader("Cache-Control", "no-cache");\n                xmlhttp.send();     \n            }\n        }\n        \n        if (!entrance___OBJ__) {\n            entrance___OBJ__ += 1;\n            refresh__OBJ__(); \n        }\n        </script>\n\n        <p id="__OBJ__" style="font-size: 14px; background: #000000; padding: 10px; border: 3px; border-radius: 5px; color: white; ">\n        </p>\n        \n        </font>\n        <!--MD_END_FILTER-->\n        <!--MD_FROM_FILE __FILE__.md -->\n        \'\'\'.replace("__OBJ__", obj).replace("__FILE__", file)\n    if return_html_string:\n        return html_string\n    display(HTML(html_string))\n\n    \nclass TInteractiveLauncher:\n    tmp_path = "./interactive_launcher_tmp"\n    def __init__(self, cmd):\n        try:\n            os.mkdir(TInteractiveLauncher.tmp_path)\n        except:\n            pass\n        name = str(random.randint(0, 1e18))\n        self.inq_path = os.path.join(TInteractiveLauncher.tmp_path, name + ".inq")\n        self.log_path = os.path.join(TInteractiveLauncher.tmp_path, name + ".log")\n        \n        os.mkfifo(self.inq_path)\n        open(self.log_path, \'w\').close()\n        open(self.log_path + ".md", \'w\').close()\n\n        self.pid = os.fork()\n        if self.pid == -1:\n            print("Error")\n        if self.pid == 0:\n            exe_cands = glob.glob("../tools/launcher.py") + glob.glob("../../tools/launcher.py")\n            assert(len(exe_cands) == 1)\n            assert(os.execvp("python3", ["python3", exe_cands[0], "-l", self.log_path, "-i", self.inq_path, "-c", cmd]) == 0)\n        self.inq_f = open(self.inq_path, "w")\n        interactive_launcher_opened_set.add(self.pid)\n        show_log_file(self.log_path)\n\n    def write(self, s):\n        s = s.encode()\n        assert len(s) == os.write(self.inq_f.fileno(), s)\n        \n    def get_pid(self):\n        n = 100\n        for i in range(n):\n            try:\n                return int(re.findall(r"PID = (\\d+)", open(self.log_path).readline())[0])\n            except:\n                if i + 1 == n:\n                    raise\n                time.sleep(0.1)\n        \n    def input_queue_path(self):\n        return self.inq_path\n        \n    def wait_stop(self, timeout):\n        for i in range(int(timeout * 10)):\n            wpid, status = os.waitpid(self.pid, os.WNOHANG)\n            if wpid != 0:\n                return True\n            time.sleep(0.1)\n        return False\n        \n    def close(self, timeout=3):\n        self.inq_f.close()\n        if not self.wait_stop(timeout):\n            os.kill(self.get_pid(), signal.SIGKILL)\n            os.waitpid(self.pid, 0)\n        os.remove(self.inq_path)\n        # os.remove(self.log_path)\n        self.inq_path = None\n        self.log_path = None \n        interactive_launcher_opened_set.remove(self.pid)\n        self.pid = None\n        \n    @staticmethod\n    def terminate_all():\n        if "interactive_launcher_opened_set" not in globals():\n            globals()["interactive_launcher_opened_set"] = set()\n        global interactive_launcher_opened_set\n        for pid in interactive_launcher_opened_set:\n            print("Terminate pid=" + str(pid), file=sys.stderr)\n            os.kill(pid, signal.SIGKILL)\n            os.waitpid(pid, 0)\n        interactive_launcher_opened_set = set()\n        if os.path.exists(TInteractiveLauncher.tmp_path):\n            shutil.rmtree(TInteractiveLauncher.tmp_path)\n    \nTInteractiveLauncher.terminate_all()\n   \nyandex_metrica_allowed = bool(globals().get("yandex_metrica_allowed", False))\nif yandex_metrica_allowed:\n    display(HTML(\'\'\'<!-- YANDEX_METRICA_BEGIN -->\n    <script type="text/javascript" >\n       (function(m,e,t,r,i,k,a){m[i]=m[i]||function(){(m[i].a=m[i].a||[]).push(arguments)};\n       m[i].l=1*new Date();k=e.createElement(t),a=e.getElementsByTagName(t)[0],k.async=1,k.src=r,a.parentNode.insertBefore(k,a)})\n       (window, document, "script", "https://mc.yandex.ru/metrika/tag.js", "ym");\n\n       ym(59260609, "init", {\n            clickmap:true,\n            trackLinks:true,\n            accurateTrackBounce:true\n       });\n    </script>\n    <noscript><div><img src="https://mc.yandex.ru/watch/59260609" style="position:absolute; left:-9999px;" alt="" /></div></noscript>\n    <!-- YANDEX_METRICA_END -->\'\'\'))\n\ndef make_oneliner():\n    html_text = \'("В этот ноутбук встроен код Яндекс Метрики для сбора статистики использований. Если вы не хотите, чтобы по вам собиралась статистика, исправьте: yandex_metrica_allowed = False" if yandex_metrica_allowed else "")\'\n    html_text += \' + "<""!-- MAGICS_SETUP_PRINTING_END -->"\'\n    return \'\'.join([\n        \'# look at tools/set_up_magics.ipynb\\n\',\n        \'yandex_metrica_allowed = True ; get_ipython().run_cell(%s);\' % repr(one_liner_str),\n        \'display(HTML(%s))\' % html_text,\n        \' #\'\'MAGICS_SETUP_END\'\n    ])\n       \n\n');display(HTML(("В этот ноутбук встроен код Яндекс Метрики для сбора статистики использований. Если вы не хотите, чтобы по вам собиралась статистика, исправьте: yandex_metrica_allowed = False" if yandex_metrica_allowed else "") + "<""!-- MAGICS_SETUP_PRINTING_END -->")) #MAGICS_SETUP_END

<IPython.core.display.Javascript object>

# Мультиплексирование

<br>
<div style="text-align: right"> Спасибо <a href="https://github.com/SyrnikRebirth">Сове Глебу</a> и <a href="https://github.com/Disadvantaged">Голяр Димитрису</a> за участие в написании текста </div>
<br>


<p><a href="skjdhfskjfh" target="_blank">
    <h3>Видеозапись семинара</h3> 
</a></p>


[Ридинг Яковлева](https://github.com/victor-yacovlev/mipt-diht-caos/tree/master/practice/epoll)

Мультиплексирование - о чем это? Это об одновременной работе с несколькими соединениями. О том, чтобы эффективно решать задачу: вот из этих файловых дескрипторов, нужно прочитать, сразу как только станет доступно, а вот в эти записать, опять же, сразу когда будет возможность.

[Хорошая статья на хабре: select / poll / epoll: практическая разница](https://habr.com/ru/company/infopulse/blog/415259/)
В этой же статье есть плюсы и минусы `select`/`poll`/`epoll`.

[Довольно детальная статья на хабре про epoll](https://habr.com/ru/post/416669/)

Способы мультиплексирования:
* <a href="#pipelike" style="color:#856024">Для работы с пайпами и сокетами</a>
* <a href="#aio" style="color:#856024">Linux AIO</a> - одновременная запись/чтение из нескольких файлов. (К сожалению, это только с файлами работает)

<a href="#hw" style="color:#856024">Комментарии к ДЗ</a>

#  <a name="pipelike"></a> Варианты мультиплексирования ввода из пайпов и сокетов. От наивных и не работающих до epoll c edge-triggering   

С выводом то же самое. Чтобы говорить меньше букв, поговорим только в вводе.

* <a href="#just_read" style="color:#856024">Просто обычный read</a> - то, что можно написать, когда на события не нужно реагировать максимально быстро и можно позволить себе обработать первое пришедшее событие после последнего пришедшего.
* <a href="#read_nonblock" style="color:#856024"> read из файловых дескрипторов с опцией O_NONBLOCK </a> - с неблокирующим чтением легко позволить себе постоянно пытаться читать из интересных файловых дескрипторов. Но это кушает много процессорного времени - поэтому это далеко не лучший способ.
* <a href="#select" style="color:#856024">select</a> - старая штука, но стандартизированная (POSIX), и поддерживется практически везде, где есть интернет.
  Минусы: смотри статью на хабре. Но если кратко: в многопоточных программах крайне неудобен + <a href="#select_fail" style="color:#856024">не поддерживает больше 1024 файловых дескрипторов</a> (или просто файловые дескрипторы с номерами >= 1024, тут не уверен).
* <a href="#select" style="color:#856024">poll</a> - менее старая штука, стандартизированная (POSIX.1-2001 and POSIX.1-2008).
* <a href="#epoll" style="color:#856024">epoll</a> - linux
* kqueue - FreeBSD и MacOS. Аналог epoll. Вообще для того, чтобы писать тут кроссплатформенный код, написали библиотеку [libevent](http://libevent.org/)

Посмотрим на разные способы мультиплексирования на одной задаче: прочитать все что будет записано от N подпроцессов через пайпы. Читать хочется сразу, как только произошла запись.

Разные способы мультиплексирования - разные реализации функции read_all

In [52]:
%%cpp multiplexing_reader_common.h

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <sys/resource.h>
#include <sys/types.h>
#include <errno.h>
#include <time.h>
#include <stdbool.h>

// log_printf - макрос для отладочного вывода, добавляющий время с первого использования, имя функции и номер строки
const char* log_prefix(const char* func, int line);
#define log_printf_impl(fmt, ...) { time_t t = time(0); dprintf(2, "%s: " fmt "%s", log_prefix(__FUNCTION__, __LINE__), __VA_ARGS__); }
// Format: <time_since_start> <func_name>:<line> : <custom_message>
#define log_printf(...) log_printf_impl(__VA_ARGS__, "")


#define conditional_handle_error(stmt, msg) \
    do { if (stmt) { perror(msg " (" #stmt ")"); exit(EXIT_FAILURE); } } while (0)

// Функция, реализации которой мы будем тестировать
void read_all(int* fds, int count);

In [53]:
%%cpp multiplexing_reader_common.c

#include "multiplexing_reader_common.h"

#include <stdatomic.h>
#include <sys/syscall.h>

// log_printf - макрос для отладочного вывода, добавляющий время с первого использования, имя функции и номер строки
inline const char* log_prefix(const char* func, int line) {
    struct timespec spec; clock_gettime(CLOCK_REALTIME, &spec); long long current_msec = spec.tv_sec * 1000L + spec.tv_nsec / 1000000;
    static _Atomic long long start_msec_storage = -1; long long start_msec = -1; if (atomic_compare_exchange_strong(&start_msec_storage, &start_msec, current_msec)) start_msec = current_msec;
    long long delta_msec = current_msec - start_msec; const int max_func_len = 12;
    static __thread char prefix[100]; sprintf(prefix, "%lld.%03lld %*s():%d    ", delta_msec / 1000, delta_msec % 1000, max_func_len, func, line); sprintf(prefix + max_func_len + 13, "[tid=%ld]", syscall(__NR_gettid));
    return prefix;
}

main тестирующей программы

In [54]:
%%cpp multiplexing_reader_test.c

#include "multiplexing_reader_common.h"

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <sys/resource.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>
#include <time.h>

const int INPUTS_COUNT = 5;

int main() {
    log_printf("Start multiplexing test\n");
    pid_t pids[INPUTS_COUNT];
    int input_fds[INPUTS_COUNT];
    // create INPUTS_COUNT subprocesses that will write to pipes with different delays
    for (int i = 0; i < INPUTS_COUNT; ++i) {
        int fds[2];
        pipe(fds);
        input_fds[i] = fds[0];
        if ((pids[i] = fork()) == 0) {
            int sleep_ms = 100 * (INPUTS_COUNT - 1 - i); 
            struct timespec t = {.tv_sec = sleep_ms / 1000, .tv_nsec = (sleep_ms % 1000) * 1000000};
            nanosleep(&t, &t);  
                
            log_printf("Send hello from %d subprocess\n", i);
            dprintf(fds[1], "Hello from %d subprocess\n", i);
            // try with EPOLL realisation
            // sleep(10);
            // dprintf(fds[1], "Hello 2 from %d subprocess\n", i);
            exit(0);
        }
        close(fds[1]);
    }
    
    // run multiplexing reading
    read_all(input_fds, INPUTS_COUNT);
     
    int status;
    for (int i = 0; i < INPUTS_COUNT; ++i) {
        close(input_fds[i]);
        assert(waitpid(pids[i], &status, 0) != -1);
    }
    log_printf("Finish multiplexing test\n");
    return 0;
}

## <a name="just_read"></a> Наивный read

In [72]:
%%cpp multiplexing_reader_trivial.c
%run gcc multiplexing_reader_test.c multiplexing_reader_common.c multiplexing_reader_trivial.c -o trivial.exe
%MD ### Результаты чтения с помощью наивного read
%MD Видно, что чтение происходит совсем не сразу после записи, так как мультиплексирования по сути нет
%run time -p ./trivial.exe

#include "multiplexing_reader_common.h"

#include <sys/epoll.h>

void read_all(int* input_fds, int count) {
    // Работает неэффективно, так как при попытке считать из пайпа мы можем на этом надолго заблокироваться 
    // А в другом пайпе данные могут появиться, но мы их не сможем обработать сразу (заблокированы, пытаясь читать другой пайп)
    log_printf("Trivial realisation start\n");
    // Проходимся по всем файловым дескрипторам (специально выбрал плохой порядок)
    for (int i = 0; i < count; ++i) {
        char buf[100];
        int read_bytes = 0;
        while ((read_bytes = read(input_fds[i], buf, sizeof(buf))) > 0) { // Читаем файл пока он не закроется.
            buf[read_bytes] = '\0';
            log_printf("Read from %d subprocess: %s", i, buf);
        }
        conditional_handle_error(read_bytes < 0, "read error");
    }
    log_printf("Trivial realisation finish\n");
}


Run: `gcc multiplexing_reader_test.c multiplexing_reader_common.c multiplexing_reader_trivial.c -o trivial.exe`

### Результаты чтения с помощью наивного read

Видно, что чтение происходит совсем не сразу после записи, так как мультиплексирования по сути нет

Run: `time -p ./trivial.exe`

0.000         main():18  [tid=51639]: Start multiplexing test
0.001     read_all():14  [tid=51639]: Trivial realisation start
0.001         main():31  [tid=51644]: Send hello from 4 subprocess
0.149         main():31  [tid=51643]: Send hello from 3 subprocess
0.248         main():31  [tid=51642]: Send hello from 2 subprocess
0.349         main():31  [tid=51641]: Send hello from 1 subprocess
0.401         main():31  [tid=51640]: Send hello from 0 subprocess
0.401     read_all():21  [tid=51639]: Read from 0 subprocess: Hello from 0 subprocess
0.401     read_all():21  [tid=51639]: Read from 1 subprocess: Hello from 1 subprocess
0.401     read_all():21  [tid=51639]: Read from 2 subprocess: Hello from 2 subprocess
0.401     read_all():21  [tid=51639]: Read from 3 subprocess: Hello from 3 subprocess
0.401     read_all():21  [tid=51639]: Read from 4 subprocess: Hello from 4 subprocess
0.401     read_all():25  [tid=51639]: Trivial realisation finish
0.402         main():49  [tid=51639]: Finish

## <a name="read_nonblock"></a> Наивный read + NONBLOCK

In [71]:
%%cpp multiplexing_reader_nonblock.c
%run gcc multiplexing_reader_test.c multiplexing_reader_common.c multiplexing_reader_nonblock.c -o nonblock.exe
%MD ### Результаты чтения с помощью наивного неблокирующего read
%MD Видно, что чтение происходит сразу после записи, **но user и system time близки к затраченному астрономическому времени**, то есть сожжено дикое количество процессорного времени
%run time -p ./nonblock.exe

#include "multiplexing_reader_common.h"

#include <fcntl.h>
#include <sys/epoll.h>

void read_all(int* input_fds, int count) {
    // Работает быстро, так как читает все что есть в "файле" на данный момент вне зависимости от того пишет ли туда кто-нибудь или нет
    // У этого метода есть большая проблема: внутри вечного цикла постоянно вызывается системное прерывание.
    // Процессорное время тратится впустую.
    log_printf("Nonblock realisation start\n");
    for (int i = 0; i < count; ++i) {
        fcntl(input_fds[i], F_SETFL, fcntl(input_fds[i], F_GETFL) | O_NONBLOCK); // Пометили дескрипторы как неблокирующие
    }
    bool all_closed = false;
    while (!all_closed) {
        all_closed = true;
        for (int i = 0; i < count; ++i) { // Проходимся по всем файловым дескрипторам
            if (input_fds[i] == -1) {
                continue;
            }
            all_closed = false;
            char buf[100];
            int read_bytes = 0;
            // Пытаемся читать пока либо не кончится файл, либо не поймаем ошибку
            while ((read_bytes = read(input_fds[i], buf, sizeof(buf))) > 0) {
                buf[read_bytes] = '\0';
                log_printf("Read from %d subprocess: %s", i, buf);
            }
            if (read_bytes == 0) { // Либо прочитали весь файл
                close(input_fds[i]);
                input_fds[i] = -1;
            } else {
                conditional_handle_error(errno != EAGAIN, "strange error"); // Либо поймали ошибку (+ проверяем, что ошибка ожидаемая)
            }
        }
    }
    log_printf("Nonblock realisation finish\n");
}


Run: `gcc multiplexing_reader_test.c multiplexing_reader_common.c multiplexing_reader_nonblock.c -o nonblock.exe`

### Результаты чтения с помощью наивного неблокирующего read

Видно, что чтение происходит сразу после записи, **но user и system time близки к затраченному астрономическому времени**, то есть сожжено дикое количество процессорного времени

Run: `time -p ./nonblock.exe`

0.000         main():18  [tid=51621]: Start multiplexing test
0.001     read_all():16  [tid=51621]: Nonblock realisation start
0.001         main():31  [tid=51626]: Send hello from 4 subprocess
0.001     read_all():33  [tid=51621]: Read from 4 subprocess: Hello from 4 subprocess
0.137         main():31  [tid=51625]: Send hello from 3 subprocess
0.137     read_all():33  [tid=51621]: Read from 3 subprocess: Hello from 3 subprocess
0.201         main():31  [tid=51624]: Send hello from 2 subprocess
0.201     read_all():33  [tid=51621]: Read from 2 subprocess: Hello from 2 subprocess
0.301         main():31  [tid=51623]: Send hello from 1 subprocess
0.301     read_all():33  [tid=51621]: Read from 1 subprocess: Hello from 1 subprocess
0.433         main():31  [tid=51622]: Send hello from 0 subprocess
0.433     read_all():33  [tid=51621]: Read from 0 subprocess: Hello from 0 subprocess
0.434     read_all():43  [tid=51621]: Nonblock realisation finish
0.434         main():49  [tid=51621]: Fini

## <a name="epoll"></a> epoll c level-triggering

In [73]:
%%cpp multiplexing_reader_epoll.c
%run gcc multiplexing_reader_test.c multiplexing_reader_common.c multiplexing_reader_epoll.c -o epoll.exe
%MD ### Результаты чтения с помощью epoll c level-triggering
%MD Видно, что чтение происходит сразу после записи, а user и system time близки к 0
%run time -p ./epoll.exe

#include "multiplexing_reader_common.h"

#include <fcntl.h>
#include <sys/epoll.h>

void read_all(int* input_fds, int count) {
    // Круче предыдущего, потому что этот вариант программы не ест процессорное время ни на что
    // (в данном случае на проверку условия того, что в файле ничего нет)
    log_printf("Epoll realisation start\n");
    // Создаем epoll-объект. В случае Level Triggering события объект скорее представляет собой множество файловых дескрипторов по которым есть события. 
    // И мы можем читать это множество, вызывая epoll_wait
    // epoll_create has one legacy parameter, so I prefer to use newer function. 
    int epoll_fd = epoll_create1(0);
    // Тут мы подписываемся на события, которые будет учитывать epoll-объект, т.е. указываем события за которыми мы следим
    for (int i = 0; i < count; ++i) {
        struct epoll_event event = {
            .events = EPOLLIN | EPOLLERR | EPOLLHUP, 
            .data = {.u32 = i} // user data
        };
        epoll_ctl(epoll_fd, EPOLL_CTL_ADD, input_fds[i], &event);
    }
    int not_closed = count;
    while (not_closed > 0) {
        struct epoll_event event;
        int epoll_ret = epoll_wait(epoll_fd, &event, 1, 1000); // Читаем события из epoll-объект (то есть из множества файловых дескриптотров, по которым есть события)
        if (epoll_ret <= 0) {
            continue;
        }
        int i = event.data.u32; // Получаем обратно заданную user data
        
        char buf[100];
        int read_bytes = 0;
        // Что-то прочитали из файла.
        // Так как read вызывается один раз, то если мы все не считаем, то нам придется делать это еще раз на следующей итерации большого цикла. 
        // (иначе можем надолго заблокироваться)
        // Решение: комбинируем со реализацией через O_NONBLOCK и в этом месте читаем все что доступно до самого конца
        if ((read_bytes = read(input_fds[i], buf, sizeof(buf))) > 0) {
            buf[read_bytes] = '\0';
            log_printf("Read from %d subprocess: %s", i, buf);
        } else if (read_bytes == 0) { // Файл закрылся, поэтому выкидываем его файловый дескриптор
            // Это системный вызов. Он довольно дорогой. Такая вот плата за epoll (в сравнении с poll, select)
            epoll_ctl(epoll_fd, EPOLL_CTL_DEL, input_fds[i], NULL);
            close(input_fds[i]);
            input_fds[i] = -1;
            not_closed -= 1;
        } else {
            conditional_handle_error(1, "strange error");
        }
    }
    close(epoll_fd);
    log_printf("Epoll realisation finish\n");
}


Run: `gcc multiplexing_reader_test.c multiplexing_reader_common.c multiplexing_reader_epoll.c -o epoll.exe`

### Результаты чтения с помощью epoll c level-triggering

Видно, что чтение происходит сразу после записи, а user и system time близки к 0

Run: `time -p ./epoll.exe`

0.000         main():18  [tid=51676]: Start multiplexing test
0.002     read_all():15  [tid=51676]: Epoll realisation start
0.002         main():31  [tid=51681]: Send hello from 4 subprocess
0.003     read_all():45  [tid=51676]: Read from 4 subprocess: Hello from 4 subprocess
0.154         main():31  [tid=51680]: Send hello from 3 subprocess
0.155     read_all():45  [tid=51676]: Read from 3 subprocess: Hello from 3 subprocess
0.202         main():31  [tid=51679]: Send hello from 2 subprocess
0.202     read_all():45  [tid=51676]: Read from 2 subprocess: Hello from 2 subprocess
0.346         main():31  [tid=51678]: Send hello from 1 subprocess
0.347     read_all():45  [tid=51676]: Read from 1 subprocess: Hello from 1 subprocess
0.402         main():31  [tid=51677]: Send hello from 0 subprocess
0.402     read_all():45  [tid=51676]: Read from 0 subprocess: Hello from 0 subprocess
0.402     read_all():57  [tid=51676]: Epoll realisation finish
0.403         main():49  [tid=51676]: Finish mul

## epoll c edge-triggering

In [74]:
%%cpp multiplexing_reader_epoll_edge.c
%run gcc multiplexing_reader_test.c multiplexing_reader_common.c multiplexing_reader_epoll_edge.c -o epoll_edge.exe
%MD ### Результаты чтения с помощью epoll c edge-triggering (EPOLLET опция)
%MD Видно, что чтение происходит сразу после записи, а user и system time близки к 0
%run time -p ./epoll_edge.exe

#include "multiplexing_reader_common.h"

#include <fcntl.h>
#include <sys/epoll.h>

void read_all(int* input_fds, int count) {
    // epoll + edge triggering
    // В этом случае объект epoll уже является очередью. 
    // Ядро в него нам пишет событие каждый раз, когда случается событие, на которое мы подписались
    // А мы в дальнейшем извлекаем эти события (и в очереди их больше не будет).
    log_printf("Epoll edge-triggered realisation start\n");
    
    // sleep(1); // так можно проверить, не потеряем ли мы информацию о записанном в файловые дескрипторы, если сделаем EPOLL_CTL_ADD после записи
    int epoll_fd = epoll_create1(0);
    for (int i = 0; i < count; ++i) {
        fcntl(input_fds[i], F_SETFL, fcntl(input_fds[i], F_GETFL) | O_NONBLOCK);
        // Обратите внимание на EPOLLET
        struct epoll_event event = {
            .events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET, 
            .data = {.u32 = i}
        };
        epoll_ctl(epoll_fd, EPOLL_CTL_ADD, input_fds[i], &event);
    }
    int not_closed = count;
    while (not_closed > 0) {
        // У меня тут возник вопрос: а получим ли мы уведомления о файловых дескрипторах, 
        // из которых на момент EPOLL_CTL_ADD УЖЕ есть что читать?
        // Не нашел в документации, но многочисленные примеры говорят, что можно считать, что получим.
        struct epoll_event event;
        int epoll_ret = epoll_wait(epoll_fd, &event, 1, 1000);
        if (epoll_ret <= 0) {
            continue;
        }
        int i = event.data.u32;
    
        char buf[100];
        int read_bytes = 0;
        while ((read_bytes = read(input_fds[i], buf, sizeof(buf))) > 0) {
            buf[read_bytes] = '\0';
            log_printf("Read from %d subprocess: %s", i, buf);
        } 
        if (read_bytes == 0) {
            epoll_ctl(epoll_fd, EPOLL_CTL_DEL, input_fds[i], NULL);
            close(input_fds[i]);
            input_fds[i] = -1;
            not_closed -= 1;
        } else {
            conditional_handle_error(errno != EAGAIN, "strange error");
        }
    }
    close(epoll_fd);
    log_printf("Epoll edge-triggered realisation finish\n");
}


Run: `gcc multiplexing_reader_test.c multiplexing_reader_common.c multiplexing_reader_epoll_edge.c -o epoll_edge.exe`

### Результаты чтения с помощью epoll c edge-triggering (EPOLLET опция)

Видно, что чтение происходит сразу после записи, а user и system time близки к 0

Run: `time -p ./epoll_edge.exe`

0.000         main():18  [tid=51694]: Start multiplexing test
0.001     read_all():17  [tid=51694]: Epoll edge-triggered realisation start
0.001         main():31  [tid=51699]: Send hello from 4 subprocess
0.001     read_all():46  [tid=51694]: Read from 4 subprocess: Hello from 4 subprocess
0.155         main():31  [tid=51698]: Send hello from 3 subprocess
0.155     read_all():46  [tid=51694]: Read from 3 subprocess: Hello from 3 subprocess
0.254         main():31  [tid=51697]: Send hello from 2 subprocess
0.255     read_all():46  [tid=51694]: Read from 2 subprocess: Hello from 2 subprocess
0.355         main():31  [tid=51696]: Send hello from 1 subprocess
0.355     read_all():46  [tid=51694]: Read from 1 subprocess: Hello from 1 subprocess
0.453         main():31  [tid=51695]: Send hello from 0 subprocess
0.453     read_all():46  [tid=51694]: Read from 0 subprocess: Hello from 0 subprocess
0.454     read_all():58  [tid=51694]: Epoll edge-triggered realisation finish
0.454         main

## <a name="select"></a> select

In [67]:
%%cpp multiplexing_reader_select.c
%run gcc multiplexing_reader_test.c multiplexing_reader_common.c multiplexing_reader_select.c -o select.exe
%MD ### Результаты чтения с помощью select
%MD Видно, что чтение происходит сразу после записи, а user и system time близки к 0
%run time -p ./select.exe

#include "multiplexing_reader_common.h"

#include <fcntl.h>
#include <sys/epoll.h>

void read_all(int* input_fds, int count) {
    log_printf("Select realisation start\n");

    struct timeval tv = {.tv_sec = 1, .tv_usec = 0};
    int not_closed = count;
    while (not_closed > 0) {
        int max_fd = 0;
        // Так как структура fd_set используется и на вход (какие дескрипторы обрабатывать) и на выход (из каких пришёл вывод), её надо повторно инициализировать.
        fd_set rfds;
        FD_ZERO(&rfds);
        for (int i = 0; i < count; ++i) {
            if (input_fds[i] != -1) {
                FD_SET(input_fds[i], &rfds);
                max_fd = (input_fds[i] < max_fd) ? max_fd : input_fds[i];
            }
        }
        // аргументы: макс количество файловых дескрипторов, доступное количество на чтение, запись, ошибки, время ожидания.
        int select_ret = select(max_fd + 1, &rfds, NULL, NULL, &tv);
        conditional_handle_error(select_ret == -1, "select error");
        if (select_ret > 0) {
            for (int i = 0; i < count; ++i) {
                // Проверяем, какой дескриптор послал данные.
                if (input_fds[i] != -1 && FD_ISSET(input_fds[i], &rfds)) {
                    char buf[100];
                    int read_bytes = 0;
                    if ((read_bytes = read(input_fds[i], buf, sizeof(buf))) > 0) {
                        buf[read_bytes] = '\0';
                        log_printf("Read from %d subprocess: %s", i, buf);
                    } else if (read_bytes == 0) {
                        close(input_fds[i]);
                        input_fds[i] = -1;
                        not_closed -= 1;
                    } else {
                        conditional_handle_error(1, "strange error");
                    }
                }
            }
        }
    }
    log_printf("Select realisation finish\n");
}


Run: `gcc multiplexing_reader_test.c multiplexing_reader_common.c multiplexing_reader_select.c -o select.exe`

### Результаты чтения с помощью select

Видно, что чтение происходит сразу после записи, а user и system time близки к 0

Run: `time -p ./select.exe`

0.000         main():18  [tid=51543]: Start multiplexing test
0.002     read_all():13  [tid=51543]: Select realisation start
0.002         main():31  [tid=51548]: Send hello from 4 subprocess
0.002     read_all():39  [tid=51543]: Read from 4 subprocess: Hello from 4 subprocess
0.102         main():31  [tid=51547]: Send hello from 3 subprocess
0.102     read_all():39  [tid=51543]: Read from 3 subprocess: Hello from 3 subprocess
0.201         main():31  [tid=51546]: Send hello from 2 subprocess
0.202     read_all():39  [tid=51543]: Read from 2 subprocess: Hello from 2 subprocess
0.346         main():31  [tid=51545]: Send hello from 1 subprocess
0.346     read_all():39  [tid=51543]: Read from 1 subprocess: Hello from 1 subprocess
0.401         main():31  [tid=51544]: Send hello from 0 subprocess
0.401     read_all():39  [tid=51543]: Read from 0 subprocess: Hello from 0 subprocess
0.402     read_all():51  [tid=51543]: Select realisation finish
0.402         main():49  [tid=51543]: Finish m

# <a name="select_fail"></a> Select fail

Как-то в монорепозитории Яндекса обновили openssl...

(Суть в том, что select не поддерживает файловые дескрипторы с номерами больше 1024. Это пример на такую ошибку)

In [80]:
%%cpp select_fail.c
%run gcc select_fail.c -o select_fail.exe
%run ulimit -n 1200 && ./select_fail.exe
%run gcc -DBIG_FD select_fail.c -o select_fail.exe
%run ulimit -n 1200 && ./select_fail.exe

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <fcntl.h>
#include <sys/resource.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>
#include <time.h>
#include <sys/epoll.h>
#include <string.h>
#include <stdatomic.h>

// log_printf - макрос для отладочного вывода, добавляющий время с первого использования, имя функции и номер строки
const char* log_prefix(const char* func, int line) {
    struct timespec spec; clock_gettime(CLOCK_REALTIME, &spec); long long current_msec = spec.tv_sec * 1000L + spec.tv_nsec / 1000000;
    static _Atomic long long start_msec_storage = -1; long long start_msec = -1; if (atomic_compare_exchange_strong(&start_msec_storage, &start_msec, current_msec)) start_msec = current_msec;
    long long delta_msec = current_msec - start_msec; const int max_func_len = 13;
    static __thread char prefix[100]; sprintf(prefix, "%lld.%03lld %*s():%d    ", delta_msec / 1000, delta_msec % 1000, max_func_len, func, line); sprintf(prefix + max_func_len + 13, "[tid=%ld]", syscall(__NR_gettid));
    return prefix;
}
#define log_printf_impl(fmt, ...) { time_t t = time(0); dprintf(2, "%s: " fmt "%s", log_prefix(__FUNCTION__, __LINE__), __VA_ARGS__); }
// Format: <time_since_start> <func_name>:<line> : <custom_message>
#define log_printf(...) log_printf_impl(__VA_ARGS__, "")

#define conditional_handle_error(stmt, msg) \
    do { if (stmt) { perror(msg " (" #stmt ")"); exit(EXIT_FAILURE); } } while (0)

#ifdef BIG_FD
const int EXTRA_FD_COUNT = 1030;
#else
const int EXTRA_FD_COUNT = 1010;
#endif

int main() {
    pid_t child_pid;
    int input_fd;
   
    {
        int fds[2];
        pipe(fds);
        input_fd = fds[0];
        if ((child_pid = fork()) == 0) {
            sleep(1);
            dprintf(fds[1], "Hello from exactly one subprocess\n");
            exit(0);
        }
        assert(child_pid > 0);
        close(fds[1]);
    }
    
    for (int i = 0; i < EXTRA_FD_COUNT; ++i) {
        input_fd = dup(input_fd); // yes, we don't bother closing file descriptors in this example
    }
    
    log_printf("Select start input_fd=%d\n", input_fd);
    
    struct timeval tv = {.tv_sec = 10, .tv_usec = 0};
    while (input_fd != -1) {
        fd_set rfds;
        FD_ZERO(&rfds);
        FD_SET(input_fd, &rfds);
        char secret[] = "abcdefghijklmnop";
        int select_ret = select(input_fd + 1, &rfds, NULL, NULL, &tv);
        log_printf("Secret is %s\n", secret);
        if (strcmp(secret, "abcdefghijklmnop") != 0) {
            log_printf("Hey! select is broken!\n");
        }
        conditional_handle_error(select_ret == -1, "select error");
        if (select_ret > 0) {
            assert(FD_ISSET(input_fd, &rfds));
            
            char buf[100];
            int read_bytes = 0;
            if ((read_bytes = read(input_fd, buf, sizeof(buf) - 1)) > 0) {
                buf[read_bytes] = '\0';
                log_printf("Read from child subprocess: %s", buf);
            } else if (read_bytes == 0) {
                input_fd = -1;
            } else {
                conditional_handle_error(errno != EAGAIN, "strange error");
            }
        }
    }
    
    int status;    
    assert(waitpid(child_pid, &status, 0) != -1);
    return 0;
}

Run: `gcc select_fail.c -o select_fail.exe`

Run: `ulimit -n 1200 && ./select_fail.exe`

0.000          main():64  [tid=52276]: Select start input_fd=1013
1.049          main():73  [tid=52276]: Secret is abcdefghijklmnop
1.049          main():85  [tid=52276]: Read from child subprocess: Hello from exactly one subprocess
1.049          main():73  [tid=52276]: Secret is abcdefghijklmnop


Run: `gcc -DBIG_FD select_fail.c -o select_fail.exe`

Run: `ulimit -n 1200 && ./select_fail.exe`

0.000          main():64  [tid=52285]: Select start input_fd=1033
1.045          main():73  [tid=52285]: Secret is a
1.045          main():75  [tid=52285]: Hey! select is broken!
1.045          main():85  [tid=52285]: Read from child subprocess: Hello from exactly one subprocess
1.046          main():73  [tid=52285]: Secret is a
1.046          main():75  [tid=52285]: Hey! select is broken!


# <a name="aio"></a> Linux AIO

Медленными бывают так же диски. И у них есть особенность: они не завершаются с ошибкой EAGAIN если нет данных. А просто долго висят в операциях read, write.

Как жить? Можно делать несколько операций одновременно. И чтобы не плодить потоки (блочить каждый поток на записи/чтении) можно юзать Linux AIO

Предустановка

```bash
sudo apt-get install libaio1
sudo apt-get install libaio-dev
```

Статейки

https://github.com/littledan/linux-aio

https://oxnz.github.io/2016/10/13/linux-aio/#install

In [82]:
%%cpp aio.c
%run gcc aio.c -o aio.exe -laio # обратите внимание
%run ./aio.exe
%run cat ./output_0.txt
%run cat ./output_1.txt

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <fcntl.h>
#include <sys/resource.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>
#include <time.h>
#include <sys/epoll.h>
#include <string.h>
#include <libaio.h>  // подключаем
#include <err.h>
#include <stdatomic.h>

// log_printf - макрос для отладочного вывода, добавляющий время с первого использования, имя функции и номер строки
const char* log_prefix(const char* func, int line) {
    struct timespec spec; clock_gettime(CLOCK_REALTIME, &spec); long long current_msec = spec.tv_sec * 1000L + spec.tv_nsec / 1000000;
    static _Atomic long long start_msec_storage = -1; long long start_msec = -1; if (atomic_compare_exchange_strong(&start_msec_storage, &start_msec, current_msec)) start_msec = current_msec;
    long long delta_msec = current_msec - start_msec; const int max_func_len = 13;
    static __thread char prefix[100]; sprintf(prefix, "%lld.%03lld %*s():%d    ", delta_msec / 1000, delta_msec % 1000, max_func_len, func, line); sprintf(prefix + max_func_len + 13, "[tid=%ld]", syscall(__NR_gettid));
    return prefix;
}
#define log_printf_impl(fmt, ...) { time_t t = time(0); dprintf(2, "%s: " fmt "%s", log_prefix(__FUNCTION__, __LINE__), __VA_ARGS__); }
// Format: <time_since_start> <func_name>:<line> : <custom_message>
#define log_printf(...) log_printf_impl(__VA_ARGS__, "")

#define conditional_handle_error(stmt, msg) \
    do { if (stmt) { perror(msg " (" #stmt ")"); exit(EXIT_FAILURE); } } while (0)

const int N_FILES = 2;

int main() {
    // подготовим контекст для асинхронных действий
    io_context_t ctx = {0};
    int io_setup_ret = io_setup(N_FILES + 10, &ctx);
    errno = -io_setup_ret;
    conditional_handle_error(io_setup_ret < 0, "Can't io_setup");
      
    // создаем какой-то список открытых файлов
    int fds[N_FILES];
    for (int i = 0; i < N_FILES; ++i) {
        char file[100];
        sprintf(file, "./output_%d.txt", i);
        fds[i] = open(file, O_WRONLY | O_CREAT, 0664);
        conditional_handle_error(fds[i] < 0, "Can't open");
        log_printf("Opened file '%s' fd %d\n", file, fds[i]);
    }
    
    // готовим батчевую запись во много файлов
    struct iocb iocb[N_FILES];
    struct iocb* iocbs[N_FILES];
    char msgs[N_FILES][100];
    for (int i = 0; i < N_FILES; ++i) {
        sprintf(msgs[i], "hello to file %d\n", i);
        // Создаём структуру для удобной записи (включает сразу дескриптор, сообщение и его длину)
        io_prep_pwrite(&iocb[i], fds[i], (void*)msgs[i], strlen(msgs[i]), 0); // Формируем запросы на запись
        // data -- для передачи дополнительной информации (в epoll такая же штуковина)
        // Конкретно здесь передаётся информация о том, в какой файл записываем
        iocb[i].data = (char*)0 + i;
        iocbs[i] = &iocb[i];
    }

    // Отправляем батч запросов на выполнение
    // Возвращает количество успешно добавленных запросов.
    int io_submit_ret = io_submit(ctx, N_FILES, iocbs);
    if (io_submit_ret != N_FILES) {
        errno = -io_submit_ret;
        log_printf("Error: %s\n", strerror(-io_submit_ret));
        warn("io_submit");
        io_destroy(ctx);
    }

    int in_fly_writings = N_FILES;
    while (in_fly_writings > 0) {
        struct io_event event;
        struct timespec timeout = {.tv_sec = 0, .tv_nsec = 500000000};
        // В этом примере получаем максимум реакцию на один запрос. Эффективнее, конечно, сразу на несколько.
        if (io_getevents(ctx, 0, 1, &event, &timeout) == 1) { // Здесь в цикле получаем реакцию на запросы
            conditional_handle_error(event.res < 0, "Can't do operation");
            int i = (char*)event.data - (char*)0;
            log_printf("%d written ok\n", i);
            close(fds[i]);
            --in_fly_writings;
            continue;
        }
        log_printf("not done yet\n");
    }
    io_destroy(ctx);

    return 0;
}

Run: `gcc aio.c -o aio.exe -laio # обратите внимание`

Run: `./aio.exe`

0.000          main():55  [tid=52304]: Opened file './output_0.txt' fd 3
0.001          main():55  [tid=52304]: Opened file './output_1.txt' fd 4
0.001          main():90  [tid=52304]: 0 written ok
0.001          main():90  [tid=52304]: 1 written ok


Run: `cat ./output_0.txt`

hello to file 0


Run: `cat ./output_1.txt`

hello to file 1


# <a name="hw"></a> Комментарии к ДЗ

*  highload/epoll-read-fds-vector: Тупая реализация не зайдёт
<br>Контрпример: мы поочерёдно начинаем читать файлы, стартуя с 0-го. Пусть 2 файл -- это пайп, через который проверяющая система начинает посылать 100кб данных. Так как пайп не обработан сразу, то по достижении 65kb, ввод заблокируется. Чекер зависнет, не закроет нам 0-ой файл (который скорее всего пайп). И будет таймаут.
  <br>В общем задача на epoll. linux aio тут не зайдет, вопрос на подумать - почему?

* highload/epoll-read-write-socket: Возможно вам помогут факты: 
  * в epoll можно добавить файл дважды: один раз на чтение, другой раз на запись (с разной user data). 
  * вы можете переключать режим, на предмет каких событий вы слушаете файловый дескриптор