Task: you are given a product sales dataset with the sale date, product category, quantity sold, and sales revenue.

Using Apache Spark, load the provided dataset into a DataFrame (sample data below).

("2023-11-20", "Electronics", 100, 12000),
("2023-11-21", "Electronics", 110, 13000),
("2023-11-22", "Electronics", 105, 12500),
("2023-11-20", "Clothing", 300, 15000),
("2023-11-21", "Clothing", 280, 14000),
("2023-11-22", "Clothing", 320, 16000),
("2023-11-20", "Books", 150, 9000),
("2023-11-21", "Books", 200, 12000),
("2023-11-22", "Books", 180, 10000)

Columns: "date", "category", "quantity", "revenue".

Using window functions, compute the average sales revenue for each product category.
Apply a pivot operation to reshape the result so that product categories are the rows, days are the columns, and the values are the average sales revenue for the corresponding day.
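To make the schema concrete, here is a minimal plain-Python sketch (standard library only, no Spark) of the sample rows as typed records. The `Sale` type and the `RAW_ROWS`/`SALES` names are hypothetical and used only for illustration; parsing the date strings into `datetime.date` roughly mirrors what happens when Spark's `day()` is applied to the string `date` column later in the notebook.

```python
import datetime
from typing import NamedTuple


class Sale(NamedTuple):
    """One row of the sample dataset (hypothetical record type)."""
    date: datetime.date
    category: str
    quantity: int
    revenue: int


# The sample rows from the task, with dates still as ISO-8601 strings
RAW_ROWS = [
    ("2023-11-20", "Electronics", 100, 12000),
    ("2023-11-21", "Electronics", 110, 13000),
    ("2023-11-22", "Electronics", 105, 12500),
    ("2023-11-20", "Clothing", 300, 15000),
    ("2023-11-21", "Clothing", 280, 14000),
    ("2023-11-22", "Clothing", 320, 16000),
    ("2023-11-20", "Books", 150, 9000),
    ("2023-11-21", "Books", 200, 12000),
    ("2023-11-22", "Books", 180, 10000),
]

# Parse each date string so the day of month can be read later as sale.date.day
SALES = [Sale(datetime.date.fromisoformat(d), c, q, r) for d, c, q, r in RAW_ROWS]

for sale in SALES:
    print(sale.date.isoformat(), sale.category, sale.quantity, sale.revenue)
```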

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 3.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=d7ad92f2ffa92590f232048f080876267bd625398c9068805624bee9f3489335
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [10]:
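from pyspark.sql import SparkSession
from pyspark.sql.window import Window
# Import only the functions used below; importing sum/max from pyspark.sql.functions
# would shadow Python's built-ins of the same name.
# day() is available in PySpark 3.5+; on older versions use dayofmonth() instead.
from pyspark.sql.functions import avg, col, day, mean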

In [6]:
# Create a Spark session
spark = SparkSession.builder.appName("purchase_analysis").getOrCreate()

# Create dataframe
purchases_df = spark.createDataFrame([
  ("2023-11-20", "Electronics", 100, 12000),
  ("2023-11-21", "Electronics", 110, 13000),
  ("2023-11-22", "Electronics", 105, 12500),
  ("2023-11-20", "Clothing", 300, 15000),
  ("2023-11-21", "Clothing", 280, 14000),
  ("2023-11-22", "Clothing", 320, 16000),
  ("2023-11-20", "Books", 150, 9000),
  ("2023-11-21", "Books", 200, 12000),
  ("2023-11-22", "Books", 180, 10000)
], ["date", "category", "quantity", "revenue"])

# Average revenue per product category (plain groupBy aggregation, for comparison)
result_average_rev_per_category = purchases_df.groupBy("category").agg(avg("revenue").alias("avg_rev_per_category"))
result_average_rev_per_category.show()


+-----------+--------------------+
|   category|avg_rev_per_category|
+-----------+--------------------+
|Electronics|             12500.0|
|   Clothing|             15000.0|
|      Books|  10333.333333333334|
+-----------+--------------------+
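For comparison with the window-function version that follows, here is a plain-Python sketch of the arithmetic behind `groupBy("category").agg(avg("revenue"))`: the rows collapse to one average per category. The function name is hypothetical, and the sketch reproduces only the result, not Spark's distributed execution.

```python
from collections import defaultdict

# Sample rows: (date, category, quantity, revenue)
ROWS = [
    ("2023-11-20", "Electronics", 100, 12000),
    ("2023-11-21", "Electronics", 110, 13000),
    ("2023-11-22", "Electronics", 105, 12500),
    ("2023-11-20", "Clothing", 300, 15000),
    ("2023-11-21", "Clothing", 280, 14000),
    ("2023-11-22", "Clothing", 320, 16000),
    ("2023-11-20", "Books", 150, 9000),
    ("2023-11-21", "Books", 200, 12000),
    ("2023-11-22", "Books", 180, 10000),
]


def average_revenue_per_category(rows):
    """Return {category: mean revenue}, one entry per category (like groupBy + avg)."""
    totals = defaultdict(int)
    counts = defaultdict(int)
    for _date, category, _quantity, revenue in rows:
        totals[category] += revenue
        counts[category] += 1
    return {category: totals[category] / counts[category] for category in totals}


print(average_revenue_per_category(ROWS))
```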



In [20]:
# Average revenue per product category using a window function:
# every row keeps its columns and gets its category's average attached
windowSpec = Window.partitionBy("category")
result_average_rev_per_category_window_func = purchases_df.withColumn("avg_rev_per_category", avg(col("revenue")).over(windowSpec))
result_average_rev_per_category_window_func.show()

+----------+-----------+--------+-------+--------------------+
|      date|   category|quantity|revenue|avg_rev_per_category|
+----------+-----------+--------+-------+--------------------+
|2023-11-20|      Books|     150|   9000|  10333.333333333334|
|2023-11-21|      Books|     200|  12000|  10333.333333333334|
|2023-11-22|      Books|     180|  10000|  10333.333333333334|
|2023-11-20|   Clothing|     300|  15000|             15000.0|
|2023-11-21|   Clothing|     280|  14000|             15000.0|
|2023-11-22|   Clothing|     320|  16000|             15000.0|
|2023-11-20|Electronics|     100|  12000|             12500.0|
|2023-11-21|Electronics|     110|  13000|             12500.0|
|2023-11-22|Electronics|     105|  12500|             12500.0|
+----------+-----------+--------+-------+--------------------+
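Unlike `groupBy`, a window aggregate keeps every input row and attaches the partition's value to it. Below is a minimal plain-Python sketch of `avg("revenue").over(Window.partitionBy("category"))`, assuming rows are `(date, category, quantity, revenue)` tuples; the helper name is hypothetical. One pass collects per-partition sums and counts, a second pass appends the average to each row.

```python
from collections import defaultdict

# Sample rows: (date, category, quantity, revenue)
ROWS = [
    ("2023-11-20", "Electronics", 100, 12000),
    ("2023-11-21", "Electronics", 110, 13000),
    ("2023-11-22", "Electronics", 105, 12500),
    ("2023-11-20", "Clothing", 300, 15000),
    ("2023-11-21", "Clothing", 280, 14000),
    ("2023-11-22", "Clothing", 320, 16000),
    ("2023-11-20", "Books", 150, 9000),
    ("2023-11-21", "Books", 200, 12000),
    ("2023-11-22", "Books", 180, 10000),
]


def with_partition_average(rows, key_index=1, value_index=3):
    """Append the average of row[value_index] over each row's partition (row[key_index])."""
    sums = defaultdict(int)
    counts = defaultdict(int)
    # Pass 1: aggregate per partition key (here: category)
    for row in rows:
        sums[row[key_index]] += row[value_index]
        counts[row[key_index]] += 1
    # Pass 2: keep every input row and attach its partition's average
    return [row + (sums[row[key_index]] / counts[row[key_index]],) for row in rows]


for row in with_partition_average(ROWS):
    print(row)
```

Because the window spec has no `orderBy`, the frame is the whole partition, so every row of a category gets the same average. If `orderBy("date")` were added, Spark's default frame would become "unbounded preceding to current row" and the column would turn into a running average instead.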



In [27]:
# Average revenue per product category and day of month (groupBy aggregation)
purchases_df = purchases_df.withColumn("order_day", day(purchases_df["date"]))
result_average_rev_per_category_days = purchases_df.groupBy(col("category"), col("order_day")).agg(mean(col("revenue")))
result_average_rev_per_category_days.show()


+-----------+---------+------------+
|   category|order_day|avg(revenue)|
+-----------+---------+------------+
|Electronics|       21|     13000.0|
|   Clothing|       20|     15000.0|
|Electronics|       22|     12500.0|
|Electronics|       20|     12000.0|
|      Books|       21|     12000.0|
|   Clothing|       22|     16000.0|
|   Clothing|       21|     14000.0|
|      Books|       20|      9000.0|
|      Books|       22|     10000.0|
+-----------+---------+------------+
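A plain-Python sketch of the same (category, day) aggregation, assuming the `(date, category, quantity, revenue)` tuple layout; the function name is hypothetical. The day is taken with `datetime.date.fromisoformat(...).day`, which, like Spark's `day()`, keeps only the day of the month. A consequence worth keeping in mind: sales from different months with the same day number (for example 2023-11-20 and 2023-12-20) would fall into the same group, whereas grouping by the full date would keep them apart.

```python
import datetime
from collections import defaultdict

# Sample rows: (date, category, quantity, revenue)
ROWS = [
    ("2023-11-20", "Electronics", 100, 12000),
    ("2023-11-21", "Electronics", 110, 13000),
    ("2023-11-22", "Electronics", 105, 12500),
    ("2023-11-20", "Clothing", 300, 15000),
    ("2023-11-21", "Clothing", 280, 14000),
    ("2023-11-22", "Clothing", 320, 16000),
    ("2023-11-20", "Books", 150, 9000),
    ("2023-11-21", "Books", 200, 12000),
    ("2023-11-22", "Books", 180, 10000),
]


def average_revenue_per_category_and_day(rows):
    """Return {(category, day_of_month): mean revenue}."""
    revenues = defaultdict(list)
    for date_str, category, _quantity, revenue in rows:
        day_of_month = datetime.date.fromisoformat(date_str).day
        revenues[(category, day_of_month)].append(revenue)
    return {key: sum(values) / len(values) for key, values in revenues.items()}


for key, value in sorted(average_revenue_per_category_and_day(ROWS).items()):
    print(key, value)
```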



In [33]:
# Average revenue per product category and day using a window function
# (order_day is recomputed so this cell also runs on its own)
purchases_df = purchases_df.withColumn("order_day", day(purchases_df["date"]))
windowSpec1 = Window.partitionBy("category", "order_day")
result_average_rev_per_category_days_window_func = purchases_df.withColumn("avg_rev_per_category", mean(col("revenue")).over(windowSpec1))
result_with_separate_columns = result_average_rev_per_category_days_window_func.select(col("category"), col("order_day"), col("avg_rev_per_category"))
result_with_separate_columns.show()


+-----------+---------+--------------------+
|   category|order_day|avg_rev_per_category|
+-----------+---------+--------------------+
|      Books|       20|              9000.0|
|      Books|       21|             12000.0|
|      Books|       22|             10000.0|
|   Clothing|       20|             15000.0|
|   Clothing|       21|             14000.0|
|   Clothing|       22|             16000.0|
|Electronics|       20|             12000.0|
|Electronics|       21|             13000.0|
|Electronics|       22|             12500.0|
+-----------+---------+--------------------+
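The window result has one row per input row, so if a category had several sales on the same day, selecting `category`, `order_day` and the average would return repeated rows; calling `.distinct()` on the selected columns before pivoting would collapse them. In the sample data each (category, day) pair occurs once, so nothing repeats here. The plain-Python sketch below shows the effect with one extra, purely hypothetical Books sale on 2023-11-20 (`EXTRA_ROW`, not part of the task data); the helper names are hypothetical too.

```python
import datetime
from collections import defaultdict

# Sample rows: (date, category, quantity, revenue)
ROWS = [
    ("2023-11-20", "Electronics", 100, 12000),
    ("2023-11-21", "Electronics", 110, 13000),
    ("2023-11-22", "Electronics", 105, 12500),
    ("2023-11-20", "Clothing", 300, 15000),
    ("2023-11-21", "Clothing", 280, 14000),
    ("2023-11-22", "Clothing", 320, 16000),
    ("2023-11-20", "Books", 150, 9000),
    ("2023-11-21", "Books", 200, 12000),
    ("2023-11-22", "Books", 180, 10000),
]
# HYPOTHETICAL extra sale, added only to put two rows in the same (category, day) partition
EXTRA_ROW = ("2023-11-20", "Books", 50, 3000)


def window_avg_by_category_day(rows):
    """One (category, day, avg_revenue) tuple per input row, like mean(...).over(partitionBy(category, order_day))."""
    keyed = [(category, datetime.date.fromisoformat(d).day, revenue) for d, category, _q, revenue in rows]
    sums = defaultdict(int)
    counts = defaultdict(int)
    for category, day, revenue in keyed:
        sums[(category, day)] += revenue
        counts[(category, day)] += 1
    return [(c, day, sums[(c, day)] / counts[(c, day)]) for c, day, _revenue in keyed]


def distinct(rows):
    """Drop exact duplicate tuples, keeping first-seen order (Spark's distinct() does not guarantee order)."""
    return list(dict.fromkeys(rows))


window_rows = window_avg_by_category_day(ROWS + [EXTRA_ROW])
print(len(window_rows), len(distinct(window_rows)))  # → 10 9
```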



In [30]:
# Pivot: categories as rows, days of month as columns, average revenue as values
pivot_result = purchases_df.groupBy("category").pivot("order_day").agg(mean("revenue"))
pivot_result.show()

+-----------+-------+-------+-------+
|   category|     20|     21|     22|
+-----------+-------+-------+-------+
|Electronics|12000.0|13000.0|12500.0|
|   Clothing|15000.0|14000.0|16000.0|
|      Books| 9000.0|12000.0|10000.0|
+-----------+-------+-------+-------+
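A plain-Python sketch of the reshaping that `groupBy("category").pivot("order_day").agg(...)` performs, assuming the already-averaged `(category, order_day, avg_revenue)` triples shown in the outputs above; the function names are hypothetical. Each category becomes a row and each distinct day a column; a missing combination stays empty (printed as `None`), much as Spark fills it with null. On the Spark side, `pivot` also accepts an explicit list of values, e.g. `pivot("order_day", [20, 21, 22])`; when the list is omitted, Spark must first compute the distinct values itself, which is less efficient.

```python
from collections import defaultdict

# Averaged (category, order_day, avg_revenue) triples, as in the outputs above
AVERAGES = [
    ("Electronics", 20, 12000.0), ("Electronics", 21, 13000.0), ("Electronics", 22, 12500.0),
    ("Clothing", 20, 15000.0), ("Clothing", 21, 14000.0), ("Clothing", 22, 16000.0),
    ("Books", 20, 9000.0), ("Books", 21, 12000.0), ("Books", 22, 10000.0),
]


def pivot(triples):
    """Return (sorted column keys, {row_key: {column_key: value}})."""
    table = defaultdict(dict)
    for row_key, column_key, value in triples:
        table[row_key][column_key] = value  # assumes one value per cell (already aggregated)
    columns = sorted({column_key for _, column_key, _ in triples})
    return columns, dict(table)


def render(columns, table):
    """Format the pivoted table as tab-separated text; missing cells print as None."""
    lines = ["\t".join(["category"] + [str(c) for c in columns])]
    for row_key, cells in table.items():
        lines.append("\t".join([row_key] + [str(cells.get(c)) for c in columns]))
    return "\n".join(lines)


columns, table = pivot(AVERAGES)
print(render(columns, table))
```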

