In [3]:
import os
import logging

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType, DateType
from confluent_kafka.serialization import StringDeserializer
from confluent_kafka.schema_registry import SchemaRegistryClient as con
from confluent_kafka.serialization import StringSerializer
from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import JsonMessageSerializer

In [4]:
# Настройка логирования
logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger("main")

In [5]:
KAFKA_BOOTSTRAP_SERVERS = "kafka111-mirror:9192" # kafka111-mirror:9192 localhost:9095
SR_URL = os.getenv("SCHEMA_REGISTRY_URL", "http://localhost:8081/") # localhost:8081 schema-registry:8081
HDFS_URI = "http://hadoop-namenode:9870" # "http://hadoop-namenode:9870"
KAFKA_ORDERS_TOPIC = os.getenv("KAFKA_ORDERS_TOPIC", "orders")

In [6]:
SR_client = SchemaRegistryClient(url=SR_URL) #сторонний клиент
#schema_registry_client = SchemaRegistryClient({'url': SR_URL})
json_deserializer = JsonMessageSerializer(SR_client)
string_deserializer = StringDeserializer('utf_8')

In [7]:
subject = f'{KAFKA_ORDERS_TOPIC}-value'
confluent_SR_client = con({"url": SR_URL})
latest = confluent_SR_client.get_latest_version(subject)
schema_file = latest.schema.schema_str
json_value_schema_str = schema.JsonSchema(schema_file)

In [8]:
# Спарк сессия
spark = SparkSession.builder \
        .appName("SparkApp") \
        .config("spark.executor.memory", "2g") \
        .config("spark.executor.cores", "1") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1") \
        .config("spark.hadoop.fs.defaultFS", "hdfs://hadoop-namenode:9000") \
        .config("spark.hadoop.dfs.namenode.http-address", "hadoop-namenode:9870") \
        .config("spark.hadoop.dfs.namenode.kerberos.principal", "dummy") \
        .config("spark.hadoop.mapreduce.job.user.name", "root") \
        .config("spark.hadoop.user.name", "root") \
        .config("spark.driver.extraJavaOptions", "-Divy.home=/tmp/ivy") \
        .config("spark.sql.catalogImplementation", "in-memory") \
        .config("spark.hadoop.fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem") \
        .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem") \
        .getOrCreate()
logger.info("Spark session started.")

In [9]:
orders_path = "hdfs://hadoop-namenode:9000/data/orders"

In [10]:
# Вычитываем заказы
df = spark.read.option("multiline", "false").json(orders_path)

