In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, dayofweek, avg, sum, asc

In [None]:
# Reuse the active Spark session if one exists; otherwise start a new one
# registered under the application name "SundaySalesAnalysis".
spark = (
    SparkSession.builder
    .appName("SundaySalesAnalysis")
    .getOrCreate()
)

In [None]:
# Directory that holds the three source CSV files. Set the ADV_DATA_DIR
# environment variable to run the notebook against a copy of the data stored
# elsewhere; when it is unset, the original hard-coded location is used.
DATA_DIR = os.environ.get(
    "ADV_DATA_DIR",
    r"D:\projects\bi_and_ai_group\lesson 11\data\adv",
)


def read_adv_csv(file_name):
    """Read one CSV file from DATA_DIR into a Spark DataFrame.

    The first row is treated as the header, and Spark infers each column's
    type from the data (inferSchema=True).
    """
    return spark.read.csv(
        os.path.join(DATA_DIR, file_name),
        header=True,
        inferSchema=True,
    )


fact_internet_sales = read_adv_csv("fact_internet_sales.csv")
dim_product = read_adv_csv("dim_product.csv")
dim_customer = read_adv_csv("dim_customer.csv")

In [None]:
# Attach product and customer attributes to every internet-sales row.
#
# The joins are expressed on the key *name* instead of an equality expression
# between the two frames. With the expression form Spark keeps both copies of
# ProductKey and CustomerKey in the result, and any later by-name reference
# (for example groupBy("CustomerKey")) fails with an ambiguous-reference
# AnalysisException. Joining on the name performs the same inner equi-join but
# emits a single key column.
# NOTE(review): assumes the key columns are the only names shared between the
# fact table and the dimensions - confirm against the CSV headers.
sales_with_details = (
    fact_internet_sales
    .join(dim_product, on="ProductKey", how="inner")
    .join(dim_customer, on="CustomerKey", how="inner")
)
    

In [None]:
# Each predicate is named on its own, then all six are AND-ed together.
placed_on_sunday = dayofweek(col("OrderDate")) == 1  # Spark's dayofweek returns 1 for Sunday
is_silver = col("Color") == "Silver"
has_size = col("Size").isNotNull()
is_heavy = col("Weight") > 10
high_income = col("YearlyIncome") > 100000.0
several_children_at_home = col("NumberChildrenAtHome") > 1

# Keep only the rows that satisfy every condition above.
filtered_sales = sales_with_details.where(
    placed_on_sunday
    & is_silver
    & has_size
    & is_heavy
    & high_income
    & several_children_at_home
)

In [None]:
# One output row per (CustomerKey, FirstName) pair.
sales_by_customer = filtered_sales.groupBy("CustomerKey", "FirstName")

# `sum` and `avg` are the pyspark.sql.functions aggregates imported at the top
# of the notebook (that import shadows Python's built-in `sum`).
total_tax = sum(col("TaxAmt")).alias("TotalTaxAmt")
mean_sales_amount = avg(col("SalesAmount")).alias("AvgSalesAmount")
mean_product_cost = avg(col("TotalProductCost")).alias("AvgTotalProductCost")

aggregated_sales = sales_by_customer.agg(
    total_tax,
    mean_sales_amount,
    mean_product_cost,
)

In [None]:
# Order the per-customer rows by first name (ascending), then drop the
# CustomerKey grouping column so only the name and the three aggregates remain.
final_result = (
    aggregated_sales
    .orderBy(asc("FirstName"))
    .drop("CustomerKey")
)

In [None]:
# Print the first 20 rows as a text table; long cell values are truncated.
# These are show()'s default settings, written out explicitly.
final_result.show(n=20, truncate=True)