#### Шаг 1: Генерация данных (E-commerce logs)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder.appName("Transformations_Lab").getOrCreate()

# Генерируем данные о покупках
data = [
    ("2023-01-01", "iPhone", "US", 1200.0, 1),
    ("2023-01-01", "Samsung", "KR", 900.0, 2),
    ("2023-01-01", "Xiaomi", "CN", 400.0, 5),
    ("2023-01-02", "iPhone", "US", 1200.0, 1),
    ("2023-01-02", "Samsung", "US", 900.0, 1), # Samsung продан в US
    ("2023-01-03", "Nokia", "FI", 50.0, 10),
    ("2023-01-03", "Xiaomi", "CN", 400.0, 0)  # Возврат (qty=0) или ошибка
]

schema = StructType([
    StructField("date", StringType(), True),
    StructField("product", StringType(), True),
    StructField("country", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("qty", IntegerType(), True)
])

df = spark.createDataFrame(data, schema=schema)
df.show()

+----------+-------+-------+------+---+
|      date|product|country| price|qty|
+----------+-------+-------+------+---+
|2023-01-01| iPhone|     US|1200.0|  1|
|2023-01-01|Samsung|     KR| 900.0|  2|
|2023-01-01| Xiaomi|     CN| 400.0|  5|
|2023-01-02| iPhone|     US|1200.0|  1|
|2023-01-02|Samsung|     US| 900.0|  1|
|2023-01-03|  Nokia|     FI|  50.0| 10|
|2023-01-03| Xiaomi|     CN| 400.0|  0|
+----------+-------+-------+------+---+



#### Шаг 2: Узкие трансформации (Очистка и Обогащение)
Задача: Оставить только дорогие продажи (price > 100) и посчитать общую сумму (total = price * qty).

In [2]:
# Цепочка (Chaining) - Best Practice
# Читается сверху вниз. Spark оптимизирует это в один проход.

df_clean = df \
    .filter( (F.col("qty") > 0) & (F.col("price") > 100) ) \
    .withColumn("total_amt", F.col("price") * F.col("qty")) \
    .select("date", "country", "product", "total_amt")

# .filter(...) - Narrow (фильтруем строки на месте)
# .withColumn(...) - Narrow (считаем математику на месте)
# .select(...) - Narrow (выбираем колонки на месте)

df_clean.show()

# Посмотрим план выполнения (Physical Plan)
df_clean.explain()
# Ищите ключевые слова: Filter, Project (это Select). Никакого "Exchange".

+----------+-------+-------+---------+
|      date|country|product|total_amt|
+----------+-------+-------+---------+
|2023-01-01|     US| iPhone|   1200.0|
|2023-01-01|     KR|Samsung|   1800.0|
|2023-01-01|     CN| Xiaomi|   2000.0|
|2023-01-02|     US| iPhone|   1200.0|
|2023-01-02|     US|Samsung|    900.0|
+----------+-------+-------+---------+

== Physical Plan ==
*(1) Project [date#0, country#2, product#1, (price#3 * cast(qty#4 as double)) AS total_amt#31]
+- *(1) Filter ((isnotnull(qty#4) AND isnotnull(price#3)) AND ((qty#4 > 0) AND (price#3 > 100.0)))
   +- *(1) Scan ExistingRDD[date#0,product#1,country#2,price#3,qty#4]




#### Шаг 3: Широкие трансформации (Агрегация)
Задача: Посчитать выручку по странам.

In [4]:
# groupBy вызывает Shuffle
df_stats = df_clean \
    .groupBy("country") \
    .agg(
        F.sum("total_amt").alias("total_revenue"),
        F.count("product").alias("sales_count"),
        F.max("total_amt").alias("max_check")
    ) \
    .orderBy(F.col("total_revenue").desc()) # Сортировка - тоже Shuffle!

df_stats.show()

# Посмотрим план
df_stats.explain()
# Вы увидите: "Exchange hashpartitioning" - это Shuffle.

+-------+-------------+-----------+---------+
|country|total_revenue|sales_count|max_check|
+-------+-------------+-----------+---------+
|     US|       3300.0|          3|   1200.0|
|     CN|       2000.0|          1|   2000.0|
|     KR|       1800.0|          1|   1800.0|
+-------+-------------+-----------+---------+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [total_revenue#64 DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(total_revenue#64 DESC NULLS LAST, 8), ENSURE_REQUIREMENTS, [plan_id=125]
      +- HashAggregate(keys=[country#2], functions=[sum(total_amt#31), count(product#1), max(total_amt#31)])
         +- Exchange hashpartitioning(country#2, 8), ENSURE_REQUIREMENTS, [plan_id=122]
            +- HashAggregate(keys=[country#2], functions=[partial_sum(total_amt#31), partial_count(product#1), partial_max(total_amt#31)])
               +- Project [country#2, product#1, (price#3 * cast(qty#4 as double)) AS total_amt#31]
                  +- Filter

In [5]:
spark.stop()