In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for the Kafka -> ClickHouse streaming job.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("kafka_to_clickhouse")
    .config(
        "spark.jars.packages",
        # Comma-separated Maven coordinates: Kafka source, its client
        # dependencies, and the ClickHouse JDBC driver.
        ",".join([
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1",
            "org.apache.kafka:kafka-clients:3.5.1",
            "org.apache.commons:commons-pool2:2.11.1",
            "com.clickhouse:clickhouse-jdbc:0.9.1:shaded-all",
        ]),
    )
    # Cap the cores so this session does not occupy all the workers.
    .config("spark.cores.max", "3")
    .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint")
    .getOrCreate()
)
# Keep driver output readable: log only warnings and above.
spark.sparkContext.setLogLevel("WARN")

## Подготовка таблицы в ClickHouse

Выполним DDL-запрос по адресу http://localhost:8123/play
```sql
CREATE TABLE IF NOT EXISTS default.streaming_test_table
(
    id String,
    name String,
    timestamp String,
    kafka_timestamp DateTime64(3),
    processed_at DateTime64(3)
)
ENGINE = MergeTree()
ORDER BY (processed_at, id)
PARTITION BY toYYYYMM(processed_at);
```


## Начнем запись в Kafka

Выполним `python3 main.py` в терминале

## Запускаем стриминг (микробатчинг)

In [None]:
from pyspark.sql.functions import from_json, col, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType

# JSON layout of the Kafka message value: three nullable string fields.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("id", "name", "timestamp")
])

# Streaming source: subscribe to test_topic and start from the earliest offsets.
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:9092",
        "subscribe": "test_topic",
        "startingOffsets": "earliest",
    })
    .load()
)

# Decode the message value as JSON, flatten its fields into columns and keep
# the Kafka record timestamp next to them.
parsed_df = (
    kafka_df
    .select(
        from_json(col("value").cast("string"), schema).alias("data"),
        col("timestamp").alias("kafka_timestamp"),
    )
    .select(col("data.*"), col("kafka_timestamp"))
)

# Stamp every row with the time Spark processed it.
processed_df = parsed_df.withColumn("processed_at", current_timestamp())

In [None]:
# Function to write batch to ClickHouse
def write_to_clickhouse(batch_df, batch_id):
    """
    Append one streaming micro-batch to ClickHouse over JDBC.

    Intended to be passed to ``writeStream.foreachBatch``. Empty batches are
    skipped; non-empty ones are appended to ``default.streaming_test_table``.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch, used only in log messages.
    """
    # Count once and reuse the value for both the emptiness check and the log
    # line, instead of scanning the batch with isEmpty() and again with count().
    record_count = batch_df.count()
    if record_count == 0:
        print(f"Batch {batch_id} is empty, skipping")
        return

    print(f"\nProcessing batch {batch_id} with {record_count} records")
    (
        batch_df.write
        .format("jdbc")
        .option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
        .option("url", "jdbc:clickhouse://clickhouse-server:8123/default")
        .option("dbtable", "default.streaming_test_table")
        .option("user", "default")
        # NOTE(review): credentials are hardcoded; consider reading them from
        # the environment or a secrets store.
        .option("password", "1234qwe")
        .option("batchsize", "10000")
        .option("socket_timeout", "300000")
        .option("numPartitions", "4")
        # NOTE(review): looks like a MySQL Connector/J option; confirm that the
        # ClickHouse driver actually honours it.
        .option("rewriteBatchedStatements", "true")
        .mode("append")
        .save()
    )
    print(f"Batch {batch_id} written successfully to ClickHouse")

# Start the streaming query: each micro-batch of processed_df is handed to
# write_to_clickhouse, with progress tracked in a dedicated checkpoint directory.
query = (
    processed_df.writeStream
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoint/kafka_to_clickhouse")
    .foreachBatch(write_to_clickhouse)
    .start()
)

# Block this cell until the query stops or fails.
query.awaitTermination()