In [74]:
import json
import uuid
from typing import Final

from kafka.admin import KafkaAdminClient, NewTopic
from pydantic import ConfigDict
from pydantic_settings import BaseSettings
from typing import Dict
from faker import Faker
from kafka import KafkaProducer
from kafka import KafkaConsumer

Admin: create the Kafka topics

In [18]:
class TopicConfig(BaseSettings):
    """Validated definition of one Kafka topic, built from a JSON config file.

    Strict mode is enabled, so values are not coerced: for example
    ``num_partitions`` must already be an ``int`` rather than a numeric string.
    """

    # Reject values of the wrong type instead of silently converting them.
    model_config = ConfigDict(strict=True)

    topic_name: str  # Kafka topic name
    num_partitions: int  # number of partitions to create
    replication_factor: int  # replicas per partition
    topic_configs: Dict[str, str]  # per-topic overrides, forwarded to NewTopic(topic_configs=...)

In [19]:
def get_topic_json_config(path_to_json: str) -> dict:
    """Read a topic definition from a JSON file.

    Args:
        path_to_json: Path to the JSON file (e.g. ``topics_config/<topic>.json``).

    Returns:
        The parsed JSON document; callers unpack it into ``TopicConfig(**...)``.
    """
    # JSON text is UTF-8 (RFC 8259); do not depend on the platform's locale encoding.
    with open(path_to_json, "r", encoding="utf-8") as fp:
        topic_config = json.load(fp)
    return topic_config

In [20]:
# Topics processed by KafkaInit.create_topics(); each one is read from
# topics_config/<name>.json.
TOPIC_LIST: Final[list] = [
    "click_events",
    "player_settings_events",
    "player_progress",
]


class KafkaInit:
    """Create the Kafka topics listed in ``TOPIC_LIST`` from JSON definitions.

    Each definition is read from ``topics_config/<topic>.json``, validated with
    ``TopicConfig``, queued in ``topics_list`` and sent to the broker in a single
    ``create_topics`` request. Topics that already exist on the broker are skipped.
    """

    def __init__(self, bootstrap_servers: str = "localhost:9094", client_id: str = "test"):
        """Create the admin client.

        Args:
            bootstrap_servers: Broker address(es); defaults to the local dev broker.
            client_id: Client id reported to the broker.
        """
        # NewTopic objects queued for the next create_topics() request.
        self.topics_list = []
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            client_id=client_id,
        )

    def append_topic(self, topic_config: TopicConfig, existing_topics=None):
        """Queue ``topic_config`` for creation unless it is already present.

        The topic is skipped when it already exists on the broker or is already
        queued in ``topics_list``, so repeated calls never queue duplicates.

        Args:
            topic_config: Validated topic definition.
            existing_topics: Optional collection of topic names known to exist on
                the broker. When omitted, ``admin_client.list_topics()`` is queried.
        """
        if existing_topics is None:
            existing_topics = self.admin_client.list_topics()
        name = topic_config.topic_name
        already_queued = any(queued.name == name for queued in self.topics_list)
        if name in existing_topics or already_queued:
            return
        self.topics_list.append(
            NewTopic(
                name=name,
                num_partitions=topic_config.num_partitions,
                replication_factor=topic_config.replication_factor,
                topic_configs=topic_config.topic_configs,
            )
        )

    def create_topics(self, topics=None):
        """Load, validate and create every topic that does not exist yet.

        Best effort: a topic whose definition is missing or invalid is reported
        and skipped, and a failed broker request is reported instead of raised.
        "Done!" is printed only when the request succeeded (or there was nothing
        to create); the queue is cleared after a successful request so a second
        call does not re-send the same topics.

        Args:
            topics: Topic names to process; defaults to ``TOPIC_LIST``.
        """
        topic_names = TOPIC_LIST if topics is None else topics
        # One broker round-trip for all topics instead of one per topic.
        existing_topics = set(self.admin_client.list_topics())
        for this_topic in topic_names:
            try:
                json_config = get_topic_json_config(f"topics_config/{this_topic}.json")
                topic_config = TopicConfig(**json_config)
                self.append_topic(topic_config=topic_config, existing_topics=existing_topics)
            except Exception as exc:
                print(f"Skipping topic {this_topic!r}: {exc!r}")

        if self.topics_list:
            try:
                self.admin_client.create_topics(new_topics=self.topics_list, validate_only=False)
            except Exception as exc:
                queued_names = [queued.name for queued in self.topics_list]
                print(f"Failed to create topics {queued_names}: {exc!r}")
                return
            self.topics_list.clear()
        print("Done!")

    def close(self):
        """Close the underlying admin client."""
        self.admin_client.close()


