# Spark Structured Streaming

## Запуск приложения

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col
from pyspark.sql import functions as F

In [None]:
spark = (
    SparkSession
        .builder
        .appName("master")
        .master("local[4]")
        .config("spark.log.level", "WARN")
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
        .enableHiveSupport()
        .getOrCreate()
)
sc = spark.sparkContext

**Примечание**: Обратите внимание, что можно указать дополнительные зависимости через параметр `spark.jars.packages`

## Создание логера

In [None]:
def get_logger(name: str="PYSPARK"):
    return sc._jvm.org.slf4j.LoggerFactory.getLogger(name)

In [None]:
logger = get_logger()

## Подготовка данных данных

In [None]:
! rm -rf /tmp/checkpoint

### Утилитные методы

Функция `text_generator` генерирует случайную последовательность символов:

In [None]:
from random import randrange

def text_generator(*args, **kwargs) -> str:
    if args:
        max_length = args[0]
    elif "max_length" in kwargs:
        max_length = kwargs["max_length"]
    else:
        max_length=randrange(5, 20)

    return "".join([chr(ord('A') + randrange(26)) for _ in range(randrange(max_length))])

In [None]:
text_generator()

Функция `date_generator` генерирует случайную дату в прошлом:

In [None]:
from math import floor
from random import randrange
from datetime import datetime, timedelta
import time

def date_generator(*args, **kwargs) -> datetime:
    if args:
        seconds = args[0]
    elif "seconds" in kwargs:
        seconds = kwargs["seconds"]
    else:
        seconds=60 * 60 * 24 * 365 * randrange(20, 50)

    days = seconds / 60 / 60 / 24
    date = datetime.now() - timedelta(days=days)
    return date

In [None]:
print(date_generator())

в будущем:

In [None]:
print(f"Текущая дата: {datetime.now()}")
print(f"Будущая дата: {date_generator(seconds=-randrange(60))}")

Функция `number_generator` генерирует случайное число не больше заданого:

In [None]:
from random import randrange

def number_generator(limit: int=10**10) -> int:
    return randrange(limit)

In [None]:
number_generator()

Функция `generate_users_range_df` генерирует набор пользователей с указанным границами идентификаторов:

In [None]:
import json

from dataclasses import dataclass, asdict
from datetime import datetime
from random import randrange

@dataclass
class User:
    id: int
    first_name: str
    last_name: str
    dob: datetime
    gender: str

    def __init__(self, *args, **kwargs):
        if args:
            self.id = args[0]
        elif "id" in kwargs:
            self.id = kwargs["id"]
        else:
            self.id = randrange(1000)

        self.first_name = text_generator()
        self.last_name = text_generator()
        self.dob = date_generator()
        self.gender = "F" if randrange(100) > 50 else "M"


def generate_users_range_df(*args, **kwargs) -> DataFrame:
    if args:
        first_id = args[0]
    elif "first_id" in kwargs:
        first_id = kwargs["first_id"]
    else:
        first_id = 0

    if args[1:]:
        last_id = args[1]
    elif "last_id" in kwargs:
        last_id = kwargs["last_id"]
    else:
        last_id = 0

    assert first_id < last_id

    rows = [ asdict(User(id)) for id in range(first_id, last_id + 1) ]
    return spark.createDataFrame(rows)

users_schema = generate_users_range_df(0, 1).schema

In [None]:
generate_users_range_df(0, 1).show()

Функция `generate_heartrate_for_dates_df` генерирует показания датчика пульса за указанный интервал времени:

In [None]:
import json
import math

from dataclasses import dataclass, asdict
from datetime import datetime

