# Базовые операции с Kafka Consumer

## Подготовка

### Создать топик

При создании топика необходимо настроить два параметра:
1. `partitions` - количество партиций в топике. Apache Kafka гарантирует порядок только в рамках одной партиций. Чем больше партиций, тем больше консьюмеров можно держать в одной консьюмер группе, а значит тем выше параллелизм и пропускная способность (throughput);
1. `replication factor (RF)` - число копий одной партиции для хранения в кластере Apache Kafka.

Базовые рекомендации:

- `RF == 3`, т.е. 3 копии (реплики) одних и тех же данных будет хранится в кластере. Значение `3` выведено экспериментальным путем и позволяет обеспечеить надежность даже на очень больших кластерах
- `partitions = K * N`, где `N` - ожидаемое количество консьюмеров в консьюмер группе, а `K (>= 1)` - целочисленный коэффициент. Так количество партиций будет кратным количеству консьюмеров, а значит консьюмеры будут равномерно нагружены.

Топик `movies` будет состоять из двух партиций, по три копии каждая:

In [None]:
kafka-topics --bootstrap-server "$KAFKA_HOST":"$KAFKA_PORT" \
  --topic movies \
  --create \
  --partitions 2 \
  --replication-factor 3

### Получить список топиков

In [None]:
kafka-topics --bootstrap-server "$KAFKA_HOST":"$KAFKA_PORT" \
    --list

In [None]:
log_file=$(HOST=$KAFKA_HOST execute ls /tmp/kraft-combined-logs/__cluster_metadata-0/ | grep '\.log$')

kafka-metadata-shell \
    --snapshot /tmp/kraft-combined-logs/__cluster_metadata-0/$log_file \
    ls /topics \
    2> /dev/null

In [None]:
kafka-topics --bootstrap-server "$KAFKA_HOST":"$KAFKA_PORT" \
    --describe \
    --topic movies

### Записать данные в топик

Утилита `kafka-console-producer` позволяет записат в топик данные со стандартного потока ввода. При этом все, что попадает в стандартный поток ввода (`stdin`) определяется как значение, и это зачение целиком без ключа отправляется в Apache Kafka кластер. Если существует ключ, то необходимо указать дополнительные опции при запуске `kafka-console-producer`:

In [None]:
kafka-console-producer \
    --bootstrap-server "$KAFKA_HOST":"$KAFKA_PORT" \
    --topic movies \
    --property "parse.key=true" \
    --property "key.separator=:" \
<<EOF
0:The Shawshank Redemption, 1994
0:God Father, 1972
0:The Dark Knight, 2008
0:The Godfather Part II, 1974
0:12 Angry Men, 1957
1:Schindler's List, 1993
1:The Lord of the Rings: The Return of the King, 2003
1:Pulp Fiction, 1994
1:The Lord of the Rings: The Fellowship of the Ring, 2001
1:The Good, the Bad and the Ugly, 1966
EOF

### Прочитать данные из топика

Утилита `kafka-console-consumer` позволяет прочитать данные из топика, и отправить результаты на стандартный поток вывода (stdout). При этом по умолчанию печаются только значения, которые находились в топике. Если необходимо напечатать также ключ и партицию, в которой находилась запись, то необходимо указать дополнительные опции при старте `kafka-console-consumer`:

In [None]:
kafka-console-consumer \
    --bootstrap-server "$KAFKA_HOST":"$KAFKA_PORT" \
    --topic movies \
    --from-beginning \
    --timeout-ms 10000 \
    --property print.key=true \
    --property print.partition=true

Обратите внимание, что сначала были отображены все записи одной партиции, а потом все записи второй партиции

## Конфигурация консьюмеров