def get_kafka_init() -> KafkaInit:
    """Factory returning a fresh ``KafkaInit`` (constructs a new KafkaAdminClient)."""
    kafka_init = KafkaInit()
    return kafka_init

In [21]:
# Create all configured topics, then release the admin client's connections
# even if topic creation raises.
kafka_init = KafkaInit()
try:
    kafka_init.create_topics()
finally:
    kafka_init.admin_client.close()

Done!


Kafka Schemas

In [75]:
from enum import Enum
from pydantic import Field, field_validator, model_validator
import datetime
from uuid import UUID

# Player-settings event types: one "change resolution" event per supported
# resolution (presumably video height in pixels). Each member's name and value
# are the same string, e.g. "change_resolution_to_720".
EventsNames = Enum(
    "EventsNames",
    [
        (f"change_resolution_to_{height}", f"change_resolution_to_{height}")
        for height in (480, 720, 1080, 1440, 2160)
    ],
)

class KafkaModelConfig(BaseSettings):
    """Common base class for the Kafka message schemas in this notebook.

    ``populate_by_name=True`` allows fields to be populated by their Python
    attribute name even when an alias is declared.
    """

    # NOTE(review): BaseSettings (rather than BaseModel) can also fill missing
    # fields from environment variables — confirm that is intended for payloads.
    model_config = ConfigDict(populate_by_name=True)

class PlayerProgress(KafkaModelConfig):
    """Playback-progress event: how far a user has watched a movie.

    Validation fails when either duration is negative or when ``view_progress``
    exceeds ``movie_duration``.
    """

    # User UUID.
    user_id: UUID = Field(description="UUID пользователя")
    # Movie (work) UUID.
    movie_id: UUID = Field(description="UUID произведения")
    # Event time; defaults to the current time from datetime.utcnow (naive UTC).
    event_dt: datetime.datetime = Field(default_factory=datetime.datetime.utcnow, description="Время события")
    # Seconds of the movie watched so far; cannot be negative.
    view_progress: int = Field(ge=0, description="Прогресс просмотра произведения в секундах")
    # Total movie length in seconds; cannot be negative.
    movie_duration: int = Field(ge=0, description="Продолжительность произведения в секундах")

    @model_validator(mode='after')
    def compare_duration_and_view(self):
        """Reject events whose watched time is longer than the movie itself."""
        if self.view_progress > self.movie_duration:
            raise ValueError("view_progress is larger than movie_duration")
        return self
    
    
class PlayerSettingEvents(KafkaModelConfig):
    """Player-settings event, e.g. a user switching the video resolution."""

    # User UUID.
    user_id: UUID = Field(description="UUID пользователя")
    # Movie (work) UUID.
    movie_id: UUID = Field(description="UUID произведения")
    # Event time; defaults to the current time from datetime.utcnow (naive UTC).
    event_dt: datetime.datetime = Field(default_factory=datetime.datetime.utcnow, description="Время события")
    # Event type. Callers pass EventsNames members (an `int` field rejected them);
    # the member's string value is what ends up in the JSON payload.
    event_type: EventsNames = Field(description="Тип события")
    
class ClickEvent(KafkaModelConfig):
    """Navigation event: a user on ``current_url`` followed a link to ``destination_url``."""

    # User UUID.
    user_id: UUID = Field(description="UUID пользователя")
    # Movie (work) UUID.
    movie_id: UUID = Field(description="UUID произведения")
    # Event time, consistent with the other event schemas. The timestamp factory
    # was previously attached to `current_url` by mistake.
    event_dt: datetime.datetime = Field(default_factory=datetime.datetime.utcnow, description="Время события")
    # URL the user was on when clicking.
    current_url: str = Field(description="URL текущей страницы")
    # URL the click navigates to.
    destination_url: str = Field(description="URL страницы перехода")


Producer