@dataclass
class HeartRate:
    user_id: int
    timestamp: datetime
    value: int

    def __init__(self, user_id, **kwargs):
        self.user_id = user_id

        now = datetime.now()
        if "start" in kwargs:
            start = kwargs["start"]
        else:
            start = now - timedelta(days=5)
        if "end" in kwargs:
            end = kwargs["end"]
        else:
            end = datetime.now()

        assert start < end

        end_start_seconds_diff = math.floor((end - start).total_seconds())
        now_end_diff = math.floor((now - end).total_seconds())
        total_seconds_to_past = now_end_diff + randrange(end_start_seconds_diff)

        seconds = timedelta(seconds=total_seconds_to_past).total_seconds()
        self.timestamp = date_generator(seconds=seconds)
        self.value = 30 + number_generator(180)


def generate_heartrate_for_dates_df(
    total_users: int,
    start: datetime,
    end: datetime,
    *args, **kwargs
) -> DataFrame:
    if args:
        size = args[0]
    elif "size" in kwargs:
        size = kwargs["size"]
    else:
        size = randrange(1000)

    rows = [ asdict(HeartRate(randrange(total_users), start=start, end=end)) for _ in range(size) ]

    return spark.createDataFrame(rows)

start = datetime.now() - timedelta(days=1)
end = datetime.now()
heartrate_schema = generate_heartrate_for_dates_df(1, start, end, 5).schema

In [None]:
start = datetime.now() - timedelta(days=5)
end = datetime.now() - timedelta(days=3)
df = generate_heartrate_for_dates_df(1, start, end, 5)

In [None]:
df.show(truncate=False)

Вне указанных временных границ данных не существует:

In [None]:
df.where((col("timestamp") > F.lit(end)) | (col("timestamp") < F.lit(start))).count()

### Генерация данных

Сгенерировать 100 пользователей:

In [None]:
users_df = generate_users_range_df(first_id=0, last_id=100)
users_df.show(5, truncate=False)

In [None]:
(
    users_df
        .write
        .mode("overwrite")
        .parquet("/tmp/users")
)

Сгенерировать 100 тысяч показаний датчика пульса для 100 пользователей:

In [None]:
start = datetime.now() - timedelta(days=7)
end = datetime.now() - timedelta(days=3)
heartrate_df = generate_heartrate_for_dates_df(total_users=100, start=start, end=end, size=100_000)

In [None]:
(
    heartrate_df
        .repartition(10, "user_id")
        .write
        .mode("overwrite")
        .parquet("/tmp/heartrate")
)

In [None]:
! ls -l /tmp/heartrate

## Файловый стриминг

В дальнешем необходимо будет эффективно фильтровать логи, поэтому необхдодимо запомнить во сколько начался стриминг:

In [None]:
! date '+%Y-%m-%dT%H:%M:%SZ' > /tmp/my_pyspark_timestamp

Зафиксируем в логе сообщение о подготовке к стримингу:

In [None]:
logger.warn("Before Reader")

Из файлов необходимо создать входную таблицу (Input Data Table):

In [None]:
users_reader_stream = (
    spark.readStream
        .schema(users_schema)
        .load("/tmp/users")
)

В результате появился объект `DataFrame`, т.е. можно применить к нему методы `DataFrame API`:

In [None]:
type(users_reader_stream)

Можно проверить, что датафрейм является стримом:

In [None]:
users_reader_stream.isStreaming

Для обычных датафреймов значение `isStreaming` будет ложным:

In [None]:
spark.range(4).isStreaming

Датафреймы являются ленивыми, поэтому никакой стрим пока не запущен. Для запуска стрима необходимо настроить приёмник:

In [None]:
logger.warn("Before Writer")

In [None]:
users_stream = (
    users_reader_stream
        .writeStream
        .outputMode("append")
        .format("console")
        .trigger(availableNow=True)
        .start()
)
users_stream.awaitTermination()

In [None]:
logger.warn("After Writer")

Проанализируем логи:

In [None]:
! source ~/.bash_aliases && \
docker compose logs --since $(cat /tmp/my_pyspark_timestamp) pyspark | \
grep 'Before Reader' -A 50 | hl 'Before\|After'

