## Rohan Gore 
###### rmg9725@nyu.edu
###### Collaborators: PerplexityAI
###

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, row_number, count as count_, count, sum, avg, datediff, lit, udf, max as spark_max, round as round_
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession
from pyspark.sql.window import Window


# Build the Spark session used by every cell below (getOrCreate returns the
# existing session when one is already running).
spark = (
    SparkSession.builder
    .appName("Customer Value Segmentation")
    .getOrCreate()
)

# Read the three JSON inputs into DataFrames, in the order they are named.
customers_df, orders_df, order_items_df = [
    spark.read.json(path)
    for path in ("customers.json", "orders.json", "order_items.json")
]

## Question 3.1

In [21]:

# Recency: days between the fixed reference date and each customer's most
# recent completed order.
current_date = lit("2025-04-13")
recency_df = (
    orders_df
    .where(col("status") == "Completed")
    .groupBy("customer_id")
    .agg(datediff(current_date, spark_max("order_date")).alias("recency_days"))
)

In [14]:


# Frequency: per customer, the count of completed orders (rows with a
# non-null order_id).
frequency_df = (
    orders_df
    .where(col("status") == "Completed")
    .groupBy("customer_id")
    .agg(count("order_id").alias("frequency"))
)



In [15]:

# Monetary value, step 1: total each order as the sum of quantity * unit_price
# over its line items. `sum` here is the Spark aggregate, not the builtin.
order_totals = (
    order_items_df
    .groupBy("order_id")
    .agg(sum(col("quantity") * col("unit_price")).alias("order_total"))
)

# Step 2: average those order totals per customer, completed orders only.
avg_value_df = (
    orders_df
    .where(col("status") == "Completed")
    .join(order_totals, "order_id")
    .groupBy("customer_id")
    .agg(avg("order_total").alias("avg_value"))
)




In [16]:

# Keep the average order value to two decimal places.
avg_value_df = avg_value_df.withColumn("avg_value", round_(col("avg_value"), 2))

# RFM table: recency, frequency and average value joined on customer_id
# (default inner join, so only customers present in all three remain).
rfm_df = (
    recency_df
    .join(frequency_df, "customer_id")
    .join(avg_value_df, "customer_id")
)

In [17]:
# given function redefined

def segment_customer(recency_days, frequency, avg_value):
    """Classify a customer as High, Medium or Low Value from RFM metrics.

    Each metric is scored from 1 to 3 and the scores are summed (range 3-9):

    * recency_days: <= 30 -> 3, <= 90 -> 2, otherwise 1
    * frequency:    >= 3  -> 3, >= 2  -> 2, otherwise 1
    * avg_value:    >= 200 -> 3, >= 100 -> 2, otherwise 1

    Args:
        recency_days: Days since the customer's last purchase.
        frequency: Number of orders placed by the customer.
        avg_value: Average order value for the customer.

    Returns:
        "High Value" when the total score is >= 8, "Medium Value" when it is
        >= 5, otherwise "Low Value". Returns None when any input is None.
    """
    # Spark passes SQL NULLs to a Python UDF as None; comparing None with a
    # number raises TypeError and fails the whole job, so yield a null
    # segment for incomplete rows instead.
    if recency_days is None or frequency is None or avg_value is None:
        return None

    recency_score = 3 if recency_days <= 30 else (2 if recency_days <= 90 else 1)
    frequency_score = 3 if frequency >= 3 else (2 if frequency >= 2 else 1)
    value_score = 3 if avg_value >= 200 else (2 if avg_value >= 100 else 1)

    total_score = recency_score + frequency_score + value_score

    if total_score >= 8:
        return "High Value"
    elif total_score >= 5:
        return "Medium Value"
    else:
        return "Low Value"

In [18]:
# Expose the plain-Python scorer to Spark as a UDF producing a string column.
segment_customer_udf = udf(segment_customer, StringType())

# Attach a "segment" label to every row of the RFM table.
customer_segments = rfm_df.withColumn(
    "segment",
    segment_customer_udf(col("recency_days"), col("frequency"), col("avg_value")),
)

# Add the customer's name and tier, keeping only the reporting columns.
customer_segments_with_details = (
    customer_segments
    .join(customers_df, "customer_id")
    .select("customer_id", "name", "tier", "segment")
)

# Display the segmented customers.
print("Customer Value Segmentation:")
customer_segments_with_details.show()




Customer Value Segmentation:
+-----------+---------------+------+------------+
|customer_id|           name|  tier|     segment|
+-----------+---------------+------+------------+
|          7| James Anderson|  Gold|Medium Value|
|          1|     John Smith|  Gold|Medium Value|
|         10| Jennifer Clark|  Gold|Medium Value|
|          3|   Robert Brown|Bronze|Medium Value|
|          8|Patricia Thomas|Silver|Medium Value|
|          2|   Mary Johnson|Silver|Medium Value|
|          4|    Linda Davis|  Gold|   Low Value|
+-----------+---------------+------+------------+



In [19]:

