In [None]:
spark.stop()

In [None]:
# # Отладка Kafka Streaming в Jupyter
# Этот notebook позволяет поэкспериментировать с чтением из Kafka и обогащением данных

In [None]:
# # 2. Подготовка данных для event_list
# event_data = [
#     (1, "Покупка крипты"),
#     (2, "Вложение в акции"),
#     (3, "Оплата товара"),
#     (4, "Донатная помойка"),
#     (5, "Пожертвование сектам")
# ]
# event_schema = StructType([
#     StructField("event_id", IntegerType(), True),
#     StructField("event_name", StringType(), True)
# ])
# df_events = spark.createDataFrame(event_data, schema=event_schema)

# # 3. Подготовка данных для group_list
# group_data = [
#     (1, "Новые"),
#     (2, "Иностранные"),
#     (3, "Рептилойды"),
#     (4, "Gold"),
#     (5, "Platinum")
# ]
# group_schema = StructType([
#     StructField("group_id", IntegerType(), True),
#     StructField("group_name", StringType(), True)
# ])
# df_groups = spark.createDataFrame(group_data, schema=group_schema)

# # 4. Запись данных в MinIO (Bucket: library)
# # Примечание: Spark пишет в директорию. Расширение .parquet в пути создаст папку с таким именем.
# # Режим 'overwrite' перезапишет данные, если они уже существуют.

# bucket_path = "s3a://library/kafka-enrich"

# print("Запись таблицы event_list...")
# df_events.write.mode("overwrite").parquet(f"{bucket_path}/event_list.parquet")

# print("Запись таблицы group_list...")
# df_groups.write.mode("overwrite").parquet(f"{bucket_path}/group_list.parquet")

# print("Готово! Таблицы созданы в MinIO.")


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, expr
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, DateType
import pandas as pd

drivers = [
    "/opt/spark/external-jars/hadoop-aws-3.3.4.jar",
    "/opt/spark/external-jars/aws-java-sdk-bundle-1.12.262.jar",
    "/opt/spark/external-jars/wildfly-openssl-1.0.7.Final.jar",
    "/opt/spark/external-jars/postgresql-42.6.0.jar",
    "/opt/spark/external-jars/spark-sql-kafka-0-10_2.12-3.5.0.jar",
    "/opt/spark/external-jars/kafka-clients-3.2.0.jar",
    "/opt/spark/external-jars/spark-token-provider-kafka-0-10_2.12-3.5.0.jar",
    "/opt/spark/external-jars/commons-pool2-2.11.1.jar"
]



spark = (SparkSession.builder
         .appName("KafkaDebug")
         .master("spark://spark-master:7077")
         .config("spark.sql.repl.eagerEval.enabled", True)  # Красивый вывод в notebook
         .config("spark.sql.shuffle.partitions", 5)         
         .config("spark.jars", ",".join(drivers))
         .getOrCreate()
        )

# 1. ПРОВЕРКА: Чтение справочников из MinIO
print("Читаем справочник событий:")
events_df = (spark.read
                  .format("parquet")
                  .load("s3a://library/kafka-enrich/event_list.parquet"))
events_df.show()

print("\nЧитаем справочник групп:")
groups_df = (spark.read
                  .format("parquet")
                  .load("s3a://library/kafka-enrich/group_list.parquet"))
groups_df.show()

# 2. ПРОВЕРКА: Чтение из Kafka (батч-режим для отладки)
# Читаем последние 100 сообщений из Kafka
kafka_df = (spark.read
            .format("kafka")
            .option("kafka.bootstrap.servers", "kafka:9092")
            .option("subscribe", "user-events")
            .option("startingOffsets", "earliest")  # С начала
            .option("endingOffsets", "latest")      # До конца
            .load()
            .limit(100))  # Ограничим для отладки

print(f"Получено сообщений из Kafka: {kafka_df.count()}")

spark.stop()

# Посмотрим схему полученных данных
print("Схема Kafka DataFrame:")
kafka_df.printSchema()

# Посмотрим сами сообщения
print("\nПримеры сообщений (первые 10):")
kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "partition", "offset") \
        .show(10, truncate=False)

# 3. ПРОВЕРКА: Парсинг JSON
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("date", StringType(), True),
    StructField("event_date", StringType(), True),
    StructField("event_id", IntegerType(), True),
    StructField("username", StringType(), True),
    StructField("group_id", IntegerType(), True),
    StructField("value", DoubleType(), True)
])

# Парсим и конвертируем типы
parsed_df = (kafka_df
             .select(from_json(col("value").cast("string"), schema).alias("data"))
             .select("data.*")
             .withColumn("date", col("date").cast(DateType()))
             .withColumn("event_date", col("event_date").cast(TimestampType())))

print("Распарсенные данные:")
parsed_df.show(10, truncate=False)

# 4. ПРОВЕРКА: Обогащение данных
events_for_join = events_df.withColumnRenamed("id", "event_id").withColumnRenamed("event", "event_name")
groups_for_join = groups_df.withColumnRenamed("id", "group_id").withColumnRenamed("group", "group_name")

# Делаем JOIN
enriched_df = (parsed_df
               .join(events_df, on="event_id", how="left")
               .join(groups_df, on="group_id", how="left")
              )

print("Обогащенные данные:")
enriched_df.show(10, truncate=False)

# 5. ПРОВЕРКА: Подготовка к записи в Postgres
final_df = enriched_df.select(
    "id", "date", "event_date", "event_id", "event_name", "username", "group_id", "group_name", "value"
)

print("Финальный датасет для записи:")
final_df.show(10, truncate=False)

# 6. ЗАПИСЬ В POSTGRES 
from pyspark.sql.utils import AnalysisException

try:
    final_df.write \
        .mode("append") \
        .jdbc(url="jdbc:postgresql://postgres-db:5432/learn_base",
              table="kafka_farm.user_events",
              properties={
                  "user": "airflow",
                  "password": "airflow",
                  "driver": "org.postgresql.Driver"
              })
    print("Данные успешно записаны в PostgreSQL!")
except AnalysisException as e:
    print(f"Ошибка записи: {e}")
    print("Возможно, таблица еще не создана или нет данных")

In [None]:
# Закрываем сессию
spark.stop()