In [11]:
df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- email: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- brand: string (nullable = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- index: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- product_id: string (nullable = true)
 |    |    |-- sku: string (nullable = true)
 |    |    |-- store_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- state: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [12]:
df.show(5)

+-----------+--------------------+--------------------+-------------------+--------+---------+--------------------+
|customer_id|               email|               items|         order_date|order_id|    state|           timestamp|
+-----------+--------------------+--------------------+-------------------+--------+---------+--------------------+
|      Alice| debra56@example.com|[{JKL, Аудио, Бес...|2025-08-01 16:33:58|  271372|Completed|2025-08-13T13:44:...|
|      Alice|markknight@exampl...|[{MNO, Электроник...|2025-05-27 00:11:02|  271344|Completed|2025-08-13T13:45:...|
|      Alice|   lking@example.com|[{YZA, Компьютеры...|2024-12-30 03:09:20|  271686|  Pending|2025-08-13T13:46:...|
|      Alice|esheppard@example...|[{PQR, Аудио, Пор...|2025-05-18 22:29:11|  268690|  Pending|2025-08-13T13:46:...|
|      Alice|vincentcharles@ex...|[{JKL, Аудио, Бес...|2024-12-26 11:39:19|  270715|Cancelled|2025-08-13T13:46:...|
+-----------+--------------------+--------------------+-----------------

In [13]:
# Разворачиваем items массив
df2 = df.withColumn("item", F.explode(F.col("items")))
df2.show(10) #, truncate=False

+-----------+--------------------+--------------------+-------------------+--------+---------+--------------------+--------------------+
|customer_id|               email|               items|         order_date|order_id|    state|           timestamp|                item|
+-----------+--------------------+--------------------+-------------------+--------+---------+--------------------+--------------------+
|      Alice| debra56@example.com|[{JKL, Аудио, Бес...|2025-08-01 16:33:58|  271372|Completed|2025-08-13T13:44:...|{JKL, Аудио, Бесп...|
|      Alice|markknight@exampl...|[{MNO, Электроник...|2025-05-27 00:11:02|  271344|Completed|2025-08-13T13:45:...|{MNO, Электроника...|
|      Alice|markknight@exampl...|[{MNO, Электроник...|2025-05-27 00:11:02|  271344|Completed|2025-08-13T13:45:...|{BCD, Хранение да...|
|      Alice|markknight@exampl...|[{MNO, Электроник...|2025-05-27 00:11:02|  271344|Completed|2025-08-13T13:45:...|{EFG, Мобильные т...|
|      Alice|   lking@example.com|[{YZA, 

In [14]:
# Более лучше разворачиваем
exploded_df2 = df2.withColumn("item", F.explode(F.col("items"))).select("*", "item.*").drop("item", "items")
exploded_df2.show(10)

+-----------+--------------------+-------------------+--------+---------+--------------------+-----+------------------+--------------------+--------+--------------------+----------+---------+---------+
|customer_id|               email|         order_date|order_id|    state|           timestamp|brand|          category|         description|   index|                name|product_id|      sku| store_id|
+-----------+--------------------+-------------------+--------+---------+--------------------+-----+------------------+--------------------+--------+--------------------+----------+---------+---------+
|      Alice| debra56@example.com|2025-08-01 16:33:58|  271372|Completed|2025-08-13T13:44:...|  JKL|             Аудио|Беспроводные науш...|products|Беспроводные науш...|     12349|JKL-12349|store_002|
|      Alice|markknight@exampl...|2025-05-27 00:11:02|  271344|Completed|2025-08-13T13:45:...|  MNO|       Электроника|Фитнес-трекер с м...|products|   Фитнес-трекер MNO|     12350|MNO-12350|s

In [15]:
# временные ренеймы
pre_analytics_df = exploded_df2.withColumnRenamed("timestamp", "timestamp_str").withColumnRenamed("product_id", "product_id_str")
pre_analytics_df.show(10)

+-----------+--------------------+-------------------+--------+---------+--------------------+-----+------------------+--------------------+--------+--------------------+--------------+---------+---------+
|customer_id|               email|         order_date|order_id|    state|       timestamp_str|brand|          category|         description|   index|                name|product_id_str|      sku| store_id|
+-----------+--------------------+-------------------+--------+---------+--------------------+-----+------------------+--------------------+--------+--------------------+--------------+---------+---------+
|      Alice| debra56@example.com|2025-08-01 16:33:58|  271372|Completed|2025-08-13T13:44:...|  JKL|             Аудио|Беспроводные науш...|products|Беспроводные науш...|         12349|JKL-12349|store_002|
|      Alice|markknight@exampl...|2025-05-27 00:11:02|  271344|Completed|2025-08-13T13:45:...|  MNO|       Электроника|Фитнес-трекер с м...|products|   Фитнес-трекер MNO|      

In [16]:
pre_analytics_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- email: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- state: string (nullable = true)
 |-- timestamp_str: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: string (nullable = true)
 |-- description: string (nullable = true)
 |-- index: string (nullable = true)
 |-- name: string (nullable = true)
 |-- product_id_str: string (nullable = true)
 |-- sku: string (nullable = true)
 |-- store_id: string (nullable = true)



In [17]:
# Подготовка к аналитике
pre_analytics_df2 = pre_analytics_df.withColumn("timestamp", F.to_timestamp(F.col("timestamp_str"))) \
                                    .withColumn("product_id", F.col("product_id_str").cast(IntegerType())) \
                                    .drop("timestamp_str") \
                                    .drop("product_id_str")
pre_analytics_df2.show(10)

+-----------+--------------------+-------------------+--------+---------+-----+------------------+--------------------+--------+--------------------+---------+---------+--------------------+----------+
|customer_id|               email|         order_date|order_id|    state|brand|          category|         description|   index|                name|      sku| store_id|           timestamp|product_id|
+-----------+--------------------+-------------------+--------+---------+-----+------------------+--------------------+--------+--------------------+---------+---------+--------------------+----------+
|      Alice| debra56@example.com|2025-08-01 16:33:58|  271372|Completed|  JKL|             Аудио|Беспроводные науш...|products|Беспроводные науш...|JKL-12349|store_002|2025-08-13 13:44:...|     12349|
|      Alice|markknight@exampl...|2025-05-27 00:11:02|  271344|Completed|  MNO|       Электроника|Фитнес-трекер с м...|products|   Фитнес-трекер MNO|MNO-12350|store_003|2025-08-13 13:45:...|  

In [18]:
pre_analytics_df2.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- email: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- state: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: string (nullable = true)
 |-- description: string (nullable = true)
 |-- index: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sku: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- product_id: integer (nullable = true)



In [19]:
# Топ 5 покупателей по личеству купленных товаров
analytics_df = pre_analytics_df.groupBy("customer_id") \
        .agg(F.count("*").alias("items_purchased")) \
        .orderBy(F.col("items_purchased").desc())
analytics_df.show(5)

+-----------+---------------+
|customer_id|items_purchased|
+-----------+---------------+
|      Alice|          13222|
|      David|          12989|
|        Eva|          12811|
|    Charlie|          12466|
|        Bob|          12449|
+-----------+---------------+



In [20]:
analytics_schema = StructType([
    StructField("customer_id", StringType(), nullable=True),
    StructField("items_purchased", IntegerType(), nullable=True)])

In [21]:
empty_analytics_df = spark.createDataFrame([], schema=analytics_schema)
union_df=empty_analytics_df.union(analytics_df)
union_df2 = union_df.withColumn("items_purchased", F.col("items_purchased").cast(IntegerType()))
union_df2.show(5)
union_df2.printSchema()

+-----------+---------------+
|customer_id|items_purchased|
+-----------+---------------+
|      Alice|          13222|
|      David|          13005|
|        Eva|          12811|
|    Charlie|          12482|
|        Bob|          12453|
+-----------+---------------+

root
 |-- customer_id: string (nullable = true)
 |-- items_purchased: integer (nullable = true)



In [22]:
## schema-registry:8081
subject_a = 'analytic-topic-value'
confluent_SR_client = con({"url": SR_URL})
latest_a = confluent_SR_client.get_latest_version(subject_a)
schema_file_a = latest_a.schema.schema_str
json_value_schema_a_str = schema.JsonSchema(schema_file_a)
print(json_value_schema_a_str)

{'$schema': 'http://json-schema.org/draft-07/schema#', 'title': 'AnalisMessageSchema', 'type': 'object', 'properties': {'customer_id': {'type': 'string'}, 'items_purchased': {'type': 'integer'}}, 'required': ['customer_id', 'items_purchased']}


In [23]:
# Записываем аналитику в Kafka
analytics_df.selectExpr("to_json(struct(*)) AS value") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka111-mirror:9192") \
    .option("topic", "analytic-topic") \
    .option("value.serializer", "io.confluent.connect.json.JsonSchemaConverter") \
    .option("schema.registry.url", os.getenv("KAFKA_SCHEMA_REGISTRY", "localhost:8081")) \
    .save()

In [24]:
r_df = pre_analytics_df2.withColumnRenamed("customer_id", "user_id").withColumnRenamed("product_id", "product_id_tmp")
r_df.show(10)
r_df.printSchema()

+-------+--------------------+-------------------+--------+---------+-----+------------------+--------------------+--------+--------------------+---------+---------+--------------------+--------------+
|user_id|               email|         order_date|order_id|    state|brand|          category|         description|   index|                name|      sku| store_id|           timestamp|product_id_tmp|
+-------+--------------------+-------------------+--------+---------+-----+------------------+--------------------+--------+--------------------+---------+---------+--------------------+--------------+
|  Alice| debra56@example.com|2025-08-01 16:33:58|  271372|Completed|  JKL|             Аудио|Беспроводные науш...|products|Беспроводные науш...|JKL-12349|store_002|2025-08-13 13:44:...|         12349|
|  Alice|markknight@exampl...|2025-05-27 00:11:02|  271344|Completed|  MNO|       Электроника|Фитнес-трекер с м...|products|   Фитнес-трекер MNO|MNO-12350|store_003|2025-08-13 13:45:...|      

In [25]:
r_df2 = r_df.select("user_id", "product_id_tmp")
r_df2.show(5, truncate=False)
r_df2.printSchema()

+-------+--------------+
|user_id|product_id_tmp|
+-------+--------------+
|Alice  |12349         |
|Alice  |12350         |
|Alice  |12355         |
|Alice  |12356         |
|Alice  |12350         |
+-------+--------------+
only showing top 5 rows

root
 |-- user_id: string (nullable = true)
 |-- product_id_tmp: integer (nullable = true)



In [26]:
r_df3= r_df2.groupBy("user_id", "product_id_tmp") \
                .agg(
                    F.count("*").alias("product_cnt"))
r_df3.show(5,truncate=False)

+-------+--------------+-----------+
|user_id|product_id_tmp|product_cnt|
+-------+--------------+-----------+
|Alice  |12349         |1400       |
|Alice  |12356         |1489       |
|Alice  |12354         |1459       |
|Alice  |12353         |1401       |
|Alice  |12355         |1475       |
+-------+--------------+-----------+
only showing top 5 rows



In [27]:
w = Window.partitionBy("user_id").orderBy(F.desc("product_cnt"), F.asc("product_id_tmp"))

In [28]:
top3_per_user = r_df3.withColumn("rn", F.row_number().over(w)) \
                      .filter(F.col("rn") <= 3) \
                      .drop("rn") \
                      .orderBy("user_id", F.desc("product_cnt"))
top3_per_user.show()

+-------+--------------+-----------+
|user_id|product_id_tmp|product_cnt|
+-------+--------------+-----------+
|  Alice|         12350|       1531|
|  Alice|         12357|       1510|
|  Alice|         12351|       1494|
|    Bob|         12352|       1576|
|    Bob|         12353|       1426|
|    Bob|         12357|       1407|
|Charlie|         12354|       1549|
|Charlie|         12352|       1529|
|Charlie|         12353|       1440|
|  David|         12355|       1511|
|  David|         12357|       1489|
|  David|         12350|       1470|
|    Eva|         12353|       1527|
|    Eva|         12349|       1515|
|    Eva|         12350|       1459|
+-------+--------------+-----------+



In [29]:
rec_schema = StructType([
    StructField("user_id", StringType(), nullable=True),
    StructField("product_id", IntegerType(), nullable=True)])

In [30]:
out_df = top3_per_user.withColumnRenamed("product_id_tmp", "product_id").select("user_id", "product_id")
out_df.show(truncate=False)
out_df.printSchema()

+-------+----------+
|user_id|product_id|
+-------+----------+
|Alice  |12350     |
|Alice  |12357     |
|Alice  |12351     |
|Bob    |12352     |
|Bob    |12353     |
|Bob    |12357     |
|Charlie|12354     |
|Charlie|12352     |
|Charlie|12353     |
|David  |12355     |
|David  |12357     |
|David  |12350     |
|Eva    |12353     |
|Eva    |12349     |
|Eva    |12350     |
+-------+----------+

root
 |-- user_id: string (nullable = true)
 |-- product_id: integer (nullable = true)



In [31]:
jaas = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="your-password";'
# Записываем рекомендации в Kafka
out_df.selectExpr("to_json(struct(*)) AS value") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka111:9093") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config", jaas) \
    .option("kafka.ssl.truststore.location", "/etc/kafka/secrets/kafka111.truststore.jks") \
    .option("kafka.ssl.truststore.password", "your-password") \
    .option("kafka.ssl.truststore.type", "JKS") \
    .option("topic", "recommendations") \
    .save()