In [None]:
import os
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, to_json, col, lit, struct, unix_timestamp, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Шаг 6 и 7 — Отправить результат в Postgres и Kafka в функции foreachBatch

def foreach_batch_function(df, epoch_id):
    
    df.persist() # Персистентность датафрейма (шаг 8) - потом освободим память
    
    df_with_feedback = df.withColumn("feedback", lit(None).cast(StringType())) # записываем df в PostgreSQL с полем feedback (пустое значение)
    
    df_with_feedback.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/de") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", "subscribers_feedback") \
        .option("user", "student") \
        .option("password", "de-student") \
        .mode("append") \
        .save()
    
    # создаём df для отправки в Kafka (без поля feedback)
    
    kafka_df = df.select(
        "restaurant_id", "adv_campaign_id", "adv_campaign_content",
        "adv_campaign_owner", "adv_campaign_owner_contact",
        "adv_campaign_datetime_start", "adv_campaign_datetime_end",
        "datetime_created", "client_id", "trigger_datetime_created"
    )
    
    # json для отправки в Kafka
    kafka_output_df = kafka_df.select(
        to_json(struct("*")).alias("value")
    )
    
    # отправляем сообщения в результирующий топик Kafka
    kafka_output_df.write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.jaas.config", 'org.apache.kafka.common.security.scram.ScramLoginModule required username="login" password="password";') \
        .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
        .option("topic", "your_username_out") \
        .save()
    
    df.unpersist() # очищаем память от df

# необходимые библиотеки для интеграции Spark с Kafka и PostgreSQL
spark_jars_packages = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
    "org.postgresql:postgresql:42.4.0",
])

# создаём spark сессию
spark = SparkSession.builder \
    .appName("RestaurantSubscribeStreamingService") \
    .config("spark.sql.session.timeZone", "UTC") \
    .config("spark.jars.packages", spark_jars_packages) \
    .getOrCreate()

# схема входного сообщения
incomming_message_schema = StructType([
    StructField("restaurant_id", StringType(), True),
    StructField("adv_campaign_id", StringType(), True),
    StructField("adv_campaign_content", StringType(), True),
    StructField("adv_campaign_owner", StringType(), True),
    StructField("adv_campaign_owner_contact", StringType(), True),
    StructField("adv_campaign_datetime_start", LongType(), True),
    StructField("adv_campaign_datetime_end", LongType(), True),
    StructField("datetime_created", LongType(), True)
])


# Шаг — Прочитать данные об акциях из Kafka 

restaurant_read_stream_df = spark.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091') \
    .option('kafka.security.protocol', 'SASL_SSL') \
    .option('kafka.sasl.jaas.config', 'org.apache.kafka.common.security.scram.ScramLoginModule required username="login" password="password";') \
    .option('kafka.sasl.mechanism', 'SCRAM-SHA-512') \
    .option('subscribe', 'olaann') \
    .load()

# определяем текущее время в UTC в секундах
current_timestamp_utc = int(round(unix_timestamp(current_timestamp())))

# Шаг — Преобразование JSON в DataFrame и фильтрация сообщений по времени действия акции

json_df = restaurant_read_stream_df \
    .select(from_json(col("value").cast("string"), incomming_message_schema).alias("data")) \
    .select("data.*")

filtered_read_stream_df = json_df.filter(
    (col("adv_campaign_datetime_start") <= current_timestamp_utc) & 
    (col("adv_campaign_datetime_end") >= current_timestamp_utc)
)

# Шаг — Прочитать данные о подписчиках из Postgres

subscribers_restaurant_df = spark.read \
    .format('jdbc') \
    .option('url', 'jdbc:postgresql://localhost:5432/de') \
    .option('driver', 'org.postgresql.Driver') \
    .option('dbtable', 'subscribers_restaurants') \
    .option('user', 'student') \
    .option('password', 'de-student') \
    .load()

# Шаг — Джойнить данные из Kafka и Postgres по restaurant_id, добавлять столбец времени создания триггера

result_df = filtered_read_stream_df \
    .join(subscribers_restaurant_df, "restaurant_id") \
    .withColumn("trigger_datetime_created", lit(current_timestamp_utc))

# запускаем стриминг
query = result_df.writeStream \
    .foreachBatch(foreach_batch_function) \
    .outputMode("update") \
    .start()

query.awaitTermination()


In [None]:
# Отправка тестового сообщения
echo 'key:{"restaurant_id": "123e4567-e89b-12d3-a456-426614174000","adv_campaign_id": "123e4567-e89b-12d3-a456-426614174003","adv_campaign_content": "first campaign","adv_campaign_owner": "Ivanov Ivan Ivanovich","adv_campaign_owner_contact": "iiivanov@restaurant.ru","adv_campaign_datetime_start": 1659203516,"adv_campaign_datetime_end": 2659207116,"datetime_created": 1659131516}' | \
kafkacat -b rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091 \
-X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X sasl.username="student" \
-X sasl.password="de-student" \
-X ssl.ca.location="/ssh_private_key.pem" \
-t your_username_in \
-K: \
-P