Стрим был запущен с тригером `awailableNow`, что означает, что необходимо обработать все данные и по завершению остановить стрим, т.е. сейчас стриминг прекратил свою работу:

In [None]:
users_stream.isActive

Датафрейм `users_reader_stream` по прежнему доступен, а значит можно запустить этот же самый стрим ещё раз:

In [None]:
! date '+%Y-%m-%dT%H:%M:%SZ' > /tmp/my_pyspark_timestamp

In [None]:
logger.warn("Second Attempt")

In [None]:
users_stream = (
    users_reader_stream
        .writeStream
        .outputMode("append")
        .format("console")
        .trigger(availableNow=True)
        .start()
)
users_stream.awaitTermination()

In [None]:
! source ~/.bash_aliases && \
docker compose logs --since $(cat /tmp/my_pyspark_timestamp) pyspark | \
grep 'Second Attempt' -A 50 | hl 'Second Attempt'

У нас получилось обработать одни и те же данные два раза. Можно запустить этот стрим еще раз, и все данные будут обработаны снова. На практике не всегда это подходит, зачастую необходимо обрабатывать данные один раз.

## Чекпоинты - Checkpoint

### Описание

Если обратить внимание на логи снова, то можно увидеть строчку `Temporary checkpoint location...`:

In [None]:
! source ~/.bash_aliases && \
docker compose logs --since $(cat /tmp/my_pyspark_timestamp) pyspark | \
grep 'Temporary checkpoint location' -B 10 -A 10 | hl 'Temporary checkpoint location.*:'

По умолчанию Apache Spark защищается от повторной обработки данных только в рамках одного запуска стрима. Так, если воркер упадет, то его работу продолжит другой воркер с того самого места, где остановился упавший воркер.

Таким образом, Spark создает временную директорию, где сохраняет прогресс обработки входных данных в стриме. После успешного завершения директория с прогрессом удаляется.

Директория с прогрессом обработки данных в стриме называется **чекпоинтом** (**checkpoint**)

### Конфигурация чекпоинтов

Как было отмечено выше, чекпоинты играют важную роль в достижении нажедности стримовой обработки данных, поэтому очень важно нажедно их сохранять.

Наилучшим решением будет использование внешнего хранилища HDFS или S3 для целей хранения чекпоинтов. Так любой воркер может продолжить работу любого другого воркера при необходимости, т.к директория с чекпоинтами надежно защищена.

Сделаем временную отметку перед стартом стрима:

In [None]:
! date '+%Y-%m-%dT%H:%M:%SZ' > /tmp/my_pyspark_timestamp

Залогируем сообщение перед запуском стрима:

In [None]:
logger.warn("Stream with Checkpoint")

Конфигурация директории для чекпоинтов осуществляется при помощи опции `checkpointLocation`:

In [None]:
users_stream = (
    users_reader_stream
        .writeStream
        .outputMode("append")
        .format("console")
        .option("checkpointLocation", "/tmp/checkpoint/users_file_stream")
        .trigger(availableNow=True)
        .start()
)
users_stream.awaitTermination()

In [None]:
! source ~/.bash_aliases && \
docker compose logs --since $(cat /tmp/my_pyspark_timestamp) pyspark | \
grep 'Stream with Checkpoint' -A 50 | hl 'Stream with Checkpoint'

Ожидаемо данные появились в консоли.

Последующие запуски не будут иметь эффекта:

In [None]:
! date '+%Y-%m-%dT%H:%M:%SZ' > /tmp/my_pyspark_timestamp

Залогируем сообщение перед запуском стрима:

In [None]:
logger.warn("Stream with Checkpoint After Successful Processing")

In [None]:
users_stream = (
    users_reader_stream
        .writeStream
        .outputMode("append")
        .format("console")
        .option("checkpointLocation", "/tmp/checkpoint/users_file_stream")
        .trigger(availableNow=True)
        .start()
)
users_stream.awaitTermination()