# Tally customers per segment, sorted by segment name ascending.
segment_counts = (
    customer_segments
    .groupBy("segment")
    .count()
    .orderBy("segment")
)
print("Customer Count by Segment:")
segment_counts.show()

Customer Count by Segment:
+------------+-----+
|     segment|count|
+------------+-----+
|   Low Value|    1|
|Medium Value|    6|
+------------+-----+



## Question 3.2

In [9]:

# Load the customer and order datasets
customers_df = spark.read.json("customers.json")
orders_df = spark.read.json("orders.json")

# Explode the categories array so each (order, category) pair becomes its own
# row; orders whose status is "Cancelled" are left out
categories_df = (
    orders_df
    .filter(col("status") != "Cancelled")
    .select("order_id", "customer_id", explode(col("categories")).alias("category"))
)

# Count category occurrences by customer
category_counts_by_customer = (
    categories_df
    .groupBy("customer_id", "category")
    .agg(count_("*").alias("purchase_count"))
)

# Rank categories for each customer by purchase count.
# NOTE: the window orders by purchase_count only, so row_number() breaks ties
# between equally-purchased categories in an arbitrary order.
window_spec = Window.partitionBy("customer_id").orderBy(col("purchase_count").desc())
ranked_categories = category_counts_by_customer.withColumn(
    "rank", row_number().over(window_spec)
)

# Keep the top 3 categories for each customer
top_categories = (
    ranked_categories
    .filter(col("rank") <= 3)
    .orderBy("customer_id", "rank")
)

# Join with customer information. A join is not guaranteed to preserve the
# row order of its inputs, so the result is sorted again after joining.
top_categories_with_details = (
    top_categories
    .join(customers_df, "customer_id")
    .select("customer_id", "name", "category", "purchase_count", "rank")
    .orderBy("customer_id", "rank")
)

# Show results
print("Top 3 Categories for Each Customer:")
top_categories_with_details.show()


Top 3 Categories for Each Customer:
+-----------+---------------+-----------+--------------+----+
|customer_id|           name|   category|purchase_count|rank|
+-----------+---------------+-----------+--------------+----+
|          1|     John Smith|electronics|             2|   1|
|          1|     John Smith|       home|             2|   2|
|          1|     John Smith|     garden|             1|   3|
|          2|   Mary Johnson|   clothing|             1|   1|
|          2|   Mary Johnson|accessories|             1|   2|
|          2|   Mary Johnson|     health|             1|   3|
|          3|   Robert Brown|       home|             1|   1|
|          3|   Robert Brown|electronics|             1|   2|
|          3|   Robert Brown|      books|             1|   3|
|          4|    Linda Davis|   clothing|             1|   1|
|          4|    Linda Davis|     sports|             1|   2|
|          5| Michael Wilson| stationery|             1|   1|
|          5| Michael Wilson|     

In [10]:

print("\nDetailed Top Categories by Customer:")
customers = customers_df.select("customer_id", "name").distinct().collect()

# Fetch the ranked rows once and group them by customer on the driver,
# instead of running a separate filter + collect Spark job per customer.
# NOTE(review): the lookup below assumes customer_id has the same type in
# customers_df and top_categories -- confirm against the JSON schemas.
top_cats_by_customer = {}
for cat in top_categories.collect():
    top_cats_by_customer.setdefault(cat["customer_id"], []).append(cat)

for customer in customers:
    customer_id = customer["customer_id"]
    customer_name = customer["name"]

    print(f"\nCustomer: {customer_name} (ID: {customer_id})")
    print("Top Categories:")

    # Sort by rank explicitly so the listing does not rely on collect() order.
    top_cats = sorted(
        top_cats_by_customer.get(customer_id, []),
        key=lambda row: row["rank"],
    )

    if top_cats:
        for cat in top_cats:
            print(f"  {cat['rank']}. {cat['category']} - {cat['purchase_count']} purchases")
    else:
        print("  No purchase history found")



Detailed Top Categories by Customer:

Customer: Linda Davis (ID: 4)
Top Categories:
  1. clothing - 1 purchases
  2. sports - 1 purchases

Customer: Mary Johnson (ID: 2)
Top Categories:
  1. clothing - 1 purchases
  2. accessories - 1 purchases
  3. health - 1 purchases

Customer: Robert Brown (ID: 3)
Top Categories:
  1. home - 1 purchases
  2. electronics - 1 purchases
  3. books - 1 purchases

Customer: John Smith (ID: 1)
Top Categories:
  1. electronics - 2 purchases
  2. home - 2 purchases
  3. garden - 1 purchases

Customer: Jennifer Clark (ID: 10)
Top Categories:
  1. electronics - 1 purchases
  2. books - 1 purchases

Customer: Elizabeth Taylor (ID: 6)
Top Categories:
  No purchase history found

Customer: Michael Wilson (ID: 5)
Top Categories:
  1. stationery - 1 purchases
  2. books - 1 purchases

Customer: Richard Martin (ID: 9)
Top Categories:
  No purchase history found

Customer: James Anderson (ID: 7)
Top Categories:
  1. outdoors - 1 purchases
  2. sports - 1 purchases

In [22]:
# Print a short marker string.
print("GG")

GG
