## Load Data

In [0]:
from pyspark.sql.functions import col

# Root directory of the raw CSV extracts used throughout this notebook.
DATA_DIR = "/Volumes/quickstart_catalog/quickstart_schema/my_dataset/dataset/customer_insights_db"


def load_csv(file_name: str):
    """Read ``DATA_DIR/file_name`` as a headered CSV, show its schema and rows, and return it.

    Args:
        file_name: CSV file name relative to ``DATA_DIR``.

    Returns:
        The Spark DataFrame, with column types inferred by Spark (``inferSchema=True``).
    """
    df = spark.read.csv(path=f"{DATA_DIR}/{file_name}", header=True, inferSchema=True)
    df.printSchema()
    df.display()
    return df


customers_df = load_csv("customers.csv")
products_df = load_csv("products.csv")
transactions_df = load_csv("transactions.csv")


## Task 1 - Data Cleaning & Preparation 

### Problem 1
The dataset contains invalid or inconsistent data, such as emails with @@, missing dates of birth (dob), and NULL transaction dates.
You need to build a data quality pipeline to:
- Correct malformed emails.


In [0]:
from pyspark.sql.functions import regexp_replace 
# Collapse any run of two or more "@" into a single "@" (handles "@@" as well as "@@@").
# NOTE(review): other malformations (missing "@", stray whitespace) are not corrected here.
customers_df_email = customers_df.withColumn("email", regexp_replace(col("email"), "@{2,}", "@"))
customers_df_email.display()

### Problem 2
Replace missing discount values with 0.


In [0]:
# A missing discount is treated as "no discount", so downstream revenue is never null.
discount_defaults = {"discount": 0.0}
transactions_df_discount = transactions_df.fillna(discount_defaults)
transactions_df_discount.display()

### Problem 3
Handle missing dob by flagging customers for manual enrichment.



In [0]:
from pyspark.sql.functions import lit, when

# Flag customers with no date of birth so they can be routed to manual enrichment.
# Built on the email-corrected frame so the cleaning steps compose into one pipeline,
# and kept in a named variable so later cells can filter on the flag.
customers_df_dob_flag = customers_df_email.withColumn(
    "Customer_without_dob",
    when(col("dob").isNull(), lit("Unknown dob")).otherwise(lit("dob present")),
)
customers_df_dob_flag.display()

### Problem 4
Flag transactions with missing txn_date as unprocessed.

In [0]:
# A transaction is unprocessed when its txn_date is missing. Besides real nulls, literal
# "NULL"/empty text is also treated as missing in case the CSV stores it that way.
# NOTE(review): confirm against the raw file; such text would make inferSchema read txn_date as a string.
txn_date_missing = col("txn_date").isNull() | col("txn_date").cast("string").isin("NULL", "")

# Built on the discount-filled frame so the cleaning steps compose into one pipeline.
transactions_df_flagged = transactions_df_discount.withColumn(
    "transaction_processed",
    when(txn_date_missing, lit("not processed")).otherwise(lit("processed")),
)
transactions_df_flagged.display()

## Task 2 - Customer Retention Analysis

### Problem 1
Management wants to understand repeat customer behavior.


In [0]:
# Number of transactions per customer, most active customers first.
purchases_per_customer = transactions_df.groupBy("customer_id").count()
purchases_per_customer.orderBy("count", ascending=False).display()

### Problem 2
Identify customers who have made more than one purchase and calculate their contribution to overall revenue.

In [0]:
# NOTE: these are the Spark column functions; importing `sum` shadows Python's builtin
# for the rest of the notebook (later cells rely on the Spark `sum`/`count`).
from pyspark.sql.functions import count, lit, sum

# Enrich every transaction with its customer and product attributes.
# NOTE(review): inner joins drop transactions whose customer_id or product_id has no match
# in the lookup tables — confirm that is acceptable for the revenue figures below.
txn_customers_df = transactions_df.join(customers_df, on="customer_id", how="inner")
joined_df = txn_customers_df.join(products_df, on="product_id", how="inner")

# Customers with more than one transaction ("repeat customers").
customers_more_than_one_purchase = (
    joined_df.groupBy("customer_id").count().filter(col("count") > 1)
)
customers_more_than_one_purchase.display()

# Per-transaction revenue; a missing discount is treated as no discount.
# Assumes discount is a fraction (0.1 == 10% off) — TODO confirm against the data.
txn_revenue = joined_df.na.fill({"discount": 0.0}).withColumn(
    "revenue", (col("price") * col("quantity")) * (1 - col("discount"))
)
txn_revenue.select(col("customer_id"), col("txn_id"), col("revenue")).display()

# Total revenue across all customers: the denominator for each repeat customer's
# contribution. sum() over an empty frame yields None, hence the `or 0.0`.
overall_revenue = txn_revenue.agg(sum(col("revenue"))).first()[0] or 0.0

repeat_customer_revenue = (
    txn_revenue.groupBy("customer_id")
    .agg(sum(col("revenue")).alias("total_revenue"), count("txn_id").alias("txn_count"))
    .filter(col("txn_count") > 1)
    .withColumn(
        "revenue_share_pct",
        # Avoid dividing by zero when there is no revenue at all.
        (col("total_revenue") / overall_revenue * 100)
        if overall_revenue
        else lit(None).cast("double"),
    )
    .orderBy(col("total_revenue").desc())
)
repeat_customer_revenue.display()

