In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("ProductCategoryAnalysis").getOrCreate()

In [2]:
df_products = spark.read.option("multiline", True).json("data/products.json")
df_categories = spark.read.option("multiline", True).json("data/categories.json")
df_productCategories = spark.read.option("multiline", True).json("data/productCategories.json")

print("Продукты")
df_products.show()

print("Категории")
df_products.show()

print("Связка ключей продуктов и категорий")
df_products.show()

Продукты
+---+-----------------+
| id|             name|
+---+-----------------+
|  1|          Аспирин|
|  2|      Парацетамол|
|  3|Ноутбук Lenovo X1|
|  4| Smart TV Samsung|
|  5|           Яблоко|
|  6|        Картофель|
|  7|    Кофе зерновой|
|  8|    Шоколад Milka|
|  9|  Ручка шариковая|
| 10|       Принтер HP|
+---+-----------------+

Категории
+---+-----------------+
| id|             name|
+---+-----------------+
|  1|          Аспирин|
|  2|      Парацетамол|
|  3|Ноутбук Lenovo X1|
|  4| Smart TV Samsung|
|  5|           Яблоко|
|  6|        Картофель|
|  7|    Кофе зерновой|
|  8|    Шоколад Milka|
|  9|  Ручка шариковая|
| 10|       Принтер HP|
+---+-----------------+

Связка ключей продуктов и категорий
+---+-----------------+
| id|             name|
+---+-----------------+
|  1|          Аспирин|
|  2|      Парацетамол|
|  3|Ноутбук Lenovo X1|
|  4| Smart TV Samsung|
|  5|           Яблоко|
|  6|        Картофель|
|  7|    Кофе зерновой|
|  8|    Шоколад Milka|
|  9|  

In [3]:
def get_product_categories(df_products, df_categories, df_productCategories):
    result = (
                df_products.alias("p")
                    .join(df_productCategories.alias("pc"), col("p.id") == col("pc.product_id"), "left")
                    .join(df_categories.alias("c"), col("pc.category_id") == col("c.id"), "left")
                    .where(col("p.id").isNotNull())
                    .select(
                        col("p.name").alias("product_name"),
                        col("c.name").alias("category_name")
                    )
    )
        
    return result


In [4]:
result = get_product_categories(df_products, df_categories, df_productCategories)

print("Продукты и их категории")
result.show(truncate=False)

Продукты и их категории
+-----------------+----------------+
|product_name     |category_name   |
+-----------------+----------------+
|Аспирин          |Медикаменты     |
|Парацетамол      |Медикаменты     |
|Ноутбук Lenovo X1|Офисная техника |
|Ноутбук Lenovo X1|Электроника     |
|Smart TV Samsung |Бытовая техника |
|Smart TV Samsung |Электроника     |
|Яблоко           |NULL            |
|Картофель        |Овощи           |
|Картофель        |Продукты питания|
|Кофе зерновой    |Напитки         |
|Кофе зерновой    |Продукты питания|
|Шоколад Milka    |Бакалея         |
|Шоколад Milka    |Продукты питания|
|Ручка шариковая  |Канцелярия      |
|Принтер HP       |Офисная техника |
|Принтер HP       |Электроника     |
+-----------------+----------------+



In [5]:
spark.stop()