# Prerequisites

In [0]:
# Load the customers CSV from the volume. The first row is used as the header;
# no schema is supplied to the reader here.
customers_df = spark.read.option("header", True).csv(
    "/Volumes/quickstart_catalog/quickstart_schema/my_dataset/customer_insights_db/customers.csv"
)
customers_df.display()

In [0]:
# Load the products CSV from the volume. The first row is used as the header;
# no schema is supplied to the reader here.
products_df = spark.read.option("header", True).csv(
    "/Volumes/quickstart_catalog/quickstart_schema/my_dataset/customer_insights_db/products.csv"
)
products_df.display()

In [0]:
# Load the transactions CSV from the volume. The first row is used as the header;
# no schema is supplied to the reader here.
transactions_df = spark.read.option("header", True).csv(
    "/Volumes/quickstart_catalog/quickstart_schema/my_dataset/customer_insights_db/transactions.csv"
)
transactions_df.display()

# 1. Data Cleaning & Preparation

## Correct malformed emails

In [0]:
from pyspark.sql.functions import col, regexp_replace

# Collapse any run of two or more consecutive "@" characters in the email
# column into a single "@"; all other characters are left untouched.
customers_df = customers_df.withColumn(
    "email",
    regexp_replace(col("email"), "@{2,}", "@"),
)
customers_df.display()

## Replace missing discount values with 0

In [0]:
# Convert discount to double, then replace nulls in that column with 0.
transactions_df = (
    transactions_df
    .withColumn("discount", col("discount").cast("double"))
    .fillna({"discount": 0})
)
transactions_df.display()

In [0]:
# Shows transactions_df again; the frame is unchanged since the display at the
# end of the previous cell.
transactions_df.display()

## Handle missing dob by flagging customers for manual enrichment

In [0]:
from pyspark.sql.functions import when

# Add a "DOB Check" column: "Fill DOB" where dob is null (needs manual
# enrichment), "Pass" for every other row.
customers_df = customers_df.withColumn(
    "DOB Check",
    when(col("dob").isNull(), "Fill DOB").otherwise("Pass"),
)
customers_df.display()

## Flag transactions with missing txn_date as unprocessed

In [0]:
from pyspark.sql.functions import when

# Add a "Transaction Check" column: "Unprocessed" where txn_date is null,
# "Processed" for every other row.
transactions_df = transactions_df.withColumn(
    "Transaction Check",
    when(col("txn_date").isNull(), "Unprocessed").otherwise("Processed"),
)
transactions_df.display()

# 2. Customer Retention Analysis

## Management wants to understand repeat customer behavior

In [0]:
# Preview the cleaned transactions frame (including the "Transaction Check"
# column added earlier) before the retention aggregations.
transactions_df.display()

In [0]:
from pyspark.sql.functions import countDistinct

# Count distinct transactions per customer and keep only customers with more
# than one, most frequent buyers first.
(
    transactions_df.groupBy("customer_id")
    .agg(countDistinct("txn_id").alias("purchase_count"))
    .where(col("purchase_count") > 1)
    .orderBy(col("purchase_count").desc())
    .display()
)

## Identify customers who have made more than one purchase and calculate their contribution to overall revenue.

In [0]:
# Inner-join every transaction to its product row on product_id. Because the
# join uses an explicit equality condition, both product_id columns are kept
# in the result.
transactions_products_df = transactions_df.join(
    products_df,
    on=transactions_df["product_id"] == products_df["product_id"],
    how="inner",
)

In [0]:
# Inspect the joined transactions/products frame built in the previous cell.
transactions_products_df.display()

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import countDistinct, sum

# Net revenue of one transaction line: price x quantity, reduced by the
# discount. price and quantity are cast to double here.
# NOTE(review): the formula treats discount as a fraction (0.1 = 10%) —
# confirm against the data.
line_revenue = (
    col("price").cast("double")
    * col("quantity").cast("double")
    * (1 - col("discount"))
)

# Per-customer purchase count and revenue, plus each customer's share of the
# overall revenue. The share is computed over an unpartitioned window BEFORE
# the repeat-customer filter, so the denominator is the revenue of all
# customers rather than only the repeat ones.
(
    transactions_products_df.groupBy("customer_id")
    .agg(
        countDistinct("txn_id").alias("purchase_count"),
        sum(line_revenue).alias("revenue"),
    )
    .withColumn(
        "revenue_share",
        col("revenue") / sum("revenue").over(Window.partitionBy()),
    )
    .filter(col("purchase_count") > 1)
    .sort(col("purchase_count").desc())
    .display()
)

## Provide insights on which cities have the highest concentration of repeat customers

In [0]:
from pyspark.sql.functions import countDistinct