In [None]:
! source ~/.bash_aliases && \
docker compose logs --since $(cat /tmp/my_pyspark_timestamp) pyspark | \
grep 'Stream with Checkpoint After Successful Processing' -A 50 | hl 'Stream with Checkpoint After Successful Processing'

Оба запуска `writeStream` использовали одну и ту же опцию `.option("checkpointLocation", "/tmp/checkpoint/users_file_stream")`, поэтому Spark обработал данные только один раз.

Содержимое чекпоинт директории:

In [None]:
! find /tmp/checkpoint/users_file_stream -type f

In [None]:
! cat /tmp/checkpoint/users_file_stream/sources/0/0

In [None]:
! cat /tmp/checkpoint/users_file_stream/offsets/0 | head -n 2 | tail -n 1 | json_pp

In [None]:
! cat /tmp/checkpoint/users_file_stream/metadata

Каждый стрим должен иметь свою чекпоинт директорию:

> **Нельзя использовать одну чекпоинт директорию для всех стримов.**

При этом один и тот же стрим можно запускать сколько угодно раз, чекпоинт директория позволит достичь Exactly Once семантики при обработке данных.

## Преобразование входных данных

Входные данные образуют входную таблицу (Input Table), над которой можно выполнять запросы как при помощи DataFrame API так и при помощи Spark SQL.

In [None]:
heartrate_input_table = (
    spark
        .readStream
        .schema(heartrate_schema)
        .option("maxFilesPerTrigger", 1)
        .parquet("/tmp/heartrate")
)

Входные данные могут поступать бесконечно, а значит таблица `heartrate_input_table` является неограниченной, поэтому ее обработка имеет свои особенности.

Преобразование входной таблицы (Input Table) создает результирующую таблицу (Result Table). Например, можно вести подсчет числа метрик, полученных каждым пользователем:

In [None]:
heartrate_result_table_df = (
    heartrate_input_table
        .groupBy("user_id")
        .count()
)

До текущего момента ни одна операция не была запущена, т.к. все операции выполняются над датафреймом, который по своей природе является ленивым.

Если для запуска вычислений на обычных датафреймах необходимо выполнить действие (action), то в стримовых датафреймах необходимо указать куда данные будут отправляться, т.е. указать приемик данных.

Приемником данных могут выступать:

- консоль - датафрейм выводится на консоль (не больше 20 строк за раз),
- файл - весь датафрейм сохраняется в файле указанного формата,
- Kafka - датафрейм отправляется в топик Apache Kafka,
- таблица - управляемая таблица в Hive Meta Store,
- foreachBatch - пользовательская процедура на python.

Для примера выполним запись через `foreachBatch`:

In [None]:
spark.sql("drop table if exists heartrates_for_each1")
spark.sql("drop table if exists heartrates_for_each2")

Подготовим таблицы для хранения результатов:

In [None]:
spark.sql("""
create table heartrates_for_each1(
    user_id int,
    batch_id int,
    total int,
    ts timestamp
)""")
spark.sql("""
create table heartrates_for_each2
as
select * from heartrates_for_each1
""")

In [None]:
from pyspark import StorageLevel

def my_handler(batch_df: DataFrame, batch_id: int):
    batch_df = (
        batch_df
            .withColumn("batch_id", F.lit(batch_id))
            .withColumn("ts", F.current_timestamp())
            .persist(StorageLevel.MEMORY_ONLY)
    )
    try:
        batch_df.write.insertInto("heartrates_for_each1")
        batch_df.write.insertInto("heartrates_for_each2")
    finally:
        batch_df.unpersist()

In [None]:
query = (
    heartrate_result_table_df
        .writeStream
        .outputMode("complete")
        .foreachBatch(my_handler)
        .start()
)

In [None]:
one = spark.table("heartrates_for_each1")

