# ДОМАШНЕЕ ЗАДАНИЕ № 3. Обработка потоков данных посредством Spark Streaming
---
**Дисциплина:** Методы обработки больших данных  
**Студент:** Голдышев Д.М. (goldyshev02@mail.ru)  
**Группа:** ИУ6-31М  

## Задание 1. Подсчет количества сообщений в телеграм-канале
---
### Цель
Реализовать программу для подсчёта сообщений пользователей из Telegram-каналов с использованием **Spark Structured Streaming** и **Apache Kafka**.

### Задачи

1. Подключиться к указанным Telegram-каналам и читать новые сообщения
2. Публиковать сообщения в Kafka-топик
3. Читать поток из Kafka в Spark Structured Streaming
4. Подсчитывать количество сообщений каждого пользователя:
    - За 1 минуту (tumbling window)
    - За 10 минут с шагом 30 секунд (sliding window)
5. Выводить результат, отсортированный по убыванию количества сообщений
6. Формат вывода: `username, количество`

#### Telegram-каналы (id)
`1050820672, 1149896996, 1101170442, 1036362176, 1310155678, 1001872252, 1054549314, 1073571855`

## Шаг 1. Импорт зависимостей и конфигурация
---

In [1]:
import os
import sys
import json
import time
import shutil
import asyncio
import tempfile

from pathlib import Path
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from typing import Optional, List

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType, TimestampType
)

# Kafka
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")
KAFKA_TOPIC = "telegram_messages"

# Telegram-каналы для мониторинга - эти идентификаторы будут использоваться только для
# собственного генератора сообщений (т.к. сейчас по таким ID невозможно найти TG-каналы),
# но в Telegram Producer используются работающие идентификаторы TG-каналов
CHANNEL_IDS = [
    1050820672,
    1149896996,
    1101170442,
    1036362176,
    1310155678,
    1001872252,
    1054549314,
    1073571855,
]

# Директория для хранения временных файлов (чекпоинтов, Telegram-сессии)
TMP_DIR = "./tmp"
# Директория для хранения чекпоинтов
CHECKPOINT_DIR = f"{TMP_DIR}/checkpoints"

# Окно 1: tumbling window (неперекрывающееся)
WINDOW_1_DURATION = "1 minute"
# Окно 2: sliding window (скользящее)
WINDOW_2_DURATION = "10 minutes"
WINDOW_2_SLIDE = "30 seconds"

# Максимальное допустимое опоздание сообщений
WATERMARK_DELAY = "30 seconds"

# Путь к скрипту с Telegram Producer
PRODUCER_SCRIPT_PATH = "./telegram_producer.py"

### Проверка существования скрипта

In [2]:
def check_file(path: str):
    if not os.path.isfile(path):
        raise FileNotFoundError(f"{path}: файл не найден")
    try:
        with open(path, "r", encoding="utf-8") as f:
            f.read(1)  # пробное чтение
    except Exception as e:
        raise IOError(f"{path}: файл существует, но не читается: {e}")

try:
    check_file(PRODUCER_SCRIPT_PATH)
    print("Файлы, необходимые для работы, найдены и читаются.")
except Exception as e:
    print("Ошибка при проверке файлов:")
    print(e)
    raise SystemExit(1)

Файлы, необходимые для работы, найдены и читаются.


### Проверка возможности сохранения файлов

In [3]:
def ensure_output_path(path: str, clear_dir: bool) -> Path:
    p = Path(path).expanduser()
    if p.exists() and p.is_dir():
        dir_path = p
    else:
        if str(p).endswith(os.sep) or (not p.exists() and p.suffix == ""):
            dir_path = p
        else:
            dir_path = p.parent
    if not str(dir_path):
        dir_path = Path(".")
    try:
        # Очищаем директорию, если указано
        if clear_dir and os.path.exists(dir_path):
            shutil.rmtree(dir_path)
        dir_path.mkdir(parents=True, exist_ok=True)
    except PermissionError as e:
        raise PermissionError(f"{dir_path}: нет прав на создание директории: {e}")
    except OSError as e:
        raise OSError(f"{dir_path}: не удалось создать директорию: {e}")
    if not os.access(dir_path, os.W_OK):
        raise PermissionError(f"{dir_path}: нет прав на запись в директорию")
    try:
        with tempfile.NamedTemporaryFile(dir=dir_path, delete=True) as tmp:
            tmp.write(b"test")
            tmp.flush()
    except OSError as e:
        raise OSError(f"{dir_path}: невозможно записать временный файл: {e}")
    return dir_path

try:
    ensure_output_path(TMP_DIR, False)
    ensure_output_path(CHECKPOINT_DIR, False)
    print(f"Можно сохранять результаты в указанные директории.")
