In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("PetStoreETL") \
    .config("spark.driver.extraClassPath", 
            "/home/jovyan/work/postgresql-42.7.1.jar:/home/jovyan/work/clickhouse-jdbc-0.4.6-all.jar") \
    .config("spark.executor.extraClassPath", 
            "/home/jovyan/work/postgresql-42.7.1.jar:/home/jovyan/work/clickhouse-jdbc-0.4.6-all.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

postgres_properties = {
    "url": "jdbc:postgresql://postgres_pet_store:5432/pet_store_lab_2",
    "driver": "org.postgresql.Driver",
    "user": "postgres",
    "password": "password"
}

df = spark.read.jdbc(
    url=postgres_properties["url"],
    table="mock_data",
    properties=postgres_properties
)
print("Успешно подключились к PostgreSQL")
print(f"Количество строк в исходной таблице: {df.count()}")

Успешно подключились к PostgreSQL
Количество строк в исходной таблице: 10000


In [2]:
try:
    tables = ["fact_sales", "dim_customer", "dim_seller", "dim_product", 
              "dim_store", "dim_supplier", "dim_date"]
    
    for table in tables:
        # Используем подзапрос, который вернет результат после выполнения DELETE
        spark.read.jdbc(
            url=postgres_properties["url"],
            table=f"(SELECT 1 as col1 FROM {table} WHERE 1=0) cleanup",
            properties=postgres_properties
        )
        
        # Теперь выполняем DELETE через write.jdbc
        empty_df = spark.createDataFrame([], "col1 INT")
        empty_df.write \
            .mode("overwrite") \
            .jdbc(url=postgres_properties["url"],
                 table=table,
                 properties=postgres_properties)
    
    print("Все таблицы успешно очищены")
except Exception as e:
    print(f"Ошибка при очистке таблиц: {str(e)}")

Все таблицы успешно очищены


In [3]:
# 1. Измерение Customer
print("\nОбработка измерения Customer...")
dim_customer_df = df.select(
    'sale_customer_id',
    'customer_first_name',
    'customer_last_name',
    'customer_age',
    'customer_email',
    'customer_country',
    'customer_postal_code',
    'customer_pet_type',
    'customer_pet_name',
    'customer_pet_breed'
).dropDuplicates(['sale_customer_id']) \
 .withColumnRenamed('sale_customer_id', 'customer_id')

print(f"Количество уникальных покупателей: {dim_customer_df.count()}")
dim_customer_df.write \
    .jdbc(url=postgres_properties["url"],
          table="dim_customer",
          mode="overwrite",
          properties=postgres_properties)
print("Измерение Customer успешно загружено")

# 2. Измерение Seller
print("\nОбработка измерения Seller...")
dim_seller_df = df.select(
    'sale_seller_id',
    'seller_first_name',
    'seller_last_name',
    'seller_email',
    'seller_country',
    'seller_postal_code'
).dropDuplicates(['sale_seller_id']) \
 .withColumnRenamed('sale_seller_id', 'seller_id')

print(f"Количество уникальных продавцов: {dim_seller_df.count()}")
dim_seller_df.write \
    .jdbc(url=postgres_properties["url"],
          table="dim_seller",
          mode="overwrite",
          properties=postgres_properties)
print("Измерение Seller успешно загружено")

# 3. Измерение Product
print("\nОбработка измерения Product...")
dim_product_df = df.select(
    'sale_product_id',
    'product_name',
    'product_category',
    'pet_category',
    'product_weight',
    'product_color',
    'product_size',
    'product_brand',
    'product_material',
    'product_description',
    'product_rating',
    'product_reviews',
    'product_release_date',
    'product_expiry_date'
).dropDuplicates(['sale_product_id']) \
 .withColumnRenamed('sale_product_id', 'product_id')

print(f"Количество уникальных продуктов: {dim_product_df.count()}")
dim_product_df.write \
    .jdbc(url=postgres_properties["url"],
          table="dim_product",
          mode="overwrite",
          properties=postgres_properties)
print("Измерение Product успешно загружено")

# 4. Измерение Store
print("\nОбработка измерения Store...")
# Создаем временное представление
df.createOrReplaceTempView("source_data")

store_dedup_sql = """
WITH store_locations AS (
    SELECT DISTINCT
        store_name,
        store_city,
        store_country,
        store_location,
        store_state,
        store_phone,
        store_email,
        ROW_NUMBER() OVER (
            PARTITION BY store_name
            ORDER BY store_email
        ) as rn
    FROM source_data
)
SELECT 
    store_name,
    store_location,
    store_city,
    store_state,
    store_country,
    store_phone,
    store_email
FROM store_locations
WHERE rn = 1
"""

dim_store_df = spark.sql(store_dedup_sql) \
    .withColumn('store_id', monotonically_increasing_id())

print(f"Количество уникальных магазинов: {dim_store_df.count()}")
dim_store_df.write \
    .jdbc(url=postgres_properties["url"],
          table="dim_store",
          mode="overwrite",
          properties=postgres_properties)
print("Измерение Store успешно загружено")

# 5. Измерение Supplier
print("\nОбработка измерения Supplier...")
supplier_dedup_sql = """
WITH supplier_locations AS (
    SELECT DISTINCT
        supplier_name,
        supplier_city,
        supplier_country,
        supplier_contact,
        supplier_email,
        supplier_phone,
        supplier_address,
        ROW_NUMBER() OVER (
            PARTITION BY supplier_name
            ORDER BY supplier_email
        ) as rn
    FROM source_data
)
SELECT 
    supplier_name,
    supplier_contact,
    supplier_email,
    supplier_phone,
    supplier_address,
    supplier_city,
    supplier_country
FROM supplier_locations
WHERE rn = 1
"""

dim_supplier_df = spark.sql(supplier_dedup_sql) \
    .withColumn('supplier_id', monotonically_increasing_id())

print(f"Количество уникальных поставщиков: {dim_supplier_df.count()}")
dim_supplier_df.write \
    .jdbc(url=postgres_properties["url"],
          table="dim_supplier",
          mode="overwrite",
          properties=postgres_properties)
print("Измерение Supplier успешно загружено")

# 6. Измерение Date
print("\nОбработка измерения Date...")
dim_date_df = df.select('sale_date').distinct() \
    .withColumn('date_id', col('sale_date')) \
    .withColumn('year', year('sale_date')) \
    .withColumn('month', month('sale_date')) \
    .withColumn('day', dayofmonth('sale_date')) \
    .withColumn('quarter', quarter('sale_date')) \
    .withColumn('is_weekend', dayofweek('sale_date').isin([1, 7]))

print(f"Количество уникальных дат: {dim_date_df.count()}")
dim_date_df.write \
    .jdbc(url=postgres_properties["url"],
          table="dim_date",
          mode="overwrite",
          properties=postgres_properties)
print("Измерение Date успешно загружено")

print("\nПроверка результатов дедупликации:")

print("\nАнализ измерения Store:")
print("Количество записей:", dim_store_df.count())
print("\nРаспределение по странам:")
dim_store_df.groupBy('store_country') \
    .count() \
    .orderBy(desc('count')) \
    .show(5)

print("\nАнализ измерения Supplier:")
print("Количество записей:", dim_supplier_df.count())
print("\nРаспределение по странам:")
dim_supplier_df.groupBy('supplier_country') \
    .count() \
    .orderBy(desc('count')) \
    .show(5)


Обработка измерения Customer...
Количество уникальных покупателей: 1000
Измерение Customer успешно загружено

Обработка измерения Seller...
Количество уникальных продавцов: 1000
Измерение Seller успешно загружено

Обработка измерения Product...
Количество уникальных продуктов: 1000
Измерение Product успешно загружено

Обработка измерения Store...
Количество уникальных магазинов: 383
Измерение Store успешно загружено

Обработка измерения Supplier...
Количество уникальных поставщиков: 383
Измерение Supplier успешно загружено

Обработка измерения Date...
Количество уникальных дат: 364
Измерение Date успешно загружено

Проверка результатов дедупликации:

Анализ измерения Store:
Количество записей: 383

Распределение по странам:
+-------------+-----+
|store_country|count|
+-------------+-----+
|        China|   73|
|    Indonesia|   39|
|       Russia|   24|
|  Philippines|   15|
|     Portugal|   15|
+-------------+-----+
only showing top 5 rows


Анализ измерения Supplier:
Количество зап

In [4]:
print("Создание таблицы фактов продаж...")

dim_store_df.createOrReplaceTempView("dim_store")
dim_supplier_df.createOrReplaceTempView("dim_supplier")
dim_date_df.createOrReplaceTempView("dim_date")

fact_sales_sql = """
WITH store_mapping AS (
    SELECT 
        s.store_name,
        s.store_city,
        s.store_country,
        s.store_id
    FROM dim_store s
),
supplier_mapping AS (
    SELECT 
        s.supplier_name,
        s.supplier_city,
        s.supplier_country,
        s.supplier_id
    FROM dim_supplier s
)
SELECT 
    f.sale_customer_id as customer_id,
    f.sale_seller_id as seller_id,
    f.sale_product_id as product_id,
    sm.store_id,
    sup.supplier_id,
    d.date_id,
    f.sale_quantity,
    f.sale_total_price,
    f.product_price as unit_price,
    f.product_rating,
    f.product_reviews
FROM source_data f
LEFT JOIN store_mapping sm ON 
    LOWER(TRIM(f.store_name)) = LOWER(TRIM(sm.store_name))
LEFT JOIN supplier_mapping sup ON 
    LOWER(TRIM(f.supplier_name)) = LOWER(TRIM(sup.supplier_name))
LEFT JOIN dim_date d ON 
    f.sale_date = d.date_id
"""

fact_sales_df = spark.sql(fact_sales_sql)

print("\nПроверка качества данных в таблице фактов:")
print(f"Общее количество строк: {fact_sales_df.count()}")

null_check = fact_sales_df.select([
    sum(when(col('customer_id').isNull(), 1).otherwise(0)).alias('null_customer_ids'),
    sum(when(col('seller_id').isNull(), 1).otherwise(0)).alias('null_seller_ids'),
    sum(when(col('product_id').isNull(), 1).otherwise(0)).alias('null_product_ids'),
    sum(when(col('store_id').isNull(), 1).otherwise(0)).alias('null_store_ids'),
    sum(when(col('supplier_id').isNull(), 1).otherwise(0)).alias('null_supplier_ids'),
    sum(when(col('date_id').isNull(), 1).otherwise(0)).alias('null_date_ids')
])
null_check.show()

fact_sales_df.write \
    .jdbc(url=postgres_properties["url"],
          table="fact_sales",
          mode="overwrite",
          properties=postgres_properties)
print("Таблица фактов успешно создана и сохранена")

print("\nПример данных из таблицы фактов:")
fact_sales_df.select(
    'customer_id',
    'seller_id',
    'product_id',
    'store_id',
    'supplier_id',
    'date_id',
    'sale_quantity',
    'sale_total_price',
    'unit_price'
).show(5)

print("\nСтатистика по продажам:")
fact_sales_df.select(
    count('*').alias('total_sales'),
    sum('sale_total_price').alias('total_revenue'),
    avg('sale_quantity').alias('avg_quantity_per_sale'),
    avg('unit_price').alias('avg_unit_price')
).show()

Создание таблицы фактов продаж...

Проверка качества данных в таблице фактов:
Общее количество строк: 10000
+-----------------+---------------+----------------+--------------+-----------------+-------------+
|null_customer_ids|null_seller_ids|null_product_ids|null_store_ids|null_supplier_ids|null_date_ids|
+-----------------+---------------+----------------+--------------+-----------------+-------------+
|                0|              0|               0|             0|                0|            0|
+-----------------+---------------+----------------+--------------+-----------------+-------------+

Таблица фактов успешно создана и сохранена

Пример данных из таблицы фактов:
+-----------+---------+----------+--------+-----------+----------+-------------+----------------+----------+
|customer_id|seller_id|product_id|store_id|supplier_id|   date_id|sale_quantity|sale_total_price|unit_price|
+-----------+---------+----------+--------+-----------+----------+-------------+----------------