Консьюмер имеет много [параметров конфигурации](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html), но на практике наиболее часто используют следующие:
|<div style="width:250px">Имя</div>|Описание|<div style="width:200px">Значение по умолчанию</div>|Допустимые значения|
|---|--------|---------------------|---------|
|[`group.id`](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#group-id)|Идентификатор группы, в которую будет входить консьюмер. Консьюмер группа создается "на лету" как только consumer coordinator получает новое значение `group.id` | `null` | цифро-буквенная последовательность |
|[`auto.offset.reset`](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#auto.offset.reset)|Откуда начинать читать данные из топика, когда новый консьюмер подключается: `earliest` - с самого начала; `latest` - только новые (игноририровать имеющиеся записи)|`latest`|`latest`\|`earliest`|
|[`enable-auto-commit`](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#enable-auto-commit)|Если значение `true`, то консьюмер будет сам автоматически отправлять брокеру информацию об обработанных событиях. |`true`|`true`\|`false`|
|[`max-poll-records`](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#max-poll-records)|Максимальное число записей, которое вернет брокер за одно обращение. |`500`|`[1,..]`|
|[`max-poll-interval-ms`](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#max-poll-interval-ms)|Максимальное время на обработку батча. Если консьюмер не вызовет `poll()` до истечения этого срока, то консьюмер признается мертвым, consumer group coordinator запустит перебалансировку партиций среди оставщихся живых консьюмеров в консьюмер группе |`300_000`(5 минут)|`[1,..]`|

Функция `run_consumer_with_config` позволяет запустить консольный консьюмер с указанной конфигурацией:

In [None]:
function run_consumer_with_config() {
    local consumer_config=${1:-/tmp/consumer.properties}
    local HOST=$KAFKA_HOST

    execute "[ -f ${consumer_config} ]" || {
        execute touch ${consumer_config}
    }

    kafka-console-consumer \
        --bootstrap-server "$KAFKA_HOST":"$KAFKA_PORT" \
        --topic movies \
        --from-beginning \
        --timeout-ms 10000 \
        --property print.key=true \
        --property print.partition=true \
        --consumer.config ${consumer_config}
}

Консьюмер группа позволяет запоминать какие записи уже были обработаны. Таким образом, только необработанные записи будут прочитаны при перезапуске консьюмер группы:

In [None]:
HOST=$KAFKA_HOST \
new_file /tmp/consumer-my-explicit-group-id.properties <<EOF
group.id=my-explicit-group-id
max-poll-records=100
max-poll-interval-ms=1000
EOF

HOST=$KAFKA_HOST execute \
cat /tmp/consumer-my-explicit-group-id.properties

In [None]:
run_consumer_with_config /tmp/consumer-my-explicit-group-id.properties

К сожалению, консольный консьюмер не позволяет полностью задать любую конфигурацию, поэтому из трех конфигов, был применен только один: `group.id=my-explicit-group-id`.

Проверить наличие консьюмер группы можно при помощи консольной утилиты:

In [None]:
kafka-consumer-groups --bootstrap-server "${KAFKA_HOST}":"${KAFKA_PORT}" \
    --group my-explicit-group-id \
    --describe \
    --members

Если запустить консьюмер с той же конфигурацией, то ни одной записи не будет получено:

In [None]:
run_consumer_with_config /tmp/consumer-my-explicit-group-id.properties

Если отправить новые сообщения в топик `movies` и запустить снова консьюмер с той же конфигурацией, то будут получены только новые записи:

In [None]:
kafka-console-producer \
  --bootstrap-server "$KAFKA_HOST":"$KAFKA_PORT" \
  --topic movies \
  --property "parse.key=true" \
  --property "key.separator=:" \
<<EOF
0:Lock, Stock and Two Smoking Barrels, 1998
1:Forrest Gump, 1994
EOF

In [None]:
run_consumer_with_config /tmp/consumer-my-explicit-group-id.properties

### Задание

1. Создать новую консьюмер группу для топика `movies`, названием которой будет ваше имя;
1. Добавить два консьюмера в новую консьюмер группу (можно использовать [`kafka-consumer-1.ipynb`](kafka-consumer-1.ipynb) и [`kafka-consumer-2.ipynb`](kafka-consumer-2.ipynb) для параллельного запуска консьюмеров);
1. Убедиться, что консьюмер группа состоит из двух консьюмеров;
1. Убедиться, что после перезапуска консьюмеров, старые записи не не обрабатываются заново.

## Консьюмер группы

Каждый топик имеет одну и более партиций, при этом каждую партицию можно читать параллельно, увеличивая таким образом пропускную способность приложения.

При параллельном чтении данных из топика, невозможно гарантировать `FIFO` порядок в рамках всего топика, Apache Kafka гарантирует порядок записей только в рамках одной партиции, поэтому необходимо ответственно подходить к разбиению топика на партиции. На практике редко бывает необходимо поддерживать FIFO порядок в пределах всего топика, но если такая ситуация возникает, то можно создать топик, в котором будет всего лишь одна партиция. При этом сама партция может иметь `RF` (`Replication Factor`) больше единицы для повышения оказоустойчивости.

Для параллельного чтения данных из топика необходимо запустить несколько инстанцев консьюмера, объединив их всех в одну консьюмер группу (consumer group). Для этих целей используется опция `group.id`: у всех консьюмеров в одной консьюмер группе должно быть одно и тоже же значение `group.id`

### Объединение нескольких консьюмеров в одну консьюмер группу

Откройте [kafka-consumer-1.ipynb](./kafka-consumer-1.ipynb) и запустите консьюмера. Он автоматически завершится через 60 секунд

Откройте [kafka-consumer-2.ipynb](./kafka-consumer-2.ipynb) и запустите консьюмера. Он автоматически завершится через 60 секунд

Откройте [kafka-consumer-3.ipynb](./kafka-consumer-3.ipynb) и запустите консьюмера. Он автоматически завершится через 60 секунд

**ВНИМАНИЕ** убедитесь, что все три консьюмера из [kafka-consumer-1.ipynb](./kafka-consumer-1.ipynb), [kafka-consumer-2.ipynb](./kafka-consumer-2.ipynb) и [kafka-consumer-3.ipynb](./kafka-consumer-3.ipynb) запущены

In [None]:
kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092 --list | grep -q consumer-group-1 &&
kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092 --group consumer-group-1 --describe --members |
  grep '^consumer-group-1' | \
  awk '{print $2}' | \
  sort -u | wc -l | \
  grep -q '^3' || {
    echo "Пожалуйста, запустите консьюмеры" &&
    false
  }

In [None]:
kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092 \
    --group consumer-group-1 \
    --describe \
    --members

In [None]:
kafka-console-producer \
  --bootstrap-server kafka1:9092 \
  --topic movies \
  --property "parse.key=true" \
  --property "key.separator=:" \
<<EOF
0:Операция "Ы" и другие приключения Шурика, 1965
0:Место встречи изменить нельзя, 1979
0:Семнадцать мнгновений весны, 1973
0:Собачье сердце, 1988
0:Джельтенмены удачи, 1971
1:Тот самый Мюнхаузен, 1979
1:Служебный роман, 1977
1:Бриллиантовая рука, 1969
1:Кавказская пленница, 1967
1:12 стульев,1971
EOF

Обратите внимание, что

- топик `movies` имеет всего две партиции,
- при загрузке данных в Kafka, у всех записей было лишь два уникальных ключа: `0` и `1`,
- консьюмер группа `consumer-group-1` состоит из трёх консьюмеров,
- только два консьюмера получили данные и вывели их на экран.

### Выводы

1. Один консьюмер в консьюмер группе _может_ обрабатывать одну или больше партиций;
1. Количество консьюмеров в консьюмер группе должно быть делителем количества партиций в топике, иначе будет неравномерная нагрузка на консьюмеры;
1. Если количество консьюмеров в консьюмер группе больше числа партиций, то консьюмеры будут простаивать.

### Задание

1. Создать новый топик `music`;
1. При помощи сайта [birthdayjams.com](https://www.birthdayjams.com/) найти песни, которые были популярны в день вашего рождения в периоде с 0 до 15 лет;
1. Отправить названия песен в топик `music`, при этом ключом записи будет год песни, например: *1999:Christina Aguilera, Genie In A Bottle*;
1. Убедиться, что песни уходят в разные партиции топика `music`;
1. **(*) ДОПОЛНИТЕЛЬНО**: запустить 2 или 3 консьюмера в одной консьюмер группе, чтобы получать данные в разных консьюмерах;
1. **(*) ДОПОЛНИТЕЛЬНО**: повысить количество партиций топика `music` до 5 (удалить топик и создать заново) и посмотреть, в какие консьюмеры какие записи будут направлены.

## Координатор консьюмер групп

Консьюмер группа объединяет несколько консьюмеров, которые могут читать данные из топика параллельно. Каждый консьюмер в консьюмер группе получает одну или более партиций для чтения, но два консьюмера не могут читать одну партицию одновременно. При этом:

- если число партиций больше, чем число консьюмеров, то один консьюмер может читать данные из нескольких партиций;
- если число консьюмеров больше, чем число партиций, то консьюмеры, которые не получили партиции на обработку, будут находится в ждущем (`IDLE`) режиме.

Если один из консьюмеров прекращает свою работу, то его партиции перераспределяются между другими консьюмерами. Процесс распределения партиций между консьюмерами назвается _балансировка_ (rebalabcing) и занимается им один из брокеров Kafka кластера. Этот брокер называется **Координатором Консьюмер Группы (Consumer Group Coordinator)**.

Для каждой консьюмер группы consumer group coordinator может быть различным, чтобы его узнать, можно выполнить следующую команду:

In [None]:
kafka-consumer-groups --bootstrap-server "${KAFKA_HOST}":"${KAFKA_PORT}" \
    --group my-explicit-group-id \
    --describe \
    --state

Колонка **`COORDINATOR`** показывает, какой консьюмер является consumer group coordinator для указанной консьюмер группы.

### Перебалансировка

Чтобы запустить консьюмер группу нет никакой необходимости синхронизировать запуск каждого консьюмера, чтобы он произошел в одно время со всеми остальными консьюмерами в консьюмер группе.

Когда появляется первый консьюмер в консьюмер группе, Kafka выбирает одного из брокеров в качестве consumer group coordinator и он назначает чтение всех партиций топика первому консьюмеру.

По мере запуска новых консьюмеров в консьюмер группе (по `group.id`) consumer group coordinator будет перераспределять партиции между всеми участниками консьюмер группы равномерно.

Если какой-либо коньсьюмер в консьюмер группе завершает свою работу, то его партиции перераспределяются между остальными участниками консьюмер группы.

Процесс перераспределения партиций между участниками консьюмер группы называется **перебалансировка** (**rebalabcing**).

Для продолжения запустите [kafka-consumer-group-coordinator-1.ipynb](kafka-consumer-group-coordinator-1.ipynb)

Дополнительная проверка запуска консьюмер группы:

In [None]:
kafka-consumer-groups --bootstrap-server "${KAFKA_HOST}":"${KAFKA_PORT}" \
    --group my-consumer-group-coodinator-demo-group-id \
    --describe | awk '/console-consumer/{print $7}' | sort -u | wc -l | grep -q '^1$' || {
        echo "Пожалуйста, запустите kafka-consumer-group-coordinator-1.ipynb" >&2
        false
    }

Координатором для `my-consumer-group-coodinator-demo-group-id` выступает:

In [None]:
kafka-consumer-groups --bootstrap-server "${KAFKA_HOST}":"${KAFKA_PORT}" \
    --group my-consumer-group-coodinator-demo-group-id \
    --describe \
    --state

Для продолжения запустите [kafka-consumer-group-coordinator-2.ipynb](kafka-consumer-group-coordinator-2.ipynb), [kafka-consumer-group-coordinator-3.ipynb](kafka-consumer-group-coordinator-3.ipynb)

Необходимо запустить команду ниже несколько раз до тех пор, пока не появится сообщение:

> Consumer group 'my-consumer-group-coodinator-demo-group-id' is rebalancing

In [None]:
kafka-consumer-groups --bootstrap-server "${KAFKA_HOST}":"${KAFKA_PORT}" \
    --group my-consumer-group-coodinator-demo-group-id \
    --describe \
    --members

Сообщение:

> Consumer group 'my-consumer-group-coodinator-demo-group-id' is rebalancing

говорит о том, координатор консьюмер группы `my-consumer-group-coodinator-demo-group-id` обнаружил, что какой-то из консьюмеров перестал выходить на связь, поэтому координатор запустил процесс перебалансировки, чтобы перераспределелить партиции топика `movies` между живыми участниками группы `my-consumer-group-coodinator-demo-group-id`

## Работа консьюмеров при сбоях кластера

Текущая топология кластера Kafka:
- KRaft,
- три брокера,
- один контроллер.

Сбои кластера можно эмулировать при помощи `docker compose pause`. Пожалуйста, остановите `kafka3`:

In [None]:
docker compose pause kafka3
docker compose ps kafka3

Функция `run_consumer` запускает нового консьюмера:

In [None]:
function run_consumer() {
    kafka-console-consumer \
        --bootstrap-server "$KAFKA_HOST":"$KAFKA_PORT" \
        --topic movies \
        --from-beginning \
        --timeout-ms 20000 \
        --property print.key=true \
        --property print.partition=true
}

Функция `check_if_broker_stoppped` проверяет остановлен ли брокер:

In [None]:
function check_if_broker_stoppped() {
    local broker_no=${1}

    [ -z "${broker_no}" ] && {
        echo "Пожалуйста, укажите номер брокера для проверки" >&2
        return 1
    }

    HOST=$KAFKA_HOST
    local log_file=$(execute ls /tmp/kraft-combined-logs/__cluster_metadata-0 | grep '\.log$')

    kafka-metadata-shell --snapshot /tmp/kraft-combined-logs/__cluster_metadata-0/$log_file cat /brokers/${broker_no}/isFenced 2> /dev/null |
        grep -q "true" || {
            echo "Брокер 'kafka$((broker_no - 200))' все еще активен, пожалуйста, остановите 'kafka$((broker_no - 200))' брокер: 'docker compose pause kafka$((broker_no - 200))'" >&2
            false
        }
}

In [None]:
check_if_broker_stoppped 203

In [None]:
run_consumer

Сбой одного узла в кластере не влияет на процесс чтения: консьюмер читает данные из лидер реплик, которыми сейчас являются:

In [None]:
kafka-topics --bootstrap-server ${KAFKA_HOST}:${KAFKA_PORT} \
    --describe \
    --topic movies

Что будет, если `kafka2` брокер упадет (`docker compose pause kafka2`)?

In [None]:
docker compose pause kafka2
docker compose ps kafka2

In [None]:
check_if_broker_stoppped 202

In [None]:
run_consumer

Сбой даже двух узлов в кластере не влияет на процесс чтения: консьюмер читает данные из лидер реплик, которыми сейчас являются:

In [None]:
kafka-topics --bootstrap-server ${KAFKA_HOST}:${KAFKA_PORT} \
    --describe \
    --topic movies

Можно заметить:

1. `Replication Factor` топика `movies` равен 3;
1. изначально каждая реплика находилась на одном брокере эксклюзивно. В случае, если бы две реплики находились на одном брокере, то выход из строя одного брокера, сократил бы количество живых реплик на 2;
1. сейчас обе лидер реплики находятся на брокере `kafka1`, т.е кластер Apache Kafka автоматически выбрал лидером реплику, которая находится на брокере `kafka1`.

Что произойдет, если брокер `kafka1` так же выйдет из строя? Пожалуйста, остановите `kafka1`:

In [None]:
docker compose pause kafka1
docker compose ps kafka1

In [None]:
ssh-keyscan kafka1 > /dev/null 2>/dev/null && {
    echo 'Брокер `kafka1` все еще активен, пожалуйста остановите его: `docker compose pause kafka1`' >&2
    false
} || true

In [None]:
run_consumer || true

Ошибка сигнализирует, что брокер `kafka1` недоступен, а значит невозможно не только получить данные, но и даже войти в него, чтобы запустить консьюмер.

Восстановите брокеры:

In [None]:
docker compose unpause kafka1 kafka2 kafka3
docker compose ps kafka1 kafka2 kafka3

Необходимо дождаться, когда брокеры восстановятся:

In [None]:
log_file=$(HOST=$KAFKA_HOST execute ls /tmp/kraft-combined-logs/__cluster_metadata-0 | grep '\.log$')

kafka-metadata-shell --snapshot /tmp/kraft-combined-logs/__cluster_metadata-0/$log_file cat /brokers/{201,202,203}/isFenced 2> /dev/null |
    sort -u |
    grep -q -v "true" && {
        echo 'Все брокеры восстановили свою работу. Можно продолжать'
    } || {
        echo 'Не все брокеры восстановили свою работу. Пожалуйста, запустите `docker compose unpause kafka1 kafka2 kafka3`' >&2
        false
    }

Лидер реплики сейчас находятся на:

In [None]:
kafka-topics --bootstrap-server ${KAFKA_HOST}:${KAFKA_PORT} \
    --describe \
    --topic movies

### Изменение брокера для лидер партиции

Если лидер реплики обеих партиций находятся на одном брокере (например, `kafka1`), то брокер будет испытвать избыточную нагрузку при операциях чтения и записи. Для равноменого распределения нагрузки, можно перенести лидера одной из партиций на другой брокер:

In [None]:
kafka-leader-election --bootstrap-server ${KAFKA_HOST}:${KAFKA_PORT} \
    --election-type PREFERRED \
    --topic movies \
    --partition 1

In [None]:
kafka-topics --bootstrap-server ${KAFKA_HOST}:${KAFKA_PORT} \
    --describe \
    --topic movies

### Выводы

- консьюмер группе для работы достаточно всего одной живой реплики;
- кластер Apache Kafka автоматически выбирает новую лидер реплику, если лидер реплика находилась на брокере, который вышел из строя.

### Задание

1. Увеличить количество брокеров до пяти (нужно отредактировать `docker-compose.yml`);
1. Проверить, что кластер состоит из пяти брокеров;
1. Найти брокер, на котором находится лидер реплика партиции 1 топика `movies`;
1. Остановить брокер на котором находится лидер реплика партиции 1 топика `movies`;
1. Найти брокер, на котором находится лидер реплика партиции 1 топика `movies` в данный момент;
1. Восстановить работу остановленного брокера;
1. Остановить все три брокера, на котором находятся реплики партиции `0` топика `movies`;
1. Запустить консольный консьюмер (`run_consumer`), доступны ли данные? Почему?