except Exception as e:
    print("Ошибка при проверке директорий для записи результатов:")
    print(e)
    raise SystemExit(1)

Можно сохранять результаты в указанные директории.


## Шаг 2. Схема сообщения и валидация
---
Используем `dataclass` для типизации и валидации структуры сообщений.
Это обеспечивает:
- Автодокументацию полей
- Проверку типов при создании объекта
- Удобную сериализацию в JSON через `asdict()`

In [4]:
@dataclass
class TelegramMessage:
    """
    Структура сообщения из Telegram-канала.
    Поля соответствуют данным, которые можно извлечь из события NewMessage в Telethon.
    """
    event_time: str              # ISO формат: "2025-12-06T21:30:45.123456"
    channel_id: int              # ID канала
    channel_name: Optional[str]  # Название канала
    message_id: int              # ID сообщения в канале
    user_id: Optional[int]       # ID отправителя (None для анонимных)
    username: Optional[str]      # @username отправителя
    first_name: Optional[str]    # Имя отправителя
    last_name: Optional[str]     # Фамилия отправителя
    text: str                    # Текст сообщения
    text_length: int             # Длина текста

    def to_json(self) -> str:
        """
        Сериализация в JSON-строку (UTF-8).
        Используется для отправки в Kafka.
        """
        return json.dumps(asdict(self), ensure_ascii=False)

    @classmethod
    def from_dict(cls, data: dict) -> "TelegramMessage":
        """
        Создание объекта из словаря.
        """
        return cls(**data)


def validate_message(msg: TelegramMessage) -> bool:
    """
    Проверка валидности сообщения перед отправкой в Kafka.
    Логика:
      - event_time должен быть непустой строкой
      - channel_id должен быть положительным числом
      - message_id должен быть положительным числом
    """
    if not msg.event_time or not isinstance(msg.event_time, str):
        return False
    if not isinstance(msg.channel_id, int) or msg.channel_id <= 0:
        return False
    if not isinstance(msg.message_id, int) or msg.message_id <= 0:
        return False
    return True

# Схема JSON-сообщения для Spark, соответствующая TelegramMessage
message_schema = StructType([
    StructField("event_time", StringType(), True),
    StructField("channel_id", LongType(), True),
    StructField("channel_name", StringType(), True),
    StructField("message_id", LongType(), True),
    StructField("user_id", LongType(), True),
    StructField("username", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("text", StringType(), True),
    StructField("text_length", IntegerType(), True),
])

## Шаг 3. Проверка доступности Kafka
---

In [5]:
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import KafkaError, NoBrokersAvailable
import time

def check_kafka_connection(
    bootstrap_servers: str,
    max_retries: int = 5,
    retry_delay: float = 2.0
) -> bool:
    """
    Проверка соединения с Kafka-брокером.
    Args:
        bootstrap_servers: Адрес брокера ("kafka:9092")
        max_retries: Максимальное количество попыток
        retry_delay: Задержка между попытками (секунды)
    Returns:
        True если подключение успешно, False в противном случае
    """
    for attempt in range(1, max_retries + 1):
        try:
            print(f"Попытка подключения к Kafka ({attempt}/{max_retries})...")
            # Создаём временный producer для проверки соединения
            producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                request_timeout_ms=5000,
                api_version_auto_timeout_ms=5000,
            )
            # Если дошли сюда, то соединение успешно
            producer.close()
            print(f"+ Kafka доступна: {bootstrap_servers}")
            return True
        except NoBrokersAvailable as e:
            print(f"  Брокер недоступен: {e}")
            if attempt < max_retries:
                print(f"  Повторная попытка через {retry_delay} сек...")
                time.sleep(retry_delay)
        except KafkaError as e:
            print(f"  Ошибка Kafka: {e}")
            if attempt < max_retries:
                time.sleep(retry_delay)
    print("- Не удалось подключиться к Kafka")
    return False


def ensure_topic_exists(
    bootstrap_servers: str,
    topic_name: str,
    num_partitions: int = 2,
) -> bool:
    """
    Создание топика, если он не существует.
    Args:
        bootstrap_servers: Адрес брокера
        topic_name: Имя топика
        num_partitions: Количество партиций
    Returns:
        True при успехе, False при ошибке
    """
    try:
        admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            request_timeout_ms=10000,
        )
        # Получаем список существующих топиков
        existing_topics = admin_client.list_topics()
        if topic_name in existing_topics:
            print(f"+ Топик '{topic_name}' уже существует")
            admin_client.close()
            return True
        # Создаём новый топик
        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=1, # Kafka запущена только в одном контейнере
        )
        admin_client.create_topics([topic])
        print(f"+ Топик '{topic_name}' создан (partitions={num_partitions})")
        admin_client.close()
        return True
    except KafkaError as e:
        print(f"- Ошибка при работе с топиком: {e}")
        return False