In [None]:
two = spark.table("heartrates_for_each2")

In [None]:
one.count()

In [None]:
two.count()

In [None]:
query.awaitTermination(timeout=5)
query.stop()

### О режимах работы приемника

Стриминг выше был запущен в режиме `outputMode("complete")`, т.е. все данные отправлялись приёмнику, но есть и другие режимы:

- `complete` - вся результирующая таблица целиком отправляется в приёмник. Приёмник сам должен решить, что делать со старыми записями
- `append` - только новые строки отправляются в приёмник. Этот режим можно использовать только, если в результирующей таблице нет агрегатов,
- `update` - только обновленные строки отправляются в приёмник

В предыдущем примере приёмник работал в режиме `complete`, поэтому в итоговых таблицах есть дублирующиеся строки:

In [None]:
(
    spark.table("heartrates_for_each2")
        .groupBy("user_id").count()
        .orderBy(col("count").desc())
        .show(5)
)

In [None]:
(
    spark.table("heartrates_for_each2")
        .select("user_id", "total")
        .where("user_id = 1")
        .show()
)

В результирующей таблице постоянно обновляется число полученных показаний датчика для каждого пользователя, поэтому в режиме `complete` все данные отправляются в приёмник.

Если выбрать режим `update`, то ситуация поменяется:

In [None]:
spark.sql("truncate table heartrates_for_each1")
spark.sql("truncate table heartrates_for_each2")

In [None]:
query = (
    heartrate_result_table_df
        .writeStream
        .outputMode("update")
        .foreachBatch(my_handler)
        .start()
)

In [None]:
(
    spark.table("heartrates_for_each2")
        .groupBy("user_id").count()
        .orderBy(col("count").desc())
        .show(5)
)

In [None]:
(
    spark.table("heartrates_for_each2")
        .select("user_id", "total")
        .where("user_id = 1")
        .show()
)

In [None]:
query.awaitTermination(timeout=5)
query.stop()

## Оконные операции

Примеры выше вынуждали Spark поддерживать результирующую таблицу в памяти, что может быть очень затратно на больших датасетах.

Для стримовых запросов применяют агрегации на базе окна:

- открывается окно,
- инициализируется результирующая таблица,
- результирующая таблица вычисляет информацию на базе событий, которые появились в рамках активного окна,
- окно закрывается,
- результаты сбрасываются в приёмник.

### Статистика по данными Heart Rate

Прежде чем приступить к применению окна, необходимо понять разумные границы окна, для этого необходимо собрать статистику

In [None]:
from pyspark.sql import Window

In [None]:
heartrate_df = spark.read.parquet("/tmp/heartrate")

In [None]:
windowByUser = Window.partitionBy("user_id").orderBy("timestamp")

In [None]:
heartrate_diffs_df = (
    heartrate_df
        .withColumn(
            "diff",
             F.lead("timestamp").over(windowByUser) - F.lag("timestamp").over(windowByUser)
        )
        .withColumn(
            "diff_seconds",
             F.lead(col("timestamp").cast("long")).over(windowByUser) - F.lag(col("timestamp").cast("long")).over(windowByUser)
        )
)

heartrate_intervals_df = (
    heartrate_diffs_df
        .select(
            F.max("diff").alias("max_diff"),
            F.min("diff").alias("min_diff"),
            F.mean("diff").alias("mean_diff"),
            F.current_timestamp(),
            F.current_timestamp() + F.max("diff"),
            F.current_timestamp() + F.min("diff")
        )
    
)

In [None]:
heartrate_diffs_df.summary().show()

In [None]:
[[max_diff, min_diff, mean_diff]] = heartrate_intervals_df.select("max_diff", "min_diff", "mean_diff").collect()

In [None]:
print(f"Максимальная разница между показаниями датчика: {max_diff}")
print(f"Минимальная разница между показаниями датчика: {min_diff}")
print(f"Средняя разница между показаниями датчика: {mean_diff}")

