# Kafka

## Основные понятия

**Topic** - все сообщения в Kafka приходят в определенный топик, по сути это простой способ организовать и сгруппировать поток сообытий. Каждый топик имеет уникальное имя. 

**Producers (производитель)**  - компонент системы, клиентское приложение, которое генерирует сообщения и посылает их в определенный топик. 

**Consumers (потребитель)** - компонент системы, клиентское приложение, которое получает новые сообщения из определенного топика или набора топиков. 

**Partition** - топик, как логическое понятие, неразделим. Но физически он может состоять из нескольких партиций, которые физически хранятся на нескольких узлах кластера. Когда сообщение приходит в топик, оно физически записывается в только одну партицию.  
<img src="https://kafka.apache.org/images/streams-and-tables-p1_p4.png" alt="kafka" style="width:500px;"/>

**Consumer group** - это набор потребителей, которые кооперируются для получения данных из определенного топика. Все партиции топика разделяются между членами группы. По мере входа новых членов группы и ухода старых, партиции перераспределяются так, чтобы каждый член получал пропорциональную долю партиций для чтения. Этот процесс называется ребалансировкой группы.

**At-least-once** - гарантируется, что сообщения никогда не теряются, но могут быть доставлены повторно. Если  приложение потоковой обработки падает, то некоторые сообщения могут быть посланы повторно и, следовательно, повторно обработаны. Семантика «хотя бы один раз» включена по умолчанию.

**Exactly-once** - сообщения обрабатываются строго один раз. Подобная семантика включается опционально. 

Запуск в `Docker`:

```bash
docker compose -f docker/docker-compose-kafka.yml up
```       

In [1]:
from confluent_kafka import Producer, Consumer, TopicPartition

In [2]:
TOPIC_NAME = "some_topic"

producer = Producer({
        "bootstrap.servers": "localhost:9092"
    })

for idx in range(0, 25):
    producer.produce(TOPIC_NAME, key=bytes(idx), value=b"Msg %d" % idx)
    producer.flush()

In [3]:
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "group1",
    "auto.offset.reset": "earliest"
})

consumer.subscribe([TOPIC_NAME])

# tp = TopicPartition(topic=TOPIC_NAME, partition=0, offset=0)
# consumer.assign([tp])
# consumer.seek(tp)

for _ in range(25):
    msg = consumer.consume(num_messages=1, timeout=1.0)
    if len(msg) > 0:
        print(msg[0].value()) 

consumer.close()

b'Msg 0'
b'Msg 1'
b'Msg 2'
b'Msg 3'
b'Msg 4'
b'Msg 5'
b'Msg 6'
b'Msg 7'
b'Msg 8'
b'Msg 9'
b'Msg 10'
b'Msg 11'
b'Msg 12'
b'Msg 13'
b'Msg 14'
b'Msg 15'
b'Msg 16'
b'Msg 17'
b'Msg 18'
b'Msg 19'


%3|1726086499.511|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1726086500.515|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
%3|1726086530.698|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 30 identical error(s) suppressed)
%3|1726086560.754|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 30 identical error(s) suppressed)
%3|1726086590.793|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CON

### Kafka Streams

Надстройка (клиент), который позволяет разрабатывать приложения, которые удобно манипулируют данными в топиках `Kafka`. Есть `JVM API`  и специальный `SQL`-подобный язык, который работает с сервером `ksqlDB`.

Воспользуемся `ksqlDB` клиентом для `Python`. Он плохо поддерживается, поэтому несовместим без определенных изменений с последними версиями `CPython`.

In [4]:
# Необходимые патчи, чтобы вернуть работоспособность библиотеке 
import sys
import json

if sys.version_info.minor > 9:
    import collections
    collections.Iterable = collections.abc.Iterable
    collections.Mapping = collections.abc.Mapping
    collections.MutableSet = collections.abc.MutableSet
    collections.MutableMapping = collections.abc.MutableMapping

from ksql import KSQLAPI

# https://github.com/bryanyang0528/ksql-python/issues/80
class KsqlApiCustom(KSQLAPI):
    def __init__(self, url, max_retries=3, check_version=True, **kwargs):
        super().__init__(url, max_retries=max_retries,
                         check_version=check_version, **kwargs)
        self.sa._raise_for_status = self._raise_for_status

    @staticmethod
    def _raise_for_status(r, response):
        r_json = json.loads(response)
        if r.getcode() != 200:
            if r_json.get("@type") == "statement_error" or r_json.get("@type") == "generic_error":
                error_message = r_json["message"]
                error_code = r_json["error_code"]
                stackTrace = r_json["stack_trace"]
                raise KSQLError(error_message, error_code, stackTrace)
            else:
                raise KSQLError("Unknown Error: {}".format(r.content))
        else:
            # seems to be the old API behavior, so some errors have status 200, bug??
            if r_json and r_json[0]["@type"] == "currentStatus" and r_json[0]["commandStatus"]["status"] == "ERROR":
                error_message = r_json[0]["commandStatus"]["message"]
                error_code = None
                stackTrace = None
                raise KSQLError(error_message, error_code, stackTrace)
            return True