# Выполняем проверки
kafka_ok = check_kafka_connection(KAFKA_BOOTSTRAP_SERVERS)
if kafka_ok:
    ensure_topic_exists(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC)
else:
    print("Возникли проблемы при проверке Kafka!")
    raise SystemExit(1)

Попытка подключения к Kafka (1/5)...
+ Kafka доступна: kafka:9092
+ Топик 'telegram_messages' уже существует


## Шаг 4.1 Telegram Producer
---
Ниже представлен код Telegram Producer-а, который **слушает сообщения из Telegram-каналов и отправляет их в Kafka**.  
Этот скрипт должен работать параллельно этому ноутбуку, поэтому он **запускается отдельным процессом**, не связанным с Jupyter Notebook.  
Сам скрипт находится в файле `telegram_producer.py`, а здесь он представлен только для "ознакомления".

```python
#!/usr/bin/env python3
import os
import sys
import json
import asyncio
import signal
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from typing import Optional, List, Dict

# Telethon для Telegram API
from telethon import TelegramClient, events
from telethon.tl.functions.channels import JoinChannelRequest, LeaveChannelRequest
from telethon.tl.types import Channel, User
from telethon.errors import ChannelPrivateError, FloodWaitError

# Kafka
from kafka import KafkaProducer
from kafka.errors import KafkaError

TELEGRAM_API_ID = os.environ.get("TELEGRAM_API_ID")
TELEGRAM_API_HASH = os.environ.get("TELEGRAM_API_HASH")
SESSION_NAME = "/app/hw3/tmp/telegram_session"  # Путь к файлу сессии

# Kafka
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")
KAFKA_TOPIC = "telegram_messages"

# Каналы для мониторинга
CHANNEL_IDS = [
    "@toporlive",
    "@ecotopor",
    "@cybers",
    "@mosnews",
    "@moscowach",
    "@techmedia",
    "@exploitex",
    "@kreator",
    "@novosti_efir",
    "https://t.me/+ULjQI9CfYMI0OGQy"
]
# ID каналов из задания, но они не работают:
# CHANNEL_IDS = [
#     1050820672,
#     1149896996,
#     1101170442,
#     1036362176,
#     1310155678,
#     1001872252,
#     1054549314,
#     1073571855,
# ]

# Схема сообщения
@dataclass
class TelegramMessage:
    """
    Структура сообщения из Telegram-канала.
    Поля соответствуют данным, которые можно извлечь из события NewMessage в Telethon.
    """
    event_time: str              # ISO формат: "2025-12-06T21:30:45.123456"
    channel_id: int              # ID канала
    channel_name: Optional[str]  # Название канала
    message_id: int              # ID сообщения в канале
    user_id: Optional[int]       # ID отправителя (None для анонимных)
    username: Optional[str]      # @username отправителя
    first_name: Optional[str]    # Имя отправителя
    last_name: Optional[str]     # Фамилия отправителя
    text: str                    # Текст сообщения
    text_length: int             # Длина текста
    
    def to_json(self) -> str:
        return json.dumps(asdict(self), ensure_ascii=False)


class TelegramKafkaProducer:
    """Класс для получения сообщений из Telegram и отправки в Kafka."""

    def __init__(self):
        # Проверка наличия нужных данных
        if not TELEGRAM_API_ID or not TELEGRAM_API_HASH:
            raise ValueError("Не заданы TELEGRAM_API_ID и/или TELEGRAM_API_HASH.")

        # Telegram клиент
        self.client = TelegramClient(
            SESSION_NAME,
            int(TELEGRAM_API_ID),
            TELEGRAM_API_HASH
        )

        # Kafka producer
        # acks='all' — ждём подтверждения от всех реплик
        # value_serializer — автоматическая сериализация в байты
        self.kafka_producer = KafkaProducer(
            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
            acks='all',
            retries=3,
            value_serializer=lambda v: v.encode('utf-8'),
            key_serializer=lambda k: str(k).encode('utf-8') if k else None,
        )

        # Кэш названий каналов (id -> name)
        self.channel_names: Dict[int, str] = {}

        # Флаг для корректной остановки
        self.running = False

    async def get_channel_name(self, channel_id: int) -> str:
        """
        Получение названия канала по ID.
        (если канал недоступен — возвращает "Unknown")
        """
        if channel_id in self.channel_names:
            return self.channel_names[channel_id]
        try:
            entity = await self.client.get_entity(channel_id)
            name = getattr(entity, 'title', None) or getattr(entity, 'username', None) or "Unknown"
            self.channel_names[channel_id] = name
            return name
        except Exception as e:
            print(f"  Не удалось получить название канала {channel_id}: {e}")
            return "Unknown"

    async def join_channels(self) -> List[int]:
        """Подключение к заданным каналам."""
        joined = []
        for channel_id in CHANNEL_IDS:
            try:
                print(f"Подключение к каналу {channel_id}...")
                # Получаем entity канала
                channel = await self.client.get_entity(channel_id)
                # Присоединяемся (если ещё не участник)
                await self.client(JoinChannelRequest(channel))
                # Получаем и кэшируем название
                name = await self.get_channel_name(channel_id)
                print(f"  + Подключено: {name}")
                joined.append(channel_id)
            except ChannelPrivateError:
                print(f"  - Канал {channel_id} приватный")
            except FloodWaitError as e:
                print(f"  ? Rate limit, ожидание {e.seconds} сек...")
                await asyncio.sleep(e.seconds)
            except Exception as e:
                print(f"  - Ошибка подключения к {channel_id}: {e}")
        return joined

    async def handle_new_message(self, event):
        """Обработчик нового сообщения."""
        try:
            message = event.message
            chat = await event.get_chat()
            sender = await event.get_sender()

            # Извлекаем данные отправителя
            user_id = None
            username = None
            first_name = None
            last_name = None
            if sender:
                user_id = sender.id
                username = getattr(sender, 'username', None)
                first_name = getattr(sender, 'first_name', None)
                last_name = getattr(sender, 'last_name', None)
            # ID канала
            channel_id = chat.id
            if hasattr(chat, 'id'):
                # Telegram использует отрицательные ID для каналов в некоторых контекстах
                channel_id = abs(chat.id)
            # Название канала
            channel_name = await self.get_channel_name(channel_id)
            # Текст сообщения
            text = message.text or ""

            # Создаём объект сообщения
            msg = TelegramMessage(
                event_time=datetime.now(timezone.utc).isoformat(),
                channel_id=channel_id,
                channel_name=channel_name,
                message_id=message.id,
                user_id=user_id,
                username=username or f"user_{user_id}" if user_id else "anonymous",
                first_name=first_name,
                last_name=last_name,
                text=text,
                text_length=len(text),
            )

            # Отправляем в Kafka
            self.kafka_producer.send(
                KAFKA_TOPIC,
                key=str(user_id) if user_id else None, # для партиционирования по пользователю
                value=msg.to_json()
            )

            # Логируем
            display_name = username or first_name or f"user_{user_id}" or "anonymous"
            text_preview = text[:50] + "..." if len(text) > 50 else text
            print(f"[{channel_name}] {display_name}: {text_preview}")
        except Exception as e:
            print(f"Ошибка обработки сообщения: {e}")

    async def start(self):
        """Инициализация и запуск producer."""
        # Подключаемся к Telegram
        print("\nПодключение к Telegram API...")
        await self.client.start()
        me = await self.client.get_me()
        print(f"+ Авторизован как: {me.username or me.first_name}")

        # Подключаемся к каналам
        print("Подключение к каналам...")
        joined = await self.join_channels()
        print(f"+ Подключено каналов: {len(joined)}/{len(CHANNEL_IDS)}")

        if not joined:
            print("- Не удалось подключиться ни к одному каналу!")
            return False

        # Регистрируем обработчик новых сообщений
        @self.client.on(events.NewMessage(chats=joined))
        async def handler(event):
            await self.handle_new_message(event)

        print("=" * 60)
        print("Ожидание сообщений...")
        print("=" * 60)
        self.running = True
        return True

    async def run(self):
        """Запуск бесконечного цикла обработки сообщений."""
        if await self.start():
            await self.client.run_until_disconnected()

    def stop(self):
        """Корректное завершение работы."""
        print("Остановка producer...")
        self.running = False
        self.kafka_producer.flush()
        self.kafka_producer.close()
        print("+ Kafka producer закрыт")


def main():
    producer = TelegramKafkaProducer()
    # Обработка Ctrl+C
    def signal_handler(sig, frame):
        producer.stop()
        sys.exit(0)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    # Запуск
    with producer.client:
        producer.client.loop.run_until_complete(producer.run())


if __name__ == "__main__":
    main()
```