In [76]:
class Producer:
    """Publishes pydantic message models to Kafka as UTF-8 encoded JSON."""

    def __init__(self, bootstrap_servers=("localhost:9094",), acks="all"):
        """Create the underlying KafkaProducer.

        Args:
            bootstrap_servers: Broker addresses; defaults to the local dev broker.
            acks: Producer acknowledgement setting passed to KafkaProducer.
        """
        self.producer = KafkaProducer(bootstrap_servers=list(bootstrap_servers), acks=acks)

    def produce_message(self, topic_name: str, message: KafkaModelConfig, flush: bool = True):
        """Serialize ``message`` with ``model_dump_json()`` and send it to ``topic_name``.

        Args:
            topic_name: Destination topic.
            message: Model to publish.
            flush: When True (the default, matching earlier behaviour), block until
                the message is sent and raise if delivery failed. Pass False in
                bulk loops and call ``flush()`` once at the end: flushing after
                every message makes each send a synchronous round-trip.

        Returns:
            The future returned by ``KafkaProducer.send``.
        """
        # TODO: check whether messages need to be written to all partitions (keying).
        future = self.producer.send(
            topic=topic_name,
            value=message.model_dump_json().encode("utf-8"),
        )
        if flush:
            self.producer.flush()
            # flush() does not raise for failed records; get() re-raises the
            # delivery error instead of losing it silently.
            future.get()
        return future

    def flush(self):
        """Block until every buffered message has been sent."""
        self.producer.flush()

    def close(self):
        """Close the underlying KafkaProducer."""
        self.producer.close()
        
def get_producer() -> Producer:
    """Factory returning a new ``Producer`` built with its default arguments."""
    new_producer = Producer()
    return new_producer

In [77]:
# Stream fake player-progress events to the `player_progress` topic.
# Each send is flushed individually, so this is slow; interrupt the kernel to stop.
fake = Faker()
producer = get_producer()

for _ in range(10_000_000):
    # Draw the movie length first so the watched time never exceeds it
    # (PlayerProgress rejects view_progress > movie_duration).
    movie_duration_s = fake.pyint(min_value=1000, max_value=3 * 1000)
    watched_s = fake.pyint(min_value=1, max_value=movie_duration_s)
    this_message = PlayerProgress(
        user_id=fake.uuid4(),
        movie_id=fake.uuid4(),
        event_dt=fake.date_time(),
        view_progress=watched_s,
        movie_duration=movie_duration_s,
    )
    producer.produce_message(topic_name='player_progress', message=this_message)

KeyboardInterrupt: 

In [78]:
# Stream fake click events to the `click_events` topic.
# Each send is flushed individually, so this is slow; interrupt the kernel to stop early.
N_CLICK_EVENTS = 10_000_000

for _ in range(N_CLICK_EVENTS):
    # (The unused duration/view_progress draws copied from the progress cell were removed.)
    this_message = ClickEvent(
        user_id=fake.uuid4(),
        movie_id=fake.uuid4(),
        current_url=fake.url(),
        destination_url=fake.url(),
    )
    producer.produce_message(
        topic_name='click_events',
        message=this_message,
    )

KeyboardInterrupt: 

In [79]:
# Stream fake player-settings events to the `player_settings_events` topic.
# Each send is flushed individually, so this is slow; interrupt the kernel to stop early.
N_SETTINGS_EVENTS = 10_000_000

for _ in range(N_SETTINGS_EVENTS):
    # NOTE(review): PlayerSettingEvents.event_type must accept EventsNames members
    # for this to validate.
    this_message = PlayerSettingEvents(
        user_id=fake.uuid4(),
        movie_id=fake.uuid4(),
        event_dt=fake.date_time(),
        event_type=fake.enum(EventsNames),
    )
    producer.produce_message(
        topic_name='player_settings_events',
        message=this_message,
    )

KeyboardInterrupt: 

Consumer

In [25]:
# Print the messages on `player_progress`, then stop once no new message has
# arrived for CONSUMER_IDLE_TIMEOUT_MS instead of blocking the kernel forever.
# NOTE: with a group_id, offsets committed by a previous run take precedence over
# auto_offset_reset, so a re-run continues where the last run stopped.
CONSUMER_IDLE_TIMEOUT_MS = 10_000

consumer = KafkaConsumer(
    'player_progress',
    bootstrap_servers=['localhost:9094'],
    auto_offset_reset='earliest',
    group_id='echo-messages-to-stdout',
    consumer_timeout_ms=CONSUMER_IDLE_TIMEOUT_MS,
)
try:
    for message in consumer:
        # Values are raw bytes; the producer writes UTF-8 JSON.
        print(message.value.decode("utf-8"))
finally:
    consumer.close()

NameError: name 'KafkaConsumer' is not defined

In [71]:
# Quick check: draw a random EventsNames member and display its string value.
sampled_event = fake.enum(EventsNames)
sampled_event.value

'change_resolution_to_720'