# Rank cities by how many repeat customers they contain.
#   1. Count distinct transactions per customer and keep those with more than
#      one purchase (the repeat customers).
#   2. Attach each repeat customer's city from customers_df.
#   3. Count distinct repeat customers per city, largest first.
(
    transactions_df.groupBy("customer_id")
    .agg(countDistinct("txn_id").alias("purchase_count"))
    .filter(col("purchase_count") > 1)
    .join(customers_df.select("customer_id", "city"), on="customer_id", how="inner")
    .groupBy("city")
    .agg(countDistinct("customer_id").alias("repeat_customer_count"))
    .sort(col("repeat_customer_count").desc())
    .display()
)

# 3. Revenue & Profitability by Category

## The company wants to analyze which product categories drive the most revenue

In [0]:
# Preview the joined transactions/products frame that the category
# aggregations below read from.
transactions_products_df.display()

In [0]:
# Revenue per product category, highest first.
# `sum` here is the pyspark.sql.functions aggregate imported in an earlier
# cell, not the Python builtin.
net_line_revenue = (
    col("price").cast("double")
    * col("quantity").cast("double")
    * (1 - col("discount"))
)
(
    transactions_products_df.groupBy("category")
    .agg(sum(net_line_revenue).alias("revenue"))
    .orderBy(col("revenue").desc())
    .display()
)

## Also compute estimated profit, using the following margin per category:

- Electronics: 10%
- Apparel: 20%
- Kitchen: 30%

In [0]:
# Revenue per category (price x quantity x (1 - discount), summed), sorted
# from highest to lowest and kept in revenue_df for the profit estimate.
# `sum` is the pyspark.sql.functions aggregate imported in an earlier cell.
net_line_revenue = (
    col("price").cast("double")
    * col("quantity").cast("double")
    * (1 - col("discount"))
)
revenue_df = (
    transactions_products_df.groupBy("category")
    .agg(sum(net_line_revenue).alias("revenue"))
    .orderBy(col("revenue").desc())
)

In [0]:
# Estimated profit = category margin x revenue (Electronics 10%, Apparel 20%,
# Kitchen 30%). There is no otherwise() branch, so any other category gets a
# null estimated_profit.
revenue_df.withColumn(
    "estimated_profit",
    when(col("category") == "Electronics", 0.1 * col("revenue"))
    .when(col("category") == "Apparel", 0.2 * col("revenue"))
    .when(col("category") == "Kitchen", 0.3 * col("revenue")),
).display()

## Provide a summary showing total sales amount, total profit, and average discount by category.

In [0]:
from pyspark.sql.functions import avg, round

# Per-row gross amount before any discount is applied.
gross_amount = col("price").cast("double") * col("quantity").cast("double")

# Category summary:
#   total_sales_amount - sum of the discounted amount per row
#   average discount   - mean of gross amount x discount per row (i.e. the
#                        average discounted-away amount), rounded to 2 decimals
#   total_profit       - category margin x total_sales_amount; categories
#                        without a listed margin get null
(
    transactions_products_df.groupBy("category")
    .agg(
        sum(gross_amount * (1 - col("discount"))).alias("total_sales_amount"),
        round(avg(gross_amount * (col("discount"))), 2).alias("average discount"),
    )
    .orderBy(col("total_sales_amount").desc())
    .withColumn(
        "total_profit",
        when(col("category") == "Electronics", 0.1 * col("total_sales_amount"))
        .when(col("category") == "Apparel", 0.2 * col("total_sales_amount"))
        .when(col("category") == "Kitchen", 0.3 * col("total_sales_amount")),
    )
    .display()
)

# 6. Inactive Customers

## Identify customers who signed up over a year ago but have made only one purchase.

In [0]:
from pyspark.sql.functions import add_months, col, current_date

# Customers whose signup_date falls before the date 12 months prior to today.
customers_df.where(
    col("signup_date") < add_months(current_date(), -12)
).display()

In [0]:
from pyspark.sql.functions import countDistinct

# Customers who signed up more than 12 months ago.
signed_up_over_a_year_ago = customers_df.filter(
    col("signup_date") < add_months(current_date(), -12)
)

# Inner-join their transactions and keep customers with exactly one distinct
# transaction. Because of the inner join, customers with no transactions at
# all do not appear in the result.
(
    transactions_df.join(
        signed_up_over_a_year_ago,
        customers_df["customer_id"] == transactions_df["customer_id"],
        "inner",
    )
    .groupBy(transactions_df["customer_id"], customers_df["city"])
    .agg(countDistinct("txn_id").alias("purchase_count"))
    .where(col("purchase_count") == 1)
    .display()
)