In [0]:
from pyspark.sql.functions import col, lower, round, avg, count, sum, max, min, mean, countDistinct

spark.sql("USE CATALOG workspace")
spark.sql("USE SCHEMA default_cohort_db")


DataFrame[]

In [0]:
%sql
SELECT *
FROM ecom_orders


_fivetran_id,order_date,sales,row_id,customer_id,order_id,_fivetran_deleted,_fivetran_synced
1FS46Z4iTYt4beJHHl8BA0cIM2Q=,2024-07-15,221.68,765,CUST157,ORD1764,False,2025-07-11T12:48:46.713Z
+tZqj9WHsSAL7g9dSOVQLcHovwI=,2024-07-13,120.93,498,CUST183,ORD1497,False,2025-07-11T12:48:46.715Z
LQyBZfO2hmqdWmzU7hdQNgJ3fzs=,2024-08-05,39.56,666,CUST093,ORD1665,False,2025-07-11T12:48:46.703Z
UBdNvZSR0SOHC9VUTQbtC0GkYV4=,2024-06-06,10.83,123,CUST049,ORD1122,False,2025-07-11T12:48:46.689Z
cyVYFlz3eQvnQoeHHygd2ik3i4U=,2024-02-04,148.01,439,CUST163,ORD1438,False,2025-07-11T12:48:46.713Z
2DMUzMuQB7tnNv2uwbN4Uz3wn14=,2024-03-21,73.74,60,CUST027,ORD1059,False,2025-07-11T12:48:46.679Z
/mFYj/TBxfbhRGPPyCiIqJPOJzE=,2024-03-19,124.26,294,CUST112,ORD1293,False,2025-07-11T12:48:46.707Z
G6dX4pZqI3f7YRY3uniBwLhgQoU=,2024-07-28,196.03,64,CUST028,ORD1063,False,2025-07-11T12:48:46.680Z
FjGUfhS0oIfLsQzqnYpK231s9C4=,2024-06-15,21.1,469,CUST172,ORD1468,False,2025-07-11T12:48:46.714Z
qEZexeWQvdPvdlvZcYR7WNaosYA=,2024-02-03,254.36,445,CUST165,ORD1444,False,2025-07-11T12:48:46.713Z


In [0]:
%sql
--Uniqueness Check. Check if order_id or row_id is unique.
SELECT row_id, COUNT(*) 
FROM ecom_orders
GROUP BY row_id
HAVING COUNT(*) > 1;


row_id,COUNT(*)


In [0]:
%sql
----Uniqueness Check. Check if order_id or rowid is unique.

SELECT order_id, COUNT(*) AS cnt
FROM ecom_orders
GROUP BY order_id
HAVING COUNT(*) > 1;

order_id,cnt


In [0]:
%sql
--Check for negative or zero sales
SELECT *
FROM ecom_orders
WHERE sales <= 0;

_fivetran_id,order_date,sales,row_id,customer_id,order_id,_fivetran_deleted,_fivetran_synced


In [0]:
%sql
--Count Total Rows vs. Distinct Orders
SELECT COUNT(*) AS total_rows, 
COUNT(DISTINCT order_id) AS unique_orders
FROM ecom_orders;

total_rows,unique_orders
1000,1000


# Step 1: Calculating the First Purchase Date
# 

In [0]:
# Step 1: First purchase date per customer
first_purchase_df = spark.sql("""
    SELECT
        customer_id,
        MIN(order_date) AS first_purchase_date -- earliest order date per customer
    FROM ecom_orders
    GROUP BY customer_id
""")

# Creating a temp view to use in next steps
first_purchase_df.createOrReplaceTempView("first_purchases")

display(first_purchase_df)


customer_id,first_purchase_date
CUST157,2024-01-03
CUST013,2024-04-30
CUST181,2024-01-03
CUST185,2024-03-04
CUST028,2024-06-20
CUST175,2024-05-05
CUST012,2024-02-15
CUST169,2024-02-05
CUST059,2024-02-21
CUST006,2024-01-18


# Step 2: Calculate the Second Purchase Date
# 

In [0]:
# Step 2: Second purchase date after first purchase
second_purchase_df = spark.sql("""
    SELECT
        o.customer_id,
        MIN(o.order_date) AS second_purchase_date -- order date after the first purchase
    FROM ecom_orders o
    INNER JOIN first_purchases fp
        ON o.customer_id = fp.customer_id
    WHERE o.order_date > fp.first_purchase_date -- only after first purchase
    GROUP BY o.customer_id
""")