#### ⚠️⚠️⚠️ ПРОБЛЕМЫ ⚠️⚠️⚠️
1) Версия `telethon`, указанная в задании (1.24.0), сильно устарела, т.к. при попытке авторизации возникает ошибка `UPDATE_APP_TO_LOGIN`. В итоге было решено использовать версию `telethon>=1.34.0`.
2) Основная проблема заключается в том, что представленные идентификаторы Telegram-каналов (`1050820672, 1149896996, ...`) бесполезны. Эти числа – это внутренние channel_id в MTProto. Поэтому было решено использовать другие Telegram-каналы.

## Шаг 4.2 Генератор тестовых сообщений
---
**Для отладки** Spark-части, а также **для решения проблемы с невалидными идентификаторами каналов** (см. шаг 4.1) используем генератор фейковых сообщений.  
Он **создаёт случайные сообщения** и **отправляет их в Kafka** с заданным интервалом.

In [6]:
import random
import time
import threading

# Тестовые данные
FAKE_USERNAMES = [
    "alice", "bob", "charlie", "diana", "eve",
    "frank", "grace", "henry", "iris", "jack"
]
FAKE_CHANNEL_NAMES = [
    "Tech News", "Python Community", "Data Science Hub",
    "ML Updates", "Spark Users", "Big Data Talk"
]
FAKE_MESSAGES = [
    "Привет всем!",
    "Интересная статья",
    "Кто-нибудь пробовал?",
    "Отличный пример кода",
    "Спасибо за информацию",
    "А как насчёт производительности?",
    "Подскажите, пожалуйста",
    "Согласен с предыдущим оратором",
    "Это работает!",
    "Нужна помощь с настройкой",
]


