# Аналіз даних у PySpark — результати

## Імпорт бібліотек

In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, when, sum as _sum, round, lit
from pyspark.sql.types import LongType, IntegerType, DoubleType
from pyspark.sql.functions import sum as spark_sum

## Створення сесії Spark

In [2]:
# Re-running this cell: shut down the session left over from a previous run first.
# Stopping is best-effort — any error raised while stopping is ignored on purpose.
if "spark" in locals():
    try:
        spark.stop()
    except Exception:
        pass

# Event-log GC metric settings: which JVM collector names Spark should count as
# young- vs old-generation collectors. Presumably added to silence Spark's
# "non-built-in garbage collector" warning on newer JDKs that report the G1
# collectors under these names — TODO confirm.
_gc_collector_settings = {
    "spark.eventLog.gcMetrics.youngGenerationGarbageCollectors": "G1 Young Generation",
    "spark.eventLog.gcMetrics.oldGenerationGarbageCollectors": "G1 Old Generation,G1 Concurrent GC",
}

_session_builder = SparkSession.builder.appName("GoIT-PySpark-Homework-03")
for _key, _value in _gc_collector_settings.items():
    _session_builder = _session_builder.config(_key, _value)

# getOrCreate() returns an existing active session if there is one, else builds a new one.
spark = _session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/13 13:05:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# From here on, only ERROR-level messages from Spark are shown.
spark.sparkContext.setLogLevel("ERROR")

# Best-effort: also raise the threshold of the Hadoop NativeCodeLoader logger
# via the log4j API reached through the Py4J gateway. This relies on the private
# `spark._jvm` attribute and on `org.apache.log4j` being available in the JVM,
# so any failure is deliberately ignored — it only affects log noise, never the
# analysis itself.
try:
    log4j = spark._jvm.org.apache.log4j
    native_loader_logger = log4j.LogManager.getLogger("org.apache.hadoop.util.NativeCodeLoader")
    native_loader_logger.setLevel(log4j.Level.ERROR)
except Exception:
    pass

## Завантаження даних

In [4]:
# Directory holding the raw CSV inputs, relative to the notebook.
DATA_DIR = "../data"

# All three inputs are read with the same settings: the first row is a header
# and column types are inferred by scanning the data.
csv_reader = spark.read.option("header", True).option("inferSchema", True)

users_raw = csv_reader.csv(f"{DATA_DIR}/users.csv")
purchases_raw = csv_reader.csv(f"{DATA_DIR}/purchases.csv")
products_raw = csv_reader.csv(f"{DATA_DIR}/products.csv")

## Допоміжні функції

In [5]:
def normalize_blank_to_null(df: DataFrame, cols: list) -> DataFrame:
    """
    Replace blank strings with null in the given columns of a DataFrame.

    A value counts as blank when it is a string that is empty or contains only
    whitespace (e.g. "" or "   "). Non-blank values are kept exactly as they
    were (they are not trimmed).

    Only string-typed columns are rewritten. Columns of other types (e.g. ints
    produced by ``inferSchema``) cannot hold blank strings, and comparing them
    with "" would depend on implicit string-to-number coercion, which may raise
    when Spark's ANSI mode is enabled. Such columns are returned unchanged.

    Parameters:
        df (DataFrame): The input DataFrame.
        cols (list): Names of the columns to normalize.
    Returns:
        DataFrame: A new DataFrame with blank values in ``cols`` set to null.
    Raises:
        AnalysisException: If a name in ``cols`` does not exist in ``df``
            (raised by Spark when the column is resolved).
    """
    from pyspark.sql.functions import trim

    # Column name -> Spark simple type string ("string", "int", ...).
    # Lower-cased because Spark resolves column names case-insensitively by default.
    column_types = {name.lower(): dtype for name, dtype in df.dtypes}

    result = df
    for c in cols:
        # Unknown columns fall through to the rewrite so Spark still reports them.
        if column_types.get(c.lower(), "string") != "string":
            continue
        result = result.withColumn(c, when(trim(col(c)) == "", None).otherwise(col(c)))
    return result

## Очищення даних

In [6]:
def _clean(raw: DataFrame, required: list, casts: dict) -> DataFrame:
    """
    Clean one raw table: blank -> null, cast typed columns, drop incomplete rows.

    Casting happens BEFORE ``dropna``. With Spark's ANSI mode off, a value that
    cannot be parsed becomes null when cast; doing the cast first ensures such
    rows are removed together with rows that were missing a value to begin with.

    Parameters:
        raw (DataFrame): Table as read from CSV.
        required (list): Columns that must be non-blank and non-null.
        casts (dict): Column name -> Spark DataType to cast that column to.
    Returns:
        DataFrame: The cleaned table (original column order preserved).
    """
    cleaned = normalize_blank_to_null(raw, required)
    for name, dtype in casts.items():
        cleaned = cleaned.withColumn(name, col(name).cast(dtype))
    return cleaned.dropna(subset=required)


users_req = ["user_id", "name", "age", "email"]
users_df = _clean(
    users_raw,
    users_req,
    {"user_id": LongType(), "age": IntegerType()},
)

products_req = ["product_id", "product_name", "category", "price"]
products_df = _clean(
    products_raw,
    products_req,
    {"product_id": LongType(), "price": DoubleType()},
)