# Save this for next join
second_purchase_df.createOrReplaceTempView("second_purchases")

display(second_purchase_df)

customer_id,second_purchase_date
CUST157,2024-02-04
CUST013,2024-06-23
CUST181,2024-02-11
CUST185,2024-03-26
CUST028,2024-07-28
CUST175,2024-05-30
CUST012,2024-03-04
CUST169,2024-03-14
CUST059,2024-03-12
CUST006,2024-03-31


# Step 3: Combining the Results and Calculating the Days Between Purchases


In [0]:

# Step 3: Combining first + second purchase and calculate days between
final_df = spark.sql("""
    SELECT
        fp.customer_id,
        fp.first_purchase_date,
        sp.second_purchase_date,
        DATEDIFF(sp.second_purchase_date, fp.first_purchase_date) AS days_between -- calculate # days between purchses
    FROM first_purchases fp
    LEFT JOIN second_purchases sp
        ON fp.customer_id = sp.customer_id --using left join to only include customers with one purchase
""")

# Save as permanent table 
final_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default_cohort_db.cohort_analysis") 


## Verify  Transformation:
## 

In [0]:
%sql
SELECT * FROM workspace.default_cohort_db.cohort_analysis LIMIT 10;

customer_id,first_purchase_date,second_purchase_date,days_between
CUST157,2024-01-03,2024-02-04,32
CUST013,2024-04-30,2024-06-23,54
CUST181,2024-01-03,2024-02-11,39
CUST185,2024-03-04,2024-03-26,22
CUST028,2024-06-20,2024-07-28,38
CUST175,2024-05-05,2024-05-30,25
CUST012,2024-02-15,2024-03-04,18
CUST169,2024-02-05,2024-03-14,38
CUST059,2024-02-21,2024-03-12,20
CUST006,2024-01-18,2024-03-31,73


#  Retention Rate by Cohort
# 
Step 1: Create a Query to Compute Retention Rates. 

Write a query that calculates, for each customer (using the raw ecom_orders table), the first purchase date and then (if available) the date of their second order. 
Then, group customers into cohorts by truncating the first purchase date by month and compute the percentage of customers who placed a second order within 1, 2, and 3 months.

In [0]:
##old query using months between
final_ret_df=spark.sql("""
WITH cohorts AS (
    SELECT
        customer_id,
        CAST(DATE_TRUNC('month', first_purchase_date) AS DATE) AS cohort_month,
        second_purchase_date,
        MONTHS_BETWEEN(second_purchase_date, first_purchase_date) AS months_between
    FROM cohort_analysis
    WHERE first_purchase_date IS NOT NULL
),

cohort_aggregates AS (
    SELECT
        cohort_month,
        COUNT(*) AS total_customers,
        COUNT_IF(months_between <= 1) AS second_within_1_month,
        COUNT_IF(months_between <= 2) AS second_within_2_months,
        COUNT_IF(months_between <= 3) AS second_within_3_months
    FROM cohorts
    GROUP BY cohort_month
)



SELECT
    cohort_month,
    total_customers,
    ROUND(100.0 * second_within_1_month / total_customers, 1) AS retention_rate_1_month,
    ROUND(100.0 * second_within_2_months / total_customers, 1) AS retention_rate_2_months,
    ROUND(100.0 * second_within_3_months / total_customers, 1) AS retention_rate_3_months
FROM cohort_aggregates
ORDER BY cohort_month
""")
final_ret_df.display()
# Save as permanent table
final_ret_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default_cohort_db.retention_rate_by_cohort")


cohort_month,total_customers,retention_rate_1_month,retention_rate_2_months,retention_rate_3_months
2024-01-01,66,37.9,60.6,77.3
2024-02-01,48,35.4,66.7,79.2
2024-03-01,33,33.3,72.7,87.9
2024-04-01,27,51.9,88.9,92.6
2024-05-01,16,56.3,93.8,100.0
2024-06-01,10,30.0,100.0,100.0


Databricks visualization. Run in Databricks to view.