def generate_fake_message() -> TelegramMessage:
    """Генерация случайного тестового сообщения."""
    username = random.choice(FAKE_USERNAMES)
    user_id = hash(username) % 1000000 + 100000  # Псевдослучайный ID
    channel_id = random.choice(CHANNEL_IDS)
    channel_name = random.choice(FAKE_CHANNEL_NAMES)
    text = random.choice(FAKE_MESSAGES)
    return TelegramMessage(
        event_time=datetime.now(timezone.utc).isoformat(),
        channel_id=channel_id,
        channel_name=channel_name,
        message_id=random.randint(1, 100000),
        user_id=user_id,
        username=username,
        first_name=username.capitalize(),
        last_name=None,
        text=text,
        text_length=len(text),
    )


class FakeMessageGenerator:
    """
    Генератор тестовых сообщений для Kafka.
    Работает в отдельном потоке, генерирует сообщения с заданным интервалом.
    Использование:
        generator = FakeMessageGenerator(interval=1.0)
        generator.start()
        # ... тестируем Spark ...
        generator.stop()
    """

    def __init__(
        self,
        bootstrap_servers: str = KAFKA_BOOTSTRAP_SERVERS,
        topic: str = KAFKA_TOPIC,
        interval: float = 1.0,  # Интервал между сообщениями (секунды)
        batch_size: int = 1,    # Сообщений за один интервал
    ):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.interval = interval
        self.batch_size = batch_size
        self.running = False
        self.thread = None
        self.message_count = 0
        self.producer = None

    def _producer_loop(self):
        """Внутренний цикл генерации сообщений."""
        self.producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: v.encode('utf-8'),
            key_serializer=lambda k: str(k).encode('utf-8') if k else None,
        )
        print(f"Генератор запущен (интервал: {self.interval} сек, batch: {self.batch_size})")
        while self.running:
            for _ in range(self.batch_size):
                msg = generate_fake_message()
                self.producer.send(
                    self.topic,
                    key=str(msg.user_id),
                    value=msg.to_json(),
                )
                self.message_count += 1
                # # Логируем каждое 10-е сообщение
                # if self.message_count % 10 == 0:
                #     print(f"  Отправлено сообщений: {self.message_count}")
            self.producer.flush()
            time.sleep(self.interval)
        self.producer.close()

    def start(self):
        """Запуск генератора в отдельном потоке."""
        if self.running:
            print("Генератор уже запущен")
            return

        self.running = True
        self.thread = threading.Thread(target=self._producer_loop, daemon=True)
        self.thread.start()
        print(f"Генератор тестовых сообщений запущен")
        print(f"  Topic: {self.topic}")
        print(f"  Интервал: {self.interval} сек")

    def stop(self):
        """Остановка генератора."""
        if not self.running:
            print("Генератор не запущен")
            return
        self.running = False
        if self.thread:
            self.thread.join(timeout=2.0)
        print(f"Генератор остановлен. Отправлено сообщений: {self.message_count}")


# Создаём экземпляр генератора
fake_generator = FakeMessageGenerator(
    interval=0.5,   # Каждые 0.5 секунды
    batch_size=3,   # По 3 сообщения за раз
)

## Шаг 5. Инициализация SparkSession