In [None]:
from math import floor, ceil

mean_seconds_between_readings = ceil(mean_diff.total_seconds())
max_seconds_between_readings = ceil(max_diff.total_seconds())

In [None]:
heartrate_input_table = (
    spark
        .readStream
        .schema(heartrate_schema)
        .option("maxFilesPerTrigger", 1)
        .parquet("/tmp/heartrate")
)

Входные данные могут поступать бесконечно, а значит таблица `heartrate_input_table` является неограниченной, поэтому ее обработка имеет свои особенности.

Преобразование входной таблицы (Input Table) создает результирующую таблицу (Result Table). Например, можно вести подсчет числа метрик, полученных каждым пользователем:

In [None]:
heartrate_input_table.printSchema()

In [None]:
windowed_heartrate_result_table_df = (
    heartrate_input_table
        .withWatermark("timestamp", f"{max_seconds_between_readings} minutes")
        .groupBy(
            F.window(
                "timestamp",
                f"{mean_seconds_between_readings * 10} seconds",
                f"{mean_seconds_between_readings} seconds"
            ),
            "user_id"
        )
        .agg(F.mean("value"), F.count("value"))
        .select(
            "window.start",
            "window.end",
            "user_id",
            col("avg(value)").alias("avg_value"),
            col("count(value)").alias("count")
        )
)

In [None]:
windowed_heartrate_result_table_df.printSchema()

In [None]:
! rm -rf /tmp/checkpoint/windowed_heartrate_result_table_df

In [None]:
spark.sql("drop table if exists heartrate_avg")

In [None]:
query = (
    windowed_heartrate_result_table_df
        .writeStream
        .outputMode("append")
        .option("checkpointLocation", "/tmp/checkpoint/windowed_heartrate_result_table_df")
        .toTable("heartrate_avg")
)

In [None]:
spark.table("heartrate_avg").printSchema()

In [None]:
(
    spark.table("heartrate_avg")
        .orderBy("user_id", "start")
        .show(truncate=False)
)

In [None]:
query.awaitTermination(timeout=5)
query.stop()

## Соединения стримов - Joins

### Соединение стрима и датафрейма

In [None]:
users_df = spark.read.parquet("/tmp/users")

In [None]:
users_df.printSchema()

In [None]:
heartrate_stream = spark.readStream.schema(heartrate_schema).parquet("/tmp/heartrate")

In [None]:
heartrate_stream.printSchema()

In [None]:
heartrate_stream_enriched = (
    heartrate_stream.alias("h")
        .join(
            users_df.alias("u"),
            col("h.user_id") == col("u.id")
        )
        .where("gender = 'F'")
)

In [None]:
spark.sql("drop table if exists heartrate_stream_enriched_table")

In [None]:
query = (
    heartrate_stream_enriched
        .writeStream
        .outputMode("append")
        .option("checkpointLocation", "/tmp/checkpoint/heartrate_stream_enriched_table")
        .toTable("heartrate_stream_enriched_table")
)

In [None]:
spark.sql("select * from heartrate_stream_enriched_table").show(truncate=False)

In [None]:
query.awaitTermination(timeout=5)
query.stop()

### Соединение стрима со стримом

In [None]:
heartrate_stream = (
    spark.readStream
        .schema(heartrate_schema)
        .option("maxFilesPerTrigger", 1)
        .parquet("/tmp/heartrate")
)

In [None]:
heartrate_stream.printSchema()

In [None]:
heartrate_stream_watermark = heartrate_stream.withWatermark(
    "timestamp",
    f"{max_seconds_between_readings} minutes"
)

heartrate_stream_to_stream_enriched = (
    heartrate_stream_watermark.alias("h1")
        .join(
            heartrate_stream_watermark.alias("h2"),
            F.expr("""
                h1.user_id = h2.user_id
            AND h1.timestamp BETWEEN h2.timestamp AND h2.timestamp + INTERVAL 1 HOUR
            """)
        )
        .select("h1.*")
)