purchases_req = ["purchase_id", "user_id", "product_id", "date", "quantity"]
purchases_df = _clean(
    purchases_raw,
    purchases_req,
    {
        "purchase_id": LongType(),
        "user_id": LongType(),
        "product_id": LongType(),
        "quantity": IntegerType(),
    },
)

print("Rows after cleaning:")
print("  users:", users_df.count())
print("  products:", products_df.count())
print("  purchases:", purchases_df.count())

Rows after cleaning:
  users: 95
  products: 47
  purchases: 195


## Підготовка: об'єднання та розрахунок суми покупки

In [7]:
# Only the attributes needed downstream are taken from the product and user tables.
product_lookup = products_df.select("product_id", "category", "price")
user_lookup = users_df.select("user_id", "age")

# Inner joins: purchases whose product or user did not survive cleaning are dropped.
full_df = purchases_df.join(product_lookup, on="product_id", how="inner")
full_df = full_df.join(user_lookup, on="user_id", how="inner")

# Purchase amount = quantity * price.
full_df = full_df.withColumn("amount", col("quantity") * col("price"))

full_df.show(truncate=False)

+-------+----------+-----------+----------+--------+-----------+-----+---+------------------+
|user_id|product_id|purchase_id|date      |quantity|category   |price|age|amount            |
+-------+----------+-----------+----------+--------+-----------+-----+---+------------------+
|52     |9         |1          |2022-01-01|1       |Beauty     |6.0  |39 |6.0               |
|93     |37        |2          |2022-01-02|8       |Clothing   |6.0  |25 |48.0              |
|15     |33        |3          |2022-01-03|1       |Home       |9.4  |30 |9.4               |
|72     |42        |4          |2022-01-04|9       |Beauty     |9.1  |39 |81.89999999999999 |
|21     |24        |6          |2022-01-06|7       |Electronics|2.1  |37 |14.700000000000001|
|87     |32        |8          |2022-01-08|3       |Home       |8.8  |38 |26.400000000000002|
|75     |32        |9          |2022-01-09|2       |Home       |8.8  |40 |17.6              |
|75     |24        |10         |2022-01-10|9       |Electron

## Результати

### Загальна сума покупок за категоріями

In [8]:
# Total purchase amount per category, highest total first.
total_by_category_df = (
    full_df
    .groupBy("category")
    .agg(_sum("amount").alias("total_spend"))
    .sort(col("total_spend").desc())
)
total_by_category_df.show(truncate=False)

+-----------+------------------+
|category   |total_spend       |
+-----------+------------------+
|Sports     |1755.4999999999998|
|Home       |1438.8999999999999|
|Electronics|1141.8999999999999|
|Clothing   |696.1             |
|Beauty     |441.69999999999993|
+-----------+------------------+



### Сума покупок за категоріями для віку 18–25 включно

In [9]:
# Purchases by users aged 18 to 25; between() includes both bounds.
age_filtered_df = full_df.filter(col("age").between(18, 25))

# Spend per category within that age group, highest first.
sum_18_25_by_category_df = (
    age_filtered_df
    .groupBy("category")
    .agg(_sum("amount").alias("spend_18_25"))
    .sort(col("spend_18_25").desc())
)
sum_18_25_by_category_df.show(truncate=False)

+-----------+------------------+
|category   |spend_18_25       |
+-----------+------------------+
|Home       |361.1             |
|Sports     |310.49999999999994|
|Electronics|249.6             |
|Clothing   |245.0             |
|Beauty     |41.400000000000006|
+-----------+------------------+



### Частка витрат кожної категорії від сумарних витрат вікової групи 18–25

In [10]:
# Total 18–25 spend across all categories, as a single-row DataFrame.
total_18_25_df = sum_18_25_by_category_df.agg(_sum("spend_18_25").alias("total_18_25"))

# Attach that total to every category row, then divide to get each category's share.
share_18_25_df = (
    sum_18_25_by_category_df
    .crossJoin(total_18_25_df)
    .withColumn("share", col("spend_18_25") / col("total_18_25"))
)

(
    share_18_25_df
    .select("category", "spend_18_25", "total_18_25", "share")
    .orderBy(col("share").desc())
    .show(truncate=False)
)

+-----------+------------------+-----------+--------------------+
|category   |spend_18_25       |total_18_25|share               |
+-----------+------------------+-----------+--------------------+
|Home       |361.1             |1207.6     |0.2990228552500829  |
|Sports     |310.49999999999994|1207.6     |0.25712156343159986 |
|Electronics|249.6             |1207.6     |0.20669095727061942 |
|Clothing   |245.0             |1207.6     |0.20288174892348462 |
|Beauty     |41.400000000000006|1207.6     |0.034282875124213325|
+-----------+------------------+-----------+--------------------+



### Топ-3 категорії за відсотком витрат (18–25)

In [11]:
# Top-3 categories by share of 18–25 spending, shown as a percentage (2 decimals).
# Ranking uses the unrounded `share`: two different shares can round to the same
# percent, and ordering on the rounded value would then leave limit(3) free to
# pick either one. `category` is a deterministic tie-breaker for exact ties.
top3_18_25_pct_df = (
    share_18_25_df
    .withColumn("percent", round(col("share") * lit(100.0), 2))
    .orderBy(col("share").desc(), col("category").asc())
    .select("category", "percent")
    .limit(3)
)

top3_18_25_pct_df.show(truncate=False)

+-----------+-------+
|category   |percent|
+-----------+-------+
|Home       |29.9   |
|Sports     |25.71  |
|Electronics|20.67  |
+-----------+-------+