In [7]:
spark = (
    SparkSession.builder
        .appName("HW3_Spark_Streaming")
        .master("local[*]")
        # Корректное завершение
        .config("spark.streaming.stopGracefullyOnShutdown", "true")
        # Используем явную схему
        .config("spark.sql.streaming.schemaInference", "false")
        .getOrCreate()
    # Количество партиций при shuffle и настройки памяти указаны на уровне Docker-контейнера
)
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/10 21:09:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/10 21:09:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/12/10 21:09:44 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


## Шаг 6. Чтение потока из Kafka
---
Читаем сообщения из Kafka-топика как DataFrame.  
Kafka возвращает следующие колонки (которые нужно распарсить):
- `key`: бинарный ключ сообщения;
- `value`: бинарное значение (наш JSON);
- `topic`: имя топика;
- `partition`: номер партиции;
- `offset`: смещение в партиции;
- `timestamp`: время записи в Kafka;
- `timestampType`: тип временной метки.

In [8]:
# Читаем сырой поток из Kafka
raw_stream = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "latest")  # Только новые сообщения
        .option("failOnDataLoss", "false")    # Не падаем при потере
        .option("maxOffsetsPerTrigger", 1000) # Макс. сообщений за батч
        .load()
)

# Преобразуем данные:
parsed_stream = (
    raw_stream
        # Декодируем значение из байтов в строку
        .selectExpr("CAST(value AS STRING) as json_value")
        # Парсим JSON согласно схеме
        .select(F.from_json(F.col("json_value"), message_schema).alias("data"))
        # Разворачиваем struct в отдельные колонки
        .select("data.*")
        # Преобразуем event_time в timestamp
        .withColumn(
            "event_time",
            F.to_timestamp(F.col("event_time"))
        )
        # Фильтруем записи с некорректным event_time
        .filter(F.col("event_time").isNotNull())
)
# Добавляем watermark:
# - определяет максимальное допустимое опоздание сообщений;
# - сообщения, опоздавшие более чем на WATERMARK_DELAY, будут отброшены;
# - необходимо для корректной работы оконных агрегаций.
kafka_stream = parsed_stream.withWatermark("event_time", WATERMARK_DELAY)
kafka_stream.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- channel_id: long (nullable = true)
 |-- channel_name: string (nullable = true)
 |-- message_id: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- username: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- text: string (nullable = true)
 |-- text_length: integer (nullable = true)



## Шаг 7. Оконные агрегации
---
Вычисляем количество сообщений для каждого пользователя в двух окнах:

1. **Tumbling Window (1 минута)**
   - Неперекрывающиеся окна;
   - Каждое сообщение попадает ровно в одно окно;
   - Результат: количество сообщений за каждую полную минуту.

2. **Sliding Window (10 минут / 30 секунд)**
   - Перекрывающиеся окна;
   - Каждое сообщение может попасть в несколько окон;
   - Результат: скользящее среднее за последние 10 минут, обновляется каждые 30 сек.

#### Сортировка в streaming
В Structured Streaming нельзя напрямую использовать `orderBy()` на всём потоке, потому что данные бесконечны.  
Мы используем `foreachBatch` для сортировки внутри каждого батча.

In [9]:
def create_window_aggregation(
    stream: "DataFrame",
    window_duration: str,
    slide_duration: str = None,
    window_name: str = "window"
) -> "DataFrame":
    """
    Создание оконной агрегации для подсчёта сообщений по пользователям.
    Args:
        stream: Входной streaming DataFrame
        window_duration: Длительность окна ("1 minute", "10 minutes")
        slide_duration: Шаг скольжения
        window_name: Метка для идентификации окна в выводе
    Returns:
        Streaming DataFrame с колонками: window, username, count, window_type
    """
    # Создаём окно
    if slide_duration:
        # Sliding window (скользящее)
        window_col = F.window(F.col("event_time"), window_duration, slide_duration)
    else:
        # Tumbling window (неперекрывающееся)
        window_col = F.window(F.col("event_time"), window_duration)
    # Агрегация
    aggregated = (
        stream
            # Группируем по окну и username
            .groupBy(window_col, F.col("username"))
            # Считаем количество сообщений
            .agg(F.count("*").alias("message_count"))
            # Добавляем метку типа окна
            .withColumn("window_type", F.lit(window_name))
            # Форматируем окно для читаемости
            .withColumn("window_start", F.col("window.start"))
            .withColumn("window_end", F.col("window.end"))
    )
    return aggregated