In [0]:
final_ret_df = spark.sql("""
WITH first_and_second_orders AS (
    SELECT
        customer_id,
        order_date,
        --Assigns a sequential number to each order per customer based on the earliest order first.
        --This is used to identify the first and second orders for each customer.
        ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date) AS order_rank 
    FROM ecom_orders
),

customer_orders AS (
    SELECT
        customer_id,
        MIN(CASE WHEN order_rank = 1 THEN order_date END) AS first_purchase_date,
        MIN(CASE WHEN order_rank = 2 THEN order_date END) AS second_purchase_date
    FROM first_and_second_orders
    GROUP BY customer_id
),

cohorts AS (
    SELECT
        customer_id,
        CAST(DATE_TRUNC('month', first_purchase_date) AS DATE) AS cohort_month,
        first_purchase_date,
        second_purchase_date,
        DATEDIFF(second_purchase_date, first_purchase_date) AS days_between
    FROM customer_orders
    WHERE first_purchase_date IS NOT NULL
)

SELECT
    cohort_month,
    COUNT(*) AS total_customers,
    SUM(CASE WHEN second_purchase_date IS NOT NULL AND days_between <= 30 THEN 1 ELSE 0 END) AS second_within_1_month,
    SUM(CASE WHEN second_purchase_date IS NOT NULL AND days_between <= 60 THEN 1 ELSE 0 END) AS second_within_2_months,
    SUM(CASE WHEN second_purchase_date IS NOT NULL AND days_between <= 90 THEN 1 ELSE 0 END) AS second_within_3_months,
    ROUND(100.0 * SUM(CASE WHEN second_purchase_date IS NOT NULL AND days_between <= 30 THEN 1 ELSE 0 END) / COUNT(*), 1) AS retention_rate_1_month,
    ROUND(100.0 * SUM(CASE WHEN second_purchase_date IS NOT NULL AND days_between <= 60 THEN 1 ELSE 0 END) / COUNT(*), 1) AS retention_rate_2_months,
    ROUND(100.0 * SUM(CASE WHEN second_purchase_date IS NOT NULL AND days_between <= 90 THEN 1 ELSE 0 END) / COUNT(*), 1) AS retention_rate_3_months
FROM cohorts
GROUP BY cohort_month
ORDER BY cohort_month
;


""")

# Show in notebook
final_ret_df.display()

# Save as permanent table

spark.sql("DROP TABLE IF EXISTS workspace.default_cohort_db.retention_rate_by_cohort")
final_ret_df.write.format("delta").saveAsTable("workspace.default_cohort_db.retention_rate_by_cohort")


cohort_month,total_customers,second_within_1_month,second_within_2_months,second_within_3_months,retention_rate_1_month,retention_rate_2_months,retention_rate_3_months
2024-01-01,66,26,41,51,39.4,62.1,77.3
2024-02-01,48,17,32,38,35.4,66.7,79.2
2024-03-01,33,10,24,29,30.3,72.7,87.9
2024-04-01,27,14,24,25,51.9,88.9,92.6
2024-05-01,16,8,15,16,50.0,93.8,100.0
2024-06-01,10,3,10,10,30.0,100.0,100.0


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
%sql
SELECT customer_id, 
COUNT(DISTINCT order_id) AS num_orders
FROM ecom_orders
GROUP BY customer_id
HAVING num_orders = 1


customer_id,num_orders
CUST094,1
CUST026,1


In [0]:
##Check for customers with only 1 order

spark.sql("""
SELECT
    COUNT(*) AS one_order_customers
FROM (
    SELECT customer_id, 
    COUNT(DISTINCT order_id) AS num_orders
    FROM ecom_orders
    GROUP BY customer_id
    HAVING num_orders = 1
)
""").display()


one_order_customers
2


In [0]:

## Check for duplicate order_ids

spark.sql("""
SELECT
    order_id,
    COUNT(*) AS occurrences
FROM ecom_orders
GROUP BY order_id
HAVING COUNT(*) > 1
ORDER BY occurrences DESC
""").display()



order_id,occurrences


In [0]:
%sql
SELECT
  customer_id,
  COUNT(DISTINCT order_id) AS order_count,
  MIN(order_date) AS first_order
FROM ecom_orders
GROUP BY customer_id
HAVING order_count = 1

customer_id,order_count,first_order
CUST094,1,2024-04-03
CUST026,1,2024-02-14


In [0]:
%sql
SELECT
  DATE_TRUNC('month', DATE(first_order)) AS cohort_month,
  COUNT(*)
FROM (
 SELECT
  customer_id,
  COUNT(DISTINCT order_id) AS order_count,
  MIN(order_date) AS first_order
FROM ecom_orders
GROUP BY customer_id
HAVING order_count = 1
)
GROUP BY 1


cohort_month,COUNT(*)
2024-02-01T00:00:00.000Z,1
2024-04-01T00:00:00.000Z,1


In [0]:
%sql
SELECT order_id, COUNT(*) 
FROM ecom_orders
GROUP BY order_id
HAVING COUNT(*) > 1
LIMIT 10