Создаем клиента и соединяемся с сервером `ksqlDB`

In [17]:
client = KsqlApiCustom('http://127.0.0.1:8788')

Создадим `Stream` - непрерывный, неограниченный поток данных. Каждый элемент потока - пара ключ/значение. Представляет собой обертку над топиками kafka.

In [11]:
client.ksql(
    """
    CREATE STREAM rider_locations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
    WITH (kafka_topic='locations', value_format='json', partitions=1)
    """
)

[{'@type': 'currentStatus',
  'statementText': "CREATE STREAM RIDER_LOCATIONS (PROFILEID STRING, LATITUDE DOUBLE, LONGITUDE DOUBLE) WITH (KAFKA_TOPIC='locations', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='JSON');",
  'commandId': 'stream/`RIDER_LOCATIONS`/create',
  'commandStatus': {'status': 'SUCCESS',
   'message': 'Stream created',
   'queryId': None},
  'commandSequenceNumber': 0,

Создадим две "таблицы", которые представляет из себя слепок состояния `Stream`'а, определяемым по некоторым правилам

In [19]:
client.ksql(
    """
    CREATE TABLE current_location AS
        SELECT profileId,
                LATEST_BY_OFFSET(latitude) AS la,
                LATEST_BY_OFFSET(longitude) AS lo
        FROM rider_locations
        GROUP BY profileId
    """
)

client.ksql(
    """
    CREATE TABLE riders_near_mountain_view AS
        SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
                COLLECT_LIST(profileId) AS riders,
                COUNT(*) AS count
        FROM current_location
        GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1)
    """
)      

[{'@type': 'currentStatus',
  'statementText': "CREATE TABLE RIDERS_NEAR_MOUNTAIN_VIEW WITH (KAFKA_TOPIC='RIDERS_NEAR_MOUNTAIN_VIEW', PARTITIONS=1, REPLICAS=1) AS SELECT\n  ROUND(GEO_DISTANCE(CURRENT_LOCATION.LA, CURRENT_LOCATION.LO, 37.4133, -122.1162), -1) DISTANCEINMILES,\n  COLLECT_LIST(CURRENT_LOCATION.PROFILEID) RIDERS,\n  COUNT(*) COUNT\nFROM CURRENT_LOCATION CURRENT_LOCATION\nGROUP BY ROUND(GEO_DISTANCE(CURRENT_LOCATION.LA, CURRENT_LOCATION.LO, 37.4133, -122.1162), -1)\nEMIT CHANGES;",
  'commandId': 'table/`RIDERS_NEAR_MOUNTAIN_VIEW`/create',
  'commandStatus': {'status': 'SUCCESS',
   'message': 'Created query with ID CTAS_RIDERS_NEAR_MOUNTAIN_VIEW_3',
   'queryId': 'CTAS_RIDERS_NEAR_MOUNTAIN_VIEW_3'},
  'commandSequenceNumber': 4,

Запишем в `Stream` данные с помощью функции `INSERT` (физически данные добавятся в топик `Kafka`)

In [21]:
import threading
import time
    

def insert_in_thread():
    time.sleep(10)
    client.ksql(
        """
        INSERT INTO rider_locations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
        INSERT INTO rider_locations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
        INSERT INTO rider_locations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
        INSERT INTO rider_locations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
        INSERT INTO rider_locations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
        INSERT INTO rider_locations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011)
        """
    )

thread = threading.Thread(target=insert_in_thread)    
thread.start()

Запрос типа `PUSH` (c `EMIT CHANGES`) работает бесконечно, генерируя обновления таблицы

In [14]:
res = client.query(
    """
    SELECT * FROM rider_locations
        WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 
        EMIT CHANGES
    """, return_objects=True
)

print(next(res))
print(next(res))
print(next(res))

{'PROFILEID': '4ab5cbad', 'LATITUDE': 37.3952, 'LONGITUDE': -122.0813}
{'PROFILEID': '8b6eae59', 'LATITUDE': 37.3944, 'LONGITUDE': -122.0813}
{'PROFILEID': '4a7c7b41', 'LATITUDE': 37.4049, 'LONGITUDE': -122.0822}


Запрос вида `PULL` возвращает текущее состояние таблицы. 

In [22]:
res = client.query(
    """
    SELECT * from riders_near_mountain_view WHERE distanceInMiles <= 10
    """, return_objects=True, use_http2=True
)

next(res)
for x in res:
    print(x)

[0.0,["4ab5cbad","8b6eae59","4a7c7b41"],3]

[10.0,["18f4ea86"],1]