In [None]:
! rm -rf "/tmp/checkpoint/heartrate_stream_to_stream_enriched"

In [None]:
spark.sql("drop table if exists heartrate_stream_to_stream_enriched_table")

In [None]:
query = (
    heartrate_stream_to_stream_enriched
        .writeStream
        .outputMode("append")
        .option("checkpointLocation", "/tmp/checkpoint/heartrate_stream_to_stream_enriched")
        .toTable("heartrate_stream_to_stream_enriched_table")
)

In [None]:
spark.sql("select * from heartrate_stream_to_stream_enriched_table").show(truncate=False)

In [None]:
query.awaitTermination(timeout=5)
query.stop()

## Хранение состояния

Во время работы стрима Spark может выполнять операции, которые обновляют состояние (`count`, `sum`, и т.д).

Состояние хранится сначала в памяти, а потом периодически сбрасывается на диск. Операции с состоянием должны выполняться транзакционно, а также состояние должно версионироваться на случай внезапного сбоя.

В Apache Spark 3.2 появилась реализация хранения состояния в [RocksDB](https://rocksdb.org/) - встраиваемой базе данных.

Перенос состояния в RocksDB позволяет сборщику мусора перестать обрабатывать структуры состояния, что снижает время на сборку мусора.

Для активации RocksDB необходимо установить опцию `spark.sql.streaming.stateStore.providerClass`:

In [None]:
spark.conf.get("spark.sql.streaming.stateStore.providerClass")

In [None]:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

In [None]:
spark.conf.get("spark.sql.streaming.stateStore.providerClass")

In [None]:
users_stream = (
    spark.readStream
        .schema(users_schema)
        .option("maxFilesPerTrigger", 1)
        .parquet("/tmp/users")
)

In [None]:
! rm -rf /tmp/checkpoint/users_stream_rocksdb

In [None]:
spark.sql("drop table if exists users_stream_rocksdb")

In [None]:
query = (
    users_stream.writeStream
        .option("checkpointLocation", "/tmp/checkpoint/users_stream_rocksdb")
        .trigger(availableNow=True)
        .toTable("users_stream_rocksdb")
)

In [None]:
spark.table("users_stream_rocksdb").show(5, False)

In [None]:
! find /tmp/checkpoint/users_stream_rocksdb -type f

In [None]:
! cat /tmp/checkpoint/users_stream_rocksdb/offsets/0 | sed -n '2p' | json_pp | hl '.*RocksDB.*'

In [None]:
query.awaitTermination(timeout=5)
query.stop()

## Триггеры

Вся работа в Structured Streaming выполняется при помощи микро-батчей:

1. из источника забирается часть данных,
1. полученные данные обрабатываются,
1. новые данные забираются из источника,
1. и т.д.

Триггер определяет как, когда и при каких условиях микро-батчи будут запускаться. Apache Spark позволяет подобрать нужный триггер под задачу:

- *(default)* - микро-батчи идут один за другим без перерыва,
- фиксированный интервал времени,
- только доступные данные - обработать все имеющиеся данные и остановиться,
- непрерывный, но с фиксированным интервалом для чекпоинтов.

Необходимый триггер настраивается у объекта [`DataStreamWriter#trigger`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.trigger.html):

```python
spark.readStream \
    .parquet("file.parquet") \
    .writeStream \
    .trigger(НУЖНЫЙ ТРИГГЕР ЗДЕСЬ) \
    .start()
```

## Apache Kafka

In [None]:
! source ~/.bash_aliases && \
docker compose ps kafka redpanda | hl 'healthy'

Создать топик `my-users-topic`:

In [None]:
! source ~/.bash_aliases && HOST=kafka execute \
kafka-topics \
    --bootstrap-server kafka:9092 \
    --topic my-users-topic \
    --create \
    --partitions 1 \
    --replication-factor 1

Проверить, что топик `my-users-topic` появился:

In [None]:
! source ~/.bash_aliases && HOST=kafka execute \
kafka-topics \
    --bootstrap-server kafka:9092 \
    --list

Чекпоинт является основным механизмом, благодаря которому Apache Spark обеспечивает `Exactly-Once` семантику обработки запросов. Но `Exactly-Once` часто может быть недопустимо медленным, поэтому на практике чаще прибегают к `At-Least-Once`.

Идея в том, что фиксация чекпоинтов выполняется асинхронно, поэтому одни и те же записи могут и будут быть обработаны несколько раз, поэтому нужно быть готовым к дубликатам строк. Жертва ненажедным чекпоинтом в обмен на повышение скорости обработки часто имеет большой смысл на практике.

Семантика At-Least-Once поддерживается только, если Apache Spark взаимодействует с Apache Kafka, для других приёмников At-Least-Once недоступно.

Apache Spark позволяет перейти к At-Least-Once семантике. Для этого пользователю необходимо активировать асинхронную фиксацию чекпоинтов при помощи опции `asyncProgressTrackingEnabled`:

In [None]:
(
    spark.read
        .parquet("/tmp/users")
        .select(F.struct("*").alias("value"))
        .select(F.to_json("value"))
        .show(truncate=False)
)

In [None]:
users_stream = (
    spark.readStream
        .schema(users_schema)
        .parquet("/tmp/users")
        .select(
            col("id").cast("string").alias("key"),
            F.struct("*").alias("value")
        )
        .select(
            "key",
            F.to_json("value").alias("value")
        )
)

In [None]:
users_stream.printSchema()

In [None]:
! rm -rf /tmp/checkpoint/kafka-async-checkpoint

In [None]:
query = (
    users_stream.writeStream
        .option("checkpointLocation", "/tmp/checkpoint/kafka-async-checkpoint")
        .option("asyncProgressTrackingEnabled", True)
        .option("asyncProgressTrackingCheckpointIntervalMs", 5000)
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("topic", "my-users-topic")
        .start()
)

Проверить, что данные дошли до Kafka брокера:

In [None]:
! source ~/.bash_aliases && HOST=kafka execute \
kafka-console-consumer \
    --bootstrap-server kafka:9092 \
    --topic my-users-topic \
    --from-beginning \
    --timeout-ms 10000

In [None]:
query.awaitTermination(timeout=5)
query.stop()

### Kafka Consumer

In [None]:
! rm -rf /tmp/checkpoint/users_from_kafka-checkpoint

In [None]:
spark.sql("DROP TABLE IF EXISTS users_from_kafka")

In [None]:
kafka_read_stream = (
    spark.readStream.format("kafka")
        .format("kafka")
        .option("asyncProgressTrackingEnabled", True)
        .option("asyncProgressTrackingCheckpointIntervalMs", 5000)
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("startingOffsets", "earliest")
        .option("subscribe", "my-users-topic")
        .load()
)

kafka_read_stream_clean = (
    kafka_read_stream.select(
        col("key").cast("string"),
        col("value").cast("string"),
    ).select(
        col("key").cast("INT").alias("id"),
        F.from_json("value", users_schema).alias("value")
    ).select(
        "id",
        "value.first_name",
        "value.last_name",
        "value.gender",
        "value.dob"
    )
)

In [None]:
query = (
    kafka_read_stream_clean
        .writeStream
        .option("checkpointLocation", "/tmp/checkpoint/users_from_kafka-checkpoint")
        .toTable("users_from_kafka")
)

In [None]:
spark.sql("select * from users_from_kafka").printSchema()

In [None]:
spark.sql("select * from users_from_kafka").show(truncate=False)

In [None]:
query.awaitTermination(timeout=5)
query.stop()