order_id,COUNT(*)


In [0]:
%sql
WITH customer_orders AS (
  SELECT
    customer_id,
    DATE_TRUNC('month', MIN(order_date)) AS cohort_month
  FROM ecom_orders
  GROUP BY customer_id
)
SELECT cohort_month, COUNT(*) FROM customer_orders GROUP BY 1 ORDER BY 1



cohort_month,COUNT(*)
2024-01-01T00:00:00.000Z,66
2024-02-01T00:00:00.000Z,48
2024-03-01T00:00:00.000Z,33
2024-04-01T00:00:00.000Z,27
2024-05-01T00:00:00.000Z,16
2024-06-01T00:00:00.000Z,10


# Repeat Purchase Rate by Cohort

Step 1: Create a Query to Compute Repeat Purchase Rates. 

Write a query that calculates, for each customer, the first purchase date and total orders.  

Then, group customers into monthly cohorts and compute the percentage of customers who placed at least a 2nd, 3rd, and 4th order.

In [0]:
repeat_purchase_df = spark.sql("""
WITH customer_orders AS (
    SELECT
        customer_id,
        CAST(DATE_TRUNC('month', MIN(order_date)) AS DATE) AS cohort_month,
        COUNT(DISTINCT order_id) AS total_orders
    FROM ecom_orders
    GROUP BY customer_id
),

cohort_stats AS (
    SELECT
        cohort_month,
        COUNT(*) AS total_customers,
        COUNT_IF(total_orders >= 2) AS repeat_2_orders,
        COUNT_IF(total_orders >= 3) AS repeat_3_orders,
        COUNT_IF(total_orders >= 4) AS repeat_4_orders
    FROM customer_orders
    GROUP BY cohort_month
)

SELECT
    cohort_month,
    total_customers,
    ROUND(100.0 * repeat_2_orders / total_customers, 1) AS repeat_rate_2_orders,
    ROUND(100.0 * repeat_3_orders / total_customers, 1) AS repeat_rate_3_orders,
    ROUND(100.0 * repeat_4_orders / total_customers, 1) AS repeat_rate_4_orders
FROM cohort_stats
ORDER BY cohort_month
""")

repeat_purchase_df.display()
# Save as permanent table
repeat_purchase_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default_cohort_db.repeat_purchase_rates_by_cohort")


cohort_month,total_customers,repeat_rate_2_orders,repeat_rate_3_orders,repeat_rate_4_orders
2024-01-01,66,100.0,98.5,83.3
2024-02-01,48,97.9,95.8,81.3
2024-03-01,33,100.0,93.9,81.8
2024-04-01,27,96.3,85.2,77.8
2024-05-01,16,100.0,93.8,68.8
2024-06-01,10,100.0,70.0,60.0


Databricks visualization. Run in Databricks to view.

# **Cohort Size by Month**
Step 1: Create a Query to Compute Cohort Size. 

Write a query to count the number of new customers (i.e., first orders) acquired in each month.

In [0]:
# Compute Cohort Size by First Purchase Month
cohort_size_by_month_df = spark.sql("""
WITH first_orders AS (
    SELECT
        customer_id,
        MIN(order_date) AS first_purchase_date
    FROM ecom_orders
    GROUP BY customer_id
)

SELECT
    Cast(DATE_TRUNC('month', first_purchase_date) as Date) AS cohort_month,
    COUNT(*) AS new_customers
FROM first_orders
GROUP BY DATE_TRUNC('month', first_purchase_date)
ORDER BY cohort_month
""")
display(cohort_size_by_month_df)
# Save as permanent table
cohort_size_by_month_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default_cohort_db.cohort_size_by_month")


cohort_month,new_customers
2024-01-01,66
2024-02-01,48
2024-03-01,33
2024-04-01,27
2024-05-01,16
2024-06-01,10


Databricks visualization. Run in Databricks to view.

