In [None]:
import os
# Ask spark-submit to pull the Kafka connector for Structured Streaming.
# This must happen before the SparkSession (and its JVM) is created below;
# keep the connector's Spark/Scala versions in line with the installed PySpark.
os.environ.update(
    PYSPARK_SUBMIT_ARGS="--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 pyspark-shell"
)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame

In [None]:
# Local session with 4 worker threads; `sc` is a shortcut to its SparkContext.
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("Spark Streaming") \
    .getOrCreate()
sc = spark.sparkContext

## Structured Streaming

In [None]:
"""
docker compose exec -it kafka bash
kafka-topics.sh --bootstrap-server localhost:9092 --topic words-topic --create --partitions 1 --replication-factor 1
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic words-topic <<<$(yes 'Hello, World!' | head -n 20)
"""

input_table = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("startingOffsets", "earliest") \
    .option("subscribe", "words-topic") \
    .load()

In [None]:
# Kafka delivers `value` as binary; keep only that column, decoded to text.
result_table_lines = input_table.select(
    col("value").cast("string").alias("value")
)

In [None]:
# Parquet file sink (append mode by default) with its own checkpoint directory.
output_lines = (
    result_table_lines
    .writeStream
    .format("parquet")
    .options(
        path="/tmp/wordcount",
        checkpointLocation="/tmp/wordcount_checkpoint",
    )
)

In [None]:
# Start the query in the background, then block until everything currently in
# the topic has been written; otherwise the next cells (checkpoint listing,
# Parquet read) race with the first micro-batch under "Run All".
query_lines = output_lines.start()
query_lines.processAllAvailable()

In [None]:
# Inspect the files the running query has written to its checkpoint directory.
! find /tmp/wordcount_checkpoint/

In [None]:
spark.read.parquet("/tmp/wordcount").distinct().show()

In [None]:
# Give the query up to 5 more seconds, then shut it down.
query_lines.awaitTermination(5)
query_lines.stop()

In [None]:
# Word count: split each line on spaces, flatten to one row per word and
# count occurrences (a stateful streaming aggregation).
result_table_words_agg = (
    result_table_lines
    .select(F.explode(F.split(col("value"), " ")).alias("word"))
    .groupBy("word")
    .count()
)

In [None]:
def foreach_batch_function(df: DataFrame, epoch_id: int) -> None:
    """Write one micro-batch to two Parquet locations.

    Intended as a ``foreachBatch`` callback: Spark calls it with the
    micro-batch DataFrame and the batch id. The batch is cached so that the
    two writes do not each recompute it.

    Both outputs are written with ``mode("overwrite")``, so after each call
    they contain only the rows of the latest micro-batch.

    Args:
        df: the micro-batch produced by the streaming query.
        epoch_id: id of the micro-batch (not used here).
    """
    df.cache()
    try:
        for path in ("/tmp/foreach_batch1", "/tmp/foreach_batch2"):
            df.write.mode("overwrite").parquet(path)
    finally:
        # Release the cache even when a write fails; previously an exception
        # skipped unpersist() and left the batch cached.
        df.unpersist()

In [None]:
# Complete mode: each micro-batch handed to foreach_batch_function carries
# the full current word-count table, not only the changed rows.
output_words_count = result_table_words_agg.writeStream \
    .foreachBatch(foreach_batch_function) \
    .outputMode("complete") \
    .option("checkpointLocation", "/tmp/wordcount_checkpoint_1")

In [None]:
# Start the word-count query, then wait until every record currently in the
# topic has gone through foreach_batch_function; before the first batch
# completes, the Parquet outputs read below may not exist yet.
query_wc = output_words_count.start()
query_wc.processAllAvailable()

In [None]:
spark.read.parquet("/tmp/foreach_batch1").show()

In [None]:
spark.read.parquet("/tmp/foreach_batch2").show()

In [None]:
# Let the word-count query run up to 5 seconds longer, then stop it.
query_wc.awaitTermination(5)
query_wc.stop()

### Создание таблицы на базе стрима

In [None]:
# toTable() starts the streaming query right away and returns its handle.
save_to_table_query = (
    result_table_lines
    .writeStream
    .option("checkpointLocation", "/tmp/save_to_table_checkpoint")
    .toTable("input")
)
# Block until the data currently in Kafka has landed in the table, so the
# next cell does not read an empty (or not yet created) table.
save_to_table_query.processAllAvailable()

In [None]:
spark.table("input").show()

In [None]:
spark.sql("DROP TABLE input")

In [None]:
# Same word count, but in "update" output mode: each micro-batch passed to
# foreach_batch_function holds only the words whose counts changed in that
# batch. The callback overwrites its outputs, so the Parquet files reflect
# only the latest batch's updates rather than the full table.
# NOTE(review): the variable name (kept because later cells use it) looks like
# a typo of "save_to_table_query"; this query writes via foreachBatch.
ave_to_table_query = (
    result_table_words_agg
        .writeStream
        .outputMode("update")
        .option("checkpointLocation", "/tmp/save_to_table_checkpoint_input_agg")
        .foreachBatch(foreach_batch_function)
        .start()
)
# Reading straight after start() races with the first micro-batch; wait until
# the data currently in Kafka has been processed.
ave_to_table_query.processAllAvailable()
spark.read.parquet("/tmp/foreach_batch1").show()
spark.read.parquet("/tmp/foreach_batch2").show()

In [None]:
# Allow up to 5 more seconds of processing, then stop the update-mode query.
ave_to_table_query.awaitTermination(5)
ave_to_table_query.stop()

## Watermarking

In [None]:
# Decode the Kafka payload to text in place and drop the message key, which
# nothing below needs. NOTE: this rebinds `input_table` for the cells below.
input_table = input_table \
    .withColumn("value", col("value").cast("string")) \
    .drop("key")
input_table.printSchema()

In [None]:
# One row per word, keeping the Kafka metadata columns (incl. `timestamp`).
# The watermark declares `timestamp` as event time with a 5-minute lateness
# bound. It only takes effect in stateful operators (aggregations,
# deduplication, stream-stream joins), and this query has none of them.
result_table_watermark = (
    input_table
    .withWatermark("timestamp", "5 minutes")
    .withColumn("word", F.explode(F.split(col("value"), " ")))
    .drop("value")
)
result_table_watermark.printSchema()

In [None]:
# Persist the per-word stream into a table; toTable() starts the query
# immediately. (Despite its name, this query performs no aggregation.)
output_agg = (
    result_table_watermark
    .writeStream
    .option("checkpointLocation", "/tmp/watermark_checkpoint")
    .toTable("watermark_table")
)
# Wait for the data currently in Kafka to be written, so the next cell reads
# a populated table instead of racing the first micro-batch.
output_agg.processAllAvailable()

In [None]:
spark.table("watermark_table").show()

In [None]:
# Wait at most 5 seconds, then stop the table-writing query.
output_agg.awaitTermination(5)
output_agg.stop()

## Непрерывная обработка (Continuous Processing)

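Continuous Processing — экспериментальный режим выполнения потоковых запросов, в котором записи обрабатываются непрерывно, а не микробатчами. Он даёт сквозную задержку порядка 1 мс при гарантии доставки at-least-once; микробатчевый движок обеспечивает exactly-once, но с задержкой от ~100 мс. В этом режиме поддерживаются только map-подобные операции (`select`, `filter` и т.п.). Ниже он используется для записи данных в Kafka.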

In [None]:
# Continuous-mode Kafka -> Kafka query: append "_out" to every message and
# publish it to `input-topic`. Continuous mode supports only map-like
# operations (select/filter); the trigger sets a 1-second checkpoint interval.
output = (
    input_table
    .select(F.concat(col("value").cast("string"), F.lit("_out")).alias("value"))
    .writeStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:9092",
        "topic": "input-topic",
        "checkpointLocation": "/tmp/continuous",
    })
    .trigger(continuous="1 second")
)

In [None]:
# Launch the continuous query; it keeps running in the background until stopped.
query = output.start()

In [None]:
# Let the continuous query run for up to 5 seconds, then stop it.
query.awaitTermination(5)
query.stop()

## Почему важно работать с колонками (встроенные выражения vs UDF)

In [None]:
import pandas as pd

In [None]:
# Enable Arrow-based columnar transfer between the JVM and Python
# (affects toPandas()/createDataFrame(pandas_df) conversions).
spark.conf.set(key="spark.sql.execution.arrow.pyspark.enabled", value=True)

In [None]:
@F.pandas_udf("long")
def timesTwo(value: pd.Series) -> pd.Series:
    return value * 2

In [None]:
# Static (batch) DataFrame over the table filled by the streaming query above.
table_df = spark.read.table("watermark_table")

In [None]:
# Doubling via the pandas UDF: rows are shipped to a Python worker and back,
# which shows up as an extra Python-evaluation step in the plan below.
df_from_udf = table_df.withColumn("double_offset", timesTwo("offset"))

In [None]:
# Physical plan of the UDF-based version.
df_from_udf.explain(extended=False)

In [None]:
# Same doubling as a native column expression, evaluated entirely in the JVM.
df_no_udf = table_df.withColumn("double_offset", table_df["offset"] * 2)

In [None]:
# Physical plan of the native-expression version, for comparison.
df_no_udf.explain(False)