# Combined contribution of all repeat customers to overall revenue.
repeat_customer_revenue.agg(
    sum(col("total_revenue")).alias("repeat_customers_revenue"),
    sum(col("revenue_share_pct")).alias("repeat_customers_share_pct"),
).display()

### Problem 3
Provide insights on which cities have the highest concentration of repeat customers.

In [0]:
from pyspark.sql.functions import count, sum, when

# One row per (customer, city) with that customer's transaction count, plus a 0/1
# indicator marking repeat customers (more than one transaction).
repeat_cus_city_df = (
    joined_df.groupBy(col("customer_id"), col("city"))
    .count()
    .withColumn("Repeat_customer", when(col("count") > 1, 1).otherwise(0))
)

# Per city: number of repeat customers, number of purchasing customers, and the share of
# purchasing customers who are repeat customers, ranked highest first.
# NOTE(review): joined_df only holds customers with at least one matched transaction,
# so customers who never purchased are not included in customers_in_city.
city_repeat_concentration = (
    repeat_cus_city_df.groupBy(col("city"))
    .agg(
        sum(col("Repeat_customer")).alias("repeat_customer_concentration"),
        count(col("customer_id")).alias("customers_in_city"),
    )
    .withColumn(
        "repeat_customer_ratio",
        col("repeat_customer_concentration") / col("customers_in_city"),
    )
    .orderBy(
        col("repeat_customer_concentration").desc(),
        col("repeat_customer_ratio").desc(),
    )
)
city_repeat_concentration.display()

## Task 3 - Revenue & Profitability by Category

### Problem 1
Identify which product categories drive the most revenue.

In [0]:
# Total discounted revenue per product category, highest first.
product_categories_by_revenue = (
    txn_revenue
    .groupBy("category")
    .agg(sum("revenue").alias("total_revenue_by_category"))
    .orderBy(col("total_revenue_by_category").desc())
)
product_categories_by_revenue.display()

### Problem 2
Compute estimated profit per category using these margins:
- Electronics: 10%
- Apparel: 20%
- Kitchen: 30%

In [0]:
from pyspark.sql.functions import lower, trim, when

# Estimated profit margin per product category (keys are lower-case category names).
CATEGORY_PROFIT_MARGINS = {"electronics": 0.10, "apparel": 0.20, "kitchen": 0.30}

# Match case-insensitively and ignore surrounding whitespace, so "Electronics",
# "electronics" and "Apparel" all resolve to their configured margin.
normalized_category = lower(trim(col("category")))
margin_expr = None
for category_name, margin in CATEGORY_PROFIT_MARGINS.items():
    condition = normalized_category == category_name
    margin_expr = (
        when(condition, margin) if margin_expr is None else margin_expr.when(condition, margin)
    )

# No otherwise(): a category missing from CATEGORY_PROFIT_MARGINS gets a null margin
# and null profit instead of silently inheriting another category's rate.
product_category_profit_margin = product_categories_by_revenue.withColumn(
    "profit_margin", margin_expr
).withColumn(
    "profit", col("total_revenue_by_category") * col("profit_margin")
)
product_category_profit_margin.display()

### Problem 3
Provide a summary showing total sales amount, total profit, and average discount by category.

In [0]:
from pyspark.sql.functions import avg ,round

# Sales and average discount per category. Missing discounts were filled with 0 when
# txn_revenue was built, so they count as 0 in the average.
category_sales_df = txn_revenue.groupBy("category").agg(
    sum("revenue").alias("total_sales"),
    round(avg("discount"), 2).alias("average_discount"),
)

# Attach the estimated profit per category and keep only the summary columns.
summary_by_category = category_sales_df.join(
    product_category_profit_margin, "category", "inner"
)
summary_by_category.select(
    "category", "total_sales", "average_discount", "profit"
).display()

## Task 4 - Discount Effectiveness

**TODO:** this task has not been implemented yet.

## Task 6 - Inactive Customers

### Problem 1 
Identify customers who signed up over a year ago but have made only one purchase.

In [0]:
from pyspark.sql.functions import add_months, count, current_date

# Customers whose signup date is more than one calendar year before today.
# Rows come from the email-corrected frame so the marketing list gets usable addresses.
long_time_customers = customers_df_email.filter(
    add_months(col("signup_date"), 12) < current_date()
)

# Purchases are counted on the raw transactions rather than joined_df. joined_df's inner
# join with products drops transactions with an unmatched product_id, which could make
# a repeat buyer look like a one-time buyer.
inactive_cust = (
    transactions_df.join(
        long_time_customers.select("customer_id"), on="customer_id", how="inner"
    )
    .groupBy(col("customer_id"))
    .agg(count(col("txn_id")).alias("purchase_count"))
    .filter(col("purchase_count") == 1)
)

inactive_customers = customers_df_email.join(inactive_cust, on="customer_id", how="inner")
inactive_customers.display()

### Problem 2
Provide a list of these customers to the marketing team for targeted campaigns.

In [0]:
# Names of the inactive customers, collected to the driver (assumes the list is small).
targeted_customers = [row["name"] for row in inactive_customers.select(col("name")).collect()]
print(targeted_customers)

# Export the inactive-customer records for the marketing team.
# coalesce(1) writes a single part file so the hand-off is one CSV. Spark still treats the
# target path as a directory, so the file lands at inactive_customers.csv/part-*.csv.
inactive_customers.coalesce(1).write.mode("overwrite").option("header", True).csv(
    "/Volumes/quickstart_catalog/quickstart_schema/my_dataset/dataset/customer_insights_db/inactive_customers.csv"
)