# Создаём агрегацию для окна 1 минута (tumbling)
window_1min = create_window_aggregation(
    kafka_stream,
    window_duration=WINDOW_1_DURATION,
    slide_duration=None,
    window_name="1_min_tumbling"
)
# Создаём агрегацию для окна 10 минут с шагом 30 сек (sliding)
window_10min = create_window_aggregation(
    kafka_stream,
    window_duration=WINDOW_2_DURATION,
    slide_duration=WINDOW_2_SLIDE,
    window_name="10_min_sliding"
)

## Шаг 8. Вывод результатов
---

In [10]:
def print_separator():
    print("=" * 64)

def create_batch_processor(window_name: str):
    """
    Фабрика для создания обработчика микробатчей.
    Args:
        window_name: Название окна для вывода в лог
    Returns:
        Функция-обработчик для foreachBatch
    """
    def process_batch(batch_df, epoch_id):
        """
        Обработка одного микробатча.
        Args:
            batch_df: DataFrame с данными батча
            epoch_id: Номер микробатча
        """
        # Проверяем, есть ли данные
        if batch_df.isEmpty():
            return
        # Сортируем по убыванию количества сообщений
        sorted_df = (
            batch_df
            .select(
                F.col("window_start"),
                F.col("window_end"),
                F.col("username"),
                F.col("message_count")
            )
            .orderBy(F.col("message_count").desc())
        )
        # Выводим заголовок
        print_separator()
        print(f"[Epoch {epoch_id}] Window: {window_name}")
        print(f"Время: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print_separator()
        # Выводим таблицу
        sorted_df.show(truncate=False)
    return process_batch


def start_streaming_query(
    aggregated_stream: "DataFrame",
    window_name: str,
    checkpoint_suffix: str,
    trigger_interval: str = "30 seconds",
    output_mode: str = "update",
) -> "StreamingQuery":
    """
    Запуск обработки потока с сортировкой и выводом в консоль.
    Args:
        aggregated_stream: Streaming DataFrame с агрегациями
        window_name: Название для логов
        checkpoint_suffix: Суффикс для папки чекпоинтов
    Returns:
        StreamingQuery объект для управления потоком
    """
    # Путь для хранения состояния
    checkpoint_path = f"{CHECKPOINT_DIR}/{checkpoint_suffix}"
    query = (
        aggregated_stream
        .writeStream
        .outputMode(output_mode)
        .foreachBatch(create_batch_processor(window_name)) # Обрабатываем каждый микробатч отдельно
        .trigger(processingTime=trigger_interval)
        .option("checkpointLocation", checkpoint_path)
        .queryName(f"query_{checkpoint_suffix}")
        .start()
    )
    return query

## Шаг 9. Запуск Streaming
---

In [11]:
WAIT_1MIN_WINDOW = 3 * 60 # 3 минуты (с учетом watermark + trigger_interval)
WAIT_10MIN_WINDOW = 12 * 60 # 12 минут (с учетом watermark + trigger_interval)

# Отключаем предупреждения (чтобы не засорять вывод), оставляем только ошибки
spark.sparkContext.setLogLevel("ERROR")
    
def print_timestamp():
    print(f"[{datetime.now().strftime('%H:%M:%S')}] " + "=" * 53)    

def run_streaming():
    print_timestamp()
    print(f"Минутное окно (tumbling) – ожидание {WAIT_1MIN_WINDOW} секунд...")
    print_separator()
    # Запускаем query для окна 1 минута
    query_1min = start_streaming_query(
        window_1min,
        window_name="1_min_tumbling",
        checkpoint_suffix="window_1min",
        trigger_interval="30 seconds",
        # Выводим финальные результаты после закрытия окна
        output_mode="append"
    )
    time.sleep(WAIT_1MIN_WINDOW)
    query_1min.stop()
    print_separator()
    print("Минутное окно остановлено")
    print_separator()

    print("\n\n\n")
    print_timestamp()
    print(f"10-минутное окно (sliding) – ожидание {WAIT_10MIN_WINDOW} секунд...")
    print_separator()
    # Запускаем query для окна 10 минут
    query_10min = start_streaming_query(
        window_10min,
        window_name="10_min_sliding_30sec",
        checkpoint_suffix="window_10min",
        trigger_interval="30 seconds",
        output_mode="update"
    )
    time.sleep(WAIT_10MIN_WINDOW)
    query_10min.stop()
    print_separator()
    print("10-минутное окно остановлено")
    print_separator()

# Предполагаем, что Telegram Producer (telegram_producer.py) запущен
print("***** ЗАПУСК STREAMING ДЛЯ TELEGRAM PRODUCER-А *****")
run_streaming()

***** ЗАПУСК STREAMING ДЛЯ TELEGRAM PRODUCER-А *****
Минутное окно (tumbling) – ожидание 180 секунд...


                                                                                

[Epoch 4] Window: 1_min_tumbling
Время: 2025-12-10 21:12:30
+-------------------+-------------------+---------------+-------------+
|window_start       |window_end         |username       |message_count|
+-------------------+-------------------+---------------+-------------+
|2025-12-10 21:10:00|2025-12-10 21:11:00|user_2457046691|3            |
+-------------------+-------------------+---------------+-------------+

Минутное окно остановлено




10-минутное окно (sliding) – ожидание 720 секунд...


                                                                                

[Epoch 1] Window: 10_min_sliding_30sec
Время: 2025-12-10 21:13:00
+-------------------+-------------------+---------------+-------------+
|window_start       |window_end         |username       |message_count|
+-------------------+-------------------+---------------+-------------+
|2025-12-10 21:11:30|2025-12-10 21:21:30|user_2457046691|1            |
|2025-12-10 21:10:00|2025-12-10 21:20:00|user_2457046691|1            |
|2025-12-10 21:07:30|2025-12-10 21:17:30|user_2457046691|1            |
|2025-12-10 21:06:00|2025-12-10 21:16:00|user_2457046691|1            |
|2025-12-10 21:03:00|2025-12-10 21:13:00|user_2457046691|1            |
|2025-12-10 21:04:30|2025-12-10 21:14:30|user_2457046691|1            |
|2025-12-10 21:03:30|2025-12-10 21:13:30|user_2457046691|1            |
|2025-12-10 21:11:00|2025-12-10 21:21:00|user_2457046691|1            |
|2025-12-10 21:06:30|2025-12-10 21:16:30|user_2457046691|1            |
|2025-12-10 21:10:30|2025-12-10 21:20:30|user_2457046691|1            

### Читаем из генератора сообщений
Используем собственный генератор сообщений, чтобы отобразить результаты даже если:
- `Telegram Producer` выключен;
- или нет сообщений с указанных Telegram-каналов.

In [12]:
fake_generator.start()
time.sleep(3)  # Даём время накопить первые сообщения

print("***** ЗАПУСК STREAMING ДЛЯ ГЕНЕРАТОРА СООБЩЕНИЙ *****")
run_streaming()

fake_generator.stop()

Генератор тестовых сообщений запущен
  Topic: telegram_messages
  Интервал: 0.5 сек
Генератор запущен (интервал: 0.5 сек, batch: 3)
***** ЗАПУСК STREAMING ДЛЯ ГЕНЕРАТОРА СООБЩЕНИЙ *****
Минутное окно (tumbling) – ожидание 180 секунд...
[Epoch 6] Window: 1_min_tumbling
Время: 2025-12-10 21:25:00
+-------------------+-------------------+---------------+-------------+
|window_start       |window_end         |username       |message_count|
+-------------------+-------------------+---------------+-------------+
|2025-12-10 21:16:00|2025-12-10 21:17:00|user_2457046691|4            |
|2025-12-10 21:21:00|2025-12-10 21:22:00|user_2457046691|3            |
|2025-12-10 21:23:00|2025-12-10 21:24:00|user_2457046691|2            |
|2025-12-10 21:19:00|2025-12-10 21:20:00|user_2457046691|2            |
|2025-12-10 21:20:00|2025-12-10 21:21:00|user_2457046691|2            |
|2025-12-10 21:11:00|2025-12-10 21:12:00|user_2457046691|2            |
|2025-12-10 21:12:00|2025-12-10 21:13:00|user_2457046691

## Шаг 10. Остановка Streaming и Spark

In [13]:
# Остановка всех активных streaming queries
active = spark.streams.active
if not active:
    print("Нет активных streaming queries")
else:
    print(f"Остановка {len(active)} streaming queries...")
    for query in active:
        print(f"  Останавливаем: {query.name}")
        query.stop()
    print("Все streaming queries остановлены")

# Остановка Spark
spark.stop()

Нет активных streaming queries


## Выводы
---
В рамках задания был реализован **потоковый подсчёт сообщений пользователей** из Telegram-каналов с использованием **Spark Structured Streaming** и **Apache Kafka**. Сообщения из каналов поступают в Kafka, после чего обрабатываются в реальном времени и агрегируются в Spark.

Для каждого пользователя вычисляется количество сообщений в двух временных разрезах:
- **в минутном tumbling-окне** — для оценки активности за каждую отдельную минуту;
- **в 10-минутном sliding-окне с шагом 30 секунд** — для более плавного мониторинга динамики и всплесков активности.

Результаты выводятся в требуемом формате username, количество, отсортированы по убыванию числа сообщений, что позволяет быстро видеть самых активных пользователей за выбранный интервал.