In [0]:
repeat_purchase_df = spark.sql("""
-- Step 1: Deduplicate orders at customer + order_id level
WITH deduped_orders AS (
    SELECT
        customer_id,
        order_id,
        MIN(order_date) AS order_date  -- in case of duplicates, pick earliest
    FROM ecom_orders
    GROUP BY customer_id, order_id
),

-- Step 2: Get first order month + count total orders
customer_orders AS (
    SELECT
        customer_id,
        CAST(DATE_TRUNC('month', MIN(order_date)) AS DATE) AS cohort_month,
        COUNT(*) AS total_orders  -- no need for DISTINCT anymore
    FROM deduped_orders
    GROUP BY customer_id
),

-- Step 3: Aggregate repeat purchase stats by cohort
cohort_stats AS (
    SELECT
        cohort_month,
        COUNT(*) AS total_customers,
        COUNT_IF(total_orders >= 2) AS repeat_2_orders,
        COUNT_IF(total_orders >= 3) AS repeat_3_orders,
        COUNT_IF(total_orders >= 4) AS repeat_4_orders
    FROM customer_orders
    GROUP BY cohort_month
)

-- Step 4: Final results
SELECT
    cohort_month,
    total_customers,
    ROUND(100.0 * repeat_2_orders / total_customers, 1) AS repeat_rate_2_orders,
    ROUND(100.0 * repeat_3_orders / total_customers, 1) AS repeat_rate_3_orders,
    ROUND(100.0 * repeat_4_orders / total_customers, 1) AS repeat_rate_4_orders
FROM cohort_stats
ORDER BY cohort_month
""")

# Display results
repeat_purchase_df.display()

# Save as a Delta table
repeat_purchase_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default_cohort_db.repeat_purchase_rates_by_cohort_deduped")


cohort_month,total_customers,repeat_rate_2_orders,repeat_rate_3_orders,repeat_rate_4_orders
2024-01-01,66,100.0,98.5,83.3
2024-02-01,48,97.9,95.8,81.3
2024-03-01,33,100.0,93.9,81.8
2024-04-01,27,96.3,85.2,77.8
2024-05-01,16,100.0,93.8,68.8
2024-06-01,10,100.0,70.0,60.0


In [0]:
final_ret_df=spark.sql("""
WITH cohorts AS (
    SELECT
        customer_id,
        CAST(DATE_TRUNC('month', first_purchase_date) AS DATE) AS cohort_month,
        second_purchase_date,
        MONTHS_BETWEEN(second_purchase_date, first_purchase_date) AS months_between
    FROM workspace.default_cohort_db.cohort_analysis
    WHERE first_purchase_date IS NOT NULL
),

cohort_aggregates AS (
    SELECT
        cohort_month,
        COUNT(*) AS total_customers,
        COUNT_IF(months_between <= 1) AS second_within_1_month,
        COUNT_IF(months_between <= 2) AS second_within_2_months,
        COUNT_IF(months_between <= 3) AS second_within_3_months
    FROM cohorts
    GROUP BY cohort_month
)

SELECT
    cohort_month,
    total_customers,
    ROUND(100.0 * second_within_1_month / total_customers, 1) AS retention_rate_1_month,
    ROUND(100.0 * second_within_2_months / total_customers, 1) AS retention_rate_2_months,
    ROUND(100.0 * second_within_3_months / total_customers, 1) AS retention_rate_3_months
FROM cohort_aggregates
ORDER BY cohort_month
""")
final_ret_df.display()
# Save as permanent table
##final_ret_df.write.format("delta").mode("overwrite").saveAsTable("workspace.demo.retention_rate_by_cohort")


cohort_month,total_customers,retention_rate_1_month,retention_rate_2_months,retention_rate_3_months
2024-01-01,66,37.9,60.6,77.3
2024-02-01,48,35.4,66.7,79.2
2024-03-01,33,33.3,72.7,87.9
2024-04-01,27,51.9,88.9,92.6
2024-05-01,16,56.3,93.8,100.0
2024-06-01,10,30.0,100.0,100.0


**Observations**. 

**1. Declining acquisition**. 

Fewer new customers each month, from 66 in January down to 10 in June, and no new customers until the end of the year

Maybe a drop in acquisition, which could be seasonal, strategic, or an issue (e.g. reduced marketing spend or fewer leads).

**2. Improving retention**. 
Even though acquisition is shrinking, retention is getting better:

**Jan 2024:**   
Only 39.4% of customers returned within 1 month.

**Apr–May 2024**: Over 50% return within 1 month.

In **May/June 2024**, almost all customers return within 2–3 months.

**3. March vs. May cohorts:**. 

**March** had low early retention (30.3%) but very high later retention (87.9%)

**May** had smaller volume (16 customers) but perfect 3-month retention (100%)




**Interpretation and Conclusions**. 

- Quality of customers is improving, even if volume is dropping.

- Retention is climbing steadily, especially long-term.
 
- If the goal is growth, the shrinking cohort sizes could become a problem, even if LTV per customer is rising.
 
- Segment further by acquisition channel, product, or location to see what’s driving strong retention in Apr–Jun.